In [0]:
# Part 1: Load and Explore Data

# The dataset is a nested JSON document rather than one JSON object per line
# (Spark's default reader expects JSON Lines), so multiline mode is required.
source_json_path = "/Volumes/workspace/default/project_transit_performance_analysis/Swift Assignment 4 - Dataset (1).json"
json_reader = spark.read.option("multiline", "true")
df = json_reader.json(source_json_path)

# render the raw dataframe
display(df)

In [0]:
# Perform initial data exploration to understand the structure

# check the schema: prints the inferred schema tree, including nested structs and
# the trackDetails array that later cells explode
df.printSchema()

In [0]:
# display all column names (top-level fields of the raw dataframe only)
print(list(df.columns))

In [0]:
# count rows and columns (df.count() triggers a Spark job)
row_count = df.count()
column_names = df.columns
print("Number of rows:", row_count)
print("Number of columns:", len(column_names))
print("Columns:", column_names)

In [0]:
# check for missing or null values and display the result
# NOTE: `sum` here is pyspark's aggregate and shadows Python's builtin `sum`;
# the inter-facility cell later in this notebook relies on that shadowing.
from pyspark.sql.functions import col, sum, when

# One aggregate per top-level column. A struct/array column only counts as NULL
# when the whole value is NULL; NULLs nested inside it are not counted here.
null_count_exprs = [
    sum(when(col(name).isNull(), 1).otherwise(0)).alias(name)
    for name in df.columns
]
df.select(null_count_exprs).display()


In [0]:

'''
# Part 2: Flatten and Extract Transit Data

Extract and flatten the following information for each shipment:

Shipment Identifiers:

1. Tracking number (from trackingNumber field)
2. Service type (from service.type field)
3. Service description (from service.description field)
4. Carrier code (from carrierCode field)
'''

# explode() emits one output row per element of an array column
# (rows whose array is NULL or empty produce no output rows)
from pyspark.sql.functions import explode, col

# one row per entry of the top-level 'trackDetails' array
df_flat = df.select(explode("trackDetails").alias("details"))

# (source field, output column) pairs for the shipment identifiers
identifier_fields = [
    ("details.trackingNumber", "tracking_number"),
    ("details.service.type", "service_type"),
    ("details.service.description", "service_description"),
    ("details.carrierCode", "carrier_code"),
]
df_shipments = df_flat.select(*[col(src).alias(dst) for src, dst in identifier_fields])

# display result
df_shipments.display()


In [0]:
'''
Weight and Package Information:
1. Package weight (value and units)
2. Packaging type
'''
# weight and packaging fields, keyed by tracking number
weight_pkg_fields = [
    ("details.trackingNumber", "tracking_number"),
    ("details.packageWeight.value", "package_weight_value"),
    ("details.packageWeight.units", "package_weight_units"),
    ("details.packaging.type", "packaging_type"),
]
df_weight_pkg = df_flat.select(*[col(src).alias(dst) for src, dst in weight_pkg_fields])

# display result
df_weight_pkg.display()


In [0]:
'''
Location Information:
1. Origin city, state, postal code
2. Destination city, state, postal code

'''

from pyspark.sql.functions import col, explode, coalesce, lit
from pyspark.sql.types import StructType


def nested_field_exists(schema, path):
    """Return True if dotted `path` resolves field-by-field through nested structs of `schema`."""
    current = schema
    for part in path.split("."):
        if not isinstance(current, StructType) or part not in current.fieldNames():
            return False
        current = current[part].dataType
    return True


def address_field_or_unknown(frame, path):
    """String column for dotted `path` with NULLs replaced by "Unknown".

    If the field is absent from `frame`'s schema entirely, a literal "Unknown" is
    used so the select does not fail at analysis time. Values are cast to string so
    a numerically-inferred postal code can be coalesced with the string placeholder.
    """
    if not nested_field_exists(frame.schema, path):
        return lit("Unknown")
    return coalesce(col(path).cast("string"), lit("Unknown"))


# (field under `details`, output column). The spec asks for city, state and postal
# code; the country code is kept as well because the final output cell reads it.
location_fields = [
    ("shipperAddress.city", "origin_city"),
    ("shipperAddress.stateOrProvinceCode", "origin_state"),
    ("shipperAddress.postalCode", "origin_postal_code"),
    ("shipperAddress.countryCode", "origin_country"),
    ("destinationAddress.city", "destination_city"),
    ("destinationAddress.stateOrProvinceCode", "destination_state"),
    ("destinationAddress.postalCode", "destination_postal_code"),
    ("destinationAddress.countryCode", "destination_country"),
]

df_location = df_flat.select(
    col("details.trackingNumber").alias("tracking_number"),
    *[address_field_or_unknown(df_flat, f"details.{src}").alias(dst) for src, dst in location_fields],
)

# display location information
df_location.display()

In [0]:
'''
Transit Events:
For each shipment, extract ALL events from the events array:
1. Event type (from eventType field - can be any value like IT, AR, DP, PU, OD, DL, etc.)
2. Event timestamp (from timestamp field)
3. Event description (from eventDescription field)
4. Event location city (from address.city field)
5. Event location state (from address.stateOrProvinceCode field)
6. Event location postal code (from address.postalCode field)
7. Arrival location type (from arrivalLocation field - can be any value)
'''

# one row per (shipment, event): explode each shipment's events array
df_events_flat = df_flat.select(
    col("details.trackingNumber").alias("tracking_number"),
    explode("details.events").alias("event"),
)

# (field inside the event struct, output column name)
event_fields = [
    ("event.eventType", "event_type"),
    ("event.timestamp", "event_timestamp"),
    ("event.eventDescription", "event_description"),
    ("event.address.city", "event_city"),
    ("event.address.stateOrProvinceCode", "event_state"),
    ("event.address.postalCode", "event_postal_code"),
    ("event.arrivalLocation", "arrival_location_type"),
]
df_events = df_events_flat.select(
    col("tracking_number"),
    *[col(src).alias(dst) for src, dst in event_fields],
)

# display transit events
df_events.display()

In [0]:
'''
Part 3: Compute Transit Performance Metrics

For each shipment, calculate the following derived metrics:

1. Facility Touchpoints:

i. Total number of unique facilities visited (count distinct facilities from events where
arrivalLocation field contains the substring "FACILITY")

ii. Number of events with specific event types (count events by eventType - you should
identify which event types represent "in transit" vs "arrival" based on the data)

iii. List all unique event types found in the dataset

'''

# Facility touchpoints are events whose arrival_location_type contains "FACILITY".
from pyspark.sql.functions import col, when, countDistinct, concat_ws

# Filter events for facility touchpoints
df_facilities = df_events.filter(col("arrival_location_type").contains("FACILITY"))

# A facility is identified by where the event happened (city|state|postal code).
# Counting distinct arrival_location_type values only counts the *kinds* of facility
# (a handful of labels), not the facilities visited. Events that carry no address
# at all fall back to their arrival_location_type label. Parts are cast to string so
# a numerically-inferred postal code concatenates cleanly.
address_parts = [col(c).cast("string") for c in ("event_city", "event_state", "event_postal_code")]
has_no_address = address_parts[0].isNull() & address_parts[1].isNull() & address_parts[2].isNull()
facility_key = when(has_no_address, col("arrival_location_type")).otherwise(concat_ws("|", *address_parts))

# i. Total number of unique facilities visited, per shipment
df_facility_count = (
    df_facilities
    .withColumn("facility_key", facility_key)
    .groupBy("tracking_number")
    .agg(countDistinct("facility_key").alias("unique_facilities_count"))
)

# display Total number of unique facilities
df_facility_count.display()

In [0]:
# count events by event type
from pyspark.sql.functions import count

# number of events of each event_type within each shipment, ordered by shipment
df_event_type_counts = (
    df_events
    .groupBy("tracking_number", "event_type")
    .agg(count("*").alias("event_count"))
    .orderBy("tracking_number")
)

# display the number of event count
df_event_type_counts.display()

In [0]:
# ii. Number of events with specific event types
# Event codes grouped into categories; any code not listed becomes OTHER.
# NOTE(review): PU is grouped with arrivals and OD with in-transit — confirm this
# matches the intended definitions.
in_transit_types = ["IT", "DP", "OD"]
arrival_types = ["AR", "PU", "DL"]

category_expr = (
    when(col("event_type").isin(in_transit_types), "IN_TRANSIT")
    .when(col("event_type").isin(arrival_types), "ARRIVAL")
    .otherwise("OTHER")
)
df_events_categorized = df_events.withColumn("event_category", category_expr)

# events per (shipment, category)
df_event_category_count = (
    df_events_categorized
    .groupBy("tracking_number", "event_category")
    .agg(count("*").alias("count_events"))
)

# display Number of events with specific event types
df_event_category_count.display()

In [0]:
# iii. list all unique event types in the dataset and display the result
unique_event_types = df_events.select(col("event_type")).dropDuplicates()
unique_event_types.display()

In [0]:
# Example join: facility counts plus one column per event category
# (pivot turns each event_category value into a column holding its event count)
category_pivot = (
    df_event_category_count
    .groupBy("tracking_number")
    .pivot("event_category")
    .sum("count_events")
)
df_summary = df_facility_count.join(category_pivot, on="tracking_number", how="left")

display(df_summary)

In [0]:
'''
2. Transit Time Analysis:

i. Total transit time in hours (from first pickup-type event to final delivery-type event -
identify these by analyzing eventType and eventDescription fields)

ii. Time in inter-facility transit in hours (calculate based on event timestamps and facility
touchpoints)
'''

# Extract epoch milliseconds and convert them to a Spark timestamp.
from pyspark.sql.types import LongType

# event_timestamp is read here as a struct carrying a "$numberLong" member (epoch ms)
epoch_ms_col = col("event_timestamp.$numberLong").cast(LongType())

# casting a number to timestamp interprets it as seconds since the epoch, hence / 1000
df_events_ts = (
    df_events
    .withColumn("event_epoch_ms", epoch_ms_col)
    .withColumn("event_timestamp_ts", (col("event_epoch_ms") / 1000).cast("timestamp"))
)


In [0]:
# identify pickup and delivery events
from pyspark.sql.functions import when

# 1/0 indicator columns: PU events mark pickups, DL events mark deliveries
df_events_flagged = (
    df_events_ts
    .withColumn("is_pickup", when(col("event_type") == "PU", 1).otherwise(0))
    .withColumn("is_delivery", when(col("event_type") == "DL", 1).otherwise(0))
)


In [0]:
# calculate total transit time per shipment
from pyspark.sql.functions import min, max, unix_timestamp

SECONDS_PER_HOUR = 3600

# earliest pickup and latest delivery per shipment (NULL if the shipment has none)
first_pickup = min(when(col("is_pickup") == 1, col("event_timestamp_ts")))
last_delivery = max(when(col("is_delivery") == 1, col("event_timestamp_ts")))

df_transit_time = df_events_flagged.groupBy("tracking_number").agg(
    first_pickup.alias("pickup_time"),
    last_delivery.alias("delivery_time"),
)

# i. transit hours from the first pickup-type (PU) event to the final delivery-type
# (DL) event; NULL whenever either end is missing
elapsed_seconds = unix_timestamp(col("delivery_time")) - unix_timestamp(col("pickup_time"))
df_transit_time = df_transit_time.withColumn("total_transit_hours", elapsed_seconds / SECONDS_PER_HOUR)

# display the transit time in hours
df_transit_time.display()

In [0]:
'''
ii. Time in inter-facility transit in hours (calculate based on event timestamps and facility
touchpoints) 

'''
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# facility touchpoints only, ordered chronologically within each shipment
df_facility_events = df_events_flagged.filter(col("arrival_location_type").contains("FACILITY"))
window_spec = Window.partitionBy("tracking_number").orderBy("event_timestamp_ts")

# hours since the previous facility event of the same shipment (NULL for the first one)
gap_hours = (unix_timestamp(col("event_timestamp_ts")) - unix_timestamp(col("prev_facility_ts"))) / 3600
df_facility_events = (
    df_facility_events
    .withColumn("prev_facility_ts", lag("event_timestamp_ts").over(window_spec))
    .withColumn("inter_facility_hours", gap_hours)
)

# `sum` is pyspark's aggregate (imported in the null-check cell), not the builtin
df_inter_facility_time = df_facility_events.groupBy("tracking_number").agg(
    sum("inter_facility_hours").alias("total_inter_facility_hours")
)

# display the total inter-facility time in hours
df_inter_facility_time.display()



In [0]:
'''
3. Transit Velocity:
i. Average hours per facility (total transit time / number of facilities)
ii. Service category classification (classify based on service.type field - you should
identify patterns like express vs standard services)

'''

from pyspark.sql.functions import col, when, min, max, unix_timestamp, countDistinct, concat_ws
from pyspark.sql.types import LongType

# Convert event_timestamp (struct with $numberLong epoch ms) to a proper timestamp
df_events_ts = df_events.withColumn(
    "event_epoch_ms",
    col("event_timestamp.$numberLong").cast(LongType())
).withColumn(
    "event_timestamp_ts",
    (col("event_epoch_ms") / 1000).cast("timestamp")
)

# Flag pickup (PU) and delivery (DL) events
df_events_flagged = df_events_ts.withColumn(
    "is_pickup",
    when(col("event_type") == "PU", 1).otherwise(0)
).withColumn(
    "is_delivery",
    when(col("event_type") == "DL", 1).otherwise(0)
)

# Total transit time per shipment: first pickup -> last delivery, in hours
df_transit_time = df_events_flagged.groupBy("tracking_number").agg(
    min(when(col("is_pickup") == 1, col("event_timestamp_ts"))).alias("pickup_time"),
    max(when(col("is_delivery") == 1, col("event_timestamp_ts"))).alias("delivery_time")
).withColumn(
    "total_transit_hours",
    (unix_timestamp(col("delivery_time")) - unix_timestamp(col("pickup_time"))) / 3600
)

# Unique facility touchpoints. A facility is identified by the event address
# (city|state|postal code); counting distinct arrival_location_type values would count
# kinds of facility rather than facilities visited. Address-less events fall back to
# their arrival_location_type label.
df_facilities = df_events_flagged.filter(col("arrival_location_type").contains("FACILITY"))
address_parts = [col(c).cast("string") for c in ("event_city", "event_state", "event_postal_code")]
has_no_address = address_parts[0].isNull() & address_parts[1].isNull() & address_parts[2].isNull()
facility_key = when(has_no_address, col("arrival_location_type")).otherwise(concat_ws("|", *address_parts))
df_facility_count = (
    df_facilities
    .withColumn("facility_key", facility_key)
    .groupBy("tracking_number")
    .agg(countDistinct("facility_key").alias("unique_facilities_count"))
)

# Combine transit time and facility counts (shipments without facility events -> NULL count)
df_transit_summary = df_transit_time.join(df_facility_count, on="tracking_number", how="left")

# display result
df_transit_summary.display()



In [0]:
# i. calculating avg hours per facility (total transit time / number of facilities)
from pyspark.sql.functions import col, round, when

# Guard the division: shipments with no facility touchpoints (0, or NULL after the
# left join) get NULL instead of dividing by zero, which raises under ANSI mode.
df_transit_velocity = df_transit_summary.withColumn(
    "avg_hours_per_facility",
    when(
        col("unique_facilities_count") > 0,
        round(col("total_transit_hours") / col("unique_facilities_count"), 2),
    ),
)

# display avg hours per facility
df_transit_velocity.select("tracking_number", "total_transit_hours", "unique_facilities_count", "avg_hours_per_facility").display()

In [0]:
'''
ii. Service category classification (classify based on service.type field - you should
identify patterns like express vs standard services)
'''

from pyspark.sql.functions import when

EXPRESS_PATTERN = "(?i)EXPRESS|OVERNIGHT|PRIORITY"
STANDARD_PATTERN = "(?i)GROUND|STANDARD"

# attach each shipment's service_type to its velocity metrics
service_types = df_shipments.select("tracking_number", "service_type")
df_velocity_service = df_transit_velocity.join(service_types, on="tracking_number", how="left")

# case-insensitive keyword match; express keywords are checked before standard ones
service_category = (
    when(col("service_type").rlike(EXPRESS_PATTERN), "EXPRESS")
    .when(col("service_type").rlike(STANDARD_PATTERN), "STANDARD")
    .otherwise("OTHER")
)
df_velocity_service = df_velocity_service.withColumn("service_category", service_category)

# display service category
df_velocity_service.select(
    "tracking_number", "service_type", "service_category", "avg_hours_per_facility"
).display()

In [0]:
'''
4. Delivery Characteristics:
i. Final delivery location type (from deliveryLocationType field)
ii. Number of out-for-delivery attempts (count events where eventType or
eventDescription indicates out-for-delivery status)
iii. Was delivered on first attempt (TRUE if only 1 out-for-delivery event, FALSE otherwise)

'''

# i. Final delivery location type
from pyspark.sql.functions import col

# one row per trackDetails entry, keeping only the shipment key and deliveryLocationType
exploded_details = df.selectExpr("explode(trackDetails) as details")
df_delivery_type = exploded_details.select(
    col("details.trackingNumber").alias("tracking_number"),
    col("details.deliveryLocationType").alias("delivery_location_type"),
)

# display delivery location type
df_delivery_type.display()


In [0]:
# ii. Number of out-for-delivery attempts
from pyspark.sql.functions import when, sum as spark_sum, lower

# An event is an out-for-delivery attempt if its code is OD or its description
# mentions "out for delivery". The phrase may be written with spaces or hyphens, so
# a regex is used; matching only the hyphenated literal missed the spaced form.
OUT_FOR_DELIVERY_PATTERN = "out[ -]for[ -]delivery"

# Flag out-for-delivery events (a NULL description simply does not match)
df_out_for_delivery = df_events.withColumn(
    "is_out_for_delivery",
    when(
        (col("event_type") == "OD")
        | lower(col("event_description")).rlike(OUT_FOR_DELIVERY_PATTERN),
        1,
    ).otherwise(0),
)

# Count out-for-delivery events per shipment
df_delivery_attempts = df_out_for_delivery.groupBy("tracking_number").agg(
    spark_sum("is_out_for_delivery").alias("num_out_for_delivery_attempts")
)

# display number of out-for-delivery attempts
df_delivery_attempts.display()

In [0]:
# iii. Was delivered on first attempt
from pyspark.sql.functions import when

# TRUE only for exactly one out-for-delivery event; 0, more than 1, or NULL -> FALSE
is_first_attempt = when(col("num_out_for_delivery_attempts") == 1, True).otherwise(False)
df_delivery_attempts = df_delivery_attempts.withColumn("delivered_first_attempt", is_first_attempt)

# display delivered_first_attempt
df_delivery_attempts.display()

In [0]:
# Combine with Delivery Location Type:
# keep every shipment from df_delivery_type and attach attempt metrics where available
df_delivery_characteristics = df_delivery_type.join(df_delivery_attempts, on="tracking_number", how="left")

delivery_columns = [
    "tracking_number",
    "delivery_location_type",
    "num_out_for_delivery_attempts",
    "delivered_first_attempt",
]

# combine and display the results
df_delivery_characteristics.select(*delivery_columns).display()

In [0]:
'''
Part 4: Handle Edge Cases
Your solution should handle:

i. Shipments with missing or null values in any field
ii. Timestamps in different formats (MongoDB $numberLong format vs ISO string format)
iii. Shipments with incomplete event sequences
iv. Missing address information (city, state, postal code)
v. Duplicate events at the same timestamp
vi. Events array being empty or missing
vii. Nested fields that may not exist in all records

'''

# i. Shipments with missing or null values in any field
from pyspark.sql.functions import coalesce, lit

# Shipment-level fields with placeholders instead of NULLs. `packaging` is a struct
# (its `type` member is read in the weight/packaging cell), so the placeholder is
# applied to `packaging.type`; coalescing the whole struct with a string literal is a
# type mismatch that fails analysis.
df_shipments_clean = df.selectExpr("explode(trackDetails) as details").select(
    col("details.trackingNumber").alias("tracking_number"),
    coalesce(col("details.service.type"), lit("Unknown")).alias("service_type"),
    coalesce(col("details.service.description"), lit("Unknown")).alias("service_description"),
    coalesce(col("details.carrierCode"), lit("Unknown")).alias("carrier_code"),
    coalesce(col("details.packageWeight.value"), lit(0.0)).alias("package_weight_value"),
    coalesce(col("details.packageWeight.units"), lit("Unknown")).alias("package_weight_units"),
    coalesce(col("details.packaging.type"), lit("Unknown")).alias("packaging_type"),
    coalesce(col("details.deliveryLocationType"), lit("Unknown")).alias("delivery_location_type")
)


In [0]:
# ii. Timestamps in different formats
from pyspark.sql.functions import to_timestamp, when, col, regexp_extract
from pyspark.sql.types import LongType, StructType

# The parse expression must match the inferred schema:
#  * struct  -> every value was a MongoDB {"$numberLong": "<epoch ms>"} object;
#  * string  -> ISO strings (Spark's JSON inference typically widens mixed
#               object/string values to string), possibly embedding such an object.
# Referencing `event_timestamp.$numberLong` on a string column, or calling
# to_timestamp on a struct column, fails analysis — so branch on the schema.
event_ts_type = df_events.schema["event_timestamp"].dataType

if isinstance(event_ts_type, StructType):
    parsed_event_ts = (col("event_timestamp.$numberLong").cast(LongType()) / 1000).cast("timestamp")
else:
    raw_ts = col("event_timestamp").cast("string")
    # digits of an embedded "$numberLong" value, or "" when the string is not one
    embedded_epoch_ms = regexp_extract(raw_ts, r'\$numberLong"?\s*:\s*"?(\d+)', 1)
    parsed_event_ts = when(
        embedded_epoch_ms != "",
        (embedded_epoch_ms.cast(LongType()) / 1000).cast("timestamp"),
    ).otherwise(
        # no explicit pattern: accepts ISO-8601 with or without fractional seconds / offsets
        to_timestamp(raw_ts)
    )

df_events_ts = df_events.withColumn("event_timestamp_ts", parsed_event_ts)

In [0]:
# iii. Shipments with Incomplete Event Sequences
from pyspark.sql.functions import unix_timestamp

# `min`/`max` are pyspark's aggregates (imported in earlier cells), not the builtins.
# Earliest pickup and latest delivery per shipment; either may be missing.
pickup_ts = min(when(col("event_type") == "PU", col("event_timestamp_ts")))
delivery_ts = max(when(col("event_type") == "DL", col("event_timestamp_ts")))

has_both_ends = col("pickup_time").isNotNull() & col("delivery_time").isNotNull()
elapsed_hours = (unix_timestamp(col("delivery_time")) - unix_timestamp(col("pickup_time"))) / 3600

# transit time only when both ends of the sequence exist, NULL otherwise
df_transit_time = (
    df_events_ts.groupBy("tracking_number")
    .agg(pickup_ts.alias("pickup_time"), delivery_ts.alias("delivery_time"))
    .withColumn("total_transit_hours", when(has_both_ends, elapsed_hours).otherwise(None))
)


In [0]:
# iv. Missing address information (city, state, postal code)
# Every address member defaults to "Unknown" when NULL.
# NOTE(review): postal code is not extracted here; countryCode is carried instead.

address_columns = [
    ("details.shipperAddress.city", "origin_city"),
    ("details.shipperAddress.stateOrProvinceCode", "origin_state"),
    ("details.shipperAddress.countryCode", "origin_country"),
    ("details.destinationAddress.city", "destination_city"),
    ("details.destinationAddress.stateOrProvinceCode", "destination_state"),
    ("details.destinationAddress.countryCode", "destination_country"),
]
df_location_clean = df_flat.select(
    col("details.trackingNumber").alias("tracking_number"),
    *[coalesce(col(src), lit("Unknown")).alias(dst) for src, dst in address_columns],
)

In [0]:
# v. Duplicate events at the same timestamp:
# keep one row per (shipment, timestamp, event type) combination
dedup_keys = ["tracking_number", "event_timestamp_ts", "event_type"]
df_events_nodup = df_events_ts.dropDuplicates(dedup_keys)

In [0]:
# vi. Events array being empty or missing
# explode_outer (unlike explode) yields a row with a NULL element when the array is
# NULL or empty, so shipments without events are kept instead of dropped.
from pyspark.sql.functions import explode_outer

df_track_details = df.select(explode_outer("trackDetails").alias("details"))
df_flat_events = df_track_details.select(
    "details.trackingNumber",
    explode_outer("details.events").alias("event"),
)

In [0]:
# vii. Nested fields that may not exist in all records
from pyspark.sql.functions import col, lit, coalesce
from pyspark.sql.types import StructType


def nested_col_or_default(frame, path, default=None):
    """Return a Column for dotted `path` that tolerates fields missing from the schema.

    If every segment of `path` resolves through nested structs of `frame.schema`, the
    column is returned — coalesced with `default` (cast to string first) when a default
    is given. Otherwise a literal `default` (a typed string NULL when None) is returned,
    so the select does not fail at analysis time.
    """
    current = frame.schema
    for part in path.split("."):
        if not isinstance(current, StructType) or part not in current.fieldNames():
            return lit(default).cast("string")
        current = current[part].dataType
    if default is None:
        return col(path)
    return coalesce(col(path).cast("string"), lit(default))


# Flattened events (df_flat_events from the explode_outer cell) with every nested field
# optional. Address members get the "Unknown" placeholder; the other event fields stay
# NULL so a shipment with no events is not mistaken for one with an "Unknown" event.
df_events_safe = df_flat_events.select(
    col("trackingNumber").alias("tracking_number"),
    nested_col_or_default(df_flat_events, "event.eventType").alias("event_type"),
    nested_col_or_default(df_flat_events, "event.timestamp").alias("event_timestamp"),
    nested_col_or_default(df_flat_events, "event.eventDescription").alias("event_description"),
    nested_col_or_default(df_flat_events, "event.address.city", "Unknown").alias("event_city"),
    nested_col_or_default(df_flat_events, "event.address.stateOrProvinceCode", "Unknown").alias("event_state"),
    nested_col_or_default(df_flat_events, "event.address.postalCode", "Unknown").alias("event_postal_code"),
    nested_col_or_default(df_flat_events, "event.arrivalLocation").alias("arrival_location_type"),
)

In [0]:
'''
Part 5: Output Detailed Transit CSV
Create a CSV file named transit_performance_detailed.csv with the following
columns:
1. tracking_number
2. service_type
3. carrier_code
4. package_weight_kg
5. packaging_type
6. origin_city, origin_state, origin_pincode
7. destination_city, destination_state, destination_pincode
8. pickup_datetime_ist, delivery_datetime_ist
9. total_transit_hours
10. num_facilities_visited
11. num_in_transit_events
12. time_in_inter_facility_transit_hours
13. avg_hours_per_facility
14. is_express_service
15. delivery_location_type
16. num_out_for_delivery_attempts
17. first_attempt_delivery
18. total_events_count
'''

from pyspark.sql.functions import when, upper

# Attach package weight (normalised to kg) to the transit summary. The weight columns
# come from df_weight_pkg: df_flat only holds the raw `details` struct, so selecting
# them from it fails. Columns from a previous run are dropped first so re-running
# this cell does not duplicate them (drop() ignores absent columns).
weight_cols = ["package_weight_value", "package_weight_units", "packaging_type"]
weight_unit = upper(col("package_weight_units"))  # unit codes matched case-insensitively

df_transit_summary = df_transit_summary.drop(*weight_cols, "package_weight_kg").join(
    df_weight_pkg.select("tracking_number", *weight_cols).distinct(),
    "tracking_number",
    "left",
).withColumn(
    "package_weight_kg",
    when(weight_unit.isin("KG", "KGS"), col("package_weight_value"))
    .when(weight_unit == "G", col("package_weight_value") / 1000)
    .when(weight_unit.isin("LB", "LBS"), col("package_weight_value") * 0.453592)
    .otherwise(col("package_weight_value"))  # unknown units: passed through unchanged
)

In [0]:
from pyspark.sql.functions import (
    col, when, to_timestamp, coalesce, lit, round, countDistinct, unix_timestamp,
    min, max, sum as spark_sum, concat_ws, regexp_extract, upper,
    from_utc_timestamp, to_utc_timestamp, date_format,
)
from pyspark.sql.types import LongType, StructType
from pyspark.sql.window import Window
from pyspark.sql.functions import lag

# Detailed per-shipment table (Part 5). Event-derived metrics are recomputed here from
# df_events so the output does not depend on which exploratory cells were (re)run;
# descriptive columns come from the Part 2 / Part 3 frames.

# --- Event timestamps ----------------------------------------------------------
# A struct column means every value was a {"$numberLong": "<epoch ms>"} object; a string
# column may hold ISO strings and/or such objects serialised as text. Referencing
# `$numberLong` on a string column fails analysis, so branch on the schema.
if isinstance(df_events.schema["event_timestamp"].dataType, StructType):
    event_ts_expr = (col("event_timestamp.$numberLong").cast(LongType()) / 1000).cast("timestamp")
else:
    raw_event_ts = col("event_timestamp").cast("string")
    embedded_epoch_ms = regexp_extract(raw_event_ts, r'\$numberLong"?\s*:\s*"?(\d+)', 1)
    event_ts_expr = when(
        embedded_epoch_ms != "",
        (embedded_epoch_ms.cast(LongType()) / 1000).cast("timestamp"),
    ).otherwise(to_timestamp(raw_event_ts))  # no pattern: accepts ISO-8601 variants

df_events_ts = df_events.withColumn("event_timestamp_ts", event_ts_expr)

# Flag pickup (PU) and delivery (DL) events
df_events_flagged = df_events_ts.withColumn(
    "is_pickup", when(col("event_type") == "PU", 1).otherwise(0)
).withColumn(
    "is_delivery", when(col("event_type") == "DL", 1).otherwise(0)
)

# --- Total transit time ----------------------------------------------------------
# First pickup -> last delivery. NULL when either end is missing, or when the delivery
# precedes the pickup (inconsistent sequence), rather than a misleading number.
df_transit_time = df_events_flagged.groupBy("tracking_number").agg(
    min(when(col("is_pickup") == 1, col("event_timestamp_ts"))).alias("pickup_time"),
    max(when(col("is_delivery") == 1, col("event_timestamp_ts"))).alias("delivery_time"),
)
transit_hours = (unix_timestamp(col("delivery_time")) - unix_timestamp(col("pickup_time"))) / 3600
df_transit_time = df_transit_time.withColumn("total_transit_hours", when(transit_hours >= 0, transit_hours))

# --- Facility touchpoints ----------------------------------------------------------
df_facility_events = df_events_flagged.filter(col("arrival_location_type").contains("FACILITY"))

# A facility is identified by its address; arrival_location_type alone only names the
# kind of facility. Address-less events fall back to that label.
facility_addr = [col(c).cast("string") for c in ("event_city", "event_state", "event_postal_code")]
facility_key = when(
    facility_addr[0].isNull() & facility_addr[1].isNull() & facility_addr[2].isNull(),
    col("arrival_location_type"),
).otherwise(concat_ws("|", *facility_addr))

df_facility_count = (
    df_facility_events
    .withColumn("facility_key", facility_key)
    .groupBy("tracking_number")
    .agg(countDistinct("facility_key").alias("unique_facilities_count"))
)

# Inter-facility transit: sum of gaps between consecutive facility events per shipment
window_spec = Window.partitionBy("tracking_number").orderBy("event_timestamp_ts")
df_facility_events = df_facility_events.withColumn(
    "prev_facility_ts", lag("event_timestamp_ts").over(window_spec)
).withColumn(
    "inter_facility_hours",
    (unix_timestamp(col("event_timestamp_ts")) - unix_timestamp(col("prev_facility_ts"))) / 3600
)
df_inter_facility_time = df_facility_events.groupBy("tracking_number").agg(
    spark_sum("inter_facility_hours").alias("total_inter_facility_hours")
)


# --- Origin / destination postal codes ------------------------------------------------
def _optional_string_field(frame, path):
    """String column for dotted `path`, or a typed NULL when the path is absent from the schema."""
    current = frame.schema
    for part in path.split("."):
        if not isinstance(current, StructType) or part not in current.fieldNames():
            return lit(None).cast("string")
        current = current[part].dataType
    return col(path).cast("string")


df_pincodes = df_flat.select(
    col("details.trackingNumber").alias("tracking_number"),
    _optional_string_field(df_flat, "details.shipperAddress.postalCode").alias("origin_pincode"),
    _optional_string_field(df_flat, "details.destinationAddress.postalCode").alias("destination_pincode"),
)

# --- Event category counts -----------------------------------------------------------
# Explicit pivot values guarantee all three columns exist even if a category never occurs.
df_category_pivot = (
    df_event_category_count.groupBy("tracking_number")
    .pivot("event_category", ["IN_TRANSIT", "ARRIVAL", "OTHER"])
    .sum("count_events")
)

# --- Join all dataframes on tracking_number ----------------------------------------
df_joined = (
    df_shipments
    .join(df_weight_pkg, "tracking_number", "left")
    .join(df_location, "tracking_number", "left")
    .join(df_pincodes, "tracking_number", "left")
    .join(df_transit_time, "tracking_number", "left")
    .join(df_facility_count, "tracking_number", "left")
    .join(df_category_pivot, "tracking_number", "left")
    .join(df_inter_facility_time, "tracking_number", "left")
    .join(df_delivery_characteristics, "tracking_number", "left")
)

# Package weight normalised to kilograms (unknown units are passed through unchanged)
weight_unit = upper(col("package_weight_units"))
package_weight_kg = (
    when(weight_unit.isin("KG", "KGS"), col("package_weight_value"))
    .when(weight_unit == "G", col("package_weight_value") / 1000)
    .when(weight_unit.isin("LB", "LBS"), col("package_weight_value") * 0.453592)
    .otherwise(col("package_weight_value"))
)

# IST wall-clock strings. from_utc_timestamp shifts the instant by the IST offset;
# to_utc_timestamp(..., session_tz) cancels the session-zone shift that date_format
# applies when rendering, so the result is IST regardless of the session time zone.
IST_TIME_ZONE = "Asia/Kolkata"
session_tz = spark.conf.get("spark.sql.session.timeZone")
pickup_ist = date_format(
    to_utc_timestamp(from_utc_timestamp(col("pickup_time"), IST_TIME_ZONE), session_tz),
    "yyyy-MM-dd HH:mm:ss",
)
delivery_ist = date_format(
    to_utc_timestamp(from_utc_timestamp(col("delivery_time"), IST_TIME_ZONE), session_tz),
    "yyyy-MM-dd HH:mm:ss",
)

# Hours per facility; NULL when there were no facility touchpoints
avg_hours_per_facility = when(
    col("unique_facilities_count") > 0,
    col("total_transit_hours") / col("unique_facilities_count"),
)

# Same keyword rule as the service-category classification (EXPRESS first)
is_express = coalesce(col("service_type").rlike("(?i)EXPRESS|OVERNIGHT|PRIORITY"), lit(False))

category_counts = [coalesce(col(c), lit(0)) for c in ("IN_TRANSIT", "ARRIVAL", "OTHER")]
total_events = category_counts[0] + category_counts[1] + category_counts[2]

# Select and rename columns for final CSV
df_final = df_joined.select(
    col("tracking_number"),
    col("service_type"),
    col("carrier_code"),
    round(package_weight_kg, 2).alias("package_weight_kg"),
    col("packaging_type"),
    col("origin_city"),
    col("origin_state"),
    col("origin_pincode"),
    col("destination_city"),
    col("destination_state"),
    col("destination_pincode"),
    pickup_ist.alias("pickup_datetime_ist"),
    delivery_ist.alias("delivery_datetime_ist"),
    round(col("total_transit_hours"), 2).alias("total_transit_hours"),
    col("unique_facilities_count").alias("num_facilities_visited"),
    category_counts[0].alias("num_in_transit_events"),
    round(col("total_inter_facility_hours"), 2).alias("time_in_inter_facility_transit_hours"),
    round(avg_hours_per_facility, 2).alias("avg_hours_per_facility"),
    is_express.alias("is_express_service"),
    col("delivery_location_type"),
    col("num_out_for_delivery_attempts"),
    col("delivered_first_attempt").alias("first_attempt_delivery"),
    total_events.alias("total_events_count"),
)

# Fill only counts/flags where "missing" genuinely means zero/false. Transit time and
# hours-per-facility stay NULL when unknown (incomplete event sequence) so they do not
# drag the Part 6 averages, medians and minimums towards 0.
df_final = df_final.fillna({
    "num_facilities_visited": 0,
    "num_in_transit_events": 0,
    "time_in_inter_facility_transit_hours": 0.0,
    "is_express_service": False,
    "first_attempt_delivery": False,
})

# displaying the result
df_final.display()


In [0]:
# Spark writes `output_path` as a directory; coalesce(1) makes it hold a single
# part-*.csv file with a header row. Existing output at that path is overwritten.
output_path = "/Volumes/workspace/default/transittarget/transit_performance_detailed.csv"
detailed_writer = df_final.coalesce(1).write.mode("overwrite").option("header", "true")
detailed_writer.csv(output_path)
print(f" Final CSV successfully created at: {output_path}")

In [0]:
'''
Part 6: Output Network Performance Summary CSV

Create a summary CSV file named transit_performance_summary.csv with the
following statistics:

Overall Metrics:

i. total_shipments_analyzed
ii. avg_transit_hours, median_transit_hours, std_dev_transit_hours
iii. min_transit_hours, max_transit_hours

Facility Metrics:
i. avg_facilities_per_shipment, median_facilities_per_shipment
ii. mode_facilities_per_shipment
iii. avg_hours_per_facility, median_hours_per_facility

Service Type Comparison:
(Group by unique values found in service.type field)

i. avg_transit_hours_by_service_type (for each unique service type)
ii. avg_facilities_by_service_type (for each unique service type)
iii. count_shipments_by_service_type (for each unique service type)


Delivery Performance:
i. pct_first_attempt_delivery
ii. avg_out_for_delivery_attempts

'''


from pyspark.sql import functions as F
from pyspark.sql.window import Window
import statistics  # NOTE(review): statistics/Window are not used in the visible Part 6 cells

TRANSIT_COL = "total_transit_hours"

# Overall Metrics: a single-row aggregate over every shipment in df_final
# (percentile() is Spark SQL's exact percentile, giving the median at 0.5)
overall_metrics = df_final.agg(
    F.count("*").alias("total_shipments_analyzed"),
    F.avg(TRANSIT_COL).alias("avg_transit_hours"),
    F.expr(f"percentile({TRANSIT_COL}, 0.5)").alias("median_transit_hours"),
    F.stddev(TRANSIT_COL).alias("std_dev_transit_hours"),
    F.min(TRANSIT_COL).alias("min_transit_hours"),
    F.max(TRANSIT_COL).alias("max_transit_hours"),
)

# display the overall metrics
overall_metrics.display()

In [0]:

#  Facility Metrics: single-row aggregate over df_final; medians use exact percentile()
facility_metrics = df_final.agg(
    F.avg("num_facilities_visited").alias("avg_facilities_per_shipment"),
    F.expr("percentile(num_facilities_visited, 0.5)").alias("median_facilities_per_shipment"),
    F.avg("avg_hours_per_facility").alias("avg_hours_per_facility"),
    F.expr("percentile(avg_hours_per_facility, 0.5)").alias("median_hours_per_facility"),
)

# displaying the Facility metrics
facility_metrics.display()

In [0]:
# Calculate the mode of facilities per shipment manually: the most frequent
# num_facilities_visited value. Ties are broken towards the smaller facility count so
# the result is deterministic (ordering by count alone leaves tied groups in arbitrary
# order), and NULL counts are ignored.
mode_facility = (
    df_final.filter(F.col("num_facilities_visited").isNotNull())
    .groupBy("num_facilities_visited")
    .count()
    .orderBy(F.desc("count"), F.asc("num_facilities_visited"))
    .first()
)
mode_facilities_per_shipment = mode_facility["num_facilities_visited"] if mode_facility else None

# The cast keeps the column typed even when no mode exists (a bare NULL literal has
# void type, which the CSV writer rejects).
facility_metrics = facility_metrics.withColumn(
    "mode_facilities_per_shipment", F.lit(mode_facilities_per_shipment).cast("long")
)
facility_metrics.display()

In [0]:
# Service Type Comparison: one row per distinct service_type in df_final
service_type_summary = (
    df_final.groupBy("service_type").agg(
        F.avg("total_transit_hours").alias("avg_transit_hours_by_service_type"),
        F.avg("num_facilities_visited").alias("avg_facilities_by_service_type"),
        # column name as specified (the alias previously contained a stray "(")
        F.count("*").alias("count_shipments_by_service_type"),
    )
)

# displaying the service type comparison
service_type_summary.display()

In [0]:
# Delivery Performance: share of first-attempt deliveries (percent of all shipments)
# and the mean number of out-for-delivery attempts
first_attempt_flag = F.when(F.col("first_attempt_delivery") == True, 1).otherwise(0)
pct_first_attempt = F.sum(first_attempt_flag) / F.count("*") * 100

delivery_metrics = df_final.agg(
    pct_first_attempt.alias("pct_first_attempt_delivery"),
    F.avg("num_out_for_delivery_attempts").alias("avg_out_for_delivery_attempts"),
)

# display the delivery performance
delivery_metrics.display()

In [0]:
from pyspark.sql.functions import lit

# Network summary = one "Overall Metrics" row followed by one "Service Type Comparison"
# row per service_type. overall_metrics, facility_metrics and delivery_metrics are each
# single-row aggregates over df_final, so cross-joining them yields exactly one row.
# (df_summary is per-shipment detail, not overall metrics, so it is not used here.)
overall_row = (
    overall_metrics
    .crossJoin(facility_metrics)
    .crossJoin(delivery_metrics)
    .withColumn("section", lit("Overall Metrics"))
)
service_type_summary_labeled = service_type_summary.withColumn("section", lit("Service Type Comparison"))

# Deterministic column order: section label first, then overall columns, then the
# service-type columns (a set() union would order columns arbitrarily).
all_cols = ["section"] + [c for c in overall_row.columns if c != "section"]
all_cols += [c for c in service_type_summary_labeled.columns if c not in all_cols]


def add_missing_columns(df, all_cols):
    """Return `df` containing every column of `all_cols` in that order; absent ones are NULL."""
    for c in all_cols:
        if c not in df.columns:
            df = df.withColumn(c, lit(None))
    return df.select(all_cols)


df_summary_aligned = add_missing_columns(overall_row, all_cols)
service_type_aligned = add_missing_columns(service_type_summary_labeled, all_cols)

# Union both DataFrames (NULL placeholder columns widen to the other side's type)
df_combined = df_summary_aligned.unionByName(service_type_aligned)
df_combined.display()


In [0]:
# Save combined CSV. Spark writes `output_path_combined` as a directory; coalesce(1)
# makes it contain a single part-*.csv file with a header row.
output_path_combined = "/Volumes/workspace/default/transitperformance/transit_performance_summary.csv"
summary_writer = df_combined.coalesce(1).write.mode("overwrite").option("header", "true")
summary_writer.csv(output_path_combined)

print(f"Combined network performance summary CSV created at: {output_path_combined}")