####Read csv and create delta tables

In [0]:
#Read owid_subset.csv file and create a table in delta format 
df = spark.read.format("csv").option("header","true").load("abfss://raw@covid19storagedb.dfs.core.windows.net/owid_subset.csv")

countries = ['India','United Kingdom','United States','France','Germany','Italy','Spain']

df_sample = df.filter(df.location.isin(countries))

df_sample.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("databricks_cat.bronze.owid_subset")

In [0]:
#Read vaccinations_by_manufacturer.csv file and create a table in delta format 
df = spark.read.format("csv").option("header","true").load("abfss://raw@covid19storagedb.dfs.core.windows.net/vaccinations-by-manufacturer.csv")

df_sample = df.filter(df.location.isin(countries))
df_sample.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("databricks_cat.bronze.vaccinations_by_manufacturer")

####Set Catalog and Schema


In [0]:
spark.sql("USE CATALOG databricks_cat")
spark.sql("USE SCHEMA bronze")

DataFrame[]

####Explore both the tables

In [0]:
# List and read the tables in bronze schema
spark.sql("SHOW TABLES").show()

df_owid = spark.sql("SELECT * FROM owid_subset")
df_vaccinations = spark.sql("SELECT * FROM vaccinations_by_manufacturer")

df_owid.printSchema()
df_vaccinations.printSchema()

df_owid.show(5)
#df_vaccinations.show(5)

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|  bronze|         owid_subset|      false|
|  bronze|vaccinations_by_m...|      false|
+--------+--------------------+-----------+

root
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- total_cases: string (nullable = true)
 |-- new_cases: string (nullable = true)
 |-- total_deaths: string (nullable = true)
 |-- people_vaccinated: string (nullable = true)
 |-- population: string (nullable = true)
 |-- tests_per_case: string (nullable = true)
 |-- reproduction_rate: string (nullable = true)

root
 |-- location: string (nullable = true)
 |-- date: string (nullable = true)
 |-- vaccine: string (nullable = true)
 |-- total_vaccinations: string (nullable = true)

+--------+----------+-----------+---------+------------+-----------------+----------+--------------+-----------------+
|location|      date|total_cases|new_cases|total_

####Data Cleansing and Transformation
Let’s do basic cleanup like:

- Renaming columns
- Converting date format
- Dropping nulls (if any)

In [0]:
from pyspark.sql.functions import col, to_date

df_owid_clean = df_owid \
      .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))\
      .dropna(subset=["total_cases", "new_cases", "total_deaths", "people_vaccinated", "population", "tests_per_case", "reproduction_rate"])

df_owid_clean = df_owid_clean \
        .withColumn("total_cases", col("total_cases").cast("double")) \
        .withColumn("new_cases",col("new_cases").cast("double")) \
        .withColumn("total_deaths",col("total_deaths").cast("double")) \
        .withColumn("people_vaccinated",col("people_vaccinated").cast("double")) \
        .withColumn("population",col("population").cast("long")) \
        .withColumn("tests_per_case",col("tests_per_case").cast("double")) \
        .withColumn("reproduction_rate",col("reproduction_rate").cast("double"))

df_owid_clean.fillna({"total_cases": 0, 
                      "new_cases": 0, "total_deaths": 0, "people_vaccinated": 0, "population": 0, "tests_per_case": 0, "reproduction_rate": 0})
df_owid_clean.printSchema()    


root
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_cases: double (nullable = true)
 |-- new_cases: double (nullable = true)
 |-- total_deaths: double (nullable = true)
 |-- people_vaccinated: double (nullable = true)
 |-- population: long (nullable = true)
 |-- tests_per_case: double (nullable = true)
 |-- reproduction_rate: double (nullable = true)



In [0]:
df_vacc_clean = df_vaccinations \
      .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))\
      .dropna(subset=["total_vaccinations", "location"])

df_vacc_clean = df_vacc_clean \
        .withColumn("total_vaccinations", col("total_vaccinations").cast("double"))

df_vacc_clean.fillna({"total_vaccinations": 0})
df_vacc_clean.printSchema()

root
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- vaccine: string (nullable = true)
 |-- total_vaccinations: double (nullable = true)



####Save to Silver Layer

In [0]:
spark.sql("USE SCHEMA silver")

df_owid_clean.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("owid_subset_cleaned")

df_vacc_clean.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable("vaccinations_by_manufacturer_cleaned")

spark.sql("SHOW TABLES").show()
spark.sql("select * from owid_subset_cleaned limit 5").show()
spark.sql("select * from vaccinations_by_manufacturer_cleaned limit 5").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|  silver| owid_subset_cleaned|      false|
|  silver|vaccinations_by_m...|      false|
+--------+--------------------+-----------+

+--------+----------+-----------+---------+------------+-----------------+----------+--------------+-----------------+
|location|      date|total_cases|new_cases|total_deaths|people_vaccinated|population|tests_per_case|reproduction_rate|
+--------+----------+-----------+---------+------------+-----------------+----------+--------------+-----------------+
|  France|2020-12-27|  2338258.0|  83888.0|     63534.0|            616.0|  67813000|          35.7|             1.03|
|  France|2020-12-28|  2338258.0|      0.0|     63534.0|           1326.0|  67813000|          32.3|             1.05|
|  France|2020-12-29|  2338258.0|      0.0|     63534.0|           1716.0|  67813000|          27.0|             1.07|
|  France|2020-12-30|