In [27]:
import os
import findspark
findspark.init()
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler, VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.sql.functions import col, countDistinct, isnan, when, sum
from pyspark.sql import SparkSession, Row

spark_home = "C:\\Program Files\\ApacheSpark"

os.environ["SPARK_HOME"] = spark_home

# Add Spark bin and executors to PATH
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "bin")
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "sbin")

# Add Spark Python libraries to PYTHONPATH
os.environ["PYTHONPATH"] = os.path.join(spark_home, "python") + os.pathsep + os.environ.get("PYTHONPATH", "")
os.environ["PYTHONPATH"] += os.pathsep + os.path.join(spark_home, "python", "lib")

# Add PySpark to the system path
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "python", "lib", "pyspark.zip")
os.environ["PATH"] += os.pathsep + os.path.join(spark_home, "python", "lib", "py4j-0.10.9-src.zip")

os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
os.environ['PYSPARK_PYTHON'] = 'python'

In [28]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("RetailInsight-Optimizer") \
    .getOrCreate()

# Load full dataset

In [29]:
data = spark.read.csv("../data/data.csv", header=True, inferSchema=True)

# Inspect the data

In [30]:
print("Number of records: ", data.rdd.count())

print("Sample data: ")
data.show(5)


Number of records:  3900
Sample data: 
+-----------+---+------+--------------+--------+---------------------+-------------+----+---------+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+----------------------+
|Customer ID|Age|Gender|Item Purchased|Category|Purchase Amount (USD)|     Location|Size|    Color|Season|Review Rating|Subscription Status|Shipping Type|Discount Applied|Promo Code Used|Previous Purchases|Payment Method|Frequency of Purchases|
+-----------+---+------+--------------+--------+---------------------+-------------+----+---------+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+----------------------+
|          1| 55|  Male|        Blouse|Clothing|                   53|     Kentucky|   L|     Gray|Winter|          3.1|                Yes|      Express|             Yes|            Yes|                14|         Venmo|     

In [31]:

print("Data schema: ")
data.printSchema()

Data schema: 
root
 |-- Customer ID: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Item Purchased: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Purchase Amount (USD): integer (nullable = true)
 |-- Location: string (nullable = true)
 |-- Size: string (nullable = true)
 |-- Color: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- Review Rating: double (nullable = true)
 |-- Subscription Status: string (nullable = true)
 |-- Shipping Type: string (nullable = true)
 |-- Discount Applied: string (nullable = true)
 |-- Promo Code Used: string (nullable = true)
 |-- Previous Purchases: integer (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Frequency of Purchases: string (nullable = true)



In [32]:
print("Data summary: ")
data.describe().show()

Data summary: 
+-------+------------------+-----------------+------+--------------+-----------+---------------------+--------+----+------+------+------------------+-------------------+--------------+----------------+---------------+------------------+--------------+----------------------+
|summary|       Customer ID|              Age|Gender|Item Purchased|   Category|Purchase Amount (USD)|Location|Size| Color|Season|     Review Rating|Subscription Status| Shipping Type|Discount Applied|Promo Code Used|Previous Purchases|Payment Method|Frequency of Purchases|
+-------+------------------+-----------------+------+--------------+-----------+---------------------+--------+----+------+------+------------------+-------------------+--------------+----------------+---------------+------------------+--------------+----------------------+
|  count|              3900|             3900|  3900|          3900|       3900|                 3900|    3900|3900|  3900|  3900|              3900|           

In [33]:

print("Number of unique values: ")
data.select([countDistinct(col).alias(f"{col}_unique") for col in data.columns]).show()

# for column in data.columns:
#     print(f"Column: {column}")
#     data.select(column).distinct().show(5, truncate=False)


Number of unique values: 
+------------------+----------+-------------+---------------------+---------------+----------------------------+---------------+-----------+------------+-------------+--------------------+--------------------------+--------------------+-----------------------+----------------------+-------------------------+---------------------+-----------------------------+
|Customer ID_unique|Age_unique|Gender_unique|Item Purchased_unique|Category_unique|Purchase Amount (USD)_unique|Location_unique|Size_unique|Color_unique|Season_unique|Review Rating_unique|Subscription Status_unique|Shipping Type_unique|Discount Applied_unique|Promo Code Used_unique|Previous Purchases_unique|Payment Method_unique|Frequency of Purchases_unique|
+------------------+----------+-------------+---------------------+---------------+----------------------------+---------------+-----------+------------+-------------+--------------------+--------------------------+--------------------+--------------

In [34]:

print("Check for nulls: ")
null_nan_counts = data.select([
    sum(when(col(c).isNull() | isnan(col(c)), 1).otherwise(0)).alias(c)
    for c in data.columns
])
null_nan_counts.show()


Check for nulls: 
+-----------+---+------+--------------+--------+---------------------+--------+----+-----+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+----------------------+
|Customer ID|Age|Gender|Item Purchased|Category|Purchase Amount (USD)|Location|Size|Color|Season|Review Rating|Subscription Status|Shipping Type|Discount Applied|Promo Code Used|Previous Purchases|Payment Method|Frequency of Purchases|
+-----------+---+------+--------------+--------+---------------------+--------+----+-----+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+----------------------+
|          0|  0|     0|             0|       0|                    0|       0|   0|    0|     0|            0|                  0|            0|               0|              0|                 0|             0|                     0|
+-----------+---+------+--------------

As we inspecting the data, we found that it's mainly about apparel products. It has good analytical features as,
- No nulls
- No outliers as seen in **Min** and **Max** of each column in the summary
- Standardized format of values (e.g. Gender has 2 values (Male, Female) Which always appear in consistent formats, no M or F for example)

However some issues were detected
- Checking the Customer ID column, it doesn't have any useful indication, so it can be dropped. 
- Categorical data should be encoded for better machine learning model training (e.g., Yes/No to 1/0).
- Numerical data should be standardized to avoid bias to certain features.
- Binning columns as age into categories (e.g., '18-25', '26-35', etc.) can capture non-linear relationships and make the model more interpretable. The same applies for Purchase amount which can be Purchase Tier: (e.g., 'Low', 'Medium', 'High') since. We might then drop the original columns.
- Finally, splitting the data for model train and test datasets should be done (Data is split first then standardized based on training set statistics to avoid data leakage)

In [35]:
# Drop columns of irrelevant data
data = data.drop("Customer ID")

In [None]:
# Binning columns
import preprocessing_utils as pf

def bin_all_columns(row):
    try:
        new_age = pf.bin_age(row)
        new_purchase_amount = pf.bin_purchase_amount(row)
        new_previous_purchase = pf.bin_previous_purchases(row)

        new_row = row.asDict()
        new_row["Age"] = new_age
        new_row["Purchase Amount (USD)"] = new_purchase_amount
        new_row["Previous Purchases"] = new_previous_purchase

        return Row(**new_row)
    except Exception as e:
        print("Error processing row:", row)
        print("Error message:", e)
        raise e



rdd = data.rdd
rdd = rdd.map(bin_all_columns)
data = spark.createDataFrame(rdd)

In [40]:
data.show(5)

+-----+------+--------------+--------+---------------------+-------------+----+---------+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+----------------------+
|  Age|Gender|Item Purchased|Category|Purchase Amount (USD)|     Location|Size|    Color|Season|Review Rating|Subscription Status|Shipping Type|Discount Applied|Promo Code Used|Previous Purchases|Payment Method|Frequency of Purchases|
+-----+------+--------------+--------+---------------------+-------------+----+---------+------+-------------+-------------------+-------------+----------------+---------------+------------------+--------------+----------------------+
|51-70|  Male|        Blouse|Clothing|               Medium|     Kentucky|   L|     Gray|Winter|          3.1|                Yes|      Express|             Yes|            Yes|             11-15|         Venmo|           Fortnightly|
|18-25|  Male|       Sweater|Clothing|               Medium|

In [37]:
# Categorical columns (all but not 'Review Rating')
categorical_cols = data.columns
categorical_cols.remove("Review Rating")

distinct_values = {}
for col in categorical_cols:
    distinct_values[col] = rdd.map(lambda row: row[col]).distinct().collect()

# Index mappings for each categorical column
category_to_index = {}
for col in categorical_cols:
    category_to_index[col] = {value: idx for idx, value in enumerate(distinct_values[col])}

# Applying one-hot encoding and string indexing
def encode_row(row):
    # Convert Row to dictionary so we can modify it
    row_dict = row.asDict()
    
    for col in categorical_cols:
        index = category_to_index[col].get(row[col], -1) 
        row_dict[col + "_index"] = index
        
        one_hot = [0] * len(distinct_values[col])
        if index >= 0:
            one_hot[index] = 1  # Set the corresponding index to 1
        row_dict[col + "_onehot"] = one_hot
    
    return Row(**row_dict)

# Apply the encoding logic with Map
encoded_rdd = rdd.map(encode_row)
encoded_data = spark.createDataFrame(encoded_rdd)

encoded_data = encoded_data.drop(*([col + "_index" for col in categorical_cols] + [col for col in categorical_cols]))

for col in encoded_data.columns:
    if "_onehot" in col:
        new_col = col.replace("_onehot", "")
        encoded_data = encoded_data.withColumnRenamed(col, new_col)

# Show the result
encoded_data.show(5, truncate=False)

+-------------+------------+------+---------------------------------------------------------------------------+------------+---------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------+---------------------------------------------------------------------------+------------+-------------------+------------------+----------------+---------------+------------------------------+------------------+----------------------+
|Review Rating|Age         |Gender|Item Purchased                                                             |Category    |Purchase Amount (USD)|Location                                                                                                                                              |Size        |Color                                                                      |Season      |Subscription Status|Shipping Type     |Discount Applied|

In [38]:
# Standardizing numerical columns
from pyspark.ml.functions import vector_to_array

# Assemble the numerical column into a vector
assembler = VectorAssembler(inputCols=["Review Rating"], outputCol="review_vector")
assembled_data = assembler.transform(encoded_data)

# Split the data into training and test sets
train_data, test_data = assembled_data.randomSplit([0.8, 0.2], seed=42)
print("Training data count: ", train_data.count())
print("Test data count: ", test_data.count())

# Apply standardization
scaler = StandardScaler(inputCol="review_vector", outputCol="review_scaled", withMean=True, withStd=True)
scaler_model = scaler.fit(train_data)
train_data = scaler_model.transform(train_data)
test_data = scaler_model.transform(test_data)

# Drop original scalar column and rename scaled vector
train_data = train_data.drop("Review Rating", "review_vector").withColumnRenamed("review_scaled", "Review Rating (scaled)")

train_data = train_data.withColumn("Review Rating (scaled)", vector_to_array("Review Rating (scaled)")[0])
train_data.show(5, truncate=False)

test_data = test_data.drop("Review Rating", "review_vector").withColumnRenamed("review_scaled", "Review Rating (scaled)")

test_data = test_data.withColumn("Review Rating (scaled)", vector_to_array("Review Rating (scaled)")[0])
test_data.show(5, truncate=False)

Training data count:  3177
Test data count:  723
+------------+------+---------------------------------------------------------------------------+------------+---------------------+------------------------------------------------------------------------------------------------------------------------------------------------------+------------+---------------------------------------------------------------------------+------------+-------------------+------------------+----------------+---------------+------------------------------+------------------+----------------------+----------------------+
|Age         |Gender|Item Purchased                                                             |Category    |Purchase Amount (USD)|Location                                                                                                                                              |Size        |Color                                                                      |Season      |Subscription

In [39]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF to convert array to a single double value
@udf(returnType=DoubleType())
def array_to_double(arr):
    if arr is None:
        return None
    try:
        return float(arr[0]) if len(arr) > 0 else 0.0
    except:
        return 0.0

# Identify all columns to flatten (exclude 'review_vector')
array_cols = [col for col in train_data.columns if col != "review_vector"]

# Flatten arrays
flat_train_data = train_data.select(
    *[array_to_double(col).alias(f"{col}_flat") if col != "review_vector" else col for col in array_cols]
)
flat_test_data = test_data.select(
    *[array_to_double(col).alias(f"{col}_flat") if col != "review_vector" else col for col in array_cols]
)

# List of all target columns
target_cols = [col for col in array_cols if col != "review_vector"]  # or provide explicitly

for target_col in target_cols:
    print(f"\nTraining model for target: {target_col}")

    # Feature columns exclude current target and review_vector
    feature_cols = [col for col in flat_train_data.columns if col not in [f"{target_col}_flat", "review_vector"] and col.endswith("_flat")]

    # Assemble features
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    train_assembled = assembler.transform(flat_train_data.select(*feature_cols, f"{target_col}_flat"))
    test_assembled = assembler.transform(flat_test_data.select(*feature_cols, f"{target_col}_flat"))

    # Determine task type
    if target_col == "Review Rating":
        model = RandomForestRegressor(labelCol=f"{target_col}_flat", featuresCol="features")
        evaluator = RegressionEvaluator(labelCol=f"{target_col}_flat", predictionCol="prediction", metricName="rmse")
    else:
        model = RandomForestClassifier(labelCol=f"{target_col}_flat", featuresCol="features")
        evaluator = MulticlassClassificationEvaluator(labelCol=f"{target_col}_flat", predictionCol="prediction", metricName="accuracy")

    # Fit and predict
    fitted_model = model.fit(train_assembled)
    predictions = fitted_model.transform(test_assembled)

    # Evaluate
    if target_col != "Review Rating":
        # Classification metrics
        for metric in ["f1", "weightedPrecision", "weightedRecall", "accuracy"]:
            evaluator.setMetricName(metric)
            score = evaluator.evaluate(predictions)
            print(f"{metric} for {target_col}: {score:.4f}")
    else:
        # Regression metric
        score = evaluator.evaluate(predictions)
        print(f"RMSE for {target_col}: {score:.4f}")


Training model for target: Age
f1 for Age: 0.4434
weightedPrecision for Age: 0.7589
weightedRecall for Age: 0.5934
accuracy for Age: 0.5934

Training model for target: Gender
f1 for Gender: 0.7261
weightedPrecision for Gender: 0.8472
weightedRecall for Gender: 0.7206
accuracy for Gender: 0.7206

Training model for target: Item Purchased
f1 for Item Purchased: 0.9484
weightedPrecision for Item Purchased: 0.9320
weightedRecall for Item Purchased: 0.9654
accuracy for Item Purchased: 0.9654

Training model for target: Category
f1 for Category: 0.4877
weightedPrecision for Category: 0.7242
weightedRecall for Category: 0.6003
accuracy for Category: 0.6003

Training model for target: Purchase Amount (USD)
f1 for Purchase Amount (USD): 0.5017
weightedPrecision for Purchase Amount (USD): 0.4119
weightedRecall for Purchase Amount (USD): 0.6418
accuracy for Purchase Amount (USD): 0.6418

Training model for target: Location
f1 for Location: 0.9772
weightedPrecision for Location: 0.9698
weightedRe