In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, count, isnan, regexp_replace
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler

In [2]:
spark = SparkSession.builder.appName("MerchSalesProcessing").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/01 02:14:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data = spark.read.csv('merch_sales_dataset.csv',header=True,inferSchema=True)

In [4]:
data.show(5,0)

+--------+----------+----------+----------------+------------+---------+--------------+----------------------+-----------+----------------+--------------+--------+-----------+------+----------------------------------------------------+
|Order ID|Order Date|Product ID|Product Category|Buyer Gender|Buyer Age|Order Location|International Shipping|Sales Price|Shipping Charges|Sales per Unit|Quantity|Total Sales|Rating|Review                                              |
+--------+----------+----------+----------------+------------+---------+--------------+----------------------+-----------+----------------+--------------+--------+-----------+------+----------------------------------------------------+
|189440  |7/21/2024 |BF1543    |Clothing        |Male        |30       |New Jersey    |No                    |100        |0               |100           |1       |100        |4     |The delivery team handled the product with care.    |
|187385  |7/20/2024 |BF1543    |Clothing        |Male   

In [5]:
# Check for missing values
missing_counts = data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns])
missing_counts.show()

+--------+----------+----------+----------------+------------+---------+--------------+----------------------+-----------+----------------+--------------+--------+-----------+------+------+
|Order ID|Order Date|Product ID|Product Category|Buyer Gender|Buyer Age|Order Location|International Shipping|Sales Price|Shipping Charges|Sales per Unit|Quantity|Total Sales|Rating|Review|
+--------+----------+----------+----------------+------------+---------+--------------+----------------------+-----------+----------------+--------------+--------+-----------+------+------+
|       0|         0|         0|               0|           0|        0|             0|                     0|          0|               0|             0|       0|          0|     0|     0|
+--------+----------+----------+----------------+------------+---------+--------------+----------------------+-----------+----------------+--------------+--------+-----------+------+------+



In [6]:
# Drop rows in Total Sales and Review columns with NULL values
data = data.dropna(subset=["Total Sales", "Review"])

In [7]:
# Fill missing numerical values with the column mean
numeric_cols = ["Sales Price", "Shipping Charges", "Sales per Unit", "Quantity", "Total Sales", "Rating", "Buyer Age"]
for col_name in numeric_cols:
    mean_value = data.select(mean(col(col_name))).collect()[0][0]
    data = data.fillna({col_name: mean_value})

In [8]:
# Fill missing categorical values with "Unknown"
categorical_cols = ["Product Category", "Buyer Gender", "Order Location", "International Shipping"]
data = data.fillna({col_name: "Unknown" for col_name in categorical_cols})

In [12]:
data = data.dropDuplicates()

In [13]:
from pyspark.sql.functions import stddev

# Define function to remove outliers
def remove_outliers(data, col_name):
    stats = data.select(mean(col(col_name)).alias("mean"), stddev(col(col_name)).alias("std")).collect()[0]
    mean_val, std_val = stats["mean"], stats["std"]
    return data.filter((col(col_name) > mean_val - 3 * std_val) & (col(col_name) < mean_val + 3 * std_val))

# Apply to numeric columns
for col_name in ["Sales Price", "Shipping Charges", "Sales per Unit", "Quantity", "Total Sales"]:
    data = remove_outliers(data, col_name)

In [19]:
if "feature_raw" in data.columns:
    data = data.drop("feature_raw")

#Standardization for numeric columns
assembler = VectorAssembler(inputCols=["Sales Price", "Shipping Charges", "Sales per Unit", "Quantity", "Buyer Age"], outputCol="feature_raw")
data = assembler.transform(data)

scaler = StandardScaler(inputCol="feature_raw", outputCol="feature", withStd=True, withMean=True)
data = scaler.fit(data).transform(data)

In [20]:
data_regression = data.select("feature", "Total Sales")

In [23]:
# Split into train and test
train_reg, test_reg = data_regression.randomSplit([0.8, 0.2], seed=42)

# Train Linear Regression Model
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol="feature", labelCol="Total Sales")
model_reg = lr.fit(train_reg)

25/04/01 02:41:52 WARN Instrumentation: [d1d32b5c] regParam is zero, which might cause numerical instability and overfitting.
25/04/01 02:41:52 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


In [24]:
# Evaluate Regression Model
predictions_reg = model_reg.transform(test_reg)
predictions_reg.select("Total Sales", "prediction").show(5)

+-----------+-------------------+
|Total Sales|         prediction|
+-----------+-------------------+
|          9|-13.769508030241582|
|          9|-13.769508030241582|
|          9|-13.769508030241582|
|          9|-13.769508030241582|
|          9|-13.721150350667173|
+-----------+-------------------+
only showing top 5 rows



In [25]:
#Preparing for classification
# Convert text reviews into numerical labels using StringIndexer
indexer = StringIndexer(inputCol="Review", outputCol="label")
data_classification = indexer.fit(data).transform(data).select("features", "label")

In [26]:
# Split into train and test
train_cls, test_cls = data_classification.randomSplit([0.8, 0.2], seed=42)

# Train a Classification Model (Logistic Regression)
from pyspark.ml.classification import LogisticRegression
lr_cls = LogisticRegression(featuresCol="features", labelCol="label")
model_cls = lr_cls.fit(train_cls)

In [27]:
# Evaluate Classification Model
predictions_cls = model_cls.transform(test_cls)
predictions_cls.select("label", "prediction").show(5)

+-----+----------+
|label|prediction|
+-----+----------+
|  2.0|       7.0|
|  6.0|       7.0|
|  8.0|       7.0|
| 24.0|       7.0|
|  4.0|       7.0|
+-----+----------+
only showing top 5 rows

