In [70]:
# META DATA - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 

    # Developer details: 
        # Name: Harish S
        # Role: Architect
        # Code ownership rights: Harish S
    # Version:
        # Version: V 1.0 (August 29th )
            # Developer: Harish S
     
    # Description: The code enables to explore PySpark ML
    
# CODE - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 

# Dependency: 
    # Environment:     
        #Python 3.12

IMPORT FOUNDATIONAL LIBRARIES

In [71]:
import numpy as np # Library for numerical computation
import pandas as pd # Library for data manipulation
import os # system related operations

INITIATE PYSPARK

In [72]:
import pyspark # import pyspark

In [73]:
from pyspark.sql import SparkSession,functions as F # initiate pyspark session to configure spark 
from pyspark.sql.types import IntegerType,StructType,StringType,LongType,FloatType,DoubleType # import datatypes to find the data based on its type


In [74]:
sc = SparkSession.builder \
        .appName(" spark ml ") \
        .master("local[*]") \
        .getOrCreate()

In [69]:
#sc.stop()

READ THE DATA SOURCE 

In [77]:
data_file_path = input("enter the path of source file: ") #datafile path
main_df=sc.read.parquet(data_file_path+'combined_file.parquet') # customer data

SCHEMA

In [78]:
main_df.printSchema() #check datatypes

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)
 |-- payment_sequential: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: string (nullable = true)
 |-- payment_value: string (nullable = true)



PREVIEW DATA

In [79]:
main_df.show(5) # preview certain portion of data

+--------------------+--------------------+--------------------+------------------------+-------------+--------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+
|            order_id|         customer_id|  customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|payment_sequential|payment_type|payment_installments|payment_value|
+--------------------+--------------------+--------------------+------------------------+-------------+--------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+
|

In [80]:
# make a copy of main dataframe
df_copy=main_df.select("*") 

In [81]:
# shape of the dataframe
num_rows = df_copy.count() #no of rows
num_columns = len(df_copy.columns) #no of cols
print(f"Shape: ({num_rows}, {num_columns})")

Shape: (103886, 16)


ISOLATE CATEGORICAL AND NUMERICAL COLUMNS

In [82]:
categorical_cols=[field.name for field in df_copy.schema.fields if isinstance(field.dataType,StringType)]

In [83]:
numeric_types=[IntegerType,LongType,DoubleType,FloatType]
Numerical_cols=[field.name for field in df_copy.schema.fields if isinstance(field.dataType,FloatType)]

In [84]:
len(categorical_cols) #indicates no of categorical cols

16

In [15]:
len(Numerical_cols) #indicates no of numerical cols

0

### Preprocessing Steps

a) Handling Missing Values

In [85]:
df_copy.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in categorical_cols]).show()

+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+
|order_id|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|payment_sequential|payment_type|payment_installments|payment_value|
+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+
|       0|          0|                 0|                       0|          

In [86]:
# as there 100000 rows, there are only 3000 rows at max where missing values are present hence its less than 10 %, hence deleting the rows with missing values

In [87]:
df_clean = df_copy.dropna(subset=['order_approved_at', 'order_delivered_carrier_date', 'order_delivered_customer_date'])

In [88]:
df_clean.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in categorical_cols]).show()

+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+
|order_id|customer_id|customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|order_status|order_purchase_timestamp|order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|payment_sequential|payment_type|payment_installments|payment_value|
+--------+-----------+------------------+------------------------+-------------+--------------+------------+------------------------+-----------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+
|       0|          0|                 0|                       0|          

b) Encoding Categorical Variables

In [90]:
from pyspark.ml.feature import StringIndexer

indexers = [StringIndexer(inputCol=c, outputCol=c+"_index") for c in categorical_cols]

for indexer in indexers:
    df_clean= indexer.fit(df_clean).transform(df_clean)

In [91]:
df_clean.show(5) #preview the categorically converted data

24/09/02 20:28:31 WARN DAGScheduler: Broadcasting large task binary with size 34.9 MiB
24/09/02 20:28:32 WARN DAGScheduler: Broadcasting large task binary with size 34.9 MiB


+--------------------+--------------------+--------------------+------------------------+-------------+--------------+------------+------------------------+-------------------+----------------------------+-----------------------------+-----------------------------+------------------+------------+--------------------+-------------+--------------+-----------------+------------------------+------------------------------+-------------------+--------------------+------------------+------------------------------+-----------------------+----------------------------------+-----------------------------------+-----------------------------------+------------------------+------------------+--------------------------+-------------------+
|            order_id|         customer_id|  customer_unique_id|customer_zip_code_prefix|customer_city|customer_state|order_status|order_purchase_timestamp|  order_approved_at|order_delivered_carrier_date|order_delivered_customer_date|order_estimated_delivery_date|

                                                                                

d) Vector assembler

In PySpark, vector assembler converts all the rows and columns as a matrix which is further used for ML modelling 

In [93]:
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(inputCols=[c+"_index" for c in categorical_cols], outputCol="features")
df_clean = assembler.transform(df_clean)

In [94]:
df_clean.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_unique_id: string (nullable = true)
 |-- customer_zip_code_prefix: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- order_status: string (nullable = true)
 |-- order_purchase_timestamp: string (nullable = true)
 |-- order_approved_at: string (nullable = true)
 |-- order_delivered_carrier_date: string (nullable = true)
 |-- order_delivered_customer_date: string (nullable = true)
 |-- order_estimated_delivery_date: string (nullable = true)
 |-- payment_sequential: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- payment_installments: string (nullable = true)
 |-- payment_value: string (nullable = true)
 |-- order_id_index: double (nullable = false)
 |-- customer_id_index: double (nullable = false)
 |-- customer_unique_id_index: double (nullable = false)
 |-- customer_zip_code_prefix_index: do

In [95]:
# drop the original columns
df_clean = df_clean.drop(*categorical_cols)

In [100]:
df_clean.show(5)

24/09/02 20:31:44 WARN DAGScheduler: Broadcasting large task binary with size 34.9 MiB
24/09/02 20:31:44 WARN DAGScheduler: Broadcasting large task binary with size 34.9 MiB
[Stage 68:>                                                         (0 + 3) / 3]

+--------------+-----------------+------------------------+------------------------------+-------------------+--------------------+------------------+------------------------------+-----------------------+----------------------------------+-----------------------------------+-----------------------------------+------------------------+------------------+--------------------------+-------------------+--------------------+
|order_id_index|customer_id_index|customer_unique_id_index|customer_zip_code_prefix_index|customer_city_index|customer_state_index|order_status_index|order_purchase_timestamp_index|order_approved_at_index|order_delivered_carrier_date_index|order_delivered_customer_date_index|order_estimated_delivery_date_index|payment_sequential_index|payment_type_index|payment_installments_index|payment_value_index|            features|
+--------------+-----------------+------------------------+------------------------------+-------------------+--------------------+------------------+

                                                                                

e) Splitting the data

In [97]:
train_df, test_df = df_clean.randomSplit([0.8, 0.2], seed=42)

f) Model Training

In [103]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol="features", labelCol="payment_type_index", maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model on train data
model = lr.fit(train_df)

24/09/02 20:39:50 WARN DAGScheduler: Broadcasting large task binary with size 34.9 MiB
24/09/02 20:39:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/09/02 20:39:52 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:54 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:55 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:55 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:56 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:57 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:58 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:59 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:39:59 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
24/09/02 20:40:00 WARN 

In [104]:
# Make predictions on the test data
predictions = model.transform(test_df)

In [106]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator #import multiclass evaluation class
evaluator = MulticlassClassificationEvaluator(labelCol="payment_type_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions) # calculate metrics
print(f"Test set accuracy = {accuracy}") # accuracy metrics
f1 = MulticlassClassificationEvaluator(labelCol="payment_type_index", predictionCol="prediction", metricName="f1").evaluate(predictions)
print(f"Test set F1-score = {f1}") #f1 score metrics

24/09/02 20:41:44 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB
                                                                                

Test set accuracy = 0.7436101838473419


24/09/02 20:41:46 WARN DAGScheduler: Broadcasting large task binary with size 35.0 MiB

Test set F1-score = 0.6399343823976


                                                                                