# ETL (Extrair, Transformar e Carregar) da camada Silver para Gold.

Este notebook realiza o processo de ETL para transformar e carregar os dados da camada Silver para a camada Gold no data lake. A camada Gold é otimizada para consultas analíticas e relatórios, garantindo que os dados estejam prontos para uso por ferramentas de BI e análise avançada.


### Configuração do Ambiente de Desenvolvimento

In [None]:
!pip install pyspark
!pip install psycopg2

# Criação e Carga das Dimensões (Star Schema)

Esta seção tem como objetivo realizar a **criação e carga das tabelas dimensão**
da camada Gold, seguindo o modelo **Star Schema**, a partir de dados previamente
tratados na camada Silver.


### Importação de Bibliotecas

Nesta seção são importadas as bibliotecas necessárias para:
- manipulação de dados com PySpark
- criação de colunas derivadas
- conexão com o banco PostgreSQL

In [None]:
import pandas as pd
import psycopg2

from psycopg2 import execute_batch, extras

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, concat_ws, year, month, dayofmonth, quarter, 
    dayofweek, when, date_format, monotonically_increasing_id,
    lit, trim, upper
)

from pyspark.sql import functions as F

from pyspark.sql.types import StringType, IntegerType, DateType, DecimalType

### Configuração do Ambiente

Nesta etapa são configurados:
- a sessão Spark
- a conexão JDBC com o PostgreSQL
- o schema de destino da camada Gold

In [None]:
spark = SparkSession.builder \
    .appName("SilverToGold") \
    .getOrCreate()

POSTGRES_HOST = "localhost"
POSTGRES_PORT = "5432"
POSTGRES_DB = "cat_db"
POSTGRES_USER = "admin"
POSTGRES_PASSWORD = "admin"

conn_params = {
    "host": POSTGRES_HOST,
    "database": POSTGRES_DB,
    "user": POSTGRES_USER,
    "password": POSTGRES_PASSWORD,
    "port": POSTGRES_PORT
}

jdbc_url = f"jdbc:postgresql://{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

connection_properties = {
    "user": POSTGRES_USER,
    "password": POSTGRES_PASSWORD,
    "driver": "org.postgresql.Driver"
}

try:
    conn = psycopg2.connect(**conn_params)
    
    query = "SELECT * FROM ACIDENTE"
    
    pdf = pd.read_sql(query, conn)
    df_silver = spark.createDataFrame(pdf)
    
    print("Dados da camada Silver carregados")
    df_silver.show()

except Exception as e:
    print(f"Erro na leitura: {e}")
finally:
    if conn:
        conn.close()
        
GOLD_SCHEMA = "gold"

### Função Genérica de Carga de Dimensões

Esta função padroniza o processo de carga das dimensões, garantindo:
- remoção de duplicidades
- preservação da business key
- inserção no schema Gold
- validação básica pós-carga

In [None]:
def save_dimension(
    df_dim,
    dim_name,
    id_col_silver,
    other_cols,
    pg_conn_params,
    cols_to_drop=None
):
    """
    Salva dados em uma dimensão no PostgreSQL usando psycopg2,
    preservando a business key.
    Retorna um DataFrame Spark da dimensão carregada (com surrogate keys).
    """

    table_name = f"{GOLD_SCHEMA}.dim_{dim_name}"
    business_key_col = f"chv_{dim_name}_org"

    # Preparação da dimensão no Spark
    df_dim_unique = (
        df_dim
        .select(col(id_col_silver).alias(business_key_col), *other_cols)
        .distinct()
        .dropna(subset=[business_key_col])
    )

    if cols_to_drop:
        df_dim_unique = df_dim_unique.drop(*cols_to_drop)

    print(f"\n---> Criando e carregando Dimensão: {table_name}")
    count_unique = df_dim_unique.count()
    print(f"     Registros únicos: {count_unique}")

    if count_unique == 0:
        print("DataFrame vazio")
        return None

    # Converter para Pandas para inserção via psycopg2
    pdf = df_dim_unique.toPandas()

    columns = list(pdf.columns)
    cols_sql = ", ".join(columns)
    values_sql = ", ".join([f"%({c})s" for c in columns])

    insert_sql = f"""
        INSERT INTO {table_name} ({cols_sql})
        VALUES ({values_sql})
        ON CONFLICT ({business_key_col}) DO NOTHING
    """

    try:
        # Conexão PostgreSQL
        with psycopg2.connect(**pg_conn_params) as conn:
            with conn.cursor() as cur:
                execute_batch(
                    cur,
                    insert_sql,
                    pdf.to_dict(orient="records"),
                    page_size=1000
                )
            conn.commit()

        print("Inserção concluída")

        # Recarregar dimensão com surrogate keys via Spark
        df_dim_loaded = spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", table_name) \
            .options(**connection_properties) \
            .load()

        count_check = df_dim_loaded.count()

        if count_check >= count_unique:
            print(f"Dimensão carregada. Registros confirmados: {count_check}")
        else:
            print(f"Confirmados ({count_check}) < esperados ({count_unique})")

        return df_dim_loaded

    except Exception as e:
        print(f"Erro ao gravar dimensão {table_name}: {e}")
        raise

### Dimensão Tempo

A dimensão tempo é utilizada para:
- data do acidente
- data de emissão da CAT
- data de nascimento do trabalhador

Ela permite análises temporais como sazonalidade, tendências e comparações anuais.

In [None]:
def create_dim_tempo(df_silver):
    """
    Cria dimensão tempo a partir de todas as datas do dataset.
    Usada como role-playing para: data_acidente, data_emissao, data_nascimento.
    """
    
    print("\nCriando Dimensão Tempo")
    
    # Coleta todas as datas distintas
    df_datas = df_silver.select("data_acidente").distinct() \
        .union(df_silver.select("data_emissao").distinct()) \
        .union(df_silver.select("data_nascimento").distinct()) \
        .withColumnRenamed("data_acidente", "data") \
        .distinct() \
        .dropna()
    
    # Enriquece com atributos temporais
    df_tempo = df_datas.select(
        col("data"),
        dayofmonth("data").alias("dia"),
        month("data").alias("mes"),
        date_format("data", "MMMM").alias("nome_mes"),
        quarter("data").alias("trimestre"),
        year("data").alias("ano"),
        dayofweek("data").alias("dia_semana"),
        when(dayofweek("data").isin(1, 7), True).otherwise(False).alias("is_fim_semana")
    )
    
    return save_dimension(
        df_tempo,
        "tempo",
        "data",
        ["dia", "mes", "nome_mes", "trimestre", "ano", "dia_semana", "is_fim_semana"]
    )

### Dimensão Trabalhador

Representa características demográficas e ocupacionais do trabalhador
no momento do acidente.

In [None]:
def create_dim_trabalhador(df_silver):
    """
    Dimensão com dados do trabalhador acidentado.
    Referencia: dim_cbo e dim_tempo (data_nascimento).
    """
    
    print("\nCriando Dimensão Trabalhador")
    
    df_trabalhador = df_silver.select(
        "id_trabalhador",
        upper(trim(col("sexo"))).alias("sexo"),
        col("codigo_cbo").alias("fk_cbo"),
        col("data_nascimento").alias("fk_tempo_nascimento")
    )
    
    return save_dimension(
        df_trabalhador,
        "trabalhador",
        "id_trabalhador",
        ["sexo", "fk_cbo", "fk_tempo_nascimento"]
    )

### Dimensão Empregador

A dimensão empregador representa as características da empresa
responsável pelo vínculo de trabalho no momento do acidente.

Esta dimensão referencia:
- a dimensão CNAE (atividade econômica)
- a dimensão Município (localização do empregador)

Sua criação ocorre após a carga das dimensões independentes,
garantindo integridade referencial.

In [None]:
def create_dim_empregador(df_silver):
    """
    Dimensão do empregador.
    Referencia: dim_cnae e dim_municipio.
    """
    
    print("\nCriando Dimensão Empregador")
    
    df_empregador = df_silver.select(
        col("id_empregador"),
        col("codigo_cnae").alias("fk_cnae"),
        col("codigo_municipio_empregador").alias("fk_municipio")
    )
    
    return save_dimension(
        df_empregador,
        "empregador",
        "id_empregador",
        ["fk_cnae", "fk_municipio"]
    )


### Dimensões de Classificação

As dimensões a seguir representam classificações oficiais utilizadas
para padronização e análise estatística.

In [None]:
def create_dim_cbo(df_silver):
    """
    Dimensão com códigos e descrições da CBO.
    Origem: campo codigo_cbo e descricao_cbo da Silver.
    """
    
    print("\nCriando Dimensão CBO")
    
    df_cbo = df_silver.select(
        col("codigo_cbo").alias("id_cbo"),
        col("codigo_cbo").alias("codigo"),
        col("descricao_cbo").alias("descricao")
    )
    
    return save_dimension(
        df_cbo,
        "cbo",
        "id_cbo",
        ["codigo", "descricao"]
    )
    
def create_dim_cnae(df_silver):
    """
    Dimensão com códigos CNAE da atividade econômica do empregador.
    """
    
    print("\nCriando Dimensão CNAE...")
    
    df_cnae = df_silver.select(
        col("codigo_cnae").alias("id_cnae"),
        col("codigo_cnae").alias("codigo"),
        col("descricao_cnae").alias("descricao")
    )
    
    return save_dimension(
        df_cnae,
        "cnae",
        "id_cnae",
        ["codigo", "descricao"]
    )
    
def create_dim_cid10(df_silver):
    """
    Dimensão com códigos CID-10 (Classificação Internacional de Doenças).
    """
    
    print("\nCriando Dimensão CID-10")
    
    df_cid = df_silver.select(
        col("codigo_cid10").alias("id_cid10"),
        col("codigo_cid10").alias("codigo"),
        col("descricao_cid10").alias("descricao")
    )
    
    return save_dimension(
        df_cid,
        "cid10",
        "id_cid10",
        ["codigo", "descricao"]
    )

### Dimensão Município

A dimensão município é utilizada para:
- local do acidente
- local do empregador

In [None]:
def create_dim_municipio(df_silver):
    """
    Dimensão de municípios.
    Usada como role-playing para: local_acidente e local_empregador.
    """
    
    print("\nCriando Dimensão Município")
    
    # União de municípios do acidente e do empregador
    df_mun_acidente = df_silver.select(
        col("codigo_municipio_acidente").alias("codigo_ibge"),
        col("municipio_acidente").alias("nome"),
        col("uf_acidente").alias("uf")
    )
    
    df_mun_empregador = df_silver.select(
        col("codigo_municipio_empregador").alias("codigo_ibge"),
        col("municipio_empregador").alias("nome"),
        col("uf_empregador").alias("uf")
    )
    
    df_municipio = df_mun_acidente.union(df_mun_empregador) \
        .distinct() \
        .dropna(subset=["codigo_ibge"])
    
    return save_dimension(
        df_municipio,
        "municipio",
        "codigo_ibge",
        ["nome", "uf"]
    )

### Dimensões de Caracterização do Acidente

Estas dimensões descrevem o contexto e as causas do acidente de trabalho.


In [None]:
def create_dim_tipo_acidente(df_silver):
    """
    Dimensão com tipos de acidente (típico, trajeto, doença).
    """
    
    print("\nCriando Dimensão Tipo de Acidente")
    
    df_tipo_acidente = df_silver.select(
        col("codigo_tipo_acidente").alias("id_tipo_acidente"),
        col("descricao_tipo_acidente").alias("descricao")
    )
    
    return save_dimension(
        df_tipo_acidente,
        "tipo_acidente",
        "id_tipo_acidente",
        ["descricao"]
    )
    
def create_dim_lesao(df_silver):
    """
    Dimensão com informações sobre a lesão sofrida.
    Combina natureza da lesão e parte do corpo atingida.
    """
    
    print("\nCriando Dimensão Lesão")
    
    df_lesao = df_silver.select(
        concat_ws("_", col("codigo_natureza_lesao"), col("codigo_parte_corpo")).alias("id_lesao"),
        col("descricao_natureza_lesao").alias("natureza_lesao"),
        col("descricao_parte_corpo").alias("parte_corpo_atingida")
    )
    
    return save_dimension(
        df_lesao,
        "lesao",
        "id_lesao",
        ["natureza_lesao", "parte_corpo_atingida"]
    )
    
def create_dim_agente_causador(df_silver):
    """
    Dimensão com o agente causador do acidente.
    """
    
    print("\nCriando Dimensão Agente Causador")
    
    df_agente = df_silver.select(
        col("codigo_agente_causador").alias("id_agente_causador"),
        col("descricao_agente_causador").alias("descricao")
    )
    
    return save_dimension(
        df_agente,
        "agente_causador",
        "id_agente_causador",
        ["descricao"]
    )

## Execução da Carga das Dimensões

Nesta etapa é executado o processo completo de criação e carga das dimensões,
respeitando a ordem de dependência entre elas.

In [None]:
print("\n" + "="*60)
print("CARGA DAS DIMENSÕES - STAR SCHEMA")
print("="*60)
    
try:
    dim_tempo = create_dim_tempo(df_silver)          
    dim_cbo = create_dim_cbo(df_silver)            
    dim_municipio = create_dim_municipio(df_silver)      
    dim_cnae = create_dim_cnae(df_silver)           
    dim_tipo_acidente = create_dim_tipo_acidente(df_silver)  
    dim_lesao = create_dim_lesao(df_silver)          
    dim_agente_causador = create_dim_agente_causador(df_silver)
    dim_cid10 = create_dim_cid10(df_silver)          

    # Dimensões com FKs
    dim_trabalhador = create_dim_trabalhador(df_silver)
    dim_empregador = create_dim_empregador(df_silver)
        
    print("\n" + "="*60)
    print("DIMENSÕES FORAM CARREGADAS")
    print("="*60)
        
except Exception as e:
    print(f"\nERRO NA CARGA DAS DIMENSÕES: {e}")
    raise e
    
finally:
    spark.stop()

### Preparando dados da camada Silver

In [None]:
df_base = df_silver.select(
    # Business key
    "id_cat",
    
    # Campos para lookup de FKs
    "data_acidente",
    "data_emissao",
    "data_nascimento",
    "id_trabalhador",
    "codigo_cbo",
    "id_empregador",
    "codigo_cnae",
    "codigo_municipio_acidente",
    "codigo_municipio_empregador",
    "codigo_tipo_acidente",
    col("codigo_natureza_lesao"),
    col("codigo_parte_corpo"),
    "codigo_agente_causador",
    "codigo_cid10",
    
    # Métricas
    col("dias_afastamento").cast(IntegerType()).alias("dias_afastamento"),
    col("valor_indenizacao").cast(DecimalType(15,2)).alias("valor_indenizacao"),
    col("idade_trabalhador").cast(IntegerType()).alias("idade_trabalhador"),
    
    # Flags
    when(col("houve_obito") == "S", 1).otherwise(0).alias("flag_obito"),
    when(col("houve_afastamento") == "S", 1).otherwise(0).alias("flag_afastamento"),
    when(col("comunicacao_policia") == "S", 1).otherwise(0).alias("flag_comunicacao_policia"),
    when(col("acidente_trajeto") == "S", 1).otherwise(0).alias("flag_trajeto"),
    when(col("primeira_cat") == "S", 1).otherwise(0).alias("flag_primeira_cat"),
    when(col("cat_reabertura") == "S", 1).otherwise(0).alias("flag_reabertura"),
    
    # Indicadores
    col("situacao_cat"),
    col("origem_cat"))

### Lookup de Surrogate Keys nas Dimensões

Neste passo, o pipeline realiza o mapeamento das chaves de negócio (business keys) presentes na base de fatos para as surrogate keys das respectivas tabelas de dimensão.
Cada JOIN associa os registros do fato às dimensões corretas (tempo, trabalhador, empregador, localização, classificação e natureza do acidente), garantindo integridade referencial, padronização dimensional e compatibilidade com o modelo estrela no Data Warehouse.

O processo é executado de forma incremental e controlada, com múltiplos joins à esquerda (LEFT JOIN), preservando todos os registros da fato mesmo quando alguma dimensão ainda não possui correspondência.

In [None]:
print("     [1/14] dim_tempo -> data_acidente")
df_fact = df_base.join(
    dim_tempo.select(
        col("id_tempo").alias("fk_tempo_acidente"),
        col("chv_tempo_org").alias("data_join_acidente")
    ),
    df_base.data_acidente == col("data_join_acidente"),
    "left"
).drop("data_join_acidente")

# Tempo - data de emissão
print("     [2/14] dim_tempo -> data_emissao")
df_fact = df_fact.join(
    dim_tempo.select(
        col("id_tempo").alias("fk_tempo_emissao"),
        col("chv_tempo_org").alias("data_join_emissao")
    ),
    df_fact.data_emissao == col("data_join_emissao"),
    "left"
).drop("data_join_emissao")

# Tempo - data de nascimento
print("     [3/14] dim_tempo -> data_nascimento")
df_fact = df_fact.join(
    dim_tempo.select(
        col("id_tempo").alias("fk_tempo_nascimento"),
        col("chv_tempo_org").alias("data_join_nascimento")
    ),
    df_fact.data_nascimento == col("data_join_nascimento"),
    "left"
).drop("data_join_nascimento")

# Trabalhador
print("     [4/14] dim_trabalhador")
df_fact = df_fact.join(
    dim_trabalhador.select(
        col("id_trabalhador").alias("fk_trabalhador"),
        col("chv_trabalhador_org").alias("id_trab_join")
    ),
    df_fact.id_trabalhador == col("id_trab_join"),
    "left"
).drop("id_trab_join")

# CBO
print("     [5/14] dim_cbo")
df_fact = df_fact.join(
    dim_cbo.select(
        col("id_cbo").alias("fk_cbo"),
        col("chv_cbo_org").alias("codigo_cbo_join")
    ),
    df_fact.codigo_cbo == col("codigo_cbo_join"),
    "left"
).drop("codigo_cbo_join")

# Empregador
print("     [6/14] dim_empregador")
df_fact = df_fact.join(
    dim_empregador.select(
        col("id_empregador").alias("fk_empregador"),
        col("chv_empregador_org").alias("id_emp_join")
    ),
    df_fact.id_empregador == col("id_emp_join"),
    "left"
).drop("id_emp_join")

# CNAE
print("     [7/14] dim_cnae")
df_fact = df_fact.join(
    dim_cnae.select(
        col("id_cnae").alias("fk_cnae"),
        col("chv_cnae_org").alias("codigo_cnae_join")
    ),
    df_fact.codigo_cnae == col("codigo_cnae_join"),
    "left"
).drop("codigo_cnae_join")

# Município - acidente
print("     [8/14] dim_municipio -> acidente")
df_fact = df_fact.join(
    dim_municipio.select(
        col("id_municipio").alias("fk_municipio_acidente"),
        col("chv_municipio_org").alias("mun_acidente_join")
    ),
    df_fact.codigo_municipio_acidente == col("mun_acidente_join"),
    "left"
).drop("mun_acidente_join")

# Município - empregador
print("     [9/14] dim_municipio -> empregador")
df_fact = df_fact.join(
    dim_municipio.select(
        col("id_municipio").alias("fk_municipio_empregador"),
        col("chv_municipio_org").alias("mun_empregador_join")
    ),
    df_fact.codigo_municipio_empregador == col("mun_empregador_join"),
    "left"
).drop("mun_empregador_join")

# Tipo de Acidente
print("     [10/14] dim_tipo_acidente")
df_fact = df_fact.join(
    dim_tipo_acidente.select(
        col("id_tipo_acidente").alias("fk_tipo_acidente"),
        col("chv_tipo_acidente_org").alias("tipo_acidente_join")
    ),
    df_fact.codigo_tipo_acidente == col("tipo_acidente_join"),
    "left"
).drop("tipo_acidente_join")

# Lesão
print("     [11/14] dim_lesao")
df_fact = df_fact.withColumn(
    "lesao_join_key",
    concat_ws("_", col("codigo_natureza_lesao"), col("codigo_parte_corpo"))
)

df_fact = df_fact.join(
    dim_lesao.select(
        col("id_lesao").alias("fk_lesao"),
        col("chv_lesao_org").alias("lesao_join")
    ),
    df_fact.lesao_join_key == col("lesao_join"),
    "left"
).drop("lesao_join", "lesao_join_key")

# Agente Causador
print("     [12/14] dim_agente_causador")
df_fact = df_fact.join(
    dim_agente_causador.select(
        col("id_agente_causador").alias("fk_agente_causador"),
        col("chv_agente_causador_org").alias("agente_join")
    ),
    df_fact.codigo_agente_causador == col("agente_join"),
    "left"
).drop("agente_join")

# CID-10
print("     [13/14] dim_cid10")
df_fact = df_fact.join(
    dim_cid10.select(
        col("id_cid10").alias("fk_cid10"),
        col("chv_cid10_org").alias("cid_join")
    ),
    df_fact.codigo_cid10 == col("cid_join"),
    "left"
).drop("cid_join")

print("     [14/14] Lookups concluídos!")

### Montagem da Tabela Fato

In [None]:
df_fact_final = df_fact.select(
    # Business Key
    col("id_cat").alias("chv_cat_org"),
    
    # FKs - Tempo
    "fk_tempo_acidente",
    "fk_tempo_emissao",
    "fk_tempo_nascimento",
    
    # FKs - Entidades
    "fk_trabalhador",
    "fk_cbo",
    "fk_empregador",
    "fk_cnae",
    
    # FKs - Localização
    "fk_municipio_acidente",
    "fk_municipio_empregador",
    
    # FKs - Características
    "fk_tipo_acidente",
    "fk_lesao",
    "fk_agente_causador",
    "fk_cid10",
    
    # Métricas
    "dias_afastamento",
    "valor_indenizacao",
    "idade_trabalhador",
    
    # Flags
    "flag_obito",
    "flag_afastamento",
    "flag_comunicacao_policia",
    "flag_trajeto",
    "flag_primeira_cat",
    "flag_reabertura",
    
    # Indicadores
    "situacao_cat",
    "origem_cat"
).distinct()

count_fact = df_fact_final.count()

### Carga da Tabela Fato

In [None]:
colunas = df_fact_final.columns
fact_table_name = f"{GOLD_SCHEMA}.fato_acidente_trabalho"

print(f"Colunas: {len(colunas)}")
print(f"Preparando dados para inserção...")

# Converter DataFrame para lista de tuplas
dados_para_inserir = [tuple(row) for row in df_fact_final.collect()]
print(f"     Total de registros a inserir: {len(dados_para_inserir)}")

# Conexão e inserção via psycopg2
conn = None
cursor = None

try:
    print(f"     Estabelecendo conexão com PostgreSQL")
    conn = psycopg2.connect(**conn_params)
    cursor = conn.cursor()
    
    print(f"     Iniciando inserção em lote usando execute_values")
    
    # Montar query de INSERT
    query = f"INSERT INTO {fact_table_name} ({', '.join(colunas)}) VALUES %s"
    
    # Execução em lote com execute_values
    extras.execute_values(
        cursor,
        query,
        dados_para_inserir,
        template=None,
        page_size=1000  # Insere 1000 registros por vez
    )
    
    conn.commit()
    print(f"Carga de {len(dados_para_inserir)} registros concluída")
    
    # Validação pós-carga
    print("\nValidando carga")
    cursor.execute(f"SELECT COUNT(*) FROM {fact_table_name}")
    count_check = cursor.fetchone()[0]
    print(f"Registros confirmados no banco: {count_check}")
    
    if count_check >= count_fact:
        print("carga bem sucedida")
    else:
        print(f"Confirmados ({count_check}) < esperados ({count_fact})")
        
    
except Exception as e:
    print(f"\nErro ao inserir dados: {e}")
    if conn:
        conn.rollback()
    raise e
    
finally:
    if cursor:
        cursor.close()
    if conn:
        conn.close()
    print("Conexão com PostgreSQL encerrada.")