## Mount bronze, silver and gold

In [0]:
# Mount the "bronze" container of the azuredatabricks2025 storage account on DBFS.
# The storage account key is read from a Databricks secret scope instead of being
# written in the notebook. A key that has ever been stored in plaintext in a
# notebook must be treated as leaked and rotated in Azure.
# TODO: the scope/key names below are placeholders - point them at the secret
# scope configured for this workspace.
bronze_mount_point = "/mnt/azuredatabricks2025/bronze"

# dbutils.fs.mount fails when the mount point already exists, so mount only when
# it is missing; this keeps the cell safe to re-run.
if not any(m.mountPoint == bronze_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://bronze@azuredatabricks2025.blob.core.windows.net",
        mount_point = bronze_mount_point,
        extra_configs = {
            "fs.azure.account.key.azuredatabricks2025.blob.core.windows.net":
                dbutils.secrets.get(scope = "azuredatabricks2025", key = "storage-account-key")
        }
    )

In [0]:
# Mount the "silver" container of the azuredatabricks2025 storage account on DBFS.
# The storage account key is read from a Databricks secret scope instead of being
# written in the notebook. A key that has ever been stored in plaintext in a
# notebook must be treated as leaked and rotated in Azure.
# TODO: the scope/key names below are placeholders - point them at the secret
# scope configured for this workspace.
silver_mount_point = "/mnt/azuredatabricks2025/silver"

# dbutils.fs.mount fails when the mount point already exists, so mount only when
# it is missing; this keeps the cell safe to re-run.
if not any(m.mountPoint == silver_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://silver@azuredatabricks2025.blob.core.windows.net",
        mount_point = silver_mount_point,
        extra_configs = {
            "fs.azure.account.key.azuredatabricks2025.blob.core.windows.net":
                dbutils.secrets.get(scope = "azuredatabricks2025", key = "storage-account-key")
        }
    )

In [0]:
# Mount the "gold" container of the azuredatabricks2025 storage account on DBFS.
# The storage account key is read from a Databricks secret scope instead of being
# written in the notebook. A key that has ever been stored in plaintext in a
# notebook must be treated as leaked and rotated in Azure.
# TODO: the scope/key names below are placeholders - point them at the secret
# scope configured for this workspace.
gold_mount_point = "/mnt/azuredatabricks2025/gold"

# dbutils.fs.mount fails when the mount point already exists, so mount only when
# it is missing; this keeps the cell safe to re-run.
if not any(m.mountPoint == gold_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source = "wasbs://gold@azuredatabricks2025.blob.core.windows.net",
        mount_point = gold_mount_point,
        extra_configs = {
            "fs.azure.account.key.azuredatabricks2025.blob.core.windows.net":
                dbutils.secrets.get(scope = "azuredatabricks2025", key = "storage-account-key")
        }
    )

## Creating a database

In [0]:
%sql
-- Create the COVID database; IF NOT EXISTS makes this a no-op when it is already there.
CREATE DATABASE IF NOT EXISTS COVID

## Reading csv files from bronze layer

In [0]:
# Load the semicolon-delimited SRAG CSV from the bronze mount. The first row is
# used as the header and Spark infers the column types.
bronze_csv_path = "/mnt/azuredatabricks2025/bronze/SRAG_01-06_2025.csv"
df_covid_br_bronze = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", delimiter=";")
    .load(bronze_csv_path)
)

In [0]:
# Render the bronze DataFrame in the notebook output.
display(df_covid_br_bronze)

## Silver layer

In [0]:
# Silver keeps only the bronze rows that have a value in ID_REGIONA.
id_regiona_present = df_covid_br_bronze["ID_REGIONA"].isNotNull()
df_covid_br_silver = df_covid_br_bronze.where(id_regiona_present)

Write the silver data in Delta format (Parquet data files plus a Delta transaction log).

In [0]:
# Persist the silver DataFrame in Delta format, replacing whatever is already at the path.
(
    df_covid_br_silver.write
    .format("delta")
    .mode("overwrite")
    .save("/mnt/azuredatabricks2025/silver/covid_br")
)

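Now take a look at the silver container in Azure. Inside the `covid_br` folder we have:

- `_delta_log/`: the Delta transaction log, stored as JSON files. You can see it in the Azure portal or by listing `/mnt/azuredatabricks2025/silver/covid_br/_delta_log` with `dbutils.fs.ls`.
- `part-0000X` files: the Parquet data files. You can check them in the same `covid_br` folder, next to `_delta_log/`.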

In [0]:
# Make sure the Delta structure exists: read the silver path back and show it.
silver_check_df = spark.read.format("delta").load("/mnt/azuredatabricks2025/silver/covid_br")
display(silver_check_df)

In [0]:
# Register the silver Delta data as the table silver.br_covid.
# No earlier cell creates the `silver` schema, and saveAsTable does not create
# schemas, so create it here first (a no-op when it already exists).
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

df = spark.read.format("delta").load("/mnt/azuredatabricks2025/silver/covid_br")

# mode("overwrite") replaces the table contents on every run.
df.write.mode("overwrite").saveAsTable("silver.br_covid")

In [0]:
%sql
-- Preview the first 10 rows of pipelinedatabricks.silver.br_covid.
-- NOTE(review): a %sql cell with several statements may only show the result of
-- the last one - confirm this preview is actually displayed.
SELECT * FROM pipelinedatabricks.silver.br_covid LIMIT 10;

-- Register the Delta files at the silver mount path as the table
-- pipelinedatabricks.covid.br_covid, unless that table already exists.
-- NOTE(review): presumably `pipelinedatabricks` is the catalog the Python cells
-- write to by default - confirm.
CREATE TABLE IF NOT EXISTS pipelinedatabricks.covid.br_covid
USING DELTA
LOCATION 'dbfs:/mnt/azuredatabricks2025/silver/covid_br'

In [0]:
# import function for gold layer:
from pyspark.sql.functions import concat, to_date, year, month, current_timestamp, count

In [0]:
# Reload the silver Delta data from its mount path.
silver_delta_path = "/mnt/azuredatabricks2025/silver/covid_br"
df_covid_br_silver = spark.read.format("delta").load(silver_delta_path)

In [0]:
# Convert DT_NOTIFIC, DT_SIN_PRI and DT_NASC to dates, one column at a time.
# NOTE(review): to_date is called without a format argument - confirm the source
# values are in a layout that Spark parses by default.
for date_col in ("DT_NOTIFIC", "DT_SIN_PRI", "DT_NASC"):
    df_covid_br_silver = df_covid_br_silver.withColumn(
        date_col, to_date(df_covid_br_silver[date_col])
    )

In [0]:
# Render df_covid_br_silver in the notebook output.
display(df_covid_br_silver)


In [0]:
# Start the gold DataFrame from silver, adding a column with the current timestamp.
df_covid_br_gold = df_covid_br_silver.withColumn("load_timestamp", current_timestamp())

# Rename the three date columns to their longer names.
date_column_renames = {
    "DT_NOTIFIC": "DT_NOTIFICACAO",
    "DT_SIN_PRI": "DT_PRIMEIROS_SINTOMAS",
    "DT_NASC": "DT_NASCIMENTO",
}
for old_name, new_name in date_column_renames.items():
    df_covid_br_gold = df_covid_br_gold.withColumnRenamed(old_name, new_name)


In [0]:
# Add two new columns: the year and the month of DT_NOTIFICACAO.
df_covid_br_gold = (
    df_covid_br_gold
    .withColumn("YEAR_NOTIFICACAO", year("DT_NOTIFICACAO"))
    .withColumn("MONTH_NOTIFICACAO", month("DT_NOTIFICACAO"))
)

In [0]:
# Save the gold DataFrame to the gold mount as Delta files, partitioned by the
# year and month of notification.
# The format is set explicitly, as the silver write does, so the output is Delta
# regardless of the session's default data source.
(
    df_covid_br_gold.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("YEAR_NOTIFICACAO", "MONTH_NOTIFICACAO")
    .save("/mnt/azuredatabricks2025/gold/covid_br")
)

In [0]:
# Create the gold schema; IF NOT EXISTS makes this a no-op when it is already there.
spark.sql("CREATE SCHEMA IF NOT EXISTS gold")


In [0]:
# Make sure the table gold.br_covid exists before it is written.
# The table is deliberately not dropped first: the write that follows uses
# mode("overwrite") with overwriteSchema, which already replaces data and schema.
# Dropping it would discard the Delta table history and leave a window in which
# readers find no table.
# NOTE(review): a CREATE TABLE without columns relies on Databricks accepting
# empty (schemaless) Delta tables - confirm on the target runtime.
spark.sql("CREATE TABLE IF NOT EXISTS gold.br_covid")

In [0]:
# Write the gold DataFrame to the table gold.br_covid, partitioned by the year and
# month of notification. Both schema options are set so the write may change the
# table's schema.
gold_table_writer = (
    df_covid_br_gold.write
    .mode("overwrite")
    .partitionBy("YEAR_NOTIFICACAO", "MONTH_NOTIFICACAO")
    .options(mergeSchema="true", overwriteSchema="true")
)
gold_table_writer.saveAsTable("gold.br_covid")

In [0]:
%sql
-- Preview the first 10 rows of the gold table.
SELECT * FROM pipelinedatabricks.gold.br_covid LIMIT 10;

In [0]:
# Count the cases per YEAR_NOTIFICACAO, MONTH_NOTIFICACAO, SG_UF_NOT and ID_MUNICIP.
select_columns = ["YEAR_NOTIFICACAO", "MONTH_NOTIFICACAO", "SG_UF_NOT", "ID_MUNICIP", "load_timestamp"]
df_select_columns_gold = df_covid_br_gold.select(*select_columns)

# load_timestamp is selected above but is not a grouping key, so it does not
# appear in the aggregated result.
grouping_columns = ["YEAR_NOTIFICACAO", "MONTH_NOTIFICACAO", "SG_UF_NOT", "ID_MUNICIP"]
grouped_df = (
    df_select_columns_gold
    .groupBy(*grouping_columns)
    .agg(count("*").alias("cases_counting"))
)

In [0]:
# Render the aggregated case counts in the notebook output.
display(grouped_df)

In [0]:
# Save the aggregated counts to the gold mount as Delta files, partitioned by the
# year and month of notification.
# The format is set explicitly, as the silver write does, so the output is Delta
# regardless of the session's default data source.
(
    grouped_df.write
    .format("delta")
    .mode("overwrite")
    .partitionBy("YEAR_NOTIFICACAO", "MONTH_NOTIFICACAO")
    .save("/mnt/azuredatabricks2025/gold/covid_br_cases_counting")
)

In [0]:
# Publish the aggregated counts as the table gold.covid_br_cases_counting,
# partitioned by the year and month of notification.
# saveAsTable with mode("overwrite") creates the table when it is missing, and
# with overwriteSchema it replaces both data and schema of an existing one. The
# table is therefore not dropped and re-created empty beforehand: that step
# discarded the Delta table history and left a window in which readers found no
# table.
(
    grouped_df.write
    .mode("overwrite")
    .partitionBy("YEAR_NOTIFICACAO", "MONTH_NOTIFICACAO")
    .option("mergeSchema", "true")
    .option("overwriteSchema", "true")
    .saveAsTable("gold.covid_br_cases_counting")
)