## 1. Load the Data

In [0]:
print(f"Spark version: {spark.version}")
print(f"Cluster configured successfully!")
dbutils.fs.ls("/databricks-datasets/")

Spark version: 4.0.0
Cluster configured successfully!


[FileInfo(path='dbfs:/databricks-datasets/COVID/', name='COVID/', size=0, modificationTime=1762923943329),
 FileInfo(path='dbfs:/databricks-datasets/README.md', name='README.md', size=976, modificationTime=1596557781000),
 FileInfo(path='dbfs:/databricks-datasets/Rdatasets/', name='Rdatasets/', size=0, modificationTime=1762923943329),
 FileInfo(path='dbfs:/databricks-datasets/SPARK_README.md', name='SPARK_README.md', size=3359, modificationTime=1596557823000),
 FileInfo(path='dbfs:/databricks-datasets/adult/', name='adult/', size=0, modificationTime=1762923943329),
 FileInfo(path='dbfs:/databricks-datasets/airlines/', name='airlines/', size=0, modificationTime=1762923943329),
 FileInfo(path='dbfs:/databricks-datasets/amazon/', name='amazon/', size=0, modificationTime=1762923943329),
 FileInfo(path='dbfs:/databricks-datasets/asa/', name='asa/', size=0, modificationTime=1762923943329),
 FileInfo(path='dbfs:/databricks-datasets/atlas_higgs/', name='atlas_higgs/', size=0, modificationTime=

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
from pyspark.sql import functions as F

In [0]:
df = spark.read.csv("/databricks-datasets/COVID/covid-19-data/", header=True, inferSchema=True)
print("Number of Rows:", df.count())


Number of Rows: 1227256


In [0]:
df.show()

+----------+-----------+----------+-----+-----+------+
|      date|     county|     state| fips|cases|deaths|
+----------+-----------+----------+-----+-----+------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|
|2020-01-25|     Orange|California|06059|    1|     0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|
|2020-01-26|   Maricopa|   Arizona|04013|    1|     0|
|2020-01-26|Los Angeles|California|06037|    1|     0|
|2020-01-26|     Orange|California|06059|    1|     0|
|2020-01-26|       Cook|  Illinois|17031|    1|     0|
|2020-01-26|  Snohomish|Washington|53061|    1|     0|
|2020-01-27|   Maricopa|   Arizona|04013|    1|     0|
|2020-01-27|Los Angeles|California|06037|    1|     0|
|2020-01-2

In [0]:
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: string (nullable = true)
 |-- cases: string (nullable = true)
 |-- deaths: string (nullable = true)



In [0]:
data = (
    df
    .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("cases", F.coalesce(expr("try_cast(cases as int)"), lit(0)))
    .withColumn("deaths", F.coalesce(expr("try_cast(deaths as int)"), lit(0)))
)

In [0]:
data.printSchema()

root
 |-- date: date (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- fips: string (nullable = true)
 |-- cases: integer (nullable = false)
 |-- deaths: integer (nullable = false)



In [0]:
df_filtered = data.filter((col("deaths").isNotNull()) & (col("cases").isNotNull()))


## 2. Transformation

### 2-1. Filtering

In [0]:
# 2 filter operations
df_filtered2 = data.filter((col("deaths").isNotNull()) & (col("state") == "New York"))
df_filtered2.show(20, truncate=False)


+----------+-------------+--------+-----+-----+------+
|date      |county       |state   |fips |cases|deaths|
+----------+-------------+--------+-----+-----+------+
|2020-03-01|New York City|New York|NULL |1    |0     |
|2020-03-02|New York City|New York|NULL |1    |0     |
|2020-03-03|New York City|New York|NULL |2    |0     |
|2020-03-04|New York City|New York|NULL |2    |0     |
|2020-03-04|Westchester  |New York|36119|9    |0     |
|2020-03-05|Nassau       |New York|36059|1    |0     |
|2020-03-05|New York City|New York|NULL |4    |0     |
|2020-03-05|Westchester  |New York|36119|17   |0     |
|2020-03-06|Nassau       |New York|36059|4    |0     |
|2020-03-06|New York City|New York|NULL |5    |0     |
|2020-03-06|Rockland     |New York|36087|2    |0     |
|2020-03-06|Westchester  |New York|36119|33   |0     |
|2020-03-07|Nassau       |New York|36059|4    |0     |
|2020-03-07|New York City|New York|NULL |12   |0     |
|2020-03-07|Rockland     |New York|36087|2    |0     |
|2020-03-0

### 2-2. Join operation

In [0]:
df1 = df_filtered.groupBy("state").agg(sum("deaths").alias("state_total_death"))
df_join = df_filtered.join(df1, "state", "left")
df_join.show(20, truncate=False)

+----------+----------+-----------+-----+-----+------+-----------------+
|state     |date      |county     |fips |cases|deaths|state_total_death|
+----------+----------+-----------+-----+-----+------+-----------------+
|Illinois  |2020-01-24|Cook       |17031|1    |0     |4352863          |
|Illinois  |2020-01-25|Cook       |17031|1    |0     |4352863          |
|Illinois  |2020-01-26|Cook       |17031|1    |0     |4352863          |
|Illinois  |2020-01-27|Cook       |17031|1    |0     |4352863          |
|California|2020-01-25|Orange     |06059|1    |0     |7480318          |
|California|2020-01-26|Los Angeles|06037|1    |0     |7480318          |
|California|2020-01-26|Orange     |06059|1    |0     |7480318          |
|California|2020-01-27|Los Angeles|06037|1    |0     |7480318          |
|California|2020-01-27|Orange     |06059|1    |0     |7480318          |
|California|2020-01-28|Los Angeles|06037|1    |0     |7480318          |
|California|2020-01-28|Orange     |06059|1    |0   

### 2-3. Groupby aggregation

In [0]:
df_groupby = df_filtered.groupBy("state").agg(
    count("*").alias("total_state_cases"),
    avg("deaths").alias("state_avg_deaths"),
    sum("deaths").alias("state_total_deaths")
)

df_groupby.show()

+--------------------+-----------------+------------------+------------------+
|               state|total_state_cases|  state_avg_deaths|state_total_deaths|
+--------------------+-----------------+------------------+------------------+
|                Utah|            10949| 25.95689104027765|            284202|
|             Florida|            26230| 206.3694624475791|           5413071|
|      North Carolina|            37909| 45.50542087630905|           1725065|
|             Indiana|            35178| 58.11203024617659|           2044265|
|                Ohio|            33568| 73.44268350810296|           2465324|
|            Illinois|            38405|113.34104934253352|           4352863|
|            Nebraska|            31790|10.655143126769424|            338727|
|              Hawaii|             1723| 38.65351131746953|             66600|
|             Vermont|             5773| 6.035336913216699|             34842|
|             Montana|            18312|10.065530799

### 2-4. Column transformation

In [0]:
df_cleaned = df_filtered.withColumn("deaths_squared", col("deaths") * col("deaths"))
df_cleaned.show()

+----------+-----------+----------+-----+-----+------+--------------+
|      date|     county|     state| fips|cases|deaths|deaths_squared|
+----------+-----------+----------+-----+-----+------+--------------+
|2020-01-21|  Snohomish|Washington|53061|    1|     0|             0|
|2020-01-22|  Snohomish|Washington|53061|    1|     0|             0|
|2020-01-23|  Snohomish|Washington|53061|    1|     0|             0|
|2020-01-24|       Cook|  Illinois|17031|    1|     0|             0|
|2020-01-24|  Snohomish|Washington|53061|    1|     0|             0|
|2020-01-25|     Orange|California|06059|    1|     0|             0|
|2020-01-25|       Cook|  Illinois|17031|    1|     0|             0|
|2020-01-25|  Snohomish|Washington|53061|    1|     0|             0|
|2020-01-26|   Maricopa|   Arizona|04013|    1|     0|             0|
|2020-01-26|Los Angeles|California|06037|    1|     0|             0|
|2020-01-26|     Orange|California|06059|    1|     0|             0|
|2020-01-26|       C

In [0]:
data = (
    df_raw
    .withColumn("date", F.expr("try_to_date(date, 'yyyy-MM-dd')"))
    .withColumn("cases",  F.expr("try_cast(cases as int)"))
    .withColumn("deaths", F.expr("try_cast(deaths as int)"))
    .filter(F.col("date").isNotNull())   
)

In [0]:
spark.sql("DROP TABLE IF EXISTS covid")
data.write.format("delta").mode("overwrite").saveAsTable("covid")

## 3. SQL Queries

In [0]:
print("\n[6] SQL Query 1: Top 10 States by Total Deaths")
query1 = """
    SELECT 
        state,
        SUM(deaths) as total_deaths
    FROM covid
    WHERE state IS NOT NULL
    GROUP BY state
    ORDER BY total_deaths DESC
    LIMIT 10
"""

df_sql1 = spark.sql(query1)
df_sql1.show()


[6] SQL Query 1: Top 10 States by Total Deaths
+-------------+------------+
|        state|total_deaths|
+-------------+------------+
|     New York|    12724140|
|   California|     7480318|
|        Texas|     6841454|
|   New Jersey|     6004001|
|      Florida|     5413071|
|     Illinois|     4352863|
| Pennsylvania|     4195968|
|Massachusetts|     3717383|
|     Michigan|     3395721|
|      Georgia|     2835421|
+-------------+------------+



In [0]:
print("\n[7] SQL Query 2: Top 10 States by Total Cases")
query2 = """
    SELECT 
        state,
        SUM(cases) as total_cases
    FROM covid
    WHERE state IS NOT NULL
    GROUP BY state
    ORDER BY total_cases DESC
    LIMIT 10
"""

df_sql2 = spark.sql(query2)
df_sql2.show()


[7] SQL Query 2: Top 10 States by Total Cases
+--------------+-----------+
|         state|total_cases|
+--------------+-----------+
|    California|  505068688|
|         Texas|  402592904|
|       Florida|  313235465|
|      New York|  272678258|
|      Illinois|  195082889|
|       Georgia|  151749782|
|          Ohio|  134921770|
|  Pennsylvania|  133343845|
|    New Jersey|  128802146|
|North Carolina|  124271360|
+--------------+-----------+



## 4. Performance analysis

In [0]:
df_join.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonProject [state#15859, date#15898, county#15858, fips#15860, cases#15900, deaths#15902, state_total_death#16812L]
         +- PhotonShuffledHashJoin [state#15859], [state#17736], LeftOuter, BuildLeft
            :- PhotonShuffleExchangeSource
            :  +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#20528]
            :     +- PhotonShuffleExchangeSink hashpartitioning(state#15859, 10)
            :        +- PhotonProject [cast(gettimestamp(date#15857, yyyy-MM-dd, TimestampType, try_to_date, Some(Etc/UTC), true) as date) AS date#15898, county#15858, state#15859, fips#15860, coalesce(try_cast(cases#15861 as int), 0) AS cases#15900, coalesce(try_cast(deaths#15862 as int), 0) AS deaths#15902]
            :           +- PhotonRowToColumnar
            :              +- FileScan csv [date#15857,county#15858,state#15859,fips#15860,cases#15861,deaths#15

In [0]:
df_groupby.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   ColumnarToRow
   +- PhotonResultStage
      +- PhotonGroupingAgg(keys=[state#15859], functions=[finalmerge_count(merge count#17764L) AS count(1)#17758L, finalmerge_sum(merge sum#17766) AS sum(deaths)#17761, finalmerge_sum(merge sum#17768L) AS sum(deaths)#17760L])
         +- PhotonShuffleExchangeSource
            +- PhotonShuffleMapStage ENSURE_REQUIREMENTS, [id=#20644]
               +- PhotonShuffleExchangeSink hashpartitioning(state#15859, 10)
                  +- PhotonGroupingAgg(keys=[state#15859], functions=[partial_count(1) AS count#17764L, partial_sum(cast(deaths#15902 as double)) AS sum#17766, partial_sum(deaths#15902) AS sum#17768L])
                     +- PhotonProject [state#15859, coalesce(try_cast(deaths#15862 as int), 0) AS deaths#15902]
                        +- PhotonRowToColumnar
                           +- FileScan csv [state#15859,deaths#15862] Batched: false, DataFilters: [], For

## 5. Transformers (Lazy) vs Actions (Eager)

### Transformers

In [0]:
# Transformation 1: Filter for California
print("\n→ Transformation 1: Filtering for California state")
california_df = df.filter(col("state") == "California")
print("  ✓ Transformation registered (NOT executed yet)")

# Transformation 2: Filter for cases > 10
print("\n→ Transformation 2: Filtering cases > 10")
high_cases_df = california_df.filter(col("cases") > 10)
print("  ✓ Transformation registered (NOT executed yet)")

# Transformation 3: Select specific columns
print("\n→ Transformation 3: Selecting date, county, cases, deaths")
selected_df = high_cases_df.select("date", "county", "cases", "deaths")
print("  ✓ Transformation registered (NOT executed yet)")

# Transformation 4: Add a new column
print("\n→ Transformation 4: Adding severity_level column")
transformed_df = selected_df.withColumn(
    "severity_level",
    when(col("cases") < 50, "Low")
    .when(col("cases") < 200, "Medium")
    .otherwise("High")
)
print("  ✓ Transformation registered (NOT executed yet)")

print("\n" + "-" * 80)
print("IMPORTANT: All transformations above are LAZY!")
print("No actual data processing has occurred yet.")
print("Spark just built an execution plan (DAG - Directed Acyclic Graph)")
print("-" * 80)


→ Transformation 1: Filtering for California state
  ✓ Transformation registered (NOT executed yet)

→ Transformation 2: Filtering cases > 10
  ✓ Transformation registered (NOT executed yet)

→ Transformation 3: Selecting date, county, cases, deaths
  ✓ Transformation registered (NOT executed yet)

→ Transformation 4: Adding severity_level column
  ✓ Transformation registered (NOT executed yet)

--------------------------------------------------------------------------------
IMPORTANT: All transformations above are LAZY!
No actual data processing has occurred yet.
Spark just built an execution plan (DAG - Directed Acyclic Graph)
--------------------------------------------------------------------------------


### Lazy

In [0]:
# Action 1: show()
print("\n→ Action 1: show() - Display the results")
print("  (This triggers execution of ALL transformations above)")
transformed_df.show(10)

# Action 2: count()
print("\n→ Action 2: count() - Count the number of rows")
print("  (This triggers execution again)")
row_count = transformed_df.count()
print(f"  Result: {row_count:,} rows found")

# Action 3: collect()
print("\n→ Action 3: collect() - Collect first 3 rows to driver")
print("  (This triggers execution and brings data to driver memory)")
collected_data = transformed_df.take(3)
print(f"  Result: Collected {len(collected_data)} rows to driver")
for i, row in enumerate(collected_data, 1):
    print(f"  Row {i}: Date={row['date']}, County={row['county']}, Cases={row['cases']}")

# Action 4: Aggregation
print("\n→ Action 4: Aggregation - Calculate total cases")
print("  (This triggers execution)")
total_cases = transformed_df.agg({"cases": "sum"}).collect()[0][0]
print(f"  Result: Total cases = {total_cases:,}")



→ Action 1: show() - Display the results
  (This triggers execution of ALL transformations above)
+----------+-----------+-----+------+--------------+
|      date|     county|cases|deaths|severity_level|
+----------+-----------+-----+------+--------------+
|2020-02-26|     Solano|   11|     0|           Low|
|2020-02-27|     Solano|   11|     0|           Low|
|2020-02-28|     Solano|   11|     0|           Low|
|2020-02-29|     Solano|   11|     0|           Low|
|2020-03-01|     Solano|   12|     0|           Low|
|2020-03-02|     Solano|   12|     0|           Low|
|2020-03-03|Santa Clara|   11|     0|           Low|
|2020-03-03|     Solano|   12|     0|           Low|
|2020-03-04|Santa Clara|   14|     0|           Low|
|2020-03-04|     Solano|   12|     0|           Low|
+----------+-----------+-----+------+--------------+
only showing top 10 rows

→ Action 2: count() - Count the number of rows
  (This triggers execution again)
  Result: 20,606 rows found

→ Action 3: collect() -

## 6. Query Optimization

### 6-1. Filter Early Optimization

In [0]:
start_time = time.time()
bad_practice = data \
    .groupBy("state") \
    .agg(sum("cases").alias("total_cases")) \
    .filter(col("total_cases") > 10000)
bad_count = bad_practice.count()
bad_time = time.time() - start_time
print("Time: {:.4f}s, Results: {} states".format(bad_time, bad_count))

print("\n✅ GOOD Practice: Filtering early")
print("Code: df.filter().groupBy().agg()")
start_time = time.time()
good_practice = data \
    .filter(col("cases") > 0) \
    .groupBy("state") \
    .agg(sum("cases").alias("total_cases")) \
    .filter(col("total_cases") > 10000)
good_count = good_practice.count()
good_time = time.time() - start_time
print("Time: {:.4f}s, Results: {} states".format(good_time, good_count))

improvement = ((bad_time - good_time) / bad_time * 100)
print("\n⚡ Performance improvement: {:.1f}% faster".format(improvement))


Time: 0.6931s, Results: 108 states

✅ GOOD Practice: Filtering early
Code: df.filter().groupBy().agg()
Time: 0.8318s, Results: 108 states

⚡ Performance improvement: -20.0% faster


### 6-2. Partitioning

In [0]:
print("Current shuffle partitions:", spark.conf.get("spark.sql.shuffle.partitions"))
spark.conf.set("spark.sql.shuffle.partitions", "10")

print("\n→ Repartitioning by 'state' (DataFrame API)")
data_partitioned = data.repartition(10, F.col("state"))

_ = (data_partitioned
     .where(F.col("state") == "California")
     .agg(F.sum("cases").alias("total_cases"))
     .collect())

start = time.time()
partitioned_result = (data_partitioned
    .where(F.col("state") == "California")
    .agg(F.sum("cases").alias("total_cases")))
partitioned_result.show(truncate=False)
print("Query time (warm cache, no table persist): {:.4f}s".format(time.time() - start))


Current shuffle partitions: 10

→ Repartitioning by 'state' (DataFrame API)
+-----------+
|total_cases|
+-----------+
|505068688  |
+-----------+

Query time (warm cache, no table persist): 0.6301s


### 6-3. Repartitioning

In [0]:

print("\n[Baseline] No Repartitioning")
start_time_base = time.time()
base_result = data.filter(
    col("state") == "California"
).agg(
    sum("cases").alias("total_cases")
)
display(base_result)
base_time = time.time() - start_time_base
print("Base Query time: {:.4f}s".format(base_time))

print("\n[Optimized] Repartitioning by 'state' (10 partitions)")
df_partitioned = data.repartition(10, "state")

start_time_part = time.time()
partitioned_result = df_partitioned.filter(
    col("state") == "California"
).agg(
    sum("cases").alias("total_cases")
)
display(partitioned_result)
part_time = time.time() - start_time_part
print("Partitioned Query time: {:.4f}s".format(part_time))

# 성능 비교
if base_time > part_time:
    improvement = ((base_time - part_time) / base_time * 100)
    print("\n⚡ Performance improvement: {:.1f}% faster".format(improvement))
else:
    print("\n⚠️ Note: Partitioning didn't show improvement in this run (may be due to small dataset or warm cache).")




[Baseline] No Repartitioning


total_cases
505068688


Base Query time: 1.0362s

[Optimized] Repartitioning by 'state' (10 partitions)


total_cases
505068688


Partitioned Query time: 0.6944s

⚡ Performance improvement: 33.0% faster


In [0]:
output_base = "/Volumes/de/de/de/covid_pipeline_output"

# 1. Standard Parquet
print("\n→ Writing to standard Parquet...")
output_path_parquet = "{}/covid_data_parquet".format(output_base)
data.write.mode("overwrite").parquet(output_path_parquet)
print("   ✓ Saved to: {}".format(output_path_parquet))

# Verify
read_back = spark.read.parquet(output_path_parquet)
print("   ✓ Verified: {} records written".format(read_back.count()))

# 2. Partitioned Parquet (by year)
print("\n→ Writing to partitioned Parquet (by year)...")
output_path_partitioned = "{}/covid_data_by_year".format(output_base)
data.withColumn("year", year(col("date"))) \
    .write.mode("overwrite").partitionBy("year").parquet(output_path_partitioned)
print("   ✓ Saved to: {}".format(output_path_partitioned))

# Verify
read_back2 = spark.read.parquet(output_path_partitioned)
print("   ✓ Verified: {} records written".format(read_back2.count()))

# 3. Delta format
print("\n→ Writing to Delta format...")
output_path_delta = "{}/state_summary_delta".format(output_base)
data.groupBy("state") \
    .agg(sum("cases").alias("total_cases"), sum("deaths").alias("total_deaths")) \
    .write.format("delta").mode("overwrite").save(output_path_delta)
print("   ✓ Saved to: {}".format(output_path_delta))

# Verify
read_back3 = spark.read.format("delta").load(output_path_delta)
print("   ✓ Verified: {} records written".format(read_back3.count()))

print("\n" + "=" * 80)
print("✅ ALL FILES WRITTEN SUCCESSFULLY!")
print("=" * 80)

print("\nSaved files:")
print("1. Standard Parquet: {}".format(output_path_parquet))
print("2. Partitioned Parquet: {}".format(output_path_partitioned))
print("3. Delta format: {}".format(output_path_delta))


→ Writing to standard Parquet...
   ✓ Saved to: /Volumes/de/de/de/covid_pipeline_output/covid_data_parquet
   ✓ Verified: 1227060 records written

→ Writing to partitioned Parquet (by year)...
   ✓ Saved to: /Volumes/de/de/de/covid_pipeline_output/covid_data_by_year
   ✓ Verified: 1227060 records written

→ Writing to Delta format...
   ✓ Saved to: /Volumes/de/de/de/covid_pipeline_output/state_summary_delta
   ✓ Verified: 478 records written

✅ ALL FILES WRITTEN SUCCESSFULLY!

Saved files:
1. Standard Parquet: /Volumes/de/de/de/covid_pipeline_output/covid_data_parquet
2. Partitioned Parquet: /Volumes/de/de/de/covid_pipeline_output/covid_data_by_year
3. Delta format: /Volumes/de/de/de/covid_pipeline_output/state_summary_delta
