In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=94836e9f39e3fe0202578c9d39e814617a3984d7dc59bc784e1f84aeafc6fc7e
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
# importing pyspark
import pyspark
from pyspark.sql import SparkSession
# creating a sparksession by default it will do two clusters

In [3]:
# Now we will create a cluster
spark=SparkSession.builder.appName('BlackFridaySalesReg').getOrCreate()

In [4]:
# importing data
salesdf=spark.read.csv("/content/drive/MyDrive/colab/Black Friday sales_train.csv",inferSchema=True,header=True)

In [5]:
salesdf.show(5) # displays top 5 rows

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              NULL|              NULL|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|              NULL|              NULL|    1422|
|100

In [6]:
salesdf.describe().show() # there are missing values in Product_Category_2 and Product_Category_3

+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|summary|           User_ID|Product_ID|Gender|   Age|       Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
+-------+------------------+----------+------+------+-----------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
|  count|            550068|    550068|550068|550068|           550068|       550068|                    550068|             550068|            550068|            376430|            166821|           550068|
|   mean|1003028.8424013031|      NULL|  NULL|  NULL|8.076706879876669|         NULL|         1.468494139793958|0.40965298835780306| 5.404270017525106| 9.84232925112238

In [7]:
salesdf.columns # gives you the names of variables

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase']

In [8]:
# Imputing Missing Values

In [9]:
salesdf.groupBy("Product_Category_2").count().show() # finding the value counts of the variable

+------------------+------+
|Product_Category_2| count|
+------------------+------+
|                12|  5528|
|              NULL|173638|
|                13| 10531|
|                 6| 16466|
|                16| 43255|
|                 3|  2884|
|                 5| 26235|
|                15| 37855|
|                 9|  5693|
|                17| 13320|
|                 4| 25677|
|                 8| 64088|
|                 7|   626|
|                10|  3043|
|                11| 14134|
|                14| 55108|
|                 2| 49217|
|                18|  2770|
+------------------+------+



In [10]:
# we can clearly see the presence of null values is more that is 17638 no of null values are there.we need to impute
# them with a Numeric Missing Code - 999
salesdf=salesdf.na.fill(value=999,subset=['Product_Category_2'])

In [11]:
salesdf.groupBy("Product_Category_3").count().show() # finding the value counts of the variable

+------------------+------+
|Product_Category_3| count|
+------------------+------+
|                12|  9246|
|              NULL|383247|
|                13|  5459|
|                16| 32636|
|                 6|  4890|
|                 3|   613|
|                 5| 16658|
|                15| 28013|
|                17| 16702|
|                 9| 11579|
|                 4|  1875|
|                 8| 12562|
|                10|  1726|
|                11|  1805|
|                14| 18428|
|                18|  4629|
+------------------+------+



In [12]:
# we can clearly see the presence of null values is more that is 383247 no of null values are there.we need to impute
# them with a Numeric Missing Code - 999
salesdf=salesdf.na.fill(value=999,subset=['Product_Category_3'])

In [13]:
salesdf.printSchema() # printSchema gives the variable types

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [14]:
salesdf.groupBy("Gender").count().show() # gives the no of males and females

+------+------+
|Gender| count|
+------+------+
|     F|135809|
|     M|414259|
+------+------+



In [15]:
# how many age categories there?
salesdf.groupBy("Age").count().show()

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
| 0-17| 15102|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+



In [16]:
# Cross Tabulation of Gender nad Age
salesdf.crosstab('Gender','Age').show()

+----------+-----+-----+------+-----+-----+-----+-----+
|Gender_Age| 0-17|18-25| 26-35|36-45|46-50|51-55|  55+|
+----------+-----+-----+------+-----+-----+-----+-----+
|         F| 5083|24628| 50752|27170|13199| 9894| 5083|
|         M|10019|75032|168835|82843|32502|28607|16421|
+----------+-----+-----+------+-----+-----+-----+-----+



In [17]:
# Exploratory Data Analysis

In [18]:
salesdf.select('Purchase').describe().show() # gives the statistical calculations of the variable

+-------+-----------------+
|summary|         Purchase|
+-------+-----------------+
|  count|           550068|
|   mean|9263.968712959126|
| stddev|5023.065393820575|
|    min|               12|
|    max|            23961|
+-------+-----------------+



In [19]:
from pyspark.sql.functions import skewness,kurtosis # importing skewness,kurtosis

In [20]:
salesdf.select(skewness('Purchase'),kurtosis('Purchase')).show() # calculating skewness,kurtosis of the variable#

+------------------+-------------------+
|skewness(Purchase)| kurtosis(Purchase)|
+------------------+-------------------+
|0.6001383671643392|-0.3383853975360327|
+------------------+-------------------+



In [21]:
# Average Purchase by Gender
salesdf.groupBy('Gender').mean('Purchase').show()

+------+-----------------+
|Gender|    avg(Purchase)|
+------+-----------------+
|     F|8734.565765155476|
|     M|9437.526040472265|
+------+-----------------+



In [22]:
# Average Purchase by Age
salesdf.groupBy('Age').mean('Purchase').show()

+-----+-----------------+
|  Age|    avg(Purchase)|
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
| 0-17|8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|  55+|9336.280459449405|
+-----+-----------------+



In [23]:
# Encoding the object columns

In [24]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
# Importing stringindexer and pipeline

In [25]:
indexer=[StringIndexer(inputCol=column,outputCol=column+'dummy').fit(salesdf)for column in list(set(salesdf.columns)-set(['Purchase','User_ID','Product_ID']))]

In [26]:
pipeline=Pipeline(stages=indexer)

In [27]:
sales=pipeline.fit(salesdf).transform(salesdf)

In [28]:
sales.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+--------+------------------+-------------------------------+-------------------+-----------------------+---------------+-----------------------+-----------------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Genderdummy|Agedummy|City_Categorydummy|Stay_In_Current_City_Yearsdummy|Marital_Statusdummy|Product_Category_3dummy|Occupationdummy|Product_Category_2dummy|Product_Category_1dummy|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+-----------+--------+------------------+-------------------------------+-------------------+-----------------------+---------------+-----------

In [29]:
sales.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Genderdummy',
 'Agedummy',
 'City_Categorydummy',
 'Stay_In_Current_City_Yearsdummy',
 'Marital_Statusdummy',
 'Product_Category_3dummy',
 'Occupationdummy',
 'Product_Category_2dummy',
 'Product_Category_1dummy']

In [30]:
# droping columns
colsdrop=['User_ID','Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Marital_Status', 'Product_Category_1',
 'Product_Category_2', 'Product_Category_3']

In [31]:
sales=sales.drop(*colsdrop)

In [32]:
sales.show(5)

+--------+-----------+--------+------------------+-------------------------------+-------------------+-----------------------+---------------+-----------------------+-----------------------+
|Purchase|Genderdummy|Agedummy|City_Categorydummy|Stay_In_Current_City_Yearsdummy|Marital_Statusdummy|Product_Category_3dummy|Occupationdummy|Product_Category_2dummy|Product_Category_1dummy|
+--------+-----------+--------+------------------+-------------------------------+-------------------+-----------------------+---------------+-----------------------+-----------------------+
|    8370|        1.0|     6.0|               2.0|                            1.0|                0.0|                    0.0|           12.0|                    0.0|                    6.0|
|   15200|        1.0|     6.0|               2.0|                            1.0|                0.0|                    3.0|           12.0|                    8.0|                    1.0|
|    1422|        1.0|     6.0|              

In [33]:
 from pyspark.ml.feature import RFormula

In [34]:
formula=RFormula(formula='Purchase~.',featuresCol='features',labelCol='label')

In [35]:
sales=formula.fit(sales).transform(sales)

In [36]:
sales.select('features','label').show(5)

+--------------------+-------+
|            features|  label|
+--------------------+-------+
|[1.0,6.0,2.0,1.0,...| 8370.0|
|[1.0,6.0,2.0,1.0,...|15200.0|
|[1.0,6.0,2.0,1.0,...| 1422.0|
|[1.0,6.0,2.0,1.0,...| 1057.0|
|[0.0,5.0,1.0,3.0,...| 7969.0|
+--------------------+-------+
only showing top 5 rows



In [37]:
# building models

In [38]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [39]:
reg=LinearRegression()

In [40]:
regmodel=reg.fit(sales)

In [41]:
regmodel.summary.r2adj # adjusted r square

0.06356300117930469

In [42]:
regmodel.summary.rootMeanSquaredError # root mean square error

4860.759869813397

In [43]:
rsquare=RegressionEvaluator(metricName='r2')
rmse=RegressionEvaluator(metricName='rmse')

In [44]:
# Decision Tree

In [45]:
from pyspark.ml.regression import DecisionTreeRegressor

In [46]:
tree=DecisionTreeRegressor()

In [47]:
treemodel=tree.fit(sales)

In [48]:
treepredict=treemodel.transform(sales)

In [49]:
rsquare.evaluate(treepredict)

0.643827508033008

In [50]:
rmse.evaluate(treepredict)

2997.772179128243

In [51]:
# Random Forest

In [52]:
from pyspark.ml.regression import RandomForestRegressor

In [53]:
rf=RandomForestRegressor(numTrees=500)

In [54]:
rfmodel=rf.fit(sales)

In [55]:
rfpredict=rfmodel.transform(sales)

In [56]:
rsquare.evaluate(rfpredict)

0.6157736747503728

In [57]:
rmse.evaluate(rfpredict)

3113.594102533163

In [58]:
# Gradient boosting

In [59]:
from pyspark.ml.regression import GBTRegressor

In [60]:
gbm=GBTRegressor()

In [61]:
gbmmodel=gbm.fit(sales)

In [62]:
gbmpredict=gbmmodel.transform(sales)

In [63]:
rsquare.evaluate(gbmpredict)

0.6599964752046416

In [64]:
rmse.evaluate(gbmpredict)

2928.9377873687395

| Model Name | Adjusted RSquare | RMSE|
|--|--|--|
| Linear Regression | 0.06356300117930469 | 4860.759869813397 |
| Decision Tree |0.643827508033008|2997.772179128243 |
| Random Forest |0.6157736747503728|3113.594102533163 |
| **Gradient Boosting Machine** | **0.6599964752046416** | **2928.9377873687395**|

 From Adjusted RSquare and RMSE we can conclude that Gradient Boosting Machine is best model with 
 0.6599964752046416 Adjusted RSquare and 2928.9377873687395 RMSE