In [None]:
#Installing PySpark
!pip install pyspark==3.3.2 py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.3.2
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m11.8 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824028 sha256=b102c9bac233a04856268649d55bed6b09cfd4c3970018435982400884973386
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspark


In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("customer_shopping").getOrCreate()

In [None]:
#Preprocessing and Exploratory Data Analysis
from pyspark.sql.functions import sum
from pyspark.sql.functions import count
from pyspark.sql.functions import isnan
from pyspark.sql.functions import when
from pyspark.sql.functions import col
from pyspark.sql.functions import desc
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import split

#Feature Encoding
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler, VectorIndexer

#Correlation Analysis
from pyspark.sql.functions import corr
from pyspark.ml.stat import Correlation

#Regression Classification Algorithms
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

#Model Evaluation
from pyspark.ml.evaluation import RegressionEvaluator

#Classification Algorithms
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import LogisticRegression

#Model Evaluation
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Combining models
from pyspark.ml import Pipeline

# Data Preprocessing

In [None]:
df = spark.read.csv("customer_shopping_data.csv", header =True, inferSchema=True)

In [None]:
df.head()

Row(invoice_no='I138884', customer_id='C241288', gender='Female', age=28, category='Clothing', quantity=5, price=1500.4, payment_method='Credit Card', invoice_date='5/8/2022', shopping_mall='Kanyon')

In [None]:
df.describe()

DataFrame[summary: string, invoice_no: string, customer_id: string, gender: string, age: string, category: string, quantity: string, price: string, payment_method: string, invoice_date: string, shopping_mall: string]

In [None]:
df.printSchema()

root
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- shopping_mall: string (nullable = true)



In [None]:
# Check for missing values in each column
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----------+-----------+------+---+--------+--------+-----+--------------+------------+-------------+
|invoice_no|customer_id|gender|age|category|quantity|price|payment_method|invoice_date|shopping_mall|
+----------+-----------+------+---+--------+--------+-----+--------------+------------+-------------+
|         0|          0|     0|  0|       0|       0|    0|             0|           0|            0|
+----------+-----------+------+---+--------+--------+-----+--------------+------------+-------------+



In [None]:
#Check for duplicates
# count the number of rows before removing duplicates
print("Number of rows before removing duplicates:", df.count())

# drop duplicates and count the number of rows after removing duplicates
df = df.dropDuplicates()
print("Number of rows after removing duplicates:", df.count())

Number of rows before removing duplicates: 99457
Number of rows after removing duplicates: 99457


# Exploratory Data Analysis

In [None]:
# Summary statistics of numerical columns
num_cols = ['quantity', 'price']
df.select(num_cols).describe().show()

+-------+------------------+-----------------+
|summary|          quantity|            price|
+-------+------------------+-----------------+
|  count|             99457|            99457|
|   mean| 3.003428617392441|689.2563209226016|
| stddev|1.4130251343054265|941.1845672154691|
|    min|                 1|             5.23|
|    max|                 5|           5250.0|
+-------+------------------+-----------------+



In [None]:
# Distribution of categorical columns
cat_cols = ['gender', 'age', 'category', 'payment_method', 'shopping_mall']
for col in cat_cols:
    df.groupby(col).count().sort(desc("count")).show()

+------+-----+
|gender|count|
+------+-----+
|Female|59482|
|  Male|39975|
+------+-----+

+---+-----+
|age|count|
+---+-----+
| 37| 2057|
| 22| 2051|
| 64| 2002|
| 43| 2000|
| 51| 1993|
| 30| 1981|
| 24| 1977|
| 40| 1960|
| 48| 1955|
| 38| 1954|
| 36| 1954|
| 28| 1953|
| 27| 1950|
| 21| 1947|
| 39| 1947|
| 61| 1945|
| 52| 1945|
| 19| 1936|
| 56| 1916|
| 33| 1913|
+---+-----+
only showing top 20 rows

+---------------+-----+
|       category|count|
+---------------+-----+
|       Clothing|34487|
|      Cosmetics|15097|
|Food & Beverage|14776|
|           Toys|10087|
|          Shoes|10034|
|       Souvenir| 4999|
|     Technology| 4996|
|          Books| 4981|
+---------------+-----+

+--------------+-----+
|payment_method|count|
+--------------+-----+
|          Cash|44447|
|   Credit Card|34931|
|    Debit Card|20079|
+--------------+-----+

+-----------------+-----+
|    shopping_mall|count|
+-----------------+-----+
| Mall of Istanbul|19943|
|           Kanyon|19823|
|        Metro

# Feature Creation


In [None]:
#Because the goal of sales optimisation is to increase total revenue generated by the business, we will need to calculate the value. This is derive by multiplying price and quantity
df = df.withColumn("total_revenue", col("quantity") * col("price"))

In [None]:
#However, the "quantity" and "price" columns are stored as strings, which means 
#they must be converted to numeric types before any numerical analysis or modelling can be performed on them.
df = df.withColumn("quantity", col("quantity").cast(DoubleType()))
df = df.withColumn("price", col("price").cast(DoubleType()))

In [None]:
df.printSchema()

root
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- shopping_mall: string (nullable = true)
 |-- total_revenue: double (nullable = true)



In [None]:
#As part of the analysis data, we will also try to identify seasonal trends & comparing sales performance across different months or years. 
#To this end we will have to split the invoice date column into months and year

In [None]:
df = df.withColumn("invoice_day", split(df["invoice_date"], "/")[0].cast("integer"))
df = df.withColumn("invoice_month", split(df["invoice_date"], "/")[1].cast("integer"))
df = df.withColumn("invoice_year", split(df["invoice_date"], "/")[2].cast("integer"))

In [None]:
df.show(5)

+----------+-----------+------+---+---------------+--------+-------+--------------+------------+----------------+-------------+-----------+-------------+------------+
|invoice_no|customer_id|gender|age|       category|quantity|  price|payment_method|invoice_date|   shopping_mall|total_revenue|invoice_day|invoice_month|invoice_year|
+----------+-----------+------+---+---------------+--------+-------+--------------+------------+----------------+-------------+-----------+-------------+------------+
|   I137794|    C133687|Female| 45|Food & Beverage|     3.0|  15.69|    Debit Card|   12/3/2021|  Viaport Outlet|        47.07|         12|            3|        2021|
|   I291540|    C557245|  Male| 25|Food & Beverage|     1.0|   5.23|    Debit Card|  17/07/2022|          Kanyon|         5.23|         17|            7|        2022|
|   I215906|    C296862|  Male| 21|       Clothing|     4.0|1200.32|    Debit Card|  13/08/2021|Mall of Istanbul|      4801.28|         13|            8|        2021

In [None]:
# EDA - Calculate TotalRevenue by year
revenue_by_year = df.groupBy("invoice_year").agg(sum("total_revenue").alias("total_revenue"))
revenue_by_year.show()

+------------+--------------------+
|invoice_year|       total_revenue|
+------------+--------------------+
|        2023|2.1508409580000207E7|
|        2022| 1.154368140799979E8|
|        2021| 1.145605705899973E8|
+------------+--------------------+



In [None]:
# EDA - Calculate Quantity by year and month
quantity_by_month = df.groupBy("invoice_year", "invoice_month").agg(sum("quantity").alias("TotalQuantity"))
quantity_by_month = quantity_by_month.orderBy("invoice_year", "invoice_month")
quantity_by_month.show()

+------------+-------------+-------------+
|invoice_year|invoice_month|TotalQuantity|
+------------+-------------+-------------+
|        2021|            1|      11483.0|
|        2021|            2|      10179.0|
|        2021|            3|      11383.0|
|        2021|            4|      11270.0|
|        2021|            5|      11620.0|
|        2021|            6|      11352.0|
|        2021|            7|      11986.0|
|        2021|            8|      11205.0|
|        2021|            9|      10901.0|
|        2021|           10|      11711.0|
|        2021|           11|      11355.0|
|        2021|           12|      11651.0|
|        2022|            1|      11619.0|
|        2022|            2|      10248.0|
|        2022|            3|      11949.0|
|        2022|            4|      11288.0|
|        2022|            5|      11597.0|
|        2022|            6|      11322.0|
|        2022|            7|      11713.0|
|        2022|            8|      11716.0|
+----------

In [None]:
quantity_by_month_sorted = quantity_by_month.orderBy(desc("TotalQuantity"))
quantity_by_month_sorted.show(quantity_by_month_sorted.count())

+------------+-------------+-------------+
|invoice_year|invoice_month|TotalQuantity|
+------------+-------------+-------------+
|        2021|            7|      11986.0|
|        2022|            3|      11949.0|
|        2022|           10|      11766.0|
|        2022|            8|      11716.0|
|        2022|            7|      11713.0|
|        2021|           10|      11711.0|
|        2023|            1|      11661.0|
|        2021|           12|      11651.0|
|        2021|            5|      11620.0|
|        2022|            1|      11619.0|
|        2022|            5|      11597.0|
|        2022|           12|      11501.0|
|        2021|            1|      11483.0|
|        2021|            3|      11383.0|
|        2021|           11|      11355.0|
|        2021|            6|      11352.0|
|        2022|            6|      11322.0|
|        2022|            4|      11288.0|
|        2021|            4|      11270.0|
|        2022|           11|      11231.0|
|        20

In [None]:
# EDA on total revenue by shopping mall
df.groupby('shopping_mall').agg(sum('total_revenue').alias('TotalRevenue')).sort(desc('TotalRevenue')).show()

+-----------------+--------------------+
|    shopping_mall|        TotalRevenue|
+-----------------+--------------------+
| Mall of Istanbul| 5.087248168000062E7|
|           Kanyon| 5.055423110000049E7|
|        Metrocity| 3.730278733000052E7|
|     Metropol AVM|2.5379913190000236E7|
|     Istinye Park|2.4618827680000238E7|
|     Zorlu Center|1.2901053820000071E7|
|      Cevahir AVM|1.2645138200000051E7|
|   Viaport Outlet|1.2521339720000036E7|
|Emaar Square Mall|1.2406100290000025E7|
|   Forum Istanbul|1.2303921240000037E7|
+-----------------+--------------------+



In [None]:
# Calculate total revenue by payment method
df.groupby('payment_method').agg(sum('total_revenue').alias('TotalRevenue')).sort(desc('TotalRevenue')).show()

+--------------+--------------------+
|payment_method|        TotalRevenue|
+--------------+--------------------+
|          Cash| 1.128322430199977E8|
|   Credit Card| 8.807712376999915E7|
|    Debit Card|5.0596427460000545E7|
+--------------+--------------------+



In [None]:
# EDA on total revenue by category
df.groupby('category').agg(sum('total_revenue').alias('TotalRevenue')).sort(desc('TotalRevenue')).show()


+---------------+--------------------+
|       category|        TotalRevenue|
+---------------+--------------------+
|       Clothing|1.1399679103999336E8|
|          Shoes| 6.655345147000113E7|
|     Technology|          5.786235E7|
|      Cosmetics|   6792862.900000166|
|           Toys|  3980426.2400001483|
|Food & Beverage|   849535.0499999793|
|          Books|   834552.8999999986|
|       Souvenir|   635824.6500000034|
+---------------+--------------------+



In [None]:
# Calculate total revenue by age group and add the column to the dataset
df = df.withColumn('age_group', when((df.age >= '18') & (df.age <= '25'), '18-25')
                         .when((df.age >= '26') & (df.age <= '35'), '26-35')
                         .when((df.age >= '36') & (df.age <= '45'), '36-45')
                         .when((df.age >= '46') & (df.age <= '55'), '46-55')
                         .when((df.age >= '56') & (df.age <= '65'), '56-65')
                         .when(df.age >= '66', '66+')
                         .otherwise('Unknown'))
df.groupby('age_group').agg(sum('total_revenue').alias('TotalRevenue')).sort(desc('TotalRevenue')).show()

+---------+--------------------+
|age_group|        TotalRevenue|
+---------+--------------------+
|    36-45|5.0184235790000595E7|
|    46-55| 4.821974264000054E7|
|    56-65|4.7893010770000584E7|
|    26-35|4.7879659580000505E7|
|    18-25| 3.811827135000049E7|
|      66+| 1.921087412000013E7|
+---------+--------------------+



# Feature Encoding

In [None]:
# Create a StringIndexer for each categorical column
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(df) for col in ["gender", "age", "category", "payment_method", "shopping_mall", "age_group"]]

# Create a OneHotEncoderEstimator to encode the indexed categorical columns
encoder = OneHotEncoder(inputCols=[indexer.getOutputCol() for indexer in indexers], outputCols=["{}_encoded".format(indexer.getOutputCol()) for indexer in indexers])

# Fit the indexers and encoder on the dataframe
indexed_df = df
for indexer in indexers:
    indexed_df = indexer.transform(indexed_df)
encoded_df = encoder.fit(indexed_df).transform(indexed_df)

In [None]:
indexed_df.printSchema()

root
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- shopping_mall: string (nullable = true)
 |-- total_revenue: double (nullable = true)
 |-- invoice_day: integer (nullable = true)
 |-- invoice_month: integer (nullable = true)
 |-- invoice_year: integer (nullable = true)
 |-- age_group: string (nullable = false)
 |-- gender_index: double (nullable = false)
 |-- age_index: double (nullable = false)
 |-- category_index: double (nullable = false)
 |-- payment_method_index: double (nullable = false)
 |-- shopping_mall_index: double (nullable = false)
 |-- age_group_index: double (nullable = false)



In [None]:
# Check column names in encoded_df
encoded_df.printSchema()

root
 |-- invoice_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: double (nullable = true)
 |-- price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- shopping_mall: string (nullable = true)
 |-- total_revenue: double (nullable = true)
 |-- invoice_day: integer (nullable = true)
 |-- invoice_month: integer (nullable = true)
 |-- invoice_year: integer (nullable = true)
 |-- age_group: string (nullable = false)
 |-- gender_index: double (nullable = false)
 |-- age_index: double (nullable = false)
 |-- category_index: double (nullable = false)
 |-- payment_method_index: double (nullable = false)
 |-- shopping_mall_index: double (nullable = false)
 |-- age_group_index: double (nullable = false)
 |-- gender_index_encoded: vector (nullable = true)
 |-- age_index_encoded:

# Correlation Analysis

In [None]:
correlation_df = indexed_df.select("age", "quantity", "price", "gender_index", "age_index", 
                                   "category_index", "payment_method_index", "shopping_mall_index", "age_group_index", "total_revenue" )

In [None]:
correlation_df = encoded_df.select()

In [None]:
# Calculate the correlation between total_revenue and other variables
corr_matrix = correlation_df.select([corr('total_revenue', col).alias(col) for col in correlation_df.columns])

# Show the correlation matrix
corr_matrix.show()

+-------------------+-------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+-------------+
|                age|           quantity|             price|        gender_index|           age_index|    category_index|payment_method_index| shopping_mall_index|     age_group_index|total_revenue|
+-------------------+-------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+-------------+
|0.00273105679304898|0.46117325824072886|0.9623724553160845|0.001021414943205...|-0.00452023498291...|0.1392701632335058|-0.00190633230606...|-2.96025691517805...|-0.00486277533523...|          1.0|
+-------------------+-------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+-------------+



In [None]:
# Calculate the correlation between category_index and other variables
corr_matrix = correlation_df.select([corr('category_index', col).alias(col) for col in correlation_df.columns])

# Show the correlation matrix
corr_matrix.show()

+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+------------------+
|                 age|            quantity|              price|        gender_index|           age_index|category_index|payment_method_index| shopping_mall_index|     age_group_index|     total_revenue|
+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+------------------+
|-1.15338755502333...|-4.98703495640320...|0.17025678520775553|0.004043110136582018|5.092306639406667E-4|           1.0|-0.00166687612461...|-4.08220214280156...|-7.87785934987323...|0.1392701632335058|
+--------------------+--------------------+-------------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+-----

In [None]:
# Correlation Analysis using the selected encoded features
# Calculate correlation matrix between features and target variable
corr_mat = Correlation.corr(transformed_df, 'features').head()

# Select the correlation coefficients between the features and target variable
corr_array = corr_mat[0].toArray()
corr_target = corr_array[-1][:-1]

# Print the correlation coefficients between the features and target variable
print(corr_target)

[-1.44991265e-03  5.36274803e-03  7.95427142e-04  3.58580596e-03
 -4.19209855e-04  3.21845278e-03  5.70278644e-04  1.76180680e-03
  8.47164436e-04  1.79541335e-03 -4.09121715e-03  1.60255997e-03
  2.37820862e-04  2.83961860e-03 -1.71249078e-03  2.96643726e-03
 -2.44588919e-03 -3.74050718e-03 -3.03456419e-03 -5.62759647e-03
 -3.93410756e-03  4.50533405e-03 -1.34255165e-03  4.93905936e-03
  1.10016082e-03  3.56158144e-04 -3.85437069e-03 -2.71264057e-03
  1.79029248e-03  2.01796768e-03 -8.23815538e-04  3.86437146e-03
 -6.06551613e-03  1.51152293e-03  2.14682987e-03  4.61890485e-04
 -1.28260478e-03 -1.52365736e-03  1.51765714e-05  1.72775456e-03
  4.86113144e-04  7.11156250e-04 -4.15567290e-03  2.53604099e-03
 -2.08251358e-03 -2.95913336e-03 -1.71666077e-06  1.18908814e-03
 -4.71403456e-03 -1.53520438e-03 -3.34600317e-04  2.13310382e-03
  5.58581146e-03 -9.54878967e-04  1.36373731e-03 -1.84730042e-03
 -4.18366397e-03  1.63976425e-01 -2.54765236e-01 -2.98954325e-01
 -2.07577136e-01  3.97954

In [None]:
#The features with the highest absolute correlation coefficients (in descending order) are:
#shopping_mall_index_encoded: 0.602977403
#age_index_encoded: -0.254765236
#age_group_index_encoded: -0.298954325
#category_index_encoded: -0.207577136
#payment_method_index_encoded: 0.397954105
#gender_index_encoded: -0.163976425

In [None]:
# Correlation Analysis using the selected encoded features
# Calculate correlation matrix between features and target variable
corr_mat = Correlation.corr(transformed_classification_df, 'features').head()

# Select the correlation coefficients between the features and target variable
corr_array = corr_mat[0].toArray()
corr_target = corr_array[-1][:-1]

# Print the correlation coefficients between the features and target variable
print(corr_target)

[ 1.44991265e-03 -3.89407558e-03 -5.25288925e-03 -1.59241575e-03
 -1.98737986e-04  3.44879843e-01]


# Feature selection

In [None]:
# Select the relevant columns for the regression modeling
selected_df = encoded_df.select("gender_index_encoded", "age_index_encoded", "age_group_index_encoded", "category_index_encoded",
                                "payment_method_index_encoded", "shopping_mall_index_encoded", "quantity", "price", "total_revenue")

In [None]:
selected_df.printSchema()
selected_df.show(5)

root
 |-- gender_index_encoded: vector (nullable = true)
 |-- age_index_encoded: vector (nullable = true)
 |-- age_group_index_encoded: vector (nullable = true)
 |-- category_index_encoded: vector (nullable = true)
 |-- payment_method_index_encoded: vector (nullable = true)
 |-- shopping_mall_index_encoded: vector (nullable = true)
 |-- quantity: double (nullable = true)
 |-- price: double (nullable = true)
 |-- total_revenue: double (nullable = true)



In [None]:
# Create a vector assembler to combine the feature columns
assembler = VectorAssembler(inputCols=["gender_index_encoded", "age_index_encoded", "age_group_index_encoded", "category_index_encoded",
                                       "payment_method_index_encoded", "shopping_mall_index_encoded", "quantity", "price"], outputCol="features")
# Transform the selected_df using the vector assembler
transformed_df = assembler.transform(selected_df).select("features", "total_revenue")

In [None]:
transformed_df.printSchema()
transformed_df.show(5)

root
 |-- features: vector (nullable = true)
 |-- total_revenue: double (nullable = true)

+--------------------+-------------+
|            features|total_revenue|
+--------------------+-------------+
|(77,[0,39,52,59,7...|        47.07|
|(77,[46,56,59,67,...|         5.23|
|(77,[14,56,57,66,...|      4801.28|
|(77,[50,54,58,65,...|       365.94|
|(77,[0,24,54,58,6...|       650.56|
+--------------------+-------------+
only showing top 5 rows



In [None]:
# Split the data into training and testing sets
(train_data, test_data) = transformed_df.randomSplit([0.7, 0.3], seed=42)

# Regression Models

## Linear Regression

In [None]:
#Train a linear regression model
lr = LinearRegression(featuresCol="features", labelCol="total_revenue")

In [None]:
# Fit the model on the training data
lr_model = lr.fit(train_data)

In [None]:
# Evaluate the model on the test data
lr_predictions = lr_model.transform(test_data)

In [None]:
# Compute evaluatiopn metrics
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="rmse")
lr_rmse = lr_evaluator.evaluate(lr_predictions)

print("Linear Regression - Root Mean Squared Error (RMSE) on test data = %g" % lr_rmse)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="mae")
lr_mae = lr_evaluator.evaluate(lr_predictions)

print("Linear Regression - Mean Absolute  Error (MAE): ", lr_mae)

lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="r2")
lr_r2 = lr_evaluator.evaluate(lr_predictions)

print("Linear Regression - R-squared (R2): ", lr_r2)

Linear Regression - Root Mean Squared Error (RMSE) on test data = 586.911
Linear Regression - Mean Absolute  Error (MAE):  366.8170386956974
Linear Regression - R-squared (R2):  0.980414308318927


## Decision Tree

In [None]:
# Train a Decision Tree model
dt = DecisionTreeRegressor(featuresCol='features', labelCol='total_revenue', maxDepth=5)
dt_model = dt.fit(train_data)

# Make predictions on test data
dt_predictions = dt_model.transform(test_data)


In [None]:
# Evaluate the model on test data
dt_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="rmse")
dt_rmse = dt_evaluator.evaluate(dt_predictions)
dt_mae = dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "mae"})
dt_r2 = dt_evaluator.evaluate(dt_predictions, {dt_evaluator.metricName: "r2"})

# Print evaluation metrics
print("Decision Tree Regression - Root Mean Squared Error (RMSE) on test data = {:.3f}".format(dt_rmse))
print("Decision Tree Regression - Mean Absolute Error (MAE) on test data = {:.3f}".format(dt_mae))
print("Decision Tree Regression - R-squared (R2) on test data = {:.3f}".format(dt_r2))

Decision Tree Regression - Root Mean Squared Error (RMSE) on test data = 51.331
Decision Tree Regression - Mean Absolute Error (MAE) on test data = 27.827
Decision Tree Regression - R-squared (R2) on test data = 1.000


## Gradient Booster

In [None]:
# Define the GBTRegressor model
gbt = GBTRegressor(featuresCol="features", labelCol="total_revenue")

In [None]:
# Define the parameter grid to search over
param_grid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [2, 5]) \
    .addGrid(gbt.maxBins, [10, 20]) \
    .addGrid(gbt.stepSize, [0.05, 0.1]) \
    .build()

In [None]:
# Define the evaluator to use for model selection
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="rmse")

In [None]:
# Define the cross-validation object
cv = CrossValidator(estimator=gbt, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3, seed=42)

In [None]:
# Train the model on the training data
gbt_model = cv.fit(train_data)

In [None]:
# Make predictions on the test data
predictions = gbt_model.transform(test_data)

In [None]:
# Evaluate the model using RMSE
rmse = evaluator.evaluate(predictions)
print("Gradient Booster Root Mean Squared Error (RMSE) - ", rmse)

# Evaluate the model using MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Gradient Booster MAE - ", mae)

# Evaluate the model using R-squared
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("Gradient Booster R² - ", r2)

Gradient Booster Root Mean Squared Error (RMSE) -  2.8357915527954463
Gradient Booster MAE -  1.695846312169581
Gradient Booster R² -  0.9999995427617041


In [None]:
# Print the best hyperparameters found by the cross-validation
print("Best hyperparameters:", gbt_model.bestModel.extractParamMap())

Best hyperparameters: {Param(parent='GBTRegressor_f37fcb347c92', name='cacheNodeIds', doc='If false, the algorithm will pass trees to executors to match instances with nodes. If true, the algorithm will cache node IDs for each instance. Caching can speed up training of deeper trees. Users can set how often should the cache be checkpointed or disable it by setting checkpointInterval.'): False, Param(parent='GBTRegressor_f37fcb347c92', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the SparkContext.'): 10, Param(parent='GBTRegressor_f37fcb347c92', name='featureSubsetStrategy', doc="The number of features to consider for splits at each tree node. Supported options: 'auto' (choose automatically for task: If numTrees == 1, set to 'all'. If numTrees > 1 (forest), set to 'sqrt' for classification and to

In [None]:
params = gbt_model.extractParamMap()

# Print the values of the hyperparameters
print("Hyperparameters:")
for param in params:
    print("{}: {}".format(param.name, gbt_model.getOrDefault(param.name)))

Hyperparameters:
cacheNodeIds: False
checkpointInterval: 10
featureSubsetStrategy: all
featuresCol: features
impurity: variance
labelCol: total_revenue
leafCol: 
lossType: squared
maxBins: 32
maxDepth: 5
maxIter: 20
maxMemoryInMB: 256
minInfoGain: 0.0
minInstancesPerNode: 1
minWeightFractionPerNode: 0.0
predictionCol: prediction
seed: -6692987171724147510
stepSize: 0.1
subsamplingRate: 1.0
validationTol: 0.01


# Classification Models

In [None]:
# Select the relevant columns for modeling
classification_df = encoded_df.select("gender_index", "age_index", "age_group_index", "category_index", "payment_method_index", "shopping_mall_index", "quantity", "price")

In [None]:
classification_df.show(5)

+------------+---------+---------------+--------------+--------------------+-------------------+--------+-------+
|gender_index|age_index|age_group_index|category_index|payment_method_index|shopping_mall_index|quantity|  price|
+------------+---------+---------------+--------------+--------------------+-------------------+--------+-------+
|         0.0|     38.0|            0.0|           2.0|                 2.0|                8.0|     3.0|  15.69|
|         1.0|     45.0|            4.0|           2.0|                 2.0|                1.0|     1.0|   5.23|
|         1.0|     13.0|            4.0|           0.0|                 2.0|                0.0|     4.0|1200.32|
|         1.0|     49.0|            2.0|           1.0|                 1.0|                1.0|     3.0| 121.98|
|         0.0|     23.0|            2.0|           1.0|                 0.0|                3.0|     4.0| 162.64|
+------------+---------+---------------+--------------+--------------------+------------

In [None]:
# Create a vector assembler to combine the feature columns
assembler = VectorAssembler(inputCols=["gender_index", "age_index", "age_group_index", "payment_method_index", "shopping_mall_index", "quantity", "price"], outputCol="features")


In [None]:
# Transform the selected_df using the vector assembler
transformed_classification_df = assembler.transform(classification_df).select("features", "category_index")

In [None]:
transformed_classification_df.show(5)

+--------------------+--------------+
|            features|category_index|
+--------------------+--------------+
|[0.0,38.0,0.0,2.0...|           2.0|
|[1.0,45.0,4.0,2.0...|           2.0|
|[1.0,13.0,4.0,2.0...|           0.0|
|[1.0,49.0,2.0,1.0...|           1.0|
|[0.0,23.0,2.0,0.0...|           1.0|
+--------------------+--------------+
only showing top 5 rows



In [None]:
transformed_classification_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- category_index: double (nullable = false)



In [None]:
# Split the data into training and testing sets
(training_data, testing_data) = transformed_classification_df.randomSplit([0.7, 0.3], seed=42)

## Logistic Regression


In [None]:
# Create a logistic regression model
lr = LogisticRegression(labelCol="category_index", featuresCol="features")

# Define the parameter grid to search over
param_grid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.1]) \
    .build()

# Create a cross-validator object with 5 folds
cv = CrossValidator(estimator=lr, estimatorParamMaps=param_grid, evaluator=MulticlassClassificationEvaluator(labelCol="category_index", predictionCol="prediction", metricName="accuracy"), numFolds=5)

# Run cross-validation and get the best model
cv_model = cv.fit(training_data)

# Make predictions on the testing data using the best model
predictions = cv_model.transform(testing_data)

# Evaluate the model performance
evaluator = MulticlassClassificationEvaluator(labelCol="category_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
weighted_precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
weighted_recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1_score = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

# Print the evaluation metrics
print("Accuracy: {}".format(accuracy))
print("Weighted Precision: {}".format(weighted_precision))
print("Weighted Recall: {}".format(weighted_recall))
print("F1 Score: {}".format(f1_score))

## Decision Tree


In [None]:
# Train a DecisionTree model
dt = DecisionTreeClassifier(labelCol="category_index", featuresCol="features")

In [None]:
# Create a Decision Tree classifier
dt = DecisionTreeClassifier(labelCol="category_index", featuresCol="features", maxBins=52)

# Train the model on the training data
model = dt.fit(training_data)

# Make predictions on the testing data
predictions = model.transform(testing_data)

# Evaluate the model's performance
evaluator = MulticlassClassificationEvaluator(labelCol="category_index", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
weightedPrecision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
weightedRecall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

In [None]:
#Print evaluation metrics
print(f"Model Performance:\nAccuracy: {accuracy}")
print(f"Weighted Precision: {weightedPrecision}")
print(f"nWeighted Recall: {weightedRecall}")
print(f"F1 Score: {f1}")

Model Performance:
Accuracy: 0.5764602784050693
Weighted Precision: 0.8971363437835769
Weighted Recall: 0.8973002123428495
F1 Score: 0.8961495974019997


# Combining both models

In [None]:
# Train the classification model and predict category index on the regression training set
classification_model = dt.fit(transformed_classification_df)
category_predictions_train = classification_model.transform(transformed_classification_df).select("category_index", "features")
regression_train_augmented = train_data.join(category_predictions_train, on=["features"], how="left")

# Predict category index on the regression testing set
category_predictions_test = classification_model.transform(transformed_df).select("prediction", "features")
regression_test_augmented = test_data.join(category_predictions_test, on=["features"], how="left")

# Train the regression model on the augmented training set
gbt = GBTRegressor(featuresCol="features", labelCol="total_revenue")
gbt_model = gbt.fit(regression_train_augmented)

# Rename the existing "prediction" column to "previous_prediction"
regression_test_augmented = regression_test_augmented.withColumnRenamed("prediction", "previous_prediction")

# Evaluate the model on the augmented testing set
predictions = gbt_model.transform(regression_test_augmented)

In [None]:
# Evaluate the model using RMSE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Gradient Booster Root Mean Squared Error (RMSE) - ", rmse)

# Evaluate the model using MAE
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="mae")
mae = evaluator.evaluate(predictions)
print("Gradient Booster MAE - ", mae)

# Evaluate the model using R-squared
evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_revenue", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("Gradient Booster R² - ", r2)

Gradient Booster Root Mean Squared Error (RMSE) -  2.0254643760417186
Gradient Booster MAE -  1.0358222199021312
Gradient Booster R² -  0.9999996865665347
