# Importing Libraries

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg,countDistinct,count
from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=49b98c3c72386c858621abbf906e708a828dafae40a61d1e634cb6372b35fd01
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


# Find the Average Purchase amount.

In [11]:
spark = SparkSession.builder.getOrCreate()

# Read the data into a DataFrame
train = spark.read.csv("train.csv", header=True, inferSchema=True)
test = spark.read.csv("test.csv",header=True, inferSchema=True)
# Calculate the average purchase amount
train_avg_pur = train.agg(avg("Purchase")).collect()[0][0]
# Print the result
print("Average train purchase amount: ", train_avg_pur,)

Average train purchase amount:  9263.968712959126


# Find out the count of null values in all the columns

In [12]:
def null_count(data):
    column_counts = []
    for c in data.columns:
        missing_values = data.filter(F.col(c).isNull()).count()
        column_counts.append((c, missing_values))
    return column_counts

Training Data

In [13]:
columns_with_null = null_count(train)

In [14]:
print("Train Data Misssing values: ")
for c in columns_with_null:
    print(c)

Train Data Misssing values: 
('User_ID', 0)
('Product_ID', 0)
('Gender', 0)
('Age', 0)
('Occupation', 0)
('City_Category', 0)
('Stay_In_Current_City_Years', 0)
('Marital_Status', 0)
('Product_Category_1', 0)
('Product_Category_2', 173638)
('Product_Category_3', 383247)
('Purchase', 0)


Test Data

In [15]:
columns_with_null = null_count(test)

In [16]:
print("Test Data Misssing values: ")
for c in columns_with_null:
    print(c)

Test Data Misssing values: 
('User_ID', 0)
('Product_ID', 0)
('Gender', 0)
('Age', 0)
('Occupation', 0)
('City_Category', 0)
('Stay_In_Current_City_Years', 0)
('Marital_Status', 0)
('Product_Category_1', 0)
('Product_Category_2', 72344)
('Product_Category_3', 162562)


# Handling the missing values.

In [17]:
def missing_impute(data):
    return data.na.fill({'Product_Category_2': -1,
                    'Product_Category_3': -1,})

Training Data

In [18]:
train_no_na = missing_impute(train)

In [19]:
train_no_na.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|
|1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
|1000001| P00087842|     F|0-17|        10|            A|                         2|             0|                12|                -1|                -1|    1422|
|100

Test Data

In [20]:
test_no_na = missing_impute(test)

In [21]:
test_no_na.show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|1000004| P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|                -1|
|1000009| P00113442|     M|26-35|        17|            C|                         0|             0|                 3|                 5|                -1|
|1000010| P00288442|     F|36-45|         1|            B|                        4+|             1|                 5|                14|                -1|
|1000010| P00145342|     F|36-45|         1|        

# Count distinct values per column

Train data with na values

In [22]:
distinct_train = train.agg(*(countDistinct(column).alias(column) for column in train.columns))
distinct_train.show(truncate=False)

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|5891   |3631      |2     |7  |21        |3            |5                         |2             |20                |17                |15                |18105   |
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



Test data with na values

In [23]:
distinct_test = test.agg(*(countDistinct(column).alias(column) for column in test.columns))
distinct_test.show(truncate=False)

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|5891   |3491      |2     |7  |21        |3            |5                         |2             |18                |17                |15                |
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+



Train data without na values

In [24]:
distinct_train_no_na = train_no_na.agg(*(countDistinct(column).alias(column) for column in train_no_na.columns))
distinct_train_no_na.show(truncate=False)

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|5891   |3631      |2     |7  |21        |3            |5                         |2             |20                |18                |16                |18105   |
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



Test data without na values

In [25]:
distinct_test_no_na = test_no_na.agg(*(countDistinct(column).alias(column) for column in test_no_na.columns))
distinct_test_no_na.show(truncate=False)

+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+
|5891   |3491      |2     |7  |21        |3            |5                         |2             |18                |18                |16                |
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+



# Count category values within each categorical columns

In [29]:
def counts(data):
    category_counts = {
        "Gender": data.groupBy("Gender").agg(count("*").alias("Count")),
        "Age": data.groupBy("Age").agg(count("*").alias("Count")),
        "City_Category": data.groupBy("City_Category").agg(count("*").alias("Count")),
        "Stay_In_Current_City_Years": data.groupBy("Stay_In_Current_City_Years").agg(count("*").alias("Count")),
        "Marital_Status": data.groupBy("Marital_Status").agg(count("*").alias("Count"))
    }
    return category_counts.items()


Train data

In [27]:
train_count=counts(train)

In [28]:
for column, counts in train_count:
    print(f"Category value counts for {column}:")
    counts.show(truncate=False)

Category value counts for Gender:
+------+------+
|Gender|Count |
+------+------+
|F     |135809|
|M     |414259|
+------+------+

Category value counts for Age:
+-----+------+
|Age  |Count |
+-----+------+
|18-25|99660 |
|26-35|219587|
|0-17 |15102 |
|46-50|45701 |
|51-55|38501 |
|36-45|110013|
|55+  |21504 |
+-----+------+

Category value counts for City_Category:
+-------------+------+
|City_Category|Count |
+-------------+------+
|B            |231173|
|C            |171175|
|A            |147720|
+-------------+------+

Category value counts for Stay_In_Current_City_Years:
+--------------------------+------+
|Stay_In_Current_City_Years|Count |
+--------------------------+------+
|3                         |95285 |
|0                         |74398 |
|4+                        |84726 |
|1                         |193821|
|2                         |101838|
+--------------------------+------+

Category value counts for Marital_Status:
+--------------+------+
|Marital_Status|Count |


Test Data

In [30]:
test_count=counts(test)

In [31]:
for column, counts in test_count:
    print(f"Category value counts for {column}:")
    counts.show(truncate=False)

Category value counts for Gender:
+------+------+
|Gender|Count |
+------+------+
|F     |57827 |
|M     |175772|
+------+------+

Category value counts for Age:
+-----+-----+
|Age  |Count|
+-----+-----+
|18-25|42293|
|26-35|93428|
|0-17 |6232 |
|46-50|19577|
|51-55|16283|
|36-45|46711|
|55+  |9075 |
+-----+-----+

Category value counts for City_Category:
+-------------+-----+
|City_Category|Count|
+-------------+-----+
|B            |98566|
|C            |72509|
|A            |62524|
+-------------+-----+

Category value counts for Stay_In_Current_City_Years:
+--------------------------+-----+
|Stay_In_Current_City_Years|Count|
+--------------------------+-----+
|3                         |40143|
|0                         |31318|
|4+                        |35945|
|1                         |82604|
|2                         |43589|
+--------------------------+-----+

Category value counts for Marital_Status:
+--------------+------+
|Marital_Status|Count |
+--------------+------+
|1 

# Calculate the average Purchase for categorical columns:

In [32]:
def avg_pur(data):
    average_purchase = {
    "Gender": data.groupBy("Gender").agg(avg("Purchase").alias("Average_Purchase")),
    "Age": data.groupBy("Age").agg(avg("Purchase").alias("Average_Purchase")),
    "City_Category": data.groupBy("City_Category").agg(avg("Purchase").alias("Average_Purchase")),
    "Stay_In_Current_City_Years": data.groupBy("Stay_In_Current_City_Years").agg(avg("Purchase").alias("Average_Purchase")),
    "Marital_Status": data.groupBy("Marital_Status").agg(avg("Purchase").alias("Average_Purchase"))
}
    return average_purchase.items()

In [33]:
train_avg=avg_pur(train)

In [34]:
for column, averages in train_avg:
    print(f"Average purchase for {column}:")
    averages.show(truncate=False)
    print()

Average purchase for Gender:
+------+-----------------+
|Gender|Average_Purchase |
+------+-----------------+
|F     |8734.565765155476|
|M     |9437.526040472265|
+------+-----------------+


Average purchase for Age:
+-----+-----------------+
|Age  |Average_Purchase |
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
|0-17 |8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|55+  |9336.280459449405|
+-----+-----------------+


Average purchase for City_Category:
+-------------+-----------------+
|City_Category|Average_Purchase |
+-------------+-----------------+
|B            |9151.300562781986|
|C            |9719.92099313568 |
|A            |8911.939216084484|
+-------------+-----------------+


Average purchase for Stay_In_Current_City_Years:
+--------------------------+-----------------+
|Stay_In_Current_City_Years|Average_Purchase |
+--------------------------+-----------------+
|3                         

# Label Encoding

Train data

In [35]:
LE_age = StringIndexer(inputCol="Age",outputCol="Age_le",handleInvalid="skip")
LE_train = LE_age.fit(train_no_na)
train_le = LE_train.transform(train_no_na)

LE_Gender = StringIndexer(inputCol="Gender",outputCol="Gender_le",handleInvalid="skip")
LE_train = LE_Gender.fit(train_le)
train_le = LE_train.transform(train_le)

LE_Stay_In_Current_City_Years = StringIndexer(inputCol="Stay_In_Current_City_Years",outputCol="Stay_In_Current_City_Years_le",handleInvalid="skip")
LE_train = LE_Stay_In_Current_City_Years.fit(train_le)
train_le = LE_train.transform(train_le)

LE_City_Category = StringIndexer(inputCol="City_Category",outputCol="City_Category_le",handleInvalid="skip")
LE_train = LE_City_Category.fit(train_le)
train_le = LE_train.transform(train_le)

train_le.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+------+---------+-----------------------------+----------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Age_le|Gender_le|Stay_In_Current_City_Years_le|City_Category_le|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+------+---------+-----------------------------+----------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|   6.0|      1.0|                          1.0|             2.0|
|1000001| P00248942|     F|0-17|        10|            A|                       

Test Data

In [36]:
LE_age = StringIndexer(inputCol="Age",outputCol="Age_le",handleInvalid="skip")
LE_test = LE_age.fit(test_no_na)
test_le = LE_test.transform(test_no_na)

LE_Gender = StringIndexer(inputCol="Gender",outputCol="Gender_le",handleInvalid="skip")
LE_test = LE_Gender.fit(test_le)
test_le = LE_test.transform(test_le)

LE_Stay_In_Current_City_Years = StringIndexer(inputCol="Stay_In_Current_City_Years",outputCol="Stay_In_Current_City_Years_le",handleInvalid="skip")
LE_test = LE_Stay_In_Current_City_Years.fit(test_le)
test_le = LE_test.transform(test_le)

LE_City_Category = StringIndexer(inputCol="City_Category",outputCol="City_Category_le",handleInvalid="skip")
LE_test = LE_City_Category.fit(test_le)
test_le = LE_test.transform(test_le)

test_le.show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+------+---------+-----------------------------+----------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Age_le|Gender_le|Stay_In_Current_City_Years_le|City_Category_le|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+------+---------+-----------------------------+----------------+
|1000004| P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|                -1|   3.0|      0.0|                          1.0|             0.0|
|1000009| P00113442|     M|26-35|        17|            C|                         0|             0|            

# One hot encoding

Train Data

In [37]:
LE_Occupation = StringIndexer(inputCol="Occupation",outputCol="Occupation_le",handleInvalid="skip")
LE_train = LE_Occupation.fit(train_le)
train_le = LE_train.transform(train_le)

OHE_train = OneHotEncoder(inputCol="Gender_le",outputCol="Gender_OHE")
OHE_obj = OHE_train.fit(train_le)
encoded_train = OHE_obj.transform(train_le)

OHE_train = OneHotEncoder(inputCol="City_Category_le",outputCol="City_Category_OHE")
OHE_obj = OHE_train.fit(encoded_train)
encoded_train = OHE_obj.transform(encoded_train)

OHE_train = OneHotEncoder(inputCol="Occupation_le",outputCol="Occupation_OHE")
OHE_obj = OHE_train.fit(encoded_train)
encoded_train = OHE_obj.transform(encoded_train)

encoded_train.show(5)

+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+------+---------+-----------------------------+----------------+-------------+-------------+-----------------+---------------+
|User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|Age_le|Gender_le|Stay_In_Current_City_Years_le|City_Category_le|Occupation_le|   Gender_OHE|City_Category_OHE| Occupation_OHE|
+-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+------+---------+-----------------------------+----------------+-------------+-------------+-----------------+---------------+
|1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|      

Test Data

In [38]:
LE_Occupation = StringIndexer(inputCol="Occupation",outputCol="Occupation_le",handleInvalid="skip")
LE_test = LE_Occupation.fit(test_le)
test_le = LE_train.transform(test_le)

OHE_test = OneHotEncoder(inputCol="Gender_le",outputCol="Gender_OHE")
OHE_obj = OHE_test.fit(test_le)
encoded_test = OHE_obj.transform(test_le)

OHE_test = OneHotEncoder(inputCol="City_Category_le",outputCol="City_Category_OHE")
OHE_obj = OHE_test.fit(encoded_test)
encoded_test = OHE_obj.transform(encoded_test)

OHE_test = OneHotEncoder(inputCol="Occupation_le",outputCol="Occupation_OHE")
OHE_obj = OHE_test.fit(encoded_test)
encoded_test = OHE_obj.transform(encoded_test)

encoded_test.show(5)

+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+------+---------+-----------------------------+----------------+-------------+-------------+-----------------+--------------+
|User_ID|Product_ID|Gender|  Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Age_le|Gender_le|Stay_In_Current_City_Years_le|City_Category_le|Occupation_le|   Gender_OHE|City_Category_OHE|Occupation_OHE|
+-------+----------+------+-----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+------+---------+-----------------------------+----------------+-------------+-------------+-----------------+--------------+
|1000004| P00128942|     M|46-50|         7|            B|                         2|             1|                 1|                11|             

In [39]:
encoded_train.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Age_le',
 'Gender_le',
 'Stay_In_Current_City_Years_le',
 'City_Category_le',
 'Occupation_le',
 'Gender_OHE',
 'City_Category_OHE',
 'Occupation_OHE']

# Base Line Model

Vector Assembler

In [40]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['Marital_Status', 'Product_Category_1',
                                       'Product_Category_2', 'Product_Category_3', 'Age_le', 'Gender_le',
                                       'Stay_In_Current_City_Years_le', 'City_Category_le', 'Occupation_le',
                                       'Gender_OHE', 'City_Category_OHE', 'Occupation_OHE'],
                            outputCol="vector_features")
vector_train = assembler.transform(encoded_train)
vector_test = assembler.transform(encoded_test)

In [41]:
vector_train.select("vector_features").show()

+--------------------+
|     vector_features|
+--------------------+
|(32,[1,2,3,4,5,6,...|
|(32,[1,2,3,4,5,6,...|
|(32,[1,2,3,4,5,6,...|
|(32,[1,2,3,4,5,6,...|
|(32,[1,2,3,4,6,7,...|
|(32,[1,2,3,6,7,8,...|
|(32,[0,1,2,3,4,6,...|
|(32,[0,1,2,3,4,6,...|
|(32,[0,1,2,3,4,6,...|
|(32,[0,1,2,3,7,8,...|
|(32,[0,1,2,3,7,8,...|
|(32,[0,1,2,3,7,8,...|
|(32,[0,1,2,3,7,8,...|
|(32,[0,1,2,3,7,8,...|
|(32,[1,2,3,4,5,7,...|
|(32,[1,2,3,4,5,7,...|
|(32,[1,2,3,4,5,7,...|
|(32,[1,2,3,4,5,7,...|
|(32,[0,1,2,3,4,8,...|
|(32,[0,1,2,3,6,7,...|
+--------------------+
only showing top 20 rows



In [42]:
vector_test.select("vector_features").show()

+--------------------+
|     vector_features|
+--------------------+
|(32,[0,1,2,3,4,6,...|
|(32,[1,2,3,6,7,8,...|
|(32,[0,1,2,3,4,5,...|
|(32,[0,1,2,3,4,5,...|
|(32,[1,2,3,5,7,8,...|
|(32,[0,1,2,3,4,6,...|
|(32,[0,1,2,3,4,6,...|
|(32,[0,1,2,3,4,6,...|
|(32,[1,2,3,7,8,9,...|
|(32,[1,2,3,4,6,7,...|
|(32,[0,1,2,3,6,8,...|
|(32,[0,1,2,3,6,8,...|
|(32,[0,1,2,3,6,8,...|
|(32,[0,1,2,3,6,8,...|
|(32,[0,1,2,3,5,6,...|
|(32,[1,2,3,4,7,8,...|
|(32,[0,1,2,3,4,7,...|
|(32,[0,1,2,3,4,7,...|
|(32,[1,2,3,4,5,6,...|
|(32,[0,1,2,3,4,6,...|
+--------------------+
only showing top 20 rows



In [43]:
vector_train.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = false)
 |-- Product_Category_3: integer (nullable = false)
 |-- Purchase: integer (nullable = true)
 |-- Age_le: double (nullable = false)
 |-- Gender_le: double (nullable = false)
 |-- Stay_In_Current_City_Years_le: double (nullable = false)
 |-- City_Category_le: double (nullable = false)
 |-- Occupation_le: double (nullable = false)
 |-- Gender_OHE: vector (nullable = true)
 |-- City_Category_OHE: vector (nullable = true)
 |-- Occupation_OHE: vector (nullable = true)
 |-- vector_features: vector (nullable = true)



Base Model

In [44]:
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

(training_data, test_data) = vector_train.randomSplit([0.75, 0.25], seed=42)
Model_DT = DecisionTreeRegressor(featuresCol="vector_features", labelCol="Purchase")
Model_DT= Model_DT.fit(training_data)

In [45]:
prediction = Model_DT.transform(test_data)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)

evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse_score = evaluator.evaluate(prediction)
# Print the R-squared score for test data
print("R-squared Score:", r2_score,"\nRMSE score:", rmse_score)

R-squared Score: 0.5432348551475981 
RMSE score: 3399.260634974966


In [46]:
prediction = Model_DT.transform(training_data)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)

# Print the R-squared score for train data
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse_score = evaluator.evaluate(prediction)
print("R-squared Score:", r2_score,"\nRMSE score:", rmse_score)

R-squared Score: 0.5445869052262053 
RMSE score: 3388.291787192326


Choosing Better model(Gradient Booster)

In [47]:
from pyspark.ml.regression import GBTRegressor
Model_GB = GBTRegressor(featuresCol="vector_features", labelCol="Purchase")
Model_GB= Model_GB.fit(training_data)

In [48]:
prediction = Model_GB.transform(test_data)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)

evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse_score = evaluator.evaluate(prediction)
# Print the R-squared score test data
print("R-squared Score:", r2_score,"\nRMSE score:", rmse_score)

R-squared Score: 0.6529017316899641 
RMSE score: 2963.2221435873735


In [49]:
prediction = Model_GB.transform(training_data)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)

evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse_score = evaluator.evaluate(prediction)
# Print the R-squared score for train data
print("R-squared Score:", r2_score,"\nRMSE score:", rmse_score)

R-squared Score: 0.6527663403913098 
RMSE score: 2958.618406946371


In [50]:
final_data = Model_GB.transform(vector_test)
final_data.select("User_ID","Product_ID","prediction").show(10)

+-------+----------+------------------+
|User_ID|Product_ID|        prediction|
+-------+----------+------------------+
|1000004| P00128942|13604.075859744866|
|1000009| P00113442|10005.284305217088|
|1000010| P00288442| 6203.351088966585|
|1000010| P00145342|2379.1003496817734|
|1000011| P00053842| 2254.220547197046|
|1000013| P00350442|11096.508291875527|
|1000013| P00155442| 13712.96676207319|
|1000013|  P0094542|11066.356644282409|
|1000015| P00161842|17722.663875606035|
|1000022| P00067942| 6056.022786774092|
+-------+----------+------------------+
only showing top 10 rows



Feature Engineering

In [51]:
fi = Model_GB.featureImportances
selected_columns = ['Marital_Status', 'Product_Category_1',
                                       'Product_Category_2', 'Product_Category_3', 'Age_le', 'Gender_le',
                                       'Stay_In_Current_City_Years_le', 'City_Category_le', 'Occupation_le',
                                       'Gender_OHE', 'City_Category_OHE', 'Occupation_OHE']
for feature, importance in zip(selected_columns, fi):
    print(feature, importance)

Marital_Status 0.0002594737668745476
Product_Category_1 0.9382925481082617
Product_Category_2 0.020020247377938463
Product_Category_3 0.01472183966730809
Age_le 0.006345633372363194
Gender_le 0.0005863304358550551
Stay_In_Current_City_Years_le 0.003091138345193482
City_Category_le 0.0036592941423116898
Occupation_le 0.01192930657604661
Gender_OHE 0.0
City_Category_OHE 0.0
Occupation_OHE 0.0010105415946172406


In [52]:
k = 7
gbt = GBTRegressor(featuresCol="features", labelCol="Purchase", maxDepth=5)
top_features = [feature for _, feature in sorted(zip(fi, selected_columns), reverse=True)[:k]]
print(top_features)
# assembler.setInputCols(top_features)
# data = assembler.transform(encoded_train)
# model = gbt.fit(data)

['Product_Category_1', 'Product_Category_2', 'Product_Category_3', 'Occupation_le', 'Age_le', 'City_Category_le', 'Stay_In_Current_City_Years_le']


In [53]:
assembler = VectorAssembler(inputCols=top_features, outputCol="features")
data = assembler.transform(training_data)
valid = assembler.transform(test_data)

In [54]:
model = gbt.fit(data)

In [55]:
prediction = model.transform(valid)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)

evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse_score = evaluator.evaluate(prediction)
# Print the R-squared score for test data
print("R-squared Score:", r2_score,"\nRMSE score:", rmse_score)

R-squared Score: 0.6537024455337623 
RMSE score: 2959.802272085355


In [56]:
prediction = model.transform(data)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)

evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="rmse")
rmse_score = evaluator.evaluate(prediction)
# Print the R-squared score for train data
print("R-squared Score:", r2_score,"\nRMSE score:", rmse_score)

R-squared Score: 0.6535490597554907 
RMSE score: 2955.281929687356


#  grid search cv for model improvement

In [57]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
gbt = GBTRegressor(featuresCol="features", labelCol="Purchase")
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [10,20]) \
    .addGrid(gbt.maxBins, [30,40]) \
    .addGrid(gbt.stepSize, [0.1, 0.01]) \
    .build()

In [58]:
cv_GB = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)
cv_model = cv_GB.fit(data)
best_model = cv_model.bestModel

In [59]:
prediction = best_model.transform(valid)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)
# Print the R-squared score for test data
print(r2_score)

0.6945482376221294


In [60]:
prediction = best_model.transform(data)
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score = evaluator.evaluate(prediction)
# Print the R-squared score for train data
print(r2_score)

0.6963517622289211


In [61]:
print("Max Depth:", best_model.getMaxDepth())
print("Max Bins:", best_model.getMaxBins())

Max Depth: 10
Max Bins: 30


# Pipeline

In [62]:
from pyspark.ml import Pipeline,Transformer
from pyspark.ml.param.shared import HasInputCol,HasOutputCol,Param

In [63]:
from pyspark.ml.util import DefaultParamsWritable, DefaultParamsReadable

class NullValuesTransformer(Transformer, DefaultParamsWritable):
    def __init__(self):
        super(NullValuesTransformer, self).__init__()

    def _transform(self, dataframe):
        dataframe = dataframe.fillna({
            "Product_Category_2": -1,
            "Product_Category_3": -1
        })
        return dataframe

    def copy(self, extra=None):
        return Params.copy(self, extra)

In [64]:
stage1 = NullValuesTransformer()
stage2 = StringIndexer(inputCol="Age",outputCol="Age_le",handleInvalid="skip")
stage3 = StringIndexer(inputCol="Gender",outputCol="Gender_le",handleInvalid="skip")
stage4 = StringIndexer(inputCol="Stay_In_Current_City_Years",outputCol="Stay_In_Current_City_Years_le",handleInvalid="skip")
stage5 = StringIndexer(inputCol="City_Category",outputCol="City_Category_le",handleInvalid="skip")
stage6 = StringIndexer(inputCol="Occupation",outputCol="Occupation_le",handleInvalid="skip")
stage7 = OneHotEncoder(inputCol="Gender_le",outputCol="Gender_OHE")
stage8 = OneHotEncoder(inputCol="City_Category_le",outputCol="City_Category_OHE")
stage9 = OneHotEncoder(inputCol="Occupation",outputCol="Occupation_OHE")
stage10= VectorAssembler(inputCols=top_features,
                            outputCol="vector_features")
stage11 = GBTRegressor(featuresCol="vector_features", labelCol="Purchase")


In [65]:
pipeline = Pipeline(stages=[stage1,stage2,stage3,stage4,
                           stage5,stage6,stage7,stage8,
                           stage9,stage10,stage11])

In [66]:
(dtrain, dtest) = train.randomSplit([0.75, 0.25], seed=42)

In [67]:
pipeline_model = pipeline.fit(dtrain)

final_data = pipeline_model.transform(dtrain)

In [68]:
final_data.select("User_ID","Product_ID","Purchase","prediction").show(10)

+-------+----------+--------+------------------+
|User_ID|Product_ID|Purchase|        prediction|
+-------+----------+--------+------------------+
|1000001| P00000142|   13650| 9734.628554060122|
|1000001| P00004842|   13645| 9492.582529863323|
|1000001| P00051842|    2849|2217.7794171484375|
|1000001| P00059442|   16622| 15703.37940085043|
|1000001| P00069042|    8370| 9591.346438713848|
|1000001| P00085942|   12842|12665.517442843593|
|1000001| P00110842|   11769|13103.299941140234|
|1000001| P00111842|    8094| 7344.005528523275|
|1000001| P00117942|    8839| 6042.174664429319|
|1000001| P00165942|   10003| 7344.005528523275|
+-------+----------+--------+------------------+
only showing top 10 rows



In [71]:
final_test = pipeline_model.transform(dtest)
final_test.select("User_ID","Product_ID","Purchase","prediction").show(10)

+-------+----------+--------+------------------+
|User_ID|Product_ID|Purchase|        prediction|
+-------+----------+--------+------------------+
|1000001| P00051442|    9938| 7781.368812042285|
|1000001| P00085442|    1057|1560.9462065103264|
|1000001| P00087842|    1422|1560.9462065103264|
|1000001| P00102642|    2763|2524.7069278030926|
|1000001| P00142242|    7882| 7344.005528523275|
|1000001| P00210342|   11039| 9513.948973340584|
|1000001| P00258742|    6910| 5998.968411651766|
|1000002| P00044442|   15669| 14253.15263507992|
|1000002| P00062842|   15699|14309.048825733958|
|1000002| P00112842|    6187| 7915.040815281675|
+-------+----------+--------+------------------+
only showing top 10 rows



In [70]:
predictions = final_test.select("prediction")
actual_purchase = final_test.select("Purchase")
predictions = final_data.select("prediction")
actual_purchase = final_data.select("Purchase")
evaluator = RegressionEvaluator(labelCol="Purchase", predictionCol="prediction", metricName="r2")
r2_score_test = evaluator.evaluate(final_test)
r2_score_train = evaluator.evaluate(final_data)
print("Test Score",r2_score_test,"\nTrain score",r2_score_train)

Test Score 0.6537024455337623 
Train score 0.6535490597554907


In [72]:
final_data = pipeline_model.transform(test)
final_data.select("User_ID","Product_ID","prediction").show(10)

+-------+----------+------------------+
|User_ID|Product_ID|        prediction|
+-------+----------+------------------+
|1000004| P00128942|13654.657424189727|
|1000009| P00113442|10011.306417033258|
|1000010| P00288442| 6262.814620793571|
|1000010| P00145342|2438.4193735126905|
|1000011| P00053842|2279.5752525659004|
|1000013| P00350442|11194.717198651011|
|1000013| P00155442|13649.320828755763|
|1000013|  P0094542|11144.099149730208|
|1000015| P00161842|17642.378546728633|
|1000022| P00067942| 6090.755831515424|
+-------+----------+------------------+
only showing top 10 rows

