In [45]:
!hdfs dfs -ls /tmp    # List /tmp on HDFS: shows the raw CSV files and the output directories stored there

Found 6 items
-rw-r--r--   2 root hadoop     231442 2025-04-19 11:28 /tmp/Global_Superstore.csv
-rw-r--r--   2 root hadoop     149601 2025-04-19 11:30 /tmp/Logistic_Shipment.csv
drwxr-xr-x   - root hadoop          0 2025-04-19 22:16 /tmp/Logistic_Shipment_KPIs.csv
drwxr-xr-x   - root hadoop          0 2025-04-19 14:44 /tmp/Logistic_shipments_new
drwxrwxrwt   - hdfs hadoop          0 2025-04-18 23:55 /tmp/hadoop-yarn
drwx-wx-wx   - hive hadoop          0 2025-04-18 23:56 /tmp/hive


ETL with PySpark

In [3]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import * 

In [4]:
# Obtain the SparkSession for this notebook; getOrCreate() reuses a session that is already running.
spark = (
    SparkSession.builder
    .appName("Logistics Shipment ETL")
    .getOrCreate()
)

25/04/19 21:42:21 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [5]:
# Load the shipment CSV from HDFS (the path must be an HDFS URI);
# header=True takes the column names from the first line of the file.
df = (
    spark.read
    .option("header", True)
    .csv("hdfs:///tmp/Logistic_Shipment.csv")
)

                                                                                

In [7]:
df   # Evaluating the DataFrame only prints its column names and types; no rows are shown until an action is performed on it.

DataFrame[shipment_id: string, origin_city: string, destination_city: string, dispatch_date: string, delivery_date: string, carrier_name: string, weight_kg: string, cost_usd: string, delivery_status: string, delay_reason: string]

In [8]:
df.show(2)  # show() is an action: it runs the query and prints the first 2 rows.

+-----------+-----------+----------------+-------------+-------------+------------+---------+--------+---------------+------------+
|shipment_id|origin_city|destination_city|dispatch_date|delivery_date|carrier_name|weight_kg|cost_usd|delivery_status|delay_reason|
+-----------+-----------+----------------+-------------+-------------+------------+---------+--------+---------------+------------+
|    SHP0001|     Jeddah|          Riyadh|   2025-03-24|   2025-03-27|      Aramex|    105.8|  107.44|     In-Transit|        null|
|    SHP0002|   Buraidah|          Jeddah|   2025-03-19|   2025-03-23|      Aramex|     57.4|   37.48|      Delivered|        null|
+-----------+-----------+----------------+-------------+-------------+------------+---------+--------+---------------+------------+
only showing top 2 rows



In [9]:
df.printSchema()   # Prints the data type of every column. Types could be overridden here, but they are left unchanged
                    # because suitable data types were already defined in MySQL.

root
 |-- shipment_id: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- dispatch_date: string (nullable = true)
 |-- delivery_date: string (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- weight_kg: string (nullable = true)
 |-- cost_usd: string (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- delay_reason: string (nullable = true)



In [13]:
# Convert the two date columns from string to date type, parsing them with the yyyy-MM-dd pattern.
df = (
    df
    .withColumn("delivery_date", to_date(col("delivery_date"), "yyyy-MM-dd"))
    .withColumn("dispatch_date", to_date(col("dispatch_date"), "yyyy-MM-dd"))
)

In [14]:
df.printSchema()   # Confirm the conversion: dispatch_date and delivery_date should now be reported as date instead of string.

root
 |-- shipment_id: string (nullable = true)
 |-- origin_city: string (nullable = true)
 |-- destination_city: string (nullable = true)
 |-- dispatch_date: date (nullable = true)
 |-- delivery_date: date (nullable = true)
 |-- carrier_name: string (nullable = true)
 |-- weight_kg: string (nullable = true)
 |-- cost_usd: string (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- delay_reason: string (nullable = true)



In [None]:
# Transit time in days: number of days from dispatch_date to delivery_date.
df = df.withColumn(
    "delivery_days",
    datediff(col("delivery_date"), col("dispatch_date")),
)

In [32]:
# Make sure both date columns are date-typed before doing date arithmetic on them.
df = (
    df
    .withColumn("dispatch_date", to_date(col("dispatch_date")))
    .withColumn("delivery_date", to_date(col("delivery_date")))
)

# delivery_days: days between dispatch and delivery.
# on_time: 1 when the shipment took at most 3 days (the on-time benchmark), 0 when it took longer.
df = (
    df
    .withColumn("delivery_days", datediff(col("delivery_date"), col("dispatch_date")))
    .withColumn("on_time", (col("delivery_days") <= 3).cast("int"))
)

# On-time delivery rate for each (destination city, carrier) pair,
# computed over shipments whose status is "Delivered" only.
on_time_by_destination = (
    df
    .filter(col("delivery_status") == "Delivered")
    .groupBy("destination_city", "carrier_name")
    .agg(expr("avg(on_time) as on_time_delivery_rate"))
)

In [33]:
# Preview the on-time delivery rate per destination city and carrier.
on_time_by_destination.show()

+----------------+------------+---------------------+
|destination_city|carrier_name|on_time_delivery_rate|
+----------------+------------+---------------------+
|           Tabuk|       FedEx|   0.5609756097560976|
|           Jazan|      Aramex|                 0.55|
|          Dammam|       FedEx|   0.6666666666666666|
|           Jazan|       FedEx|   0.5526315789473685|
|          Dammam|      Aramex|   0.6136363636363636|
|          Jeddah|      Aramex|   0.5769230769230769|
|            Abha|      Aramex|   0.4857142857142857|
|          Makkah|      Aramex|   0.5897435897435898|
|        Buraidah|       FedEx|   0.5714285714285714|
|          Makkah|       FedEx|   0.6470588235294118|
|         Madinah|      Aramex|   0.6739130434782609|
|          Riyadh|         DHL|                  0.7|
|         Madinah|       FedEx|   0.6060606060606061|
|        Buraidah|         DHL|  0.47619047619047616|
|          Dammam|         DHL|                 0.65|
|            Abha|       Fed

In [34]:
# Late shipments only: delivered shipments that took more than 3 days.
# delay_days is the number of days beyond the 3-day benchmark.
df_delay = (
    df
    .filter((col("delivery_status") == "Delivered") & (col("delivery_days") > 3))
    .withColumn("delay_days", col("delivery_days") - 3)
)

# Average lateness, in days, for each (destination city, carrier) pair.
avg_delay_by_destination = (
    df_delay
    .groupBy("destination_city", "carrier_name")
    .agg(expr("avg(delay_days) as avg_delay_days"))
)
 

In [35]:
# Preview the average delay (days beyond the 3-day benchmark) per destination city and carrier.
avg_delay_by_destination.show()

[Stage 21:>                                                         (0 + 1) / 1]

+----------------+------------+------------------+
|destination_city|carrier_name|    avg_delay_days|
+----------------+------------+------------------+
|           Tabuk|       FedEx|               1.5|
|           Jazan|      Aramex|1.3333333333333333|
|          Dammam|       FedEx|1.5333333333333334|
|           Jazan|       FedEx| 1.588235294117647|
|          Dammam|      Aramex|1.3529411764705883|
|          Jeddah|      Aramex|1.2727272727272727|
|            Abha|      Aramex|1.3888888888888888|
|          Makkah|      Aramex|             1.375|
|        Buraidah|       FedEx|1.6111111111111112|
|          Makkah|       FedEx|1.3333333333333333|
|         Madinah|      Aramex|1.5333333333333334|
|          Riyadh|         DHL|1.5833333333333333|
|         Madinah|       FedEx|1.3846153846153846|
|        Buraidah|         DHL|1.3636363636363635|
|          Dammam|         DHL|1.2857142857142858|
|            Abha|       FedEx|              1.45|
|          Riyadh|       FedEx|

                                                                                

In [29]:
# Carrier performance per destination: share of delivered shipments that arrived within 3 days.
# delivery_days and on_time are derived here from the date columns, so this cell does not rely on
# them already being present on df. The metric is the same one as on_time_by_destination.
carrier_dest_perf = (
    df
    .filter(col("delivery_status") == "Delivered")
    .withColumn("delivery_days", datediff(col("delivery_date"), col("dispatch_date")))
    .withColumn("on_time", (col("delivery_days") <= 3).cast("int"))
    .groupBy("destination_city", "carrier_name")
    .agg(expr("avg(on_time) as on_time_rate"))
)

In [31]:
# Preview the on-time rate per destination city and carrier.
carrier_dest_perf.show() 

+----------------+------------+-------------------+
|destination_city|carrier_name|       on_time_rate|
+----------------+------------+-------------------+
|           Tabuk|       FedEx| 0.5609756097560976|
|           Jazan|      Aramex|               0.55|
|          Dammam|       FedEx| 0.6666666666666666|
|           Jazan|       FedEx| 0.5526315789473685|
|          Dammam|      Aramex| 0.6136363636363636|
|          Jeddah|      Aramex| 0.5769230769230769|
|            Abha|      Aramex| 0.4857142857142857|
|          Makkah|      Aramex| 0.5897435897435898|
|        Buraidah|       FedEx| 0.5714285714285714|
|          Makkah|       FedEx| 0.6470588235294118|
|         Madinah|      Aramex| 0.6739130434782609|
|          Riyadh|         DHL|                0.7|
|         Madinah|       FedEx| 0.6060606060606061|
|        Buraidah|         DHL|0.47619047619047616|
|          Dammam|         DHL|               0.65|
|            Abha|       FedEx| 0.5918367346938775|
|          R

In [38]:
# Shipment volume: total number of shipments (all statuses) for each
# (destination city, carrier) pair.
#
# The earlier version derived a "month" column from dispatch_date but never grouped on it,
# so it had no effect on the result and has been removed. Despite the variable name, this
# is an overall count, not a monthly trend. It deliberately stays at one row per
# (destination_city, carrier_name) because the KPI join below uses exactly those keys;
# adding "month" to the grouping would multiply rows in that join.
shipment_volume_trend = (
    df
    .groupBy("destination_city", "carrier_name")
    .count()
)

In [40]:
# Preview the shipment counts per destination city and carrier.
shipment_volume_trend.show() 

+----------------+------------+-----+
|destination_city|carrier_name|count|
+----------------+------------+-----+
|           Tabuk|       FedEx|   71|
|           Jazan|      Aramex|   69|
|          Dammam|       FedEx|   75|
|           Jazan|       FedEx|   78|
|          Dammam|      Aramex|   72|
|          Jeddah|      Aramex|   74|
|            Abha|      Aramex|   62|
|          Makkah|      Aramex|   66|
|        Buraidah|       FedEx|   77|
|          Makkah|       FedEx|   74|
|         Madinah|      Aramex|   76|
|          Riyadh|         DHL|   85|
|         Madinah|       FedEx|   60|
|        Buraidah|         DHL|   84|
|          Dammam|         DHL|   65|
|            Abha|       FedEx|   80|
|          Riyadh|       FedEx|   78|
|          Jeddah|       FedEx|   76|
|           Tabuk|         DHL|   81|
|          Riyadh|      Aramex|   77|
+----------------+------------+-----+
only showing top 20 rows



In [43]:
# Combine all KPIs into one table keyed by (destination_city, carrier_name).
# on_time_by_destination is the base table; left joins keep all of its rows even when
# another KPI table has no matching (destination_city, carrier_name) pair.
result_df = (
    on_time_by_destination
    .join(avg_delay_by_destination, on=["destination_city", "carrier_name"], how="left")
    .join(carrier_dest_perf, on=["destination_city", "carrier_name"], how="left")
    .join(shipment_volume_trend, on=["destination_city", "carrier_name"], how="left")
)

# Preview the combined KPI table without truncating cell values.
result_df.show(truncate=False)

                                                                                

+----------------+------------+---------------------+------------------+-------------------+-----+
|destination_city|carrier_name|on_time_delivery_rate|avg_delay_days    |on_time_rate       |count|
+----------------+------------+---------------------+------------------+-------------------+-----+
|Tabuk           |FedEx       |0.5609756097560976   |1.5               |0.5609756097560976 |71   |
|Jazan           |Aramex      |0.55                 |1.3333333333333333|0.55               |69   |
|Dammam          |FedEx       |0.6666666666666666   |1.5333333333333334|0.6666666666666666 |75   |
|Jazan           |FedEx       |0.5526315789473685   |1.588235294117647 |0.5526315789473685 |78   |
|Dammam          |Aramex      |0.6136363636363636   |1.3529411764705883|0.6136363636363636 |72   |
|Jeddah          |Aramex      |0.5769230769230769   |1.2727272727272727|0.5769230769230769 |74   |
|Abha            |Aramex      |0.4857142857142857   |1.3888888888888888|0.4857142857142857 |62   |
|Makkah   

In [44]:
# Export the KPI table to HDFS as CSV with a header row, replacing any previous export.
# coalesce(1) reduces the result to a single partition before writing.
# Note: although the path ends in ".csv", it shows up as a directory in the HDFS listing of /tmp.
(
    result_df
    .coalesce(1)
    .write
    .mode("overwrite")
    .option("header", True)
    .csv("hdfs:///tmp/Logistic_Shipment_KPIs.csv")
)

                                                                                