<a href="https://colab.research.google.com/github/ORodrigo/my-first-blog/blob/master/ETL(Bronze%2C_Silver%2C_Gold).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [83]:
# BIBLIOTECAS

import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, trim, upper, when, regexp_replace, format_string, round
from pyspark.sql.types import StringType


In [92]:
# Inicializar SparkSession
spark = SparkSession.builder.appName("Orcamento-SP-2019").getOrCreate()

# Leitura dos arquivos CSV com charset UTF-8
df_despesas_bronze = spark.read.option("header", "true").option("charset", "UTF-8").csv("gdvDespesasExcel.csv")
df_receitas_bronze = spark.read.option("header", "true").option("charset", "UTF-8").csv("gdvReceitasExcel.csv")

# Função para remover caracteres especiais dos nomes das colunas
def clean_column_name(col_name):
    return re.sub(r'[^A-Za-z0-9_]+', '_', col_name)  # Substitui qualquer caractere não alfanumérico por '_'

# Aplicando a função de limpeza nos nomes das colunas
df_despesas_bronze = df_despesas_bronze.toDF(*[clean_column_name(col_name) for col_name in df_despesas_bronze.columns])
df_receitas_bronze = df_receitas_bronze.toDF(*[clean_column_name(col_name) for col_name in df_receitas_bronze.columns])

# Verificar se os nomes das colunas foram atualizados corretamente
print("Nomes das colunas - Despesas:", df_despesas_bronze.columns)
print("Nomes das colunas - Receitas:", df_receitas_bronze.columns)


Nomes das colunas - Despesas: ['Fonte_de_Recursos', 'Despesa', 'Liquidado', '_c3']
Nomes das colunas - Receitas: ['Fonte_de_Recursos', 'Receita', 'Arrecadado_at_02_02_2024', '_c3']


In [93]:
# CAMADA BRONZE

# Verificar se as transformações funcionaram corretamente - Despesas
df_despesas_bronze.show(5, truncate=False)

# Verificar se as transformações funcionaram corretamente - Receitas
df_receitas_bronze.show(5, truncate=False)

#Armazenando camada bronze no banco de dados




+--------------------------------------------+-------------------------------------------------------+-----------------------+----+
|Fonte_de_Recursos                           |Despesa                                                |Liquidado              |_c3 |
+--------------------------------------------+-------------------------------------------------------+-----------------------+----+
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900119 - ATRASADOS-OUTROS PODERES/MINIST.PUBLICO     |          79.760.504,67|NULL|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900124 - COMPLEMENTACAO DE APOSENTADORIA             |       1.850.834.374,26|NULL|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900125 - COMPLEMENTACAO DE APOSENTADORIA - 13�SALARIO|         124.536.202,87|NULL|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900325 - COMPLEMENTACAO DE PENSAO                    |          10.948.297,00|NULL|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900326 - COMPLEMENTACAO DE P

In [102]:
# CAMADA SILVER
# Trantando carcteres especiais no campo de texto. Tratanto valor do campo 'Liquidado' e 'Arrecadado_at_02_02_2024' para não transformar a informação como numero cientifo, mantendo o valor real. Retirando o campo '_c3', onde não há informação, apenas valores null

from pyspark.sql.functions import regexp_replace

# CAMADA SILVER

# Remover caracteres especiais da coluna 'Despesa' na df_despesas_bronze
df_despesas_silver = df_despesas_bronze.withColumn("Despesa", regexp_replace("Despesa", r'[^A-Za-z0-9 ]', ''))

# Remover caracteres especiais da coluna 'Receita' na df_receitas_bronze
df_receitas_silver = df_receitas_bronze.withColumn("Receita", regexp_replace("Receita", r'[^A-Za-z0-9 ]', ''))

# Excluir a coluna '_c3' em ambos os DataFrames
df_despesas_silver = df_despesas_silver.drop("_c3")
df_receitas_silver = df_receitas_silver.drop("_c3")

# Remover registros com valores nulos na coluna 'Fonte_de_Recursos'
df_despesas_silver = df_despesas_silver.filter(df_despesas_silver["Fonte_de_Recursos"].isNotNull())
df_receitas_silver = df_receitas_silver.filter(df_receitas_silver["Fonte_de_Recursos"].isNotNull())

# Verificar as primeiras linhas para garantir que as transformações foram aplicadas
df_despesas_silver.show(5, truncate=False)
df_receitas_silver.show(5, truncate=False)



#Armazenando camada Silver no banco de dados




+--------------------------------------------+----------------------------------------------------+-----------------------+
|Fonte_de_Recursos                           |Despesa                                             |Liquidado              |
+--------------------------------------------+----------------------------------------------------+-----------------------+
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900119  ATRASADOSOUTROS PODERESMINISTPUBLICO      |          79.760.504,67|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900124  COMPLEMENTACAO DE APOSENTADORIA           |       1.850.834.374,26|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900125  COMPLEMENTACAO DE APOSENTADORIA  13SALARIO|         124.536.202,87|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900325  COMPLEMENTACAO DE PENSAO                  |          10.948.297,00|
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR|31900326  COMPLEMENTACAO DE PENSAO  13 SALARIO      |             521.632,30|
+-------

In [110]:
# CAMADA GOLD

from pyspark.sql.functions import col, regexp_replace, sum, format_number


# Remover caracteres especiais da coluna 'Despesa' na df_despesas_bronze
df_despesas_silver = df_despesas_bronze.withColumn("Despesa", regexp_replace("Despesa", r'[^A-Za-z0-9 ]', ''))

# Remover caracteres especiais da coluna 'Receita' na df_receitas_bronze
df_receitas_silver = df_receitas_bronze.withColumn("Receita", regexp_replace("Receita", r'[^A-Za-z0-9 ]', ''))

# Excluir a coluna '_c3' em ambos os DataFrames
df_despesas_silver = df_despesas_silver.drop("_c3")
df_receitas_silver = df_receitas_silver.drop("_c3")

# Tratamento do campo 'Liquidado' no df_despesas_silver
df_despesas_silver = df_despesas_silver.withColumn(
    "Liquidado",
    regexp_replace(col("Liquidado"), r'\.', '')  # Remove pontos (milhares)
)

df_despesas_silver = df_despesas_silver.withColumn(
    "Liquidado",
    regexp_replace(col("Liquidado"), r',', '.')  # Substitui vírgula por ponto
)

# Converter para tipo inteiro (sem casas decimais)
df_despesas_silver = df_despesas_silver.withColumn(
    "Liquidado", col("Liquidado").cast("long")  # 'long' para números inteiros
)

# Agrupar por 'Fonte_de_Recursos' e somar a coluna 'Liquidado' no df_despesas_silver
df_despesas_gold = df_despesas_silver.groupBy("Fonte_de_Recursos").agg(
    sum("Liquidado").alias("Total_Liquidado")
)

# Filtrar registros com Fonte_de_Recursos não nula
df_despesas_gold = df_despesas_gold.filter(col("Fonte_de_Recursos").isNotNull())

# Formatar a coluna 'Total_Liquidado' para exibição, sem notação científica e sem casas decimais
df_despesas_gold = df_despesas_gold.withColumn(
    "Total_Liquidado", format_number(col("Total_Liquidado"), 0)  # Sem casas decimais
)

# Exibir os resultados do df_despesas_gold
df_despesas_gold.show(truncate=False)


# Tratamento do campo 'Arrecadado_at_02_02_2024' no df_receitas_silver
df_receitas_silver = df_receitas_silver.withColumn(
    "Arrecadado_at_02_02_2024",
    regexp_replace(col("Arrecadado_at_02_02_2024"), r'\.', '')  # Remove pontos (milhares)
)

df_receitas_silver = df_receitas_silver.withColumn(
    "Arrecadado_at_02_02_2024",
    regexp_replace(col("Arrecadado_at_02_02_2024"), r',', '.')  # Substitui vírgula por ponto
)

# Converter para tipo inteiro (sem casas decimais)
df_receitas_silver = df_receitas_silver.withColumn(
    "Arrecadado_at_02_02_2024", col("Arrecadado_at_02_02_2024").cast("long")  # 'long' para números inteiros
)

# Agrupar por 'Fonte_de_Recursos' e somar a coluna 'Arrecadado_at_02_02_2024' no df_receitas_silver
df_receitas_gold = df_receitas_silver.groupBy("Fonte_de_Recursos").agg(
    sum("Arrecadado_at_02_02_2024").alias("Total_Arrecadado")
)

# Filtrar registros com Fonte_de_Recursos não nula
df_receitas_gold = df_receitas_gold.filter(col("Fonte_de_Recursos").isNotNull())

# Formatar a coluna 'Total_Arrecadado' para exibição, sem notação científica e sem casas decimais
df_receitas_gold = df_receitas_gold.withColumn(
    "Total_Arrecadado", format_number(col("Total_Arrecadado"), 0)  # Sem casas decimais
)

# Exibir os resultados do df_receitas_gold
df_receitas_gold.show(truncate=False)


# Juntar os DataFrames df_despesas_gold e df_receitas_gold e garantir que os campos sejam combinados corretamente
df_juncao_gold = df_despesas_gold.join(
    df_receitas_gold, on="Fonte_de_Recursos", how="outer"
)

# Exibir os resultados da junção final
df_juncao_gold.show(truncate=False)


+--------------------------------------------------+---------------+
|Fonte_de_Recursos                                 |Total_Liquidado|
+--------------------------------------------------+---------------+
|042 - REC.VINC.ESTADUAIS-CRED.SUPERAVIT FINANCEIRO|524,395,845    |
|084 - REC.PROPRIO-ADM.IND.-DOT.INIC.CR.SUPL.-INTRA|2,743,387      |
|005 - RECURSOS VINCULADOS FEDERAIS                |8,091,074,521  |
|085 - RECURSOS VINCULADOS FEDERAIS-INTRA          |225,804,427    |
|006 - OUTRAS FONTES DE RECURSOS                   |615,464,238    |
|001 - TESOURO-DOT.INICIAL E CRED.SUPLEMENTAR      |146,622,206,831|
|007 - OP.CRED.E CONTRIB.DO EXTERIOR-DOT.INIC.CR.SU|1,513,684,003  |
|045 - REC.VINC.TRANSF.FEDERAL/SUPERAVIT FINANC.   |326,177,974    |
|004 - REC.PROPRIO-ADM.IND.-DOT.INIC.CR.SUPL.      |10,198,168,738 |
|086 - OUTRAS FONTES DE RECURSOS-INTRA             |137,377,128    |
|003 - RECURSOS VINCULADOS-FUNDO ESPECIAL DE DESPES|4,364,160,391  |
|083 - RECURSOS VINCULADOS-FUNDO E

In [131]:
# CONEXAO COM O POSTGRE NO DOCKER

spark = SparkSession.builder \
    .appName("PostgreSQL Integration") \
    .config("spark.jars", "file:/content/postgresql-42.7.4.jar") \
    .getOrCreate()

# Definindo as configurações de conexão
url = "jdbc:postgresql://172.19.0.2:5432/postgres"
properties = {
    "user": "airflow",
    "password": "airflow",
    "driver": "org.postgresql.Driver"
}

# Inserindo os dados no PostgreSQL
df_juncao_gold.write.jdbc(url=url, table="orcamento", mode="append", properties=properties)

