In [1]:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, to_timestamp, unix_timestamp, lag, when, lit, sqrt
from pyspark.sql.window import Window
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, ClusteringEvaluator
from synapse.ml.isolationforest import IsolationForest
import os

In [2]:

# ------------------------------------------------------------
# Configuration: training input and model output locations
# ------------------------------------------------------------
# CSV file holding the HISTORICAL records the batch job trains on.
HISTORICAL_DATA_PATH = "data/smart_logistics_iot_with_battery.csv"

# Directory the fitted pipelines are saved under; created up front so the
# save steps later in the notebook have somewhere to write.
MODELS_DIR = "models"
os.makedirs(name=MODELS_DIR, exist_ok=True)

In [3]:
# Session settings applied to the builder, in insertion order.
_session_conf = {
    "spark.sql.shuffle.partitions": "8",
    "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4,org.apache.spark:spark-avro_2.12:3.5.0",
}

# Build (or reuse) the Spark session for this batch training job.
_builder = SparkSession.builder.appName("ParcelTrainingBatchJob")
for _conf_key, _conf_value in _session_conf.items():
    _builder = _builder.config(_conf_key, _conf_value)
spark = _builder.getOrCreate()

# Only surface errors in the notebook output.
spark.sparkContext.setLogLevel("ERROR")


In [4]:
# --------------------
# Schema (Same as your stream)
# --------------------
# (column name, Spark type) pairs in declaration order. Every column is
# declared nullable when the StructType is assembled below.
_schema_columns = [
    ("Transaction ID", StringType()),
    ("Stage", StringType()),
    ("Entity", StringType()),
    ("Timestamp", StringType()),
    ("RFID ID", StringType()),
    ("GPS Latitude", DoubleType()),
    ("GPS Longitude", DoubleType()),
    ("Temperature", DoubleType()),
    ("Humidity", DoubleType()),
    ("Route ID", StringType()),
    ("Speed", DoubleType()),
    ("Delivery Status", StringType()),
    ("Risk Factor", StringType()),
    ("Delay Time", StringType()),
    ("Cost", StringType()),
    ("Disruption Type", StringType()),
    ("battery_level", DoubleType()),
    ("battery_status", StringType()),
]

schema = StructType(
    [StructField(field_name, field_type, True) for field_name, field_type in _schema_columns]
)

In [5]:
# --------------------
# Load Historical Data
# --------------------
print(f"Loading historical data from {HISTORICAL_DATA_PATH}...")
# The input is a CSV file with a header row; the explicit `schema` is applied
# rather than letting Spark infer column types.
batch_df = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(schema)
    .load(HISTORICAL_DATA_PATH)
)


Loading historical data from data/smart_logistics_iot_with_battery.csv...


In [6]:

# --------------------
# Feature Engineering (Same as your stream)
# --------------------
print("Applying feature engineering...")

# Parse the raw timestamp string so readings can be ordered in time.
parsed2 = batch_df.withColumn("ts", to_timestamp(col("Timestamp")))

# Per-"RFID ID" window ordered by time; lag() over it yields the previous
# reading's value (null for the first reading in each partition).
w = Window.partitionBy("RFID ID").orderBy("ts")
parsed3 = parsed2
for lagged_name, source_name in [
    ("prev_lat", "GPS Latitude"),
    ("prev_lon", "GPS Longitude"),
    ("prev_ts", "ts"),
]:
    parsed3 = parsed3.withColumn(lagged_name, lag(source_name).over(w))

# Column expressions for the movement features. They reference columns by
# name, so each is resolved only when attached with withColumn below.
elapsed_secs = unix_timestamp(col("ts")) - unix_timestamp(col("prev_ts"))
# Straight-line distance in coordinate space.
# NOTE(review): presumably degrees, not metres - confirm against the data.
coord_distance = sqrt(col("lat_diff")**2 + col("lon_diff")**2)
# Distance over elapsed time when the gap is positive; otherwise (no previous
# reading, or a non-positive gap) fall back to the reported Speed column.
derived_speed = when(
    col("time_diff_secs") > 0, col("dist_approx") / col("time_diff_secs")
).otherwise(col("Speed"))

parsed4 = (
    parsed3
    .withColumn("time_diff_secs", elapsed_secs)
    .withColumn("lat_diff", col("GPS Latitude") - col("prev_lat"))
    .withColumn("lon_diff", col("GPS Longitude") - col("prev_lon"))
    .withColumn("dist_approx", coord_distance)
    .withColumn("speed_calc", derived_speed)
)

# Final engineered features: two threshold flags and the classification label.
battery_low_flag = when(col("battery_level") < 20, 1.0).otherwise(0.0)
temp_alert_flag = when(
    (col("Temperature") > 40) | (col("Temperature") < -10), 1.0
).otherwise(0.0)
# "Delay Time" is read as a string; any non-null value other than the exact
# text "0" is labelled as delayed.
# NOTE(review): a value such as "0.0" would count as delayed - verify the raw values.
delayed_flag = when(
    col("Delay Time").isNotNull() & (col("Delay Time") != "0"), 1
).otherwise(0)

eng_df = (
    parsed4
    .withColumn("battery_low", battery_low_flag)
    .withColumn("temp_alert", temp_alert_flag)
    .withColumn("delayed_label", delayed_flag)
    .na.fill(0)  # Replace nulls left by lag() and the derived calculations with 0
)


Applying feature engineering...


In [7]:
# --------------------
# Model Pipelines (Same definitions)
# --------------------
# Explicit seed for the stochastic estimators defined in this cell. Without it
# PySpark falls back to a default seed derived from a Python hash, which is not
# guaranteed to be stable between kernel restarts, so metrics and saved models
# could differ from run to run. 42 matches the seed used for the train/test split.
MODEL_SEED = 42

# Numeric inputs (raw sensor readings plus engineered features).
feature_cols = ["GPS Latitude", "GPS Longitude", "Temperature", "Humidity", "Speed", "dist_approx", "speed_calc", "time_diff_secs", "battery_level", "battery_low", "temp_alert"]
# Categorical inputs; each is index-encoded into "<name>_idx" below.
# NOTE(review): the recorded run reports 100% RF accuracy on the test split.
# "Delivery Status" / "Disruption Type" / "Risk Factor" may encode the same
# outcome that delayed_label is derived from - check for label leakage.
cat_cols = ["Delivery Status", "Stage", "Route ID", "battery_status", "Risk Factor", "Disruption Type"]

# handleInvalid="keep" lets the fitted indexers accept unseen/invalid labels
# instead of raising at transform time.
indexers = [StringIndexer(inputCol=c, outputCol=c+"_idx", handleInvalid="keep") for c in cat_cols]
assembler_all_features = VectorAssembler(inputCols=feature_cols + [c+"_idx" for c in cat_cols], outputCol="features_raw", handleInvalid="keep")
# Standardise the assembled vector (zero mean, unit variance) into "features".
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)

# --- RF Pipeline ---
# Binary delay classifier on the scaled feature vector; seeded for reproducibility.
rf = RandomForestClassifier(labelCol="delayed_label", featuresCol="features", numTrees=100, maxDepth=8, seed=MODEL_SEED)
pipeline_rf = Pipeline(stages=indexers + [assembler_all_features, scaler, rf])

# --- K-Means Pipeline ---
# Clusters on position and derived speed only; these inputs are NOT scaled.
kmeans_features = VectorAssembler(inputCols=["GPS Latitude", "GPS Longitude", "speed_calc"], outputCol="traj_feats", handleInvalid="keep")
# Seeded so centroid initialisation (and the silhouette score) is repeatable.
kmeans = KMeans(featuresCol="traj_feats", predictionCol="traj_cluster", k=12, maxIter=20, seed=MODEL_SEED)
pipeline_kmeans = Pipeline(stages=[kmeans_features, kmeans])

# --- Isolation Forest Pipeline ---
# SynapseML IsolationForest on the same scaled features as the RF pipeline.
# Output columns are set via 'predictionCol' and 'scoreCol'.
# NOTE(review): the forest's seed is left at the library default - confirm
# the parameter name before pinning it here.
iforest = IsolationForest(
    featuresCol="features",
    predictionCol="anomaly_pred",
    scoreCol="anomaly_score",
    contamination=0.05
)
pipeline_iforest = Pipeline(stages=indexers + [assembler_all_features, scaler, iforest])


In [8]:
# --------------------
# Train, Evaluate, and Save Models
# --------------------
# Separator line printed around each metrics report.
RULE = "====================================="


def _model_path(name):
    """Return the save location for the fitted pipeline `name` under MODELS_DIR."""
    return os.path.join(MODELS_DIR, name)


print("Splitting data for training and testing...")
trainingData, testData = eng_df.randomSplit([0.8, 0.2], seed=42)
# Both splits are reused by all three models, so keep them cached.
for split_df in (trainingData, testData):
    split_df.cache()

# --- 1. Random Forest ---
print("\n--- Training Random Forest Model ---")
rf_model = pipeline_rf.fit(trainingData)

print("Evaluating RF Model...")
rf_predictions = rf_model.transform(testData)
# Score the held-out predictions once per metric, accuracy first, then F1.
rf_scores = {}
for metric in ("accuracy", "f1"):
    rf_scores[metric] = MulticlassClassificationEvaluator(
        labelCol="delayed_label", predictionCol="prediction", metricName=metric
    ).evaluate(rf_predictions)
accuracy = rf_scores["accuracy"]
f1_score = rf_scores["f1"]

print(RULE)
print(f"RF Model Accuracy: {accuracy * 100:.2f}%")
print(f"RF Model F1 Score: {f1_score:.4f}")
print(RULE)

print("Saving RF Model...")
rf_model.write().overwrite().save(_model_path("rf_pipeline"))

# --- 2. K-Means ---
print("\n--- Training K-Means Model ---")
kmeans_model = pipeline_kmeans.fit(trainingData)

print("Evaluating K-Means Model...")
kmeans_predictions = kmeans_model.transform(testData)
silhouette_score = ClusteringEvaluator(
    featuresCol="traj_feats", predictionCol="traj_cluster", metricName="silhouette"
).evaluate(kmeans_predictions)

print(RULE)
print(f"K-Means Silhouette Score: {silhouette_score:.4f} (Closer to 1 is better)")
print(RULE)

print("Saving K-Means Model...")
kmeans_model.write().overwrite().save(_model_path("kmeans_pipeline"))

# --- 3. Isolation Forest ---
print("\n--- Training Isolation Forest Model ---")
iforest_model = pipeline_iforest.fit(trainingData)

print("Evaluating Isolation Forest (on test data)...")
iforest_predictions = iforest_model.transform(testData)
print(RULE)
print("Anomaly prediction counts (1 = Anomaly, 0 = Inlier):")
# No labels exist for anomalies, so only the predicted class counts are shown.
# Note: SynapseML IsolationForest uses 1 (anomaly) and 0 (inlier)
iforest_predictions.groupBy("anomaly_pred").count().show()
print(RULE)

print("Saving Isolation Forest Model...")
iforest_model.write().overwrite().save(_model_path("iforest_pipeline"))

print("\nAll models trained, evaluated, and saved.")
# Ends the Spark session; earlier cells must be re-run before using Spark again.
spark.stop()



Splitting data for training and testing...

--- Training Random Forest Model ---
Evaluating RF Model...
RF Model Accuracy: 100.00%
RF Model F1 Score: 1.0000
Saving RF Model...

--- Training K-Means Model ---
Evaluating K-Means Model...
K-Means Silhouette Score: 0.5838 (Closer to 1 is better)
Saving K-Means Model...

--- Training Isolation Forest Model ---
Evaluating Isolation Forest (on test data)...
Anomaly prediction counts (1 = Anomaly, 0 = Inlier):
+------------+-----+
|anomaly_pred|count|
+------------+-----+
|         1.0|   35|
|         0.0|  127|
+------------+-----+

Saving Isolation Forest Model...

All models trained, evaluated, and saved.
