In [0]:
%spark.pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import col, to_date, datediff, when
from pyspark.sql.functions import sum as _sum, round, countDistinct

# Obtain the Hive-enabled Spark session (creating it if none exists yet)
# that every later cell uses.
spark = (
    SparkSession.builder
    .appName("ETL")
    .enableHiveSupport()
    .getOrCreate()
)

In [1]:
%spark.pyspark

# Read the two headerless staging CSV datasets, let Spark infer the column
# types, and attach explicit column names.

logistics_df = (
    spark.read
    .csv("/staging_zone/logistics/logistics_data", header=False, inferSchema=True)
    .toDF(
        "logistics_id",
        "order_id",
        "estimated_delivery_date",
        "actual_delivery_date",
        "shipping_cost",
        "warehouse_id",
    )
)

order_lines = (
    spark.read
    .csv("/staging_zone/staging_orderlines/staging_order_lines_data", header=False, inferSchema=True)
    .toDF(
        "order_id",
        "product_id",
        "quantity",
        "price",
    )
)

In [2]:
%spark.pyspark
# Logistics data: print the inferred schema, then preview the first 5 rows.

logistics_df.printSchema()
logistics_df.show(n=5)

In [3]:
%spark.pyspark
# Order lines: print the inferred schema, then preview the first 5 rows.

order_lines.printSchema()
order_lines.show(n=5)

In [4]:
%spark.pyspark

# Clean and transform logistics dates:
#   1. Parse both delivery-date columns to DateType using the yyyy-MM-dd pattern.
#   2. Drop rows where either date is missing.
#
# The null filter runs AFTER parsing on purpose: with ANSI mode off, to_date
# returns NULL for a value that does not match the pattern, so filtering first
# would let unparseable dates through. Those rows would then get a NULL
# datediff and be flagged as on-time (is_late_delivery = 0) downstream.
# Re-running this cell is safe: the columns are already dates and the filter
# is a no-op on clean data.

logistics_df = (
    logistics_df
    .withColumn("estimated_delivery_date", to_date(col("estimated_delivery_date"), "yyyy-MM-dd"))
    .withColumn("actual_delivery_date", to_date(col("actual_delivery_date"), "yyyy-MM-dd"))
    .filter(col("estimated_delivery_date").isNotNull() & col("actual_delivery_date").isNotNull())
)

In [5]:
%spark.pyspark

# Build the logistics fact table.
#   delivery_time_delta_days : actual minus estimated delivery date, in days
#   is_late_delivery         : 1 when that delta is positive, otherwise 0

delay_in_days = datediff(col("actual_delivery_date"), col("estimated_delivery_date"))
late_flag = when(col("delivery_time_delta_days") > 0, 1).otherwise(0)

logistics_fact = (
    logistics_df
    .withColumn("delivery_time_delta_days", delay_in_days)
    .withColumn("is_late_delivery", late_flag)
    .select(
        "order_id",
        "delivery_time_delta_days",
        "is_late_delivery",
        "shipping_cost",
        "warehouse_id",
    )
)

logistics_fact.show(n=10)

In [6]:
%spark.pyspark
# Attach order lines to the logistics fact (inner join on order_id) and
# compute each line's revenue as quantity * price, rounded to 2 decimals.

line_revenue = round(col("quantity") * col("price"), 2)

logistics_fact_joined = (
    logistics_fact
    .join(order_lines, on="order_id", how="inner")
    .withColumn("total_revenue", line_revenue)
)

logistics_fact_joined.show(n=10)

In [7]:
%spark.pyspark
# Sanity check: list the orders that are linked to more than one distinct
# warehouse. Only those orders are kept, so is_multi_warehouse is 1 on every
# row that is shown.

distinct_warehouses = countDistinct("warehouse_id").alias("warehouse_count")
has_several_warehouses = col("warehouse_count") > 1

order_warehouse_check = (
    logistics_fact
    .groupBy("order_id")
    .agg(distinct_warehouses)
    .withColumn("is_multi_warehouse", when(has_several_warehouses, 1).otherwise(0))
    .filter(has_several_warehouses)
)

order_warehouse_check.show()

In [8]:
%spark.pyspark

# Aggregate order-level summary:
# one row per (order_id, warehouse_id, is_late_delivery, shipping_cost) with the
# summed line revenue, rounded to 2 decimals and sorted by order_id.
#
# Uses the explicitly imported `_sum` alias rather than a bare `sum`, which only
# resolves to the Spark aggregate because the wildcard import shadows Python's
# builtin.
#
# NOTE(review): the join repeats every order line once per logistics row of the
# same order, so an order with several logistics rows carries its full revenue
# in each group - confirm this is the intended attribution.

order_summary = (
    logistics_fact_joined
    .groupBy("order_id", "warehouse_id", "is_late_delivery", "shipping_cost")
    .agg(round(_sum("total_revenue"), 2).alias("total_revenue_per_order"))
    .orderBy("order_id")
)

order_summary.show(10)

In [9]:
%spark.pyspark

# Aggregate warehouse-level summary from the order summary:
#   total_revenue               : summed order revenue (2 decimals)
#   total_shipping_cost         : summed shipping cost (2 decimals)
#   late_deliveries             : sum of the is_late_delivery flags
#   total_orders                : number of distinct order_id values
#   late_delivery_rate          : late_deliveries / total_orders, as a percentage
#   avg_shipping_cost_per_order : total_shipping_cost / total_orders
#
# All sums use the explicitly imported `_sum` alias; the original mixed `_sum`
# with a bare `sum` that only worked because the wildcard import shadows
# Python's builtin.

warehouse_summary = (
    order_summary
    .groupBy("warehouse_id")
    .agg(
        round(_sum("total_revenue_per_order"), 2).alias("total_revenue"),
        round(_sum("shipping_cost"), 2).alias("total_shipping_cost"),
        _sum("is_late_delivery").alias("late_deliveries"),
        countDistinct("order_id").alias("total_orders"),
    )
    .withColumn("late_delivery_rate", round((col("late_deliveries") / col("total_orders")) * 100, 2))
    .withColumn("avg_shipping_cost_per_order", round(col("total_shipping_cost") / col("total_orders"), 2))
    .orderBy("warehouse_id")
)

warehouse_summary.show(10)

In [10]:
%spark.pyspark

# Warehouse efficiency: shipping cost as a fraction of revenue
# (total_shipping_cost / total_revenue, rounded to 3 decimals).

cost_share_of_revenue = round(col("total_shipping_cost") / col("total_revenue"), 3)

warehouse_eff = (
    warehouse_summary
    .withColumn("shipping_to_revenue_ratio", cost_share_of_revenue)
    .select("warehouse_id", "shipping_to_revenue_ratio")
)

warehouse_eff.show()

In [11]:
%spark.pyspark

# Delivery performance: mean delivery-time delta (days, 2 decimals) for each
# warehouse, sorted by warehouse_id.

mean_delay = round(avg("delivery_time_delta_days"), 2).alias("avg_delivery_time_delta")

delivery_perf = (
    logistics_fact
    .groupBy("warehouse_id")
    .agg(mean_delay)
    .orderBy("warehouse_id")
)

delivery_perf.show()

In [12]:
%spark.pyspark

# Product-level analysis of late deliveries:
#   late_deliveries : number of DISTINCT orders containing the product that
#                     have at least one late logistics row
#   total_orders    : number of distinct orders containing the product
#   late_rate       : late_deliveries / total_orders, as a percentage
#
# Both counts are per distinct order. Summing is_late_delivery over the joined
# rows instead would count an order once per matching order line and once per
# logistics row (e.g. an order served by several warehouses), inflating the
# numerator while the denominator stays distinct - late_rate could exceed 100%.

# order_id for late rows, NULL otherwise; countDistinct ignores NULLs.
late_order_id = when(col("is_late_delivery") == 1, col("order_id"))

product_delay = (
    logistics_fact_joined
    .groupBy("product_id")
    .agg(
        countDistinct(late_order_id).alias("late_deliveries"),
        countDistinct("order_id").alias("total_orders"),
    )
    .withColumn("late_rate", round(col("late_deliveries") / col("total_orders") * 100, 2))
    .orderBy(desc("late_rate"))
)

product_delay.show(10)

In [13]:
%spark.pyspark

# Persist each result as Parquet, overwriting any previous output.
# Order: fact table, order summary, warehouse summary, product delay summary.
for result_df, output_path in (
    (logistics_fact, "/staging_zone/staging_logistics_fact"),
    (order_summary, "/staging_zone/order_summary"),
    (warehouse_summary, "/staging_zone/warehouse_summary"),
    (product_delay, "/staging_zone/product_delay_summary"),
):
    result_df.write.mode("overwrite").parquet(output_path)

In [14]:
%spark.pyspark
# Expose the result DataFrames to the %spark.sql cells as temporary views.

for view_name, view_df in (
    ("logistics_fact", logistics_fact),
    ("order_summary", order_summary),
    ("warehouse_summary", warehouse_summary),
    ("product_delay_summary", product_delay),
):
    view_df.createOrReplaceTempView(view_name)

In [15]:
%spark.sql
-- Mean delivery-time delta in days (rounded to 2 decimals) for each warehouse,
-- listed in warehouse_id order.

SELECT
    lf.warehouse_id,
    ROUND(AVG(lf.delivery_time_delta_days), 2) AS avg_delivery_time_delta
FROM logistics_fact AS lf
GROUP BY lf.warehouse_id
ORDER BY lf.warehouse_id;

In [16]:
%spark.sql

-- Warehouses ranked by late delivery rate, highest first

SELECT
    ws.warehouse_id,
    ws.late_delivery_rate
FROM warehouse_summary AS ws
ORDER BY ws.late_delivery_rate DESC;

In [17]:
%spark.sql

-- The 10 products with the highest late-delivery rate

SELECT
    pd.product_id,
    pd.late_rate,
    pd.late_deliveries,
    pd.total_orders
FROM product_delay_summary AS pd
ORDER BY pd.late_rate DESC
LIMIT 10;

In [18]:
%spark.sql

-- Shipping cost relative to revenue for each warehouse, highest ratio first

SELECT
    ws.warehouse_id,
    ws.total_revenue,
    ws.total_shipping_cost,
    ROUND(ws.total_shipping_cost / ws.total_revenue, 3) AS shipping_to_revenue_ratio
FROM warehouse_summary AS ws
ORDER BY shipping_to_revenue_ratio DESC;

In [19]:
%spark.sql

-- Warehouse efficiency versus late deliveries, computed from order_summary.
-- Sorted by revenue-to-shipping ratio (best first), ties broken by the lower
-- late-delivery percentage.

SELECT
    os.warehouse_id,
    COUNT(os.order_id)                                               AS total_orders,
    SUM(os.is_late_delivery)                                         AS late_deliveries,
    ROUND(SUM(os.total_revenue_per_order), 2)                        AS total_revenue,
    ROUND(SUM(os.shipping_cost), 2)                                  AS total_shipping_cost,
    ROUND(SUM(os.total_revenue_per_order) / SUM(os.shipping_cost), 2) AS revenue_to_shipping_ratio,
    ROUND(SUM(os.is_late_delivery) / COUNT(os.order_id) * 100, 2)    AS late_delivery_rate_percentage
FROM order_summary AS os
GROUP BY os.warehouse_id
ORDER BY
    revenue_to_shipping_ratio DESC,
    late_delivery_rate_percentage ASC;