In [0]:
import os

# Storage-account credentials come from the environment (never hardcode keys).
account_name = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
account_key = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')

# Fail fast with a clear message: a missing variable would otherwise be
# interpolated as the literal text "None" into the storage URL and config key.
missing_vars = [
    var_name
    for var_name, value in (
        ("AZURE_STORAGE_ACCOUNT_NAME", account_name),
        ("AZURE_STORAGE_ACCOUNT_KEY", account_key),
    )
    if not value
]
if missing_vars:
    raise ValueError(f"Missing environment variable(s): {', '.join(missing_vars)}")

# Bronze (raw) layer: mount the "bronze" folder of the "covid" container.
# dbutils.fs.mount raises if the mount point is already in use, so skip it when
# it already exists to keep the notebook re-runnable ("Run All").
bronze_mount_point = "/mnt/bronze"
if not any(m.mountPoint == bronze_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://covid@{account_name}.blob.core.windows.net/bronze",
        mount_point=bronze_mount_point,
        extra_configs={
            f"fs.azure.account.key.{account_name}.blob.core.windows.net": account_key
        },
    )

Out[25]: True

In [0]:
# Silver ("prata") layer: mount the "prata" folder of the "covid" container,
# using the account_name / account_key read from the environment earlier.
# dbutils.fs.mount raises on an already-mounted point, so only mount when the
# mount point does not exist yet (keeps the notebook re-runnable).
prata_mount_point = "/mnt/prata"
if not any(m.mountPoint == prata_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://covid@{account_name}.blob.core.windows.net/prata",
        mount_point=prata_mount_point,
        extra_configs={
            f"fs.azure.account.key.{account_name}.blob.core.windows.net": account_key
        },
    )

Out[26]: True

In [0]:
# Gold ("ouro") layer: mount the "ouro" folder of the "covid" container,
# using the account_name / account_key read from the environment earlier.
# dbutils.fs.mount raises on an already-mounted point, so only mount when the
# mount point does not exist yet (keeps the notebook re-runnable).
ouro_mount_point = "/mnt/ouro"
if not any(m.mountPoint == ouro_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source=f"wasbs://covid@{account_name}.blob.core.windows.net/ouro",
        mount_point=ouro_mount_point,
        extra_configs={
            f"fs.azure.account.key.{account_name}.blob.core.windows.net": account_key
        },
    )

Out[27]: True

In [0]:
# Create the "covid" database (schema) if it does not exist yet.
# NOTE(review): the Delta tables below are written by path and are not
# registered in this database in the visible code — confirm it is used elsewhere.
create_database_sql = "CREATE DATABASE IF NOT EXISTS covid"
spark.sql(create_database_sql)

Out[28]: DataFrame[]

In [0]:
# Read the raw SRAG CSV file from the bronze layer (';'-separated, with header).
# The Spark CSV option is spelled `inferSchema`: an unrecognised key such as
# `infer_schema` is silently ignored and every column is then read as a string.
df_covid_br_bronze = (
    spark.read.format("csv")
    .options(header="true", inferSchema="true", delimiter=";")
    .load("dbfs:/mnt/bronze/SRAG_01-06.csv")
)

In [0]:
# Silver cleanup: keep only the rows whose ID_REGIONA value is present (non-null).
has_id_regiona = df_covid_br_bronze["ID_REGIONA"].isNotNull()
df_covid_br_silver = df_covid_br_bronze.where(has_id_regiona)

In [0]:
# Persist the cleaned data to the silver ("prata") layer as a Delta table
# (Delta stores the data as Parquet files plus a transaction log).
# This is a full refresh: overwriteSchema lets the overwrite succeed even when
# the source schema changed since the last run (new columns, inferred types);
# without it Delta rejects an overwrite whose schema differs from the table's.
(
    df_covid_br_silver.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/mnt/prata/br_covid")
)

In [0]:
# camada gold
from pyspark.sql.functions import concat, to_date, year, month

In [0]:
# The gold stage starts from the persisted silver Delta table.
df_covid_br_silver = spark.read.load("/mnt/prata/br_covid", format="delta")

In [0]:
# Convert the notification, first-symptoms and birth date columns to DateType.
# NOTE(review): to_date() without a format follows Spark's string->date cast
# rules (ISO-style yyyy-MM-dd); values in another layout (e.g. dd/MM/yyyy)
# would become null — confirm against the source file and pass a format if so.
for date_column in ("DT_NOTIFIC", "DT_SIN_PRI", "DT_NASC"):
    df_covid_br_silver = df_covid_br_silver.withColumn(
        date_column, to_date(df_covid_br_silver[date_column])
    )

In [0]:
# Stamp every row with the load date (DT_CARGA) via Spark's current_date().
from pyspark.sql.functions import current_date

load_date_column = current_date()
df_covid_br_gold = df_covid_br_silver.withColumn("DT_CARGA", load_date_column)

In [0]:
# Rename the abbreviated date columns to descriptive names (applied in order).
column_renames = {
    "DT_NOTIFIC": "DT_NOTIFICACAO",
    "DT_SIN_PRI": "DT_PRIMEIROS_SINTOMAS",
    "DT_NASC": "DT_NASCIMENTO",
}
for old_name, new_name in column_renames.items():
    df_covid_br_gold = df_covid_br_gold.withColumnRenamed(old_name, new_name)

In [0]:
# Derive YEAR and MONTH from the notification date; the gold tables are
# partitioned by these two columns when written.
notification_date = df_covid_br_gold["DT_NOTIFICACAO"]
df_covid_br_gold = df_covid_br_gold.withColumn(
    "YEAR", year(notification_date)
).withColumn("MONTH", month(notification_date))

In [0]:
# Write the detailed gold table as Delta, partitioned by YEAR/MONTH.
# The gold container is mounted at /mnt/ouro; a path under an unmounted
# directory such as /mnt/gold would resolve to DBFS root storage instead of
# the storage account's "ouro" container.
(
    df_covid_br_gold.write.format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
    .partitionBy("YEAR", "MONTH")
    .save("/mnt/ouro/br_covid")
)

In [0]:
# Aggregated gold table: row count per distinct combination of the columns below.
from pyspark.sql.functions import count

# Use the exact names the columns were created with (YEAR / MONTH). Lower-case
# "year"/"month" only resolve while spark.sql.caseSensitive is false, and the
# writer partitions by the upper-case names.
selected_columns = [
    "YEAR",
    "MONTH",
    "DT_NOTIFICACAO",
    "SG_UF_NOT",
    "ID_MUNICIP",
    "DT_CARGA",
]
df_select_columns_gold = df_covid_br_gold.select(selected_columns)
grouped_df = df_select_columns_gold.groupBy(selected_columns).agg(
    count("*").alias("count")
)

In [0]:
# Write the aggregated gold table as Delta, partitioned by YEAR/MONTH, to the
# mounted gold container (/mnt/ouro). An unmounted path such as /mnt/gold would
# resolve to DBFS root storage rather than the storage account.
(
    grouped_df.write.format("delta")
    .mode("overwrite")
    .partitionBy("YEAR", "MONTH")
    .save("/mnt/ouro/br_covid_gold_agg")
)