In [0]:
# Import required libraries
from pyspark.dbutils import DBUtils
from pyspark.sql import SparkSession

# Initializing the spark session

In [0]:
# Initialize Spark session and DBUtils
spark = SparkSession.builder.getOrCreate()
dbutils = DBUtils(spark)

# Get the secret values from the secret scope

In [0]:
# Get the secret values from the secret scope
client_id = dbutils.secrets.get(scope="key-vault-scope", key="clientid")
client_secret = dbutils.secrets.get(scope="key-vault-scope", key="applicationkey")
tenant_id = dbutils.secrets.get(scope="key-vault-scope", key="tenantid")

In [0]:
# Update the configs dictionary with the secret values
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"
}


# Mount the Data to databricks

In [0]:
# Define the mount point
mount_point = "/mnt/covid-19-data"

try:
    dbutils.fs.unmount(mount_point)
except Exception as e:
    print(f"Directory was not mounted: {e}")

# Mount the Azure Data Lake Storage using the updated configs
dbutils.fs.mount(
    source="abfss://covid-19-data@covidstoragefiles.dfs.core.windows.net", # container@resourcegroup
    mount_point=mount_point,
    extra_configs=configs
)

Directory was not mounted: An error occurred while calling o392.unmount.
: java.rmi.RemoteException: java.lang.IllegalArgumentException: requirement failed: Directory not mounted: /mnt/covid-19-data; nested exception is: 
	java.lang.IllegalArgumentException: requirement failed: Directory not mounted: /mnt/covid-19-data
	at com.databricks.backend.daemon.data.client.DbfsClient.send0(DbfsClient.scala:135)
	at com.databricks.backend.daemon.data.client.DbfsClient.sendIdempotent(DbfsClient.scala:69)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.unmount(DBUtilsCore.scala:1328)
	at com.databricks.backend.daemon.dbutils.DBUtilsCore.$anonfun$unmount$1(DBUtilsCore.scala:1344)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:571)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:667)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:685)
	at com.databricks.logg

True

# Reading the data

In [0]:
path = "/mnt/covid-19-data/raw_data"
countries = spark.read.format("csv").option("header","true").option("inferSchema","true").load(f"{path}/country.csv")
states = spark.read.format("csv").option("header","true").option("inferSchema","true").load(f"{path}/state.csv")
cities = spark.read.format("csv").option("header","true").option("inferSchema","true").load(f"{path}/city.csv")
     

# printing data and schema

In [0]:
countries.show()
countries.printSchema()

+--------+----------+---------+------+---------+-----+--------+-----------------+-----------------------+----+----+----+--------------+-----------------+-------------+-----------------------+-----------------+----------------------+------------------------------+-----------------------------------+---------------------+--------------+---------------+----------------+------------------+-------------------------+-------------------------+----------------+------------------------+----------------------+-------------------------+---------------------------+---------------------------+---------------------------+--------+---------+----------+-----------+-----------+-----------+------------+---------+-------------------+------------------+------------+--------+--------+
|      id|      date|confirmed|deaths|recovered|tests|vaccines|people_vaccinated|people_fully_vaccinated|hosp| icu|vent|school_closing|workplace_closing|cancel_events|gatherings_restrictions|transport_closing|stay_home_restri

In [0]:
states.show()
states.printSchema()

+--------+----------+---------+------+---------+-----+--------+-----------------+-----------------------+----+----+----+--------------+-----------------+-------------+-----------------------+-----------------+----------------------+------------------------------+-----------------------------------+---------------------+--------------+---------------+----------------+------------------+-------------------------+-------------------------+----------------+------------------------+----------------------+-------------------------+---------------------------+---------------------------+---------------------------+----------+----------------+----------+-----------+-----------+-----------+------------+---------+-------------------+------------------+------------+--------+--------+
|      id|      date|confirmed|deaths|recovered|tests|vaccines|people_vaccinated|people_fully_vaccinated|hosp| icu|vent|school_closing|workplace_closing|cancel_events|gatherings_restrictions|transport_closing|stay_ho

In [0]:
cities.show()
cities.printSchema()

+--------+----------+---------+------+---------+-----+--------+-----------------+-----------------------+----+----+----+--------------+-----------------+-------------+-----------------------+-----------------+----------------------+------------------------------+-----------------------------------+---------------------+--------------+---------------+----------------+------------------+-------------------------+-------------------------+----------------+------------------------+----------------------+-------------------------+---------------------------+---------------------------+---------------------------+----------------+-----------------+----------+-----------+-----------+-----------+------------+---------+--------------------+------------------+------------+--------+------------+
|      id|      date|confirmed|deaths|recovered|tests|vaccines|people_vaccinated|people_fully_vaccinated|hosp| icu|vent|school_closing|workplace_closing|cancel_events|gatherings_restrictions|transport_clo

# transforming countries

In [0]:
from pyspark.sql.functions import col, to_date

# Assuming your DataFrame is named 'countries'
transformed_countries = countries.select(
    col("id"),
    to_date(col("date")).alias("date"),
    col("confirmed").cast("integer"),
    col("deaths").cast("integer"),
    col("recovered").cast("integer"),
    col("tests").cast("long"),
    col("vaccines").cast("long"),
    col("people_vaccinated").cast("integer"),
    col("people_fully_vaccinated").cast("integer"),
    col("hosp").cast("integer"),
    col("icu").cast("integer"),
    col("vent").cast("integer"),
    col("population").cast("integer"),
    col("administrative_area_level_1")
)

# Show the transformed DataFrame
transformed_countries.show()

+--------+----------+---------+------+---------+-----+--------+-----------------+-----------------------+----+----+----+----------+---------------------------+
|      id|      date|confirmed|deaths|recovered|tests|vaccines|people_vaccinated|people_fully_vaccinated|hosp| icu|vent|population|administrative_area_level_1|
+--------+----------+---------+------+---------+-----+--------+-----------------+-----------------------+----+----+----+----------+---------------------------+
|8320791a|2020-03-13|        2|  NULL|     NULL| NULL|    NULL|             NULL|                   NULL|NULL|NULL|NULL|      3533|             Grand Princess|
|8320791a|2020-03-14|        2|  NULL|     NULL| NULL|    NULL|             NULL|                   NULL|NULL|NULL|NULL|      3533|             Grand Princess|
|8320791a|2020-03-15|        2|  NULL|     NULL| NULL|    NULL|             NULL|                   NULL|NULL|NULL|NULL|      3533|             Grand Princess|
|8320791a|2020-03-16|        2|  NULL|  

In [0]:
transformed_countries = transformed_countries.withColumnRenamed("id", "country_id") \
    .withColumnRenamed("date", "date") \
    .withColumnRenamed("confirmed", "total_confirmed") \
    .withColumnRenamed("deaths", "total_deaths") \
    .withColumnRenamed("recovered", "total_recovered") \
    .withColumnRenamed("tests", "total_tests") \
    .withColumnRenamed("vaccines", "total_vaccines") \
    .withColumnRenamed("people_vaccinated", "total_people_vaccinated") \
    .withColumnRenamed("people_fully_vaccinated", "total_people_fully_vaccinated") \
    .withColumnRenamed("hosp", "total_hosp") \
    .withColumnRenamed("icu", "total_icu") \
    .withColumnRenamed("vent", "total_vent") \
    .withColumnRenamed("population", "total_population") \
    .withColumnRenamed("administrative_area_level_1", "country")

transformed_countries.printSchema()

root
 |-- country_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- total_confirmed: integer (nullable = true)
 |-- total_deaths: integer (nullable = true)
 |-- total_recovered: integer (nullable = true)
 |-- total_tests: long (nullable = true)
 |-- total_vaccines: long (nullable = true)
 |-- total_people_vaccinated: integer (nullable = true)
 |-- total_people_fully_vaccinated: integer (nullable = true)
 |-- total_hosp: integer (nullable = true)
 |-- total_icu: integer (nullable = true)
 |-- total_vent: integer (nullable = true)
 |-- total_population: integer (nullable = true)
 |-- country: string (nullable = true)



# transforming states

In [0]:
from pyspark.sql.functions import col, to_date

# Assuming your DataFrame is named 'states'
transformed_states = states.select(
    col("id").alias("state_id"),
    to_date(col("date")).alias("date"),
    col("confirmed").cast("integer").alias("total_confirmed"),
    col("deaths").cast("integer").alias("total_deaths"),
    col("recovered").cast("integer").alias("total_recovered"),
    col("tests").cast("long").alias("total_tests"),
    col("vaccines").cast("long").alias("total_vaccines"),
    col("people_vaccinated").cast("integer").alias("total_people_vaccinated"),
    col("people_fully_vaccinated").cast("integer").alias("total_people_fully_vaccinated"),
    col("hosp").cast("integer").alias("total_hosp"),
    col("icu").cast("integer").alias("total_icu"),
    col("vent").cast("integer").alias("total_vent"),
    col("population").cast("integer").alias("total_population"),
    col("administrative_area_level_1").alias("country"), 
    col("administrative_area_level_2").alias("state")
)

# Show the transformed DataFrame
transformed_states.show()

# show schema
transformed_states.printSchema()

+--------+----------+---------------+------------+---------------+-----------+--------------+-----------------------+-----------------------------+----------+---------+----------+----------------+--------+--------+
|state_id|      date|total_confirmed|total_deaths|total_recovered|total_tests|total_vaccines|total_people_vaccinated|total_people_fully_vaccinated|total_hosp|total_icu|total_vent|total_population| country|   state|
+--------+----------+---------------+------------+---------------+-----------+--------------+-----------------------+-----------------------------+----------+---------+----------+----------------+--------+--------+
|0042529a|2020-01-12|              0|           0|           NULL|       NULL|          NULL|                   NULL|                         NULL|      NULL|     NULL|      NULL|          487976|Thailand|Yasothon|
|0042529a|2020-01-13|              0|           0|           NULL|       NULL|          NULL|                   NULL|                       

# transorm city data

In [0]:
from pyspark.sql.functions import col, to_date

# Assuming your DataFrame is named 'cities'
transformed_cities = cities.select(
    col("id").alias("city_id"),
    to_date(col("date")).alias("date"),
    col("confirmed").cast("integer").alias("total_confirmed"),
    col("deaths").cast("integer").alias("total_deaths"),
    col("recovered").cast("integer").alias("total_recovered"),
    col("tests").cast("long").alias("total_tests"),
    col("vaccines").cast("long").alias("total_vaccines"),
    col("people_vaccinated").cast("integer").alias("total_people_vaccinated"),
    col("people_fully_vaccinated").cast("integer").alias("total_people_fully_vaccinated"),
    col("hosp").cast("integer").alias("total_hosp"),
    col("icu").cast("integer").alias("total_icu"),
    col("vent").cast("integer").alias("total_vent"),
    col("school_closing").cast("integer").alias("school_closing"),
    col("workplace_closing").cast("integer").alias("workplace_closing"),
    col("cancel_events").cast("integer").alias("cancel_events"),
    col("gatherings_restrictions").cast("integer").alias("gatherings_restrictions"),
    col("transport_closing").cast("integer").alias("transport_closing"),
    col("stay_home_restrictions").cast("integer").alias("stay_home_restrictions"),
    col("internal_movement_restrictions").cast("integer").alias("internal_movement_restrictions"),
    col("international_movement_restrictions").cast("integer").alias("international_movement_restrictions"),
    col("information_campaigns").cast("integer").alias("information_campaigns"),
    col("testing_policy").cast("integer").alias("testing_policy"),
    col("contact_tracing").cast("integer").alias("contact_tracing"),
    col("facial_coverings").cast("integer").alias("facial_coverings"),
    col("vaccination_policy").cast("integer").alias("vaccination_policy"),
    col("administrative_area_level_1").alias("country"),
    col("administrative_area_level_2").alias("state"),
    col("administrative_area_level_3").alias("city")
)

# Show the transformed DataFrame
transformed_cities.show()

# Show the schema
transformed_cities.printSchema()

+--------+----------+---------------+------------+---------------+-----------+--------------+-----------------------+-----------------------------+----------+---------+----------+--------------+-----------------+-------------+-----------------------+-----------------+----------------------+------------------------------+-----------------------------------+---------------------+--------------+---------------+----------------+------------------+-------------+-------+------+
| city_id|      date|total_confirmed|total_deaths|total_recovered|total_tests|total_vaccines|total_people_vaccinated|total_people_fully_vaccinated|total_hosp|total_icu|total_vent|school_closing|workplace_closing|cancel_events|gatherings_restrictions|transport_closing|stay_home_restrictions|internal_movement_restrictions|international_movement_restrictions|information_campaigns|testing_policy|contact_tracing|facial_coverings|vaccination_policy|      country|  state|  city|
+--------+----------+---------------+---------

In [0]:
total_rows_cities = transformed_cities.count()
total_rows_cities

13277719

In [0]:
path = "/mnt/covid-19-data/transformed-data"
transformed_countries.repartition(1).write.mode("overwrite").option("header", "true").csv(f"{path}/countries")
transformed_states.repartition(1).write.mode("overwrite").option("header", "true").csv(f"{path}/states")
transformed_cities.repartition(1).write.mode("overwrite").option("header", "true").csv(f"{path}/cities")