In [None]:
!pip install pyspark

In [None]:
!pip install --upgrade gdown

In [None]:
!rm ~/.cache/gdown/cookies.json
# Download all data unzipped
!gdown --id 1-VDT520b2ApUSttqd1aKc5bYAUiJX1zE   # Oct19
#!gdown --id 1-ThB1vQbQHgt3Am3FGw_F0tf5h_3K5Xs   # Nov19
!gdown --id 1-6gv2iI4-4BFcw5FD5xi3kXsBhtLHl9p   # Dec19
!gdown --id 1-LzWyIcna1rkbgYCXxHYlABtN8vZpeE2   # Jan20
!gdown --id 1-Em1YaZq13OdbZbu3XHAmIXSgEBNYcSF   # Feb20
!gdown --id 1-RHOXC5IoCGDSMukI5uPS4R2XZedty1i   # Mar20
!gdown --id 1-D_V71xZY2WCr_RUqYsuEu3hzjpMeyAd   # Apr20

rm: cannot remove '/root/.cache/gdown/cookies.json': No such file or directory
Downloading...
From: https://drive.google.com/uc?id=1-VDT520b2ApUSttqd1aKc5bYAUiJX1zE
To: /content/2019-Oct.csv
100% 5.67G/5.67G [01:44<00:00, 54.0MB/s]
Downloading...
From: https://drive.google.com/uc?id=1-6gv2iI4-4BFcw5FD5xi3kXsBhtLHl9p
To: /content/2019-Dec.csv
100% 9.36G/9.36G [03:33<00:00, 43.8MB/s]
Downloading...
From: https://drive.google.com/uc?id=1-LzWyIcna1rkbgYCXxHYlABtN8vZpeE2
To: /content/2020-Jan.csv
100% 7.78G/7.78G [01:48<00:00, 71.4MB/s]
Downloading...
From: https://drive.google.com/uc?id=1-Em1YaZq13OdbZbu3XHAmIXSgEBNYcSF
To: /content/2020-Feb.csv
100% 7.68G/7.68G [02:03<00:00, 62.3MB/s]
Downloading...
From: https://drive.google.com/uc?id=1-RHOXC5IoCGDSMukI5uPS4R2XZedty1i
To: /content/2020-Mar.csv
100% 7.82G/7.82G [02:06<00:00, 62.0MB/s]
Downloading...
From: https://drive.google.com/uc?id=1-D_V71xZY2WCr_RUqYsuEu3hzjpMeyAd
To: /content/2020-Apr.csv
100% 9.27G/9.27G [02:32<00:00, 60.8MB/s]


In [None]:
import glob as g

# Collect the downloaded CSV files from the current working directory.
# The pattern contains no "**", so `recursive=True` had no effect and is
# omitted. glob returns files in arbitrary order, so the list is sorted to
# give Spark the same input file order on every run.
paths = sorted(g.glob("*.csv"))
paths

['2020-Apr.csv',
 '2019-Oct.csv',
 '2019-Dec.csv',
 '2020-Mar.csv',
 '2020-Jan.csv',
 '2020-Feb.csv']

In [None]:
from datetime import datetime
import pyspark
from pyspark import SparkContext
from pyspark.sql import DataFrame
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col, udf, when
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler, OneHotEncoder

In [None]:
# Reuse the active SparkContext when one already exists, so re-running this
# cell does not fail (only one SparkContext may be active per process).
sc = SparkContext.getOrCreate()
# Obtain the Spark session through the builder, which attaches to the context
# above instead of constructing a second session by hand.
spark = SparkSession.builder.getOrCreate()

In [None]:
# ---------------------------------------------------------------------------
# Build the modelling table `df_targets`: one row per (session, product) cart
# event, labelled with whether that product was purchased in the same session.
# ---------------------------------------------------------------------------

# Read all CSV files; the first line of each file is a header. No schema is
# inferred, so every column is loaded as a string.
df = spark.read.option("header", True).csv(paths)

# Relabel the "construction.tools.light" category as "electronics.smartphone".
df_corrected = df.withColumn(
    "category_code",
    when(col("category_code") == "construction.tools.light", "electronics.smartphone")
    .otherwise(col("category_code")),
)

# Keep only cart and purchase events and collapse repeated identical events.
df_targets = df_corrected.filter(
    df_corrected["event_type"].isin("cart", "purchase")
).drop_duplicates(subset=["event_type", "product_id", "price", "user_id", "user_session"])

# Discard rows with a null in any column.
df_targets = df_targets.dropna(how="any")

# Event-level label (1 for purchase, 0 otherwise) and numeric price.
df_targets = df_targets.withColumn("is_purchased", (df_targets["event_type"] == "purchase").cast("int"))
df_targets = df_targets.withColumn("price", df_targets["price"].cast("float"))

# Propagate the label to every event of the same (session, product): the max
# is 1 if any of those events was a purchase.
df_interm = df_targets.groupby(["user_session", "product_id"]).max("is_purchased")
# Rename the keys to avoid duplicate column names after the join.
df_interm = (
    df_interm
    .withColumnRenamed("user_session", "user_session2")
    .withColumnRenamed("product_id", "product_id2")
)
df_targets = df_targets.join(
    df_interm,
    (df_targets.user_session == df_interm.user_session2)
    & (df_targets.product_id == df_interm.product_id2),
)
# Replace the event-level label with the propagated one and drop join helpers.
df_targets = (
    df_targets
    .drop("user_session2", "product_id2", "is_purchased")
    .withColumnRenamed("max(is_purchased)", "is_purchased")
)

# Keep only cart events (purchase events are now redundant), one row per
# (session, product, label).
df_targets = df_targets.filter(df_targets["event_type"] == "cart").drop_duplicates(
    ["user_session", "product_id", "is_purchased"]
)

# Weekday of the event with Monday=0 ... Sunday=6 (same numbering as Python's
# datetime.weekday()). Built-in column functions replace the former Python UDF
# so rows are not shipped to a Python worker. dayofweek() is Sunday=1 ...
# Saturday=7, hence the (+5) % 7 shift.
# The date is taken from the first 10 characters of event_time ("YYYY-MM-DD").
event_date = pyspark.sql.functions.to_date(
    pyspark.sql.functions.substring(col("event_time"), 1, 10), "yyyy-MM-dd"
)
df_targets = df_targets.withColumn(
    "event_weekday",
    ((pyspark.sql.functions.dayofweek(event_date) + 5) % 7).cast("int"),
)

# Split the dotted category code into its first two levels. The pattern is a
# regex, so the dot is escaped; a raw string avoids Python's invalid-escape
# warning for '\.'.
# NOTE(review): a category_code without a dot yields a null level 2, which the
# downstream StringIndexer would presumably reject — confirm the data always
# has at least two levels.
split_col = pyspark.sql.functions.split(df_targets["category_code"], r"\.")
df_targets = (
    df_targets
    .withColumn("category_code_level1", split_col.getItem(0))
    .withColumn("category_code_level2", split_col.getItem(1))
)

# Activity count: number of remaining (cart) rows in each session.
df_interm = df_targets.groupby(["user_session"]).count()
# Rename the key to avoid duplicate column names after the join.
df_interm = df_interm.withColumnRenamed("user_session", "user_session2")
df_targets = df_targets.join(df_interm, df_targets.user_session == df_interm.user_session2)
df_targets = df_targets.drop("user_session2").withColumnRenamed("count", "activity_count")

# Drop identifiers and raw columns that are not model features.
df_targets = df_targets.drop(
    "event_time", "event_type", "product_id", "category_id",
    "category_code", "user_id", "user_session",
)

In [None]:
# Cache the engineered dataset before it is reused. Every fit below (and the
# later transform/evaluation) would otherwise re-execute the whole upstream
# lineage, and randomSplit over an uncached lineage that is not strictly
# deterministic can produce inconsistent or overlapping train/test sets.
df_targets.cache()

# Brand indexer, fitted up front on the full dataset (train + test rows).
# NOTE(review): presumably done so brands that appear only in `test` do not
# fail at transform time; it does expose the test-set brand vocabulary to the
# model pipeline — confirm this is intended.
brand_indexer = StringIndexer(inputCol="brand", outputCol="brand_index").fit(df_targets)
# Category indexers are left unfitted; the pipeline fits them on `train` only.
category_code_level1_indexer = StringIndexer(inputCol="category_code_level1", outputCol="category_code_level1_index")
category_code_level2_indexer = StringIndexer(inputCol="category_code_level2", outputCol="category_code_level2_index")

# 90/10 train/test split with a fixed seed for reproducibility.
train, test = df_targets.randomSplit([0.9, 0.1], seed=12345)

# Assemble the numeric and indexed categorical columns into one feature vector.
vecAssembler = VectorAssembler(
    inputCols=[
        "price",
        "event_weekday",
        "activity_count",
        "brand_index",
        "category_code_level1_index",
        "category_code_level2_index",
    ],
    outputCol="features",
)

# Gradient-boosted trees classifier.
# NOTE(review): maxBins=4000 presumably has to cover the number of distinct
# values of the largest indexed categorical feature (brand) — confirm.
gbt = GBTClassifier(labelCol="is_purchased", featuresCol="features", maxIter=10, maxBins=4000)

# Pipeline stages in execution order: indexers, assembler, classifier.
indexers = [brand_indexer, category_code_level1_indexer, category_code_level2_indexer, vecAssembler, gbt]
pipeline = Pipeline(stages=indexers)

# Fit the unfitted stages (category indexers and the GBT model) on the training split.
model = pipeline.fit(train)


In [None]:
# Score the held-out split with the fitted pipeline.
predictions = model.transform(test)

# One evaluator is reused for all metrics; only its metric name is switched.
evaluator = MulticlassClassificationEvaluator(labelCol="is_purchased", predictionCol="prediction")
metric_values = {}
for metric_name in ("accuracy", "weightedPrecision", "weightedRecall"):
    evaluator.setMetricName(metric_name)
    metric_values[metric_name] = evaluator.evaluate(predictions)

accuracy = metric_values["accuracy"]
precision = metric_values["weightedPrecision"]  # class-weighted precision
recall = metric_values["weightedRecall"]        # class-weighted recall

In [None]:
# Report the three test-set metrics, one per line.
report_template = 'Accuracy is {0}\nPrecision is {1}\nRecall is {2}'
print(report_template.format(accuracy, precision, recall))