I started by reading the Bronze Delta table into a Spark DataFrame. This table holds the raw, ingested data that feeds the Silver layer, and I recorded its row count as a baseline before applying any transformations.

In [0]:
from pyspark.sql import functions as F

df_bronze = spark.table("capstone_project.logistics.bronze_last_mile_deliveries")
print("Initial Bronze Count:", df_bronze.count())

Here, I checked the schema and total record count of the Bronze data. This gives me a baseline so I can later confirm how many records are affected by cleaning and transformations.

In [0]:
df_bronze.printSchema()

In [0]:
df_bronze.count()

In [0]:

display(df_bronze.limit(10))

I displayed the Bronze data and listed the column names to get a quick sense of the data structure and verify that all expected fields are present.

In [0]:
df_bronze.columns

In this step, I removed exact duplicate rows (rows identical in every column) and dropped rows with missing values in critical operational fields such as order ID, courier ID, city, and key timestamps. These fields are essential for reliable downstream calculations.

After removing duplicates and nulls from key columns, I checked the row count again to understand how much data was filtered out during this initial cleanup step.

In [0]:
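df_silver = (
    df_bronze
    # With no arguments, dropDuplicates() compares every column
    .dropDuplicates()
    # Drop a row if ANY of these key fields is null
    .dropna(subset=[
        "order_id",
        "courier_id",
        "city",
        "accept_time",
        "pickup_time",
        "time_window_start",
        "time_window_end",
    ])
)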

print("After dropping duplicates and dropna on key columns:", df_silver.count())
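To make the semantics of this step concrete, here is a plain-Python sketch (no Spark session needed) of what `dropDuplicates()` with no arguments followed by `dropna(subset=...)` does to a handful of rows. The `clean_rows` helper, the shortened `KEY_FIELDS` list, and the sample rows are hypothetical, for illustration only.

```python
# Hypothetical subset of the key columns used in the Silver dropna step
KEY_FIELDS = ["order_id", "courier_id", "city"]


def clean_rows(rows: list[dict]) -> list[dict]:
    """Mimic df.dropDuplicates().dropna(subset=KEY_FIELDS) on plain dicts."""
    seen = set()
    cleaned = []
    for row in rows:
        # dropDuplicates() with no subset treats rows as duplicates only
        # when every column matches
        fingerprint = tuple(sorted(row.items()))
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        # dropna(subset=...) removes a row if ANY listed column is null
        if any(row.get(field) is None for field in KEY_FIELDS):
            continue
        cleaned.append(row)
    return cleaned


rows = [
    {"order_id": 1, "courier_id": 7, "city": "A"},
    {"order_id": 1, "courier_id": 7, "city": "A"},     # exact duplicate: removed
    {"order_id": 1, "courier_id": 8, "city": "A"},     # same order_id, other courier: kept
    {"order_id": 2, "courier_id": None, "city": "B"},  # null key field: removed
]
print(len(clean_rows(rows)))  # 2
```

The third row shows the caveat: if the same `order_id` can appear with differing field values, a bare `dropDuplicates()` keeps both. Passing a subset, as in `dropDuplicates(["order_id"])`, would enforce one row per order, but whether that is correct depends on how this dataset records repeated orders.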

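The raw timestamp columns lack a year component, so I temporarily prefixed a fixed year (`2026-`) to each time-related column, storing the results in helper columns. This makes the values compatible with the full `yyyy-MM-dd HH:mm:ss` pattern used by Spark's timestamp parsing functions. Parsing the strings into real timestamps, and then discarding values that fail to parse, is what keeps malformed timestamps from silently corrupting the duration calculations.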

In [0]:
df_silver = (
    df_silver
    .withColumn("accept_time_fixed", F.concat(F.lit("2026-"), F.col("accept_time")))
    .withColumn("pickup_time_fixed", F.concat(F.lit("2026-"), F.col("pickup_time")))
    .withColumn("window_start_fixed", F.concat(F.lit("2026-"), F.col("time_window_start")))
    .withColumn("window_end_fixed", F.concat(F.lit("2026-"), F.col("time_window_end")))
)
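One side effect of a fixed year is worth checking: if an order is accepted late on 31 December and picked up on 1 January, both values receive the same year, so the pickup appears to happen almost a year *before* acceptance. The plain-Python sketch below reproduces this with `datetime.strptime`. It assumes the raw strings look like `MM-dd HH:mm:ss`, which is inferred from the parse pattern used in the next step; `parse_with_fixed_year` is a hypothetical helper.

```python
from datetime import datetime

FIXED_YEAR_PREFIX = "2026-"  # same prefix the notebook concatenates
FMT = "%Y-%m-%d %H:%M:%S"    # Python equivalent of 'yyyy-MM-dd HH:mm:ss'


def parse_with_fixed_year(raw: str) -> datetime:
    """Prefix the fixed year to an assumed 'MM-dd HH:mm:ss' string and parse it."""
    return datetime.strptime(FIXED_YEAR_PREFIX + raw, FMT)


accept = parse_with_fixed_year("12-31 23:50:00")
pickup = parse_with_fixed_year("01-01 00:10:00")

# Same arithmetic as the later accept_to_pickup_minutes column
minutes = (pickup - accept).total_seconds() / 60
print(minutes)  # -525580.0 instead of the true 20 minutes
```

Records affected this way would surface as large negative `accept_to_pickup_minutes` values, so flagging or filtering negative durations is one possible safeguard.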

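Using `try_to_timestamp`, I converted the prefixed strings into proper timestamp columns, overwriting the original string columns. Unlike `to_timestamp`, which can fail the whole query on unparseable input when ANSI mode is enabled, `try_to_timestamp` returns NULL for values it cannot parse, so malformed values do not break the pipeline.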

In [0]:
df_silver = (
    df_silver
    .withColumn("accept_time", F.expr("try_to_timestamp(accept_time_fixed, 'yyyy-MM-dd HH:mm:ss')"))
    .withColumn("pickup_time", F.expr("try_to_timestamp(pickup_time_fixed, 'yyyy-MM-dd HH:mm:ss')"))
    .withColumn("time_window_start", F.expr("try_to_timestamp(window_start_fixed, 'yyyy-MM-dd HH:mm:ss')"))
    .withColumn("time_window_end", F.expr("try_to_timestamp(window_end_fixed, 'yyyy-MM-dd HH:mm:ss')"))
)
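A rough plain-Python analogue of this NULL-instead-of-error behavior is a parser that returns `None` when `strptime` raises `ValueError`; `try_parse` is a hypothetical name. The sample list also includes `02-29`: 2026 is not a leap year, so that value has no valid date under the fixed year and the sketch returns `None` for it. Whether Spark yields NULL for such a value or raises instead can depend on the session's time-parser settings, so checking the raw data for `02-29` values is a cheap safeguard.

```python
from datetime import datetime
from typing import Optional

FMT = "%Y-%m-%d %H:%M:%S"  # Python equivalent of 'yyyy-MM-dd HH:mm:ss'


def try_parse(value: Optional[str]) -> Optional[datetime]:
    """Return a datetime, or None if the value is missing or cannot be parsed."""
    if value is None:
        return None
    try:
        return datetime.strptime(value, FMT)
    except ValueError:
        # Malformed text or an impossible calendar date
        return None


samples = [
    "2026-03-14 09:30:00",  # valid
    "2026-02-29 08:00:00",  # 2026 has no 29 February
    "2026-3-14 9:30",       # seconds missing, so it does not match the pattern
    None,                   # missing value
]
parsed = [try_parse(s) for s in samples]
print(parsed)
```

Every `None` here corresponds to a row that the following `dropna` step would remove.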

In [0]:
display(df_silver.limit(10))

After conversion, I removed any rows where timestamp parsing failed. This ensures that all time-based calculations in the Silver layer are based on valid timestamps.

In [0]:
df_silver = df_silver.dropna(
    subset=["accept_time", "pickup_time", "time_window_start", "time_window_end"]
)

print("After timestamp fix:", df_silver.count())

Once the timestamps were successfully converted, I dropped the temporary helper columns to keep the schema clean and focused on business-relevant fields.

In [0]:
df_silver = df_silver.drop(
    "accept_time_fixed",
    "pickup_time_fixed",
    "window_start_fixed",
    "window_end_fixed"
)

Here, I calculated the time difference between order acceptance and pickup in minutes. This metric helps measure courier responsiveness and operational efficiency.

In [0]:
df_silver = df_silver.withColumn(
    "accept_to_pickup_minutes",
    (F.col("pickup_time").cast("long") - F.col("accept_time").cast("long")) / 60
)

In this step, I computed how late each pickup occurred relative to the end of its allowed time window, in minutes. Positive values indicate late pickups; zero or negative values indicate pickups at or before the end of the window.

In [0]:
df_silver = df_silver.withColumn(
    "pickup_delay_minutes",
    (F.col("pickup_time").cast("long") - F.col("time_window_end").cast("long")) / 60
)
print("After adding duration columns:", df_silver.count())
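Both duration columns use the same arithmetic: casting a timestamp to `long` gives seconds since the Unix epoch, so the difference divided by 60 is elapsed minutes. The plain-Python sketch below applies the same formulas to one hypothetical delivery so the sign convention of `pickup_delay_minutes` is easy to check; the timestamps and the `minutes_between` helper are made up for illustration.

```python
from datetime import datetime


def minutes_between(start: datetime, end: datetime) -> float:
    """Elapsed minutes from start to end; negative if end is earlier than start."""
    return (end - start).total_seconds() / 60


# Hypothetical delivery record
accept_time = datetime(2026, 5, 4, 10, 0)
pickup_time = datetime(2026, 5, 4, 10, 45)
time_window_end = datetime(2026, 5, 4, 10, 30)

accept_to_pickup_minutes = minutes_between(accept_time, pickup_time)  # 45.0
pickup_delay_minutes = minutes_between(time_window_end, pickup_time)  # 15.0, i.e. late

# A pickup at 10:20 falls before the window end: negative delay, i.e. on time
early_delay = minutes_between(time_window_end, datetime(2026, 5, 4, 10, 20))  # -10.0
```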

Since missing GPS data often indicates operational or tracking issues, I created a binary flag to mark records where pickup latitude or longitude is missing. After adding the GPS flag, I verified the row count to ensure the transformation only added information and did not remove data.

In [0]:

df_silver = df_silver.withColumn(
    "missing_pickup_gps",
    F.when(
        F.col("pickup_gps_lat").isNull() | F.col("pickup_gps_lng").isNull(),
        1
    ).otherwise(0)
)
print("After adding missing GPS flag:", df_silver.count())

I created a preliminary risk label that marks a delivery as high risk (1) if the pickup was more than 10 minutes late, the accept-to-pickup time exceeded 30 minutes, or the pickup GPS coordinates are missing. This first version is meant to highlight potentially problematic deliveries and to support exploring the data distribution before the thresholds are refined. I checked the row count again to confirm that adding the label did not remove any rows.


In [0]:
df_silver = df_silver.withColumn(
    "high_risk_delivery",
    F.when(
        (F.col("pickup_delay_minutes") > 10) |
        (F.col("accept_to_pickup_minutes") > 30) |
        (F.col("missing_pickup_gps") == 1),
        1
    ).otherwise(0)
)
print("After adding high-risk label:", df_silver.count())

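Here, I inspected the maximum accept-to-pickup time and the ten records with the longest durations to understand the scale of the outliers and to check whether these extreme values are plausible or represent data anomalies.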

In [0]:
df_silver.select(
    F.max("accept_to_pickup_minutes").alias("max_accept_to_pickup_minutes")
).show(truncate=False)

In [0]:
df_silver.select(
    "order_id",
    "accept_to_pickup_minutes",
    "accept_time",
    "pickup_time"
).orderBy(
    F.col("accept_to_pickup_minutes").desc()
).show(10, truncate=False)


I calculated the approximate median, P95, and P99 (using `percentile_approx`) along with the maximum accept-to-pickup duration to better understand the distribution and identify realistic thresholds for risk classification.


In [0]:
df_silver.select(
    F.expr("percentile_approx(accept_to_pickup_minutes, 0.50)").alias("median"),
    F.expr("percentile_approx(accept_to_pickup_minutes, 0.95)").alias("p95"),
    F.expr("percentile_approx(accept_to_pickup_minutes, 0.99)").alias("p99"),
    F.max("accept_to_pickup_minutes").alias("max")
).show(truncate=False)
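For intuition about what the P95 and P99 columns report, here is an exact nearest-rank percentile in plain Python, one common definition. `percentile_approx` instead estimates percentiles without sorting every value, trading a bounded error (controlled by its optional accuracy argument) for speed on large tables, so its results may differ slightly from an exact computation. The helper name and the sample durations are hypothetical.

```python
import math


def nearest_rank_percentile(values: list[float], p: float) -> float:
    """Smallest value such that at least a fraction p of the data is <= it."""
    if not values or not 0 < p <= 1:
        raise ValueError("need non-empty values and 0 < p <= 1")
    ordered = sorted(values)
    rank = math.ceil(p * len(ordered))  # 1-based rank into the sorted list
    return ordered[rank - 1]


# Hypothetical accept-to-pickup durations in minutes: 1, 2, ..., 100
durations = [float(m) for m in range(1, 101)]
median = nearest_rank_percentile(durations, 0.50)  # 50.0
p95 = nearest_rank_percentile(durations, 0.95)     # 95.0
p99 = nearest_rank_percentile(durations, 0.99)     # 99.0
```

A large gap between P99 and the maximum, as inspected in the previous cells, is a typical sign of a few extreme outliers rather than a broadly long-tailed distribution.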

I then grouped the accept-to-pickup durations into time buckets, from under 30 minutes to more than a day, to see how the records are spread across these ranges.

In [0]:
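df_silver.withColumn(
    "bucket",
    F.when(F.col("accept_to_pickup_minutes") < 30, "<30m")
     .when(F.col("accept_to_pickup_minutes") < 60, "30–60m")
     .when(F.col("accept_to_pickup_minutes") < 120, "1–2h")
     .when(F.col("accept_to_pickup_minutes") < 180, "2–3h")
     .when(F.col("accept_to_pickup_minutes") < 360, "3–6h")
     .when(F.col("accept_to_pickup_minutes") < 1440, "6–24h")
     .otherwise(">1 day")
).groupBy("bucket").agg(
    F.count("*").alias("count"),
    # Sorting by the string labels would order them alphabetically ("1–2h" before "<30m").
    # The buckets are contiguous ranges, so ordering by each bucket's smallest value
    # lists them from shortest to longest duration instead.
    F.min("accept_to_pickup_minutes").alias("min_minutes")
).orderBy("min_minutes").show()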
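The bucket boundaries are exclusive upper bounds checked in order, so a duration of exactly 30 minutes lands in `30–60m`, not `<30m`, and any negative duration (pickup recorded before acceptance) lands in `<30m`. A plain-Python sketch using `bisect` makes these edges easy to test; `BUCKET_EDGES`, `BUCKET_LABELS`, and `bucket_for` are hypothetical names, with the labels copied from the notebook.

```python
from bisect import bisect_right

# Exclusive upper bounds in minutes, matching the F.when chain above
BUCKET_EDGES = [30, 60, 120, 180, 360, 1440]
BUCKET_LABELS = ["<30m", "30–60m", "1–2h", "2–3h", "3–6h", "6–24h", ">1 day"]


def bucket_for(minutes: float) -> str:
    """Return the label of the first bucket whose upper bound exceeds minutes."""
    # bisect_right counts the edges that are <= minutes, which is the bucket index
    return BUCKET_LABELS[bisect_right(BUCKET_EDGES, minutes)]


print(bucket_for(29.5), bucket_for(30), bucket_for(1440))  # <30m 30–60m >1 day
```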


Based on the distribution analysis, I raised the accept-to-pickup threshold in the high-risk definition from 30 minutes to 180 minutes (3 hours), a more realistic cutoff for this data, while keeping the pickup-delay and missing-GPS indicators unchanged.

In [0]:
df_silver = df_silver.withColumn(
    "high_risk_delivery",
    F.when(
        (F.col("accept_to_pickup_minutes") > 180) |
        (F.col("pickup_delay_minutes") > 10) |
        (F.col("missing_pickup_gps") == 1),
        1
    ).otherwise(0)
)

df_silver.count()
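To see how the threshold change alters the label, the rule can be written as a plain function with the accept-to-pickup cutoff as a parameter. `is_high_risk` is a hypothetical helper mirroring the OR logic of the two `F.when` expressions: 30 minutes in the preliminary version, 180 minutes in the refined one, with the delay and GPS conditions unchanged. The sample records are made up.

```python
def is_high_risk(accept_to_pickup_minutes: float,
                 pickup_delay_minutes: float,
                 missing_pickup_gps: int,
                 max_accept_to_pickup: float = 180) -> int:
    """Return 1 if any risk condition holds, else 0 (same OR logic as the Spark column)."""
    return int(
        accept_to_pickup_minutes > max_accept_to_pickup
        or pickup_delay_minutes > 10
        or missing_pickup_gps == 1
    )


# (accept_to_pickup_minutes, pickup_delay_minutes, missing_pickup_gps)
samples = [
    (45.0, -5.0, 0),   # slow-ish acceptance, on-time pickup, GPS present
    (200.0, -5.0, 0),  # very slow acceptance
    (20.0, 12.0, 0),   # picked up 12 minutes after the window closed
    (20.0, -5.0, 1),   # missing pickup GPS
]

preliminary = [is_high_risk(*s, max_accept_to_pickup=30) for s in samples]
refined = [is_high_risk(*s) for s in samples]
print(preliminary, refined)  # [1, 1, 1, 1] [0, 1, 1, 1]
```

Only the first record changes label: under the refined rule, a moderately slow acceptance no longer counts as high risk on its own.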

I grouped records by the high-risk flag to confirm that both high-risk and non-risk deliveries are reasonably represented.

In [0]:
df_silver.groupBy("high_risk_delivery").count().show()

To validate the logic qualitatively, I displayed a small sample of high-risk deliveries and reviewed the contributing factors.

In [0]:
df_silver.filter(F.col("high_risk_delivery") == 1) \
    .select("accept_to_pickup_minutes", "pickup_delay_minutes", "missing_pickup_gps") \
    .show(10, truncate=False)


Execution Plan

By calling `df_silver.explain(mode="extended")`, I can inspect:
- The **parsed and analyzed logical plans**: what transformations are requested
- The **optimized logical plan**: how Spark's Catalyst optimizer simplifies and reorders operations
- The **physical plan**: how the computation will actually be executed across the cluster

This step helps verify that Spark combines the chained transformations efficiently and avoids unnecessary scans or shuffles before the Silver table is persisted. Some shuffle is still expected, because `dropDuplicates()` has to bring identical rows together.

In [0]:
df_silver.explain(mode="extended")

After validating the transformations and risk logic, I persisted the cleaned and enriched dataset as a Silver Delta table. This table serves as a trusted, analytics-ready source for downstream Gold aggregations and machine learning.


In [0]:
df_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("capstone_project.logistics.silver_last_mile_deliveries")