In [1]:
from pyspark.sql import SparkSession
import os
import time

os.environ['HADOOP_USER_NAME'] = 'root'

print("Waiting 30 seconds for HDFS cluster to synchronize...")
time.sleep(30)

spark = SparkSession.builder \
    .appName("Final_Connection_Test") \
    .master("local[*]") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://namenode:8020") \
    .config("spark.hadoop.dfs.client.use.datanode.hostname", "true") \
    .getOrCreate()

try:
    # Task 1: Load and Ingest
    print("Reading local CSV...")
    df = spark.read.csv("file:///home/jovyan/work/smart_logistics_dataset.csv", header=True, inferSchema=True)
    
    print("Writing to HDFS Warehouse...")
    df.write.mode("overwrite").parquet("hdfs://namenode:8020/data/logistics_input")
    
    print("✅ SUCCESS! The Batch Layer storage is fully operational.")
    print("You can now proceed to the analysis tasks (groupBy, Window functions, etc.)")
    
except Exception as e:
    print(f"❌ Connection still struggling. Error: {e}")

Waiting 30 seconds for HDFS cluster to synchronize...
Reading local CSV...
Writing to HDFS Warehouse...
✅ SUCCESS! The Batch Layer storage is fully operational.
You can now proceed to the analysis tasks (groupBy, Window functions, etc.)


In [3]:
#Read & Explore the data 
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# --- Task 1: Data Setup & Ingestion (Already confirmed, but we load it here) ---
hdfs_input_path = "hdfs://namenode:8020/data/logistics_input"
df = spark.read.parquet(hdfs_input_path)

df.printSchema()

root
 |-- Timestamp: string (nullable = true)
 |-- Asset_ID: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Inventory_Level: integer (nullable = true)
 |-- Shipment_Status: string (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Humidity: double (nullable = true)
 |-- Traffic_Status: string (nullable = true)
 |-- Waiting_Time: integer (nullable = true)
 |-- User_Transaction_Amount: integer (nullable = true)
 |-- User_Purchase_Frequency: integer (nullable = true)
 |-- Logistics_Delay_Reason: string (nullable = true)
 |-- Asset_Utilization: double (nullable = true)
 |-- Demand_Forecast: integer (nullable = true)
 |-- Logistics_Delay: integer (nullable = true)



In [8]:
#getting a look at the table
df.limit(10).toPandas()

Unnamed: 0,Timestamp,Asset_ID,Latitude,Longitude,Inventory_Level,Shipment_Status,Temperature,Humidity,Traffic_Status,Waiting_Time,User_Transaction_Amount,User_Purchase_Frequency,Logistics_Delay_Reason,Asset_Utilization,Demand_Forecast,Logistics_Delay
0,3/20/2024 0:11,Truck_7,-65.7383,11.2497,390,Delayed,27.0,67.8,Detour,38,320,4,,60.1,285,1
1,10/30/2024 7:53,Truck_6,22.2748,-131.7086,491,In Transit,22.5,54.3,Heavy,16,439,7,Weather,80.9,174,1
2,7/29/2024 18:42,Truck_10,54.9232,79.5455,190,In Transit,25.2,62.2,Detour,34,355,3,,99.2,260,0
3,10/28/2024 0:50,Truck_9,42.39,-1.4788,330,Delivered,25.4,52.3,Heavy,37,227,5,Traffic,97.4,160,1
4,9/27/2024 15:52,Truck_7,-65.8477,47.9468,480,Delayed,20.5,57.2,Clear,56,197,6,,71.6,270,1
5,9/17/2024 6:02,Truck_7,73.3312,46.5831,118,In Transit,24.3,61.8,Clear,56,258,10,,66.8,189,0
6,2/5/2024 8:38,Truck_4,27.9307,147.5317,480,Delayed,20.7,75.4,Clear,32,263,3,,73.3,198,1
7,9/28/2024 9:08,Truck_9,46.5643,-126.5348,222,In Transit,23.3,64.2,Detour,30,459,9,Traffic,73.8,253,0
8,1/18/2024 8:27,Truck_9,-83.6737,-31.7125,245,Delivered,26.4,77.2,Clear,14,183,2,,69.6,206,0
9,11/9/2024 0:48,Truck_2,54.1683,-135.2676,389,In Transit,21.9,57.3,Clear,52,127,7,Mechanical Failure,63.1,224,0


In [9]:
df.describe().toPandas()

Unnamed: 0,summary,Timestamp,Asset_ID,Latitude,Longitude,Inventory_Level,Shipment_Status,Temperature,Humidity,Traffic_Status,Waiting_Time,User_Transaction_Amount,User_Purchase_Frequency,Logistics_Delay_Reason,Asset_Utilization,Demand_Forecast,Logistics_Delay
0,count,1000,1000,1000.0,1000.0,1000.0,1000,1000.0,1000.0,1000,1000.0,1000.0,1000.0,1000,1000.0,1000.0,1000.0
1,mean,,,-1.3600925999999978,0.8370488999999981,297.915,,23.89390000000001,65.04219999999997,,35.062,299.055,5.513,,79.59910000000004,199.284,0.566
2,stddev,,,51.997182824074486,104.8436181288404,113.55477295764128,,3.3221784509053194,8.753765336560466,,14.477767844587152,117.78779197751749,2.9353785867324578,,11.631152690206711,59.92084655486645,0.4958728565770565
3,min,1/1/2024 11:37,Truck_1,-89.7915,-179.8202,100.0,Delayed,18.0,50.0,Clear,10.0,100.0,1.0,Mechanical Failure,60.0,100.0,0.0
4,max,9/9/2024 5:33,Truck_9,89.8701,179.9237,500.0,In Transit,30.0,80.0,Heavy,60.0,500.0,10.0,Weather,100.0,300.0,1.0


In [10]:
#Register the Dataframe as a SQL Table 
df.createOrReplaceTempView("logistics_table")

# Using SQL to explore attributes
spark.sql(
"""
    Select * from logistics_table
    Limit 10
"""
).toPandas()

Unnamed: 0,Timestamp,Asset_ID,Latitude,Longitude,Inventory_Level,Shipment_Status,Temperature,Humidity,Traffic_Status,Waiting_Time,User_Transaction_Amount,User_Purchase_Frequency,Logistics_Delay_Reason,Asset_Utilization,Demand_Forecast,Logistics_Delay
0,3/20/2024 0:11,Truck_7,-65.7383,11.2497,390,Delayed,27.0,67.8,Detour,38,320,4,,60.1,285,1
1,10/30/2024 7:53,Truck_6,22.2748,-131.7086,491,In Transit,22.5,54.3,Heavy,16,439,7,Weather,80.9,174,1
2,7/29/2024 18:42,Truck_10,54.9232,79.5455,190,In Transit,25.2,62.2,Detour,34,355,3,,99.2,260,0
3,10/28/2024 0:50,Truck_9,42.39,-1.4788,330,Delivered,25.4,52.3,Heavy,37,227,5,Traffic,97.4,160,1
4,9/27/2024 15:52,Truck_7,-65.8477,47.9468,480,Delayed,20.5,57.2,Clear,56,197,6,,71.6,270,1
5,9/17/2024 6:02,Truck_7,73.3312,46.5831,118,In Transit,24.3,61.8,Clear,56,258,10,,66.8,189,0
6,2/5/2024 8:38,Truck_4,27.9307,147.5317,480,Delayed,20.7,75.4,Clear,32,263,3,,73.3,198,1
7,9/28/2024 9:08,Truck_9,46.5643,-126.5348,222,In Transit,23.3,64.2,Detour,30,459,9,Traffic,73.8,253,0
8,1/18/2024 8:27,Truck_9,-83.6737,-31.7125,245,Delivered,26.4,77.2,Clear,14,183,2,,69.6,206,0
9,11/9/2024 0:48,Truck_2,54.1683,-135.2676,389,In Transit,21.9,57.3,Clear,52,127,7,Mechanical Failure,63.1,224,0


In [11]:
# Task 1 Calculate the Average Asset Utilization and total Delay Count for every unique Asset_ID 
# historically in spark
asset_scores = df.groupBy("Asset_ID").agg(
    F.round(F.avg("Asset_Utilization"), 4 ).alias("avg_asset_utilization"),
    F.count("Logistics_Delay_Reason").alias("Total_Delay_Count")
)
asset_scores.show()

+--------+---------------------+-----------------+
|Asset_ID|avg_asset_utilization|Total_Delay_Count|
+--------+---------------------+-----------------+
|Truck_10|              79.5752|              105|
| Truck_9|              79.9436|               94|
| Truck_6|              79.0262|              103|
| Truck_3|              80.0978|               93|
| Truck_7|              80.6696|              102|
| Truck_5|              76.5957|               93|
| Truck_8|               80.245|              109|
| Truck_4|              79.0607|              107|
| Truck_2|              80.1552|              105|
| Truck_1|              80.5169|               89|
+--------+---------------------+-----------------+



In [14]:
#Getting that same results in SQL
asset_scores_sql = spark.sql(
"""
    Select 
        Asset_ID,
        Avg(Asset_Utilization) as avg_asset_utilization,
        count(Logistics_Delay_Reason) as Total_Delay_Count
    from logistics_table
    group by Asset_ID
    order by Total_Delay_Count desc
"""
)
asset_scores_sql.toPandas()

Unnamed: 0,Asset_ID,avg_asset_utilization,Total_Delay_Count
0,Truck_8,80.244954,109
1,Truck_4,79.060748,107
2,Truck_10,79.575238,105
3,Truck_2,80.155238,105
4,Truck_6,79.026214,103
5,Truck_7,80.669608,102
6,Truck_9,79.943617,94
7,Truck_3,80.097849,93
8,Truck_5,76.595699,93
9,Truck_1,80.516854,89


In [17]:
# Task 3 - Identify the Top 5 most frequent Logistics_Delay_Reason across the entire dataset in Spark
top_delay_reasons = df.groupBy("Logistics_Delay_Reason")\
                    .count()\
                    .orderBy(F.desc("count")) \
                    .limit(5)
top_delay_reasons.show()

+----------------------+-----+
|Logistics_Delay_Reason|count|
+----------------------+-----+
|               Weather|  267|
|                  None|  263|
|               Traffic|  236|
|    Mechanical Failure|  234|
+----------------------+-----+



In [19]:
# Identify the Top 5 most frequent Logistics_Delay_Reason across the entire dataset in SQL
top_delay_reasons_sql = spark.sql(
    """
    SELECT 
        Logistics_Delay_Reason, 
        count(Logistics_Delay_Reason) as delay_count
    from logistics_table
    group by Logistics_Delay_Reason
    order by count(Logistics_Delay_Reason) desc
    """
)
top_delay_reasons_sql.toPandas()

Unnamed: 0,Logistics_Delay_Reason,delay_count
0,Weather,267
1,,263
2,Traffic,236
3,Mechanical Failure,234


In [21]:
# Task 4
# Find the Top 10 regions (based on grouped Latitude/Longitude) with the highest average Waiting_Time.
# we will use a window function to get the rank of each "Latitude/Longitude" pair
window_spec = Window.orderBy(F.desc("Avg_Waiting_Time"))
route_rankings = df.groupBy("Latitude", "Longitude")\
                .agg(F.avg("Waiting_Time").alias("Avg_Waiting_Time") )\
                .withColumn("Rank", F.rank().over(window_spec)) \
                .filter(F.col("Rank") <= 10 )

route_rankings.show()

+--------+---------+----------------+----+
|Latitude|Longitude|Avg_Waiting_Time|Rank|
+--------+---------+----------------+----+
|-27.8944|  104.517|            60.0|   1|
|-36.3388|  69.5488|            60.0|   1|
|-58.2884| -53.9833|            60.0|   1|
| -59.602|   62.712|            60.0|   1|
| 89.3898| -93.1034|            60.0|   1|
| 49.9197|  79.4653|            60.0|   1|
| 25.5003| -47.5718|            60.0|   1|
|-10.9298| -89.0306|            60.0|   1|
|-46.3051| -93.1541|            60.0|   1|
| 69.7608| -77.0188|            60.0|   1|
|-79.5331|-178.6658|            60.0|   1|
| -3.4094| -85.8774|            60.0|   1|
| 43.4429|-106.3154|            60.0|   1|
|  53.517| 152.1576|            60.0|   1|
|-35.9441|-133.4183|            60.0|   1|
| 58.6218| 133.4487|            60.0|   1|
| 83.2858| -66.1069|            60.0|   1|
| 61.8446| 115.5236|            60.0|   1|
+--------+---------+----------------+----+



In [31]:
# task 4 in SQL
routings_rank = spark.sql(
"""
with cte as (
Select 
    Latitude, 
    Longitude,
    avg(Waiting_Time) as Avg_Waiting_Time
    from logistics_table
    group by 
        Latitude, Longitude
)
select *,
    DENSE_RANK() over (order by Avg_Waiting_Time desc) as ranking /*Dense rank for ties */
from CTE
    Order by 
        Avg_Waiting_Time desc
"""
)
routings_rank.limit(10).toPandas()

Unnamed: 0,Latitude,Longitude,Avg_Waiting_Time,ranking
0,-27.8944,104.517,60.0,1
1,-36.3388,69.5488,60.0,1
2,-58.2884,-53.9833,60.0,1
3,-59.602,62.712,60.0,1
4,89.3898,-93.1034,60.0,1
5,49.9197,79.4653,60.0,1
6,25.5003,-47.5718,60.0,1
7,-10.9298,-89.0306,60.0,1
8,-46.3051,-93.1541,60.0,1
9,69.7608,-77.0188,60.0,1


In [32]:
# --- Task 5: Batch Result Storage ---
# Objective: Store results back to HDFS in Parquet format
output_base = "hdfs://namenode:8020/data/output"

asset_scores.write.mode("overwrite").parquet(f"{output_base}/asset_scores")
top_delay_reasons.write.mode("overwrite").parquet(f"{output_base}/top_delays")
route_rankings.write.mode("overwrite").parquet(f"{output_base}/route_rankings")

print("--- All Batch Layer Deliverables Completed and Saved to HDFS ---")

--- All Batch Layer Deliverables Completed and Saved to HDFS ---
