In [0]:

import pandas as pd
import time
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, when

In [0]:
# Check Spark version and cluster info
print(f"Spark version: {spark.version}")
print(f"Cluster configured successfully!")

# Check available datasets
dbutils.fs.ls("/databricks-datasets/")

Spark version: 4.0.0
Cluster configured successfully!


[FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=1762918143456),
 FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1596557781000),
 FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=1762918143456),
 FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1596557823000),
 FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=1762918143456),
 FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=1762918143456),
 FileInfo(path='dbfs:/databricks-datasets/amazon/', name='amazon/', size=0, modificationTime=1762918143456),
 FileInfo(path='dbfs:/databricks-datasets/asa/', name='asa/', size=0, modificationTime=1762918143456),
 FileInfo(path='dbfs:/databricks-datasets/atlas_higgs/', name='atlas_higgs/', size=0, modificationTime=

In [0]:
%pip install -q kaggle 
!mkdir -p ~/.kaggle
!echo '{"username":"<KAGGLE_USER>","key":"<KAGGLE_KEY>"}' > ~/.kaggle/kaggle.json
!chmod 600 ~/.kaggle/kaggle.json

!kaggle datasets download -d sobhanmoosavi/us-accidents -p /tmp --force
!unzip -o /tmp/us-accidents.zip -d /tmp/us_accidents
!ls -lh /tmp/us_accidents | head


[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
Dataset URL: https://www.kaggle.com/datasets/sobhanmoosavi/us-accidents
License(s): CC-BY-NC-SA-4.0


In [0]:
# Inspect dataset size 
files = dbutils.fs.ls("/databricks-datasets/flights/departuredelays.csv")
total_bytes = sum(f.size for f in files)
total_gb_compressed = total_bytes / (1024**3)
estimated_uncompressed_gb = total_gb_compressed * 4  # average ×4 expansion
print(f"Compressed size: {total_gb_compressed:.2f} GB")
print(f"Estimated uncompressed size: {estimated_uncompressed_gb:.2f} GB (≈×4 expansion)")

Compressed size: 0.03 GB
Estimated uncompressed size: 0.12 GB (≈×4 expansion)


In [0]:
# PYSPARK: Same task
path = "/databricks-datasets/flights/departuredelays.csv"

start = time.time()

df_spark = spark.read.csv(path, header=True, inferSchema=True)
result = df_spark.filter(col('delay') > 0) \
    .groupBy('origin') \
    .count() \
    .orderBy(desc('count')) \
    .limit(10)

result.show()

pyspark_time = time.time() - start

print(f"\n  PySpark: {pyspark_time:.2f} seconds")
     

+------+-----+
|origin|count|
+------+-----+
|   ATL|41828|
|   ORD|33812|
|   DEN|30760|
|   DFW|28706|
|   LAX|22684|
|   IAH|21009|
|   PHX|17555|
|   LAS|16938|
|   SFO|16552|
|   MCO|14189|
+------+-----+


  PySpark: 1.68 seconds


In [0]:
# Read built-in dataset
df = spark.read.csv(
    "/databricks-datasets/flights/departuredelays.csv",
    header=True,
    inferSchema=True
)

# 0.12 GB * 2^4 = 0.12 * 16 = 1.92 GB
for i in range(4):
    df = df.unionByName(df, allowMissingColumns=True)

    rows = df.count()              
    cols = len(df.columns)         
    print(f"{rows:,} rows × {cols} columns")


2,783,156 rows × 5 columns
5,566,312 rows × 5 columns
11,132,624 rows × 5 columns
22,265,248 rows × 5 columns


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast

df.printSchema()

root
 |-- date: integer (nullable = true)
 |-- delay: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- destination: string (nullable = true)



In [0]:
from pyspark.sql import functions as F

df_t = (df
    # 1) Cast to string and keep only digits (defensive)
    .withColumn("date_digits", F.regexp_replace(F.col("date").cast("string"), "[^0-9]", ""))
    # 2) Left-pad to 8 chars (yyyyMMdd). e.g., 1011245 -> 01011245
    .withColumn("date_8", F.lpad(F.col("date_digits"), 8, "0"))
    # 3) Parse with try_to_date to avoid hard failures
    .withColumn("flight_date", F.expr("try_to_date(date_8, 'yyyyMMdd')"))
    .withColumn("year",  F.year("flight_date"))
    .withColumn("month", F.month("flight_date"))
    .withColumn("day",   F.dayofmonth("flight_date"))
    .withColumn("is_delayed", F.when(F.col("delay") > 0, 1).otherwise(0))
    .drop("date_digits","date_8")
)

# Quick peek (won't error if some dates are NULL)
df_t.select("flight_date","year","month","day","delay","distance","origin","destination","is_delayed").show(5)


+-----------+----+-----+----+-----+--------+------+-----------+----------+
|flight_date|year|month| day|delay|distance|origin|destination|is_delayed|
+-----------+----+-----+----+-----+--------+------+-----------+----------+
|       NULL|NULL| NULL|NULL|    6|     602|   ABE|        ATL|         1|
|       NULL|NULL| NULL|NULL|   -8|     369|   ABE|        DTW|         0|
|       NULL|NULL| NULL|NULL|   -2|     602|   ABE|        ATL|         0|
| 0102-06-05| 102|    6|   5|   -4|     602|   ABE|        ATL|         0|
|       NULL|NULL| NULL|NULL|   -4|     602|   ABE|        ATL|         0|
+-----------+----+-----+----+-----+--------+------+-----------+----------+
only showing top 5 rows


In [0]:
df_f = (df_t
    .filter(F.col("flight_date").isNotNull())   # removes rows where parsing failed
    .filter(F.col("distance") > 0)
    .filter(F.col("origin").isNotNull() & F.col("destination").isNotNull()))
print("Rows after filters:", df_f.count())


Rows after filters: 4991904


In [0]:
dim_month = spark.createDataFrame(
    [(1,"Winter"),(2,"Winter"),(3,"Spring"),(4,"Spring"),(5,"Spring"),
     (6,"Summer"),(7,"Summer"),(8,"Summer"),
     (9,"Fall"),(10,"Fall"),(11,"Fall"),(12,"Winter")],
    ["month","season"]
)

df_enriched = df_f.join(broadcast(dim_month), on="month", how="left")
df_enriched.show(5)

+-----+-------+-----+--------+------+-----------+-----------+----+---+----------+------+
|month|   date|delay|distance|origin|destination|flight_date|year|day|is_delayed|season|
+-----+-------+-----+--------+------+-----------+-----------+----+---+----------+------+
|    6|1020605|   -4|     602|   ABE|        ATL| 0102-06-05| 102|  5|         0|Summer|
|    6|1030605|    0|     602|   ABE|        ATL| 0103-06-05| 103|  5|         0|Summer|
|    6|1040605|   28|     602|   ABE|        ATL| 0104-06-05| 104|  5|         1|Summer|
|    6|1050605|    9|     602|   ABE|        ATL| 0105-06-05| 105|  5|         1|Summer|
|   12|1061215|   -6|     602|   ABE|        ATL| 0106-12-15| 106| 15|         0|Winter|
+-----+-------+-----+--------+------+-----------+-----------+----+---+----------+------+
only showing top 5 rows


In [0]:
agg_origin = (df_enriched
  .groupBy("year","month","origin")
  .agg(
      F.count("*").alias("flights"),
      F.avg("delay").alias("avg_delay_min"),
      F.sum("is_delayed").alias("delayed_flights"),
      F.avg("distance").alias("avg_distance")
  )
  .withColumn("delay_rate", F.col("delayed_flights")/F.col("flights"))
)

agg_origin.orderBy(F.desc("flights")).show(10, truncate=False)


+----+-----+------+-------+-------------------+---------------+------------------+-------------------+
|year|month|origin|flights|avg_delay_min      |delayed_flights|avg_distance      |delay_rate         |
+----+-----+------+-------+-------------------+---------------+------------------+-------------------+
|102 |8    |ATL   |1024   |9.5625             |480            |621.40625         |0.46875            |
|103 |8    |ATL   |1024   |12.625             |512            |625.78125         |0.5                |
|106 |8    |ATL   |960    |11.35              |496            |642.9333333333333 |0.5166666666666667 |
|105 |8    |ATL   |896    |9.803571428571429  |464            |644.8214285714286 |0.5178571428571429 |
|207 |12   |ATL   |848    |9.641509433962264  |464            |509.20754716981133|0.5471698113207547 |
|127 |12   |ATL   |848    |1.2641509433962264 |160            |509.20754716981133|0.18867924528301888|
|130 |12   |ATL   |848    |25.037735849056602 |288            |506.641509

In [0]:
agg_route = (df_enriched
  .groupBy("year","origin","destination","season")
  .agg(
      F.count("*").alias("flights"),
      F.avg("delay").alias("avg_delay_min"),
      F.expr("percentile_approx(delay, 0.9)").alias("p90_delay_min")
  )
)

agg_route.orderBy(F.desc("flights")).show(10, truncate=False)

+----+------+-----------+------+-------+-------------------+-------------+
|year|origin|destination|season|flights|avg_delay_min      |p90_delay_min|
+----+------+-----------+------+-------+-------------------+-------------+
|107 |LAS   |LAX        |Summer|96     |0.3333333333333333 |10           |
|207 |LAS   |LAX        |Summer|96     |4.666666666666667  |40           |
|120 |LAS   |LAX        |Summer|96     |9.0                |62           |
|110 |LAS   |LAX        |Summer|96     |-0.8333333333333334|3            |
|113 |LAS   |LAX        |Summer|96     |-2.6666666666666665|8            |
|216 |LAS   |LAX        |Fall  |96     |0.6666666666666666 |12           |
|124 |LAS   |LAX        |Summer|96     |1.6666666666666667 |32           |
|102 |LAX   |DEN        |Summer|96     |-0.8333333333333334|5            |
|106 |LAX   |DEN        |Summer|96     |10.0               |35           |
|127 |LAS   |LAX        |Summer|96     |-4.0               |-2           |
+----+------+-----------+

In [0]:
# Compute data-driven thresholds
route_thresh  = agg_route.approxQuantile("flights", [0.50], 0.05)[0]   # median
origin_thresh = agg_origin.approxQuantile("flights", [0.50], 0.05)[0]

print("Adaptive thresholds — route:", route_thresh, " origin-month:", origin_thresh)

# Recreate the two “queries” via DataFrame API
q1 = (agg_route
      .filter(F.col("flights") >= F.lit(route_thresh))
      .select("year","season","origin","destination","flights",
              F.round(F.col("avg_delay_min"), 1).alias("avg_delay_min"))
      .orderBy(F.desc("avg_delay_min"))
      .limit(20))

q1.show(truncate=False)


# SQL #2: Best origins by on-time rate (min 1k flights in a month)
q2 = spark.sql("""
SELECT year, month, origin, flights,
       ROUND(1 - delay_rate, 3) AS on_time_rate
FROM agg_origin
WHERE flights >= 1000
ORDER BY on_time_rate DESC, flights DESC
LIMIT 20
""")
q2.show(truncate=False)



Adaptive thresholds — route: 16.0  origin-month: 32.0
+----+------+------+-----------+-------+-------------+
|year|season|origin|destination|flights|avg_delay_min|
+----+------+------+-----------+-------+-------------+
|309 |Summer|TPA   |DFW        |16     |1642.0       |
|303 |Winter|PDX   |DFW        |16     |1553.0       |
|130 |Fall  |EGE   |JFK        |16     |1500.0       |
|115 |Summer|ONT   |DFW        |16     |1496.0       |
|130 |Summer|EGE   |DFW        |16     |1482.0       |
|121 |Summer|IAD   |DFW        |16     |1444.0       |
|205 |Summer|LAS   |MIA        |16     |1418.0       |
|203 |Summer|FAT   |DFW        |16     |1417.0       |
|111 |Summer|PSP   |DFW        |16     |1404.0       |
|118 |Winter|PBI   |ORD        |16     |1151.0       |
|211 |Winter|PBI   |ORD        |16     |1128.0       |
|223 |Spring|BNA   |ATL        |16     |1056.0       |
|314 |Summer|DEN   |MSP        |16     |982.0        |
|127 |Summer|MKE   |DFW        |16     |951.0        |
|211 |Summe

In [0]:
print("=== df_enriched ===")
df_enriched.explain(mode="formatted")

print("\n=== agg_origin ===")
agg_origin.explain(mode="formatted")

print("\n=== q1 ===")
q1.explain(mode="formatted")


=== df_enriched ===
== Physical Plan ==
AdaptiveSparkPlan (92)
+- == Initial Plan ==
   ColumnarToRow (91)
   +- PhotonResultStage (90)
      +- PhotonProject (89)
         +- PhotonBroadcastHashJoin LeftOuter (88)
            :- PhotonUnion (81)
            :  :- PhotonProject (5)
            :  :  +- PhotonProject (4)
            :  :     +- PhotonFilter (3)
            :  :        +- PhotonRowToColumnar (2)
            :  :           +- Scan csv  (1)
            :  :- PhotonProject (10)
            :  :  +- PhotonProject (9)
            :  :     +- PhotonFilter (8)
            :  :        +- PhotonRowToColumnar (7)
            :  :           +- Scan csv  (6)
            :  :- PhotonProject (15)
            :  :  +- PhotonProject (14)
            :  :     +- PhotonFilter (13)
            :  :        +- PhotonRowToColumnar (12)
            :  :           +- Scan csv  (11)
            :  :- PhotonProject (20)
            :  :  +- PhotonProject (19)
            :  :     +- PhotonFilter 

In [0]:
# Repartition to different sizes
print("Testing different partition strategies...\n")

# Fewer partitions (4)
df_few = df.repartition(4)
start = time.time()
df_few.filter(col('delay') > 0).count()
few_time = time.time() - start
print(f"✓ Fewer partitions (4): {few_time:.2f}s")

# More partitions (32)
df_many = df.repartition(32)
start = time.time()
df_many.filter(col('delay') > 0).count()
many_time = time.time() - start
print(f"✓ More partitions (32): {many_time:.2f}s")

# Default (let Spark decide)
start = time.time()
df.filter(col('delay') > 0).count()
default_time = time.time() - start
print(f"✓ Default (auto): {default_time:.2f}s")
     

Testing different partition strategies...

✓ Fewer partitions (4): 4.69s
✓ More partitions (32): 4.83s
✓ Default (auto): 3.70s


In [0]:
# Transformations (lazy): nothing executes yet
lazy_demo = (df_enriched
  .select("origin","destination","distance","delay")
  .filter(F.col("delay") >= 15)                                  # transformation
  .withColumn("speed_mpm", F.col("distance") / F.greatest(F.col("delay"), F.lit(1))))  # transformation

print("No action yet — still lazy.")
lazy_demo.take(5) 


No action yet — still lazy.


[Row(origin='ABE', destination='ATL', distance=602, delay=28, speed_mpm=21.5),
 Row(origin='ABE', destination='DTW', distance=369, delay=33, speed_mpm=11.181818181818182),
 Row(origin='ABE', destination='ORD', distance=569, delay=54, speed_mpm=10.537037037037036),
 Row(origin='ABE', destination='ATL', distance=602, delay=43, speed_mpm=14.0),
 Row(origin='ABE', destination='ORD', distance=569, delay=83, speed_mpm=6.855421686746988)]

In [0]:
# Transformations - lazy (just build a plan)
start = time.time()
filtered = df.filter(col('delay') > 100)
selected = filtered.select('origin', 'delay', 'distance')
print(f"Transformations: {time.time() - start:.4f}s")

# Actions - eager (trigger execution)
print("\nAction 1:")
start = time.time()
count = selected.count()
print(f"count() = {count} rows, took {time.time() - start:.2f}s")

print("\nAction 2:")
start = time.time()
selected.show(5)
print(f"show() took {time.time() - start:.2f}s")

print("\nNotice: Each action re-executes the transformations!")

Transformations: 0.0005s

Action 1:
count() = 733760 rows, took 3.68s

Action 2:
+------+-----+--------+
|origin|delay|distance|
+------+-----+--------+
|   ABE|  151|     369|
|   ABE|  127|     602|
|   ABE|  333|     602|
|   ABE|  219|     569|
|   ABE|  180|     602|
+------+-----+--------+
only showing top 5 rows
show() took 0.61s

Notice: Each action re-executes the transformations!


Spark optimized the pipeline primarily through column pruning, predicate pushdown, and operator fusion. Because we selected only the needed columns up front, the physical plan shows a Parquet/CSV scan that reads fewer fields. In the .explain(mode="formatted") output for the Parquet step, the scan node lists PushedFilters (e.g., GreaterThan(distance, 0), IsNotNull(origin), In(origin, [JFK,LAX,ATL])) and PartitionFilters when filtering by year and month, confirming that filters were applied at the data source to minimize I/O. Aggregations are executed under WholeStageCodegen, reducing JVM overhead, and the small month→season lookup uses a BroadcastHashJoin, avoiding a shuffle on the dimension side.

The Spark UI’s SQL Query Details reveals that the main bottlenecks were the wide shuffles during groupBy(year, month, origin) and groupBy(year, origin, destination, season), as well as any global orderBy used for top-N results. Another minor cost stemmed from parsing dates in the raw file; we addressed malformed values via try_to_date, then dropped NULL dates early so bad rows wouldn’t propagate. When using CSV as the source, pushdown is limited, so converting an intermediate to Parquet both improved scans and made the pushdown behavior visible (smaller bytes-read in the UI).

To optimize, we (1) wrote filters early (flight_date IS NOT NULL, distance > 0, required columns non-null) to shrink the working set; (2) enforced column pruning by selecting only the fields used downstream; (3) broadcast the tiny month→season mapping to eliminate a join shuffle; (4) tuned parallelism with spark.sql.files.maxPartitionBytes=128m and a reasonable spark.sql.shuffle.partitions; and (5) repartitioned and wrote partitioned Parquet by year,month (and year,origin) so future queries benefit from partition pruning and avoid unnecessary shuffles. Together, these choices cut input I/O, reduced shuffle volume, and sped up repeated analysis steps without changing business logic.