In [2]:
# -----------------------------------------------
# Day 1: From Raw CSV to Clean Delta Table
# Microsoft Fabric | PySpark Notebook
# -----------------------------------------------

# STEP 1: Load raw CSV from OneLake Files
df = spark.read.option("header", True).csv("Files/covid_data/owid-covid-data.csv")

# STEP 2: Initial exploration
print("Original Schema:")
df.printSchema()

print("Sample Records:")
df.show(5)

StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 4, Finished, Available, Finished)

Original Schema:
root
 |-- iso_code: string (nullable = true)
 |-- continent: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: string (nullable = true)
 |-- new_cases: string (nullable = true)
 |-- new_cases_smoothed: string (nullable = true)
 |-- total_deaths: string (nullable = true)
 |-- new_deaths: string (nullable = true)
 |-- new_deaths_smoothed: string (nullable = true)
 |-- total_cases_per_million: string (nullable = true)
 |-- new_cases_per_million: string (nullable = true)
 |-- new_cases_smoothed_per_million: string (nullable = true)
 |-- total_deaths_per_million: string (nullable = true)
 |-- new_deaths_per_million: string (nullable = true)
 |-- new_deaths_smoothed_per_million: string (nullable = true)
 |-- reproduction_rate: string (nullable = true)
 |-- icu_patients: string (nullable = true)
 |-- icu_patients_per_million: string (nullable = true)
 |-- hosp_patients: string (nullable = true)
 |-- hosp_pat

In [3]:
from pyspark.sql.types import DoubleType

numeric_cols = [
    "total_cases", "new_cases", "total_deaths", "new_deaths",
    "total_vaccinations", "people_vaccinated", "people_fully_vaccinated",
    "new_vaccinations"
]

for col_name in numeric_cols:
    df = df.withColumn(col_name, df[col_name].cast(DoubleType()))

df.printSchema

StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 5, Finished, Available, Finished)

<bound method DataFrame.printSchema of DataFrame[iso_code: string, continent: string, location: string, date: string, total_cases: double, new_cases: double, new_cases_smoothed: string, total_deaths: double, new_deaths: double, new_deaths_smoothed: string, total_cases_per_million: string, new_cases_per_million: string, new_cases_smoothed_per_million: string, total_deaths_per_million: string, new_deaths_per_million: string, new_deaths_smoothed_per_million: string, reproduction_rate: string, icu_patients: string, icu_patients_per_million: string, hosp_patients: string, hosp_patients_per_million: string, weekly_icu_admissions: string, weekly_icu_admissions_per_million: string, weekly_hosp_admissions: string, weekly_hosp_admissions_per_million: string, total_tests: string, new_tests: string, total_tests_per_thousand: string, new_tests_per_thousand: string, new_tests_smoothed: string, new_tests_smoothed_per_thousand: string, positive_rate: string, tests_per_case: string, tests_units: string

In [4]:
df = df.fillna({
    "total_cases": 0,
    "new_cases": 0,
    "total_deaths": 0,
    "new_deaths": 0,
    "total_vaccinations": 0,
    "new_vaccinations": 0
})


StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 6, Finished, Available, Finished)

In [8]:
from pyspark.sql.functions import to_date
df = df.withColumn("date", to_date("date","yyyy-MM-dd"))

StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 10, Finished, Available, Finished)

In [9]:
from pyspark.sql.functions import year, month
df = df.withColumn("year", year("date")).withColumn("month", month("date"))

StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 11, Finished, Available, Finished)

In [11]:
from pyspark.sql.functions import when, col
df = df.withColumn(
    "death_rate_pct",
    when(col("total_cases") > 0, (col("total_deaths") / col("total_cases")) * 100).otherwise(None) 
)

StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 13, Finished, Available, Finished)

In [12]:
df = df.withColumn(
    "vaccinated_per_100",
    when( col("population") > 0, (col("total_vaccinations") / col("population")) * 100).otherwise(None)
)

StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 14, Finished, Available, Finished)

In [14]:
spark.sql("DROP TABLE IF EXISTS cleaned_covid_data")
df.write.mode("overwrite").saveAsTable("cleaned_covid_data")


StatementMeta(, 7095b4dc-6367-463b-b4b9-ef025051b548, 16, Finished, Available, Finished)

In [1]:
df = spark.read.table("cleaned_covid_data")
df.show(5)

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 3, Finished, Available, Finished)

+--------+---------+--------+----------+------------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+---------------------------------

In [2]:
# -----------------------------------------------
# Phase 2: Aggregations by Country & Continent
# Microsoft Fabric | PySpark Notebook
# -----------------------------------------------
#2. Monthly Aggregates by Country

from pyspark.sql.functions import sum as _sum 

monthly_country_df = df.groupBy("location", "year", "month").agg(
    _sum("new_cases").alias("monthly_cases"),
    _sum("new_deaths").alias("monthly_deaths"),
    _sum("new_vaccinations").alias("monthly_vaccinations")
).orderBy("location", "year", "month")

#Save as delta table
monthly_country_df.write.mode("overwrite").saveAsTable("covid_monthly_country_summary")

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 4, Finished, Available, Finished)

In [6]:
spark.sql("SELECT * FROM covid_monthly_country_summary").show(50)

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 8, Finished, Available, Finished)

+-----------+----+-----+-------------+--------------+--------------------+
|   location|year|month|monthly_cases|monthly_deaths|monthly_vaccinations|
+-----------+----+-----+-------------+--------------+--------------------+
|Afghanistan|2020|    1|          0.0|           0.0|                 0.0|
|Afghanistan|2020|    2|          0.0|           0.0|                 0.0|
|Afghanistan|2020|    3|         91.0|           2.0|                 0.0|
|Afghanistan|2020|    4|       1239.0|          41.0|                 0.0|
|Afghanistan|2020|    5|      13113.0|         205.0|                 0.0|
|Afghanistan|2020|    6|      16173.0|         455.0|                 0.0|
|Afghanistan|2020|    7|       5420.0|         543.0|                 0.0|
|Afghanistan|2020|    8|       2107.0|         156.0|                 0.0|
|Afghanistan|2020|    9|       1049.0|          51.0|                 0.0|
|Afghanistan|2020|   10|       1576.0|          58.0|                 0.0|
|Afghanistan|2020|   11| 

In [7]:
#3. Monthly Aggregates by Continent
monthly_continent_df = df.groupBy("continent", "year", "month").agg(
    _sum("new_cases").alias("monthly_cases"),
    _sum("new_deaths").alias("monthly_deaths"),
    _sum("new_vaccinations").alias("monthly_vaccinations")
).orderBy("continent", "year", "month")

monthly_continent_df.write.mode("overwrite").saveAsTable("covid_monthly_continent_summary")


StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 9, Finished, Available, Finished)

+---------+----+-----+-------------+--------------+--------------------+
|continent|year|month|monthly_cases|monthly_deaths|monthly_vaccinations|
+---------+----+-----+-------------+--------------+--------------------+
|     NULL|2020|    1|       6107.0|         192.0|                 0.0|
|     NULL|2020|    2|     228784.0|        7234.0|                 0.0|
|     NULL|2020|    3|    2163305.0|      133550.0|                 0.0|
|     NULL|2020|    4|    6580527.0|      604388.0|                 0.0|
|     NULL|2020|    5|    9677769.0|      577807.0|                 0.0|
|     NULL|2020|    6|  1.1915989E7|      438421.0|                 0.0|
|     NULL|2020|    7|  1.8357853E7|      490201.0|                 0.0|
|     NULL|2020|    8|  2.8307354E7|      663042.0|                 0.0|
|     NULL|2020|    9|  2.5334622E7|      487769.0|                 0.0|
|     NULL|2020|   10|  3.3047235E7|      491450.0|                 0.0|
|     NULL|2020|   11|   6.483057E7|     1085185.0|

In [13]:
#Preview Some Insights
from pyspark.sql.functions import col
df1 = spark.read.table("covid_monthly_continent_summary")
df1.filter(col("continent").isNotNull()).show(10)

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 15, Finished, Available, Finished)

+---------+----+-----+-------------+--------------+--------------------+
|continent|year|month|monthly_cases|monthly_deaths|monthly_vaccinations|
+---------+----+-----+-------------+--------------+--------------------+
|   Africa|2020|    1|          0.0|           0.0|                 0.0|
|   Africa|2020|    2|          1.0|           0.0|                 0.0|
|   Africa|2020|    3|       4565.0|         140.0|                 0.0|
|   Africa|2020|    4|      26756.0|        1625.0|                 0.0|
|   Africa|2020|    5|     113256.0|        2652.0|                 0.0|
|   Africa|2020|    6|     234371.0|        7537.0|                 0.0|
|   Africa|2020|    7|     457877.0|        8326.0|                 0.0|
|   Africa|2020|    8|     406714.0|       11710.0|                 0.0|
|   Africa|2020|    9|     218213.0|        5890.0|                 0.0|
|   Africa|2020|   10|     259853.0|        5203.0|                 0.0|
+---------+----+-----+-------------+--------------+

In [14]:
#4. Country-Level Totals (All-Time)
country_summary_df = df.groupBy("location").agg(
    _sum("total_cases").alias("total_cases"),
    _sum("total_deaths").alias("total_deaths"),
    _sum("total_vaccinations").alias("total_vaccinations")
).orderBy("total_vaccinations", ascending=False)

country_summary_df.write.mode("overwrite").saveAsTable("covid_country_totals")


StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 16, Finished, Available, Finished)

In [15]:
#Preview Some Insights
spark.read.table("covid_country_totals").show(10)
spark.read.table("covid_monthly_country_summary").filter("location = 'India'").show(5)


StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 17, Finished, Available, Finished)

+--------------------+----------------+-------------+------------------+
|            location|     total_cases| total_deaths|total_vaccinations|
+--------------------+----------------+-------------+------------------+
|               World|7.15697182101E11|8.009960985E9|1.3645076016423E13|
|                Asia|2.52167317226E11| 1.79040488E9|  9.17567589714E12|
|Upper-middle-inco...|2.18037426784E11|3.265564653E9| 5.700644092399E12|
|Lower-middle-inco...|1.00873065026E11|1.385814325E9| 4.722738799023E12|
|High-income count...|3.93562796886E11|3.303895414E9| 2.954085737029E12|
|               India| 5.2079485202E10| 6.36656068E8| 2.099623546783E12|
|               China| 5.9789650705E10|  7.9278696E7|   1.7127640577E12|
|              Europe|2.36756684151E11|2.361106039E9|  1.47689087882E12|
|       North America|1.27073670231E11|1.880636077E9| 1.185558142946E12|
| European Union (27)|1.71330559623E11|1.427055363E9| 1.008473468053E12|
+--------------------+----------------+------------

In [16]:
#Mini Insights
#1. Top 10 Countries by Total Vaccinations

df = spark.read.table("covid_country_totals")

top_vaccinated_countries = df.orderBy("total_vaccinations", ascending=False)
top_vaccinated_countries.show(10)

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 18, Finished, Available, Finished)

+--------------------+----------------+-------------+------------------+
|            location|     total_cases| total_deaths|total_vaccinations|
+--------------------+----------------+-------------+------------------+
|               World|7.15697182101E11|8.009960985E9|1.3645076016423E13|
|                Asia|2.52167317226E11| 1.79040488E9|  9.17567589714E12|
|Upper-middle-inco...|2.18037426784E11|3.265564653E9| 5.700644092399E12|
|Lower-middle-inco...|1.00873065026E11|1.385814325E9| 4.722738799023E12|
|High-income count...|3.93562796886E11|3.303895414E9| 2.954085737029E12|
|               India| 5.2079485202E10| 6.36656068E8| 2.099623546783E12|
|               China| 5.9789650705E10|  7.9278696E7|   1.7127640577E12|
|              Europe|2.36756684151E11|2.361106039E9|  1.47689087882E12|
|       North America|1.27073670231E11|1.880636077E9| 1.185558142946E12|
| European Union (27)|1.71330559623E11|1.427055363E9| 1.008473468053E12|
+--------------------+----------------+------------

In [17]:
#2. Top 5 Countries with Highest Death-to-Case Ratio

from pyspark.sql.functions import when, col

top_death_ratio = df.withColumn(
    "death_rate_pct",
    when(col("total_cases") > 0, (col("total_deaths") * 100.0) / col("total_cases"))
).orderBy("death_rate_pct", ascending=False)

top_death_ratio.select("location", "death_rate_pct").show(5)

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 19, Finished, Available, Finished)

+--------+------------------+
|location|    death_rate_pct|
+--------+------------------+
|   Yemen|18.725605233529926|
|   Sudan| 7.615696153132691|
|    Peru| 5.900691744605385|
|   Syria| 5.699421485299383|
|  Mexico| 5.333296733502787|
+--------+------------------+
only showing top 5 rows



In [18]:
#3. Monthly Trend of Vaccination for a Country (e.g., India)
monthly_df = spark.read.table("covid_monthly_country_summary")

monthly_df.filter("location = 'India'") \
    .orderBy("year", "month") \
    .select("year", "month", "monthly_vaccinations") \
    .show()


StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 20, Finished, Available, Finished)

+----+-----+--------------------+
|year|month|monthly_vaccinations|
+----+-----+--------------------+
|2020|    1|                 0.0|
|2020|    2|                 0.0|
|2020|    3|                 0.0|
|2020|    4|                 0.0|
|2020|    5|                 0.0|
|2020|    6|                 0.0|
|2020|    7|                 0.0|
|2020|    8|                 0.0|
|2020|    9|                 0.0|
|2020|   10|                 0.0|
|2020|   11|                 0.0|
|2020|   12|                 0.0|
|2021|    1|           3758843.0|
|2021|    2|         1.0078106E7|
|2021|    3|          5.081663E7|
|2021|    4|         7.7932654E7|
|2021|    5|         5.5496176E7|
|2021|    6|        1.18708244E8|
|2021|    7|         1.1840429E8|
|2021|    8|        1.40320023E8|
+----+-----+--------------------+
only showing top 20 rows



In [19]:
#4. Top 3 Continents by Monthly Vaccinations in Most Recent Month
monthly_continent_df = spark.read.table("covid_monthly_continent_summary")

# Find the latest year and month
latest_date = monthly_continent_df.select("year", "month").orderBy("year", "month", ascending=False).limit(1).collect()[0]
latest_year, latest_month = latest_date["year"], latest_date["month"]

# Now get top 3 continents
monthly_continent_df.filter((col("year") == latest_year) & (col("month") == latest_month)) \
    .orderBy("monthly_vaccinations", ascending=False) \
    .show(3)


StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 21, Finished, Available, Finished)

+---------+----+-----+-------------+--------------+--------------------+
|continent|year|month|monthly_cases|monthly_deaths|monthly_vaccinations|
+---------+----+-----+-------------+--------------+--------------------+
|     NULL|2024|    8|     167148.0|        2595.0|             32280.0|
|     Asia|2024|    8|       4515.0|          17.0|               257.0|
|   Europe|2024|    8|      39047.0|         162.0|               138.0|
+---------+----+-----+-------------+--------------+--------------------+
only showing top 3 rows



In [20]:
#Saving One of These as a View/Table for Dashboarding
top_vaccinated_countries.write.mode("overwrite").saveAsTable("top_10_vaccinated_countries")

StatementMeta(, 5120bf40-4c84-4675-8c77-401656a92780, 22, Finished, Available, Finished)