<a href="https://colab.research.google.com/github/SharWarr/ML_Projects/blob/main/Ecommerce_Churn_Project/Notebook_Decision_Tree.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Setting the environment variables

In [None]:
import os
import sys

# Interpreter, JVM and Spark install locations on this EC2 host.
os.environ.update({
    "PYSPARK_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
    "PYSPARK_DRIVER_PYTHON_OPTS": "notebook --no-browser",
    "JAVA_HOME": "/usr/java/jdk1.8.0_161/jre",
    "SPARK_HOME": "/home/ec2-user/spark-2.4.4-bin-hadoop2.7",
})
os.environ["PYLIB"] = f'{os.environ["SPARK_HOME"]}/python/lib'

# Make Spark's bundled py4j and pyspark archives importable. Each goes to the
# front of sys.path, so pyspark.zip (inserted last) ends up ahead of py4j.
for archive in ("py4j-0.10.7-src.zip", "pyspark.zip"):
    sys.path.insert(0, f'{os.environ["PYLIB"]}/{archive}')

In [None]:
# Spark environment
from pyspark import SparkConf
from pyspark.sql import SparkSession

In [None]:
# Driver heap size; the full dataset is ~5 GB, so the default is too small.
MAX_MEMORY = "14G"

# getOrCreate() returns the already-running session if there is one.
spark = (
    SparkSession.builder
    .appName("demo")
    .config("spark.driver.memory", MAX_MEMORY)
    .getOrCreate()
)

spark

23/02/20 13:25:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Data description

The dataset stores information about customer sessions on the e-commerce platform. Each row records one activity (event) together with its associated parameters.

- **event_time**: Date and time when user accesses the platform
- **event_type**: Action performed by the customer
    - View
    - Cart
    - Purchase
    - Remove from cart
- **product_id**: Unique number to identify the product in the event
- **category_id**: Unique number to identify the category of the product
- **category_code**: Stores primary and secondary categories of the product
- **brand**: Brand associated with the product
- **price**: Price of the product
- **user_id**: Unique ID for a customer
- **user_session**: Session ID for a user


### Initialising the SparkSession

The dataset is about 5 GB, so the Spark driver memory must be raised above its default (this notebook sets `spark.driver.memory` to 14G). Refer to notebook 1 for the steps involved.

In [None]:
# Loading the clean data
CLEAN_DATA_PATH = "Cleaned_df_final_parquet.parquet"  # output of the cleaning notebook
df = spark.read.parquet(CLEAN_DATA_PATH)

                                                                                

In [None]:
from pyspark.ml.feature import Bucketizer
# FloatType stays importable for any later cell that relies on this import.
from pyspark.sql.types import DoubleType, FloatType, IntegerType

# Bin the hour of day into four 6-hour buckets: [0,6), [6,12), [12,18), [18,24]
# (Bucketizer's last bucket also includes its upper bound).
# handleInvalid="keep" puts invalid (NaN) hours into an extra bucket instead of
# failing the job.
bucketizer = Bucketizer(splits=[0, 6, 12, 18, 24], inputCol="Hour", outputCol="Hour_binned")
df_buck = bucketizer.setHandleInvalid("keep").transform(df)
df_buck = df_buck.withColumn("Hour_binned", df_buck["Hour_binned"].cast(IntegerType()))

# A missing secondary category becomes its own explicit level.
df_buck = df_buck.fillna(value="no category", subset=["category_2"])
# Cast price to DoubleType, not FloatType: a 32-bit float cannot hold values
# like 341.74 exactly (the assembled features showed 341.739990234375).
# NOTE(review): if the parquet already stores price as float, this cannot
# recover the lost digits, but it no longer introduces the rounding itself.
df_buck = df_buck.withColumn("price", df_buck["price"].cast(DoubleType()))
# Drop ID columns and raw columns that are not model inputs
# (Hour has been replaced by Hour_binned).
df_buck = df_buck.drop("category_code", "user_id", "product_id", "brand", "Hour", "category_id", "user_session")

In [None]:
# Copy the target column into an integer label column "y"
# (stratifiedSample below uses "y" as its default label column).
from pyspark.sql.types import IntegerType

df_buck = df_buck.withColumn("y", df_buck.target.cast(IntegerType()))

In [None]:
from math import floor
from pyspark.sql.functions import rand
from pyspark.sql.functions import col

def stratifiedSample(df, N, labelCol="y", seed=None):
    """Draw about N rows from ``df`` while preserving its 0/1 class ratio.

    Each class receives ``floor(class_fraction * N)`` rows. They are chosen by
    shuffling that class with ``rand(seed)`` and keeping the first rows.

    Args:
        df: Spark DataFrame with a binary 0/1 label column.
        N: Target total sample size. The result can be slightly smaller
            because of flooring.
        labelCol: Name of the label column used both for the class fractions
            and for the per-class filters.
        seed: Optional seed for ``rand`` so the sample is reproducible.

    Returns:
        DataFrame containing the sampled positive rows followed by the sampled
        negative rows. It is empty if ``df`` is empty.
    """
    # One small aggregation gives {label_value: row_count}. The grand total is
    # derived from it instead of running a separate df.count() job.
    counts = df.groupby(labelCol).count().rdd.collectAsMap()
    total = sum(counts.values())
    if total == 0:
        return df.limit(0)
    # .get(..., 0) means a class absent from the data yields 0 rows instead
    # of raising KeyError.
    pos = int(floor(counts.get(1, 0) / total * N))
    neg = int(floor(counts.get(0, 0) / total * N))
    posDF = df.filter(col(labelCol) == 1).orderBy(rand(seed)).limit(pos)
    negDF = df.filter(col(labelCol) == 0).orderBy(rand(seed)).limit(neg)
    # union() is the non-deprecated name for unionAll(); both match columns by position.
    return posDF.union(negDF)

In [None]:
# Stratified ~5M-row sample of df_buck, preserving the class ratio of "y".
# NOTE(review): xdf is not referenced by any later cell in this notebook. The
# encoders and the train/test split below all use df_buck / encoded. Only the
# counting actions inside stratifiedSample actually run here; the sample itself
# is never materialised. Either feed xdf into the encoding steps or drop this cell.
xdf = stratifiedSample(df_buck, 5_000_000)

                                                                                

In [None]:
# One-hot encode the hour bucket.
from pyspark.ml.feature import OneHotEncoderEstimator

# dropLast defaults to True, so the last bucket is encoded as the all-zero
# vector (the saved output shows 3-element vectors such as (3,[1],[1.0])).
ohe = OneHotEncoderEstimator(
    inputCols=["Hour_binned"],
    outputCols=["Hour_binned_posEnc"],
)
oh_encoder = ohe.fit(df_buck)
# Adds the sparse "Hour_binned_posEnc" column next to the existing columns.
encoded = oh_encoder.transform(df_buck)

encoded.show()

                                                                                

+----------+------+-----------+--------------------+---------+------+-----------+---+------------------+
|event_type| price| category_1|          category_2|brand_new|target|Hour_binned|  y|Hour_binned_posEnc|
+----------+------+-----------+--------------------+---------+------+-----------+---+------------------+
|      view|341.74|electronics|         no category|   xiaomi|     0|          1|  0|     (3,[1],[1.0])|
|      view| 36.04|no category|         no category| no brand|     0|          1|  0|     (3,[1],[1.0])|
|      view| 34.11|no category|         no category|   Others|     0|          1|  0|     (3,[1],[1.0])|
|      view| 63.06|no category|         no category|   Others|     0|          2|  0|     (3,[2],[1.0])|
|      view|341.91|no category|         no category| no brand|     0|          2|  0|     (3,[2],[1.0])|
|      view|362.34|no category|         no category| no brand|     0|          2|  0|     (3,[2],[1.0])|
|      view|341.91|no category|         no category| no

In [None]:
from pyspark.ml.feature import StringIndexer

# event_type -> numeric index -> one-hot vector "event_type_ixEnc".
# StringIndexer's default ordering gives index 0.0 to the most frequent label.
si = StringIndexer(inputCol="event_type", outputCol="event_type_ix")
si_model = si.fit(encoded)
encoded = si_model.transform(encoded)

ohe = OneHotEncoderEstimator(inputCols=["event_type_ix"], outputCols=["event_type_ixEnc"])
oh_encoder = ohe.fit(encoded)
encoded = oh_encoder.transform(encoded)

encoded.select("event_type_ixEnc").show()

                                                                                

+----------------+
|event_type_ixEnc|
+----------------+
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
|   (2,[0],[1.0])|
+----------------+
only showing top 20 rows



In [None]:
# category_1 -> numeric index -> one-hot vector "category_1_ixEnc".
si = StringIndexer(inputCol="category_1", outputCol="category_1_ix")
si_model = si.fit(encoded)
encoded = si_model.transform(encoded)

ohe = OneHotEncoderEstimator(inputCols=["category_1_ix"], outputCols=["category_1_ixEnc"])
oh_encoder = ohe.fit(encoded)
encoded = oh_encoder.transform(encoded)

encoded.select("category_1_ixEnc").show()

                                                                                

+----------------+
|category_1_ixEnc|
+----------------+
|  (13,[0],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[1],[1.0])|
|  (13,[2],[1.0])|
|  (13,[2],[1.0])|
|  (13,[2],[1.0])|
|  (13,[2],[1.0])|
|  (13,[2],[1.0])|
|  (13,[2],[1.0])|
|  (13,[0],[1.0])|
|  (13,[0],[1.0])|
|  (13,[0],[1.0])|
|  (13,[0],[1.0])|
+----------------+
only showing top 20 rows



In [None]:
# category_2 -> numeric index -> one-hot vector "category_2_ixEnc".
si = StringIndexer(inputCol="category_2", outputCol="category_2_ix")
si_model = si.fit(encoded)
encoded = si_model.transform(encoded)

ohe = OneHotEncoderEstimator(inputCols=["category_2_ix"], outputCols=["category_2_ixEnc"])
oh_encoder = ohe.fit(encoded)
encoded = oh_encoder.transform(encoded)

encoded.select("category_2_ixEnc").show()

                                                                                

+----------------+
|category_2_ixEnc|
+----------------+
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[3],[1.0])|
|  (85,[3],[1.0])|
|  (85,[3],[1.0])|
|  (85,[3],[1.0])|
|  (85,[3],[1.0])|
|  (85,[3],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
|  (85,[0],[1.0])|
+----------------+
only showing top 20 rows



In [None]:
# brand_new -> numeric index -> one-hot vector "brand_new_ixEnc".
si = StringIndexer(inputCol="brand_new", outputCol="brand_new_ix")
si_model = si.fit(encoded)
encoded = si_model.transform(encoded)

ohe = OneHotEncoderEstimator(inputCols=["brand_new_ix"], outputCols=["brand_new_ixEnc"])
oh_encoder = ohe.fit(encoded)
encoded = oh_encoder.transform(encoded)

encoded.select("brand_new_ixEnc").show()

                                                                                

+---------------+
|brand_new_ixEnc|
+---------------+
| (20,[4],[1.0])|
| (20,[1],[1.0])|
| (20,[0],[1.0])|
| (20,[0],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[0],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[1],[1.0])|
| (20,[5],[1.0])|
| (20,[4],[1.0])|
| (20,[5],[1.0])|
| (20,[5],[1.0])|
+---------------+
only showing top 20 rows



In [None]:
from pyspark.ml.feature import VectorAssembler

# Concatenate the five one-hot vectors into one sparse "features" column
# (3 + 2 + 13 + 85 + 20 = 123 slots, matching the saved output).
one_hot_cols = [
    "Hour_binned_posEnc",
    "event_type_ixEnc",
    "category_1_ixEnc",
    "category_2_ixEnc",
    "brand_new_ixEnc",
]
assembler = VectorAssembler(inputCols=one_hot_cols, outputCol="features")
encoded = assembler.transform(encoded)
encoded.select("features").show()

+--------------------+
|            features|
+--------------------+
|(123,[1,3,5,18,10...|
|(123,[1,3,6,18,10...|
|(123,[1,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,6,18,10...|
|(123,[2,3,7,21,10...|
|(123,[2,3,7,21,10...|
|(123,[2,3,7,21,10...|
|(123,[2,3,7,21,10...|
|(123,[2,3,7,21,10...|
|(123,[2,3,7,21,10...|
|(123,[2,3,5,18,10...|
|(123,[1,3,5,18,10...|
|(123,[0,3,5,18,10...|
|(123,[0,3,5,18,10...|
+--------------------+
only showing top 20 rows



In [None]:
model_df_encoded = encoded.select("features","target")

In [None]:
training_df , test_df = model_df_encoded.randomSplit([0.7,0.3])

In [None]:
training_df.count()

                                                                                

29692484

<hr>

## Task 3: Model Selection
Three classification models are compared:
- Logistic Regression
- Decision Tree
- Random Forest

### Model 2: Decision Trees

In [None]:
# Additional steps for Decision Trees, if any

In [None]:
from pyspark.ml.feature import VectorAssembler

In [None]:
# Columns available to the decision-tree feature engineering below.
# NOTE(review): the saved output lacks the "y" column that the earlier cast cell
# adds to df_buck, so it came from an out-of-order run. Restart and run all
# before trusting it.
df_buck.columns

['event_type',
 'price',
 'category_1',
 'category_2',
 'brand_new',
 'target',
 'Hour_binned']

#### Feature Transformation (same code as before; check the columns)

In [None]:
from pyspark.ml.feature import StringIndexer

# Index every categorical string column into a numeric "<column>_cat" column.
# Each indexer is fitted on, and transforms, the frame produced by the previous
# one. Trees can split on these indices directly, so no one-hot step is needed.
indexed = df_buck
for column in ["event_type", "category_1", "category_2", "brand_new"]:
    indexer = StringIndexer(inputCol=column, outputCol=column + "_cat")
    indexed = indexer.fit(indexed).transform(indexed)

                                                                                

In [None]:
# Check that the four *_cat index columns were appended to df_buck's columns.
indexed.columns

['event_type',
 'price',
 'category_1',
 'category_2',
 'brand_new',
 'target',
 'Hour_binned',
 'event_type_cat',
 'category_1_cat',
 'category_2_cat',
 'brand_new_cat']

In [None]:
# Tree-specific feature vector: price and the hour bucket as numeric inputs,
# plus the StringIndexer outputs as categorical inputs.
# NOTE(review): category_2_cat is indexed above but not included here. Confirm
# this is intentional; category_2 has far more levels than the other
# categoricals (its one-hot vectors were 85 wide).
tree_feature_cols = [
    "price",
    "Hour_binned",
    "event_type_cat",
    "category_1_cat",
    "brand_new_cat",
]
assembler = VectorAssembler(inputCols=tree_feature_cols, outputCol="features")

In [None]:
output = assembler.transform(indexed)

In [None]:
output.show()

+----------+------+-----------+--------------------+---------+------+-----------+--------------+--------------+--------------+-------------+--------------------+
|event_type| price| category_1|          category_2|brand_new|target|Hour_binned|event_type_cat|category_1_cat|category_2_cat|brand_new_cat|            features|
+----------+------+-----------+--------------------+---------+------+-----------+--------------+--------------+--------------+-------------+--------------------+
|      view|341.74|electronics|         no category|   xiaomi|     0|          1|           0.0|           0.0|           0.0|          4.0|[341.739990234375...|
|      view| 36.04|no category|         no category| no brand|     0|          1|           0.0|           1.0|           0.0|          1.0|[36.0400009155273...|
|      view| 34.11|no category|         no category|   Others|     0|          1|           0.0|           1.0|           0.0|          0.0|[34.1100006103515...|
|      view| 63.06|no catego

In [None]:
# Preview just the model inputs: the assembled feature vector and the label.
output.select(["features", "target"]).show()

+--------------------+------+
|            features|target|
+--------------------+------+
|[341.739990234375...|     0|
|[36.0400009155273...|     0|
|[34.1100006103515...|     0|
|[63.0600013732910...|     0|
|[341.910003662109...|     0|
|[362.339996337890...|     0|
|[341.910003662109...|     0|
|[392.380004882812...|     0|
|[339.279998779296...|     0|
|[448.839996337890...|     0|
|[283.119995117187...|     0|
|[225.229995727539...|     0|
|[283.119995117187...|     0|
|[225.229995727539...|     0|
|[228.470001220703...|     0|
|[283.119995117187...|     0|
|[952.030029296875...|     0|
|[196.910003662109...|     0|
|(5,[0,4],[153.979...|     0|
|(5,[0,4],[166.539...|     0|
+--------------------+------+
only showing top 20 rows



In [None]:
#model_df = output.select("features","target")

In [None]:
# Splitting the data into train and test (Remember you are expected to compare the model later)
#training_df, test_df = model_df.randomSplit([0.7,0.3])

In [None]:
# Number of rows in train and test data
# NOTE(review): training_df is not cached in any visible cell, so each count
# recomputes the whole feature pipeline from the parquet file.
training_df.count()

                                                                                

29693021

In [None]:
# Held-out rows (the 0.3 share of the random split).
test_df.count()

                                                                                

12725523

In [None]:
# Pipeline for the tasks


In [None]:
# Transforming the dataframe df


In [None]:
# Schema of the transformed df


In [None]:
# Checking the elements of the transformed df - Top 20 rows


In [None]:
# Storing the transformed df in S3 bucket to prevent repetition of steps again


#### Model Fitting

In [None]:
# Building the model with hyperparameter tuning
# Create ParamGrid for Cross Validation
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Single decision tree with default hyperparameters; the label column is "target".
dt_estimator = DecisionTreeClassifier(featuresCol="features", labelCol="target")
df_classifier = dt_estimator.fit(training_df)

[Stage 27:>                                                         (0 + 4) / 7]23/02/20 13:44:49 WARN MemoryStore: Not enough space to cache rdd_109_0 in memory! (computed 562.9 MB so far)
23/02/20 13:44:49 WARN BlockManager: Persisting block rdd_109_0 to disk instead.
23/02/20 13:44:49 WARN MemoryStore: Not enough space to cache rdd_109_3 in memory! (computed 562.9 MB so far)
23/02/20 13:44:49 WARN BlockManager: Persisting block rdd_109_3 to disk instead.
23/02/20 13:44:49 WARN MemoryStore: Not enough space to cache rdd_109_2 in memory! (computed 562.9 MB so far)
23/02/20 13:44:49 WARN BlockManager: Persisting block rdd_109_2 to disk instead.
23/02/20 13:44:50 WARN MemoryStore: Not enough space to cache rdd_109_1 in memory! (computed 844.3 MB so far)
23/02/20 13:44:50 WARN BlockManager: Persisting block rdd_109_1 to disk instead.
[Stage 27:>                                                         (0 + 4) / 7]23/02/20 13:45:24 WARN MemoryStore: Not enough space to cache rdd_109_2 in m

In [None]:
df_predictions = df_classifier.transform(test_df)

In [None]:
df_predictions.show()

[Stage 37:>                                                         (0 + 1) / 1]

+--------------------+------+--------------------+--------------------+----------+
|            features|target|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(123,[0,3,5,18,10...|     0|[1.9622281E7,8921...|[0.68743431027226...|       0.0|
|(12

                                                                                

In [None]:
# Run cross-validation steps


In [None]:
# Fitting the models on transformed df


In [None]:
# Best model from the results of cross-validation


#### Model Analysis

Required Steps:
- Predict on the test data (the fitted model is applied, not refit)
- Performance analysis
    - Appropriate Metric with reasoning

In [None]:
# Overall accuracy of the decision tree on the held-out split.
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="target", metricName="accuracy")
df_accuracy = accuracy_evaluator.evaluate(df_predictions)

                                                                                

In [None]:
# Test-set accuracy of the decision tree.
df_accuracy

0.694056762265776

In [None]:
# Per-class precision averaged with class-frequency weights, so the majority
# class dominates this number.
precision_evaluator = MulticlassClassificationEvaluator(labelCol="target", metricName="weightedPrecision")
df_precision = precision_evaluator.evaluate(df_predictions)

                                                                                

In [None]:
# Class-frequency-weighted precision of the decision tree on the test split.
df_precision

0.7441164064547349

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Spark's "weightedRecall" averages per-class recall with class-frequency
# weights, which reduces to sum(TP) / N, i.e. exactly the accuracy (both saved
# outputs are 0.694056762265776). Keep it for reference as weighted_recall, and
# report as `recall` the recall of the positive class (target == 1): the share
# of actual positives that the tree predicts as positive.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", labelCol="target", metricName="weightedRecall")
weighted_recall = evaluator.evaluate(df_predictions)

actual_positives = df_predictions.filter(df_predictions["target"] == 1)
positive_count = actual_positives.count()
true_positives = actual_positives.filter(actual_positives["prediction"] == 1.0).count()
# NaN rather than a ZeroDivisionError if the test split has no positives.
recall = true_positives / positive_count if positive_count else float("nan")



                                                                                

In [None]:
# Recall value computed in the previous cell.
recall

0.694056762265776

#### Summary of the best Decision Tree model

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler

# Random forest tuned by k-fold cross-validation on training_df and scored on test_df.
# maxBins keeps its default of 32. It must be at least the number of levels of
# every categorical feature (brand_new_cat has 20), so the earlier values 5 and 10
# would fail on the StringIndexer features.
rf = RandomForestClassifier(labelCol="target", featuresCol="features", seed=42)

# Every grid point is trained numFolds times on ~30M rows. The previous
# 27-point x 5-fold search (135 fits) ended in a Py4J connection error, so this
# grid is kept small (4 points x 3 folds).
rfparamGrid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [5, 10])
               .addGrid(rf.numTrees, [20, 50])
               .build())

# The label column is "target", not Spark's default "label".
rfevaluator = BinaryClassificationEvaluator(labelCol="target",
                                            rawPredictionCol="rawPrediction",
                                            metricName="areaUnderROC")

rfcv = CrossValidator(estimator=rf,
                      estimatorParamMaps=rfparamGrid,
                      evaluator=rfevaluator,
                      numFolds=3,
                      seed=42)

rfcvModel = rfcv.fit(training_df)
print(rfcvModel)
rfpredictions = rfcvModel.transform(test_df)

# Accuracy uses the hard 0/1 prediction. The AUCs use the continuous
# rawPrediction scores; AUC computed from hard labels is degenerate.
rf_accuracy = MulticlassClassificationEvaluator(labelCol="target", metricName="accuracy").evaluate(rfpredictions)
print('Accuracy:', rf_accuracy)
print('AUC:', rfevaluator.evaluate(rfpredictions))
print('PR:', rfevaluator.evaluate(rfpredictions, {rfevaluator.metricName: "areaUnderPR"}))


ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:41719)
Traceback (most recent call last):
  File "/home/ec2-user/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/ec2-user/spark-2.4.4-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:41719)