# Spark ML Assignment

## Problem statement:
A retail company, “ABC Private Limited”, wants to understand customer purchase behavior (specifically, purchase amount) across products of different categories. The company has shared purchase summaries of various customers for selected high-volume products from last month. The data set also contains customer demographics (age, gender, marital status, city type, and years in the current city), product details (product ID and product category), and the total purchase amount from last month. The company now wants to build a model that predicts customers' purchase amounts for various products, which will help it create personalized offers for customers on different products.




## Installing the package
Install the pyspark package with the !pip install pyspark command (uncomment the line below if pyspark is not yet installed).

In [1]:
#!pip install pyspark

## Load the Data

The spark.read.csv() method is used to load the data into a Spark DataFrame. 

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

Load the training and test data from CSV files

In [4]:
train_data = spark.read.csv('train.csv', header=True, inferSchema=True)

In [5]:
test_data = spark.read.csv('test.csv', header=True, inferSchema=True)

In [6]:
train_data.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [7]:
test_data.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)



## Question 1: Find the Average Purchase amount.

The agg() function is used to calculate the average purchase amount. 

In [8]:
from pyspark.sql.functions import avg

Calculate the average purchase amount

In [9]:
average_purchase = train_data.agg(avg("Purchase")).first()[0]

Print the average purchase amount

In [10]:
print("Average Purchase Amount =", average_purchase)

Average Purchase Amount = 9263.968712959126


## Question 2: Find out the count of null values in all the columns and handle all the missing values.

Count the null or NaN values in each column of the training and test DataFrames.

In [11]:
from pyspark.sql.functions import col, isnan

### Train Dataset

Iterate over the columns of the Train DataFrame

In [12]:
for c in train_data.columns:
    # Filter the DataFrame for missing values in the current column
    missing_values = train_data.filter(col(c).isNull() | isnan(c)).count()
    
    # Print the column name and the count of missing values
    print(c, "=", missing_values)

User_ID = 0
Product_ID = 0
Gender = 0
Age = 0
Occupation = 0
City_Category = 0
Stay_In_Current_City_Years = 0
Marital_Status = 0
Product_Category_1 = 0
Product_Category_2 = 173638
Product_Category_3 = 383247
Purchase = 0


Two columns have missing values: Product_Category_2 (173,638) and Product_Category_3 (383,247).

### Handling all the missing values in train dataset
Product_Category_2 and Product_Category_3 are stored as integers, but they encode product categories, which are discrete codes rather than measured quantities. Their missing values are therefore filled with each column's mode (most frequent value) rather than its mean.
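The mode-imputation rule can be sketched in plain Python to make it concrete. `mode_impute` is a hypothetical helper written for illustration, not a PySpark function, and breaking ties by the smallest category code is an assumption of this sketch; Spark's `mode` aggregate may resolve ties differently.

```python
from collections import Counter
from typing import List, Optional


def mode_impute(values: List[Optional[int]]) -> List[int]:
    """Replace None entries with the most frequent non-None value."""
    counts = Counter(v for v in values if v is not None)
    if not counts:
        raise ValueError("cannot impute: every value is missing")
    top = max(counts.values())
    # Assumption of this sketch: ties are broken by the smallest category code.
    fill = min(v for v, c in counts.items() if c == top)
    return [fill if v is None else v for v in values]


print(mode_impute([8, None, 14, 8, None]))  # [8, 8, 14, 8, 8]
```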

In [13]:
from pyspark.sql.functions import col, mode

Calculate the mode of Product_Category_2

In [14]:
mode_value_cat2_train = train_data.select(mode("Product_Category_2")).collect()[0][0]

Calculate the mode of Product_Category_3

In [15]:
mode_value_cat3_train = train_data.select(mode("Product_Category_3")).collect()[0][0]

Fill missing values with the mode values

In [16]:
train_data = train_data.fillna(mode_value_cat2_train, subset=["Product_Category_2"]).fillna(mode_value_cat3_train, subset=["Product_Category_3"])

### Test Dataset
Iterate over the columns of the Test DataFrame

In [17]:
for c in test_data.columns:
    # Filter the DataFrame for missing values in the current column
    missing_values = test_data.filter(col(c).isNull() | isnan(c)).count()
    
    # Print the column name and the count of missing values
    print(c, "=", missing_values)

User_ID = 0
Product_ID = 0
Gender = 0
Age = 0
Occupation = 0
City_Category = 0
Stay_In_Current_City_Years = 0
Marital_Status = 0
Product_Category_1 = 0
Product_Category_2 = 72344
Product_Category_3 = 162562


Two columns have missing values: Product_Category_2 (72,344) and Product_Category_3 (162,562).

### Handling all the missing values in test dataset
The same approach is used for the test dataset: missing Product_Category_2 and Product_Category_3 values are filled with each column's mode, computed here on the test data itself.

Calculate the mode of Product_Category_2

In [18]:
mode_value_cat2_test = test_data.select(mode("Product_Category_2")).collect()[0][0]

Calculate the mode of Product_Category_3

In [19]:
mode_value_cat3_test = test_data.select(mode("Product_Category_3")).collect()[0][0]

Fill missing values with the mode values

In [20]:
test_data = test_data.fillna(mode_value_cat2_test, subset=["Product_Category_2"]).fillna(mode_value_cat3_test, subset=["Product_Category_3"])

## Question 3: How many distinct values per column?
Calculate and display the number of distinct values in each column of the train_data DataFrame.

In [21]:
from pyspark.sql import functions as F

Calculate the count of distinct values for each column

In [22]:
distinct_counts = train_data.agg(*(F.countDistinct(F.col(c)).alias(c) for c in train_data.columns))

Print the distinct counts for each column

In [23]:
# Collect the single aggregated row once instead of re-running the aggregation per column
distinct_row = distinct_counts.first()
for c in train_data.columns:
    print(c, "=", distinct_row[c])

User_ID = 5891
Product_ID = 3631
Gender = 2
Age = 7
Occupation = 21
City_Category = 3
Stay_In_Current_City_Years = 5
Marital_Status = 2
Product_Category_1 = 20
Product_Category_2 = 17
Product_Category_3 = 15
Purchase = 18105


## Question 4: Count category values within each of the following columns:
#### ● Gender
#### ● Age
#### ● City_Category
#### ● Stay_In_Current_City_Years
#### ● Marital_Status

Count the occurrences of each category in the specified columns of the train_data DataFrame and display the results.

In [24]:
from pyspark.sql.functions import count

Define the columns to count categories

In [25]:
category_columns = ["Gender", "Age", "City_Category", "Stay_In_Current_City_Years", "Marital_Status"]

Count category values within each column

In [26]:
category_counts = {}
for column in category_columns:
    category_counts[column] = train_data.groupBy(column).agg(count("*").alias("count"))

Display the category counts

In [27]:
for column, counts_df in category_counts.items():
    print(f"Category Counts {column} =")
    counts_df.show()

Category Counts Gender =
+------+------+
|Gender| count|
+------+------+
|     F|135809|
|     M|414259|
+------+------+

Category Counts Age =
+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
| 0-17| 15102|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+

Category Counts City_Category =
+-------------+------+
|City_Category| count|
+-------------+------+
|            B|231173|
|            C|171175|
|            A|147720|
+-------------+------+

Category Counts Stay_In_Current_City_Years =
+--------------------------+------+
|Stay_In_Current_City_Years| count|
+--------------------------+------+
|                         3| 95285|
|                         0| 74398|
|                        4+| 84726|
|                         1|193821|
|                         2|101838|
+--------------------------+------+

Category Counts Marital_Status =
+--------------+------+
|Marital_Status| count|
+--------------+------+
|             1|22533

## Question 5: Calculate the average Purchase for each of the following columns:
#### ● Gender
#### ● Age
#### ● City_Category
#### ● Stay_In_Current_City_Years
#### ● Marital_Status

Calculate the average Purchase for each category of the specified columns in the train_data DataFrame and display the results.

In [28]:
from pyspark.sql.functions import avg

Define the columns to calculate average purchase

In [29]:
average_purchase_columns = ["Gender", "Age", "City_Category", "Stay_In_Current_City_Years", "Marital_Status"]

Calculate the average purchase for each column

In [30]:
average_purchase = {}
for column in average_purchase_columns:
    average_purchase[column] = train_data.groupBy(column).agg(avg("Purchase").alias("Average_purchase"))

Display the average purchase

In [31]:
for column, avg_purchase_df in average_purchase.items():
    print(f"Average Purchase by {column} =")
    avg_purchase_df.show()

Average Purchase by Gender =
+------+-----------------+
|Gender| Average_purchase|
+------+-----------------+
|     F|8734.565765155476|
|     M|9437.526040472265|
+------+-----------------+

Average Purchase by Age =
+-----+-----------------+
|  Age| Average_purchase|
+-----+-----------------+
|18-25|9169.663606261289|
|26-35|9252.690632869888|
| 0-17|8933.464640444974|
|46-50|9208.625697468327|
|51-55|9534.808030960236|
|36-45|9331.350694917874|
|  55+|9336.280459449405|
+-----+-----------------+

Average Purchase by City_Category =
+-------------+-----------------+
|City_Category| Average_purchase|
+-------------+-----------------+
|            B|9151.300562781986|
|            C| 9719.92099313568|
|            A|8911.939216084484|
+-------------+-----------------+

Average Purchase by Stay_In_Current_City_Years =
+--------------------------+-----------------+
|Stay_In_Current_City_Years| Average_purchase|
+--------------------------+-----------------+
|                         3|92

## Question 6: Label encode the following columns:
#### ● Age
#### ● Gender
#### ● Stay_In_Current_City_Years
#### ● City_Category


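The StringIndexer class from the pyspark.ml.feature module is used to label encode the specified columns in the train_data DataFrame. By default (stringOrderType="frequencyDesc"), it orders labels by descending frequency, so the most frequent label in each column is encoded as 0.0.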
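StringIndexer's default ordering can be sketched in plain Python: labels are ranked by descending frequency, with ties broken alphabetically (the tie rule the Spark documentation gives for frequencyDesc). `frequency_desc_index` is a hypothetical helper, not part of PySpark. Fed the City_Category counts from Question 4, it yields B = 0.0, C = 1.0 and A = 2.0, matching the City_Category_labelEncoded values in the output of this question (city A rows show 2.0, city C rows show 1.0).

```python
from typing import Dict


def frequency_desc_index(label_counts: Dict[str, int]) -> Dict[str, float]:
    """Map each label to an index: most frequent first, ties broken alphabetically."""
    ordered = sorted(label_counts, key=lambda label: (-label_counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# City_Category counts taken from the Question 4 output.
city_counts = {"B": 231173, "C": 171175, "A": 147720}
print(frequency_desc_index(city_counts))  # {'B': 0.0, 'C': 1.0, 'A': 2.0}
```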

In [32]:
from pyspark.ml.feature import StringIndexer

### Train Dataset

Define the columns to be label encoded

In [33]:
columns_to_encode = ["Age", "Gender", "Stay_In_Current_City_Years", "City_Category"]

Create a StringIndexer object for each column and set the input and output column names

In [34]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_labelEncoded") for column in columns_to_encode]

Fit the StringIndexer models on the training data

In [35]:
indexers_models = [indexer.fit(train_data) for indexer in indexers]

Transform the training data using the fitted StringIndexer models

In [36]:
for model in indexers_models:
    train_data = model.transform(train_data)

Show the encoded columns

In [37]:
train_data.select(["Age_labelEncoded", "Gender_labelEncoded", "Stay_In_Current_City_Years_labelEncoded", "City_Category_labelEncoded"]).show(5)

+----------------+-------------------+---------------------------------------+--------------------------+
|Age_labelEncoded|Gender_labelEncoded|Stay_In_Current_City_Years_labelEncoded|City_Category_labelEncoded|
+----------------+-------------------+---------------------------------------+--------------------------+
|             6.0|                1.0|                                    1.0|                       2.0|
|             6.0|                1.0|                                    1.0|                       2.0|
|             6.0|                1.0|                                    1.0|                       2.0|
|             6.0|                1.0|                                    1.0|                       2.0|
|             5.0|                0.0|                                    3.0|                       1.0|
+----------------+-------------------+---------------------------------------+--------------------------+
only showing top 5 rows



### Test Dataset

Define the columns to be label encoded

In [38]:
columns_to_encode = ["Age", "Gender", "Stay_In_Current_City_Years", "City_Category"]

Create a StringIndexer object for each column and set the input and output column names

In [39]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_labelEncoded") for column in columns_to_encode]

Fit the StringIndexer models on the test data

In [40]:
indexers_models = [indexer.fit(test_data) for indexer in indexers]

Transform the test data using the fitted StringIndexer models

In [41]:
for model in indexers_models:
    test_data = model.transform(test_data)

Show the encoded columns

In [42]:
test_data.select(["Age_labelEncoded", "Gender_labelEncoded", "Stay_In_Current_City_Years_labelEncoded", "City_Category_labelEncoded"]).show(5)

+----------------+-------------------+---------------------------------------+--------------------------+
|Age_labelEncoded|Gender_labelEncoded|Stay_In_Current_City_Years_labelEncoded|City_Category_labelEncoded|
+----------------+-------------------+---------------------------------------+--------------------------+
|             3.0|                0.0|                                    1.0|                       0.0|
|             0.0|                0.0|                                    4.0|                       1.0|
|             1.0|                1.0|                                    3.0|                       0.0|
|             1.0|                1.0|                                    3.0|                       0.0|
|             0.0|                1.0|                                    0.0|                       1.0|
+----------------+-------------------+---------------------------------------+--------------------------+
only showing top 5 rows
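Because the indexers in this section are fitted on test_data, each label's index follows the test set's own frequencies and can differ from the index learned on the training data, so a model trained on the training encoding may receive differently coded inputs. A common alternative is to fit only on the training data and reuse those fitted StringIndexerModel objects to transform test_data, setting handleInvalid="keep" if the test set may contain labels never seen during training (the same reasoning applies to the indexers and encoders in the next question). The plain-Python sketch below illustrates the effect on made-up label lists; `fit_index` and `apply_index` are hypothetical helpers, and mapping unseen labels to len(mapping) only mirrors the idea behind handleInvalid="keep".

```python
from collections import Counter
from typing import Dict, List


def fit_index(labels: List[str]) -> Dict[str, float]:
    """Learn a frequency-descending label -> index mapping (ties alphabetical)."""
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def apply_index(mapping: Dict[str, float], labels: List[str]) -> List[float]:
    """Encode labels with a fitted mapping; unseen labels get index len(mapping)."""
    unknown = float(len(mapping))
    return [mapping.get(label, unknown) for label in labels]


# Made-up samples in which the most frequent city differs between the splits.
train_cities = ["B", "B", "B", "C", "C", "A"]
test_cities = ["A", "A", "A", "B", "C", "D"]

train_map = fit_index(train_cities)
test_map = fit_index(test_cities)
print(train_map["A"], test_map["A"])        # 2.0 0.0 (same city, different code)
print(apply_index(train_map, test_cities))  # [2.0, 2.0, 2.0, 0.0, 1.0, 3.0]
```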



## Question 7: One-Hot encode the following columns:
#### ● Gender
#### ● City_Category
#### ● Occupation

In [43]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

### Train Dataset
Define the columns to be one-hot encoded

In [44]:
columns_to_encode = ["Gender", "City_Category", "Occupation"]

Create a StringIndexer for each column and set the input and output column names

In [45]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index") for column in columns_to_encode]

Fit the StringIndexer models on the training data

In [46]:
indexer_models = [indexer.fit(train_data) for indexer in indexers]

Transform the training data using the fitted StringIndexer models

In [47]:
for model in indexer_models:
    train_data = model.transform(train_data)

Create a OneHotEncoder for each indexed column and set the input and output column names

In [48]:
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_OneHotEncoded") for column in columns_to_encode]

Fit the OneHotEncoder models on the training data

In [49]:
encoder_models = [encoder.fit(train_data) for encoder in encoders]

Transform the training data using the fitted OneHotEncoder models

In [50]:
for model in encoder_models:
    train_data = model.transform(train_data)

Show the encoded columns

In [51]:
train_data.select(["Gender_OneHotEncoded", "City_Category_OneHotEncoded", "Occupation_OneHotEncoded"]).show(5)

+--------------------+---------------------------+------------------------+
|Gender_OneHotEncoded|City_Category_OneHotEncoded|Occupation_OneHotEncoded|
+--------------------+---------------------------+------------------------+
|           (1,[],[])|                  (2,[],[])|         (20,[12],[1.0])|
|           (1,[],[])|                  (2,[],[])|         (20,[12],[1.0])|
|           (1,[],[])|                  (2,[],[])|         (20,[12],[1.0])|
|           (1,[],[])|                  (2,[],[])|         (20,[12],[1.0])|
|       (1,[0],[1.0])|              (2,[1],[1.0])|          (20,[9],[1.0])|
+--------------------+---------------------------+------------------------+
only showing top 5 rows
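The vectors above use Spark's sparse notation (size, [indices], [values]). OneHotEncoder drops the last category by default (dropLast=True), so a column with k categories produces vectors of length k - 1 and its last category is encoded as all zeros. Gender has 2 categories, giving size 1: M (index 0.0) becomes (1,[0],[1.0]) and F (index 1.0) becomes (1,[],[]); likewise the 21 occupations give vectors of size 20. The plain-Python sketch reproduces that encoding; `one_hot_sparse` is a hypothetical helper, not a PySpark API.

```python
from typing import List, Tuple

SparseVec = Tuple[int, List[int], List[float]]


def one_hot_sparse(index: int, num_categories: int, drop_last: bool = True) -> SparseVec:
    """Return (size, indices, values) for a one-hot encoded category index."""
    if not 0 <= index < num_categories:
        raise ValueError(f"index {index} outside 0..{num_categories - 1}")
    size = num_categories - 1 if drop_last else num_categories
    # With drop_last, the final category has no slot and becomes an all-zeros vector.
    if index < size:
        return (size, [index], [1.0])
    return (size, [], [])


print(one_hot_sparse(1, 2))    # Gender F: (1, [], [])
print(one_hot_sparse(0, 2))    # Gender M: (1, [0], [1.0])
print(one_hot_sparse(12, 21))  # Occupation index 12: (20, [12], [1.0])
```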



### Test Dataset

Fit the StringIndexer models on the test data

In [52]:
indexer_models = [indexer.fit(test_data) for indexer in indexers]

Transform the test data using the fitted StringIndexer models

In [53]:
for model in indexer_models:
    test_data = model.transform(test_data)

Create a OneHotEncoder for each indexed column and set the input and output column names

In [54]:
encoders = [OneHotEncoder(inputCol=column+"_index", outputCol=column+"_OneHotEncoded") for column in columns_to_encode]

Fit the OneHotEncoder models on the test data

In [55]:
encoder_models = [encoder.fit(test_data) for encoder in encoders]

Transform the test data using the fitted OneHotEncoder models

In [56]:
for model in encoder_models:
    test_data = model.transform(test_data)

Show the encoded columns

In [57]:
test_data.select(["Gender_OneHotEncoded", "City_Category_OneHotEncoded", "Occupation_OneHotEncoded"]).show(5)

+--------------------+---------------------------+------------------------+
|Gender_OneHotEncoded|City_Category_OneHotEncoded|Occupation_OneHotEncoded|
+--------------------+---------------------------+------------------------+
|       (1,[0],[1.0])|              (2,[0],[1.0])|          (20,[2],[1.0])|
|       (1,[0],[1.0])|              (2,[1],[1.0])|          (20,[4],[1.0])|
|           (1,[],[])|              (2,[0],[1.0])|          (20,[3],[1.0])|
|           (1,[],[])|              (2,[0],[1.0])|          (20,[3],[1.0])|
|           (1,[],[])|              (2,[1],[1.0])|          (20,[3],[1.0])|
+--------------------+---------------------------+------------------------+
only showing top 5 rows



In [58]:
train_data.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- Age_labelEncoded: double (nullable = false)
 |-- Gender_labelEncoded: double (nullable = false)
 |-- Stay_In_Current_City_Years_labelEncoded: double (nullable = false)
 |-- City_Category_labelEncoded: double (nullable = false)
 |-- Gender_index: double (nullable = false)
 |-- City_Category_index: double (nullable = false)
 |-- Occupation_index: double (nullable = false)
 |-- Gender_OneHotEncoded: vector (nullable = true)
 |-- City_Category_OneHotEncoded: vector (nullable = true)
 |-- Occupation_OneHotEncoded: vector (nullable = true)

In [59]:
test_data.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Age_labelEncoded: double (nullable = false)
 |-- Gender_labelEncoded: double (nullable = false)
 |-- Stay_In_Current_City_Years_labelEncoded: double (nullable = false)
 |-- City_Category_labelEncoded: double (nullable = false)
 |-- Gender_index: double (nullable = false)
 |-- City_Category_index: double (nullable = false)
 |-- Occupation_index: double (nullable = false)
 |-- Gender_OneHotEncoded: vector (nullable = true)
 |-- City_Category_OneHotEncoded: vector (nullable = true)
 |-- Occupation_OneHotEncoded: vector (nullable = true)

In [60]:
train_data.toPandas()

,User_ID,Product_ID,Gender,Age,Occupation,City_Category,Stay_In_Current_City_Years,Marital_Status,Product_Category_1,Product_Category_2,...,Age_labelEncoded,Gender_labelEncoded,Stay_In_Current_City_Years_labelEncoded,City_Category_labelEncoded,Gender_index,City_Category_index,Occupation_index,Gender_OneHotEncoded,City_Category_OneHotEncoded,Occupation_OneHotEncoded
0,1000001,P00069042,F,0-17,10,A,2,0,3,8,...,6.0,1.0,1.0,2.0,1.0,2.0,12.0,(0.0),"(0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,1000001,P00248942,F,0-17,10,A,2,0,1,6,...,6.0,1.0,1.0,2.0,1.0,2.0,12.0,(0.0),"(0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,1000001,P00087842,F,0-17,10,A,2,0,12,8,...,6.0,1.0,1.0,2.0,1.0,2.0,12.0,(0.0),"(0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,1000001,P00085442,F,0-17,10,A,2,0,12,14,...,6.0,1.0,1.0,2.0,1.0,2.0,12.0,(0.0),"(0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,1000002,P00285442,M,55+,16,C,4+,0,8,8,...,5.0,0.0,3.0,1.0,0.0,1.0,9.0,(1.0),"(0.0, 1.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
550063,1006033,P00372445,M,51-55,13,B,1,1,20,8,...,4.0,0.0,0.0,0.0,0.0,0.0,17.0,(1.0),"(1.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
550064,1006035,P00375436,F,26-35,1,C,3,0,20,8,...,0.0,1.0,2.0,1.0,1.0,1.0,3.0,(0.0),"(0.0, 1.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
550065,1006036,P00375436,F,26-35,15,B,4+,1,20,8,...,0.0,1.0,3.0,0.0,1.0,0.0,14.0,(0.0),"(1.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
550066,1006038,P00375436,F,55+,1,C,2,0,20,8,...,5.0,1.0,1.0,1.0,1.0,1.0,3.0,(0.0),"(0.0, 1.0)","(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


## Question 8: Build a baseline model using any of the ML algorithms.

In [61]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

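Split the labeled training data (train_data) into 70% training and 30% held-out sets; test.csv has no Purchase column, so the held-out split is used for evaluation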

In [62]:
train, test = train_data.randomSplit([0.7, 0.3], seed=42)

Define the feature columns

In [63]:
feature_cols = [
    "Age_labelEncoded",
    "Gender_labelEncoded",
    "Marital_Status",
    "Stay_In_Current_City_Years_labelEncoded",
    "City_Category_labelEncoded",
    "Occupation_OneHotEncoded",
    "Gender_index",
    "City_Category_index",
    "Occupation_index",
    "Product_Category_1",
    "Product_Category_2",
    "Product_Category_3"
]

Create a vector assembler to combine the features into a single vector column

In [64]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

Transform the train and test data using the vector assembler

In [65]:
train = assembler.transform(train)
test = assembler.transform(test)

Create a Linear Regression model

In [66]:
lr = LinearRegression(labelCol="Purchase", featuresCol="features")

Fit the model on the training data

In [67]:
baseline_model = lr.fit(train)

Make predictions on the test data

In [68]:
predictions = baseline_model.transform(test)

Evaluate the model using R2 and RMSE

In [69]:
evaluator_r2 = RegressionEvaluator(labelCol="Purchase", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

evaluator_rmse = RegressionEvaluator(labelCol="Purchase", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)

Print the R2 and RMSE

In [70]:
print("R Squared (R2):", r2)
print("Root Mean Squared Error (RMSE):", rmse)

R Squared (R2): 0.12848219958436558
Root Mean Squared Error (RMSE): 4699.353548842734
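RegressionEvaluator's r2 metric is the coefficient of determination, 1 - SS_res / SS_tot, and its rmse metric is the square root of the mean squared residual. The sketch below computes both in plain Python on toy numbers; `r2_score` and `rmse` are hypothetical helper names, not Spark APIs. Read this way, the baseline R2 of about 0.128 means the model explains roughly 12.8% of the variance in Purchase on the held-out split, and its predictions are off by about 4,699 on a root-mean-square basis.

```python
import math
from typing import Sequence


def r2_score(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)
    return 1.0 - ss_res / ss_tot


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error."""
    mse = sum((y - p) ** 2 for y, p in zip(y_true, y_pred)) / len(y_true)
    return math.sqrt(mse)


y_true = [10.0, 20.0, 30.0, 40.0]
y_pred = [12.0, 18.0, 33.0, 37.0]
print(round(r2_score(y_true, y_pred), 4), round(rmse(y_true, y_pred), 4))  # 0.948 2.5495
```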


## Question 9: Use grid search with cross-validation for model improvement.

In [71]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

Create a Linear Regression model

In [72]:
lr = LinearRegression(labelCol="Purchase", featuresCol="features")

Define the parameter grid for grid search

In [73]:
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()
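For reference, the Spark MLlib documentation describes the elastic-net regularized least-squares objective roughly as follows, with λ = regParam and α = elasticNetParam (α = 0 is a pure L2/ridge penalty, α = 1 a pure L1/lasso penalty, and the intercept b is not penalized). Spark also standardizes features before applying the penalty by default, which this formula does not show:

$$
\min_{w,\,b}\;\frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2 \;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

So the grid spans three penalty strengths (0.01, 0.1, 1.0) crossed with ridge, an even L1/L2 mix, and lasso.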

Create a CrossValidator with the Linear Regression model and parameter grid

In [74]:
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="Purchase", metricName="r2"),
                          numFolds=3)
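ParamGridBuilder takes the Cartesian product of the listed values, so this grid contains 3 × 3 = 9 parameter combinations. With numFolds=3, CrossValidator trains 27 models during the search and then refits the best combination once on the whole training split to produce bestModel. The plain-Python sketch below (itertools.product, not Spark) makes that count explicit.

```python
from itertools import product

reg_params = [0.01, 0.1, 1.0]
elastic_net_params = [0.0, 0.5, 1.0]
num_folds = 3

# Every (regParam, elasticNetParam) pair is one candidate configuration.
grid = list(product(reg_params, elastic_net_params))
fits_during_search = len(grid) * num_folds
total_fits = fits_during_search + 1  # plus the final refit of the best configuration

print(len(grid), fits_during_search, total_fits)  # 9 27 28
```

In this notebook the tuned model's R2 (0.128488) is nearly identical to the baseline's (0.128482), so the regularization settings make little difference for this feature set.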

Run cross-validation and choose the best model

In [75]:
cv_model = crossval.fit(train)

Get the best model from cross-validation

In [76]:
best_model = cv_model.bestModel

Print the best hyperparameters

In [77]:
print("Best Hyperparameters:")
print("regParam =", best_model.getRegParam())
print("elasticNetParam =", best_model.getElasticNetParam())

Best Hyperparameters:
regParam = 1.0
elasticNetParam = 1.0


Make predictions on the test data using the best model

In [78]:
predictions = best_model.transform(test)

Evaluate the model using R2 score and RMSE

In [79]:
evaluator_r2 = RegressionEvaluator(labelCol="Purchase", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)

evaluator_rmse = RegressionEvaluator(labelCol="Purchase", metricName="rmse")
rmse = evaluator_rmse.evaluate(predictions)

Print the R2 score and RMSE

In [80]:
print("R2 Score:", r2)
print("Root Mean Squared Error (RMSE):", rmse)

R2 Score: 0.1284883009804303
Root Mean Squared Error (RMSE): 4699.337098996604


## Question 10: Create a Spark ML Pipeline for the final model.

In [81]:
from pyspark.ml import Pipeline

Define the feature columns that will be used in the model

In [82]:
feature_cols = [
    "Age_labelEncoded",
    "Gender_labelEncoded",
    "Marital_Status",
    "Stay_In_Current_City_Years_labelEncoded",
    "City_Category_labelEncoded",
    "Occupation_OneHotEncoded",
    "Gender_index",
    "City_Category_index",
    "Occupation_index",
    "Product_Category_1",
    "Product_Category_2",
    "Product_Category_3"
]

Create a VectorAssembler to combine the features into a single vector column

In [83]:
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

Create a LinearRegression model with the best hyperparameters found by cross-validation (regParam=1.0, elasticNetParam=1.0)

In [90]:
lr = LinearRegression(labelCol="Purchase", featuresCol="features", regParam=1.0, elasticNetParam=1.0)

Create the pipeline by specifying the stages, which include the VectorAssembler and the LinearRegression model

In [91]:
pipeline = Pipeline(stages=[assembler, lr])

Fit the pipeline on the full training data (train_data)

In [92]:
pipeline_model = pipeline.fit(train_data)

Make predictions on the unlabeled test data (test_data, loaded from test.csv)

In [93]:
predictions = pipeline_model.transform(test_data)

Select the columns of interest from the predictions

In [94]:
predicted_values = predictions.select("Product_ID", "prediction")

Show the predicted values

In [95]:
predicted_values.show(5)

+----------+------------------+
|Product_ID|        prediction|
+----------+------------------+
| P00128942|11261.354003793087|
| P00113442|10412.006335802065|
| P00288442|  8766.63244437881|
| P00145342| 9176.251714236765|
| P00053842| 9733.169576693785|
+----------+------------------+
only showing top 5 rows

