In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, datediff, when, count, avg

# Initialize Spark
spark = SparkSession.builder \
    .appName("SupplyChainLogisticsAnalysis") \
    .getOrCreate()

# Load dataset
df = spark.read.csv("SupplyChainDatasetDemo.csv", header=True, inferSchema=True)

df.printSchema()
df.show(5)

root
 |-- Unique_id: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Days for shipping (real): integer (nullable = true)
 |-- Days for shipment (scheduled): integer (nullable = true)
 |-- Benefit per order: double (nullable = true)
 |-- Sales per customer: double (nullable = true)
 |-- Delivery Status: string (nullable = true)
 |-- Late_delivery_risk: integer (nullable = true)
 |-- Category Id: integer (nullable = true)
 |-- Category Name: string (nullable = true)
 |-- Customer City: string (nullable = true)
 |-- Customer Country: string (nullable = true)
 |-- Customer Email: string (nullable = true)
 |-- Customer Fname: string (nullable = true)
 |-- Customer Id: integer (nullable = true)
 |-- Customer Lname: string (nullable = true)
 |-- Customer Password: string (nullable = true)
 |-- Customer Segment: string (nullable = true)
 |-- Customer State: string (nullable = true)
 |-- Customer Street: string (nullable = true)
 |-- Customer Zipcode: integer (nullable = true

In [3]:
#Average Delivery Days (Real vs Scheduled)

avg_days = df.agg(
    avg(col("Days for shipping (real)")).alias("Avg_Actual_Days"),
    avg(col("Days for shipment (scheduled)")).alias("Avg_Scheduled_Days")
)
avg_days.show()

+------------------+------------------+
|   Avg_Actual_Days|Avg_Scheduled_Days|
+------------------+------------------+
|3.4976539865609713| 2.931846509231715|
+------------------+------------------+



In [4]:
#SLA Breach % (Late deliveries)

sla_breach = df.agg((avg(col("Late_delivery_risk"))*100).alias("SLA_Breach_Percentage"))
sla_breach.show()

+---------------------+
|SLA_Breach_Percentage|
+---------------------+
|    54.82913155955883|
+---------------------+



In [5]:
#On-time vs Late Count

delivery_status = df.groupBy("Delivery Status") \
                    .agg(count("*").alias("Count"))
delivery_status.show()

+-----------------+-----+
|  Delivery Status|Count|
+-----------------+-----+
| Shipping on time|32196|
| Advance shipping|41592|
|Shipping canceled| 7754|
|    Late delivery|98977|
+-----------------+-----+



In [6]:
#Late Delivery % by Shipping Mode

late_by_mode = df.groupBy("Shipping Mode") \
    .agg((avg(col("Late_delivery_risk"))*100).alias("Late_Percentage"))
late_by_mode.show()

+--------------+------------------+
| Shipping Mode|   Late_Percentage|
+--------------+------------------+
|   First Class| 95.32249946070324|
|      Same Day| 45.74304200472425|
|  Second Class|  76.6327805542935|
|Standard Class|38.071683124211155|
+--------------+------------------+



In [7]:
#Late Delivery % by Region

late_by_region = df.groupBy("Order Region") \
    .agg((avg(col("Late_delivery_risk"))*100).alias("Late_Percentage"))
late_by_region.show()

+---------------+------------------+
|   Order Region|   Late_Percentage|
+---------------+------------------+
|     South Asia| 56.26697710516104|
| Eastern Europe| 55.66326530612245|
|Southern Europe| 54.38447672569187|
|   West of USA |53.959714750406604|
|   Central Asia| 55.33453887884268|
|Central America|    54.75459581525|
|    East of USA| 55.66160520607375|
|   North Africa| 54.51732673267327|
|Northern Europe| 54.04411764705882|
|      Caribbean|53.077662899735515|
|  South America| 54.30867090726481|
|Southern Africa|  53.3275713050994|
| South of  USA | 55.77255871446229|
| Western Europe| 55.84861116234461|
|        Oceania| 54.02049664958612|
|         Canada|48.800834202294055|
|     US Center | 55.24036011550875|
|      West Asia| 55.28374105508404|
|   Eastern Asia| 54.32692307692307|
|    East Africa| 55.93952483801296|
+---------------+------------------+
only showing top 20 rows


In [8]:
#Category-Wise Delivery Performance

late_by_category = df.groupBy("Category Name") \
    .agg((avg(col("Late_delivery_risk"))*100).alias("Late_Percentage"))
late_by_category.show()

+--------------------+------------------+
|       Category Name|   Late_Percentage|
+--------------------+------------------+
| Children's Clothing| 53.37423312883436|
|      Sporting Goods| 55.46218487394958|
|    Camping & Hiking|54.534197683735165|
| Fitness Accessories|56.957928802588995|
|            Cameras |58.108108108108105|
|           Computers|50.678733031674206|
|Consumer Electronics| 55.22041763341067|
|          Basketball|55.223880597014926|
|   Health and Beauty| 55.80110497237569|
|        Pet Supplies| 58.94308943089431|
|                DVDs| 53.62318840579711|
|      Men's Footwear| 54.48619976625011|
|              Crafts| 55.99173553719008|
|    Women's Clothing| 56.46153846153846|
|         Electronics|56.083650190114064|
|         Video Games|54.295942720763726|
|     Women's Apparel| 54.55669122890421|
|      Girls' Apparel| 55.28726061615321|
|     As Seen on  TV!| 57.35294117647059|
|        Boxing & MMA| 56.26477541371159|
+--------------------+------------

In [9]:
#Top Profitable Products

top_profit_products = df.groupBy("Product Name") \
    .agg(avg("Order Profit Per Order").alias("Avg_Profit")) \
    .orderBy(col("Avg_Profit").desc()) \
    .limit(10)
top_profit_products.show()

+--------------------+------------------+
|        Product Name|        Avg_Profit|
+--------------------+------------------+
|         Dell Laptop|157.59459314733027|
|Bowflex SelectTec...|119.07799950200001|
|          Lawn mower| 69.09712831847314|
|Diamondback Boys'...| 59.68310336206896|
|Polar FT4 Heart R...|    57.94749952825|
|    Porcelain crafts| 52.75035136456612|
|          Web Camera| 51.16520261088683|
|The North Face Wo...| 49.18947305731579|
|Elevation Trainin...| 44.58878351331082|
|Field & Stream Sp...| 43.64910633128438|
+--------------------+------------------+



In [10]:
#Sales vs Profit by Region

sales_profit_region = df.groupBy("Order Region") \
    .agg(avg("Sales").alias("Avg_Sales"),
         avg("Order Profit Per Order").alias("Avg_Profit"))
sales_profit_region.show()

+---------------+------------------+------------------+
|   Order Region|         Avg_Sales|        Avg_Profit|
+---------------+------------------+------------------+
|     South Asia|200.96765233430065|21.433695527605607|
| Eastern Europe|197.51698088647578|20.335982122494634|
|Southern Europe| 217.1475789295174| 24.47558370087625|
|   West of USA |196.59901931474968| 20.63563874079782|
|   Central Asia|198.62555550938512| 23.59001817619349|
|Central America| 199.9122155554133| 21.74734732898647|
|    East of USA|198.28083687881724|22.597729601427467|
|   North Africa|196.39610847400198|19.987580448307234|
|Northern Europe|220.16244368751526|23.840951863455675|
|      Caribbean|198.48753693891655|20.657085840781548|
|  South America|198.25118229852058|22.440870493278588|
|Southern Africa|197.27881979733783| 26.64308569230511|
| South of  USA |194.26055516942773| 21.78365386835748|
| Western Europe|217.43261518711725|23.071529032712625|
|        Oceania|198.72430031155142|19.853963390

In [11]:
#Predictive Modeling (MLlib)
#predict Late Delivery Risk using MLlib.

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Encode categorical vars
indexer1 = StringIndexer(inputCol="Shipping Mode", outputCol="ShipModeIndex")
indexer2 = StringIndexer(inputCol="Order Region", outputCol="RegionIndex")
indexer3 = StringIndexer(inputCol="Category Name", outputCol="CategoryIndex")

df_ml = indexer1.fit(df).transform(df)
df_ml = indexer2.fit(df_ml).transform(df_ml)
df_ml = indexer3.fit(df_ml).transform(df_ml)

# Features
assembler = VectorAssembler(
    inputCols=["ShipModeIndex", "RegionIndex", "CategoryIndex", 
               "Days for shipment (scheduled)", "Sales", "Order Profit Per Order"],
    outputCol="features"
)

ml_data = assembler.transform(df_ml).select("features", col("Late_delivery_risk").alias("label"))

# Train/Test Split
train, test = ml_data.randomSplit([0.7, 0.3], seed=42)

# Random Forest Classifier

rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxBins=100   # increase bins to handle more categories
)
model = rf.fit(train)

# Predictions
predictions = model.transform(test)

# Evaluate
evaluator = BinaryClassificationEvaluator()
auc = evaluator.evaluate(predictions)
print("ROC-AUC:", auc)

ROC-AUC: 0.7298048429797557


In [12]:
from pyspark.sql.functions import col, when, datediff

df = df.withColumn(
    "Delay_vs_SLA",
    col("Days for shipping (real)") - col("Days for shipment (scheduled)")
)

df = df.withColumn(
    "Is_Expensive_Order",
    when(col("Sales") > 500, 1).otherwise(0)
)

df = df.withColumn(
    "Is_High_Profit",
    when(col("Order Profit Per Order") > 50, 1).otherwise(0)
)

In [13]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

# Categorical columns
categorical_cols = ["Shipping Mode", "Order Region", "Category Name"]
indexers = [StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep") for col in categorical_cols]
encoders = [OneHotEncoder(inputCol=col+"_idx", outputCol=col+"_ohe") for col in categorical_cols]

# Numerical columns
numeric_cols = ["Sales", "Order Profit Per Order", "Days for shipment (scheduled)", "Days for shipping (real)"]

# VectorAssembler
assembler = VectorAssembler(
    inputCols=[enc.getOutputCol() for enc in encoders] + numeric_cols,
    outputCol="features"
)

# Random Forest
rf = RandomForestClassifier(labelCol="Late_delivery_risk", featuresCol="features", seed=42)

# Complete pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])


In [14]:
train, test = df.randomSplit([0.7, 0.3], seed=42)

In [15]:
# ===============================
# Supply Chain Late Delivery ML
# PySpark End-to-End Pipeline
# ===============================

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# -------------------------------
# 1️⃣ Initialize Spark
# -------------------------------
spark = SparkSession.builder \
    .appName("SupplyChainLogisticsAnalysis") \
    .getOrCreate()

# -------------------------------
# 2️⃣ Load dataset
# -------------------------------
df = spark.read.csv("SupplyChainDatasetDemo.csv", header=True, inferSchema=True)

# Optional: drop unnecessary columns (like emails, passwords)
drop_cols = ["Customer Email", "Customer Password", "Product Image", "Product Description"]
df = df.drop(*drop_cols)

# -------------------------------
# 3️⃣ Feature Engineering
# -------------------------------
# Derived features
df = df.withColumn(
    "Delay_vs_SLA",
    col("Days for shipping (real)") - col("Days for shipment (scheduled)")
)

df = df.withColumn(
    "Is_Expensive_Order",
    when(col("Sales") > 500, 1).otherwise(0)
)

df = df.withColumn(
    "Is_High_Profit",
    when(col("Order Profit Per Order") > 50, 1).otherwise(0)
)

# -------------------------------
# 4️⃣ Define categorical and numeric columns
# -------------------------------
categorical_cols = ["Shipping Mode", "Order Region", "Category Name"]
numeric_cols = ["Sales", "Order Profit Per Order", "Days for shipment (scheduled)",
                "Days for shipping (real)", "Delay_vs_SLA", "Is_Expensive_Order", "Is_High_Profit"]

# Indexers for categorical features
indexers = [StringIndexer(inputCol=col, outputCol=col+"_idx", handleInvalid="keep") for col in categorical_cols]

# One-hot encoders
encoders = [OneHotEncoder(inputCol=col+"_idx", outputCol=col+"_ohe") for col in categorical_cols]

# Assemble all features
assembler = VectorAssembler(
    inputCols=[enc.getOutputCol() for enc in encoders] + numeric_cols,
    outputCol="features"
)

# -------------------------------
# 5️⃣ Define Random Forest model
# -------------------------------
rf = RandomForestClassifier(
    labelCol="Late_delivery_risk",
    featuresCol="features",
    seed=42
)

# -------------------------------
# 6️⃣ Build pipeline
# -------------------------------
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

# -------------------------------
# 7️⃣ Train/Test split
# -------------------------------
train, test = df.randomSplit([0.7, 0.3], seed=42)
train.cache()
test.cache()

# -------------------------------
# 8️⃣ Hyperparameter tuning
# (Smaller grid for faster training)
# -------------------------------
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10]) \
    .addGrid(rf.maxBins, [64]) \
    .build()

evaluator = BinaryClassificationEvaluator(
    labelCol="Late_delivery_risk",
    metricName="areaUnderROC"
)

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,       # 3-fold CV
    parallelism=4     # adjust based on CPU cores
)

# -------------------------------
# 9️⃣ Fit the model
# -------------------------------
cvModel = cv.fit(train)

# -------------------------------
# 10️⃣ Make predictions
# -------------------------------
predictions = cvModel.transform(test)

# -------------------------------
# 11️⃣ Evaluate model
# -------------------------------
auc = evaluator.evaluate(predictions)
print("Tuned Random Forest ROC-AUC:", auc)

# -------------------------------
# 12️⃣ Feature importance
# -------------------------------
rfModel = cvModel.bestModel.stages[-1]  # Last stage is RF
importances = rfModel.featureImportances.toArray()
feature_names = assembler.getInputCols()

sorted_features = sorted(zip(feature_names, importances), key=lambda x: -x[1])
print("Feature Importance Ranking:")
for f, imp in sorted_features:
    print(f"{f}: {imp:.4f}")


Tuned Random Forest ROC-AUC: 0.9729171728974981
Feature Importance Ranking:
Shipping Mode_ohe: 0.0567
Category Name_ohe: 0.0465
Order Region_ohe: 0.0215
Sales: 0.0015
Delay_vs_SLA: 0.0000
Is_Expensive_Order: 0.0000
Days for shipment (scheduled): 0.0000
Days for shipping (real): 0.0000
Order Profit Per Order: 0.0000
Is_High_Profit: 0.0000


In [16]:
predictions.groupBy("Late_delivery_risk", "prediction").count().show()

+------------------+----------+-----+
|Late_delivery_risk|prediction|count|
+------------------+----------+-----+
|                 0|       0.0|23135|
|                 1|       1.0|29410|
|                 0|       1.0| 1339|
+------------------+----------+-----+



In [22]:
# Make predictions
predictions = cvModel.transform(test)


In [27]:
from pyspark.sql.functions import col, count, sum as spark_sum

# Overall metrics
total_orders = predictions.count()
total_late = predictions.filter(col("Late_delivery_risk") == 1).count()
late_percent = (total_late / total_orders) * 100

# Late deliveries by Region
late_per_region = predictions.groupBy("Order Region").agg(
    count("*").alias("Total_Orders"),
    spark_sum(col("Late_delivery_risk")).alias("Total_Late")
).withColumn("Late_Percent", (col("Total_Late") / col("Total_Orders")) * 100)

# Late deliveries by Category
late_per_category = predictions.groupBy("Category Name").agg(
    count("*").alias("Total_Orders"),
    spark_sum(col("Late_delivery_risk")).alias("Total_Late")
).withColumn("Late_Percent", (col("Total_Late") / col("Total_Orders")) * 100)

# Late deliveries by Shipping Mode
late_per_shipping = predictions.groupBy("Shipping Mode").agg(
    count("*").alias("Total_Orders"),
    spark_sum(col("Late_delivery_risk")).alias("Total_Late")
).withColumn("Late_Percent", (col("Total_Late") / col("Total_Orders")) * 100)


In [28]:
late_region_pdf = late_per_region.toPandas()
late_category_pdf = late_per_category.toPandas()
late_shipping_pdf = late_per_shipping.toPandas()


In [29]:

# Save to CSV
late_region_pdf.to_csv("D:/SupplyChain/late_per_region.csv", index=False)
late_category_pdf.to_csv("D:/SupplyChain/late_per_category.csv", index=False)
late_shipping_pdf.to_csv("D:/SupplyChain/late_per_shipping.csv", index=False)


In [30]:
# After training
rfModel = cvModel.bestModel.stages[-1]  # Last stage is RF
importances = rfModel.featureImportances.toArray()
feature_names = assembler.getInputCols()

sorted_features = sorted(zip(feature_names, importances), key=lambda x: -x[1])

# Save as CSV
import pandas as pd
feature_df = pd.DataFrame(sorted_features, columns=["Feature", "Importance"])
feature_df.to_csv("D:/SupplyChain/feature_importance.csv", index=False)

# Optional: also save as pickle
import pickle
with open("D:/SupplyChain/feature_importance.pkl", "wb") as f:
    pickle.dump(sorted_features, f)


In [36]:
from pyspark.sql.functions import concat_ws, col

# Take first 100 rows from predictions
sample_orders = predictions.select(
    "Order ID", "Customer Fname", "Customer Lname",
    "Category Name", "Order Region", "Shipping Mode",
    "Late_delivery_risk", "prediction"
).limit(100)

# Combine first and last name
sample_orders = sample_orders.withColumn(
    "Customer Name",
    concat_ws(" ", col("Customer Fname"), col("Customer Lname"))
)

# Select final columns
sample_orders = sample_orders.select(
    "Order ID", "Customer Name", "Category Name", "Order Region",
    "Shipping Mode", "Late_delivery_risk", "prediction"
)

# Convert to Pandas and save for dashboard
sample_orders_pdf = sample_orders.toPandas()
sample_orders_pdf.to_csv("D:/SupplyChain/sample_orders.csv", index=False)
