In [1]:
# Mount Google Drive inside the Colab VM so files under MyDrive can be read from /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# Extract the Black Friday archive from Drive into the working directory (produces test.csv and train.csv).
!unzip "/content/drive/MyDrive/BlackFridaySale.zip"

Archive:  /content/drive/MyDrive/BlackFridaySale.zip
  inflating: test.csv                
  inflating: train.csv               


In [3]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=63ba05f4c31111888385752d6e44afd4a603807fb32b9eb37df98770ca67598d
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [4]:
# Bring the session entry point into scope
from pyspark.sql import SparkSession

# Build a local-mode session, or reuse the active one if it already exists
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Leave the session as the last expression so its summary is rendered
spark

In [5]:
# Both files are read with the same settings: first row is the header, column
# types are inferred, and any malformed record aborts the read (FAILFAST).
csv_options = {"header": "true", "mode": "FAILFAST", "inferSchema": "true"}

# Training data comes from the preprocessed CSV stored on Drive, not the unzipped train.csv.
train = spark.read.format("csv").options(**csv_options).load("/content/drive/MyDrive/pandas_preprocessed_data.csv")
# Test data is the raw test.csv extracted from the archive.
test = spark.read.format("csv").options(**csv_options).load("test.csv")

In [6]:
# Print the column names and the types that schema inference assigned to the training data.
train.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [7]:
# Preview the first five rows of the training data.
train.show(5)

+---+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|_c0|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+---+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|  0|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                 0|                 0|    8370|
|  1|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|  2|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                 0|         

In [8]:
# List the training column names; the bare expression is rendered as the cell output.
train.columns

['_c0',
 'User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase']

### Converting the Product IDs into Numeric Indexes

In [9]:
from pyspark.ml.feature import StringIndexer

# Map every Product_ID string to a numeric index stored in a new 'productid' column.
# handleInvalid='keep' assigns Product_IDs that were not seen during fit() to one extra
# index instead of raising. The fitted model is also applied to `test`, and with the
# default ('error') any product missing from `train` would make that transform fail
# as soon as the result is evaluated.
plan_indexer = StringIndexer(inputCol='Product_ID', outputCol='productid', handleInvalid='keep')

# Learn the Product_ID -> index mapping from the training data only.
labeller = plan_indexer.fit(train)

In [10]:
# Apply the fitted Product_ID indexer to both datasets so they share one mapping.
train1, test1 = (labeller.transform(frame) for frame in (train, test))

In [11]:
# See Results
print(train1.select('Product_ID').head(5))
print(train1.select('productid').head(5))

[Row(Product_ID='P00069042'), Row(Product_ID='P00248942'), Row(Product_ID='P00087842'), Row(Product_ID='P00085442'), Row(Product_ID='P00285442')]
[Row(productid=765.0), Row(productid=183.0), Row(productid=1496.0), Row(productid=480.0), Row(productid=860.0)]


## Prediction
### Getting the Data ready for the Model

In [12]:
from pyspark.ml.feature import RFormula

# R-style model spec: Purchase is the response, the listed columns are the predictors.
# The string is split across lines for readability only; adjacent literals are
# concatenated into exactly the same formula text.
df_formula = (
    "Purchase ~ Age+ Occupation +City_Category"
    "+Stay_In_Current_City_Years"
    "+Product_Category_1+Product_Category_2+ Product_Category_3 + Gender"
)

# The transformer writes the assembled predictors to 'features' and the response to 'label'.
formula = RFormula(
    formula=df_formula,
    featuresCol="features",
    labelCol="label",
)

In [13]:
# Fit the formula on the training data, then apply the same fitted
# transformation to both the training and the test DataFrames.
t1 = formula.fit(train)
train_1, test_1 = map(t1.transform, (train, test))

In [14]:
# See Results
train_1.select('features').show()
train_1.select('label').show()

+--------------------+
|            features|
+--------------------+
|(17,[6,10,13],[10...|
|(17,[6,10,13,14,1...|
|(17,[6,10,13],[10...|
|(17,[6,10,13,14],...|
|(17,[5,6,8,12,13,...|
|(17,[0,6,11,13,14...|
|(17,[3,6,7,10,13,...|
|(17,[3,6,7,10,13,...|
|(17,[3,6,7,10,13,...|
|(17,[0,6,9,13,16]...|
|(17,[0,6,9,13,14,...|
|(17,[0,6,9,13,16]...|
|(17,[0,6,9,13,16]...|
|(17,[0,6,9,13,14,...|
|(17,[4,6,9,13,14,...|
|(17,[4,6,9,13,14]...|
|(17,[4,6,9,13,14,...|
|(17,[4,6,9,13,14]...|
|(17,[1,6,7,9,13,1...|
|(17,[0,6,8,12,13,...|
+--------------------+
only showing top 20 rows

+-------+
|  label|
+-------+
| 8370.0|
|15200.0|
| 1422.0|
| 1057.0|
| 7969.0|
|15227.0|
|19215.0|
|15854.0|
|15686.0|
| 7871.0|
| 5254.0|
| 3957.0|
| 6073.0|
|15665.0|
| 5378.0|
| 2079.0|
|13055.0|
| 8851.0|
|11788.0|
|19614.0|
+-------+
only showing top 20 rows



In [15]:
# Number of rows available for modelling, counted over the purchase column.
purchase_only = train_1.select('purchase')
purchase_only.count()

550068

In [16]:
# Split the labelled data roughly 70/30 into a fitting partition and a hold-out partition.
# A fixed seed keeps the split, and therefore every metric computed from it,
# reproducible across kernel restarts.
train_cv, test_cv = train_1.randomSplit([0.7, 0.3], seed=42)

In [17]:
# (row count, column count) of the fitting partition
train_shape = (train_cv.count(), len(train_cv.columns))
print(train_shape)

(385026, 15)


In [18]:
# (row count, column count) of the hold-out partition
test_shape = (test_cv.count(), len(test_cv.columns))
print(test_shape)

(165042, 15)


In [19]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Built with default settings; the metric to compute is chosen per call
# through a param map passed to evaluate().
evaluator = RegressionEvaluator()

In [32]:
# Random forest regressor reading the 'features' vector and predicting 'label'.
# The seed is fixed (same value the grid-search cell in this notebook uses) so
# the trained forest, and its error metrics, are reproducible between runs.
rf = RandomForestRegressor(labelCol='label', featuresCol='features', seed=1)

In [33]:
# Train the random forest on the fitting partition of the split.
rfModel = rf.fit(train_cv)

In [34]:
# Score the hold-out partition; the result carries a 'prediction' column alongside the inputs.
predictions = rfModel.transform(test_cv)

In [35]:
# Keep only the row identifiers and the predicted value for export.
df = predictions.select('User_ID', 'Product_ID', 'prediction')

In [36]:
# Override the evaluator's metric for this call only and compute the
# mean squared error of the hold-out predictions.
mse_param_map = {evaluator.metricName: "mse"}
mse = evaluator.evaluate(predictions, mse_param_map)

In [37]:
# numpy is not imported by any earlier cell, so import it here; without this the
# cell raises NameError on a fresh kernel.
import numpy as np

# Root mean squared error, in the same units as Purchase.
np.sqrt(mse)

3935.3280184655496

In [38]:
# Collect the selected predictions to the driver as a pandas DataFrame,
# then write them to a CSV file in the working directory.
results_pdf = df.toPandas()
results_pdf.to_csv('black_friday_cv_results.csv')

In [20]:
#import pandas as pd
#import numpy as np

In [21]:
# mean_se = []
# num_trees = []
# depth_type = []
# for trees in range(3,6):
#     for depth in range(5,7):
#         rf = RandomForestRegressor(numTrees=trees, maxDepth=depth, seed = 1)
#         model_1 = rf.fit(train_cv)
#         predictions = model_1.transform(test_cv)
#         mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })
#         mean_se.append(np.sqrt(mse))
#         num_trees.append(trees)
#         depth_type.append(depth)

In [24]:
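# NOTE: the 'depth' column must come from the depth_type list; the loop variable `depth`
# is a scalar holding only the last value tried, which made every row show the same depth.
# spark_grid_search = pd.DataFrame({'depth': depth_type, 'num_trees': num_trees, 'mean_se': mean_se})\
# .sort_values('mean_se', ascending = True)[['depth', 'num_trees', 'mean_se']].reset_index(drop = True)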

In [25]:
# spark_grid_search

Unnamed: 0,depth,num_trees,mean_se
0,6,4,3685.395976
1,6,4,3845.533726
2,6,5,3933.05283
3,6,3,3946.888587
4,6,3,4042.985432
5,6,5,4056.838251


In [26]:
# rf = RandomForestRegressor(numTrees=4, maxDepth=6, seed = 1)
# model_1 = rf.fit(train_cv)
# predictions = model_1.transform(test_cv)
# mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })

In [27]:
# evaluator = RegressionEvaluator()
# mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })
# import numpy as np
# np.sqrt(mse), mse

(3685.3959760178122, 13582143.500048283)

In [29]:
# predictions.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase').show()

+-------+----------+------------------+
|User_ID|Product_ID|          Purchase|
+-------+----------+------------------+
|1000004| P00184942|14251.987110682236|
|1000005| P00145042|13229.650640627506|
|1000008| P00214442| 7935.253190578615|
|1000009| P00161442| 7178.554269515551|
|1000010| P00085942|11380.891482261355|
|1000010| P00118742| 5935.471983010488|
|1000010| P00058342|10286.394936399318|
|1000010| P00032442| 5935.471983010488|
|1000010| P00155442| 12536.43052031698|
|1000010|  P0094542|11580.293757294436|
|1000010| P00148642|12536.790150965633|
|1000010| P00312142| 8080.661666875826|
|1000011| P00192642| 8267.017996722721|
|1000011| P00110842|13935.501024380006|
|1000012| P00304242|12465.723102976035|
|1000013| P00140742| 12538.21547041308|
|1000015| P00247542| 8016.200503086933|
|1000015| P00166242| 7800.561847456589|
|1000015| P00042142|13247.469367201818|
|1000016| P00217742| 6788.496797638035|
+-------+----------+------------------+
only showing top 20 rows

