In [1]:

from pyspark.sql import SparkSession, functions as F, types as T, Window

spark = SparkSession.builder.appName("LogisticsPipeline").config("spark.sql.shuffle.partitions","8").getOrCreate()


delivery_data = [
("DLV001","Delhi ","D001","Delivered","120","2024-01-05 10:30"),
("DLV002","Mumbai","D002","Delivered","90","05/01/2024 11:00"),
("DLV003","Bangalore","D003","In Transit","200","2024/01/06 09:45"),
("DLV004","Delhi","D004","Cancelled","","2024-01-07 14:00"),
("DLV005","Chennai","D002","Delivered","invalid","2024-01-08 16:20"),
("DLV006","Mumbai","D005","Delivered",None,"2024-01-08 18:10"),
("DLV007","Delhi","D001","Delivered","140","09-01-2024 12:30"),
("DLV008","Bangalore","D003","Delivered","160","2024-01-09 15:45"),
("DLV009","Mumbai","D004","Delivered","110","2024-01-10 13:20"),
("DLV009","Mumbai","D004","Delivered","110","2024-01-10 13:20")
]
driver_data = [("D001","Ravi","Senior"),("D002","Amit","Junior"),("D003","Sneha","Senior"),("D004","Karan","Junior"),("D005","Neha","Senior")]
city_zone_data = [("Delhi","North"),("Mumbai","West"),("Bangalore","South"),("Chennai","South")]


In [7]:
# 1) Schemas
delivery_schema = T.StructType([
    T.StructField("delivery_id", T.StringType(), True),
    T.StructField("city",        T.StringType(), True),
    T.StructField("driver_id",   T.StringType(), True),
    T.StructField("status",      T.StringType(), True),
    T.StructField("delivery_time_minutes", T.StringType(), True),
    T.StructField("delivery_timestamp",    T.StringType(), True)
])
driver_schema = T.StructType([T.StructField("driver_id",T.StringType(),True),T.StructField("driver_name",T.StringType(),True),T.StructField("seniority",T.StringType(),True)])
zone_schema   = T.StructType([T.StructField("city",T.StringType(),True),T.StructField("zone",T.StringType(),True)])

In [8]:

# 2) Load
dlv_raw   = spark.createDataFrame(delivery_data, delivery_schema)
drivers   = spark.createDataFrame(driver_data,  driver_schema)
city_zone = spark.createDataFrame(city_zone_data, zone_schema)

In [9]:
# 3) Flag corrupt (bad time / bad timestamp / missing keys)
to_int = lambda c: F.when(F.trim(c).rlike(r"^\d+$"), F.trim(c).cast("int")).otherwise(F.lit(None).cast("int"))
fmts = ["yyyy-MM-dd HH:mm","dd/MM/yyyy HH:mm","yyyy/MM/dd HH:mm","dd-MM-yyyy HH:mm"]
to_ts = lambda c: F.coalesce(*[F.to_timestamp(F.trim(c), f) for f in fmts])

dq = (dlv_raw
      .withColumn("time_int", to_int(F.col("delivery_time_minutes")))
      .withColumn("ts",       to_ts(F.col("delivery_timestamp")))
      .withColumn("bad_time", F.col("time_int").isNull())
      .withColumn("bad_ts",   F.col("ts").isNull())
      .withColumn("missing_keys", F.col("delivery_id").isNull() | F.col("driver_id").isNull() | F.col("city").isNull()))
dq.selectExpr("sum(case when bad_time then 1 else 0 end) as invalid_time",
              "sum(case when bad_ts   then 1 else 0 end) as invalid_ts",
              "sum(case when missing_keys then 1 else 0 end) as missing_keys").show()


DateTimeException: [CANNOT_PARSE_TIMESTAMP] Text '05/01/2024 11:00' could not be parsed at index 0. Use `try_to_timestamp` to tolerate invalid input string and return NULL instead. SQLSTATE: 22007

In [6]:

from pyspark.sql import functions as F

def to_ts(col_name: str):
    """
    Parse a mixed-format timestamp column using safe try_to_timestamp.
    Uses SQL expr so format strings are treated as literals (not columns).
    """
    return F.coalesce(
        F.expr(f"try_to_timestamp(trim({col_name}), 'yyyy-MM-dd HH:mm')"),
        F.expr(f"try_to_timestamp(trim({col_name}), 'dd/MM/yyyy HH:mm')"),
        F.expr(f"try_to_timestamp(trim({col_name}), 'yyyy/MM/dd HH:mm')"),
        F.expr(f"try_to_timestamp(trim({col_name}), 'dd-MM-yyyy HH:mm')")
    )



{"ts": "2025-12-23 08:59:20.529", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `yyyy-MM-dd HH:mm` cannot be resolved. Did you mean one of the following? [`city`, `driver_id`, `delivery_id`, `status`, `time_int`]. SQLSTATE: 42703", "context": {"file": "jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o511.withColumn.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `yyyy-MM-dd HH:mm` cannot be resolved. Did you mean one of the following? [`city`, `driver_id`, `delivery_id`, `status`, `time_int`]. SQLSTATE: 42703;\n'Project [delivery_id#136, city#137, driver_id#138, status#139, delivery_time_minutes#140, delivery_tim

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `yyyy-MM-dd HH:mm` cannot be resolved. Did you mean one of the following? [`city`, `driver_id`, `delivery_id`, `status`, `time_int`]. SQLSTATE: 42703;
'Project [delivery_id#136, city#137, driver_id#138, status#139, delivery_time_minutes#140, delivery_timestamp#141, time_int#147, 'coalesce('try_to_timestamp(trim(delivery_timestamp#141, None), 'yyyy-MM-dd HH:mm), 'try_to_timestamp(trim(delivery_timestamp#141, None), 'dd/MM/yyyy HH:mm), 'try_to_timestamp(trim(delivery_timestamp#141, None), 'yyyy/MM/dd HH:mm), 'try_to_timestamp(trim(delivery_timestamp#141, None), 'dd-MM-yyyy HH:mm)) AS ts#148]
+- Project [delivery_id#136, city#137, driver_id#138, status#139, delivery_time_minutes#140, delivery_timestamp#141, CASE WHEN RLIKE(trim(delivery_time_minutes#140, None), ^\d+$) THEN cast(trim(delivery_time_minutes#140, None) as int) ELSE cast(null as int) END AS time_int#147]
   +- LogicalRDD [delivery_id#136, city#137, driver_id#138, status#139, delivery_time_minutes#140, delivery_timestamp#141], false


In [10]:

# 4) Validate schema
dlv_raw.printSchema(); drivers.printSchema(); city_zone.printSchema()


root
 |-- delivery_id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- driver_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- delivery_time_minutes: string (nullable = true)
 |-- delivery_timestamp: string (nullable = true)

root
 |-- driver_id: string (nullable = true)
 |-- driver_name: string (nullable = true)
 |-- seniority: string (nullable = true)

root
 |-- city: string (nullable = true)
 |-- zone: string (nullable = true)



In [11]:

norm = lambda c: F.upper(F.trim(F.regexp_replace(c, r"\s+", " ")))  # trim + collapse + upper
status_norm = lambda c: F.when(norm(c).like("DELIVERED%"), "DELIVERED") \
                         .when(norm(c).like("CANCEL%"),   "CANCELLED") \
                         .when(norm(c).like("IN%TRANSIT%"),"IN_TRANSIT") \
                         .otherwise("UNKNOWN")

dlv_clean = (dlv_raw
             .withColumn("delivery_id", norm("delivery_id"))
             .withColumn("city",        norm("city"))
             .withColumn("driver_id",   norm("driver_id"))
             .withColumn("status_std",  status_norm("status"))
             .withColumn("time_int",    to_int("delivery_time_minutes"))
             .withColumn("ts",          to_ts("delivery_timestamp"))
             .withColumn("bad_time",    F.col("time_int").isNull())
             .withColumn("bad_ts",      F.col("ts").isNull()))

# 10) De-duplicate by delivery_id (keep latest timestamp, then lowest time)
w = Window.partitionBy("delivery_id").orderBy(F.col("ts").desc_nulls_last(), F.col("time_int").asc_nulls_last())
dlv_dedup = dlv_clean.withColumn("rn", F.row_number().over(w)).filter("rn=1").drop("rn")


In [12]:
#5
trim_norm = lambda c: F.upper(F.trim(F.regexp_replace(F.col(c), r"\s+", " ")))
dlv_t5 = (dlv_raw
          .withColumn("delivery_id", trim_norm("delivery_id"))
          .withColumn("city",        trim_norm("city"))
          .withColumn("driver_id",   trim_norm("driver_id"))
          .withColumn("status",      trim_norm("status")))


In [13]:
#6

status_std = (F.when(F.col("status").like("DELIVERED%"), "DELIVERED")
               .when(F.col("status").like("CANCEL%"),   "CANCELLED")
               .when(F.col("status").like("IN%TRANSIT%"),"IN_TRANSIT")
               .otherwise("UNKNOWN"))

dlv_t6 = dlv_t5.withColumn("status_std", status_std)


In [14]:
#7

to_int = lambda c: F.when(F.trim(F.col(c)).rlike(r"^\d+$"), F.trim(F.col(c)).cast("int")) \
                    .otherwise(F.lit(None).cast("int"))

dlv_t7 = dlv_t6.withColumn("time_int", to_int("delivery_time_minutes"))


In [15]:
#8
dlv_t8 = dlv_t7.withColumn("time_invalid", F.col("time_int").isNull())



In [17]:
#9

def to_ts(col_name: str):
    return F.coalesce(
        F.expr(f"try_to_timestamp(trim({col_name}), 'yyyy-MM-dd HH:mm')"),
        F.expr(f"try_to_timestamp(trim({col_name}), 'dd/MM/yyyy HH:mm')"),
        F.expr(f"try_to_timestamp(trim({col_name}), 'yyyy/MM/dd HH:mm')"),
        F.expr(f"try_to_timestamp(trim({col_name}), 'dd-MM-yyyy HH:mm')")
    )

dlv_t9 = (dlv_t8
          .withColumn("ts", to_ts("delivery_timestamp"))
          .withColumn("ts_invalid", F.col("ts").isNull()))
# Optional quick check:
dlv_t9.selectExpr("sum(case when ts_invalid then 1 else 0 end) as invalid_timestamps").show()


+------------------+
|invalid_timestamps|
+------------------+
|                 0|
+------------------+



In [19]:

w = Window.partitionBy("delivery_id") \
          .orderBy(F.col("ts").desc_nulls_last(),
                   F.col("time_int").asc_nulls_last(),
                   F.col("driver_id").asc())

dlv_t10 = (dlv_t9
           .withColumn("rn", F.row_number().over(w))
           .filter("rn = 1")
           .drop("rn"))#10


In [20]:
from pyspark.sql import functions as F

dlv_t11 = dlv_t10.filter(
    (F.col("status_std") == "DELIVERED") &
    F.col("time_int").isNotNull() &
    F.col("ts").isNotNull()
)
#11

In [21]:
dlv_t12 = dlv_t11.filter(~F.col("status_std").isin("CANCELLED", "IN_TRANSIT"))#12

In [22]:

before_total   = dlv_t10.count()
after_delivered = dlv_t11.count()
after_final     = dlv_t12.count()

print(f"Total after Phase-2 (de-dup): {before_total}")
print(f"Delivered + valid (Task 11):  {after_delivered}")
print(f"Final after removing non-delivered (Task 12): {after_final}")

# 13
dlv_t10.groupBy("status_std").count().orderBy("status_std").show()


Total after Phase-2 (de-dup): 9
Delivered + valid (Task 11):  5
Final after removing non-delivered (Task 12): 5
+----------+-----+
|status_std|count|
+----------+-----+
| CANCELLED|    1|
| DELIVERED|    7|
|IN_TRANSIT|    1|
+----------+-----+



In [23]:
from pyspark.sql import functions as F

# Normalize join keys on the dimension side
drivers_n = drivers.select(
    F.upper(F.trim("driver_id")).alias("driver_id"),
    F.trim("driver_name").alias("driver_name"),
    F.upper(F.trim("seniority")).alias("seniority")
)

# Join (left) so deliveries without a matching driver still remain
dlv_t14 = dlv_t12.join(F.broadcast(drivers_n), on="driver_id", how="left")
#14

In [24]:

city_zone_n = city_zone.select(
    F.upper(F.trim("city")).alias("city"),
    F.upper(F.trim("zone")).alias("zone")
)

dlv_t15 = dlv_t14.join(F.broadcast(city_zone_n), on="city", how="left") \
                 .withColumn("zone", F.coalesce(F.col("zone"), F.lit("UNKNOWN")))
#15

In [25]:

print("\n== Join Plan (drivers) ==")
dlv_t14.explain(True)

print("\n== Join Plan (drivers + city zone) ==")
dlv_t15.explain(True)
#17


== Join Plan (drivers) ==
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [driver_id])
:- Filter NOT status_std#204 IN (CANCELLED,IN_TRANSIT)
:  +- Filter (((status_std#204 = DELIVERED) AND isnotnull(time_int#205)) AND isnotnull(ts#209))
:     +- Project [delivery_id#200, city#201, driver_id#202, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210]
:        +- Filter (rn#222 = 1)
:           +- Project [delivery_id#200, city#201, driver_id#202, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, rn#222]
:              +- Project [delivery_id#200, city#201, driver_id#202, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, rn#222, rn#222]
:                 +- Window [row_number() windowspecdefinition(delivery_id#200, ts#209 DESC NULLS LA

In [26]:
from pyspark.sql import functions as F, Window

avg_city = (dlv_t15
            .groupBy("city")
            .agg(F.round(F.avg("time_int"), 2).alias("avg_minutes")))
# Inspect
avg_city.orderBy("city").show(truncate=False)
#18

+---------+-----------+
|city     |avg_minutes|
+---------+-----------+
|BANGALORE|160.0      |
|DELHI    |130.0      |
|MUMBAI   |100.0      |
+---------+-----------+



In [27]:

avg_driver = (dlv_t15
              .groupBy("driver_id", "driver_name", "city")
              .agg(F.round(F.avg("time_int"), 2).alias("avg_minutes")))
# Inspect
avg_driver.orderBy("city","avg_minutes").show(truncate=False)
#19

+---------+-----------+---------+-----------+
|driver_id|driver_name|city     |avg_minutes|
+---------+-----------+---------+-----------+
|D003     |Sneha      |BANGALORE|160.0      |
|D001     |Ravi       |DELHI    |130.0      |
|D002     |Amit       |MUMBAI   |90.0       |
|D004     |Karan      |MUMBAI   |110.0      |
+---------+-----------+---------+-----------+



In [28]:

w_city_rank = Window.partitionBy("city").orderBy(F.col("avg_minutes").asc())
ranked_in_city = avg_driver.withColumn("rank_in_city", F.dense_rank().over(w_city_rank))
# Inspect
ranked_in_city.orderBy("city","rank_in_city","avg_minutes").show(truncate=False)
#20

+---------+-----------+---------+-----------+------------+
|driver_id|driver_name|city     |avg_minutes|rank_in_city|
+---------+-----------+---------+-----------+------------+
|D003     |Sneha      |BANGALORE|160.0      |1           |
|D001     |Ravi       |DELHI    |130.0      |1           |
|D002     |Amit       |MUMBAI   |90.0       |1           |
|D004     |Karan      |MUMBAI   |110.0      |2           |
+---------+-----------+---------+-----------+------------+



In [29]:

avg_driver_zone = (dlv_t15
                   .groupBy("zone","driver_id","driver_name")
                   .agg(F.round(F.avg("time_int"), 2).alias("avg_minutes")))
w_zone_fastest = Window.partitionBy("zone").orderBy(F.col("avg_minutes").asc())
fastest_per_zone = avg_driver_zone.withColumn("rnk", F.dense_rank().over(w_zone_fastest)) \
                                  .filter("rnk = 1").drop("rnk")
# Inspect
fastest_per_zone.orderBy("zone").show(truncate=False)
#21

+-----+---------+-----------+-----------+
|zone |driver_id|driver_name|avg_minutes|
+-----+---------+-----------+-----------+
|NORTH|D001     |Ravi       |130.0      |
|SOUTH|D003     |Sneha      |160.0      |
|WEST |D002     |Amit       |90.0       |
+-----+---------+-----------+-----------+



In [30]:

top2_per_city = ranked_in_city.filter("rank_in_city <= 2")
# Inspect
top2_per_city.orderBy("city","rank_in_city","avg_minutes").show(truncate=False)
#22

+---------+-----------+---------+-----------+------------+
|driver_id|driver_name|city     |avg_minutes|rank_in_city|
+---------+-----------+---------+-----------+------------+
|D003     |Sneha      |BANGALORE|160.0      |1           |
|D001     |Ravi       |DELHI    |130.0      |1           |
|D002     |Amit       |MUMBAI   |90.0       |1           |
|D004     |Karan      |MUMBAI   |110.0      |2           |
+---------+-----------+---------+-----------+------------+



In [31]:

# Reused frequently:
reused_dfs = {
    "dlv_t15": "Main enriched deliveries used by many downstream aggregations",
    "avg_city": "Used in dashboards and repeated reads",
    "avg_driver": "Used for ranking, top-k, fastest per zone derived from it"
}
print(reused_dfs)
#23

{'dlv_t15': 'Main enriched deliveries used by many downstream aggregations', 'avg_city': 'Used in dashboards and repeated reads', 'avg_driver': 'Used for ranking, top-k, fastest per zone derived from it'}


In [32]:

dlv_t15_cached  = dlv_t15.cache()
avg_city_cached = avg_city.cache()

# Trigger cache materialization (actions)
print("enriched count:", dlv_t15_cached.count())
print("avg_city count:", avg_city_cached.count())
#24

enriched count: 5
avg_city count: 3


In [33]:

# Without cache (recompute lineage)
dlv_t15.unpersist(False)  # ensure uncached view to compare
print("\n-- Plan WITHOUT cache --")
dlv_t15.explain(True)

# With cache
dlv_t15_cached = dlv_t15.cache(); dlv_t15_cached.count()  # materialize cache
print("\n-- Plan WITH cache --")
dlv_t15_cached.explain(True)
#25


-- Plan WITHOUT cache --
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(zone, 'coalesce('zone, UNKNOWN), None)]
+- Project [city#201, driver_id#202, delivery_id#200, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, driver_name#292, seniority#293, zone#295]
   +- Join LeftOuter, (city#201 = city#294)
      :- Project [driver_id#202, delivery_id#200, city#201, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, driver_name#292, seniority#293]
      :  +- Join LeftOuter, (driver_id#202 = driver_id#291)
      :     :- Filter NOT status_std#204 IN (CANCELLED,IN_TRANSIT)
      :     :  +- Filter (((status_std#204 = DELIVERED) AND isnotnull(time_int#205)) AND isnotnull(ts#209))
      :     :     +- Project [delivery_id#200, city#201, driver_id#202, status#203, delivery_time_minutes#153, delivery_timestamp#154, 

In [34]:

dlv_by_city = dlv_t15_cached.repartition("city")  # hash partition by city

# Optional: verify distribution
print("Partitions after repartition by city:", dlv_by_city.rdd.getNumPartitions())
#26

Partitions after repartition by city: 1


In [35]:

explanation = """
Repartitioning by 'city' improves performance because:
1) Parallelism: Tasks are distributed per city, enabling better parallel aggregation.
2) Shuffle reduction: Many operations group/filter by 'city'; pre-partitioning reduces shuffle costs.
3) Partition pruning: When writing partitioned Parquet by 'city', readers can skip non-relevant partitions.
4) Data locality: Joins or aggregations keyed on 'city' co-locate records, minimizing network I/O.
Notes:
- Choose partition count based on cluster cores and data size.
- Use coalesce(<smaller>) for reducing partitions without full shuffle (e.g., optimizing small output writes).
"""
print(explanation)
#27


Repartitioning by 'city' improves performance because:
1) Parallelism: Tasks are distributed per city, enabling better parallel aggregation.
2) Shuffle reduction: Many operations group/filter by 'city'; pre-partitioning reduces shuffle costs.
3) Partition pruning: When writing partitioned Parquet by 'city', readers can skip non-relevant partitions.
4) Data locality: Joins or aggregations keyed on 'city' co-locate records, minimizing network I/O.
Notes:
- Choose partition count based on cluster cores and data size.
- Use coalesce(<smaller>) for reducing partitions without full shuffle (e.g., optimizing small output writes).



In [36]:
from pyspark.sql import functions as F

In [37]:

# Use the repartitioned DF if you created it; else use dlv_t15 (Delivered + joins)
clean_for_write = dlv_by_city if 'dlv_by_city' in globals() else dlv_t15

(clean_for_write
 .write
 .mode("overwrite")
 .partitionBy("city")      # partition pruning on reads
 .parquet("output/cleaned_deliveries_parquet"))
#28

In [38]:

# Write each analytic view to ORC
(avg_city.write.mode("overwrite").orc("output/analytics_orc/avg_city"))
(avg_driver.write.mode("overwrite").orc("output/analytics_orc/avg_driver"))
(ranked_in_city.write.mode("overwrite").orc("output/analytics_orc/ranked_in_city"))
(fastest_per_zone.write.mode("overwrite").orc("output/analytics_orc/fastest_per_zone"))
(top2_per_city.write.mode("overwrite").orc("output/analytics_orc/top2_per_city"))
#29

In [40]:

import os

def list_dir_tree(root_dir, suffixes=None, max_depth=3):
    suffixes = tuple(suffixes) if suffixes else None
    for root, dirs, files in os.walk(root_dir):
        depth = root[len(root_dir):].count(os.sep)
        if depth > max_depth:  # avoid printing huge trees
            continue
        files_filt = [f for f in files if (not suffixes or f.endswith(suffixes))]
        print(f"{root} -> {files_filt}")

print("=== Parquet output ===")
list_dir_tree("output/cleaned_deliveries_parquet", suffixes=[".parquet"])

print("\n=== ORC outputs ===")
list_dir_tree("output/analytics_orc", suffixes=[".orc"])

#30

=== Parquet output ===
output/cleaned_deliveries_parquet -> []
output/cleaned_deliveries_parquet/city=BANGALORE -> ['part-00000-81fcb639-9efe-4d89-be72-51d6f4d45d4b.c000.snappy.parquet']
output/cleaned_deliveries_parquet/city=MUMBAI -> ['part-00000-81fcb639-9efe-4d89-be72-51d6f4d45d4b.c000.snappy.parquet']
output/cleaned_deliveries_parquet/city=DELHI -> ['part-00000-81fcb639-9efe-4d89-be72-51d6f4d45d4b.c000.snappy.parquet']

=== ORC outputs ===
output/analytics_orc -> []
output/analytics_orc/ranked_in_city -> ['part-00000-745c3e15-f349-42a6-8e28-ed9f0ef0e137-c000.zstd.orc']
output/analytics_orc/top2_per_city -> ['part-00000-78a9e002-60ec-49ac-9083-6bcdbf59b540-c000.zstd.orc']
output/analytics_orc/fastest_per_zone -> ['part-00000-de1433e5-faaf-4977-831f-fdfec68a1136-c000.zstd.orc']
output/analytics_orc/avg_driver -> ['part-00000-2066eefb-13c2-41e0-b712-303e296626de-c000.zstd.orc']
output/analytics_orc/avg_city -> ['part-00003-4239c8eb-f223-4db8-9390-bceb5696c319-c000.zstd.orc', 'part-00

In [41]:

from pyspark.sql import functions as F, types as T
from pyspark.sql import DataFrame

dlv_count = dlv_t15.count()
print("dlv_t15 count:", dlv_count)

def ensure_df(df: DataFrame, name: str) -> DataFrame:
    if df is None:
        raise ValueError(f"[NoneType] {name} is None. Check that your transformation returns a DataFrame.")
    return df

dlv_safe = ensure_df(dlv_t15, "dlv_t15")
#32


dlv_t15 count: 5


In [42]:

drivers_raw_schema = drivers.printSchema()
dlv_schema         = dlv_t15.printSchema()
drivers_norm = drivers.select(
    F.upper(F.trim("driver_id")).alias("driver_id"),
    F.trim("driver_name").alias("driver_name"),
    F.upper(F.trim("seniority")).alias("seniority")
)
for df, cols in [(dlv_t15, ["driver_id", "city"]), (drivers_norm, ["driver_id"]), (city_zone, ["city"])]:
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns {missing} in DataFrame.")
#33

root
 |-- driver_id: string (nullable = true)
 |-- driver_name: string (nullable = true)
 |-- seniority: string (nullable = true)

root
 |-- city: string (nullable = true)
 |-- driver_id: string (nullable = true)
 |-- delivery_id: string (nullable = true)
 |-- status: string (nullable = true)
 |-- delivery_time_minutes: string (nullable = true)
 |-- delivery_timestamp: string (nullable = true)
 |-- status_std: string (nullable = false)
 |-- time_int: integer (nullable = true)
 |-- time_invalid: boolean (nullable = false)
 |-- ts: timestamp (nullable = true)
 |-- ts_invalid: boolean (nullable = false)
 |-- driver_name: string (nullable = true)
 |-- seniority: string (nullable = true)
 |-- zone: string (nullable = false)



In [43]:

try:
    # Intentionally broken: add 1 to a string column without casting
    spark.createDataFrame([("42",)], ["only_str"]).select(F.col("only_str") + 1).show()
except Exception as e:
    print("[Expected Error] Arithmetic on string without cast:", e)

try:
    # Unresolved column
    dlv_t15.select(F.col("non_existent_col")).show()
except Exception as e:
    print("[Expected Error] Unresolved column:", e)

try:
    # Wrong schema in createDataFrame
    bad_schema = T.StructType([T.StructField("value", T.IntegerType(), True)])
    spark.createDataFrame([("x",)], bad_schema).show()
except Exception as e:
    print("[Expected Error] Schema mismatch:", e)
#34

{"ts": "2025-12-23 09:55:31.536", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `non_existent_col` cannot be resolved. Did you mean one of the following? [`delivery_id`, `time_int`, `driver_id`, `seniority`, `status`]. SQLSTATE: 42703", "context": {"file": "line 9 in cell [43]", "line": "", "fragment": "col", "errorClass": "UNRESOLVED_COLUMN.WITH_SUGGESTION"}, "exception": {"class": "Py4JJavaError", "msg": "An error occurred while calling o1005.select.\n: org.apache.spark.sql.AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `non_existent_col` cannot be resolved. Did you mean one of the following? [`delivery_id`, `time_int`, `driver_id`, `seniority`, `status`]. SQLSTATE: 42703;\n'Project ['non_existent_col]\n+- Project [city#201, driver_id#202, delivery_id#200, status#203, delivery_time_minutes#153, delivery_timestamp#154,

+--------------+
|(only_str + 1)|
+--------------+
|            43|
+--------------+

[Expected Error] Unresolved column: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column, variable, or function parameter with name `non_existent_col` cannot be resolved. Did you mean one of the following? [`delivery_id`, `time_int`, `driver_id`, `seniority`, `status`]. SQLSTATE: 42703;
'Project ['non_existent_col]
+- Project [city#201, driver_id#202, delivery_id#200, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, driver_name#292, seniority#293, coalesce(zone#295, UNKNOWN) AS zone#296]
   +- Project [city#201, driver_id#202, delivery_id#200, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, driver_name#292, seniority#293, zone#295]
      +- Join LeftOuter, (city#201 = city#294)
         :- Project [driver_id#202, delivery_id#200, city#201,

In [44]:

print("\n== Plan: dlv_t15 (enriched deliveries) ==")
dlv_t15.explain(True)

print("\n== Plan: avg_driver (aggregation) ==")
avg_driver.explain(True)

drivers_small = F.broadcast(drivers_norm)
dlv_bcast = dlv_t15.join(drivers_small, "driver_id", "left")
dlv_bcast.explain(True)

dlv_city_part = dlv_t15.repartition("city")
avg_city_fast = dlv_city_part.groupBy("city").agg(F.avg("time_int").alias("avg_minutes"))
avg_city_fast.explain(True)

dlv_cached = dlv_t15.cache(); dlv_cached.count()
dlv_cached.explain(True)
#35


== Plan: dlv_t15 (enriched deliveries) ==
== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(zone, 'coalesce('zone, UNKNOWN), None)]
+- Project [city#201, driver_id#202, delivery_id#200, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, driver_name#292, seniority#293, zone#295]
   +- Join LeftOuter, (city#201 = city#294)
      :- Project [driver_id#202, delivery_id#200, city#201, status#203, delivery_time_minutes#153, delivery_timestamp#154, status_std#204, time_int#205, time_invalid#206, ts#209, ts_invalid#210, driver_name#292, seniority#293]
      :  +- Join LeftOuter, (driver_id#202 = driver_id#291)
      :     :- Filter NOT status_std#204 IN (CANCELLED,IN_TRANSIT)
      :     :  +- Filter (((status_std#204 = DELIVERED) AND isnotnull(time_int#205)) AND isnotnull(ts#209))
      :     :     +- Project [delivery_id#200, city#201, driver_id#202, status#203, delivery_time_minutes#153, deliver