In [1]:
import pyspark
#sc = pyspark.SparkContext(appName = "MyAPP")
from pyspark.sql.functions import *
from pyspark.sql.session import SparkSession
#spark = SparkSession(sc)
import sys
from pyspark.sql import SQLContext, SparkSession
from pyspark import SparkContext, SparkConf
#from pyspark.sql.functions import udf,col

import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from pyspark.sql import SQLContext
from pyspark.mllib.stat import Statistics
import pandas as pd
from pyspark.sql.functions import udf
from pyspark.ml.feature import  StringIndexer, VectorAssembler,StandardScaler
from pyspark.ml import Pipeline
from sklearn.metrics import confusion_matrix

In [2]:
# sparkConf = SparkConf().setMaster("local").setAppName("MongoSparkConnectorTour").set("spark.app.id", "MongoSparkConnectorTour")
# sqlContext = SQLContext(sc)
# # create and load dataframe from MongoDB URI
# df1 = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")\
#                     .option("spark.mongodb.input.uri", "mongodb+srv://admin:12345678Pc@cluster0.k5xvg.mongodb.net/2020MSBD5003GP11.BehaviorData?retryWrites=true&w=majority")\
#                     .load()

# # print data frame schema
# df1.printSchema()
# df = df1.limit(5000)
df = spark.read.csv('../data/2019-Oct.csv', header=True, inferSchema=True).limit(50000)

In [3]:
original_columns = df.columns

In [4]:
#Need Some Plot to show large Occurence of these types of data
#drop rows with null value on category_code and brand
df = df.where("category_code is not null and brand is not null")

In [5]:
#event_type count during each user_session 
df_activity = df.groupBy('user_session').count()

In [6]:
#Need Some Plot to show large Occurence of these types of data
#select rows of event_type = 'cart' or event_type = 'purchase'
#drop duplicates rows based on ['event_type', 'product_id','price', 'user_id','user_session']
df = df.select('*').where("event_type = 'cart' or event_type = 'purchase'").dropDuplicates(subset = ['event_type', 'product_id','price', 'user_id','user_session'])

In [7]:
#add new column is_purchased if event_type = purchase then 1 else 0
df = df.withColumn('is_purchased', when(col('event_type') == 'purchase', 1).otherwise(0))

In [8]:
#chop the dataset into 2 sets by event_type
df_cart = df.select('*').where("event_type = 'cart'")
df_purchase = df.select('*').where("event_type = 'purchase'")

In [9]:
#left join rows of cart with rows of purchase, update the is_purchased of cart to is_purchased of corresponding purchase with key ['user_session', 'product_id']
#only consider goods put into cart and be purchased afterward
import pyspark.sql.functions as f

df_cart_purchase=df_cart.alias('a').join(
    df_purchase.alias('b'), ['user_session', 'product_id'], how='left'
).select('user_session', 'product_id', 'a.event_time', 'a.event_type', 'a.product_id', 'a.category_id', 'a.category_code', 'a.brand', 'a.price', 'a.user_id',
    f.coalesce('b.is_purchased', 'a.is_purchased').alias('is_purchased')
)

In [10]:
#join the activity_count
df_cart_purchase=df_cart_purchase.join(df_activity, 'user_session') \
  .select('*') \
  .withColumnRenamed('count', 'activity_count')

In [11]:
#split the category_code by .
df_cart_purchase = df_cart_purchase.select('*', split('category_code',"\\.")[0], split('category_code',"\\.")[1], split('category_code',"\\.")[2])\
  .withColumnRenamed('split(category_code, \., -1)[0]', 'category_code_level1') \
  .withColumnRenamed('split(category_code, \., -1)[1]', 'category_code_level2') \
  .withColumnRenamed('split(category_code, \., -1)[2]', 'category_code_level4')

In [12]:
#add new field week_day and category_code_level3 to category_code_level2 if category_code_level4 is null
df_cart_purchase = df_cart_purchase.select('*', dayofweek(col('event_time')).alias('week_day'), f.coalesce('category_code_level2', 'category_code_level4').alias('category_code_level3'))

In [13]:
#Index category / String features
indexer = StringIndexer(inputCols=['brand', 'category_code_level1', 'category_code_level2','category_code_level3'], outputCols=['brandIndex', 'category_code_level1_index', 'category_code_level2_index','category_code_level3_index'])

In [14]:
#Vector Assembler - all numeric features
assembler = VectorAssembler()\
         .setInputCols (["product_id","category_id","brandIndex",
                         "price","week_day","user_id",\
                         "category_code_level1_index","category_code_level2_index","category_code_level3_index", "activity_count"])\
         .setOutputCol ("vectorized_features")

In [15]:
#LabelIndex - the label is_purchased
label_indexer = StringIndexer()\
         .setInputCol ("is_purchased")\
         .setOutputCol ("label")

In [17]:
#Standardize numerical features 
scaler = StandardScaler()\
         .setInputCol ("vectorized_features")\
         .setOutputCol ("features")

In [18]:
#Run Pipeline of transformers
pipeline_stages=Pipeline()\
                .setStages([indexer,assembler,label_indexer,scaler])
pipeline_model=pipeline_stages.fit(df_cart_purchase)
pipeline_df=pipeline_model.transform(df_cart_purchase)

In [19]:
#Select Data for Modeling
selectedCols = original_columns + ['features', 'label'] 
pipeline_df = pipeline_df.select(selectedCols)
pipeline_df.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [20]:
#Split train and Test data
train, test = pipeline_df.randomSplit([0.8, 0.2], seed = 7)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 294
Test Dataset Count: 78


In [21]:
from pyspark.ml.classification import LogisticRegression

In [22]:
#Fit Logistic Regression Model and see the data
lr = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=5)
lrModel = lr.fit(train)
predictions = lrModel.transform(test)

In [None]:
#predictions.select('label', 'features',  'rawPrediction', 'prediction', 'probability').toPandas().head(5)

In [23]:
#Accuracy of the Raw Model
accuracy = predictions.filter(predictions.label == predictions.prediction).count() / float(predictions.count())
print("Accuracy : ",accuracy)

Accuracy :  0.6410256410256411


In [None]:
#print(lr.explainParams())

In [24]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [25]:
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# With 2 values for lr.regParam, 3 values of maxIter
# this grid will have 3 x 2  = 6 parameter settings for CrossValidator to choose from.

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.maxIter, [1, 5, 10]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=5)  

# Run cross-validation, and choose the best set of parameters.
cvModel = crossval.fit(train)

predictions_cv = cvModel.transform(test)

accuracy_cv = predictions_cv.filter(predictions_cv.label == predictions_cv.prediction).count() / float(predictions_cv.count())
print("Accuracy : ",accuracy_cv)

Accuracy :  0.6410256410256411
