# ETL da camada bronze para camada silver

Este notebook realiza o ETL dos dados da camada bronze para a camada silver. Ou seja: ele abre o dataset e o salva num dataframe, realiza a transformação dos dados e os carrega num arquivo `.csv` e no banco de dados.

## EXTRACT
# Estratégia de Leitura Escalável (PySpark)

Para contornar as limitações de memória (especialmente em ambientes como WSL) ao ler o arquivo `MICRODADOS_ENEM_2021.csv` (>1.5GB), adotamos a estratégia de processamento distribuído com **Apache Spark**.

Diferente do padrão do Pandas que tenta carregar todo o arquivo na RAM de uma vez, o PySpark utiliza o conceito de *Lazy Evaluation* (avaliação preguiçosa). Ele mapeia o arquivo e cria um plano de execução, mas só processa os dados na memória quando uma ação é solicitada, evitando o travamento do sistema.

#### **Parâmetros Críticos:**

* **`inferSchema="true"`:** Permite que o Spark percorra os dados inicialmente para identificar automaticamente quais colunas são numéricas e quais são textuais, facilitando a análise imediata.
* **`encoding="ISO-8859-1"`:** Corrige erros de decodificação de caracteres que contem dentro do arquivo `MICRODADOS_ENEM_2021.csv` que e um "erro" bem comuns em dados  usado no Brasil.
* **`delimiter=";"`:** Define o ponto e vírgula como o separador correto, evitando que o dataset seja interpretado equivocadamente como uma única coluna longa.

In [2]:
from pyspark.sql import SparkSession


spark = SparkSession.builder \
    .appName("RawToSilves") \
    .config("spark.driver.memory", "5g") \
    .getOrCreate()

data_layer_filepath = '../raw/'

df = spark.read \
    .option("header", "true") \
    .option("delimiter", ";") \
    .option("encoding", "ISO-8859-1") \
    .option("inferSchema", "true") \
    .csv(data_layer_filepath + 'data_raw/MICRODADOS_ENEM_2021.csv')

print("Arquivo completo mapeado com sucesso!")

df.show(5)


                                                                                

Arquivo completo mapeado com sucesso!
+------------+------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+----------------+---------+---------+----------------------+------------------+---------------+------------------+--------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+--------------------+--------------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|NU_INSCRICAO|NU_ANO|TP_FAIXA_ETARIA|TP_SEXO|TP_ESTADO_CIVIL|TP_COR_RAC

## TRANSFORM
### Padronização dos Nomes das Colunas.

Inicialmente, o dataset contém colunas com nomes sem padrão, por "sorte" nessa basse que estamos usando todos os nome estão padronizado porem tudo com letras Maisculas. E para fins de tratamento iremos fazer uma varedura para que caso ajá alguma coluna que não eteja separam palavras com `_`, ou tenha alguma coluna com `whitespace`, passse para o padrão com as colunas seguindo esse novo padrão: **todos os caracteres em minúsculo, separando palavras com `_`**.


Para renomear todas as colunas de uma vez convertendo para minúsculas e trocando espaços por `_`, a maneira mais eficiente quando se usa o **PySpark** é usar o método `.toDF().`

In [3]:

novas_colunas = [col.lower().replace(' ', '_') for col in df.columns]

df = df.toDF(*novas_colunas)

print(df.columns)
df.show(5)

['nu_inscricao', 'nu_ano', 'tp_faixa_etaria', 'tp_sexo', 'tp_estado_civil', 'tp_cor_raca', 'tp_nacionalidade', 'tp_st_conclusao', 'tp_ano_concluiu', 'tp_escola', 'tp_ensino', 'in_treineiro', 'co_municipio_esc', 'no_municipio_esc', 'co_uf_esc', 'sg_uf_esc', 'tp_dependencia_adm_esc', 'tp_localizacao_esc', 'tp_sit_func_esc', 'co_municipio_prova', 'no_municipio_prova', 'co_uf_prova', 'sg_uf_prova', 'tp_presenca_cn', 'tp_presenca_ch', 'tp_presenca_lc', 'tp_presenca_mt', 'co_prova_cn', 'co_prova_ch', 'co_prova_lc', 'co_prova_mt', 'nu_nota_cn', 'nu_nota_ch', 'nu_nota_lc', 'nu_nota_mt', 'tx_respostas_cn', 'tx_respostas_ch', 'tx_respostas_lc', 'tx_respostas_mt', 'tp_lingua', 'tx_gabarito_cn', 'tx_gabarito_ch', 'tx_gabarito_lc', 'tx_gabarito_mt', 'tp_status_redacao', 'nu_nota_comp1', 'nu_nota_comp2', 'nu_nota_comp3', 'nu_nota_comp4', 'nu_nota_comp5', 'nu_nota_redacao', 'q001', 'q002', 'q003', 'q004', 'q005', 'q006', 'q007', 'q008', 'q009', 'q010', 'q011', 'q012', 'q013', 'q014', 'q015', 'q016', 

### Remoção de Colunas Desnecessárias

Afim de melhorar como os dados vão ficar nessa etapa começamos a filtrar e retirar alguns dados são desnecesarios como por exemplo **nu_ano** já que toda a base e referente ao ano de 2021, e assim optamos por não trabalhar com essa coluna.

In [4]:
cols_to_drop = ['nu_ano', 'sg_uf_esc']

df = df.drop(*cols_to_drop) 

for col in cols_to_drop:
    if col not in df.columns:
        print(f"Coluna {col} deletada!")

print("Colunas restantes: ")
print(df.columns)
print(" ")
print("Tabelas restantes: ")
df.show(5)

Coluna nu_ano deletada!
Coluna sg_uf_esc deletada!
Colunas restantes: 
['nu_inscricao', 'tp_faixa_etaria', 'tp_sexo', 'tp_estado_civil', 'tp_cor_raca', 'tp_nacionalidade', 'tp_st_conclusao', 'tp_ano_concluiu', 'tp_escola', 'tp_ensino', 'in_treineiro', 'co_municipio_esc', 'no_municipio_esc', 'co_uf_esc', 'tp_dependencia_adm_esc', 'tp_localizacao_esc', 'tp_sit_func_esc', 'co_municipio_prova', 'no_municipio_prova', 'co_uf_prova', 'sg_uf_prova', 'tp_presenca_cn', 'tp_presenca_ch', 'tp_presenca_lc', 'tp_presenca_mt', 'co_prova_cn', 'co_prova_ch', 'co_prova_lc', 'co_prova_mt', 'nu_nota_cn', 'nu_nota_ch', 'nu_nota_lc', 'nu_nota_mt', 'tx_respostas_cn', 'tx_respostas_ch', 'tx_respostas_lc', 'tx_respostas_mt', 'tp_lingua', 'tx_gabarito_cn', 'tx_gabarito_ch', 'tx_gabarito_lc', 'tx_gabarito_mt', 'tp_status_redacao', 'nu_nota_comp1', 'nu_nota_comp2', 'nu_nota_comp3', 'nu_nota_comp4', 'nu_nota_comp5', 'nu_nota_redacao', 'q001', 'q002', 'q003', 'q004', 'q005', 'q006', 'q007', 'q008', 'q009', 'q010', 

### Correções dos Tipos de Dados.

Nesta etapa, analisaremos a estrutura e a **tipagem dos dados** nas colunas para identificar inconsistências e realizar os tratamentos necessários.

In [5]:
# nome_arquivo = 'schema_enem.txt'
# f = open(nome_arquivo, 'w')

header = f"{'COLUNA':<40} | {'TIPO'}"
separador = "-" * 50
print(header)
print(separador)

# f.write(header + "\n")
# f.write(separador + "\n")

for coluna, tipo in df.dtypes:
    linha = f"{coluna:<40} | {tipo}"
    print(linha)
    # f.write(linha + "\n")

# f.close() 
# print(f"\nO arquivo '{nome_arquivo}' também foi salvo na pasta.")

COLUNA                                   | TIPO
--------------------------------------------------
nu_inscricao                             | bigint
tp_faixa_etaria                          | int
tp_sexo                                  | string
tp_estado_civil                          | int
tp_cor_raca                              | int
tp_nacionalidade                         | int
tp_st_conclusao                          | int
tp_ano_concluiu                          | int
tp_escola                                | int
tp_ensino                                | int
in_treineiro                             | int
co_municipio_esc                         | int
no_municipio_esc                         | string
co_uf_esc                                | int
tp_dependencia_adm_esc                   | int
tp_localizacao_esc                       | int
tp_sit_func_esc                          | int
co_municipio_prova                       | int
no_municipio_prova                       | str

#### Resultado
A análise do esquema (schema) confirmou que a **tipagem dos dados** está consistente, dispensando a necessidade de conversões (casting) ou tratamentos adicionais nesta etapa.

### Correção de possivei erros de digitação e padronização na siglas das UF

O código realiza a verificação e a correção de siglas de estados para garantir que todas estejam em **letras maiúsculas**.

 **Ex:** Se o valor é `'sp'`, ele compara com `'SP'`. Como são diferentes, ele identifica que essa linha precisa ser contabilizada.
 
 **Ex:** Se o valor é `'SP'`, ele compara com `'SP'`. Como são iguais, ele ignora.

In [6]:
from pyspark.sql.functions import col, upper

coluna_alvo = 'sg_uf_prova' 
qtd_correcoes = df.filter(col(coluna_alvo) != upper(col(coluna_alvo))).count()
df = df.withColumn(coluna_alvo, upper(col(coluna_alvo)))


print(f"Total de siglas corrigidas (estavam minúsculas ou mistas): {qtd_correcoes}")
print("Valores únicos após correção:")
df.select(coluna_alvo).distinct().show()

                                                                                

Total de siglas corrigidas (estavam minúsculas ou mistas): 0
Valores únicos após correção:




+-----------+
|sg_uf_prova|
+-----------+
|         SC|
|         RO|
|         PI|
|         AM|
|         RR|
|         GO|
|         TO|
|         MT|
|         SP|
|         PB|
|         ES|
|         RS|
|         MS|
|         AL|
|         MG|
|         PA|
|         BA|
|         SE|
|         PE|
|         CE|
+-----------+
only showing top 20 rows


                                                                                

## Contagem de incriçoes por estado
Após a etapa de transformação, realizamos a contagem de frequência de cada UF para identificar a quantidade de inscritos por estado.

In [7]:
from pyspark.sql.functions import col, desc

print("Quantidade de registros por UF:")

# Agrupa pela sigla do estado, faz a contagem e ordena do maior para o menor
df.groupBy('sg_uf_prova') \
  .count() \
  .orderBy(col('count').desc()) \
  .show()

Quantidade de registros por UF:




+-----------+------+
|sg_uf_prova| count|
+-----------+------+
|         SP|509954|
|         MG|327829|
|         BA|266194|
|         RJ|238347|
|         CE|220517|
|         PE|193616|
|         PA|185978|
|         RS|150484|
|         PR|144282|
|         GO|136915|
|         MA|127905|
|         PB|102002|
|         AM| 89778|
|         RN| 80820|
|         SC| 80765|
|         PI| 79969|
|         DF| 67501|
|         ES| 64181|
|         AL| 56584|
|         MT| 56085|
+-----------+------+
only showing top 20 rows


                                                                                

## LOAD

### Filtro por estado
Nesta etapa, aplicamos um filtro para selecionar apenas os estados **(UFs)** que farão parte do escopo da nossa análise.

**Filtragem e Validação de UF:** o código abaixo filtra o DataFrame para conter apenas as siglas presentes na lista ``estados_selecionados`` (neste caso, **'SP'**) e verifica a integridade do filtro listando os valores únicos da coluna."

In [8]:
from pyspark.sql.functions import col


estados_selecionados = ['SP']
df = df.filter(col('sg_uf_prova').isin(estados_selecionados))
print(f"Filtragem concluída! Mantendo apenas: {estados_selecionados}")
print("Estados distintos restantes no DataFrame:")
df.select('sg_uf_prova').distinct().show()

Filtragem concluída! Mantendo apenas: ['SP']
Estados distintos restantes no DataFrame:




+-----------+
|sg_uf_prova|
+-----------+
|         SP|
+-----------+



                                                                                

#### Conferencia do que restou apos o filtro
Este comando exibe as primeiras 10 linhas do DataFrame em formato de tabela. O parâmetro ``truncate=False`` é fundamental aqui, pois força o PySpark a mostrar o conteúdo completo de cada coluna, impedindo que textos longos sejam cortados ou substituídos por reticências (...).

In [9]:
df.show(10, truncate=False)

+------------+---------------+-------+---------------+-----------+----------------+---------------+---------------+---------+---------+------------+----------------+----------------+---------+----------------------+------------------+---------------+------------------+---------------------+-----------+-----------+--------------+--------------+--------------+--------------+-----------+-----------+-----------+-----------+----------+----------+----------+----------+---------------------------------------------+---------------------------------------------+--------------------------------------------------+---------------------------------------------+---------+---------------------------------------------+---------------------------------------------+--------------------------------------------------+---------------------------------------------+-----------------+-------------+-------------+-------------+-------------+-------------+---------------+----+----+----+----+----+----+----+----+-

#### Persistência e Padronização do Arquivo Final (Silver)
Esta etapa é responsável por exportar o DataFrame tratado para o diretório da camada Silver. O processo envolve duas ações principais:

 * **Gravação Unificada:** O comando .coalesce(1) é utilizado para forçar o Spark a unificar todos os dados em uma única partição, garantindo que seja gerado apenas um arquivo CSV, ao invés de vários fragmentos.

 * **Renomeação do Arquivo:** Como o Spark gera nomes automáticos (ex: part-00000...csv), utilizamos um script em Python logo em seguida para localizar esse arquivo e renomeá-lo para microdados_enem.csv, facilitando o consumo futuro.

In [10]:
import os

pasta_destino = '../silver/microdados_enem'
nome_final = 'microdados_enem_sp.csv'

df.coalesce(1) \
  .write \
  .mode("overwrite") \
  .option("header", "true") \
  .option("encoding", "UTF-8") \
  .csv(pasta_destino)

arquivos = os.listdir(pasta_destino)

for arquivo in arquivos:
    if arquivo.startswith('part-') and arquivo.endswith('.csv'):
        caminho_antigo = os.path.join(pasta_destino, arquivo)
        caminho_novo = os.path.join(pasta_destino, nome_final)
        
        if os.path.exists(caminho_novo):
            os.remove(caminho_novo)
        
        os.rename(caminho_antigo, caminho_novo)
        print(f"Sucesso! Arquivo renomeado para: {nome_final}")

[Stage 21:>                                                         (0 + 1) / 1]

Sucesso! Arquivo renomeado para: microdados_enem_sp.csv


                                                                                

### Carregando os dados na base de dados

In [1]:
import os
import sys
import psycopg2
from pyspark.sql import SparkSession

caminho_jar = (
    "/home/emivalto/workspace/BD2V/SDBD2---INEP/Transformer/postgresql-42.7.3.jar"
)
caminho_csv = "../silver/microdados_enem/microdados_enem_sp.csv"
caminho_ddl = "../silver/ddl.sql"


DB_HOST = "localhost"
DB_PORT = "5432"
DB_NAME = "dados_inep"
DB_USER = "admin"
DB_PASS = "l1l2r1r2"


jdbc_url = f"jdbc:postgresql://{DB_HOST}:{DB_PORT}/{DB_NAME}"

arquivos_necessarios = [caminho_jar, caminho_csv, caminho_ddl]
for arq in arquivos_necessarios:
    if not os.path.exists(arq):
        raise FileNotFoundError(f"Arquivo obrigatório não encontrado: {arq}")

print("arquivos localizados.")

print("\n--- [ETAPA 1] Rodando Script SQL (DDL) ---")
try:

    with open(caminho_ddl, "r", encoding="utf-8") as f:
        sql_script = f.read()
    conn = psycopg2.connect(
        host=DB_HOST, database=DB_NAME, user=DB_USER, password=DB_PASS, port=DB_PORT
    )

    with conn.cursor() as cur:
        print("Executando SQL...")
        cur.execute(sql_script)
        conn.commit()  
        print("Script SQL executado, tabela pronta.")

    conn.close()

except Exception as e:
    print(f"Erro na preparação do banco: {e}")
    sys.exit(1) 

print("\n--- [ETAPA 2] Iniciando Spark ---")

spark = (
    SparkSession.builder.appName("CargaEnem_Final")
    .config("spark.jars", caminho_jar)
    .config("spark.driver.extraClassPath", caminho_jar)
    .config("spark.executor.extraClassPath", caminho_jar)
    .getOrCreate()
)

try:
    print(f"Lendo CSV: {caminho_csv}")
    df_spark = (
        spark.read.option("header", "true")
        .option("inferSchema", "true")
        .csv(caminho_csv)
    )

    qtd_linhas = df_spark.count()
    print(f"Escrevendo {qtd_linhas} linhas no PostgreSQL...")

    df_spark.write.mode("append").format("jdbc").option("url", jdbc_url).option(
        "dbtable", "silver.microdados_enem"
    ).option("user", DB_USER).option("password", DB_PASS).option(
        "driver", "org.postgresql.Driver"
    ).option(
        "batchsize", "5000"
    ).save()

    print("\n Processo Completo Finalizado. ")

except Exception as e:
    print("\n Erro durante a carga do Spark:")
    print(e)
finally:
    spark.stop()

arquivos localizados.

--- [ETAPA 1] Rodando Script SQL (DDL) ---
Executando SQL...
Script SQL executado, tabela pronta.

--- [ETAPA 2] Iniciando Spark ---


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/01/17 03:41:23 WARN Utils: Your hostname, Emivalto, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/01/17 03:41:23 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
26/01/17 03:41:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Lendo CSV: ../silver/microdados_enem/microdados_enem_sp.csv


                                                                                

Escrevendo 509954 linhas no PostgreSQL...


26/01/17 03:41:33 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                


 Processo Completo Finalizado. 
