# Alkemy project - Maria Cicone, Andrea Buscemi, Alessandro Ponzianelli, Carlamaria Sciammarella

In [1]:
import pandas as pd
import numpy as np
import sqlite3
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark
import plotly.express as px

In [None]:
## Creating a SparkSession
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("Alkemy App").enableHiveSupport().getOrCreate()
spark

In [None]:
##Creating a database
spark.sql("CREATE DATABASE IF NOT EXISTS sales_db")

In [None]:
#Checking if the database exists
v=spark.sql("show databases")
v.show()

In [None]:
###Creating schema for the tables

from pyspark.sql.types import *

stock_schema=StructType([
        StructField("stock_date", DateType(), nullable=False),
        StructField("product_id", StringType(), nullable=False),
        StructField("total_stock", IntegerType(), nullable=False) ])

# Schema applied to both click files (clicks_bidding.csv and clicks_regular.csv) when they are read below.
# "date" is parsed as a timestamp; the price/position fields are numeric, everything else is kept as a string.
# NOTE(review): every field is declared nullable=False, yet the cleaning section later reports
# null values in clicks_regular.price — the flag does not appear to be enforced on CSV read.
# Confirm, and consider declaring the price fields nullable=True so the schema matches the data.
clicks_schema= StructType([
        StructField("date", TimestampType(), nullable=False),
        StructField("seller", StringType(), nullable=False),
        StructField("position", IntegerType(), nullable=False),
        StructField("price_max", DoubleType(), nullable=False),
        StructField("price_min", DoubleType(), nullable=False),
        StructField("price", DoubleType(), nullable=False),
        StructField("type", StringType(), nullable=False),
        StructField("product_id", StringType(), nullable=False) ])

sellers_schema= StructType([
        StructField("seller_id", StringType(), nullable=False),
        StructField("seller_name", StringType(), nullable=False)])

catalog_schema= StructType([
        StructField("product_id", StringType(), nullable=False),
        StructField("coded_cat1", StringType(), nullable=False),
        StructField("coded_cat2", StringType(), nullable=False),
        StructField("coded_cat3", StringType(), nullable=False),
        StructField("coded_brand", StringType(), nullable=False),
        StructField("coded_name", StringType(), nullable=False)])

competitor_schema= StructType([
        StructField("comp_date", DateType(), nullable=False),
        StructField("seller_id", StringType(), nullable=False),
        StructField("product_id", StringType(), nullable=False),
        StructField("price", DoubleType(), nullable=False) ])

sales_schema= StructType([
        StructField("sale_date", DateType(), nullable=False),
        StructField("product_id", StringType(), nullable=False),
        StructField("quantity", IntegerType(), nullable=False),
        StructField("sales_price_tax", DoubleType(), nullable=False),
        StructField("regular_price_tax", DoubleType(), nullable=False),
        StructField("sales_price", DoubleType(), nullable=False),
        StructField("regular_price", DoubleType(), nullable=False),
        StructField("purchase_price", DoubleType(), nullable=False)])


In [None]:
##Reading the datasets
stock = spark.read.option("header",True).schema(stock_schema).csv("stock.csv")
clicks_b = spark.read.option("header",True).schema(clicks_schema).csv("clicks_bidding.csv")
clicks_r = spark.read.option("header",True).schema(clicks_schema).csv("clicks_regular.csv")
sellers = spark.read.option("header",True).schema(sellers_schema).csv("sellers_list.csv")
catalog = spark.read.option("header",True).schema(catalog_schema).csv("product_catalog.csv")
competitor = spark.read.option("header",True).schema(competitor_schema).csv("prices_competitor.csv")
sales = spark.read.option("header",True).schema(sales_schema).csv("sales_data.csv")

In [None]:
##Adding seller_id specification to our client
# Every row of the sales dataframe receives the same constant seller_id (24),
# so the client's own sales carry a seller key like the competitor data does.
# NOTE(review): lit(24) creates an integer column, while sellers_schema and
# competitor_schema declare seller_id as StringType — confirm whether this column
# is ever joined against those tables and, if so, whether it should be lit("24").
from pyspark.sql.functions import lit
sales=sales.withColumn("seller_id", lit(24))

## Cleaning the dataframes

### Prices_competitor

In [None]:
#prices_competitor

##Checking for missing values, duplicates or outliers in the prices set
from pyspark.sql.functions import *

competitor.filter(competitor.price.isNull()).count()

In [None]:
competitor.count()

In [None]:
competitor.distinct().count()
####There are no absolute duplicates, even if prices are set multiple times in the same day by the same seller

In [None]:
competitor.filter(competitor.price== 0).show()  ##Remove 0 values

In [None]:
##Looking for outliers by comparing mean vs median
out1= competitor.groupBy(["product_id", "seller_id"]).agg(mean('price').alias("mean"), percentile_approx("price", 0.5).alias("median"))

In [None]:
px.scatter(data_frame=out1.toPandas(), x="mean", y="median", trendline="ols", color_discrete_sequence=['royalblue'], title="Price Set by Competitors - Mean vs Median Per Product ID and Seller")

In [None]:
##Finding the outlier
out1.filter(out1.median == 107936).show(truncate=False)

In [None]:
# Pull the raw price rows for the seller/product pair flagged by the mean-vs-median check.
# The predicate is built from `competitor`'s own columns: the original referenced the
# columns of the aggregated frame `out1`, which is a different DataFrame than the one
# being filtered and only resolved because it happens to derive from `competitor`.
outlier=competitor.filter((competitor.seller_id == 41) & (competitor.product_id == 164429))

In [None]:
px.histogram(outlier.toPandas(), x="price", title="Distribution of price - Product 164429/Seller 41")

In [None]:
outlier.sort(outlier.price.desc()).show()

In [None]:
outlier.filter(outlier.comp_date=="2021-04-02").show() ###It is likely a typo, there is an additional 0

In [None]:
###Correcting the typo
# Rewrites the price column: rows of product 164429 whose price equals 1040290 get 104290,
# every other row keeps its current price.
# NOTE(review): the condition matches on price and product_id only (not on seller_id or
# comp_date), so any row of this product with that exact price is rewritten — confirm
# that this is the intended scope of the correction.
competitor = competitor.withColumn("price", when((competitor.price == 1040290) & (competitor.product_id==164429),104290).otherwise(competitor.price))

In [None]:
###Dropping rows with 0 price values
competitor=competitor.filter(competitor.price!=0)

### Sales_data

In [None]:
#sales
sales.filter(sales.regular_price.isNull()).count() ###0
sales.count()

In [None]:
sales.distinct().count() ###No duplicates

In [None]:
sales.filter(sales.regular_price== 0).show() ##Empty

In [None]:
##Looking for outliers - Regular Price
out2= sales.groupBy("product_id").agg(mean('regular_price').alias("mean"), percentile_approx("regular_price", 0.5).alias("median"))

In [None]:
px.scatter(data_frame=out2.toPandas(), x="mean", y="median", trendline="ols", color_discrete_sequence=['royalblue'], title="Regular Price - Mean vs Median Per Product ID")

In [None]:
sales.filter(sales.sales_price.isNull()).count() ##0

In [None]:
sales.filter(sales.sales_price== 0).show() #Empty

In [None]:
###Looking for outliers --- sales_price
out3= sales.groupBy("product_id").agg(mean('sales_price').alias("mean"), percentile_approx("sales_price", 0.5).alias("median"))

In [None]:
px.scatter(data_frame=out3.toPandas(), x="mean", y="median", trendline="ols", color_discrete_sequence=['royalblue'], title="Sale Price - Mean vs Median Per Product ID")

In [None]:
##Finding the hypothetical outlier
out3.filter(out3.median == 291658).show(truncate=False)

In [None]:
sales.filter(sales.purchase_price.isNull()).count() ##0

In [None]:
sales.filter(sales.purchase_price== 0).show() ##Empty

In [None]:
###Looking for outliers ---purchase price
out4= sales.groupBy("product_id").agg(mean('purchase_price').alias("mean"), percentile_approx("purchase_price", 0.5).alias("median"))
px.scatter(data_frame=out4.toPandas(), x="mean", y="median", trendline="ols", color_discrete_sequence=['royalblue'], title="Purchase Price - Mean vs Median Per Product ID")

In [None]:
out4.filter(out4.median>272000 ).show(truncate=False)

In [None]:
outlier_sales=sales.filter(sales.product_id ==163731)

In [None]:
outlier_sales.show() ###The sales_price reacted to changes in the purchase_price, therefore there is no outlier

In [None]:
###Dropping variables we are not going to use
column_sales=["regular_price_tax", "sales_price_tax"]
sales=sales.drop(*column_sales)

### Clicks_regular and clicks_bidding

In [None]:
#clicks_regular
##Missing values
clicks_r.filter(clicks_r.price.isNull()).count() ## 1093244  ###DROP

In [None]:
clicks_r.count()

In [None]:
clicks_r.distinct().count()
###We decided not to drop the duplicates as the user_id is not mentioned and it is possible for different users to click on the same insertion at the same time

In [None]:
##Missing values
clicks_b.filter(clicks_b.price.isNull()).count() ###0

In [None]:
clicks_b.count()

In [None]:
clicks_b.distinct().count() ###We decided to keep the duplicates for the same reason we mentioned above

In [None]:
###Dropping the price column from the click dataframes, as it has a lot of missing values and it is redundant, since price is listed in the prices_competitor dataframe
##Dropping price_max and price_min, as well as the position, as they are inconsistent and not useful for our analysis
click_columns=["price", "price_max", "price_min", "position"]
clicks_b=clicks_b.drop(*click_columns)
clicks_r=clicks_r.drop(*click_columns)

### Inserting the tables in the database

In [None]:
### Converting the dataframes into tables of our database ###Overwrite mode to rewrite the files in case of updates
catalog.write.mode('overwrite').saveAsTable("sales_db.product_catalog")
sales.write.mode('overwrite').saveAsTable("sales_db.sales_data")
sellers.write.mode('overwrite').saveAsTable("sales_db.sellers_list")
competitor.write.mode('overwrite').saveAsTable("sales_db.prices_competitor")
stock.write.mode('overwrite').saveAsTable("sales_db.stock")
clicks_b.write.mode('overwrite').saveAsTable("sales_db.clicks")
clicks_r.write.mode('append').saveAsTable("sales_db.clicks") ##Append values to an already existing table

In [None]:
### Showing the tables of the database
b=spark.sql("show tables from sales_db")
b.show()

In [None]:
### Checking if data was correctly inserted into the tables
df1=spark.sql("select * from sales_db.clicks a where a.type='Bidding'")
df1.show()

### Some data visualization

In [None]:
## Top 10 most clicked products

top_clicks=spark.sql('''
          SELECT a.product_id, count(*) AS n_clicks
          FROM sales_db.clicks a
          GROUP BY a.product_id
          ORDER BY n_clicks desc
          LIMIT 10
          ''')

In [None]:
plt.figure(figsize=(10,5))
# Collect the query result once and sort it by click count. The original called
# toPandas() twice, which ran the Spark query twice and took the bar data and the
# bar order from two separate collections.
top_clicks_pd = top_clicks.toPandas().sort_values("n_clicks", ascending=False)
fig_top=sns.barplot(top_clicks_pd, x="product_id", y="n_clicks", color="limegreen", order=top_clicks_pd.product_id)
ple=fig_top.set_title("The 10 Most Clicked Products")
plt.ylabel("N° of clicks")

In [None]:
##N of products offered per seller

df_p_seller=spark.sql('''SELECT s.seller_name, count(*) AS n_products
          FROM sales_db.sellers_list s, (SELECT a.seller_id, a.product_id, count(*) 
          FROM sales_db.prices_competitor a
          GROUP BY a.seller_id, a.product_id
          ) ca
          WHERE s.seller_id=ca.seller_id
          GROUP BY s.seller_name
          ORDER BY n_products desc
          ''')

In [None]:
# Collect the per-seller counts once and sort them in descending order; the same
# pandas frame supplies both the bar data and the bar order (the original called
# toPandas() twice, executing the Spark query twice).
df_p_seller_pd = df_p_seller.toPandas().sort_values("n_products", ascending=False)
fig_p_seller=sns.barplot(df_p_seller_pd, x="seller_name", y="n_products", color="#A61022", order=df_p_seller_pd.seller_name)
ple=fig_p_seller.set_title("N° of products offered per competitor")
plt.ylabel("N° of products")

In [None]:
## N of products per macrocategory

p_per_cat= spark.sql('''SELECT a.coded_cat1, count(*) AS n_products
          FROM sales_db.product_catalog a
          GROUP BY a.coded_cat1''')

In [None]:
# Collect the per-category counts once and sort them in descending order; the same
# pandas frame supplies both the bar data and the bar order (the original called
# toPandas() twice, executing the Spark query twice).
p_per_cat_pd = p_per_cat.toPandas().sort_values("n_products", ascending=False)
fig_p_cat=sns.barplot(p_per_cat_pd, x="coded_cat1", y="n_products", color="royalblue", order=p_per_cat_pd.coded_cat1)
ple=fig_p_cat.set_title("N° of products per macro-category")
plt.ylabel("N° of products")

### TASK 1

In [None]:
#QUERY CAT_1 1676

df_1676=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count(distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1676 ORDER BY a.n_date desc LIMIT 1")
df_1676.show()

In [None]:
##PRODUCT 139545
df_p1=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=139545 ORDER BY date")
df_p1.show()

In [None]:
graph_p1=px.line(data_frame=df_p1.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 139545")
graph_p1.show()

###By filtering sellers and timeframe on the graphs, a strong relationship among sellers n° 48,26,24 and 180 can be identified

In [None]:
#QUERY CAT_1 2259
df_2259=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=2259 ORDER BY a.n_date desc LIMIT 1")
df_2259.show()

In [None]:
##PRODUCT 139038
df_p2=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=139038 ORDER BY date")
df_p2.show()

In [None]:
graph_p2=px.line(data_frame=df_p2.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 139038")
graph_p2.show()

In [None]:
#QUERY CAT_1 1375
df_1375=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1375 ORDER BY a.n_date desc LIMIT 1")
df_1375.show()

In [None]:
##PRODUCT 107693
df_p3=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=107693 ORDER BY date")
df_p3.show()

In [None]:
graph_p3=px.line(data_frame=df_p3.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 107693")
graph_p3.show()

In [None]:
#QUERY CAT_1 1776  ##EMPTY QUERY
df_1776=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1776 ORDER BY a.n_date desc LIMIT 1")
df_1776.show()

In [None]:
#QUERY CAT_1 1127
df_1127=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1127 ORDER BY a.n_date desc LIMIT 1")
df_1127.show()

In [None]:
##PRODUCT 133528
df_p4=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=133528 ORDER BY date")
df_p4.show()

In [None]:
# Plot the price history of product 133528 from its own query result (df_p4).
# The original passed df_p3, which holds product 107693, so the chart showed
# the wrong product under this title.
graph_p4=px.line(data_frame=df_p4.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 133528")
graph_p4.show()

In [None]:
#QUERY CAT_1 1163
df_1163=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1163 ORDER BY a.n_date desc LIMIT 1")
df_1163.show()

In [None]:
#PRODUCT 132575
df_p5=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=132575 ORDER BY date")
df_p5.show()

In [None]:
graph_p5=px.line(data_frame=df_p5.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 132575")
graph_p5.show()

In [None]:
#QUERY CAT_1 1354
df_1354=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1354 ORDER BY a.n_date desc LIMIT 1")
df_1354.show()

In [None]:
#PRODUCT 148202
df_p6=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=148202 ORDER BY date")
df_p6.show()

In [None]:
graph_p6=px.line(data_frame=df_p6.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 148202")
graph_p6.show()

In [None]:
#QUERY CAT_1 2180
df_2180=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=2180 ORDER BY a.n_date desc LIMIT 1")
df_2180.show()

In [None]:
#PRODUCT 135173
df_p7=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=135173 ORDER BY date")
df_p7.show()

In [None]:
graph_p7=px.line(data_frame=df_p7.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 135173")
graph_p7.show()

In [None]:
#QUERY CAT_1 2880
df_2880=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=2880 ORDER BY a.n_date desc LIMIT 1")
df_2880.show()

In [None]:
#PRODUCT 160792
df_p8=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=160792 ORDER BY date")
df_p8.show()

In [None]:
graph_p8=px.line(data_frame=df_p8.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 160792")
graph_p8.show()

In [None]:
#QUERY CAT_1 1617
df_1617=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=1617 ORDER BY a.n_date desc LIMIT 1")
df_1617.show()

In [None]:
#PRODUCT 140647
df_p9=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=140647 ORDER BY date")
df_p9.show()

In [None]:
graph_p9=px.line(data_frame=df_p9.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 140647")
graph_p9.show()

In [None]:
#QUERY CAT_1 885
df_885=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=885 ORDER BY a.n_date desc LIMIT 1")
df_885.show()

In [None]:
#PRODUCT 103455
df_p10=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=103455 ORDER BY date")
df_p10.show()

In [None]:
graph_p10=px.line(data_frame=df_p10.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 103455")
graph_p10.show()

In [None]:
#QUERY CAT_1 624
df_624=spark.sql("SELECT * FROM (SELECT p.product_id, pc.coded_cat1, count( distinct p.seller_id) as n_sellers, count(p.product_id) as n_date FROM sales_db.prices_competitor p, sales_db.product_catalog pc WHERE p.product_id=pc.product_id GROUP BY p.product_id, pc.coded_cat1 ORDER BY n_sellers desc) a WHERE a.n_sellers=9 AND a.coded_cat1=624 ORDER BY a.n_date desc LIMIT 1")
df_624.show()

In [None]:
#PRODUCT 158032
df_p11=spark.sql("SELECT p.comp_date as date, p.seller_id, p.product_id, p.price FROM sales_db.prices_competitor p WHERE p.product_id=158032 ORDER BY date")
df_p11.show()

In [None]:
graph_p11=px.line(data_frame=df_p11.toPandas(), x="date", y="price", color="seller_id", title="Price History - Product 158032")
graph_p11.show()

### TASK 2

In [None]:
###Querying from clicks the number of daily clicks per product, the number of total clicks in a day, 
##the daily rank based on clicks of every product (in ascending order, so the product ranked first is the one with the fewest clicks)
##and the rank of the day based on the total clicks compared with the number of clicks registered by the others (in ascending order)

df_clicks=spark.sql(''' WITH clicks as (SELECT a.date, a.product_id, a.n_clicks, 
          SUM(a.n_clicks) OVER (PARTITION BY a.date) AS total_daily_clicks, RANK() OVER(PARTITION BY a.date ORDER BY a.n_clicks) AS asc_daily_rank
          FROM (SELECT date(c.date) AS date, c.product_id, COUNT(c.product_id) n_clicks 
          FROM sales_db.clicks c
          WHERE date(c.date) LIKE "2021%"
          GROUP BY date(c.date), c.product_id) a)
          SELECT cl.date, cl.product_id, cl.n_clicks, cl.total_daily_clicks, cl.asc_daily_rank, 
          DENSE_RANK() OVER(ORDER BY cl.total_daily_clicks) AS asc_rank_days
          FROM clicks cl
          ORDER BY cl.date, cl.n_clicks DESC ''')

In [None]:
df_clicks.show()

In [None]:
##Scaling the ranks (0 to 1 values; the highest values correspond to the products/days with the most clicks)

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

##Scaling the clicks rank
assembler = VectorAssembler(inputCols=["asc_daily_rank"], outputCol="asc_daily_rank_vector")
scaler = MinMaxScaler(inputCol="asc_daily_rank_vector", outputCol="asc_daily_rank_norm")
pipe = Pipeline(stages=[assembler, scaler])
scalerM = pipe.fit(df_clicks)
df_clicks = scalerM.transform(df_clicks).drop("asc_daily_rank_vector")

In [None]:
##Scaling the days rank
# Wrap asc_rank_days in a one-element vector (MinMaxScaler works on vector columns),
# fit the min-max scaler on df_clicks, then drop the temporary vector column.
assembler_2 = VectorAssembler(inputCols=["asc_rank_days"], outputCol="asc_rank_days_vector")
scaler_2 = MinMaxScaler(inputCol="asc_rank_days_vector", outputCol="asc_rank_days_norm")
pipe_2 = Pipeline(stages=[assembler_2, scaler_2])
scalerM_2 = pipe_2.fit(df_clicks)
# The fitted model is bound to `scalerM_2`; the original called the undefined
# name `scalerMl_2`, which raised NameError and left asc_rank_days_norm uncreated.
df_clicks = scalerM_2.transform(df_clicks).drop("asc_rank_days_vector")

In [None]:
##Converting the resulting vectors to arrays and then to simple floats
from pyspark.ml.functions import vector_to_array
df_clicks=df_clicks.withColumn('asc_rank_days_norm', vector_to_array('asc_rank_days_norm'))
df_clicks=df_clicks.withColumn("asc_rank_days_norm",df_clicks.asc_rank_days_norm[0])

df_clicks=df_clicks.withColumn('asc_daily_rank_norm', vector_to_array('asc_daily_rank_norm'))
df_clicks=df_clicks.withColumn("asc_daily_rank_norm",df_clicks.asc_daily_rank_norm[0])

In [None]:
df_clicks.show()

In [None]:
##Defining the popularity index function
import math
def popularity_index(row):
    """Return the raw popularity score of one row.

    The score is exp(asc_daily_rank_norm) + asc_rank_days_norm, so the
    product's rank within its day enters exponentially and the rank of the
    day itself enters linearly.

    row: any mapping-like object (dict, pandas Series) exposing the keys
         "asc_daily_rank_norm" and "asc_rank_days_norm".
    """
    product_term = math.exp(row["asc_daily_rank_norm"])
    day_term = row["asc_rank_days_norm"]
    return product_term + day_term

In [None]:
##Converting the pyspark dataframe into a pandas dataframe
df_clicks_pd=df_clicks.toPandas()

In [None]:
df_clicks_pd

In [None]:
##Applying the popularity index function to create a new column in the dataframe
df_clicks_pd["popularity_index"]= df_clicks_pd.apply(lambda row: popularity_index(row), axis=1)

In [None]:
##Normalizing the index (0 to 1 values)
from sklearn import preprocessing
df_clicks_pd["popularity_index"] = (df_clicks_pd["popularity_index"] - df_clicks_pd["popularity_index"].min()) / (df_clicks_pd["popularity_index"].max() - df_clicks_pd["popularity_index"].min())

In [None]:
df_clicks_pd

In [None]:
##Plotting the distribution of the index
fig_dis=sns.histplot(data=df_clicks_pd, x="popularity_index")
dis=fig_dis.set_title("Popularity index distribution")
plt.ylabel("N° of products")

In [None]:
##Defining the function to divide the dates in quarters
def t_bin(x):
    """Return the 2021 quarter label ("Q1".."Q4") for a date.

    x: anything pd.Timestamp can convert — a pandas Timestamp, a
       datetime.datetime, a plain datetime.date or an ISO date string.

    The input is coerced to a Timestamp before comparing: ordering a plain
    datetime.date against a Timestamp raises TypeError on current pandas.
    Dates before 2021-04-01 map to "Q1" and dates from 2021-10-01 onwards
    map to "Q4", whatever their year, exactly as the thresholds dictate.
    """
    moment = pd.Timestamp(x)
    if moment < pd.Timestamp('2021-04-01'):
        return "Q1"
    if moment < pd.Timestamp('2021-07-01'):
        return "Q2"
    if moment < pd.Timestamp('2021-10-01'):
        return "Q3"
    return "Q4"

In [None]:
#Defining the function to filter the Black Friday timeframe

def blackfriday(x):
    """Return 1 if the date falls in the Black Friday window, else 0.

    The window is 2021-11-01 to 2021-11-30, both ends included.

    x: anything pd.Timestamp can convert — a pandas Timestamp, a
       datetime.datetime, a plain datetime.date or an ISO date string.

    The input is coerced to a Timestamp before comparing: ordering a plain
    datetime.date against a Timestamp raises TypeError on current pandas.
    """
    moment = pd.Timestamp(x)
    window_start = pd.Timestamp('2021-11-01')
    window_end = pd.Timestamp('2021-11-30')
    return 1 if window_start <= moment <= window_end else 0

In [None]:
#Applying the first function to a new column in the df
df_clicks_pd["Qs"] = df_clicks_pd['date'].apply(lambda row:t_bin(row))

In [None]:
#Applying the second function to a new column in the df
df_clicks_pd["BlackFriday"] = df_clicks_pd['date'].apply(lambda row:blackfriday(row))

In [None]:
##Grouping the rows in order to compute the mean of each product's popularity index in the various quarters
df_grouped = pd.DataFrame(df_clicks_pd.groupby(['product_id', 'Qs'])['popularity_index'].mean())

In [None]:
df_grouped

In [None]:
#Resetting the index
df_clicks_pd_grouped = df_grouped.reset_index()
df_clicks_pd_grouped

In [None]:
##Sorting the rows in order to find the top 5 products by popularity in Q1 
df_clicks_pd_grouped[df_clicks_pd_grouped["Qs"]=="Q1"].sort_values(by="popularity_index", ascending= False)

In [None]:
##Sorting the rows in order to find the top 5 products by popularity in Q2
df_clicks_pd_grouped[df_clicks_pd_grouped["Qs"]=="Q2"].sort_values(by="popularity_index", ascending= False)

In [None]:
##Sorting the rows in order to find the top 5 products by popularity in Q3
df_clicks_pd_grouped[df_clicks_pd_grouped["Qs"]=="Q3"].sort_values(by="popularity_index", ascending= False)

In [None]:
##Sorting the rows in order to find the top 5 products by popularity in Q4
df_clicks_pd_grouped[df_clicks_pd_grouped["Qs"]=="Q4"].sort_values(by="popularity_index", ascending= False)

In [None]:
##Filtering the df to select only the Black Friday timeframe
df_black_f = df_clicks_pd[df_clicks_pd['BlackFriday'] == 1]

In [None]:
##Grouping the rows in order to compute the mean of each product's popularity index in the timeframe
df_bf_grouped = pd.DataFrame(df_black_f.groupby('product_id')['popularity_index'].mean())
df_bf_grouped

In [None]:
#Resetting the index
df_black_friday= df_bf_grouped.reset_index()

In [None]:
#Sorting the rows by popularity in descending order; the 10 most popular products during the Black Friday period are displayed
df_black_friday=df_black_friday.sort_values(by="popularity_index", ascending= False)
df_black_friday.head(10)

In [None]:
##spark.stop() ##Uncomment to close Spark