
In [None]:
# Setting the environment variables

In [None]:
import os
import sys
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON_OPTS"]="notebook --no-browser"
os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_161/jre"
os.environ["SPARK_HOME"] = "/home/ec2-user/spark-2.4.4-bin-hadoop2.7"
os.environ["PYLIB"] = os.environ["SPARK_HOME"] + "/python/lib"
sys.path.insert(0, os.environ["PYLIB"] + "/py4j-0.10.7-src.zip")
sys.path.insert(0, os.environ["PYLIB"] + "/pyspark.zip")

# Ecommerce Churn Assignment

The aim of the assignment is to build a model that predicts whether a person purchases an item after it has been added to the cart. Since this is a classification problem, you are expected to use your understanding of all three models covered so far. You must select the most robust model and provide a solution that predicts the churn in the most suitable manner.

For this assignment, you are provided the data associated with an e-commerce company for the month of October 2019. Your task is to first analyse the data, and then perform multiple steps towards the model building process.

The broad tasks are:
- Data Exploration
- Feature Engineering
- Model Selection
- Model Inference

### Data description

The dataset stores information about customer sessions on the e-commerce platform. It records each activity along with its associated parameters.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
    - View
    - Cart
    - Purchase
    - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset provided is about 5 GB in size, so you are expected to increase the driver memory. You can refer to notebook 1 for the steps involved here.

In [None]:
# initialising the session with 14 GB driver memory
from pyspark import SparkConf
from pyspark.sql import SparkSession

MAX_MEMORY = "14G"

spark = SparkSession \
    .builder \
    .appName("decision trees") \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

spark.catalog.clearCache()
spark

In [None]:
# Loading the clean data
df = spark.read.parquet('cleaned_df.parquet')
df.count()

28650604

<hr>

## Task 3: Model Selection
Three models for classification:
- Logistic Regression
- Decision Tree
- Random Forest

### Model 2: Decision Trees

In [None]:
# Additional steps for Decision Trees, if any

In [None]:
df.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- cat_l1: string (nullable = true)
 |-- cat_l2: string (nullable = true)
 |-- hour_bucket: double (nullable = true)
 |-- brand_red: string (nullable = true)
 |-- is_purchased: integer (nullable = true)



#### Feature Transformation (the code is the same as for the previous model; check the columns)

In [None]:
# Check if only the required columns are present to build the model
# If not, drop the redundant columns

# Drop category_id: the category details are already captured in cat_l1 and cat_l2.
# Drop user_session: it has a very large number of unique values, so one-hot encoding it
# would be expensive, and it adds little for purchase prediction (it was already used for EDA).
df_dec_tree = df.drop('category_id', 'user_session')
# Rename the target to 'label' (the default labelCol of Spark classifiers) on df_dec_tree,
# so that the columns dropped above stay dropped
df_dec_tree = df_dec_tree.withColumnRenamed('is_purchased', 'label')
df_dec_tree = df_dec_tree.dropDuplicates()
df_dec_tree.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- cat_l1: string (nullable = true)
 |-- cat_l2: string (nullable = true)
 |-- hour_bucket: double (nullable = true)
 |-- brand_red: string (nullable = true)
 |-- label: integer (nullable = true)



In [None]:
# Feature transformation for categorical features
#import the string indexer
from pyspark.ml.feature import StringIndexer
#import the onehot encoder
from pyspark.ml.feature import OneHotEncoderEstimator


si1 = StringIndexer(inputCol= 'cat_l1', outputCol='cat_l1_ix')
si2 = StringIndexer(inputCol= 'cat_l2', outputCol='cat_l2_ix')
si3 = StringIndexer(inputCol= 'brand_red', outputCol='brand_red_ix')

cat_indx = ['cat_l1_ix','cat_l2_ix','brand_red_ix','day_of_week','hour_bucket']

ohe = OneHotEncoderEstimator(inputCols=cat_indx,
                             outputCols=['cat_l1_en','cat_l2_en','brand_red_en','day_of_week_en','hour_bucket_en'])
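To make the two encoding stages above concrete, here is a minimal pure-Python sketch of what they do (an illustration, not Spark's implementation). By default `StringIndexer` orders labels by descending frequency, so the most common value gets index 0.0; `OneHotEncoderEstimator` with its default `dropLast=True` turns index k out of n categories into a sparse vector of size n-1 with 1.0 at position k, and the last category becomes all zeros. That is why `cat_l1_en` appears as `(13,[7],[1.0])` in the output further down: `cat_l1` has 14 distinct values. The function names and the toy category values are hypothetical, and breaking frequency ties alphabetically is an assumption of this sketch only.

```python
from collections import Counter
from typing import Dict, List, Tuple

SparseVec = Tuple[int, List[int], List[float]]  # (size, indices, values)


def fit_string_indexer(values: List[str]) -> Dict[str, float]:
    """Assign each distinct string an index, most frequent first.

    Ties are broken alphabetically; this is an assumption of the sketch,
    made only so the output is deterministic.
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {v: float(i) for i, v in enumerate(ordered)}


def one_hot(index: float, num_categories: int, drop_last: bool = True) -> SparseVec:
    """Encode a category index as a sparse one-hot vector."""
    size = num_categories - 1 if drop_last else num_categories
    k = int(index)
    if k < size:
        return (size, [k], [1.0])
    return (size, [], [])  # with drop_last, the last category is all zeros


# Toy cat_l1-like column (hypothetical values)
cats = ["electronics", "appliances", "electronics", "computers", "electronics", "appliances"]
index = fit_string_indexer(cats)
print(index)                                     # {'electronics': 0.0, 'appliances': 1.0, 'computers': 2.0}
print(one_hot(index["appliances"], len(index)))  # (2, [1], [1.0])
print(one_hot(index["computers"], len(index)))   # (2, [], [])
```

Dropping the last slot keeps the encoded columns linearly independent; for a decision tree this matters less than for logistic regression, but it explains the vector sizes seen in the schema output.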

In [None]:
# Vector assembler to combine all the features
#import the vector assembler 
from pyspark.ml.feature import VectorAssembler
cols = ['price','cat_l1_en','cat_l2_en','brand_red_en','day_of_week_en','hour_bucket_en']
assembler = VectorAssembler(inputCols=cols,
                            outputCol="features")
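`VectorAssembler` concatenates its input columns in the order given in `cols`, so each one-hot block is shifted by the total width of everything before it. As a worked check: with `price` at position 0 and a 13-wide `cat_l1_en` block at positions 1 to 13, a row with `cat_l1_ix = 7.0` and `cat_l2_ix = 10.0` (like the first row of the `show()` output) gets non-zeros at positions 1 + 7 = 8 and 14 + 10 = 24, matching the `(101,[0,8,24,...` pattern in the features output. The pure-Python sketch below reproduces that layout; the `assemble` helper is hypothetical, and the width used for the `cat_l2` block is illustrative only.

```python
from typing import List, Tuple, Union

SparseVec = Tuple[int, List[int], List[float]]  # (size, indices, values)


def assemble(parts: List[Union[float, SparseVec]]) -> SparseVec:
    """Concatenate scalars and sparse vectors into one sparse vector.

    Each part is shifted by the total width of the parts before it;
    zero entries are left out, as in a sparse vector.
    """
    size, indices, values = 0, [], []
    for part in parts:
        if isinstance(part, tuple):
            width, idx, vals = part
            for i, v in zip(idx, vals):
                if v != 0.0:
                    indices.append(size + i)
                    values.append(v)
            size += width
        else:
            if part != 0.0:
                indices.append(size)
                values.append(float(part))
            size += 1
    return (size, indices, values)


# price, a 13-wide cat_l1 block (index 7), and a hypothetical 57-wide cat_l2 block (index 10)
row = assemble([35.39, (13, [7], [1.0]), (57, [10], [1.0])])
print(row)  # (71, [0, 8, 24], [35.39, 1.0, 1.0])
```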

In [None]:
# Pipeline for the tasks
# import the Pipeline API
from pyspark.ml import Pipeline

In [None]:
# Transforming the dataframe df
#create the pipeline object
pipeline = Pipeline(stages=[si1, si2, si3, ohe, assembler])

#use the object to transform the dataframe 
df_dec_encoded = pipeline.fit(df_dec_tree).transform(df_dec_tree)
df_dec_encoded.select("features").show()

+--------------------+
|            features|
+--------------------+
|(101,[0,8,24,71,9...|
|(101,[0,10,33,71,...|
|(101,[0,1,15,75,9...|
|(101,[0,5,18,71,9...|
|(101,[0,1,15,73,9...|
|(101,[0,1,19,75,9...|
|(101,[0,2,14,71,9...|
|(101,[0,3,16,71,9...|
|(101,[0,1,15,73,9...|
|(101,[0,1,19,71,9...|
|(101,[0,2,14,71,1...|
|(101,[0,6,25,71,9...|
|(101,[0,1,15,75,9...|
|(101,[0,4,28,78],...|
|(101,[0,3,21,71,9...|
|(101,[0,6,26,71,9...|
|(101,[0,1,22,72,9...|
|(101,[0,3,16,89,9...|
|(101,[0,5,18,71,9...|
|(101,[0,3,35,71,9...|
+--------------------+
only showing top 20 rows



In [None]:
# Schema of the transformed df
df_dec_encoded.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- cat_l1: string (nullable = true)
 |-- cat_l2: string (nullable = true)
 |-- hour_bucket: double (nullable = true)
 |-- brand_red: string (nullable = true)
 |-- label: integer (nullable = true)
 |-- cat_l1_ix: double (nullable = false)
 |-- cat_l2_ix: double (nullable = false)
 |-- brand_red_ix: double (nullable = false)
 |-- cat_l1_en: vector (nullable = true)
 |-- day_of_week_en: vector (nullable = true)
 |-- hour_bucket_en: vector (nullable = true)
 |-- brand_red_en: vector (nullable = true)
 |-- cat_l2_en: vector (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# Checking the elements of the transformed df - Top 20 rows
df_dec_encoded.show()

+----------+-------------------+-------+---------+--------------------+-----------+------------+-----------+-----------+---------+-----+---------+---------+------------+--------------+--------------+--------------+---------------+---------------+--------------------+
|product_id|        category_id|  price|  user_id|        user_session|day_of_week|      cat_l1|     cat_l2|hour_bucket|brand_red|label|cat_l1_ix|cat_l2_ix|brand_red_ix|     cat_l1_en|day_of_week_en|hour_bucket_en|   brand_red_en|      cat_l2_en|            features|
+----------+-------------------+-------+---------+--------------------+-----------+------------+-----------+-----------+---------+-----+---------+---------+------------+--------------+--------------+--------------+---------------+---------------+--------------------+
|  27700139|2053013560086233771|  35.39|539259746|25c30a37-eebd-464...|          2|construction|      tools|        1.0|   others|    0|      7.0|     10.0|         0.0|(13,[7],[1.0])| (7,[2],[1.0

#### Train-test split

In [None]:
# Splitting the data into train and test (remember that you are expected to compare the models later)
df_dec_train, df_dec_test = df_dec_encoded.randomSplit([0.7,0.3])
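`randomSplit` normalises the weights and assigns rows at random, so the split sizes are only approximately 70/30: the counts below give 20,057,768 / 28,650,604 ≈ 0.7001. Without a `seed` argument the split changes on every run, which makes model comparisons harder to reproduce; `randomSplit` accepts one, e.g. `df_dec_encoded.randomSplit([0.7, 0.3], seed=42)` (the seed value is arbitrary). The pure-Python sketch below shows the underlying idea of independent per-row assignment with a fixed seed; it is a simplification, not Spark's partition-wise sampling code, and `random_split` is a hypothetical helper.

```python
import random
from typing import List, Sequence


def random_split(rows: Sequence[int], weights: Sequence[float], seed: int) -> List[List[int]]:
    """Assign every row independently to one split, with probability
    proportional to its weight (weights are normalised to sum to 1)."""
    total = float(sum(weights))
    upper_bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        upper_bounds.append(acc)
    rng = random.Random(seed)  # fixed seed -> reproducible split
    splits: List[List[int]] = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(upper_bounds):
            # the last split also catches floating-point round-off
            if u < bound or i == len(upper_bounds) - 1:
                splits[i].append(row)
                break
    return splits


train, test = random_split(range(100_000), [0.7, 0.3], seed=42)
print(len(train), len(test))  # close to, but usually not exactly, 70000 and 30000
```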

In [None]:
# Number of rows in train and test data
df_dec_train.count()

20057768

In [None]:
df_dec_test.count()

8592836

#### Let us cache both the train and test data since they are large

In [None]:
df_dec_train.cache()

DataFrame[product_id: int, category_id: bigint, price: double, user_id: int, user_session: string, day_of_week: int, cat_l1: string, cat_l2: string, hour_bucket: double, brand_red: string, label: int, cat_l1_ix: double, cat_l2_ix: double, brand_red_ix: double, cat_l1_en: vector, day_of_week_en: vector, hour_bucket_en: vector, brand_red_en: vector, cat_l2_en: vector, features: vector]

In [None]:
df_dec_test.cache()

DataFrame[product_id: int, category_id: bigint, price: double, user_id: int, user_session: string, day_of_week: int, cat_l1: string, cat_l2: string, hour_bucket: double, brand_red: string, label: int, cat_l1_ix: double, cat_l2_ix: double, brand_red_ix: double, cat_l1_en: vector, day_of_week_en: vector, hour_bucket_en: vector, brand_red_en: vector, cat_l2_en: vector, features: vector]

#### Model Fitting

In [None]:
# Building the model with hyperparameter tuning

#Import the libraries required:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create initial Decision Tree Model
dt = DecisionTreeClassifier(labelCol="label", featuresCol="features", maxDepth=2)

# Create ParamGrid for Cross Validation
dtparamGrid = (ParamGridBuilder()
               .addGrid(dt.maxDepth, [2, 10, 30])   # 30 is the largest maxDepth Spark's decision trees support
               .addGrid(dt.maxBins, [10, 30, 50])
               .addGrid(dt.impurity, ['gini','entropy'])
               .build())
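The grid above has 3 × 3 × 2 = 18 parameter combinations. With 3-fold cross-validation each combination is trained three times (each time on about two thirds of the training data), and `CrossValidator` then refits the best combination on the full training set, giving 18 × 3 + 1 = 55 tree fits on tens of millions of rows; this goes a long way toward explaining the long run time reported at the end of this notebook. The sketch below only counts the combinations with `itertools.product`; `ParamGridBuilder` may enumerate them in a different order.

```python
from itertools import product

# Same values as dtparamGrid above
param_grid = {
    "maxDepth": [2, 10, 30],
    "maxBins": [10, 30, 50],
    "impurity": ["gini", "entropy"],
}
num_folds = 3

combos = [dict(zip(param_grid, values)) for values in product(*param_grid.values())]
# Each combination is trained once per fold, then the best one is refit on all the training data
num_fits = len(combos) * num_folds + 1

print(len(combos), num_fits)  # 18 55
print(combos[0])              # {'maxDepth': 2, 'maxBins': 10, 'impurity': 'gini'}
```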

In [None]:
# Run cross-validation steps
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator for binary classification; its default metric is areaUnderROC
dtevaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")

# Create a 3-fold CrossValidator
dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = dtevaluator,
                      numFolds = 3)
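Conceptually, `CrossValidator` trains and evaluates every parameter map on each fold, averages the metric per parameter map, picks the best average (area under ROC is "larger is better"), and refits that parameter map on the whole training set to produce `bestModel`. The pure-Python sketch below shows that loop with pluggable `fit` and `evaluate` callbacks. It is an illustration of k-fold cross-validation under simplifying assumptions, not Spark's code: folds are assigned round-robin here, whereas Spark assigns them randomly, and the toy model and metric are hypothetical.

```python
from statistics import mean
from typing import Callable, Dict, List, Sequence, Tuple, TypeVar

Row = TypeVar("Row")
Model = TypeVar("Model")


def cross_validate(
    rows: Sequence[Row],
    param_maps: List[Dict],
    num_folds: int,
    fit: Callable[[List[Row], Dict], Model],
    evaluate: Callable[[Model, List[Row]], float],
) -> Tuple[Model, List[float]]:
    """k-fold cross-validation with a larger-is-better metric such as AUC."""
    # Round-robin fold assignment (Spark assigns folds randomly instead)
    folds = [list(rows[i::num_folds]) for i in range(num_folds)]
    avg_metrics = []
    for params in param_maps:
        scores = []
        for k in range(num_folds):
            training = [r for j, fold in enumerate(folds) if j != k for r in fold]
            model = fit(training, params)
            scores.append(evaluate(model, folds[k]))
        avg_metrics.append(mean(scores))
    best = max(range(len(param_maps)), key=lambda i: avg_metrics[i])
    # Refit the winning parameters on all of the training data
    return fit(list(rows), param_maps[best]), avg_metrics


# Toy "model": it just remembers the parameter x; the metric prefers x close to 3
def toy_fit(training, params):
    return params["x"]


def toy_evaluate(model, validation):
    return -abs(model - 3)


best_model, avg_metrics = cross_validate(
    list(range(12)), [{"x": 1}, {"x": 3}, {"x": 5}], 3, toy_fit, toy_evaluate
)
print(best_model, avg_metrics)  # 3 [-2, 0, -2]
```

In Spark the averaged values are available afterwards as `dtcvModel.avgMetrics`, in the same order as the parameter maps passed to the validator.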

In [None]:
# Fitting the models on transformed df

# Run cross validations
dtcvModel = dtcv.fit(df_dec_train)

In [None]:
# Best model from the results of cross-validation
dtcvModel.bestModel

DecisionTreeClassificationModel (uid=DecisionTreeClassifier_579f9bea40ae) of depth 30 with 401 nodes

#### Model Analysis

Required Steps:
- Fit on test data
- Performance analysis
    - Appropriate Metric with reasoning

In [None]:
# Use the test set here so we can measure the performance of our model on unseen data
dtpredictions = dtcvModel.transform(df_dec_test)

In [None]:
# dtcvModel uses the best model found by cross-validation
# Evaluate the best model (the evaluator's default metric is area under ROC, not accuracy)
print('Area under ROC:', dtevaluator.evaluate(dtpredictions))

Area under ROC: 0.5565098111611423


In [None]:
dtevaluator.evaluate(dtpredictions, {dtevaluator.metricName: "areaUnderROC"})

0.5565098111611424

In [None]:
dtevaluator.evaluate(dtpredictions, {dtevaluator.metricName: "areaUnderPR"})

0.03185758626995479
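The cells above report area under ROC and area under PR rather than accuracy. When purchases are only a small share of the rows, a model that predicts "no purchase" for everyone can score a high accuracy while being useless. Area under ROC measures how well the model ranks purchases above non-purchases (0.5 is random ranking), while the precision-recall curve focuses on the positive (purchase) class; for a no-skill model its area is roughly the fraction of positives, so the 0.032 above should be compared with the purchase rate in the test set rather than with 0.5. The sketch below computes ROC AUC from its pairwise definition and average precision, a step-wise summary of the PR curve. Spark's `areaUnderPR` integrates the PR curve with the trapezoidal rule, so the two PR numbers are related but not identical; the helper names and toy data are hypothetical.

```python
from typing import Sequence


def roc_auc(labels: Sequence[int], scores: Sequence[float]) -> float:
    """Probability that a random positive outscores a random negative (ties count half)."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


def average_precision(labels: Sequence[int], scores: Sequence[float]) -> float:
    """Mean of the precision values at the rank of each positive, highest score first."""
    ranked = sorted(zip(scores, labels), key=lambda t: -t[0])
    hits, precision_sum = 0, 0.0
    for rank, (_, y) in enumerate(ranked, start=1):
        if y == 1:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / hits


labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(labels, scores))            # 0.75
print(average_precision(labels, scores))  # about 0.833, i.e. (1/1 + 2/3) / 2

# A constant (no-skill) score on an imbalanced toy set: ROC AUC stays at 0.5
imbalanced = [1] * 3 + [0] * 97
print(roc_auc(imbalanced, [0.5] * 100))   # 0.5
```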

#### Summary of the best Decision Tree model

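#### Out of the 3 * 3 * 2 = 18 hyperparameter combinations, each evaluated with 3-fold cross-validation, the best decision tree model is the one with maxDepth = 30.

#### Cross-validation took about 6 hours, so the grid was not extended further. Note that 30 is the largest maxDepth Spark's DecisionTreeClassifier accepts, so the tree cannot be made deeper; with an area under ROC of about 0.557 (0.5 corresponds to random ranking), getting a better ROC value than Logistic Regression would require tuning other hyperparameters or features rather than increasing the depth.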