Practice Cell to explore and validate the Bronze data - Mini Challenges 

In [10]:
from pathlib import Path
from pyspark.sql import functions as F
import os
import sys

current_dir = os.path.abspath('')
# Add parent directory to Python path to allow module imports
sys.path.append(os.path.abspath(os.path.join(current_dir,'..','..')))

from etl.common import get_spark, load_yml, project_root

root = project_root()
cfg = load_yml(str(root / "configs" / "bronze_config.yml"))
fmt = cfg["storage"]["format"]

spark = get_spark("bronze-mini-challenges")

bronze_matches_path = root / cfg["tables"]["matches"]["target_path"]
bronze_deliveries_path = root / cfg["tables"]["deliveries"]["target_path"]

matches = spark.read.format(fmt).load(str(bronze_matches_path))
deliveries = spark.read.format(fmt).load(str(bronze_deliveries_path))

Challenge 1 — Deliveries count vs matches

Goal: compute deliveries per match and see top matches (useful to spot super overs / anomalies).

In [11]:
dpm = ( deliveries
       .groupBy("match_id")
       .count()
       .withColumnRenamed("count","deliveries_count")
       )
print("Top 3 matches by deliveries_count")
(dpm.orderBy(F.desc("deliveries_count"))
 .limit(3)
 .show(truncate=False))
print("Summary stats(deliveries per match)")
(dpm.select(F.min("deliveries_count").alias("min"),
            F.expr("percentile_approx(deliveries_count,0.5)").alias("median"),
            F.max("deliveries_count").alias("max"),
            F.avg("deliveries_count").alias("avg"))
       .show())

Top 3 matches by deliveries_count
+----------------+----------------+
|match_id        |deliveries_count|
+----------------+----------------+
|eef1376b596f046c|269             |
|755403f168d4e0fe|267             |
|af039871ce7fb9ae|265             |
+----------------+----------------+

Summary stats(deliveries per match)
+---+------+---+------------------+
|min|median|max|               avg|
+---+------+---+------------------+
| 51|   245|269|237.98545765611635|
+---+------+---+------------------+



Challenge 2 — Integrity: runs_total == runs_batter + runs_extras

In [12]:
violations_df = deliveries.filter(
    F.coalesce(F.col("runs_total"), F.lit(0)) !=
    (F.coalesce(F.col("runs_batter"), F.lit(0)) + F.coalesce(F.col("runs_extras"), F.lit(0)))
)

violations_cnt = violations_df.count()
print("Integrity violations (total != batter + extras):", violations_cnt)

# Peek a few rows if any
if violations_cnt > 0:
    (violations_df
     .select("match_id","inning_no","over_no","ball_in_over","runs_batter","runs_extras","runs_total","src_file_name")
     .orderBy("match_id","inning_no","over_no","ball_in_over")
     .show(10, truncate=False))

Integrity violations (total != batter + extras): 0


Challenge 4 — Performance tweak: output file counts

Goal: tweak partitions/files and observe the number of files written per season.
You don’t need to re-ingest everything—just write a tiny sample with different repartition values and count files.

In [13]:
# Write a small sample for one season to a temp output with different partitioning
from datetime import datetime

season_to_check = (matches.select("season").orderBy("season").first()["season"])

sample = deliveries.filter(F.col("season") == season_to_check).limit(5000)

def write_and_count(out_dir: Path, n_parts: int):
    (sample.repartition(n_parts)
           .write.mode("overwrite").format("parquet").save(str(out_dir)))
    files = list((out_dir).rglob("part-*"))
    return len(files)

base = root / "data" / "tmp" / f"perf_{season_to_check}_{datetime.now().strftime('%H%M%S')}"

files_1  = write_and_count(base / "n1",  1)
files_4  = write_and_count(base / "n4",  4)
files_16 = write_and_count(base / "n16", 16)

print(f"Output files with repartition(1):  {files_1}")
print(f"Output files with repartition(4):  {files_4}")
print(f"Output files with repartition(16): {files_16}")

Output files with repartition(1):  1
Output files with repartition(4):  1
Output files with repartition(16): 1
