# Random Forest Classifier in PySpark - Lab

## Introduction  

In this lab, you will build a Random Forest Classifier model to study the eCommerce behavior of consumers from a multi-category store. First, you will download the data to your local machine, then you will load it from the local machine into a PySpark DataFrame.

## Objectives  

* Use the Kaggle eCommerce dataset in PySpark
* Build and train a random forest classifier in PySpark

## Instructions
* Accept the Kaggle policy and download the data from [Kaggle](https://www.kaggle.com/code/tshephisho/ecommerce-behaviour-using-xgboost/data)
* For the first model you will only use the 2019-Nov CSV data (which is still ~2 GB zipped)
* You will run this notebook in a new `pyspark-env` environment following [these setup instructions without docker](https://github.com/learn-co-curriculum/dsc-spark-docker-installation)

In [2]:
# install pandas
!pip install pandas 

Collecting pandas
  Obtaining dependency information for pandas from https://files.pythonhosted.org/packages/c3/6c/ea362eef61f05553aaf1a24b3e96b2d0603f5dc71a3bd35688a24ed88843/pandas-2.0.3-cp38-cp38-win_amd64.whl.metadata
  Downloading pandas-2.0.3-cp38-cp38-win_amd64.whl.metadata (18 kB)
Collecting tzdata>=2022.1 (from pandas)
  Using cached tzdata-2023.3-py2.py3-none-any.whl (341 kB)
Downloading pandas-2.0.3-cp38-cp38-win_amd64.whl (10.8 MB)

In [3]:
# import necessary libraries
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as dates
from datetime import datetime

In [4]:
from pyspark.sql import SparkSession  # entry point for pyspark

# instantiate spark instance
spark = (
    SparkSession.builder.appName("Random Forest eCommerce")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .master("local[*]")
    .getOrCreate()
)

In [5]:
path = "2019-Nov.csv"  # path to wherever you saved the Kaggle file
df = spark.read.csv(path, header=True, inferSchema=True)
df.printSchema()  # to see the schema

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



If you would rather explore the dataset with Pandas than with PySpark, you have to call an `action` function, which moves rows over the network from the executors to the driver. For a small dataset such as Iris, which is only a few kilobytes, this is no problem. The current dataset is likely too large and may throw an `OutOfMemory` error if you attempt to load all of it into a Pandas DataFrame. If you are more comfortable with Pandas, take only a few rows for exploratory analysis. Otherwise, stick with native PySpark functions.

In [6]:
pd.DataFrame(df.take(10), columns=df.columns).transpose()

| | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|---|---|---|---|---|---|---|---|---|---|---|
| event_time | 2019-11-01 00:00:00 UTC | 2019-11-01 00:00:00 UTC | 2019-11-01 00:00:01 UTC | 2019-11-01 00:00:01 UTC | 2019-11-01 00:00:01 UTC | 2019-11-01 00:00:01 UTC | 2019-11-01 00:00:01 UTC | 2019-11-01 00:00:02 UTC | 2019-11-01 00:00:02 UTC | 2019-11-01 00:00:02 UTC |
| event_type | view | view | view | view | view | view | view | view | view | view |
| product_id | 1003461 | 5000088 | 17302664 | 3601530 | 1004775 | 1306894 | 1306421 | 15900065 | 12708937 | 1004258 |
| category_id | 2053013555631882655 | 2053013566100866035 | 2053013553853497655 | 2053013563810775923 | 2053013555631882655 | 2053013558920217191 | 2053013558920217191 | 2053013558190408249 | 2053013553559896355 | 2053013555631882655 |
| category_code | electronics.smartphone | appliances.sewing_machine | | appliances.kitchen.washer | electronics.smartphone | computers.notebook | computers.notebook | | | electronics.smartphone |
| brand | xiaomi | janome | creed | lg | xiaomi | hp | hp | rondell | michelin | apple |
| price | 489.07 | 293.65 | 28.31 | 712.87 | 183.27 | 360.09 | 514.56 | 30.86 | 72.72 | 732.07 |
| user_id | 520088904 | 530496790 | 561587266 | 518085591 | 558856683 | 520772685 | 514028527 | 518574284 | 532364121 | 532647354 |
| user_session | 4d3b30da-a5e4-49df-b1a8-ba5943f1dd33 | 8e5f4f83-366c-4f70-860e-ca7417414283 | 755422e7-9040-477b-9bd2-6a6e8fd97387 | 3bfb58cd-7892-48cc-8020-2f17e6de6e7f | 313628f1-68b8-460d-84f6-cec7a8796ef2 | 816a59f3-f5ae-4ccd-9b23-82aa8c23d33c | df8184cc-3694-4549-8c8c-6b5171877376 | 5e6ef132-4d7c-4730-8c7f-85aa4082588f | 0a899268-31eb-46de-898d-09b2da950b24 | d2d3d2c6-631d-489e-9fb5-06f340b85be0 |


### Know your Customers

How many unique customers visit the site?

In [7]:
# using native pyspark
from pyspark.sql.functions import countDistinct

df.select(countDistinct("user_id")).show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                3696117|
+-----------------------+



Did you notice the Spark progress bar when you triggered the `action` function? `show()` is an `action`, so it forced Spark's lazy evaluation to run and complete a job. `read.csv` should have been another job, since `inferSchema=True` makes Spark scan the file. If you go to `localhost:4040`, you should see 2 completed jobs under the `Jobs` tab: `csv` and `showString`. While a heavy job is executing, you can look at the `Executors` tab to watch the executors complete tasks in parallel. You may not see much of this on a local machine, but the behavior should be clearly visible on a cloud system such as EMR.

### (Optional) Visitors Daily Trend

Does traffic fluctuate by date? Try using `event_time` to measure traffic, and plot the results.

In [8]:
# for event_time you should use a window and groupby a time period
from pyspark.sql.functions import window

Question: You would still like to see the cart abandonment rate using the dataset. What relevant features can we use for modeling?

In [10]:
# your answer

# Yes, it would be nice to see the cart abandonment rate, and price can be used for modeling.

Now, you will start building the model. Add the columns you would like to use as predictor features to the `features_cols` list.

In [34]:
from pyspark.ml.feature import VectorAssembler

features_cols = [
    "user_id",
    "product_id",
    "category_id",
    "price",
]  # columns you'd like to use
assembler = VectorAssembler(inputCols=features_cols, outputCol="feature")
df = assembler.transform(df)
df.show()

+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+--------+-------+--------------------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|features|encoded|             feature|
+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+--------+-------+--------------------+
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|      []| 1495.0|[5.20088904E8,100...|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|      []|31357.0|[5.3049679E8,5000...|
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|                null|   creed| 28.31|561587266|755422e7-9040-477...|      []|  19

To use a string column, you can use the `StringIndexer` to encode the column. Update the `inputCol` keyword argument so that you can encode the target feature.

In [37]:
from pyspark.ml.feature import StringIndexer

labeler = StringIndexer(
    inputCol="event_type", outputCol="encode"
)  # what should we use for the inputCol here?
df = labeler.fit(df).transform(df)
df.show()

+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+--------+-------+--------------------+------+
|          event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|features|encoded|             feature|encode|
+--------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+--------+-------+--------------------+------+
|2019-11-01 00:00:...|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|      []| 1495.0|[5.20088904E8,100...|   0.0|
|2019-11-01 00:00:...|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|      []|31357.0|[5.3049679E8,5000...|   0.0|
|2019-11-01 00:00:...|      view|  17302664|2053013553853497655|                null|   creed| 28.31|561587266

Now build the train/test datasets with a 70/30 `randomSplit` and the random seed set to 42.

In [38]:
train, test = df.randomSplit([0.7, 0.3], seed=42)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))

Training Dataset Count: 47253900
Test Dataset Count: 20248079


Next you need to add in the name of the feature column and the name of the `labelCol` you previously encoded for training the model.

In [39]:
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(featuresCol="feature", labelCol="encode")
model = rf.fit(train)
predictions = model.transform(test)
# select the true label, the predicted class, and the class probabilities
predictions.select("encode", "prediction", "probability").show(25)



Once the job execution is done, evaluate the model's performance. Add in the `labelCol` below.

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="encode", predictionCol="prediction", metricName="f1"
)  # "f1" (weighted F1) is the default metricName; pass "accuracy" for plain accuracy
f1 = evaluator.evaluate(predictions)
print("Weighted F1 = %s" % (f1))

Weighted F1 = 0.9132064981073577


### Extra: Use the confusion matrix to see the other metrics

In [43]:
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F

# MulticlassMetrics expects an RDD of (prediction, label) pairs
preds_and_labels = predictions.select(
    "prediction", F.col("encode").cast(FloatType()).alias("encode")
)
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
# rows are true labels, columns are predicted labels
print(metrics.confusionMatrix().toArray())

[[19064602.        0.        0.]
 [  908334.        0.        0.]
 [  275143.        0.        0.]]
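
The other metrics can be derived from this matrix directly. The sketch below recomputes accuracy, per-class recall, and weighted F1 in plain Python, taking rows as true labels and columns as predicted labels. The weighted F1 comes out at about 0.913, which matches the score returned by `MulticlassClassificationEvaluator` earlier (its default `metricName` is `f1`), while plain accuracy is about 0.942. The helper name `safe_div` is introduced here for illustration.

```python
# Confusion matrix copied from the output above.
confusion = [
    [19064602, 0, 0],
    [908334, 0, 0],
    [275143, 0, 0],
]

n_classes = len(confusion)
total = sum(sum(row) for row in confusion)


def safe_div(numerator, denominator):
    """Return 0.0 instead of raising when a class has no predictions or labels."""
    return numerator / denominator if denominator else 0.0


# Accuracy: correctly classified rows (the diagonal) over all rows.
accuracy = sum(confusion[i][i] for i in range(n_classes)) / total

recalls = []
weighted_f1 = 0.0
for i in range(n_classes):
    true_positives = confusion[i][i]
    actual = sum(confusion[i])                                   # row sum
    predicted = sum(confusion[r][i] for r in range(n_classes))   # column sum
    precision = safe_div(true_positives, predicted)
    recall = safe_div(true_positives, actual)
    f1 = safe_div(2 * precision * recall, precision + recall)
    recalls.append(recall)
    # Weight each class's F1 by its share of the true labels.
    weighted_f1 += f1 * actual / total

print(f"accuracy        = {accuracy:.4f}")
print(f"weighted F1     = {weighted_f1:.4f}")
print(f"recall by class = {recalls}")
```

Recall is 1.0 for class 0 and 0.0 for the other two classes, so this model predicts the majority class for every test row. The high overall score reflects class imbalance rather than any ability to identify the two rarer event types.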
