---

<center><h1> Spark ML Assignment </h1></center>

----

A retail company, “ABC Private Limited”, wants to understand customer purchase behaviour (specifically, purchase amount) across various products of different categories.
They have shared a purchase summary of various customers for selected high-volume products from last month.
The data set also contains customer demographics (age, gender, marital status, city category, years in current city),
product details (product ID and product category), and
the total purchase amount from last month.

Now, they want to build a model to predict a customer's purchase amount for various products, which will help them create personalized offers for customers across different products.


So, in this notebook we will build a regression model to predict a customer's purchase amount for various products.

----

In [0]:
# !pip3 install matplotlib --user

In [0]:
# importing the required libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as tp
from pyspark.sql import functions as F
import pandas as pd

from pyspark.ml.feature import MinMaxScaler

# libraries to make plots
import matplotlib.pyplot as plt
%matplotlib inline

In [0]:
# create spark session
spark = SparkSession.builder.getOrCreate()

In [0]:
# reading the train data
train_data = spark.read.csv("/FileStore/tables/train_Spark_ML_Assignment.csv",inferSchema=True, header=True)

# reading the test data
test_data  = spark.read.csv("/FileStore/tables/test_Spark_ML_Assignment.csv", inferSchema=True, header=True)

In [0]:
train_data.count() # number of rows

In [0]:
train_data.columns # columns

---

#### `READING THE DATASET`

---

 * **`User_ID`**: User ID
 * **`Product_ID`**: Product ID
 * **`Gender`**: Sex of the user
 * **`Age`**: Age, in bins
 * **`Occupation`**: Occupation (masked)
 * **`City_Category`**: Category of the city (A, B, C)
 * **`Stay_In_Current_City_Years`**: Number of years the user has stayed in the current city
 * **`Marital_Status`**: Marital status
 * **`Product_Category_1`**: Product category (masked)
 * **`Product_Category_2`**: Additional category the product may also belong to (masked)
 * **`Product_Category_3`**: Additional category the product may also belong to (masked)
 * **`Purchase`**: Purchase amount (**`Target Variable`**)

---

In [0]:
# data type of the columns
train_data.printSchema()

---

**`Numerical`**
- User_ID
- Occupation
- Marital_Status
- Product_Category_1
- Product_Category_2
- Product_Category_3
- Purchase

**`Categorical`**
- Product_ID
- Gender
- Age
- City_Category
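- Stay_In_Current_City_Years

Note that `User_ID` is an identifier, and `Occupation`, `Marital_Status` and the three `Product_Category_*` columns are masked category codes, even though `inferSchema` reads them as numbers.
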
---

The `Target Variable` for our use-case is `Purchase`.

#### SQL Querying

In [0]:
# Create a view
train_data.createOrReplaceTempView('train_table')

Questions:
1. Average Purchase amount?
2. Counting and Removing null values
3. How many distinct values per column?
4. Count the category values within each of the following columns:
● Gender
● Age
● City_Category
● Stay_In_Current_City_Years
● Marital_Status
5. Calculate the average Purchase for each category of the following columns:
● Gender
● Age
● City_Category
● Stay_In_Current_City_Years
● Marital_Status
6. Label encode the following columns:
● Age
● Gender
● Stay_In_Current_City_Years
● City_Category
7. One-hot encode the following columns:
● Gender
● City_Category
● Occupation
8. Build a baseline model using any of the ML algorithms.
9. Model improvement with Grid-Search CV
10. Create a Spark ML Pipeline for the final model.

1. Average Purchase amount?

In [0]:
%python

Avg_Purchase_Amount = spark.sql("""

SELECT round(avg(Purchase),2) as Avg_Purchase_Amount
FROM train_table 

""")

Avg_Purchase_Amount.show()

Average purchase amount (all training rows, before nulls are removed): **9263.97**

---------

2. Counting and removing null values

In [0]:
### null values in each column
for c in train_data.columns:
    # define the condition
    missing_values = F.isnull(c)
    
    # filter the data with condition and count the number of data points
    Count_of_missing_values = train_data.filter(missing_values).count()
    
    # print the result
    print(c, Count_of_missing_values)

---

Columns with null values:

 * **`Product_Category_2`**:   173638
 * **`Product_Category_3`**:   383247


---

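###### Removing null values

`dropna()` drops every row that has a null in any column, so all 383,247 rows with a missing `Product_Category_3` are discarded here. The pipeline in step 10 fills these nulls instead of dropping them.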

In [0]:
train_data = train_data.dropna()

In [0]:
# Verifying it

### null values in each column
for c in train_data.columns:
    # define the condition
    missing_values = F.isnull(c)
    
    # filter the data with condition and count the number of data points
    Count_of_missing_values = train_data.filter(missing_values).count()
    
    # print the result
    print(c, Count_of_missing_values)

---------

3. How many distinct values are there per column?

In [0]:
# distinct values in each column
distinctcols = train_data.agg(*(F.countDistinct(F.col(c)).alias(c) for c in train_data.columns))
distinctDF = pd.DataFrame(distinctcols.collect(), columns= distinctcols.schema.names)
distinctDF.head()

| User_ID | Product_ID | Gender | Age | Occupation | City_Category | Stay_In_Current_City_Years | Marital_Status | Product_Category_1 | Product_Category_2 | Product_Category_3 | Purchase |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 5870 | 528 | 2 | 7 | 21 | 3 | 5 | 2 | 12 | 14 | 15 | 13876 |


---

 - There are 5870 unique values in the `User_ID` column.
 - There are 528 unique values in the `Product_ID` column.
 - `Gender` has 2 distinct values, `Age` has 7, `City_Category` has 3, `Stay_In_Current_City_Years` has 5, and `Marital_Status` has 2.
 - `Occupation` has 21 unique values, `Product_Category_1` has 12, `Product_Category_2` has 14, and `Product_Category_3` has 15.

---

4. Count the category values within each of the following columns:

● Gender
● Age
● City_Category
● Stay_In_Current_City_Years
● Marital_Status
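Each cell below counts the rows per category and expresses that count as a percentage of all rows. The plain-Python sketch that follows (hypothetical helper, toy column rather than the real data) does the same thing while computing the total only once; in PySpark the equivalent saving is to store `train_data.count()` in a variable before the five `groupBy` cells instead of calling it inside each one.

```python
from collections import Counter

def value_counts_with_share(values, decimals=4):
    """Return {category: (count, percentage_of_total)} for a list of values.

    The total is computed once and reused for every category.
    """
    total = len(values)
    counts = Counter(values)
    return {
        category: (n, round(n / total * 100, decimals))
        for category, n in counts.items()
    }

# Hypothetical toy column, not taken from the dataset
genders = ["M", "M", "F", "M"]
print(value_counts_with_share(genders))  # {'M': (3, 75.0), 'F': (1, 25.0)}
```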

In [0]:
# number of datapoints with each gender
top_gender = train_data.groupBy("Gender").agg(F.count("Gender").alias("gender_count"), F.round((F.count("Gender")/train_data.count())*100, 4).alias("percentage"))

top_gender.show()

In [0]:
# number of data points in each Age bin
top_Age_bins = train_data.groupBy("Age").agg(F.count("Age").alias("Age_bin_count"), F.round((F.count("Age")/train_data.count())*100, 4).alias("percentage"))


top_Age_bins.show()

In [0]:
# number of datapoints with each City_Category
top_City_Category = train_data.groupBy("City_Category").agg(F.count("City_Category").alias("City_Category_count"), F.round((F.count("City_Category")/train_data.count())*100, 4).alias("percentage"))

top_City_Category.show()

In [0]:
# number of datapoints with each Stay_In_Current_City_Years
top_Stay_In_Current_City_Years = train_data.groupBy("Stay_In_Current_City_Years").agg(F.count("Stay_In_Current_City_Years").alias("Stay_In_Current_City_Years_count"), F.round((F.count("Stay_In_Current_City_Years")/train_data.count())*100, 4).alias("percentage"))

top_Stay_In_Current_City_Years.show()

In [0]:
# number of datapoints with each Marital_Status
top_Marital_Status = train_data.groupBy("Marital_Status").agg(F.count("Marital_Status").alias("Marital_Status_count"), F.round((F.count("Marital_Status")/train_data.count())*100, 4).alias("percentage"))

top_Marital_Status.show()

---

5. Calculate the average Purchase for each category of the following columns:


● Gender
● Age
● City_Category
● Stay_In_Current_City_Years
● Marital_Status

In [0]:
%python

# Gender

Avg_Purchase_Amount_Gender = spark.sql("""

SELECT Gender, avg(Purchase) as Avg_Purchase_Amount_Gender
FROM train_table 
group by Gender

""")

Avg_Purchase_Amount_Gender.show()

In [0]:
%python

# Age

Avg_Purchase_Amount_Age = spark.sql("""

SELECT Age, avg(Purchase) as Avg_Purchase_Amount_Age
FROM train_table 
group by Age
order by Age

""")

Avg_Purchase_Amount_Age.show()

In [0]:
%python

# City_Category

Avg_Purchase_Amount_City_Category = spark.sql("""

SELECT City_Category, avg(Purchase) as Avg_Purchase_Amount_City_Category
FROM train_table 
group by City_Category
order by City_Category

""")

Avg_Purchase_Amount_City_Category.show()


In [0]:

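# Same City_Category average with the DataFrame API.
# Note: `train_table` was registered before dropna(), so the SQL queries above
# average over all rows, while `train_data` here has the null rows removed;
# the two results can therefore differ.
Avg_Purchase_Amount_City = train_data.groupBy("City_Category").agg(F.avg("Purchase").alias("Avg_Purchase_Amount_City_Category"))
Avg_Purchase_Amount_City.show()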

In [0]:
%python

# Stay_In_Current_City_Years

Avg_Purchase_Amount_Stay_In_Current_City_Years = spark.sql("""

SELECT Stay_In_Current_City_Years, avg(Purchase) as Avg_Purchase_Amount_Stay_In_Current_City_Years
FROM train_table 
group by Stay_In_Current_City_Years
order by Stay_In_Current_City_Years

""")

Avg_Purchase_Amount_Stay_In_Current_City_Years.show()


In [0]:
%python

# Marital_Status

Avg_Purchase_Amount_Marital_Status = spark.sql("""

SELECT Marital_Status, avg(Purchase) as Avg_Purchase_Amount_Marital_Status
FROM train_table 
group by Marital_Status
order by Marital_Status

""")

Avg_Purchase_Amount_Marital_Status.show()


---

6. Label encode the following columns:

● Age
● Gender
● Stay_In_Current_City_Years
● City_Category

In [0]:
# importing libraries
from pyspark.ml.feature import StringIndexer

##### Creating the StringIndexer Objects

In [0]:


# label encode Age
SI_Age = StringIndexer(inputCol="Age", outputCol="Age_le", handleInvalid="skip")

# label encode Gender
SI_Gender = StringIndexer(inputCol="Gender", outputCol="Gender_le", handleInvalid="skip")

# label encode Stay_In_Current_City_Years
SI_Stay_In_Current_City_Years = StringIndexer(inputCol="Stay_In_Current_City_Years", outputCol="Stay_In_Current_City_Years_le", handleInvalid="skip")

# label encode City_Category
SI_City_Category = StringIndexer(inputCol="City_Category", outputCol="City_Category_le", handleInvalid="skip")

##### Fit the StringIndexer Objects
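Fitting a `StringIndexer` learns a label-to-index mapping from the training data only. With the default `stringOrderType="frequencyDesc"`, the most frequent label receives index 0.0. The plain-Python sketch below mimics that ordering; the function names are hypothetical, and the alphabetical tie-break is an assumption based on Spark 3.x behaviour. It also illustrates `handleInvalid="skip"`: at transform time, rows whose label was not seen during fitting are filtered out instead of raising an error.

```python
from collections import Counter

def fit_string_indexer(values):
    """Map each label to an index, most frequent first (ties: alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

def transform_skip(values, mapping):
    """Index the values; labels unseen during fit are dropped (handleInvalid='skip')."""
    return [mapping[v] for v in values if v in mapping]

# Hypothetical City_Category-like values, not the real data
mapping = fit_string_indexer(["B", "A", "B", "C", "A", "B"])
print(mapping)                                   # {'B': 0.0, 'A': 1.0, 'C': 2.0}
print(transform_skip(["A", "D", "C"], mapping))  # [1.0, 2.0]  ('D' is unseen, so skipped)
```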

In [0]:
# label encode object Age
SI_Age_Obj = SI_Age.fit(train_data)

# label encode object Gender
SI_Gender_Obj = SI_Gender.fit(train_data)

# label encode object Stay_In_Current_City_Years
SI_Stay_In_Current_City_Years_Obj = SI_Stay_In_Current_City_Years.fit(train_data)

# label encode object City_Category
SI_City_Category_Obj = SI_City_Category.fit(train_data)

##### Transform the data with the fitted StringIndexer models

In [0]:
# label encode Age
train_data_encoded = SI_Age_Obj.transform(train_data)

# label encode Gender
train_data_encoded = SI_Gender_Obj.transform(train_data_encoded)

# label encode object Stay_In_Current_City_Years
train_data_encoded = SI_Stay_In_Current_City_Years_Obj.transform(train_data_encoded)

# label encode object City_Category
train_data_encoded = SI_City_Category_Obj.transform(train_data_encoded)



----

###### `Make the Transformation on the test data`

---

In [0]:
# label encode Age
test_data_encoded = SI_Age_Obj.transform(test_data)

# label encode Gender
test_data_encoded = SI_Gender_Obj.transform(test_data_encoded)

# label encode object Stay_In_Current_City_Years
test_data_encoded = SI_Stay_In_Current_City_Years_Obj.transform(test_data_encoded)

# label encode object City_Category
test_data_encoded = SI_City_Category_Obj.transform(test_data_encoded)



##### Label-encoded columns: Age, Gender, Stay_In_Current_City_Years, City_Category

In [0]:
train_data_encoded.select('Age_le',
 'Gender_le',
 'Stay_In_Current_City_Years_le',
 'City_Category_le').show()

In [0]:
test_data_encoded.select('Age_le',
 'Gender_le',
 'Stay_In_Current_City_Years_le',
 'City_Category_le').show()

---

7. One-hot encode the following columns:


● Gender
● City_Category
● Occupation

In [0]:
from pyspark.ml.feature import OneHotEncoder


In [0]:
OHE_train = OneHotEncoder(inputCols=["Gender_le",
                                     "City_Category_le",
                                     "Occupation"],
                          outputCols=["Gender_ohe",
                                      "City_Category_ohe",
                                      "Occupation_ohe"])


Fit & Transform the data.
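`OneHotEncoder` expects numeric category indices (hence `Gender_le` and `City_Category_le` rather than the raw strings). With its default `dropLast=True`, each output vector has one slot fewer than the number of categories and the last category is encoded as all zeros, which keeps the dummy columns from being perfectly collinear with the intercept in linear regression. The number of categories comes from the metadata written by `StringIndexer` or, for a plain numeric column such as `Occupation`, from the largest value plus one. The plain-Python sketch below (hypothetical helper, dense lists) shows the encoding of a single index; Spark itself stores the result as sparse vectors, displayed like `(2,[0],[1.0])`.

```python
def one_hot(index, num_categories, drop_last=True):
    """Encode one category index as a dense 0/1 list.

    With drop_last=True (like Spark's default dropLast), the list has
    num_categories - 1 slots and the last category becomes all zeros.
    """
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    if position < size:
        vector[position] = 1.0
    return vector

# City_Category_le takes three values: 0.0, 1.0 and 2.0
for idx in [0.0, 1.0, 2.0]:
    print(idx, one_hot(idx, num_categories=3))
# 0.0 [1.0, 0.0]
# 1.0 [0.0, 1.0]
# 2.0 [0.0, 0.0]
```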

In [0]:
# OHE object
OHE_Obj = OHE_train.fit(train_data_encoded)

In [0]:
# Transform train data
train_data_encoded = OHE_Obj.transform(train_data_encoded)

In [0]:
# view the one hot encoded data
train_data_encoded.select("Gender_ohe",
                          "City_Category_ohe", 
                          "Occupation_ohe").show()
                                             

----

###### `Make the Transformation on the test data`

---

In [0]:

# test data
test_data_encoded = OHE_Obj.transform(test_data_encoded)

In [0]:
# view the one hot encoded data
test_data_encoded.select("Gender_ohe",
                          "City_Category_ohe", 
                          "Occupation_ohe").show()
                                             

---

In [0]:
train_data_encodedCopy = train_data_encoded.select('*')
test_data_encodedCopy = test_data_encoded.select('*')

-----
----------

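Feature engineering
- Mean `Purchase` per `Product_ID` --> 1. `Mean_Purchase`
- Number of purchase records per `Product_ID` --> 2. `Product_Category_count`

Both aggregates are computed on all of `train_data` before the train/validation split in step 8, so each validation row's own `Purchase` value contributes to its `Mean_Purchase`; the validation RMSE in step 8 may therefore be optimistic. The pipeline in step 10 computes the aggregates after splitting.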
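`Mean_Purchase` is a form of target (mean) encoding for the high-cardinality `Product_ID` column, and `Product_Category_count` measures how often each product was bought. The plain-Python sketch below uses hypothetical helper names and toy data. It also shows one way to handle a product that never appears in the training data: a left-join style lookup gives it a default value, whereas an inner join removes the row.

```python
from collections import defaultdict

def product_aggregates(rows):
    """Per-product mean purchase and row count, like the two groupBy cells.

    rows: iterable of (product_id, purchase) pairs from the training data.
    Returns {product_id: (mean_purchase, count)}.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for product_id, purchase in rows:
        totals[product_id] += purchase
        counts[product_id] += 1
    return {p: (totals[p] / counts[p], counts[p]) for p in totals}

def lookup_features(product_ids, aggregates, default=(0.0, 0)):
    """Left-join style lookup: products unseen in training get `default`."""
    return [aggregates.get(p, default) for p in product_ids]

# Hypothetical product IDs and purchase amounts, for illustration only
toy_rows = [("P1", 100.0), ("P1", 300.0), ("P2", 50.0)]
aggs = product_aggregates(toy_rows)
print(aggs)                                 # {'P1': (200.0, 2), 'P2': (50.0, 1)}
print(lookup_features(["P2", "P3"], aggs))  # [(50.0, 1), (0.0, 0)]
```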

In [0]:
#feat Engg

# Mean_Purchase value for each product ID

Mean_Purchase_valueDF = train_data.groupby("Product_ID").agg(F.avg("Purchase").alias("Mean_Purchase"))
Mean_Purchase_valueDF.show()

In [0]:

train_data_encoded = train_data_encoded.join(Mean_Purchase_valueDF, on="Product_ID", how="inner")
train_data_encoded.select("Product_ID", "Mean_Purchase").show(truncate=False)


In [0]:

# inner join: test rows whose Product_ID never appears in train_data are dropped
test_data_encoded = test_data_encoded.join(Mean_Purchase_valueDF, on="Product_ID", how="inner")
test_data_encoded.select("Product_ID", "Mean_Purchase").show(truncate=False)


In [0]:
#feat Engg

# Category count for each product ID
Prod_CategoryDF = train_data.groupBy("Product_ID").agg(F.count("Product_ID").alias("Product_Category_count"))
Prod_CategoryDF.show()

In [0]:

train_data_encoded = train_data_encoded.join(Prod_CategoryDF, on="Product_ID", how="inner")
train_data_encoded.select("Product_ID", "Product_Category_count").show(truncate=False)


In [0]:

test_data_encoded = test_data_encoded.join(Prod_CategoryDF, on="Product_ID", how="inner")
test_data_encoded.select("Product_ID", "Product_Category_count").show(truncate=False)



--------

In [0]:
train_data_encodedCopyModel = train_data_encoded.select('*')
test_data_encodedCopyModel = test_data_encoded.select('*')

8. Build a baseline model using any of the ML algorithms

---


#### `BaseLine MODEL BUILDING`

This is a `Regression Problem`, so we will train a

 * **Linear Regression** model

and evaluate it with Root Mean Squared Error (RMSE).
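RMSE is the square root of the mean squared difference between actual and predicted `Purchase`, $\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}$, so it is expressed in the same units as `Purchase` and can be read against the average purchase amount from step 1. `RegressionEvaluator(metricName="rmse")` reports this quantity; a plain-Python version (hypothetical helper) for reference:

```python
import math

def rmse(labels, predictions):
    """Root mean squared error between actual and predicted values."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and of equal length")
    squared_errors = [(y - y_hat) ** 2 for y, y_hat in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# errors are -2, +2 and 0, so RMSE = sqrt((4 + 4 + 0) / 3) = sqrt(8 / 3)
print(rmse([10.0, 20.0, 30.0], [12.0, 18.0, 30.0]))  # ~1.633
```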



---

In [0]:
## columns in the dataset
train_data_encoded.columns

----

So, now we will select the features and create a vector using `VectorAssembler`

 * Age_le (Label Encoded)
 * Gender_le (Label Encoded)
 * Occupation_ohe (One Hot Encoded)
 * City_Category_ohe (One Hot Encoded)
 * Stay_In_Current_City_Years_le (Label Encoded)
 * Marital_Status 
 * Product_Category_1 
 * Product_Category_2
 * Product_Category_3
 * Mean_Purchase
 * Product_Category_count

----
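`VectorAssembler` concatenates its input columns, in the order given, into one flat vector: a numeric column such as `Age_le` contributes one slot, and a vector column such as `Occupation_ohe` contributes all of its slots. By default (`handleInvalid="error"`) it fails on null values, which is one reason nulls have to be dealt with before this step. A plain-Python sketch of the flattening for a single hypothetical row (helper name is illustrative only):

```python
def assemble(values):
    """Flatten scalars and vectors (lists/tuples) into one feature list, in input order."""
    flat = []
    for value in values:
        if isinstance(value, (list, tuple)):
            flat.extend(float(v) for v in value)
        else:
            flat.append(float(value))
    return flat

# Hypothetical row: Age_le, Gender_le, a 2-slot City_Category_ohe, Marital_Status
print(assemble([2.0, 0.0, [0.0, 1.0], 1]))  # [2.0, 0.0, 0.0, 1.0, 1.0]
```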

In [0]:
# import the library
from pyspark.ml.feature import VectorAssembler

# create feature vector
feature_vector = VectorAssembler(inputCols= ['Age_le',
                                             'Gender_le',
                                             'Occupation_ohe',
                                             'City_Category_ohe',
                                             'Stay_In_Current_City_Years_le',
                                             'Marital_Status',
                                             'Product_Category_1',
                                             'Product_Category_2',
                                             'Product_Category_3',
                                            'Mean_Purchase', 
                                             'Product_Category_count'],
                                outputCol= 'features')


In [0]:
# transform the feature vector
train_data_encoded = feature_vector.transform(train_data_encoded)

In [0]:
# transform the feature vector on test data

test_data_encoded = feature_vector.transform(test_data_encoded)

Train/Validation Split
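`randomSplit` assigns each row independently to a split with probability proportional to the (normalised) weights, so the 70/30 sizes are only approximate, and without a `seed` every run produces a different split. Spark's implementation samples partition by partition; the plain-Python sketch below (hypothetical function) only illustrates the per-row idea.

```python
import random

def random_split(rows, weights, seed):
    """Send each row to one split with probability weight / sum(weights)."""
    total = sum(weights)
    # cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    running = 0.0
    for w in weights:
        running += w / total
        bounds.append(running)
    rng = random.Random(seed)  # fixed seed -> repeatable assignment
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # first bound above u; the last split also catches float rounding
        index = next((i for i, b in enumerate(bounds) if u < b), len(bounds) - 1)
        splits[index].append(row)
    return splits

train_rows, valid_rows = random_split(range(10_000), [0.7, 0.3], seed=27)
print(len(train_rows), len(valid_rows))  # roughly 7000 and 3000
```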

In [0]:
# fixed seed so the split (and the scores below) are reproducible
train_data_encoded_Split, valid_data_encoded_Split = train_data_encoded.randomSplit([0.7, 0.3], seed=27)


Training the Machine Learning Algorithm

In [0]:
# importing the libraries
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
LR = LinearRegression(featuresCol="features", labelCol="Purchase")

In [0]:
# Fit the model

model_LR = LR.fit(train_data_encoded_Split)

print("Coefficients: " + str(model_LR.coefficients))
print("Intercept: " + str(model_LR.intercept))


Evaluating Model Performance

In [0]:
trainingSummary = model_LR.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [0]:
# Evaluate training data
evaluator = RegressionEvaluator(labelCol="Purchase", metricName="rmse") 

evaluator.evaluate(model_LR.transform(train_data_encoded_Split)) 

In [0]:
# Evaluate validation data
evaluator.evaluate(model_LR.transform(valid_data_encoded_Split))

Predicting Values on Validation data

In [0]:
predictions = model_LR.transform(valid_data_encoded_Split)


In [0]:
predictions.select("features","Purchase","prediction").show()

____

9. Model improvement with Grid-Search CV

In [0]:
# import the CrossValidator and ParamGridBuilder
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
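# create the ParamGridBuilder
# with no grids added, build() returns one empty param map, so this
# cross-validates LinearRegression with its default parameters only
params = ParamGridBuilder().build()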

In [0]:
# create the Linear Regression estimator
model_LR_CV = LinearRegression(featuresCol= "features",  labelCol="Purchase")

In [0]:
# create object of the cross validation model with numFolds = 3
cv = CrossValidator(estimator=model_LR_CV,
                    estimatorParamMaps=params,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=27)

Fit the model
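With `numFolds=3`, `CrossValidator` randomly assigns each row of `train_data_encoded_Split` to one of three folds (using `seed`), fits the estimator three times, each time holding out a different fold for evaluation, and averages the evaluator's metric per parameter map (available afterwards as `cv_model_LR.avgMetrics`). It then refits the best parameter map on the full input. The plain-Python sketch below (hypothetical helper names) shows the fold logic only.

```python
import random

def k_fold_indices(n_rows, num_folds=3, seed=27):
    """Yield (train_indices, validation_indices) for each fold.

    Each row is assigned to one fold at random (seeded); every fold is held
    out once while the model is trained on the remaining num_folds - 1 folds.
    """
    rng = random.Random(seed)
    fold_of = [rng.randrange(num_folds) for _ in range(n_rows)]
    for fold in range(num_folds):
        train = [i for i in range(n_rows) if fold_of[i] != fold]
        valid = [i for i in range(n_rows) if fold_of[i] == fold]
        yield train, valid

def cross_validated_metric(n_rows, fit_and_score, num_folds=3, seed=27):
    """Average the per-fold metric (e.g. RMSE) for one parameter setting."""
    scores = [fit_and_score(train, valid)
              for train, valid in k_fold_indices(n_rows, num_folds, seed)]
    return sum(scores) / len(scores)
```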

In [0]:
## fit the model
cv_model_LR = cv.fit(train_data_encoded_Split)

In [0]:
# Evaluate training data
evaluator = RegressionEvaluator(labelCol="Purchase", metricName="rmse") 

evaluator.evaluate(cv_model_LR.transform(train_data_encoded_Split)) 

In [0]:
# evaluate model on validation data
evaluator.evaluate(cv_model_LR.transform(valid_data_encoded_Split))

--Grid Search--
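`ParamGridBuilder` takes the Cartesian product of the values added with `addGrid`. In `LinearRegression`, `regParam` sets the overall regularization strength and `elasticNetParam` mixes the penalty between L2 (0.0, ridge) and L1 (1.0, lasso). The plain-Python sketch below only mirrors the grid values used in the next cells and counts how many models cross-validation will fit; it does not call Spark.

```python
from itertools import product

reg_params = [0.01, 0.005, 0.0001]
elastic_net_params = [0.1, 0.001]
num_folds = 3

# every combination of the two grids is one parameter map
param_maps = [
    {"regParam": r, "elasticNetParam": e}
    for r, e in product(reg_params, elastic_net_params)
]

fits_during_cv = len(param_maps) * num_folds  # one fit per (param map, fold)
total_fits = fits_during_cv + 1               # plus a refit of the best map on all data

print(len(param_maps), fits_during_cv, total_fits)  # 6 18 19
```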

In [0]:
# create parameter builder

updated_params = ParamGridBuilder() \
                .addGrid(model_LR_CV.regParam, [0.01, 0.005, 0.0001]) \
                .addGrid(model_LR_CV.elasticNetParam, [0.1, 0.001]) \
                .build()

In [0]:
# create object of the CrossValidator with 3 folds
cv = CrossValidator(estimator=model_LR_CV,
                    estimatorParamMaps=updated_params,
                    evaluator=evaluator,
                    numFolds=3,
                    seed=27)

In [0]:
# fit the model
grid_model = cv.fit(train_data_encoded_Split)

In [0]:

# Evaluate training data
evaluator = RegressionEvaluator(labelCol="Purchase", metricName="rmse") 
evaluator.evaluate(grid_model.transform(train_data_encoded_Split)) 

In [0]:
# evaluate model on validation data
evaluator.evaluate(grid_model.transform(valid_data_encoded_Split))

---

Get the best model parameters.

---

In [0]:
# extract the best model parameters dictionary
param_dict = grid_model.bestModel.extractParamMap()

In [0]:
param_dict

In [0]:
# map each parameter's name to its value
final_dict = {}
for k, v in param_dict.items():
    final_dict[k.name] = v

In [0]:
# get the best elastic net parameter
final_dict["elasticNetParam"]

In [0]:
# get the best regularization parameter
final_dict["regParam"]

----
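The next two cells look up the most frequent value (mode) of `Product_Category_2` and `Product_Category_3`, so that the pipeline in step 10 can fill nulls instead of dropping rows. A plain-Python sketch of mode imputation (hypothetical helpers, toy column): like `COUNT(column)` in SQL, it ignores nulls when counting. Ties are broken by first occurrence in `Counter.most_common`, whereas `ORDER BY count desc limit 1` picks an arbitrary one.

```python
from collections import Counter

def mode_ignoring_nulls(values):
    """Most frequent non-null value; ties go to the value seen first."""
    counts = Counter(v for v in values if v is not None)
    if not counts:
        raise ValueError("column contains only nulls")
    return counts.most_common(1)[0][0]

def fill_nulls(values, fill_value):
    """Replace None with fill_value, like fillna() for a single column."""
    return [fill_value if v is None else v for v in values]

# Hypothetical Product_Category_2-like column with missing values
column = [8.0, None, 8.0, 14.0, None]
mode = mode_ignoring_nulls(column)
print(mode)                      # 8.0
print(fill_nulls(column, mode))  # [8.0, 8.0, 8.0, 14.0, 8.0]
```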

In [0]:
# finding the mode of Product_Category_2 for missing-value imputation
# Note: train_data was filtered with dropna() earlier, so this mode is
# computed over complete rows only

Product_Category_2_DF = train_data.select('Product_Category_2')
# Create a view (createOrReplaceTempView replaces the deprecated registerTempTable)
Product_Category_2_DF.createOrReplaceTempView('table')

Product_Category_2_DF = spark.sql(
    'SELECT Product_Category_2, COUNT(Product_Category_2) AS count FROM table GROUP BY Product_Category_2 ORDER BY count desc limit 1'
)
Product_Category_2_DF.show()


In [0]:
# finding the mode of Product_Category_3 for missing-value imputation
# (same caveat: train_data has already been filtered with dropna())

Product_Category_3_DF = train_data.select('Product_Category_3')
# Create a view (createOrReplaceTempView replaces the deprecated registerTempTable)
Product_Category_3_DF.createOrReplaceTempView('table')
Product_Category_3_DF = spark.sql(
    'SELECT Product_Category_3, COUNT(Product_Category_3) AS count FROM table GROUP BY Product_Category_3 ORDER BY count desc limit 1'
)
Product_Category_3_DF.show()


------

10. Create a Spark ML Pipeline for the final model.

-------

In [0]:
# importing the required libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as tp
from pyspark.sql import functions as F
import pandas as pd

from pyspark.ml.feature import MinMaxScaler

# libraries to make plots
import matplotlib.pyplot as plt
%matplotlib inline

# create spark session
spark = SparkSession.builder.getOrCreate()


# reading the train data
train_data = spark.read.csv("/FileStore/tables/train_Spark_ML_Assignment.csv",inferSchema=True, header=True)

# reading the test data
test_data  = spark.read.csv("/FileStore/tables/test_Spark_ML_Assignment.csv", inferSchema=True, header=True)

In [0]:
# fixed seed so the split (and the scores below) are reproducible
train_data, valid_data = train_data.randomSplit([0.7, 0.3], seed=27)


In [0]:
## Import the Required Libraries 
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import MinMaxScaler



---
### `Define the stages of the Pipeline`


  * **STAGE 1**: [Transformer] Fill null values in the columns Product_Category_2 and Product_Category_3

  * **STAGE 2**: [Estimator] Label Encode: Age
  * **STAGE 3**: [Estimator] Label Encode: Gender
  * **STAGE 4**: [Estimator] Label Encode: Stay_In_Current_City_Years
  * **STAGE 5**: [Estimator] Label Encode: City_Category

  * **STAGE 6**: [Estimator] OHE: Gender, City_Category, Occupation, Product_Category_1, Product_Category_2, Product_Category_3

  * **STAGE 7**: [Transformer] Create features: mean Purchase per Product_ID --> Mean_Purchase, number of records per Product_ID --> Product_Category_count

  * **STAGE 8**: [Transformer] VectorAssembler: Mean_Purchase & Product_Category_count --> FETemp

  * **STAGE 9**: [Estimator] MinMaxScaler: FETemp --> FETemp_scaled

  * **STAGE 10**: [Transformer] VectorAssembler: [Age (LE), Stay_In_Current_City_Years (LE), Gender (OHE), City_Category (OHE), Occupation (OHE), Product_Category_1 (OHE), Product_Category_2 (OHE), Product_Category_3 (OHE), Marital_Status, FETemp_scaled] --> feature_vector

  * **STAGE 11**: [Estimator] Linear Regression: train on feature_vector to predict Purchase
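A `Pipeline` runs its stages in order: a `Transformer` stage is applied as-is, while an `Estimator` stage (such as `StringIndexer`, `OneHotEncoder`, `MinMaxScaler` or `LinearRegression`) is first fitted on the data as transformed by the earlier stages, and the fitted model takes its place in the resulting `PipelineModel`. The `Toy*` classes below are hypothetical plain-Python stand-ins, not the `pyspark.ml` API (named so they do not shadow the notebook's imports), with a null-filling transformer and a min-max scaler estimator.

```python
class ToyTransformer:
    """A stage that only transforms data."""
    def transform(self, rows):
        raise NotImplementedError

class ToyEstimator:
    """A stage that must be fitted first; fit() returns a ToyTransformer."""
    def fit(self, rows):
        raise NotImplementedError

class ToyPipelineModel(ToyTransformer):
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows

class ToyPipeline(ToyEstimator):
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # estimators are fitted on the output of the earlier stages
            model = stage.fit(rows) if isinstance(stage, ToyEstimator) else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)

class ToyFillNulls(ToyTransformer):
    def __init__(self, value):
        self.value = value

    def transform(self, rows):
        return [self.value if r is None else r for r in rows]

class ToyMinMaxScalerModel(ToyTransformer):
    def __init__(self, lo, hi):
        self.lo, self.hi = lo, hi

    def transform(self, rows):
        span = self.hi - self.lo
        # a constant column maps to 0.5, the midpoint of [0, 1]
        return [0.5 if span == 0 else (r - self.lo) / span for r in rows]

class ToyMinMaxScaler(ToyEstimator):
    def fit(self, rows):
        # min and max are learned from the fitting data only
        return ToyMinMaxScalerModel(min(rows), max(rows))

model = ToyPipeline([ToyFillNulls(0.0), ToyMinMaxScaler()]).fit([2.0, None, 4.0])
print(model.transform([None, 2.0, 8.0]))  # [0.0, 0.5, 2.0]
```

Note that 8.0 maps to 2.0: a value outside the range seen during fitting falls outside [0, 1], which is also how Spark's fitted `MinMaxScalerModel` behaves, since it does not clip.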

In [0]:
# custom transformer to fill null values

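class nullValuesTransformer(Transformer):

    def __init__(self, dataframe=None):
        # initialise the Params machinery (uid, param maps) of the Transformer base class
        super().__init__()
        self.dataframe = dataframe

    def _transform(self, dataframe):
        # fill values for the two columns (the modes looked up earlier)
        dataframe = dataframe.fillna({
            "Product_Category_2": 2,
            "Product_Category_3": 16,
        })

        return dataframe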

In [0]:

# per-product aggregates from the training split only, so validation rows do not leak into them
Mean_Purchase_valueDF = train_data.groupby("Product_ID").agg(F.avg("Purchase").alias("Mean_Purchase"))
Prod_CategoryDF = train_data.groupBy("Product_ID").agg(F.count("Product_ID").alias("Product_Category_count"))


In [0]:
# Creating two new features:
# Purchase per Product_ID -- >Mean_Purchase ,
# count of products per Product_ID --> category_count

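class FeatEngg(Transformer):

    def __init__(self, dataframe=None):
        # initialise the Params machinery (uid, param maps) of the Transformer base class
        super().__init__()
        self.dataframe = dataframe

    def _transform(self, dataframe):

        # join Purchase per Product_ID; a left join keeps rows whose Product_ID
        # was not seen in the training split (an inner join would drop them,
        # leaving the fillna below with nothing to do)
        dataframe = dataframe.join(Mean_Purchase_valueDF, ['Product_ID'], "left")

        # join count of products per Product_ID
        dataframe = dataframe.join(Prod_CategoryDF, ['Product_ID'], "left")

        # replace the nulls produced for unseen products
        dataframe = dataframe.fillna({
            'Mean_Purchase': 0.0,
            'Product_Category_count': 0.0,
        })

        return dataframe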
        

In [0]:
# Stage 1 - replace null values
stage_1 = nullValuesTransformer()

# stage 2 - Label Encode: Age
stage_2 =  StringIndexer(inputCol= "Age", outputCol= "Age_le" , handleInvalid="skip")

# Stage 3 -  Label Encode: Gender
stage_3 = StringIndexer(inputCol= "Gender", outputCol= "Gender_le", handleInvalid= "skip")

# Stage 4 - Label Encode: Stay_In_Current_City_Years
stage_4 = StringIndexer(inputCol= "Stay_In_Current_City_Years", outputCol= "Stay_In_Current_City_Years_le", handleInvalid= "skip")

# Stage 5 - Label Encode: City_Category
stage_5 = StringIndexer(inputCol= "City_Category", outputCol= "City_Category_le", handleInvalid="skip")

# Stage 6 - OHE: Gender, City_Category, Occupation, Product_Category_1, Product_Category_2, Product_Category_3
stage_6 = OneHotEncoder(inputCols=["Gender_le", "City_Category_le", "Occupation",
                                   "Product_Category_1", "Product_Category_2", "Product_Category_3"],
                        outputCols=["Gender_ohe", "City_Category_ohe", "Occupation_ohe",
                                    "Product_Category_1_ohe", "Product_Category_2_ohe", "Product_Category_3_ohe"])

# stage 7 - Create new features 
stage_7 = FeatEngg()

# Stage 8 - VectorAssembler: Mean_Purchase & Product_Category_count --> FETemp
stage_8 = VectorAssembler(inputCols=['Mean_Purchase', 'Product_Category_count'],
                          outputCol='FETemp')

# Stage 9 - MinMaxScaler (an Estimator): rescale FETemp to [0, 1] --> FETemp_scaled
stage_9 = MinMaxScaler(inputCol='FETemp', outputCol='FETemp_scaled')
 

# Stage 10 - Create vector from the columns
stage_10 = VectorAssembler(inputCols= ['Age_le','Stay_In_Current_City_Years_le',
                                       'Gender_ohe', 'City_Category_ohe','Occupation_ohe',
                                       'Product_Category_1_ohe', 'Product_Category_2_ohe','Product_Category_3_ohe',
                                      'Marital_Status','FETemp_scaled'],

                         outputCol=  "feature_vector")

# Stage 11 - Train ML model
stage_11 = LinearRegression(featuresCol= "feature_vector", labelCol= "Purchase", maxIter=20, regParam=0.0001, elasticNetParam=0.001)



In [0]:
# Define pipeline
pipeline = Pipeline(stages= [stage_1,
                             stage_2,
                             stage_3,
                             stage_4,
                             stage_5,
                             stage_6,
                             stage_7,
                             stage_8,
                             stage_9,
                             stage_10,
                             stage_11
                            ])

In [0]:
# fit the pipeline with the training data
pipeline_model = pipeline.fit(train_data)


In [0]:
# transform data
final_train_data = pipeline_model.transform(train_data)


# TRANSFORM THE VALIDATION DATA
predictions_valid_data  = pipeline_model.transform(valid_data)


# TRANSFORM THE TEST DATA
predictions_test_data = pipeline_model.transform(test_data)

In [0]:
# Evaluate training data
evaluator = RegressionEvaluator(labelCol="Purchase", metricName="rmse") 

# evaluate model on train data
evaluator.evaluate(final_train_data)


In [0]:

trainingSummary = pipeline_model.stages[-1].summary

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

In [0]:
# evaluate model on validation data
evaluator.evaluate(predictions_valid_data)


In [0]:
predictions_valid_data.select("feature_vector","Purchase","prediction").show()



---- End ---