In [0]:
# Smoke test: confirm the public NYT COVID-19 county dataset bundled with Databricks
# is readable from this workspace before building the pipeline below.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('/databricks-datasets/COVID/covid-19-data/us-counties.csv')
)
display(df)


In [0]:
# Step 1: Ingest Public COVID-19 Data on Databricks
# The first CSV row supplies column names; Spark infers column types from the data,
# which costs one extra pass over the file.
covid_df = spark.read.csv(
    "/databricks-datasets/COVID/covid-19-data/us-counties.csv",
    header=True,
    inferSchema=True,
)



In [0]:
import pyspark.sql.functions as F
from pyspark.sql import Window

# Analysis window (inclusive). Change these to run the pipeline for a different period.
START_DATE = "2021-01-01"
END_DATE = "2021-12-31"
DATE_FILTER = f"date >= '{START_DATE}' AND date <= '{END_DATE}'"

# Step 2: Data Cleaning - Remove nulls and filter for relevant timeframe.
# `covid_nonnull` keeps the full history (needed for Step 3);
# `covid_clean` is the same data restricted to the analysis window.
covid_nonnull = covid_df.dropna(subset=["cases", "deaths", "county", "state"])
covid_clean = covid_nonnull.filter(DATE_FILTER)

# Step 3: Feature Engineering - Calculate daily new cases and deaths for each county/state.
# `cases`/`deaths` are differenced against the previous date, i.e. treated as running totals.
# The lag is computed on the FULL history and only then filtered to the window. Filtering
# first would leave each county's first in-window day without a predecessor and zero its
# new cases/deaths, undercounting every total below.
# A county's very first reported row still has no predecessor and gets 0. Deltas can be
# negative if a running total is revised downward.
window_spec = Window.partitionBy("county", "state").orderBy("date")
covid_features = (
    covid_nonnull
    .withColumn("new_cases", F.col("cases") - F.lag("cases", 1).over(window_spec))
    .withColumn("new_deaths", F.col("deaths") - F.lag("deaths", 1).over(window_spec))
    .fillna({"new_cases": 0, "new_deaths": 0})
    .filter(DATE_FILTER)
)

# Step 4: Aggregation - Compute total new cases/deaths per state within the window,
# plus how many distinct counties reported, ordered by total new cases (descending).
state_agg = (
    covid_features
    .groupBy("state")
    .agg(
        F.sum("new_cases").alias("total_new_cases"),
        F.sum("new_deaths").alias("total_new_deaths"),
        F.countDistinct("county").alias("distinct_counties")
    )
    .orderBy(F.desc("total_new_cases"))
)

# Step 5: Visualization - Display top 10 states by cases
display(state_agg.limit(10))

# Step 6: Save Results To Data Lake/Delta Table (optional for full pipeline)
# state_agg.write.format("delta").mode("overwrite").save("/mnt/silver/covid_state_summary")


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Extra per-county features layered onto `covid_features`.
# All windows below are ROW-based (rowsBetween): "7-day" means the current row plus the
# 6 preceding rows, which equals 7 calendar days only if a county reports every day.
county_by_date = Window.partitionBy("county", "state").orderBy("date")
window_7day = county_by_date.rowsBetween(-6, 0)
window_cum = county_by_date.rowsBetween(Window.unboundedPreceding, 0)

# A row is a surge when its new cases exceed 150% of its trailing average.
SURGE_MULTIPLIER = 1.5

covid_features = (
    covid_features
    # 1. Trailing moving average of new cases (the current row is part of the average)
    .withColumn("new_cases_7day_avg", F.avg("new_cases").over(window_7day))
    # 2. Surge flag: 1 when today's new cases exceed SURGE_MULTIPLIER x the average, else 0
    .withColumn(
        "is_surge",
        F.when(F.col("new_cases") > SURGE_MULTIPLIER * F.col("new_cases_7day_avg"), 1).otherwise(0),
    )
    # 3. Calendar features. `week` is the ISO-8601 week number, so early-January dates may
    #    carry week 52/53 while `year` is the calendar year. `day_of_week` runs 1=Sunday..7=Saturday.
    .withColumn("year", F.year("date"))
    .withColumn("month", F.month("date"))
    .withColumn("week", F.weekofyear("date"))
    .withColumn("day_of_week", F.dayofweek("date"))
    # 4. Running sums of the daily deltas, i.e. totals accumulated over the rows present in
    #    covid_features (the filtered period), not the dataset's all-time `cases`/`deaths`.
    .withColumn("cum_cases_county", F.sum("new_cases").over(window_cum))
    .withColumn("cum_deaths_county", F.sum("new_deaths").over(window_cum))
)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import desc
from pyspark.sql.window import Window

# Self-contained rebuild of the pipeline: raw data -> daily deltas -> population enrichment.

# Analysis window (inclusive).
START_DATE = "2021-01-01"
END_DATE = "2021-12-31"
DATE_FILTER = f"date >= '{START_DATE}' AND date <= '{END_DATE}'"

# Step 1: Read COVID-19 case data (same source as the ingestion step)
covid_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/databricks-datasets/COVID/covid-19-data/us-counties.csv")
)

# Step 2: Drop rows missing key fields. `covid_clean` is the in-window subset.
covid_nonnull = covid_df.dropna(subset=["cases", "deaths", "county", "state"])
covid_clean = covid_nonnull.filter(DATE_FILTER)

# Step 3: Daily new cases/deaths as the difference from the previous date per county.
# The lag runs on the full history BEFORE the window filter. Otherwise each county's first
# in-window day has no predecessor and its new cases/deaths are zeroed.
window_spec = Window.partitionBy("county", "state").orderBy("date")
covid_features = (
    covid_nonnull
    .withColumn("new_cases", F.col("cases") - F.lag("cases", 1).over(window_spec))
    .withColumn("new_deaths", F.col("deaths") - F.lag("deaths", 1).over(window_spec))
    .fillna({"new_cases": 0, "new_deaths": 0})
    .filter(DATE_FILTER)
)

# Step 4: Read US county population dataset uploaded to a Unity Catalog volume.
# Replace this path with your own uploaded file location if needed.
# NOTE(review): assumes the file has `county`, `state_name` and `population` columns — confirm.
pop_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/Volumes/workspace/default/test/uscounties.csv")
    .withColumnRenamed("county", "County_US")  # avoid clashing with covid_features.county
)

# Step 5: Case-insensitive left join on county + state. Unmatched COVID rows keep null population.
# NOTE(review): if pop_df has more than one row per (county, state_name), this join
# duplicates COVID rows — verify uniqueness of the population file.
covid_enriched = covid_features.join(
    pop_df,
    (F.lower(covid_features.county) == F.lower(pop_df.County_US)) & (F.lower(covid_features.state) == F.lower(pop_df.state_name)),
    "left"
)

# Step 6: Per-capita running totals per 10k population.
# Guarded so a zero/invalid population yields null instead of raising DIVIDE_BY_ZERO
# when ANSI SQL mode is enabled (the non-ANSI result for x/0 is already null).
covid_enriched = covid_enriched.withColumn(
    "cases_per_10k",
    F.when(F.col("population") > 0, F.round((F.col("cases") / F.col("population")) * 10000, 2))
).withColumn(
    "deaths_per_10k",
    F.when(F.col("population") > 0, F.round((F.col("deaths") / F.col("population")) * 10000, 2))
)

# Step 7: Trailing 7-row moving average of new cases and surge flag
# (surge = new cases above 150% of the trailing average, which includes the current row).
window_7day = Window.partitionBy("county", "state").orderBy("date").rowsBetween(-6, 0)
covid_enriched = covid_enriched.withColumn(
    "new_cases_7day_avg",
    F.avg("new_cases").over(window_7day)
).withColumn(
    "is_surge",
    F.when(F.col("new_cases") > 1.5 * F.col("new_cases_7day_avg"), 1).otherwise(0)
)

# Step 8: Top 10 counties by the highest running case total reported in the window.
top_counties = (
    covid_enriched
    .groupBy("county", "state")
    .agg(F.max("cases").alias("max_cases"), F.max("deaths").alias("max_deaths"))
    .orderBy(desc("max_cases"))
    .limit(10)
)
display(top_counties)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import current_date
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

# Builds on `covid_enriched` from the population-enrichment cell
# (requires county, state, date and new_cases columns).

# Absolute day-over-day change in the trailing average that flags a potential change point.
CHANGE_POINT_THRESHOLD = 50

# --- Change Point Detection Simulation ---
# Compare each row's trailing 7-row average of new cases with the previous row's average.
window_7day = Window.partitionBy("county", "state").orderBy("date").rowsBetween(-6, 0)
window_prev = Window.partitionBy("county", "state").orderBy("date")
covid_enriched = (
    covid_enriched
    .withColumn("new_cases_7day_avg", F.avg("new_cases").over(window_7day))
    .withColumn("prev_7day_avg", F.lag("new_cases_7day_avg", 1).over(window_prev))
    # First row per county has no previous average -> comparison is null -> flag 0.
    .withColumn(
        "change_point_flag",
        F.when(
            F.abs(F.col("new_cases_7day_avg") - F.col("prev_7day_avg")) > CHANGE_POINT_THRESHOLD, 1
        ).otherwise(0)
    )
)

# --- SCD Type 2 Dimension Table for Counties ---
# Initial load only: every distinct county gets version 1, effective today.
# No end-date / is-current columns or change tracking are produced here yet.
dim_county = (
    covid_enriched.select("county", "state")
    .dropDuplicates()
    .withColumn("scd_version", F.lit(1))
    .withColumn("effective_date", current_date())
)

# --- Week-over-week percentage change in new_cases ---
# Weeks are keyed and ordered by their start date (Monday), not by ISO week number.
# ISO numbering puts Jan 1-3 2021 in week 53, which would sort after Dec 2021 and be
# compared against it, while week 1 lost its real predecessor.
# `week` (ISO number) is still added for reference.
# Dropping any previous `wo_w_change_pct` keeps this cell safe to re-run (no duplicate column).
# NOTE: the first and last weeks of the window may be partial, which inflates their % change.
covid_enriched = (
    covid_enriched
    .drop("wo_w_change_pct")
    .withColumn("week", F.weekofyear("date"))
    .withColumn("week_start", F.to_date(F.date_trunc("week", "date")))
)
weekly_sum = (
    covid_enriched.groupBy("county", "state", "week_start")
    .agg(F.sum("new_cases").alias("weekly_cases"))
)
weekly_change = (
    weekly_sum.withColumn(
        "prev_week_cases",
        F.lag("weekly_cases", 1).over(Window.partitionBy("county", "state").orderBy("week_start"))
    )
    .withColumn(
        "wo_w_change_pct",
        # No previous week -> 0. Previous week of 0 cases -> null (undefined %), instead of
        # a DIVIDE_BY_ZERO error under ANSI mode.
        F.when(F.col("prev_week_cases").isNull(), F.lit(0))
        .when(
            F.col("prev_week_cases") != 0,
            (F.col("weekly_cases") - F.col("prev_week_cases")) / F.col("prev_week_cases") * 100
        )
    )
)
# Join weekly percent changes back to the daily table (one weekly row per county/state/week).
covid_enriched = covid_enriched.join(
    weekly_change.select("county", "state", "week_start", "wo_w_change_pct"),
    on=["county", "state", "week_start"],
    how="left"
)




In [0]:
# --- County Surge Frequency and Ranking ---
# Number of rows (reporting days) each county was flagged as a surge (`is_surge` = 1),
# most frequent first.
county_surge_freq = (
    covid_enriched
    .groupBy("county", "state")
    .agg(F.sum("is_surge").alias("surge_count"))
    .orderBy(F.desc("surge_count"))
)
# DataFrame.show() prints and returns None, so display(df.show(n)) rendered None.
# Pass the limited DataFrame itself to display() instead.
display(county_surge_freq.limit(10))