# ETL da camada silver para camada gold

Esta célula importa todas as bibliotecas necessárias para o processo de ETL.
Aqui são carregados os pacotes para manipulação de dados (pyspark), conexão com o banco de dados PostgreSQL (psycopg), controle de mensagens de erro (sys) e tratamento de avisos (warnings).
Ela deve ser executada antes de qualquer outra célula, pois fornece as dependências básicas que serão usadas nas etapas de Extract, Transform e Load.

In [3]:
import os
import sys
import warnings
import psycopg2 
import pandas as pd
import numpy as np
from psycopg2 import sql
from dotenv import load_dotenv 

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, IntegerType, DoubleType, DecimalType, TimestampType

# 1. CONFIGURAÇÕES GERAIS

Para evitar avisos de versão/depreciação disparados por dependências internas do pyspark com a tag "ignore", ignoramos com o pacote warnings.


In [4]:

warnings.filterwarnings('ignore')  

### 1.1 DRIVERS E CAMINHOS
O pyspark precisa de um driver java específico para para a comunicação com o banco Postgres. Ele carrega com base no caminho absoluto de onde se encontra o drive. Na falta dele a comunicação é impossível.

In [5]:

JAR_NAME = "postgresql-42.7.8.jar"
JAR_PATH = os.path.abspath(JAR_NAME)

if not os.path.exists(JAR_PATH):
    print(f"AVISO: Driver JDBC '{JAR_NAME}' não encontrado na pasta atual.")
    print("O Spark não conseguirá salvar no banco sem ele.")
else:
    print(f" Driver JDBC localizado: {JAR_PATH}")

 Driver JDBC localizado: /home/caiomesvie/projetos/SDBD2---INEP/Transformer/postgresql-42.7.8.jar


### 1.2 INICIALIZAÇÃO DA SESSÃO SPARK
Finalizamos sessão antiga se existir para garantir que o JAR seja carregado corretamente, e onde configuramos o Spark

In [6]:
spark = SparkSession.builder \
    .appName("ETL_Enem_Gold_Layer") \
    .config("spark.driver.memory", "5g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.jars", JAR_PATH) \
    .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/26 16:40:32 WARN Utils: Your hostname, DellG15Caio, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/26 16:40:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/26 16:40:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/26 16:40:35 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### 1.3 VARIÁVEIS DE CONEXÃO

Essas são as variaveis utilizadas para a conexão com o banco de dados. A conexão também pode ser dada pela variavel de ambiente DB_JDBC_URL


In [7]:

DB_USER = "admin"
DB_PASS = "l1l2r1r2"
DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "dados_inep"
DB_SCHEMA = "silver"
TABLE_NAME = "microdados_enem"
FULL_TABLE_NAME = f"{DB_SCHEMA}.{TABLE_NAME}" 

# 1. Extract

Esta célula define as configurações de conexão com o banco de dados PostgreSQL e monta a consulta SQL que será usada para extrair os dados.
Ela cria variáveis com credenciais, monta o nome completo da tabela (schema.tabela) e gera a query SELECT * FROM silver.listings, além de preparar a connection string usada na etapa de conexão.

In [8]:


def get_jdbc_connection_info():
    load_dotenv()
    
    url = os.getenv('DB_JDBC_URL')

    if url:
        return url

    return f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}", DB_USER, DB_PASS

jdbc_url, db_user, db_pass = get_jdbc_connection_info()

print(f" Preparando leitura da tabela: {FULL_TABLE_NAME}")
print(f" URL de Conexão: {jdbc_url}")

try:
    df_silver = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", FULL_TABLE_NAME) \
        .option("user", db_user) \
        .option("password", db_pass) \
        .option("driver", "org.postgresql.Driver") \
        .load()

    print("\n Sucesso! Dados da camada Silver carregados.")
    print("Schema carregado do Banco:")
    df_silver.printSchema()

except Exception as e:
    print(f" Erro ao ler do banco: {e}")

 Preparando leitura da tabela: silver.microdados_enem
 URL de Conexão: jdbc:postgresql://localhost:5432/dados_inep

 Sucesso! Dados da camada Silver carregados.
Schema carregado do Banco:
root
 |-- nu_inscricao: long (nullable = true)
 |-- tp_faixa_etaria: integer (nullable = true)
 |-- tp_sexo: string (nullable = true)
 |-- tp_estado_civil: integer (nullable = true)
 |-- tp_cor_raca: integer (nullable = true)
 |-- tp_nacionalidade: integer (nullable = true)
 |-- tp_st_conclusao: integer (nullable = true)
 |-- tp_ano_concluiu: integer (nullable = true)
 |-- tp_escola: integer (nullable = true)
 |-- in_treineiro: integer (nullable = true)
 |-- co_municipio_prova: integer (nullable = true)
 |-- no_municipio_prova: string (nullable = true)
 |-- co_uf_prova: integer (nullable = true)
 |-- sg_uf_prova: string (nullable = true)
 |-- tp_presenca_cn: integer (nullable = true)
 |-- tp_presenca_ch: integer (nullable = true)
 |-- tp_presenca_lc: integer (nullable = true)
 |-- tp_presenca_mt: inte

# 2 CARGA DOS DADOS (Leitura JDBC via Spark) 

Esta proxima célula ira executa a extração dos dados do banco PostgreSQL.

Ela tem que estabelece a conexão usando as configurações definidas anteriormente, converte o objeto SQL em uma query, executa a consulta e carrega o resultado no DataFrame df.

Em caso de falha na conexão ou na leitura, deve ser exibido uma mensagem de erro detalhada e encerra o processo.

Em Big Data, constumasse usar o `spark.read` para criar um ponteiro distribuído para os dados, mais robusto.

afim de melhorar e ver o se aconteceu algum erro no final e feito umas captura dos erros genéricos do Java/Spark.

In [9]:
try:
    print(" Estabelecendo conexão com o Spark JDBC...")
    
    df = spark.read \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", FULL_TABLE_NAME) \
        .option("user", db_user) \
        .option("password", db_pass) \
        .option("driver", "org.postgresql.Driver") \
        .load()

    print(" Conexão estabelecida.")
    print(f" Tabela '{FULL_TABLE_NAME}' mapeada com sucesso para o Spark!")
    

except Exception as e:
   
    print(f"\n Ocorreu um erro ao conectar ou ler o banco de dados via Spark ")
    print(f"Erro detalhado: {e}")
    sys.exit(1)

 Estabelecendo conexão com o Spark JDBC...
 Conexão estabelecida.
 Tabela 'silver.microdados_enem' mapeada com sucesso para o Spark!


### 2.1 VALIDAÇÃO DA CARGA
Estas células e basicamente para validar a quantidade de dados esta vindo corretamente e exibem um resumo simples do resultado da extração, mostrando o número total de registros carregados no DataFrame df, as primeiras cinco tuplas e os tipos de cada dado.
Elas servem para confirmar visualmente que a consulta foi executada com sucesso e quantas linhas foram retornadas do banco.

In [10]:

print(" Contando registros no DataFrame...")
qtd_linhas = df.count()
print(f"Total de linhas carregadas: {qtd_linhas}")

print("\n --- Estrutura da Tabela (Schema) ---")

df.printSchema()

print("\n--- Visualização dos Dados (Amostra) ---")

df.show(5, truncate=False)



 Contando registros no DataFrame...


                                                                                

Total de linhas carregadas: 509954

 --- Estrutura da Tabela (Schema) ---
root
 |-- nu_inscricao: long (nullable = true)
 |-- tp_faixa_etaria: integer (nullable = true)
 |-- tp_sexo: string (nullable = true)
 |-- tp_estado_civil: integer (nullable = true)
 |-- tp_cor_raca: integer (nullable = true)
 |-- tp_nacionalidade: integer (nullable = true)
 |-- tp_st_conclusao: integer (nullable = true)
 |-- tp_ano_concluiu: integer (nullable = true)
 |-- tp_escola: integer (nullable = true)
 |-- in_treineiro: integer (nullable = true)
 |-- co_municipio_prova: integer (nullable = true)
 |-- no_municipio_prova: string (nullable = true)
 |-- co_uf_prova: integer (nullable = true)
 |-- sg_uf_prova: string (nullable = true)
 |-- tp_presenca_cn: integer (nullable = true)
 |-- tp_presenca_ch: integer (nullable = true)
 |-- tp_presenca_lc: integer (nullable = true)
 |-- tp_presenca_mt: integer (nullable = true)
 |-- co_prova_cn: integer (nullable = true)
 |-- co_prova_ch: integer (nullable = true)
 |--

26/01/26 16:40:46 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+------------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+------------+------------------+------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+---------------------------------------------+---------------------------------------------+--------------------------------------------------+---------------------------------------------+---------+---------------------------------------------+---------------------------------------------+--------------------------------------------------+---------------------------------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|nu_inscricao|tp_faixa_etaria|

# 2. Transform

### 2.1 RENOMEAÇÃO E REMOÇÃO DE COLUNAS 
Renomeação de colunas para o padrão definido para a camada Gold. Ex: de 'nu_inscricao' para 'NUM_INS'

In [11]:
print("\n Iniciando processo de transformação dos dados...")

mapa_colunas = {
    'nu_inscricao': 'NUM_INS',
    'tp_faixa_etaria': 'COD_FAI_ETA',
    'tp_sexo': 'TIP_SEX',
    'tp_estado_civil': 'COD_EST_CIV',
    'tp_cor_raca': 'COD_COR_RAC',
    'tp_nacionalidade': 'COD_NAC',
    'tp_st_conclusao': 'COD_SIT_CON',
    'tp_ano_concluiu': 'NUM_ANO_CON',
    'in_treineiro': 'IND_TRE',
    'co_municipio_prova': 'COD_MUN',
    'no_municipio_prova': 'NOM_MUN',
    'tp_presenca_cn': 'IND_PRE_NAT',
    'tp_presenca_ch': 'IND_PRE_HUM',
    'tp_presenca_lc': 'IND_PRE_LIN',
    'tp_presenca_mt': 'IND_PRE_MAT',
    'co_prova_cn': 'COD_PRV_NAT',
    'co_prova_ch': 'COD_PRV_HUM',
    'co_prova_lc': 'COD_PRV_LIN',
    'co_prova_mt': 'COD_PRV_MAT',
    'nu_nota_cn': 'VAL_NOT_NAT',
    'nu_nota_ch': 'VAL_NOT_HUM',
    'nu_nota_lc': 'VAL_NOT_LIN',
    'nu_nota_mt': 'VAL_NOT_MAT',
    'tp_lingua': 'TIP_LIN',
    'tp_status_redacao': 'COD_SIT_RED',
    'nu_nota_redacao': 'VAL_NOT_RED',
    'q001': 'COD_ESC_PAI',
    'q002': 'COD_ESC_MAE',
    'q003': 'COD_OCU_PAI',
    'q004': 'COD_OCU_MAE',
    'q005': 'QTD_PES_RES',
    'q006': 'COD_REN_FAM',
    'q007': 'COD_EMP_DOM',
    'q008': 'COD_POS_BAN',
    'q009': 'COD_POS_QUA',
    'q010': 'COD_POS_CAR',
    'q011': 'COD_POS_MOT',
    'q012': 'COD_POS_GEL',
    'q013': 'COD_POS_FRE',
    'q014': 'COD_POS_LAV',
    'q015': 'COD_POS_SEC',
    'q016': 'COD_POS_MIC',
    'q017': 'COD_POS_LOU',
    'q018': 'COD_POS_ASP',
    'q019': 'COD_POS_TEL',
    'q020': 'COD_POS_DVD',
    'q021': 'COD_POS_TVA',
    'q022': 'COD_POS_CEL',
    'q023': 'COD_POS_FIX',
    'q024': 'COD_POS_COM',
    'q025': 'IND_ACE_INT'
}


print(" Renomeando colunas...")
for coluna_antiga, coluna_nova in mapa_colunas.items():
    df = df.withColumnRenamed(coluna_antiga, coluna_nova)


cols_to_drop = ['tp_escola', 'co_uf_prova', 'sg_uf_prova','tx_gabarito_cn', 'tx_gabarito_ch', 'tx_gabarito_lc', 'tx_gabarito_mt',
                'tx_respostas_cn', 'tx_respostas_ch', 'tx_respostas_lc', 'tx_respostas_mt', 'nu_nota_comp1', 'nu_nota_comp2', 
                'nu_nota_comp3', 'nu_nota_comp4', 'nu_nota_comp5']

df = df.drop(*cols_to_drop)

print(" Colunas renomeadas e desnecessárias removidas.")
print("Novas colunas:", df.columns)

print("\n--- Visualização dos Dados (Amostra) ---")

df.show(5, truncate=False)


 Iniciando processo de transformação dos dados...
 Renomeando colunas...
 Colunas renomeadas e desnecessárias removidas.
Novas colunas: ['NUM_INS', 'COD_FAI_ETA', 'TIP_SEX', 'COD_EST_CIV', 'COD_COR_RAC', 'COD_NAC', 'COD_SIT_CON', 'NUM_ANO_CON', 'IND_TRE', 'COD_MUN', 'NOM_MUN', 'IND_PRE_NAT', 'IND_PRE_HUM', 'IND_PRE_LIN', 'IND_PRE_MAT', 'COD_PRV_NAT', 'COD_PRV_HUM', 'COD_PRV_LIN', 'COD_PRV_MAT', 'VAL_NOT_NAT', 'VAL_NOT_HUM', 'VAL_NOT_LIN', 'VAL_NOT_MAT', 'TIP_LIN', 'COD_SIT_RED', 'VAL_NOT_RED', 'COD_ESC_PAI', 'COD_ESC_MAE', 'COD_OCU_PAI', 'COD_OCU_MAE', 'QTD_PES_RES', 'COD_REN_FAM', 'COD_EMP_DOM', 'COD_POS_BAN', 'COD_POS_QUA', 'COD_POS_CAR', 'COD_POS_MOT', 'COD_POS_GEL', 'COD_POS_FRE', 'COD_POS_LAV', 'COD_POS_SEC', 'COD_POS_MIC', 'COD_POS_LOU', 'COD_POS_ASP', 'COD_POS_TEL', 'COD_POS_DVD', 'COD_POS_TVA', 'COD_POS_CEL', 'COD_POS_FIX', 'COD_POS_COM', 'IND_ACE_INT']

--- Visualização dos Dados (Amostra) ---


[Stage 4:>                                                          (0 + 1) / 1]

+------------+-----------+-------+-----------+-----------+-------+-----------+-----------+-------+-------+-------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|NUM_INS     |COD_FAI_ETA|TIP_SEX|COD_EST_CIV|COD_COR_RAC|COD_NAC|COD_SIT_CON|NUM_ANO_CON|IND_TRE|COD_MUN|NOM_MUN      |IND_PRE_NAT|IND_PRE_HUM|IND_PRE_LIN|IND_PRE_MAT|COD_PRV_NAT|COD_PRV_HUM|COD_PRV_LIN|COD_PRV_MAT|VAL_NOT_NAT|VAL_NOT_HUM|VAL_NOT_LIN|VAL_NOT_MAT|TIP_LIN|COD_SIT_RED|VAL_NOT_RED|COD_ESC_PAI|COD_ESC_MAE|COD_OCU_PAI|COD_OCU_MAE|QTD_PES_RES|COD_REN_FAM|COD_EMP_DOM|COD_POS_BAN|COD_POS_QUA

                                                                                

### 2.2 Alocação das colunas 

##### 2.2.1 ORGANIZAÇÃO E CONVERSÃO DE TIPOS
Divisão das colunas nas tabelas dimensões e fato

In [12]:


df_par = [ 'NUM_INS' , 'COD_FAI_ETA', 'TIP_SEX', 'COD_EST_CIV', 'COD_COR_RAC', 'COD_NAC', 'COD_SIT_CON', 'NUM_ANO_CON', 'IND_TRE']

df_loc = [ 'COD_MUN', 'NOM_MUN' ]

df_soc = ['COD_ESC_PAI', 'COD_ESC_MAE', 'COD_OCU_PAI', 'COD_OCU_MAE', 'QTD_PES_RES', 'COD_REN_FAM', 'COD_EMP_DOM', 'COD_POS_BAN', 'COD_POS_QUA',
          'COD_POS_CAR', 'COD_POS_MOT', 'COD_POS_GEL', 'COD_POS_FRE', 'COD_POS_LAV', 'COD_POS_SEC', 'COD_POS_MIC', 'COD_POS_LOU', 'COD_POS_ASP',
          'COD_POS_TEL', 'COD_POS_DVD', 'COD_POS_TVA', 'COD_POS_CEL', 'COD_POS_FIX', 'COD_POS_COM', 'IND_ACE_INT' ]

df_prv = ['TIP_LIN', 'COD_PRV_NAT', 'COD_PRV_HUM', 'COD_PRV_LIN', 'COD_PRV_MAT', 'COD_SIT_RED']

df_fat = ['NUM_INS','VAL_NOT_NAT', 'VAL_NOT_HUM', 'VAL_NOT_LIN', 'VAL_NOT_MAT', 'VAL_NOT_RED', 'IND_PRE_NAT', 'IND_PRE_HUM', 'IND_PRE_LIN', 
          'IND_PRE_MAT']

print(" Colunas segmentadas para Dimensões:")
print("\n Parâmetros:", df_par)
print("\n Localização:", df_loc)
print("\n Socioeconômica:", df_soc)
print("\n Prova:", df_prv)
print("\n Fato:", df_fat)


print("\nFato: Todas as outras colunas restantes.",df.columns)

 Colunas segmentadas para Dimensões:

 Parâmetros: ['NUM_INS', 'COD_FAI_ETA', 'TIP_SEX', 'COD_EST_CIV', 'COD_COR_RAC', 'COD_NAC', 'COD_SIT_CON', 'NUM_ANO_CON', 'IND_TRE']

 Localização: ['COD_MUN', 'NOM_MUN']

 Socioeconômica: ['COD_ESC_PAI', 'COD_ESC_MAE', 'COD_OCU_PAI', 'COD_OCU_MAE', 'QTD_PES_RES', 'COD_REN_FAM', 'COD_EMP_DOM', 'COD_POS_BAN', 'COD_POS_QUA', 'COD_POS_CAR', 'COD_POS_MOT', 'COD_POS_GEL', 'COD_POS_FRE', 'COD_POS_LAV', 'COD_POS_SEC', 'COD_POS_MIC', 'COD_POS_LOU', 'COD_POS_ASP', 'COD_POS_TEL', 'COD_POS_DVD', 'COD_POS_TVA', 'COD_POS_CEL', 'COD_POS_FIX', 'COD_POS_COM', 'IND_ACE_INT']

 Prova: ['TIP_LIN', 'COD_PRV_NAT', 'COD_PRV_HUM', 'COD_PRV_LIN', 'COD_PRV_MAT', 'COD_SIT_RED']

 Fato: ['NUM_INS', 'VAL_NOT_NAT', 'VAL_NOT_HUM', 'VAL_NOT_LIN', 'VAL_NOT_MAT', 'VAL_NOT_RED', 'IND_PRE_NAT', 'IND_PRE_HUM', 'IND_PRE_LIN', 'IND_PRE_MAT']

Fato: Todas as outras colunas restantes. ['NUM_INS', 'COD_FAI_ETA', 'TIP_SEX', 'COD_EST_CIV', 'COD_COR_RAC', 'COD_NAC', 'COD_SIT_CON', 'NUM_ANO_C

#### 2.2.2 Calculando volume das tabelas

In [13]:
print(" Calculando volumes das Dimensões e Fato (isso pode levar alguns segundos)...")


dim_participante = df.select(*df_par).distinct()
dim_localizacao  = df.select(*df_loc).distinct()
dim_socioecon    = df.select(*df_soc).distinct()
dim_prova        = df.select(*df_prv).distinct()

fato_enem        = df.select(*df_fat) 

qtd_par = dim_participante.count()
qtd_loc = dim_localizacao.count()
qtd_soc = dim_socioecon.count()
qtd_prv = dim_prova.count()
qtd_fat = fato_enem.count()



print(" REGISTROS PREPARADOS PARA CARGA (GOLD)")

print(f" Dimensão Participante (dim_par): {qtd_par:,} registros únicos")
print(f" Dimensão Localização  (dim_loc): {qtd_loc:,} municípios únicos")
print(f" Dimensão Socioecon.   (dim_soc): {qtd_soc:,} perfis únicos")
print(f" Dimensão Prova        (dim_prv): {qtd_prv:,} variações de prova")
print(f" Tabela Fato (fato_enem):         {qtd_fat:,} registros totais")

print("--- Estrutura da Tabela Fato ---")
fato_enem.printSchema()

 Calculando volumes das Dimensões e Fato (isso pode levar alguns segundos)...


                                                                                

 REGISTROS PREPARADOS PARA CARGA (GOLD)
 Dimensão Participante (dim_par): 509,954 registros únicos
 Dimensão Localização  (dim_loc): 210 municípios únicos
 Dimensão Socioecon.   (dim_soc): 492,641 perfis únicos
 Dimensão Prova        (dim_prv): 495 variações de prova
 Tabela Fato (fato_enem):         509,954 registros totais
--- Estrutura da Tabela Fato ---
root
 |-- NUM_INS: long (nullable = true)
 |-- VAL_NOT_NAT: decimal(10,2) (nullable = true)
 |-- VAL_NOT_HUM: decimal(10,2) (nullable = true)
 |-- VAL_NOT_LIN: decimal(10,2) (nullable = true)
 |-- VAL_NOT_MAT: decimal(10,2) (nullable = true)
 |-- VAL_NOT_RED: decimal(10,2) (nullable = true)
 |-- IND_PRE_NAT: integer (nullable = true)
 |-- IND_PRE_HUM: integer (nullable = true)
 |-- IND_PRE_LIN: integer (nullable = true)
 |-- IND_PRE_MAT: integer (nullable = true)



# 3. Load

Esta célula configura os parâmetros de conexão com o banco de dados do Data Warehouse (gold) e valida se todas as variáveis geradas na etapa de transformação estão disponíveis na memória.
Ela garante que o ambiente esteja pronto antes de iniciar a fase de carga, evitando erros por falta de dados ou variáveis necessárias.

In [14]:
DB_SCHEMA_GOLD = "gold"

vars_to_check = [
    'dim_participante', # tabela DIM_PAR
    'dim_localizacao',  # tabela DIM_LOC
    'dim_socioecon',    # tabela DIM_SOC
    'dim_prova',        # tabela DIM_PRV
    'fato_enem',        # tabela FAT_DES
    'jdbc_url'   
]

print(f" Verificando pré-requisitos para carga no schema '{DB_SCHEMA_GOLD}'...")

for v in vars_to_check:
    if v not in globals():
        
        raise RuntimeError(f" Variável ausente: {v}. Por favor, rode a célula anterior de preparação das dimensões.")

print("Tudo certo! Todas as variáveis (DataFrames e Conexão) estão carregadas.")

 Verificando pré-requisitos para carga no schema 'gold'...
Tudo certo! Todas as variáveis (DataFrames e Conexão) estão carregadas.


### 3.1 Carregando o DDl

In [15]:
CAMINHO_DDL_GOLD = "../gold/ddl.sql"

print(f" Lendo script DDL em: {CAMINHO_DDL_GOLD}")


try:
    with open(CAMINHO_DDL_GOLD, 'r', encoding='utf-8') as f:
        ddl_gold_script = f.read()
    print(" Arquivo lido com sucesso.")

except FileNotFoundError:
    print(f" Erro Crítico: O arquivo '{CAMINHO_DDL_GOLD}' não foi encontrado.")
    print("Certifique-se de salvar o conteúdo do DDL corrigido no arquivo.")
    sys.exit(1)

print("\n Aplicando estrutura (tabelas e chaves) no PostgreSQL...")

try:
    conn = psycopg2.connect(
        host="localhost",      
        port="5432",           
        database="dados_inep", 
        user="admin",          
        password="l1l2r1r2" 
    )
    
    with conn.cursor() as cur:
        
        cur.execute(ddl_gold_script)
        conn.commit() 
        
    conn.close()
    print(" Sucesso! Schema 'gold' e tabelas criadas/recriadas.")

except Exception as e:
    print(f" Erro ao executar o DDL no banco: {e}")
    sys.exit(1)

 Lendo script DDL em: ../gold/ddl.sql
 Arquivo lido com sucesso.

 Aplicando estrutura (tabelas e chaves) no PostgreSQL...
 Sucesso! Schema 'gold' e tabelas criadas/recriadas.


# ETL: Construção da Camada Gold (Star Schema)
Este processo realiza a transformação dos dados da camada Silver para um modelo dimensional (Star Schema), estruturado em quatro dimensões e uma tabela fato, visando otimizar consultas analíticas.

## Estratégias Técnicas Adotadas:

Idempotência (Full Overwrite): Implementação de limpeza prévia via TRUNCATE CASCADE no PostgreSQL antes da escrita. Isso garante que o processo possa ser reexecutado múltiplas vezes sem gerar duplicidade ou inconsistência nos dados.

Consistência de IDs (Padrão Write-Read): Para mitigar divergências causadas pelo lazy evaluation do Spark na geração de chaves artificiais (Surrogate Keys), adotou-se a persistência imediata das dimensões no banco seguida de releitura. Isso assegura que os IDs utilizados na construção da tabela Fato correspondam exatamente aos IDs persistidos nas Dimensões.

Integridade Referencial: A unificação dos dados na tabela Fato utiliza junções determinísticas (baseadas na chave natural NUM_INS para a dimensão Participante) e aplica o tratamento de "Membro Desconhecido" (ID -1). Registros com chaves nulas ou inexistentes são mapeados para este ID, garantindo que a volumetria da camada Gold seja idêntica à da origem.

In [16]:
import psycopg2
from pyspark.sql import functions as F

print(" INICIANDO PROCESSO BLINDADO (WRITE-READ) ")

try:
    conn = psycopg2.connect(
        host="localhost", port="5432", database="dados_inep", 
        user="admin", password="l1l2r1r2"
    )
    with conn.cursor() as cur:
        cur.execute("TRUNCATE TABLE gold.FAT_DES CASCADE;")
        cur.execute("TRUNCATE TABLE gold.DIM_PRV CASCADE;")
        cur.execute("TRUNCATE TABLE gold.DIM_SOC CASCADE;")
        cur.execute("TRUNCATE TABLE gold.DIM_LOC CASCADE;")
        cur.execute("TRUNCATE TABLE gold.DIM_PAR CASCADE;")
        conn.commit()
    conn.close()
    print("Tabelas limpas.")
except Exception as e:
    print(f"Erro na limpeza: {e}")
    raise e


def carregar_dimensao_e_ler_de_volta(df_origem, lista_colunas, nome_tabela, col_pk):
    print(f"Processando {nome_tabela}...")
    

    df_dim = df_origem.select(*lista_colunas).distinct() \
        .withColumn(col_pk, F.monotonically_increasing_id())
    
    cols_ordem = [col_pk] + lista_colunas
    df_dim = df_dim.select(*cols_ordem)
    
    dados_unknown = [(-1,) + tuple(None for _ in lista_colunas)]
    df_unknown = spark.createDataFrame(dados_unknown, df_dim.schema)
    df_final = df_dim.union(df_unknown)
    
    df_final.write.jdbc(
        url=jdbc_url, 
        table=nome_tabela, 
        mode="append", 
        properties={"user": db_user, "password": db_pass, "driver": "org.postgresql.Driver"}
    )
    
    print(f" Lendo {nome_tabela} de volta do PostgreSQL para garantir integridade...")
    df_retorno = spark.read.jdbc(
        url=jdbc_url, 
        table=nome_tabela, 
        properties={"user": db_user, "password": db_pass, "driver": "org.postgresql.Driver", "batchsize": "500"}
    )
    return df_retorno

df_par_gold = carregar_dimensao_e_ler_de_volta(df, df_par, "gold.DIM_PAR", "PAR_SRK")
df_loc_gold = carregar_dimensao_e_ler_de_volta(df, df_loc, "gold.DIM_LOC", "LOC_SRK")
df_soc_gold = carregar_dimensao_e_ler_de_volta(df, df_soc, "gold.DIM_SOC", "SOC_SRK")
df_prv_gold = carregar_dimensao_e_ler_de_volta(df, df_prv, "gold.DIM_PRV", "PRV_SRK")

df_main = df.alias("main")

cond_par = (F.col("main.NUM_INS") == F.col("par.NUM_INS")) 
cond_loc = (F.col("main.COD_MUN") == F.col("loc.COD_MUN"))
# Para Soc e Prv, usamos eqNullSafe pois lemos do banco e nulls são possíveis
cond_soc = [F.col(f"main.{c}").eqNullSafe(F.col(f"soc.{c}")) for c in df_soc] 
cond_prv = [F.col(f"main.{c}").eqNullSafe(F.col(f"prv.{c}")) for c in df_prv]

df_fato = df_main \
    .join(df_par_gold.alias("par"), cond_par, "left") \
    .join(df_loc_gold.alias("loc"), cond_loc, "left") \
    .join(df_soc_gold.alias("soc"), cond_soc, "left") \
    .join(df_prv_gold.alias("prv"), cond_prv, "left") \
    .select(
        F.monotonically_increasing_id().alias("FAT_SRK"),
        "main.NUM_INS",
        "main.VAL_NOT_NAT", "main.VAL_NOT_HUM", "main.VAL_NOT_LIN", "main.VAL_NOT_MAT", "main.VAL_NOT_RED",
        "main.IND_PRE_NAT", "main.IND_PRE_HUM", "main.IND_PRE_LIN", "main.IND_PRE_MAT",
        F.coalesce(F.col("par.PAR_SRK"), F.lit(-1)).alias("PAR_SRK"),
        F.coalesce(F.col("loc.LOC_SRK"), F.lit(-1)).alias("LOC_SRK"),
        F.coalesce(F.col("soc.SOC_SRK"), F.lit(-1)).alias("SOC_SRK"),
        F.coalesce(F.col("prv.PRV_SRK"), F.lit(-1)).alias("PRV_SRK")
    )

print("Gravando FAT_DES...")
df_fato.write.jdbc(
    url=jdbc_url, 
    table="gold.FAT_DES", 
    mode="append", 
    properties={"user": db_user, "password": db_pass, "driver": "org.postgresql.Driver", "batchsize": "500"}
)

print("ETL CONCLUÍDO")

 INICIANDO PROCESSO BLINDADO (WRITE-READ) 


Tabelas limpas.
Processando gold.DIM_PAR...


                                                                                

 Lendo gold.DIM_PAR de volta do PostgreSQL para garantir integridade...
Processando gold.DIM_LOC...


                                                                                

 Lendo gold.DIM_LOC de volta do PostgreSQL para garantir integridade...
Processando gold.DIM_SOC...


                                                                                

 Lendo gold.DIM_SOC de volta do PostgreSQL para garantir integridade...
Processando gold.DIM_PRV...


                                                                                

 Lendo gold.DIM_PRV de volta do PostgreSQL para garantir integridade...
Gravando FAT_DES...




ETL CONCLUÍDO


                                                                                