# Check That the Raw Table Exists


In [0]:

%sql
-- Point the session at the project catalog and schema.
USE CATALOG workspace;
USE SCHEMA delivery_logistics;

-- Preview the raw table (this also confirms that it exists).
-- NOTE(review): in the sample output every delivery_id is 250.99 and both
-- *_hours columns are 1970-01-01T00:00:00 timestamps, which suggests these
-- columns were mis-typed when the raw table was loaded; confirm upstream.
SELECT *
FROM delivery_logistic_raw
LIMIT 10;

delivery_id,delivery_partner,package_type,vehicle_type,delivery_mode,region,weather_condition,distance_km,package_weight_kg,delivery_time_hours,expected_time_hours,delayed,delivery_status,delivery_rating,delivery_cost
250.99,delhivery,automobile parts,bike,same day,west,clear,297.0,46.96,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,3,1632.7206
250.99,xpressbees,cosmetics,ev van,express,central,cold,89.6,47.39,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,5,640.17
250.99,shadowfax,groceries,truck,two day,east,rainy,273.5,26.89,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,4,1448.17
250.99,dhl,electronics,ev van,same day,east,cold,269.7,12.69,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,3,1486.57
250.99,dhl,clothing,van,two day,north,foggy,256.7,37.02,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,4,1394.56
250.99,amazon logistics,documents,ev bike,express,west,rainy,48.4,33.15,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,yes,delayed,3,391.45
250.99,delhivery,groceries,scooter,same day,central,clear,198.3,43.79,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,3,1222.87
250.99,xpressbees,fragile items,van,same day,north,cold,114.6,42.63,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,3,800.89
250.99,blue dart,clothing,van,same day,south,hot,142.4,14.06,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,5,854.18
250.99,delhivery,pharmacy,truck,same day,east,foggy,47.1,29.28,1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,no,delivered,5,423.34


# Read the Raw Table

In [0]:

# Point the session at the project catalog/schema so the unqualified table
# name below resolves inside workspace.delivery_logistics.
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA delivery_logistics")

# Load the raw table, then inspect its schema, a small sample and the column list.
# NOTE(review): the printed schema shows delivery_id as double and the two
# *_hours columns as timestamp, which looks like a mis-typed ingestion.
df_raw = spark.read.table("delivery_logistic_raw")

df_raw.printSchema()
df_raw.show(n=5)
df_raw.columns

root
 |-- delivery_id: double (nullable = true)
 |-- delivery_partner: string (nullable = true)
 |-- package_type: string (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- delivery_mode: string (nullable = true)
 |-- region: string (nullable = true)
 |-- weather_condition: string (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- package_weight_kg: double (nullable = true)
 |-- delivery_time_hours: timestamp (nullable = true)
 |-- expected_time_hours: timestamp (nullable = true)
 |-- delayed: string (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- delivery_rating: long (nullable = true)
 |-- delivery_cost: double (nullable = true)

+-----------+----------------+----------------+------------+-------------+-------+-----------------+-----------+-----------------+-------------------+-------------------+-------+---------------+---------------+------------------+
|delivery_id|delivery_partner|    package_type|vehicle_type|delivery_mode| re

['delivery_id',
 'delivery_partner',
 'package_type',
 'vehicle_type',
 'delivery_mode',
 'region',
 'weather_condition',
 'distance_km',
 'package_weight_kg',
 'delivery_time_hours',
 'expected_time_hours',
 'delayed',
 'delivery_status',
 'delivery_rating',
 'delivery_cost']

# Cleaning and type casting
## Clean numeric columns

In [0]:
from pyspark.sql import functions as F

# Start the cleaning lineage from the raw frame. Spark DataFrames are
# immutable, so df_raw itself is never modified by the steps below.
df = df_raw

# Columns that must be numeric (double) for the derived metrics later on.
NUMERIC_COLS = [
    "distance_km",
    "package_weight_kg",
    "delivery_time_hours",
    "expected_time_hours",
    "delivery_cost",
    "delivery_rating",
]
# Rows missing any of these cannot produce delay / speed / cost metrics.
REQUIRED_COLS = [
    "distance_km",
    "package_weight_kg",
    "delivery_time_hours",
    "expected_time_hours",
    "delivery_cost",
]

# Casting a timestamp to double yields seconds since the Unix epoch, not hours.
# The raw schema types both *_hours columns as timestamp (all 1970-01-01 in the
# sample), so their hour values appear to have been lost at ingestion. Surface
# this loudly instead of silently producing 0.0 everywhere.
timestamp_cols = [
    field.name
    for field in df.schema.fields
    if field.name in NUMERIC_COLS and field.dataType.typeName() == "timestamp"
]
if timestamp_cols:
    print(
        f"WARNING: {timestamp_cols} are timestamp columns; casting them to double "
        "gives epoch seconds, not hours. Fix the raw table's schema upstream."
    )

# Cast FIRST, then drop nulls. With ANSI mode off, a value that fails to cast
# becomes NULL; dropping before the cast would let those rows through with
# NULLs in required columns. select() keeps the original column order.
df = df.select(
    *[
        F.col(c).cast("double").alias(c) if c in NUMERIC_COLS else F.col(c)
        for c in df.columns
    ]
)
df = df.dropna(subset=REQUIRED_COLS)

# Cleaning and type casting
## Fix the `delayed` column

In [0]:

# Normalise the free-text `delayed` flag to an integer 0/1.
# Accepted "true" spellings (case-insensitive, surrounding whitespace ignored):
# yes / y / 1 / true. Anything else, including NULL, maps to 0.
# NOTE(review): treating NULL as "not delayed" is a policy choice; confirm.
DELAYED_TRUE_VALUES = ["yes", "y", "1", "true"]

# trim() so values such as " yes" or "Yes " are still recognised.
delayed_normalised = F.lower(F.trim(F.col("delayed").cast("string")))

df = (
    df.withColumn(
        "delayed_flag",
        F.when(delayed_normalised.isin(DELAYED_TRUE_VALUES), 1).otherwise(0),
    )
    # Drop-then-rename keeps `delayed` as the last column of the frame.
    .drop("delayed")
    .withColumnRenamed("delayed_flag", "delayed")
)

# Cleaning and type casting
## Add derived columns (delay, speed, cost per km)

In [0]:
# Derived metrics. Each ratio is guarded so a non-positive denominator
# yields NULL (a `when` with no `otherwise` produces NULL).

# delay_hours = actual - expected (positive means the delivery was late)
df = df.withColumn(
    "delay_hours",
    F.round(F.col("delivery_time_hours") - F.col("expected_time_hours"), 2),
)

# On-time vs Delayed label. Rows whose delay cannot be computed get NULL
# instead of being silently labelled "Delayed".
# NOTE(review): this label is derived from the hour columns and can disagree
# with the source `delayed` flag; in the sample output both hour columns are
# 0.0, so those rows come out "On-time".
df = df.withColumn(
    "on_time",
    F.when(F.col("delay_hours") <= 0, F.lit("On-time"))
     .when(F.col("delay_hours") > 0, F.lit("Delayed")),
)

# Speed in km/h (NULL when delivery time is zero/negative/missing)
df = df.withColumn(
    "speed_kmph",
    F.when(
        F.col("delivery_time_hours") > 0,
        F.round(F.col("distance_km") / F.col("delivery_time_hours"), 2),
    ),
)

# Cost per km (NULL when distance is zero/negative/missing)
df = df.withColumn(
    "cost_per_km",
    F.when(
        F.col("distance_km") > 0,
        F.round(F.col("delivery_cost") / F.col("distance_km"), 2),
    ),
)

df.select(
    "delivery_id", "region", "weather_condition",
    "distance_km", "delivery_time_hours", "expected_time_hours",
    "delay_hours", "on_time", "speed_kmph", "delivery_cost", "cost_per_km",
    "delayed"
).show(10)

+-----------+-------+-----------------+-----------+-------------------+-------------------+-----------+-------+----------+------------------+-----------+-------+
|delivery_id| region|weather_condition|distance_km|delivery_time_hours|expected_time_hours|delay_hours|on_time|speed_kmph|     delivery_cost|cost_per_km|delayed|
+-----------+-------+-----------------+-----------+-------------------+-------------------+-----------+-------+----------+------------------+-----------+-------+
|     250.99|   west|            clear|      297.0|                0.0|                0.0|        0.0|On-time|      NULL|1632.7205999999999|        5.5|      0|
|     250.99|central|             cold|       89.6|                0.0|                0.0|        0.0|On-time|      NULL|            640.17|       7.14|      0|
|     250.99|   east|            rainy|      273.5|                0.0|                0.0|        0.0|On-time|      NULL|           1448.17|       5.29|      0|
|     250.99|   east|       

# Cleaning and type casting
## Make `delivery_id` unique (no duplicates)


In [0]:
from pyspark.sql.window import Window


# delivery_id is not unique in the raw data (the sample shows 250.99 on every
# row), so suffix each id with a per-id sequence number: "<id>-<n>".
# Order by delivery_time_hours first, then by every other column as a
# tie-breaker. Ordering by delivery_time_hours alone (0.0 on the sample rows)
# leaves the order within a partition undefined, so which row received which
# suffix could change from run to run.
tiebreak_cols = [c for c in df.columns if c not in ("delivery_id", "delivery_time_hours")]
w = Window.partitionBy("delivery_id").orderBy("delivery_time_hours", *tiebreak_cols)

df = df.withColumn(
    "delivery_id_unique",
    F.concat(
        F.col("delivery_id").cast("string"),
        F.lit("-"),
        F.row_number().over(w).cast("string"),
    ),
)

# Replace the old id; drop-then-rename keeps delivery_id as the last column.
df = df.drop("delivery_id").withColumnRenamed("delivery_id_unique", "delivery_id")

# Verify uniqueness: show any offenders, then fail the cell if there are any.
duplicate_ids = df.groupBy("delivery_id").count().filter("count > 1")
duplicate_ids.show()
assert duplicate_ids.limit(1).count() == 0, "delivery_id is still not unique"


+-----------+-----+
|delivery_id|count|
+-----------+-----+
+-----------+-----+



# Saving clean data as a new Table
## Save as `delivery_logistic_clean`

In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA delivery_logistics")

CLEAN_TABLE = "workspace.delivery_logistics.delivery_logistic_clean"

# mode("overwrite") makes this cell re-runnable. The default save mode
# ("errorifexists") fails on every run after the first because the table
# already exists. overwriteSchema lets the Delta table follow column changes
# made in the cleaning steps above.
(
    df.write.format("delta")
      .mode("overwrite")
      .option("overwriteSchema", "true")
      .saveAsTable(CLEAN_TABLE)
)

print(f"Saved cleaned table as {CLEAN_TABLE}")

Saved cleaned table as workspace.delivery_logistics.delivery_logistic_clean


# Saving clean data as a new Table 

## Verify the new table

In [0]:
# Read the saved table back and display its schema and first rows for a
# visual check against the cleaned DataFrame.
df_clean = spark.read.table("workspace.delivery_logistics.delivery_logistic_clean")

df_clean.printSchema()
df_clean.show(n=5)

root
 |-- delivery_partner: string (nullable = true)
 |-- package_type: string (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- delivery_mode: string (nullable = true)
 |-- region: string (nullable = true)
 |-- weather_condition: string (nullable = true)
 |-- distance_km: double (nullable = true)
 |-- package_weight_kg: double (nullable = true)
 |-- delivery_time_hours: double (nullable = true)
 |-- expected_time_hours: double (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- delivery_rating: double (nullable = true)
 |-- delivery_cost: double (nullable = true)
 |-- delayed: integer (nullable = true)
 |-- delay_hours: double (nullable = true)
 |-- on_time: string (nullable = true)
 |-- speed_kmph: double (nullable = true)
 |-- cost_per_km: double (nullable = true)
 |-- delivery_id: string (nullable = true)

+----------------+----------------+------------+-------------+-------+-----------------+-----------+-----------------+-------------------+-----