In [0]:
# Load the shipment-tracking JSON file into a Spark DataFrame.
# The "multiline" read option is switched on before the file is parsed.
json_reader = spark.read.option("multiline",True)
df = json_reader.json("/Volumes/workspace/default/project_shipment_tracking/Swift Assignment 4 - Dataset.json")

# Render the loaded DataFrame in the notebook output.
df.display()

In [0]:
# Initial exploration: print the schema of the loaded DataFrame
# (column names and types, including nested fields) to the cell output.
df.printSchema()

In [0]:
# List the top-level column names of the loaded DataFrame.
column_names = df.columns
print(column_names)

In [0]:
# Report the size of the DataFrame: row count, column count and column names.
row_count = df.count()
column_names = df.columns

print("Number of rows:", row_count)
print("Number of columns:", len(column_names))
print("Columns:", column_names)

In [0]:
# Count the missing (null) values in every top-level column and display the
# result as a single-row DataFrame (one output column per input column).
#
# Spark's `sum` is imported under an alias on purpose: importing it as plain
# `sum` would replace Python's built-in `sum` for every later cell.
from pyspark.sql.functions import col, when
from pyspark.sql.functions import sum as spark_sum

null_count_exprs = [
    # 1 for a null value, 0 otherwise; summing gives the null count per column.
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
]
df.select(null_count_exprs).display()

In [0]:
'''
Flatten the Data
1. Flatten the data per item array as needed
2. Properties needed
- Tracking number
- Payment type (Prepaid/COD)
- Pickup Date Time
- Delivery Date Time
- Out for Delivery datetime(s)
- Shipment weight
- Pickup Pincode, City, State
- Drop Pincode, City, State
'''

In [0]:
from pyspark.sql.functions import explode, col, expr

# Produce one row per element of the `trackDetails` array; each element is
# exposed as a struct column named `details`.
track_detail = explode(col("trackDetails")).alias("details")
df_exploded = df.select(track_detail)


In [0]:
# Flatten each exploded `details` struct into one row of scalar/array columns.
# The columns are grouped into lists below and selected in the same order.

# Shipment identity, payment type, key timestamps and weight.
shipment_columns = [
    col("details.trackingNumber").alias("TrackingNumber"),
    # Payment type is read from the FIRST specialHandlings entry only.
    col("details.specialHandlings")[0]["paymentType"].alias("PaymentType"),
    # First datesOrTimes entry whose type is ACTUAL_PICKUP / ACTUAL_DELIVERY.
    expr("filter(details.datesOrTimes, x -> x.type = 'ACTUAL_PICKUP')[0].dateOrTimestamp").alias("PickupDateTime"),
    expr("filter(details.datesOrTimes, x -> x.type = 'ACTUAL_DELIVERY')[0].dateOrTimestamp").alias("DeliveryDateTime"),
    # All datesOrTimes entries of type OUT_FOR_DELIVERY, kept as an array.
    # NOTE(review): confirm that 'OUT_FOR_DELIVERY' really occurs as a
    # datesOrTimes type in the dataset; if it does not, this array is always empty.
    expr("transform(filter(details.datesOrTimes, x -> x.type = 'OUT_FOR_DELIVERY'), x -> x.dateOrTimestamp)").alias("OutForDeliveryDateTimes"),
    col("details.shipmentWeight.value").alias("ShipmentWeight"),
]

# Pickup location: city/state from shipperAddress, pincode from the first event.
# NOTE(review): this assumes events[0] is the pickup-side event - verify the
# ordering of the `events` array in the dataset.
pickup_columns = [
    col("details.shipperAddress.city").alias("PickupCity"),
    col("details.shipperAddress.stateOrProvinceCode").alias("PickupState"),
    expr("details.events[0].address.postalCode").alias("PickupPincode"),
]

# Drop location: city/state from destinationAddress, pincode from the last event.
# NOTE(review): this assumes the last event is the delivery-side event - verify.
drop_columns = [
    col("details.destinationAddress.city").alias("DropCity"),
    col("details.destinationAddress.stateOrProvinceCode").alias("DropState"),
    expr("details.events[size(details.events)-1].address.postalCode").alias("DropPincode"),
]

df_flat = df_exploded.select(*shipment_columns, *pickup_columns, *drop_columns)

df_flat.display()

In [0]:
'''
Transform the data: compute the following inferred fields for every shipment:
- All date times readable format in IST 
- Days taken for journey completion (Pickup to Delivery, in number of days) 
- Number of delivery attempts (Number of times it has been Out for Delivery + Delivered; handle special case where Out For Delivery and Delivered happens on same day)

'''

In [0]:
from pyspark.sql.functions import col, to_timestamp, from_utc_timestamp, datediff, size, expr, array_distinct
from pyspark.sql import functions as F

# Step 1: express the pickup, delivery and out-for-delivery datetimes in IST.
IST_ZONE = "Asia/Kolkata"


def _as_ist(ts_col):
    """Parse a column to a timestamp and shift it from UTC to Asia/Kolkata."""
    return from_utc_timestamp(to_timestamp(ts_col), IST_ZONE)


# Scalar columns are converted directly; the out-for-delivery array is
# converted element by element.
df_with_pickup = df_flat.withColumn("PickupDateTime_IST", _as_ist(col("PickupDateTime")))
df_with_delivery = df_with_pickup.withColumn("DeliveryDateTime_IST", _as_ist(col("DeliveryDateTime")))
df_transformed = df_with_delivery.withColumn(
    "OutForDelivery_IST",
    F.transform(col("OutForDeliveryDateTimes"), lambda x: _as_ist(x)),
)

# Step 2: journey duration in days, i.e. the date difference between the
# IST delivery timestamp (end) and the IST pickup timestamp (start).
journey_days = datediff(
    col("DeliveryDateTime_IST"),
    col("PickupDateTime_IST"),
)
df_transformed = df_transformed.withColumn("DaysTakenForJourney", journey_days)

# Step 3: Compute the number of delivery attempts.
#
# An attempt is counted once per distinct IST calendar day on which the
# shipment was out for delivery and/or delivered. Counting days (rather than
# "out-for-delivery days + 1") is what handles the special case where the
# shipment goes out for delivery and is delivered on the same day: that day
# contributes a single attempt instead of two.

# Distinct calendar days on which the shipment was out for delivery.
# A missing (null) array is replaced by an empty one so that size() below
# never reports -1 / null for shipments without out-for-delivery data.
df_transformed = df_transformed.withColumn(
    "UniqueDeliveryDays",
    F.coalesce(
        F.array_distinct(F.transform(col("OutForDelivery_IST"), lambda x: F.to_date(x))),
        F.array().cast("array<date>"),
    )
)

# Out-for-delivery days plus the delivery day; array_union removes duplicates,
# so a delivery on an out-for-delivery day is not counted twice.
attempt_days = F.array_union(
    col("UniqueDeliveryDays"),
    F.array(F.to_date(col("DeliveryDateTime_IST"))),
)

df_transformed = df_transformed.withColumn(
    "DeliveryAttempts",
    F.when(
        col("DeliveryDateTime_IST").isNotNull(),
        F.size(attempt_days)
    ).otherwise(F.size(col("UniqueDeliveryDays")))  # not delivered: only the out-for-delivery days count
)

# Keep only the columns needed downstream, in their reporting order.
final_columns = [
    "TrackingNumber",
    "PaymentType",
    "PickupDateTime_IST",
    "DeliveryDateTime_IST",
    "OutForDelivery_IST",
    "DaysTakenForJourney",
    "DeliveryAttempts",
    "ShipmentWeight",
    "PickupPincode",
    "PickupCity",
    "PickupState",
    "DropPincode",
    "DropCity",
    "DropState",
]
df_final = df_transformed.select(*final_columns)

df_final.display()


In [0]:
'''
Output as a CSV
Write out a csv with the following headers
- Tracking number
- Payment type (Prepaid/COD)
- Pickup Date Time in IST
- Delivery Date Time in IST
- Days taken for delivery
- Shipment weight
- Pickup Pincode, City, State
- Drop Pincode, City, State
- Number of delivery attempts needed

'''

In [0]:
# Write the final per-shipment DataFrame to CSV with the required headers.
output_path = "/Volumes/workspace/default/shipmenttarget/final_shipment_output_csv"

# (source column, CSV header) pairs, in output order. The array column
# OutForDelivery_IST is intentionally not part of the CSV.
csv_headers = [
    ("TrackingNumber", "Tracking number"),
    ("PaymentType", "Payment type (Prepaid/COD)"),
    ("PickupDateTime_IST", "Pickup Date Time in IST"),
    ("DeliveryDateTime_IST", "Delivery Date Time in IST"),
    ("DaysTakenForJourney", "Days taken for delivery"),
    ("ShipmentWeight", "Shipment weight"),
    ("PickupPincode", "Pickup Pincode"),
    ("PickupCity", "Pickup City"),
    ("PickupState", "Pickup State"),
    ("DropPincode", "Drop Pincode"),
    ("DropCity", "Drop City"),
    ("DropState", "Drop State"),
    ("DeliveryAttempts", "Number of delivery attempts needed"),
]
shipment_csv = df_final.select([col(source).alias(header) for source, header in csv_headers])

# The *_IST columns already hold IST wall-clock values. The CSV writer's
# default timestamp format appends the session time-zone offset, which would
# label those values with the wrong zone, so an explicit offset-free,
# human-readable format is used instead.
(
    shipment_csv.coalesce(1)  # single output part file
    .write
    .option("header", True)
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .mode("overwrite")
    .csv(output_path)
)

df_final.display()

In [0]:
'''
Output Summary Statistics as a CSV
Output a summary having:
- Mean/Median/Mode of days taken for delivery
- Mean/Median/Mode of delivery attempts
'''

In [0]:
from pyspark.sql import functions as F
from pyspark.sql import Window

# Step 1: mean and median of journey days and delivery attempts, computed as
# one global aggregation (a single output row).
mean_days = F.mean("DaysTakenForJourney").alias("Mean_DaysTakenForDelivery")
median_days = F.expr("percentile(DaysTakenForJourney, 0.5)").alias("Median_DaysTakenForDelivery")
mean_attempts = F.mean("DeliveryAttempts").alias("Mean_DeliveryAttempts")
median_attempts = F.expr("percentile(DeliveryAttempts, 0.5)").alias("Median_DeliveryAttempts")

summary_df = df_final.agg(mean_days, median_days, mean_attempts, median_attempts)

# Step 2: Compute the mode (most frequent value) for both columns.
def _mode_of(frame, value_col, mode_alias):
    """Return a one-row, one-column DataFrame holding the mode of `value_col`.

    Null values are ignored, matching how the mean and median are computed.
    Ties between equally frequent values are broken deterministically by
    taking the smallest value, so repeated runs give the same result.
    The result is empty when `value_col` contains no non-null values.
    """
    return (
        frame.where(F.col(value_col).isNotNull())
        .groupBy(value_col)
        .count()
        .orderBy(F.desc("count"), F.asc(value_col))
        .limit(1)
        .select(F.col(value_col).alias(mode_alias))
    )


# Mode for DaysTakenForJourney
days_mode_df = _mode_of(df_final, "DaysTakenForJourney", "Mode_DaysTakenForDelivery")

# Mode for DeliveryAttempts
attempts_mode_df = _mode_of(df_final, "DeliveryAttempts", "Mode_DeliveryAttempts")

# Step 3: attach the two mode columns to the mean/median row via cross joins.
summary_with_days_mode = summary_df.crossJoin(days_mode_df)
summary_final = summary_with_days_mode.crossJoin(attempts_mode_df)

# Step 4: write the summary as a single CSV part file with a header row,
# replacing any previous output at the same path.
output_summary_path = "/Volumes/workspace/default/shipmentsummarystats/shipment_summary_stats_csv"

summary_writer = (
    summary_final.coalesce(1)
    .write
    .option("header", True)
    .mode("overwrite")
)
summary_writer.csv(output_summary_path)

summary_final.display()
