### Definição e Tratamento de Dados
Esse projeto será construido levando em consideração o modelo de arquitetura sugerido na seguinte documentação ->
[Medallion Schema](https://www.databricks.com/sites/default/files/inline-images/building-data-pipelines-with-delta-lake-120823.png?v=1702318922)

Nessa camada executamos a definição e tratamento dos Dados:
- Etapa de tratamento dos dados
- São feitos tratamentos e transformações nos dados

In [0]:
%run ../../Lakehouse/functions/functions

#### Variáveis de Path - Landing-Bronze

##### Diesel e GNV

In [0]:
path_bronze_diesel_gnv = '/databricks/bronze/anp/diesel_gnv/'
path_silver_diesel_gnv = '/databricks/silver/anp/tb_diesel_gnv/'

##### Etanol e Gasolina

In [0]:
path_bronze_etanol_gasol = '/databricks/bronze/anp/etanol_gasolina/'
path_silver_etanol_gasol = '/databricks/silver/anp/tb_etanol_gasolina/'

##### GLP

In [0]:
path_bronze_glp = '/databricks/bronze/anp/glp/'
path_silver_glp = '/databricks/silver/anp/tb_glp/'

#### Leitura - Tabela Diesel GNV - Camada Bronze-Prata
- Leitura em delta
- Definição de Schema

##### Leitura dos Dados

In [0]:
df_diesel_gnv = spark.read.format('delta')\
    .load(path_bronze_diesel_gnv)\
  .where('dt_carga = current_date()')

##### Transformação das colunas
- Exploração e identificação das inconsistências de dados
- Os dados da bronze chegam sem definição ou sem padrão, então é necessário trata-los e tipa-los.

##### Case Fictício - Tratamentos
- Precisamos que o campo de **nome da Região** seja criado, porém não temos nenhuma base de dimensão para utilizar e fazer esse De-para. Iremos aplicar manualmente conforme os dados.
- O campo de valor de compra não foi povoado, portanto, devemos definir pelo menos zero para poder atribuir um tipo de dado aquele campo.
- Colunas de datas não aceitarão serem definidas como data pois o formato default do ambiente que estamos usando é **yyyy-MM-dd**
- Tratar colunas de valores para definir como **decimal(3,2)**
- Demais campos não precisarão de outras definições visto que eles são de fato string

In [0]:
df_dsl_gnv_tratado = (
            df_diesel_gnv\
                        #---> Criação da nova coluna de região
                        .withColumn('Regiao_Nome', 
                                    when(col('Regiao_Sigla')=='SE','Sudeste')
                                    .when(col('Regiao_Sigla')=='NE','Nordeste')
                                    .when(col('Regiao_Sigla')=='N','Norte')
                                    .when(col('Regiao_Sigla')=='S','Sul')
                                    .when(col('Regiao_Sigla')=='CO','Centro-Oeste')
                                    )\
                        
                        #---> Casting da coluna de data
                        .withColumn('Data_da_Coleta',
                                    to_date(col('Data_da_Coleta'),'dd/MM/yyyy')
                                    )\

                        #---> Casting do campo númerico

                        # Preenchimento de coluna nula e definição do DataType
                        .withColumn('Valor_de_Compra',
                                    when(col('Valor_de_Compra').isNull(),0).cast('decimal(3,2)')
                                    )\

                        # Campo de Venda e DataType
                        .withColumn('Valor_de_Venda',
                                    regexp_replace(col('Valor_de_Venda'),'[,]','.').cast('decimal(3,2)')
                                    )

)


In [0]:
df_dsl_gnv_tratado.printSchema()

root
 |-- Regiao_Sigla: string (nullable = true)
 |-- Estado_Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ_da_Revenda: string (nullable = true)
 |-- Nome_da_Rua: string (nullable = true)
 |-- Numero_Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data_da_Coleta: date (nullable = true)
 |-- Valor_de_Venda: decimal(3,2) (nullable = true)
 |-- Valor_de_Compra: decimal(3,2) (nullable = true)
 |-- Unidade_de_Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)
 |-- Regiao_Nome: string (nullable = true)



##### Delete dos Dados - Silver 

In [0]:
# Leitura da Delta Table
df_dsl_gnv_tratado = deltaTable.forPath(spark, path_silver_diesel_gnv)

# Merge visando deletar os dados que serão reprocessados
df_dsl_gnv_tratado.alias("silver") \
    .merge(
        df_dsl_gnv_tratado.alias("bronze"),
        """
        silver.CNPJ_da_Revenda = bronze.CNPJ_da_Revenda
        AND silver.Data_da_Coleta = bronze.Data_da_Coleta
        AND silver.Cep = bronze.Cep
        """
    ) \
    .whenMatchedDelete() \
    .execute()

# Merge visando inserir os registros que estão vindo na bronze
df_dsl_gnv_tratado.alias("silver") \
    .merge(
        df_dsl_gnv_tratado.alias("bronze"),
        """
        silver.CNPJ_da_Revenda = bronze.CNPJ_da_Revenda
        AND silver.Data_da_Coleta = bronze.Data_da_Coleta
        AND silver.Cep = bronze.Cep
        """
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

    # OBS é possível colocar ambas operações de delete e insert no mesmo merge(operação) porém, não inclui por opção própria.

##### Materialização da tabela no SQL Warehouse 

In [0]:
%sql
CREATE TABLE default.tb_price_diesel_gnv
USING delta LOCATION '/databricks/silver/anp/tb_diesel_gnv/'

#### Leitura - Tabela Etanol e Gasolina - Camada Bronze-Prata
- Leitura em delta
- Definição de Schema

##### Leitura dos Dados

In [0]:
df_etanol_gasol = spark.read.format('delta')\
    .load(path_bronze_etanol_gasol)\
  .where('dt_carga = current_date()')

##### Transformação das colunas

In [0]:
df_etanol_gasol_tratada = (
    df_etanol_gasol\
                        #---> Criação da nova coluna de região
                        .withColumn('Regiao_Nome', 
                                    when(col('Regiao_Sigla')=='SE','Sudeste')
                                    .when(col('Regiao_Sigla')=='NE','Nordeste')
                                    .when(col('Regiao_Sigla')=='N','Norte')
                                    .when(col('Regiao_Sigla')=='S','Sul')
                                    .when(col('Regiao_Sigla')=='CO','Centro-Oeste')
                                    )\
                        
                        #---> Casting da coluna de data
                        .withColumn('Data_da_Coleta',
                                    to_date(col('Data_da_Coleta'),'dd/MM/yyyy')
                                    )\

                        #---> Casting do campo númerico

                        # Preenchimento de coluna nula e definição do DataType
                        .withColumn('Valor_de_Compra',
                                    when(col('Valor_de_Compra').isNull(),0).cast('decimal(3,2)')
                                    )\

                        # Campo de Venda e DataType
                        .withColumn('Valor_de_Venda',
                                    regexp_replace(col('Valor_de_Venda'),'[,]','.').cast('decimal(3,2)')
                                    )
)

##### Delete dos Dados - Silver 

In [0]:
# Leitura da Delta Table
etanol_gasol_silver = deltaTable.forPath(spark, path_silver_etanol_gasol)

# Merge visando deletar os dados que serão reprocessados
etanol_gasol_silver.alias("silver") \
    .merge(
        df_etanol_gasol_tratada.alias("bronze"),
        """silver.CNPJ_da_Revenda = bronze.CNPJ_da_Revenda
        AND silver.Data_da_Coleta = bronze.Data_da_Coleta
        AND silver.Cep = bronze.Cep"""
    ) \
    .whenMatchedDelete() \
    .execute()

# Merge visando inserir os registros que estão vindo na bronze
etanol_gasol_silver.alias("silver") \
    .merge(
        df_etanol_gasol_tratada.alias("bronze"),
        """silver.CNPJ_da_Revenda = bronze.CNPJ_da_Revenda
        AND silver.Data_da_Coleta = bronze.Data_da_Coleta
        AND silver.Cep = bronze.Cep"""
    ) \
    .whenNotMatchedInsertAll() \
    .execute()

#### Leitura - Tabela Etanol e Gasolina - Camada Bronze-Prata
- Leitura em delta
- Definição de Schema

##### Leitura dos Dados

In [0]:
df_glp = spark.read.format('delta')\
    .load(path_bronze_glp)\
  .where('dt_carga = current_date()')


##### Transformação das colunas

In [0]:
df_glp_tratada = (
    df_glp\
                        #---> Criação da nova coluna de região
                        .withColumn('Regiao_Nome', 
                                    when(col('Regiao_Sigla')=='SE','Sudeste')
                                    .when(col('Regiao_Sigla')=='NE','Nordeste')
                                    .when(col('Regiao_Sigla')=='N','Norte')
                                    .when(col('Regiao_Sigla')=='S','Sul')
                                    .when(col('Regiao_Sigla')=='CO','Centro-Oeste')
                                    )\
                        
                        #---> Casting da coluna de data
                        .withColumn('Data_da_Coleta',
                                    to_date(col('Data_da_Coleta'),'dd/MM/yyyy')
                                    )\

                        #---> Casting do campo númerico

                        # Preenchimento de coluna nula e definição do DataType
                        .withColumn('Valor_de_Compra',
                                    when(col('Valor_de_Compra').isNull(),0).cast('decimal(3,2)')
                                    )\

                        # Campo de Venda e DataType
                        .withColumn('Valor_de_Venda',
                                    regexp_replace(col('Valor_de_Venda'),'[,]','.').cast('decimal(3,2)')
                                    )
)

#### Deleta dados do periodo de reprocessamento

**Deleta todos os registros da Tabela-alvo que foram carregados na Bronze:**

**Chaves de Delete**
  - CNPJ_da_Revenda
  - Data_da_Coleta
  - Cep

In [0]:
glp_silver = deltaTable.forPath(spark, path_silver_glp)

glp_silver.alias("silver") \
    .merge(
        df_glp_tratada.alias("bronze"),
        """silver.CNPJ_da_Revenda = bronze.CNPJ_da_Revenda
        AND silver.Data_da_Coleta = bronze.Data_da_Coleta
        AND silver.Cep = bronze.Cep"""
    ) \
    .whenMatchedDelete() \
    .execute()

#### Insere na Tabela os Registros Reprocessados

In [0]:
glp_silver.alias("silver") \
    .merge(
        df_glp_tratada.alias("bronze"),
        """silver.CNPJ_da_Revenda = bronze.CNPJ_da_Revenda
        AND silver.Data_da_Coleta = bronze.Data_da_Coleta
        AND silver.Cep = bronze.Cep"""
    ) \
    .whenNotMatchedInsertAll() \
    .execute()