In [None]:
#Load, Transform, Persist Pipeline

#1-mount the data lakes
#2-load csvs from landing data lake
#3-convert csvs to parquet and move them to processing data lake
#4-create sql database
#5-create tables based on parquet format files
#6-specific analyses will be moved to curated data lake and then loaded into sql tables
#7-powerbi application reads directly from sql tables at databricks rest api service

# Mounting Data lakes

In [None]:
# OAuth client-credentials settings passed as extra_configs when mounting the data lakes.
# The client secret is fetched from a Databricks secret scope rather than written in the notebook.
sp_client_secret = dbutils.secrets.get(scope="vacinacao-scope6", key="vacinacao-secret6")

configs = {
    "fs.azure.account.auth.type":
        "OAuth",
    "fs.azure.account.oauth.provider.type":
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id":
        "5c91bc0d-64e8-4c79-866d-16de4bc997a3",
    "fs.azure.account.oauth2.client.secret":
        sp_client_secret,
    "fs.azure.account.oauth2.client.endpoint":
        "https://login.microsoftonline.com/abd4a16d-59d9-4bce-afca-7bccec6cabf8/oauth2/token",
}

In [None]:
# Mount the "landing" container at /mnt/landing.
# dbutils.fs.mount raises an error when the mount point already exists, so the
# mount is skipped if it is already present; this keeps the cell safe to re-run.
# Optionally, you can add <directory-name> to the source URI of your mount point.
landing_mount_point = "/mnt/landing"
if not any(m.mountPoint == landing_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://landing@vacinacaocovid19.dfs.core.windows.net/",
        mount_point=landing_mount_point,
        extra_configs=configs)

# List the mounted container as the cell output to confirm the mount works.
dbutils.fs.ls("/mnt/landing/")

In [None]:
# Mount the "processing" container at /mnt/processing.
# dbutils.fs.mount raises an error when the mount point already exists, so the
# mount is skipped if it is already present; this keeps the cell safe to re-run.
processing_mount_point = "/mnt/processing"
if not any(m.mountPoint == processing_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://processing@vacinacaocovid19.dfs.core.windows.net/",
        mount_point=processing_mount_point,
        extra_configs=configs)

# List the mounted container as the cell output to confirm the mount works.
dbutils.fs.ls("/mnt/processing")

In [None]:
# Mount the "curated" container at /mnt/curated.
# dbutils.fs.mount raises an error when the mount point already exists, so the
# mount is skipped if it is already present; this keeps the cell safe to re-run.
curated_mount_point = "/mnt/curated"
if not any(m.mountPoint == curated_mount_point for m in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source="abfss://curated@vacinacaocovid19.dfs.core.windows.net/",
        mount_point=curated_mount_point,
        extra_configs=configs)

# List the mounted container as the cell output to confirm the mount works.
# (This statement must start at column 0; an indented top-level statement is an IndentationError.)
dbutils.fs.ls("/mnt/curated")

# Reading CSVs from the Landing Data Lake into DataFrames

In [None]:
# Load the first landing CSV: comma-delimited, first row is the header,
# column types inferred by Spark.
vacinacao01 = (
    spark.read
    .format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .option("delimiter", ",")
    .load("/mnt/landing/dados1.csv")
)

# Render the DataFrame in the cell output.
display(vacinacao01)

In [None]:
# Print the column names and types of the DataFrame loaded from the first CSV.
vacinacao01.printSchema()

In [None]:
# Read the remaining CSV files into their respective DataFrames.

def read_landing_csv(csv_path):
    """Load one CSV file with a header row, comma delimiter and inferred column types."""
    reader = spark.read.format("csv")
    reader = reader.option("inferSchema", "true")
    reader = reader.option("header", "true")
    reader = reader.option("delimiter", ",")
    return reader.load(csv_path)


vacinacao02 = read_landing_csv("/mnt/landing/dados2.csv")
vacinacao03 = read_landing_csv("/mnt/landing/dados3.csv")
vacinacao04 = read_landing_csv("/mnt/landing/dados4.csv")
vacinacao05 = read_landing_csv("/mnt/landing/dados5.csv")

# Write Full Parquet Datasets to Processing Data lake

In [None]:
# Persist each DataFrame as Parquet in the processing data lake.
# mode("overwrite") replaces any output already present at the target path.
parquet_targets = {
    "/mnt/processing/vacinacao01.parquet": vacinacao01,
    "/mnt/processing/vacinacao02.parquet": vacinacao02,
    "/mnt/processing/vacinacao03.parquet": vacinacao03,
    "/mnt/processing/vacinacao04.parquet": vacinacao04,
    "/mnt/processing/vacinacao05.parquet": vacinacao05,
}

for target_path, frame in parquet_targets.items():
    frame.write.mode("overwrite").parquet(target_path)

# Create SQL Tables based on Parquet files at Processing Data Lake

In [None]:
%sql
-- Create the vacinacao database; IF NOT EXISTS makes this a no-op when it already exists.
CREATE DATABASE IF NOT EXISTS vacinacao

In [None]:
%sql
-- Full parquet
-- Register one table per Parquet dataset written to the processing data lake.
-- Parquet files carry their own schema, so the CSV reader options
-- (header / inferSchema) do not apply and are not passed here.
-- IF NOT EXISTS leaves an already-registered table untouched.
CREATE TABLE IF NOT EXISTS vacinacao.vacinacao01 USING PARQUET OPTIONS (path "/mnt/processing/vacinacao01.parquet");
CREATE TABLE IF NOT EXISTS vacinacao.vacinacao02 USING PARQUET OPTIONS (path "/mnt/processing/vacinacao02.parquet");
CREATE TABLE IF NOT EXISTS vacinacao.vacinacao03 USING PARQUET OPTIONS (path "/mnt/processing/vacinacao03.parquet");
CREATE TABLE IF NOT EXISTS vacinacao.vacinacao04 USING PARQUET OPTIONS (path "/mnt/processing/vacinacao04.parquet");
CREATE TABLE IF NOT EXISTS vacinacao.vacinacao05 USING PARQUET OPTIONS (path "/mnt/processing/vacinacao05.parquet");

In [None]:
%sql
-- Query every column of the first table to check that it was registered and is readable.
SELECT * FROM vacinacao.vacinacao01;

In [None]:
%sql
-- Combine the five tables into a single result set.
-- UNION (without ALL) removes duplicate rows across and within the inputs.
SELECT * FROM vacinacao.vacinacao01
UNION
SELECT * FROM vacinacao.vacinacao02
UNION
SELECT * FROM vacinacao.vacinacao03
UNION
SELECT * FROM vacinacao.vacinacao04
UNION
SELECT * FROM vacinacao.vacinacao05;

In [None]:
# Build the curated DataFrame from the union of the five vacinacao tables:
#   - selects and renames the patient, address, facility and vaccine columns;
#   - replaces NULL city, country and nationality values with the string '0'
#     (COALESCE is the idiomatic equivalent of CASE WHEN x IS NULL THEN '0' ELSE x END);
#   - UNION (without ALL) removes duplicate rows.
# NOTE(review): the output column "Data_Nasciemento" is misspelled; it is kept
# unchanged here because consumers of the curated output may depend on the name.
vacinacao = spark.sql("""
SELECT
  paciente_id AS ID_Paciente,
  paciente_idade AS Idade,
  paciente_dataNascimento AS Data_Nasciemento,
  paciente_enumSexoBiologico AS Sexo,
  paciente_racaCor_valor AS Raca_Cor,
  COALESCE(paciente_endereco_nmMunicipio, '0') AS Cidade,
  COALESCE(paciente_endereco_nmPais, '0') AS Pais,
  paciente_endereco_uf AS Estado,
  paciente_endereco_cep AS CEP,
  COALESCE(paciente_nacionalidade_enumNacionalidade, '0') AS Nacionalidade,
  estabelecimento_municipio_nome AS Cidade_Posto,
  estabelecimento_uf AS Estado_Posto,
  vacina_fabricante_nome AS Fabricante_Vacina,
  vacina_dataAplicacao AS Data_Aplicacao
FROM (
  SELECT * FROM vacinacao.vacinacao01
  UNION
  SELECT * FROM vacinacao.vacinacao02
  UNION
  SELECT * FROM vacinacao.vacinacao03
  UNION
  SELECT * FROM vacinacao.vacinacao04
  UNION
  SELECT * FROM vacinacao.vacinacao05
) AS vacinacao_all
""")

In [None]:
# Write the curated DataFrame to the curated data lake as comma-delimited CSV
# with a header row, replacing any output already at that path.
curated_writer = (
    vacinacao.write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
)
curated_writer.csv("/mnt/curated/vacinacao.csv")

In [None]:
# Keep only the rows whose Cidade column equals 'MACEIO'.
# `vacinacao` is a DataFrame held in a Python variable; no table or temp view
# with that name is registered (in SQL, `vacinacao` is the database name), so
# the filter is applied to the DataFrame directly instead of through spark.sql.
vacinacaoMaceio = vacinacao.filter("Cidade = 'MACEIO'")