### 1. Configuração do Ambiente e Conexão com Banco de Dados
Importação das bibliotecas necessárias e definição da string de conexão com o PostgreSQL. Também são criadas funções auxiliares (`exec_sql` e `run_query`) para facilitar a execução de comandos SQL e consultas durante o processo.        

In [None]:
import pandas as pd
from sqlalchemy import create_engine, text

DB_CONNECTION_STRING = "postgresql://postgres:admin123@localhost:5432/postgres"

# Cria a engine de conexão
engine = create_engine(DB_CONNECTION_STRING)

# Função auxiliar para rodar comandos SQL sem retorno (CREATE, UPDATE, DROP)
def exec_sql(query):
    with engine.connect() as conn:
        conn.execute(text(query))
        conn.commit()

# Função para consultas (SELECT)
def run_query(query):
    with engine.connect() as conn:
        return pd.read_sql_query(text(query), conn)

print("Conexão com PostgreSQL configurada!")

### 2. Extração de Dados e Carga na Staging Area
Leitura dos arquivos CSV anuais (2023, 2024, 2025), unificação em um único DataFrame e padronização inicial dos nomes das colunas. Em seguida, os dados brutos são carregados para a tabela `stage.tb_staging_samu` no banco de dados, preparando o terreno para as transformações via SQL.

In [None]:
colunas_nomes = [
    'id', 'data', 'hora_minuto', 'municipio', 'bairro',
    'endereco', 'origem_chamado', 'tipo', 'subtipo',
    'sexo', 'idade', 'motivo_finalizacao', 'motivo_desfecho'
]

# Leitura (Mantendo nomes originais Maiúsculos das colunas no Pandas)
# header=None para 2025 (porque a linha 0 é dado)
# header=0 para 23/24 (porque a linha 0 é titulo)
df_2025 = pd.read_csv('../data/samu_2025.csv', header=None, names=colunas_nomes, dtype=str)
df_2024 = pd.read_csv('../data/samu_2024.csv', header=0, names=colunas_nomes, dtype=str)
df_2023 = pd.read_csv('../data/samu_2023.csv', header=0, names=colunas_nomes, dtype=str)

df_staging = pd.concat([df_2025, df_2024, df_2023], ignore_index=True)


print("Verificando estrutura do Banco de Dados...")

with engine.connect() as conn:
    # CRIA O SCHEMA SE NÃO EXISTIR
    conn.execute(text("CREATE SCHEMA IF NOT EXISTS stage;"))
    
    # CRIA A TABELA SE NÃO EXISTIR
    # Criamos tudo como TEXT para o ELT ser robusto
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS stage.tb_staging_samu (
            id_temp SERIAL,
            id TEXT,
            data TEXT,
            hora_minuto TEXT,
            municipio TEXT,
            bairro TEXT,
            endereco TEXT,
            origem_chamado TEXT,
            tipo TEXT,
            subtipo TEXT,
            sexo TEXT,
            idade TEXT,
            motivo_finalizacao TEXT,
            motivo_desfecho TEXT
        );
    """))
    
    # LIMPA A TABELA PARA NÃO DUPLICAR DADOS
    conn.execute(text("TRUNCATE TABLE stage.tb_staging_samu;"))
    conn.commit()

print(f"Iniciando carga de {len(df_staging)} linhas...")

# Carga Efetiva
df_staging.to_sql(
    'tb_staging_samu', 
    engine, 
    schema='stage', 
    if_exists='append',
    index=False, 
    method='multi', 
    chunksize=2000
)

del df_staging, df_2025, df_2024, df_2023
print("✅ Carga realizada com sucesso na tabela stage.tb_staging_samu!")

### 3. Criação da Tabela de Trabalho e Conversão de Tipos
Criação da tabela `stage.tb_trabalho` a partir dos dados da staging. Nesta etapa, realizamos a conversão de tipos de dados (casting), transformando strings em datas, horas e números inteiros, além de tratar campos vazios como NULL para facilitar a limpeza subsequente.

In [None]:
print("Criando tabela de trabalho...")

# usamos nomes minúsculos aqui para o SQL não travar
query_trabalho = """
DROP TABLE IF EXISTS stage.tb_trabalho;

CREATE TABLE stage.tb_trabalho AS
SELECT
    ROW_NUMBER() OVER () - 1 AS id_gerado,
    
    TO_DATE(NULLIF(SUBSTRING(data, 1, 10), ''), 'YYYY-MM-DD') AS data_ocorrencia,
    TO_TIMESTAMP(NULLIF(SUBSTRING(TRIM(hora_minuto), 1, 8), ''), 'HH24:MI:SS')::TIME AS hora_ocorrencia,

    CAST(NULLIF(idade, '') AS INTEGER) AS idade,

    municipio, bairro, endereco, origem_chamado,
    tipo AS tipo_ocorrencia,
    subtipo AS subtipo_ocorrencia,
    sexo, motivo_finalizacao, motivo_desfecho
FROM stage.tb_staging_samu;
"""

exec_sql(query_trabalho)
print("✅ Tabela stage.tb_trabalho criada!")

### 4. Tratamento de Valores Nulos (Imputação)
Estratégia de limpeza de dados faltantes:
1. Cálculo da **mediana** das idades para preencher registros sem essa informação.
2. Substituição de valores nulos em campos categóricos (como endereço, sexo, motivo) por valores padrão como 'NÃO INFORMADO' ou 'SEM FINALIZAÇÃO'.

In [None]:
print("4. Tratando valores nulos...")

# Calcula a Mediana (na coluna 'idade' do schema 'stage')
mediana_sql = run_query("""
    SELECT PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY idade)
    FROM stage.tb_trabalho
""").iloc[0,0]

# Converte para Inteiro
mediana_int = int(mediana_sql)
print(f"   -> Mediana calculada: {mediana_int}")

# Aplica o Update
query_fillna = f"""
UPDATE stage.tb_trabalho
SET
    idade = COALESCE(idade, {mediana_int}),
    motivo_finalizacao = COALESCE(motivo_finalizacao, 'SEM FINALIZAÇÃO'),
    endereco = COALESCE(endereco, 'NÃO INFORMADO'),
    origem_chamado = COALESCE(origem_chamado, 'NÃO INFORMADO'),
    sexo = COALESCE(sexo, 'NÃO INFORMADO'),
    municipio = COALESCE(municipio, 'NÃO INFORMADO'),
    bairro = COALESCE(bairro, 'NÃO INFORMADO'),
    
    -- CORREÇÃO AQUI: Nomes atualizados conforme criamos na Célula 3
    subtipo_ocorrencia = COALESCE(subtipo_ocorrencia, 'NÃO INFORMADO'),
    tipo_ocorrencia = COALESCE(tipo_ocorrencia, 'NÃO INFORMADO');
"""
exec_sql(query_fillna)
print("✅ Nulos tratados com valores inteiros.")

### 5. Padronização de Texto e Correção de Inconsistências
Aplicação de funções de formatação (UPPER, TRIM) para padronizar todas as colunas de texto. Também é realizada uma limpeza específica na coluna `origem_chamado`, corrigindo erros de digitação e agrupando valores inconsistentes identificados na análise exploratória.

In [None]:
# Padronização (Upper e Trim) - Usando nomes corretos (_ocorrencia)
exec_sql("""
UPDATE stage.tb_trabalho
SET
    municipio = UPPER(TRIM(municipio)),
    bairro = UPPER(TRIM(bairro)),
    endereco = UPPER(TRIM(endereco)),
    origem_chamado = UPPER(TRIM(origem_chamado)),
    sexo = UPPER(TRIM(sexo)),
    motivo_finalizacao = UPPER(TRIM(motivo_finalizacao)),
    motivo_desfecho = UPPER(TRIM(motivo_desfecho)),
    
    -- Nomes corrigidos
    tipo_ocorrencia = UPPER(TRIM(tipo_ocorrencia)),
    subtipo_ocorrencia = UPPER(TRIM(subtipo_ocorrencia));
""")

# Lista de valores ruins
valores_ruins = [
    '93999830', 'ANI/ALI','JOSELENE', 'JUSELITA',
    'MARCILIA', 'R MA','RAYSSA', 'R  CELIA','JAGUARIB',
    'MONICA', 'AV NORTE', '00', 'MONIQUE', 'CARLOS', 'SANDRO',
    'EDVALDO', 'RECIFE', 'EDIMILSO', 'MARIA', 'MANOEL R', 'TEC ENF',
    'ANTONIO'
]
lista_sql = ", ".join([f"'{x}'" for x in valores_ruins])

# Replace condicional
query_replace = f"""
UPDATE stage.tb_trabalho
SET origem_chamado = CASE
    WHEN origem_chamado IN ({lista_sql}) THEN 'NÃO INFORMADO'
    WHEN origem_chamado = 'ESTAB PR' THEN 'ESTABELECIMENTO PRIVADO'
    WHEN origem_chamado = 'ESTAB PU' THEN 'ESTABELECIMENTO PUBLICO'
    ELSE origem_chamado
END;
"""
exec_sql(query_replace)
print("✅ Padronização concluída.")

### 6. Criação da Tabela Tratada, Deduplicação e Enriquecimento
Geração da tabela final de stage (`stage.tb_samu_tratada`). O processo inclui:
1. **Deduplicação:** Uso de `DISTINCT ON` para remover registros duplicados, mantendo a integridade dos dados.
2. **Enriquecimento:** Criação de colunas derivadas como `DIA_SEMANA`, `TURNO` e `ANO_ORIGEM` para facilitar análises futuras.

In [None]:
# Limpa tabela antiga
exec_sql("DROP TABLE IF EXISTS stage.tb_samu_tratada;")


exec_sql("""
CREATE TABLE stage.tb_samu_tratada (
    id INTEGER PRIMARY KEY,
    data DATE,
    hora_minuto TIME,
    municipio VARCHAR(100),
    bairro VARCHAR(100),
    endereco VARCHAR(255),
    origem_chamado VARCHAR(100),
    tipo VARCHAR(100),
    subtipo VARCHAR(100),
    sexo VARCHAR(20),
    idade INTEGER,
    motivo_finalizacao VARCHAR(255),
    motivo_desfecho VARCHAR(255),
    dia_semana VARCHAR(20),
    turno VARCHAR(20),
    ano_origem INTEGER
);
""")

query_final = """
INSERT INTO stage.tb_samu_tratada (
    id, data, hora_minuto,
    municipio, bairro, endereco, origem_chamado,
    tipo, subtipo, sexo, idade,
    motivo_finalizacao, motivo_desfecho,
    dia_semana, turno, ano_origem
)
SELECT * FROM (
    -- SUBQUERY: Mantém a lógica de deduplicação e ordem
    SELECT DISTINCT ON (
        data_ocorrencia, hora_ocorrencia, municipio, bairro,
        endereco, origem_chamado, tipo_ocorrencia, subtipo_ocorrencia,
        sexo, idade, motivo_finalizacao, motivo_desfecho
    )
        id_gerado,
        
        data_ocorrencia,
        hora_ocorrencia,
        municipio, bairro, endereco, origem_chamado,
        tipo_ocorrencia, subtipo_ocorrencia, sexo, idade,
        motivo_finalizacao, motivo_desfecho,
        
        CASE EXTRACT(DOW FROM data_ocorrencia)
            WHEN 0 THEN 'DOMINGO' WHEN 1 THEN 'SEGUNDA-FEIRA'
            WHEN 2 THEN 'TERCA-FEIRA' WHEN 3 THEN 'QUARTA-FEIRA'
            WHEN 4 THEN 'QUINTA-FEIRA' WHEN 5 THEN 'SEXTA-FEIRA'
            WHEN 6 THEN 'SABADO'
        END,
        
        CASE
            WHEN EXTRACT(HOUR FROM hora_ocorrencia) >= 6 AND EXTRACT(HOUR FROM hora_ocorrencia) < 12 THEN 'MANHA'
            WHEN EXTRACT(HOUR FROM hora_ocorrencia) >= 12 AND EXTRACT(HOUR FROM hora_ocorrencia) < 18 THEN 'TARDE'
            WHEN EXTRACT(HOUR FROM hora_ocorrencia) >= 18 AND EXTRACT(HOUR FROM hora_ocorrencia) <= 23 THEN 'NOITE'
            ELSE 'MADRUGADA'
        END,

        EXTRACT(YEAR FROM data_ocorrencia)::INTEGER

    FROM stage.tb_trabalho
    
    ORDER BY
        data_ocorrencia, hora_ocorrencia, municipio, bairro,
        endereco, origem_chamado, tipo_ocorrencia, subtipo_ocorrencia,
        sexo, idade, motivo_finalizacao, motivo_desfecho,
        id_gerado ASC
) AS subquery

ORDER BY id_gerado ASC;
"""

exec_sql(query_final)
print("ELT CONCLUÍDO! Tabela final salva com colunas em minúsculo.")

### 7. Validação Visual dos Dados Tratados
Consulta de verificação para exibir uma amostra (20 primeiras linhas) da tabela tratada e contagem total de registros, garantindo que o processo de transformação ocorreu conforme o esperado.

In [None]:
df_visualizacao = pd.read_sql_query(
    'SELECT * FROM stage.tb_samu_tratada ORDER BY "id" ASC LIMIT 20;', 
    engine, 
    index_col="id" 
)

from IPython.display import display
display(df_visualizacao)

total = run_query("SELECT COUNT(*) FROM stage.tb_samu_tratada").iloc[0,0]
print(f"\nTotal de linhas na tabela final: {total}")

### 8. Carga das Dimensões do Data Warehouse
Povoamento das tabelas de dimensão do modelo Star Schema:
* **Localidade:** Municípios e Bairros únicos.
* **Ocorrência:** Tipos, subtipos e origens de chamado.
* **Situação:** Motivos de finalização e desfechos.
* **Paciente:** Sexo e cálculo da **Faixa Etária** (Criança, Adolescente, Adulto, Idoso).
* **Tempo:** Calendário com dias, meses, trimestres e semestres baseados nas datas dos atendimentos.

In [None]:
# Carga das dimensoes do data warehouse
print("Iniciando carga das Dimensões...")

# Dimensao localidade
print("Carregando Localidade...")
exec_sql("""
INSERT INTO dw.dim_localidade (municipio, bairro)
SELECT DISTINCT municipio, bairro
FROM stage.tb_samu_tratada
ORDER BY municipio, bairro;
""")

# Dimensao ocorrencia
print("Carregando Ocorrência...")
exec_sql("""
INSERT INTO dw.dim_ocorrencia (origem_chamado, tipo, subtipo)
SELECT DISTINCT origem_chamado, tipo, subtipo
FROM stage.tb_samu_tratada
ORDER BY tipo, subtipo;
""")

# Dimensao situacao
print("Carregando Situação...")
exec_sql("""
INSERT INTO dw.dim_situacao (motivo_finalizacao, motivo_desfecho)
SELECT DISTINCT motivo_finalizacao, motivo_desfecho
FROM stage.tb_samu_tratada;
""")

# Dimensao paciente
# Calculo de faixa etaria via sql
print("Carregando Paciente...")
exec_sql("""
INSERT INTO dw.dim_paciente (sexo, faixa_etaria)
SELECT DISTINCT 
    sexo,
    CASE 
        WHEN idade <= 12 THEN 'CRIANCA'
        WHEN idade BETWEEN 13 AND 18 THEN 'ADOLESCENTE'
        WHEN idade BETWEEN 19 AND 59 THEN 'ADULTO'
        WHEN idade >= 60 THEN 'IDOSO'
        ELSE 'NAO INFORMADO'
    END
FROM stage.tb_samu_tratada;
""")

# Dimensao tempo
# Extracao de partes da data
print("Carregando Tempo...")
exec_sql("""
INSERT INTO dw.dim_tempo (data_completa, ano, mes, dia, dia_semana, trimestre, semestre)
SELECT DISTINCT 
    data,
    EXTRACT(YEAR FROM data),
    EXTRACT(MONTH FROM data),
    EXTRACT(DAY FROM data),
    dia_semana,
    EXTRACT(QUARTER FROM data),
    CASE WHEN EXTRACT(MONTH FROM data) <= 6 THEN 1 ELSE 2 END
FROM stage.tb_samu_tratada
ORDER BY data;
""")

print("Todas as dimensões foram carregadas com sucesso!")

### 9. Carga da Tabela Fato (Fato Atendimentos)
Processo final do ELT: Carga da tabela `dw.fato_atendimentos`. Os dados tratados são cruzados (JOIN) com as dimensões carregadas anteriormente para recuperar as chaves estrangeiras (IDs) e consolidar o histórico de atendimentos no Data Warehouse.

In [None]:
# Carga da tabela fato
print("Carregando a Tabela Fato...")

# Limpeza da tabela fato para evitar duplicidade
exec_sql("TRUNCATE TABLE dw.fato_atendimentos;")

# Insercao de dados conectando ids das dimensoes
# CORREÇÃO: Removemos as aspas das colunas ST.coluna
exec_sql("""
INSERT INTO dw.fato_atendimentos (
    fk_tempo, fk_local, fk_ocorrencia, fk_situacao, fk_paciente,
    hora_exata, idade_paciente, qtd_atendimentos
)
SELECT 
    T.id_tempo,
    L.id_local,
    O.id_ocorrencia,
    S.id_situacao,
    P.id_paciente,
    ST.hora_minuto,       
    ST.idade,             
    1
    
FROM stage.tb_samu_tratada ST

-- Joins corrigidos para usar nomes minúsculos na origem (ST)
JOIN dw.dim_tempo T ON T.data_completa = ST.data
JOIN dw.dim_localidade L ON L.municipio = ST.municipio AND L.bairro = ST.bairro
JOIN dw.dim_ocorrencia O ON O.tipo = ST.tipo AND O.subtipo = ST.subtipo AND O.origem_chamado = ST.origem_chamado
JOIN dw.dim_situacao S ON S.motivo_finalizacao = ST.motivo_finalizacao AND S.motivo_desfecho = ST.motivo_desfecho
JOIN dw.dim_paciente P ON P.sexo = ST.sexo AND P.faixa_etaria = (
    CASE 
        WHEN ST.idade <= 12 THEN 'CRIANCA'
        WHEN ST.idade BETWEEN 13 AND 18 THEN 'ADOLESCENTE'
        WHEN ST.idade BETWEEN 19 AND 59 THEN 'ADULTO'
        WHEN ST.idade >= 60 THEN 'IDOSO'
        ELSE 'NAO INFORMADO'
    END
);
""")

print("Data Warehouse Concluído!")

# Verificacao de total de registros carregados
total_fato = run_query("SELECT count(*) FROM dw.fato_atendimentos").iloc[0,0]
print(f"Total de registros na Fato: {total_fato}")