In [None]:
!pip install pyspark
!pip install pyspark[pandas_on_spark] plotly
!pip install -U pandas
!pip install dataprep
!pip install sweetviz

In [2]:
import gc
import pyspark
import pandas as pd
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, isnan, when, count, sum, lit, countDistinct
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
import sweetviz as sv

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
from pyspark.ml.classification import LinearSVC


%matplotlib inline

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
def undersampling (df, ratio):
  sample_df = df.sample(False, 1/ratio)
  return sample_df

In [5]:
spark=SparkSession.builder.appName('Market').getOrCreate()
df_oct=spark.read.option('header','true').csv('/content/drive/MyDrive/BigData Dataset/2019-Oct.csv')
df_nov=spark.read.option('header','true').csv('/content/drive/MyDrive/BigData Dataset/2019-Nov.csv')

In [None]:
spark

In [None]:
df_oct.show()

+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+--------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 00:00:...|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 00:00:...|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|  33.20|554748717|9333dfbd-b87a-470...|
|2019-10-01 00:00:...|      view|  17200506|2053013559792632471|furniture.living_...|    null| 543.10|519107250|566511c2-e2e3-422...|
|2019-10-01 00:00:...|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 00:00:...|      view|   1004237|205301355563188265

In [None]:
df_oct.count()

42448764

In [None]:
df_oct.select("user_id").distinct().count()

3022290

In [None]:
df_oct.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_oct.columns]).show()

+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|  brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+
|         0|         0|         0|          0|     13515609|6117080|    0|      0|           2|
+----------+----------+----------+-----------+-------------+-------+-----+-------+------------+



### **Data prep**

In [6]:
df_oct = df_oct.withColumn("price", df_oct["price"].cast("float"))
df = df_oct.filter(col("event_type").isin("view"))
df_viewList = df.groupBy("user_id","event_type").agg(sum('price'), count("product_id"))
viewList = df_viewList.withColumn('Average', col("sum(price)")/col("count(product_id)"))

df_BuyList = df_oct.filter(col("event_type").isin("purchase", "view"))
df_BuyList2 = df_BuyList.groupBy("user_id","event_type").sum()
BuyList = df_BuyList2.groupBy("user_id").agg(count("event_type"))

df = viewList.join(BuyList,viewList.user_id ==  BuyList.user_id,"left").drop(BuyList.user_id)

df.dtypes

[('event_type', 'string'),
 ('sum(price)', 'double'),
 ('count(product_id)', 'bigint'),
 ('Average', 'double'),
 ('user_id', 'string'),
 ('count(event_type)', 'bigint')]

In [7]:
df_view = df.filter(col("count(event_type)") == 1)
df_purchase = df.filter(col("count(event_type)") > 1)

df_view = df_view.withColumn('Buy', lit(0))
df_purchase = df_purchase.withColumn('Buy', lit(1))

### **Balancing Data**

In [9]:
df_view.count()

2675074


In [10]:
df_purchase.count()

347056


In [11]:
ratio = 2675074/347056
df_view = undersampling(df_view, ratio)
df = df_view.unionAll(df_purchase)

In [13]:
numcol = ["sum(price)", "count(product_id)", "Average"]
assembler = VectorAssembler(inputCols=numcol, outputCol="features")
df = assembler.transform(df)

train, test = df.randomSplit([0.7, 0.3], seed = 2000)

In [14]:
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 485859
Test Dataset Count: 208741


### **RF Model**

In [16]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = "Buy")
rfModel = rf.fit(train)
predictions = rfModel.transform(test)

In [17]:
evaluator = MulticlassClassificationEvaluator(labelCol="Buy", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.7019562821613515
Test Error = 0.29804371783864847


In [18]:
preds_and_labels = predictions.select(['prediction','Buy']).withColumn('Buy', F.col('Buy').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','Buy'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[69590. 34975.]
 [27156. 77020.]]


### **Lsvc Model**

In [None]:
lsvc = LinearSVC(featuresCol = 'features', labelCol = 'Buy')
lsvcModel = lsvc.fit(train)
predictions = lsvcModel.transform(test)

In [None]:
evaluator = MulticlassClassificationEvaluator(labelCol="Buy", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

Accuracy = 0.6425355666794379
Test Error = 0.3574644333205621


In [None]:
preds_and_labels = predictions.select(['prediction','Buy']).withColumn('Buy', F.col('Buy').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','Buy'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[91856. 12715.]
 [58312. 45864.]]


### **Add category count**

In [None]:
df_oct = df_oct.withColumn("price", df_oct["price"].cast("float"))
df = df_oct.filter(col("event_type").isin("view"))
df_viewList = df.groupBy("user_id","event_type").agg(sum('price'), count("product_id"))
viewList = df_viewList.withColumn('Average', col("sum(price)")/col("count(product_id)"))

df_BuyList = df_oct.filter(col("event_type").isin("purchase", "view"))
df_BuyList2 = df_BuyList.groupBy("user_id","event_type").sum()
BuyList = df_BuyList2.groupBy("user_id").agg(count("event_type"))

df = df_oct.filter(col("event_type").isin("view"))
categoryList = df.groupBy("user_id").agg(countDistinct("category_id"))

df = viewList.join(BuyList,viewList.user_id ==  BuyList.user_id,"left").drop(BuyList.user_id)
df = df.join(categoryList,df.user_id ==  categoryList.user_id,"left").drop(categoryList.user_id)

df.dtypes

[('event_type', 'string'),
 ('sum(price)', 'double'),
 ('count(product_id)', 'bigint'),
 ('Average', 'double'),
 ('user_id', 'string'),
 ('count(event_type)', 'bigint'),
 ('user_id', 'string'),
 ('count(category_id)', 'bigint')]

In [49]:
df_view = df.filter(col("count(event_type)") == 1)
df_purchase = df.filter(col("count(event_type)") > 1)
df_view = df_view.withColumn('Buy', lit(0))
df_purchase = df_purchase.withColumn('Buy', lit(1))

In [50]:
ratio = 2675074/347056
df_view = undersampling(df_view, ratio)
df = df_view.unionAll(df_purchase)

In [None]:
numcol = ["sum(price)", "count(product_id)", "Average", "count(category_id)"]
assembler = VectorAssembler(inputCols=numcol, outputCol="features")
df = assembler.transform(df)

train, test = df.randomSplit([0.7, 0.3], seed = 2000)

### **RF Model**

In [None]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = "Buy")
rfModel = rf.fit(train)
predictions = rfModel.transform(test)

evaluator = MulticlassClassificationEvaluator(labelCol="Buy", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

preds_and_labels = predictions.select(['prediction','Buy']).withColumn('Buy', F.col('Buy').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','Buy'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

Accuracy = 0.705438477087805
Test Error = 0.29456152291219495




[[70352. 34092.]
 [27341. 76987.]]


### **Lsvc Model**

In [24]:
lsvc = LinearSVC(featuresCol = 'features', labelCol = 'Buy')
lsvcModel = lsvc.fit(train)
predictions = lsvcModel.transform(test)

evaluator = MulticlassClassificationEvaluator(labelCol="Buy", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

preds_and_labels = predictions.select(['prediction','Buy']).withColumn('Buy', F.col('Buy').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','Buy'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

Accuracy = 0.6398578824544024
Test Error = 0.36014211754559755




[[91419. 12555.]
 [58728. 45600.]]


## **Add more feature**

In [63]:
df_oct = df_oct.withColumn("price", df_oct["price"].cast("float"))
df = df_oct.filter(col("event_type").isin("view"))
df_viewList = df.groupBy("user_id","event_type").agg(sum('price'), count("product_id"))
viewList = df_viewList.withColumn('Average', col("sum(price)")/col("count(product_id)"))

df_BuyList = df_oct.filter(col("event_type").isin("purchase", "view"))
df_BuyList2 = df_BuyList.groupBy("user_id","event_type").sum()
BuyList = df_BuyList2.groupBy("user_id").agg(count("event_type"))

df = df_oct.filter(col("event_type").isin("view"))
df_categoryList = df.groupBy("user_id").agg(countDistinct("category_id"))
categoryList = df_categoryList.withColumnRenamed("user_id","cat_user_id")

df = df_oct.filter(col("event_type").isin("cart"))
df_cartList = df.groupBy("user_id").agg(sum('price'), count("product_id"))
cartList = df_cartList.withColumn('Average', col("sum(price)")/col("count(product_id)"))

cartList = cartList.withColumnRenamed("user_id","cart_user_id")
cartList = cartList.withColumnRenamed("sum(price)","cart_sum")
cartList = cartList.withColumnRenamed("count(product_id)","cart_count")
cartList = cartList.withColumnRenamed("Average","cart_average")

df = viewList
df = df.join(BuyList,df.user_id ==  BuyList.user_id,"left").drop(BuyList.user_id)
df = df.join(categoryList,df.user_id ==  categoryList.cat_user_id,"left").drop(categoryList.cat_user_id)
df = df.join(cartList,df.user_id == cartList.cart_user_id,"left").drop(cartList.cart_user_id)

df.dtypes

[('event_type', 'string'),
 ('sum(price)', 'double'),
 ('count(product_id)', 'bigint'),
 ('Average', 'double'),
 ('user_id', 'string'),
 ('count(event_type)', 'bigint'),
 ('count(category_id)', 'bigint'),
 ('cart_sum', 'double'),
 ('cart_count', 'bigint'),
 ('cart_average', 'double')]

In [None]:
df_view = df.filter(col("count(event_type)") == 1)
df_purchase = df.filter(col("count(event_type)") > 1)
df_view = df_view.withColumn('Buy', lit(0))
df_purchase = df_purchase.withColumn('Buy', lit(1))

In [None]:
ratio = 2675074/347056
df_view = undersampling(df_view, ratio)
df = df_view.unionAll(df_purchase)

In [None]:
numcol = ["sum(price)", "count(product_id)", "Average", "count(category_id)", "cart_sum", "cart_count", "cart_average"]
assembler = VectorAssembler(inputCols=numcol, outputCol="features")
df = assembler.transform(df)

train, test = df.randomSplit([0.7, 0.3], seed = 2000)

In [54]:
rf = RandomForestClassifier(featuresCol = 'features', labelCol = "Buy")
rfModel = rf.fit(train)
predictions = rfModel.transform(test)

evaluator = MulticlassClassificationEvaluator(labelCol="Buy", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

preds_and_labels = predictions.select(['prediction','Buy']).withColumn('Buy', F.col('Buy').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','Buy'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

Accuracy = 0.7044034593691963
Test Error = 0.29559654063080365




[[69118. 34587.]
 [26899. 77720.]]


In [55]:
lsvc = LinearSVC(featuresCol = 'features', labelCol = 'Buy')
lsvcModel = lsvc.fit(train)
predictions = lsvcModel.transform(test)

evaluator = MulticlassClassificationEvaluator(labelCol="Buy", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %s" % (accuracy))
print("Test Error = %s" % (1.0 - accuracy))

preds_and_labels = predictions.select(['prediction','Buy']).withColumn('Buy', F.col('Buy').cast(FloatType())).orderBy('prediction')
preds_and_labels = preds_and_labels.select(['prediction','Buy'])
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())

Accuracy = 0.6429876748188368
Test Error = 0.3570123251811632




[[91161. 12544.]
 [58315. 46304.]]
