## Spark ML Assignment

### Questions:
1. Average Purchase amount?
2. Counting and Removing null values
3. How many distinct values per column?
4. Count category values within each of the following columns:
     - Gender
     - Age
     - City_Category
     - Stay_In_Current_City_Years
     - Marital_Status
5. Calculate average Purchase for each of the following columns:
     - Gender
     - Age
     - City_Category
     - Stay_In_Current_City_Years
     - Marital_Status
6. Label encode the following columns:
     - Age
     - Gender
     - Stay_In_Current_City_Years
     - City_Category
7. One-Hot encode the following columns:
     - Gender
     - City_Category
     - Occupation
8. Build a baseline model using any of the ML algorithms.
9. Model improvement with Grid-Search CV
10. Create a Spark ML Pipeline for the final model.

In [2]:
# import the required libraries
from pyspark.sql import SparkSession
import pyspark.sql.types as tp
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

In [3]:
# create spark session
spark = SparkSession.builder.getOrCreate()


In [4]:
# reading the train data  
train_data = spark.read.csv("train.csv",inferSchema=True, header=True)

# reading the test data
test_data  = spark.read.csv("test.csv", inferSchema=True, header=True)

                                                                                

In [5]:
train_data.printSchema() 

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)



In [6]:
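# Cast Product_ID to integer. Note: the cast returns null for any value that is
# not purely numeric; the null counts in section 2 show every row becoming null.
train_data = train_data.withColumn("Product_ID", F.col("Product_ID").cast(tp.IntegerType()))
test_data = test_data.withColumn("Product_ID", F.col("Product_ID").cast(tp.IntegerType()))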

### 1. Average Purchase amount?

In [7]:
target_variable = train_data.agg(F.round(F.avg("Purchase"),4).alias("Average_Purchase"))
j = ((target_variable.collect())[0]).asDict()
print(f"The Average Purchasing Amount is {j['Average_Purchase']}.")
target_variable.show()

The Average Purchasing Amount is 9263.9687.
+----------------+
|Average_Purchase|
+----------------+
|       9263.9687|
+----------------+



### 2. Counting and Removing null values

In [8]:
# count null values in each column (one Spark job per column)
for c in train_data.columns:
    null_count = train_data.filter(F.isnull(c)).count()
    print(c, null_count)

User_ID 0
Product_ID 550068
Gender 0
Age 0
Occupation 0
City_Category 0
Stay_In_Current_City_Years 0
Marital_Status 0
Product_Category_1 0
Product_Category_2 173638
Product_Category_3 383247
Purchase 0


In [9]:
# Column averages for Product_ID, Product_Category_2 and Product_Category_3 (used below to fill nulls)
Product_ID_variable = train_data.agg(F.round(F.avg("Product_ID"),4).alias("Average_Product_ID"))
j = ((Product_ID_variable.collect())[0]).asDict()
print(f"The Average Product_ID Amount is {j['Average_Product_ID']}.")
value_1 = j['Average_Product_ID']
Product_Category_2_variable = train_data.agg(F.round(F.avg("Product_Category_2"),4).alias("Average_Product_Category_2"))
j = ((Product_Category_2_variable.collect())[0]).asDict()
print(f"The Average Product_Category_2 Amount is {j['Average_Product_Category_2']}.")
value_2 = j['Average_Product_Category_2']
Product_Category_3_variable = train_data.agg(F.round(F.avg("Product_Category_3"),4).alias("Average_Product_Category_3"))
j = ((Product_Category_3_variable.collect())[0]).asDict()
print(f"The Average Product_Category_3 Amount is {j['Average_Product_Category_3']}.")
value_3 = j['Average_Product_Category_3']

The Average Product_ID Amount is None.
The Average Product_Category_2 Amount is 9.8423.
The Average Product_Category_3 Amount is 12.6682.


In [10]:
# Replace missing values: Product_ID with 0, Product_Category_2 and Product_Category_3 with the training-set averages computed above
train_data = train_data.fillna({"Product_ID":0,"Product_Category_2": value_2, "Product_Category_3" : value_3})
test_data = test_data.fillna({"Product_ID":0,"Product_Category_2": value_2, "Product_Category_3" : value_3})

train_data = train_data.withColumn("Product_Category_2", F.col("Product_Category_2").cast(tp.IntegerType()))
train_data = train_data.withColumn("Product_Category_3", F.col("Product_Category_3").cast(tp.IntegerType()))


test_data = test_data.withColumn("Product_Category_2", F.col("Product_Category_2").cast(tp.IntegerType()))
test_data = test_data.withColumn("Product_Category_3", F.col("Product_Category_3").cast(tp.IntegerType()))

In [11]:
# re-check null values in each column after imputation
for c in train_data.columns:
    null_count = train_data.filter(F.isnull(c)).count()
    print(c, null_count)

User_ID 0
Product_ID 0
Gender 0
Age 0
Occupation 0
City_Category 0
Stay_In_Current_City_Years 0
Marital_Status 0
Product_Category_1 0
Product_Category_2 0
Product_Category_3 0
Purchase 0


### 3. How many distinct values per column?

In [12]:
# distinct values in each column
train_data.agg(*(F.countDistinct(F.col(c)).alias(c) for c in train_data.columns)).show()



+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|User_ID|Product_ID|Gender|Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
|   5891|         1|     2|  7|        21|            3|                         5|             2|                20|                17|                15|   18105|
+-------+----------+------+---+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+



                                                                                

### 4. Count category values within each of the following columns:
     - Gender
     - Age
     - City_Category
     - Stay_In_Current_City_Years
     - Marital_Status

In [13]:
for c in ['Gender', 'Age','City_Category']:
    train_data.groupBy(c).agg(F.count(c).alias("count")).show()


+------+------+
|Gender| count|
+------+------+
|     F|135809|
|     M|414259|
+------+------+

+-----+------+
|  Age| count|
+-----+------+
|18-25| 99660|
|26-35|219587|
| 0-17| 15102|
|46-50| 45701|
|51-55| 38501|
|36-45|110013|
|  55+| 21504|
+-----+------+

+-------------+------+
|City_Category| count|
+-------------+------+
|            B|231173|
|            C|171175|
|            A|147720|
+-------------+------+



In [14]:
for c in ['Stay_In_Current_City_Years','Marital_Status']:
    train_data.groupBy(c).agg(F.count(c).alias("count")).show()

+--------------------------+------+
|Stay_In_Current_City_Years| count|
+--------------------------+------+
|                         3| 95285|
|                         0| 74398|
|                        4+| 84726|
|                         1|193821|
|                         2|101838|
+--------------------------+------+

+--------------+------+
|Marital_Status| count|
+--------------+------+
|             1|225337|
|             0|324731|
+--------------+------+



### 5. Calculate average Purchase for each of the following columns:
     - Gender
     - Age
     - City_Category
     - Stay_In_Current_City_Years
     - Marital_Status

In [15]:
for c in ['Gender', 'Age','City_Category']:
    (train_data.groupBy(c)).agg(F.round(F.avg('Purchase'),4).alias("Average_Purchase")).show()

+------+----------------+
|Gender|Average_Purchase|
+------+----------------+
|     F|       8734.5658|
|     M|        9437.526|
+------+----------------+

+-----+----------------+
|  Age|Average_Purchase|
+-----+----------------+
|18-25|       9169.6636|
|26-35|       9252.6906|
| 0-17|       8933.4646|
|46-50|       9208.6257|
|51-55|        9534.808|
|36-45|       9331.3507|
|  55+|       9336.2805|
+-----+----------------+

+-------------+----------------+
|City_Category|Average_Purchase|
+-------------+----------------+
|            B|       9151.3006|
|            C|        9719.921|
|            A|       8911.9392|
+-------------+----------------+



In [16]:
for c in ['Stay_In_Current_City_Years','Marital_Status']:
    (train_data.groupBy(c)).agg(F.round(F.avg('Purchase'),4).alias("Average_Purchase")).show()

+--------------------------+----------------+
|Stay_In_Current_City_Years|Average_Purchase|
+--------------------------+----------------+
|                         3|       9286.9041|
|                         0|       9180.0751|
|                        4+|       9275.5989|
|                         1|       9250.1459|
|                         2|       9320.4298|
+--------------------------+----------------+

+--------------+----------------+
|Marital_Status|Average_Purchase|
+--------------+----------------+
|             1|       9261.1746|
|             0|       9265.9076|
+--------------+----------------+



### 6. Label encode the following columns:
     - Age
     - Gender
     - Stay_In_Current_City_Years
     - City_Category

In [17]:
# label encode 
SI_AGE = StringIndexer(inputCol= "Age", outputCol= "Age_le" , handleInvalid="skip")
SI_Gender = StringIndexer(inputCol= "Gender", outputCol= "Gender_le", handleInvalid= "skip")
SI_Stay_In_Current_City_Years  = StringIndexer(inputCol= "Stay_In_Current_City_Years", outputCol= "Stay_In_Current_City_Years_le", handleInvalid= "skip")
SI_City_Category = StringIndexer(inputCol= "City_Category", outputCol= "City_Category_le", handleInvalid= "skip")

In [18]:
# fit the label encoders on the training data
SI_AGE_Obj = SI_AGE.fit(train_data)
SI_Gender_Obj = SI_Gender.fit(train_data)
SI_Stay_In_Current_City_Years_Obj = SI_Stay_In_Current_City_Years.fit(train_data)
SI_City_Category_Obj = SI_City_Category.fit(train_data)

                                                                                

In [19]:
# Transform on training data
train_data_encoded = SI_AGE_Obj.transform(train_data)
train_data_encoded = SI_Gender_Obj.transform(train_data_encoded)
train_data_encoded = SI_Stay_In_Current_City_Years_Obj.transform(train_data_encoded)
train_data_encoded = SI_City_Category_Obj.transform(train_data_encoded)

In [20]:
# Transform on test data
test_data_encoded = SI_AGE_Obj.transform(test_data)
test_data_encoded = SI_Gender_Obj.transform(test_data_encoded)
test_data_encoded = SI_Stay_In_Current_City_Years_Obj.transform(test_data_encoded)
test_data_encoded = SI_City_Category_Obj.transform(test_data_encoded)

In [21]:
train_data_encoded.columns


['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Age_le',
 'Gender_le',
 'Stay_In_Current_City_Years_le',
 'City_Category_le']
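
By default `StringIndexer` orders labels by descending frequency (`stringOrderType="frequencyDesc"`), so the most common label receives index 0.0. The plain-Python sketch below mimics that ordering using the Age counts from section 4; it illustrates the ordering only and is not Spark's implementation.

```python
# Age counts copied from the groupBy output in section 4
age_counts = {
    "18-25": 99660, "26-35": 219587, "0-17": 15102, "46-50": 45701,
    "51-55": 38501, "36-45": 110013, "55+": 21504,
}

# Sort labels from most to least frequent, then number them from 0.0
ordered = sorted(age_counts, key=age_counts.get, reverse=True)
age_index = {label: float(i) for i, label in enumerate(ordered)}

print(age_index)
```

Under this ordering the rarest group, `0-17`, maps to the highest index, which is consistent with the `Age_le` value of 6.0 in the first rows of the encoded data.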

### 7. One-Hot encode the following columns:
     - Gender
     - City_Category
     - Occupation

In [22]:
# label encode Occupation first: OneHotEncoder expects category indices as input
SI_Occupation = StringIndexer(inputCol= "Occupation", outputCol= "Occupation_le", handleInvalid= "skip")
SI_Occupation_Obj = SI_Occupation.fit(train_data)
train_data_encoded = SI_Occupation_Obj.transform(train_data_encoded)
test_data_encoded = SI_Occupation_Obj.transform(test_data_encoded)

In [23]:
#One Hot Encoding
OHE_train = OneHotEncoder(inputCols=['Gender_le','City_Category_le','Occupation_le'],
                                  outputCols=['Gender_le_ohe','City_Category_le_ohe','Occupation_le_ohe'])

In [24]:
# OHE object
OHE_Obj = OHE_train.fit(train_data_encoded)

In [25]:
# Transform train data
train_data_encoded = OHE_Obj.transform(train_data_encoded)

In [26]:
# view the one hot encoded data
train_data_encoded.select('Marital_Status','Age_le','Gender_le_ohe','City_Category_le_ohe','Occupation_le_ohe').show()

+--------------+------+-------------+--------------------+-----------------+
|Marital_Status|Age_le|Gender_le_ohe|City_Category_le_ohe|Occupation_le_ohe|
+--------------+------+-------------+--------------------+-----------------+
|             0|   6.0|    (1,[],[])|           (2,[],[])|  (20,[12],[1.0])|
|             0|   6.0|    (1,[],[])|           (2,[],[])|  (20,[12],[1.0])|
|             0|   6.0|    (1,[],[])|           (2,[],[])|  (20,[12],[1.0])|
|             0|   6.0|    (1,[],[])|           (2,[],[])|  (20,[12],[1.0])|
|             0|   5.0|(1,[0],[1.0])|       (2,[1],[1.0])|   (20,[9],[1.0])|
|             0|   0.0|(1,[0],[1.0])|           (2,[],[])|  (20,[14],[1.0])|
|             1|   3.0|(1,[0],[1.0])|       (2,[0],[1.0])|   (20,[2],[1.0])|
|             1|   3.0|(1,[0],[1.0])|       (2,[0],[1.0])|   (20,[2],[1.0])|
|             1|   3.0|(1,[0],[1.0])|       (2,[0],[1.0])|   (20,[2],[1.0])|
|             1|   0.0|(1,[0],[1.0])|           (2,[],[])|   (20,[5],[1.0])|

In [27]:
test_data_encoded = OHE_Obj.transform(test_data_encoded)
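
The one-hot output above is printed in Spark's sparse notation `(size, [indices], [values])`. With the encoder's default `dropLast=True`, a column with k categories gets k-1 slots and the last category index is represented by an all-zero vector. A small plain-Python sketch of how to read these values (the helper `to_dense` is hypothetical and written only for this illustration):

```python
def to_dense(size, indices, values):
    """Expand (size, [indices], [values]) sparse notation into a plain list."""
    dense = [0.0] * size
    for i, v in zip(indices, values):
        dense[i] = v
    return dense

# Gender has 2 categories, so dropLast=True leaves a single slot
gender_first = to_dense(1, [0], [1.0])  # category index 0
gender_last = to_dense(1, [], [])       # category index 1, the dropped one

# Occupation has 21 categories, hence 20 slots; index 12 is set here
occupation = to_dense(20, [12], [1.0])

print(gender_first, gender_last, occupation.index(1.0))
```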

### 8. Build a baseline model using any of the ML algorithms.

In [28]:
## columns in the dataset
train_data_encoded.columns

['User_ID',
 'Product_ID',
 'Gender',
 'Age',
 'Occupation',
 'City_Category',
 'Stay_In_Current_City_Years',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Purchase',
 'Age_le',
 'Gender_le',
 'Stay_In_Current_City_Years_le',
 'City_Category_le',
 'Occupation_le',
 'Gender_le_ohe',
 'City_Category_le_ohe',
 'Occupation_le_ohe']

In [29]:
from pyspark.ml.feature import VectorAssembler

# create feature vector
feature_vector = VectorAssembler(inputCols= ['User_ID',
 'Product_ID',
 'Marital_Status',
 'Product_Category_1',
 'Product_Category_2',
 'Product_Category_3',
 'Age_le',
 'Stay_In_Current_City_Years_le',
 'Gender_le_ohe',
 'City_Category_le_ohe',
 'Occupation_le_ohe'], outputCol= 'feature_vector')

In [30]:
# transform the feature vector
train_data_encoded = feature_vector.transform(train_data_encoded)

In [31]:
# view the feature vector
train_data_encoded.select("feature_vector").show(10)

+--------------------+
|      feature_vector|
+--------------------+
|(31,[0,3,4,5,6,7,...|
|(31,[0,3,4,5,6,7,...|
|(31,[0,3,4,5,6,7,...|
|(31,[0,3,4,5,6,7,...|
|(31,[0,3,4,5,6,7,...|
|(31,[0,3,4,5,7,8,...|
|(31,[0,2,3,4,5,6,...|
|(31,[0,2,3,4,5,6,...|
|(31,[0,2,3,4,5,6,...|
|(31,[0,2,3,4,5,8,...|
+--------------------+
only showing top 10 rows



In [32]:
train_data_encoded.printSchema()

root
 |-- User_ID: integer (nullable = true)
 |-- Product_ID: integer (nullable = false)
 |-- Gender: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Occupation: integer (nullable = true)
 |-- City_Category: string (nullable = true)
 |-- Stay_In_Current_City_Years: string (nullable = true)
 |-- Marital_Status: integer (nullable = true)
 |-- Product_Category_1: integer (nullable = true)
 |-- Product_Category_2: integer (nullable = true)
 |-- Product_Category_3: integer (nullable = true)
 |-- Purchase: integer (nullable = true)
 |-- Age_le: double (nullable = false)
 |-- Gender_le: double (nullable = false)
 |-- Stay_In_Current_City_Years_le: double (nullable = false)
 |-- City_Category_le: double (nullable = false)
 |-- Occupation_le: double (nullable = false)
 |-- Gender_le_ohe: vector (nullable = true)
 |-- City_Category_le_ohe: vector (nullable = true)
 |-- Occupation_le_ohe: vector (nullable = true)
 |-- feature_vector: vector (nullable = true)



In [33]:
test_data_encoded = feature_vector.transform(test_data_encoded)

In [34]:
## Model Building
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [35]:
model_DTR = DecisionTreeRegressor(featuresCol='feature_vector', labelCol="Purchase")


In [36]:
model_DTR = model_DTR.fit(train_data_encoded)

                                                                                

In [37]:
evaluator = RegressionEvaluator(labelCol="Purchase", metricName="rmse")
# predictions are made on the training data, so the RMSE below is a training error
predictions = model_DTR.transform(train_data_encoded)

In [38]:
evaluator.evaluate(predictions) 


                                                                                

3254.9692382019916
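
For reference, the `rmse` metric is the root mean squared error over the rows passed to the evaluator. With $y_i$ the observed `Purchase`, $\hat{y}_i$ the model prediction, and $n$ the number of rows:

```latex
\mathrm{RMSE} = \sqrt{\frac{1}{n}\sum_{i=1}^{n}\left(y_i - \hat{y}_i\right)^2}
```

The value is in the same units as `Purchase`, so it can be read against the average purchase amount from section 1.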

In [39]:
predictions.select("prediction", "Purchase", "feature_vector").show(5)

+------------------+--------+--------------------+
|        prediction|Purchase|      feature_vector|
+------------------+--------+--------------------+
|10635.041568097331|    8370|(31,[0,3,4,5,6,7,...|
|13036.571651917404|   15200|(31,[0,3,4,5,6,7,...|
| 3644.778734866649|    1422|(31,[0,3,4,5,6,7,...|
| 3644.778734866649|    1057|(31,[0,3,4,5,6,7,...|
| 8048.940431943747|    7969|(31,[0,3,4,5,6,7,...|
+------------------+--------+--------------------+
only showing top 5 rows



### 9. Model improvement with Grid-Search CV

In [40]:
# import the CrossValidator and ParamGridBuilder
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [41]:
model_DTR = DecisionTreeRegressor(featuresCol='feature_vector', labelCol="Purchase")

In [42]:
dtparamGrid = (ParamGridBuilder()
             .addGrid(model_DTR.maxDepth, [2, 5, 10])
             .addGrid(model_DTR.maxBins, [10, 20, 40])
             .build())

In [46]:
evaluator = RegressionEvaluator( labelCol="Purchase", metricName="rmse")
dtcv = CrossValidator(estimator = model_DTR,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = evaluator,
                      numFolds = 5)

In [47]:
dtcvModel = dtcv.fit(train_data_encoded)
print(dtcvModel)

                                                                                

CrossValidatorModel_11445e1f35e3


In [49]:
# evaluated on the training data again, so this is a training RMSE
dtpredictions = dtcvModel.transform(train_data_encoded)
print('RMSE:', evaluator.evaluate(dtpredictions))

RMSE: 2925.009142496947

In [50]:
dtpredictions.select("prediction", "Purchase", "feature_vector").show(5)

+------------------+--------+--------------------+
|        prediction|Purchase|      feature_vector|
+------------------+--------+--------------------+
|12317.442622950819|    8370|(31,[0,3,4,5,6,7,...|
| 12964.52896921877|   15200|(31,[0,3,4,5,6,7,...|
|1577.6288659793815|    1422|(31,[0,3,4,5,6,7,...|
|1577.6288659793815|    1057|(31,[0,3,4,5,6,7,...|
| 7782.683359877308|    7969|(31,[0,3,4,5,6,7,...|
+------------------+--------+--------------------+
only showing top 5 rows



In [51]:
# extract the best model parameters dictionary
param_dict = dtcvModel.bestModel.extractParamMap()

In [52]:
# map parameter names to their values for readability
final_dict = {}
for k, v in param_dict.items():
    final_dict[k.name] = v

In [53]:
print(final_dict)

{'cacheNodeIds': False, 'checkpointInterval': 10, 'featuresCol': 'feature_vector', 'impurity': 'variance', 'labelCol': 'Purchase', 'leafCol': '', 'maxBins': 20, 'maxDepth': 10, 'maxMemoryInMB': 256, 'minInfoGain': 0.0, 'minInstancesPerNode': 1, 'minWeightFractionPerNode': 0.0, 'predictionCol': 'prediction', 'seed': -8027478954484760012}


### 10. Create a Spark ML Pipeline for the final model.

In [54]:
# Import the Required Libraries 
from pyspark.ml import Pipeline

In [None]:
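from pyspark.ml import Transformer

class nullValuesTransformer(Transformer):
    """Custom pipeline stage that fills missing values with fixed defaults."""

    def __init__(self, fill_values=None):
        super().__init__()
        # defaults mirror the imputation applied in section 2
        self.fill_values = fill_values or {
            "Product_ID": 0,
            "Product_Category_2": value_2,
            "Product_Category_3": value_3,
        }

    def _transform(self, dataframe):
        return dataframe.fillna(self.fill_values)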