Diferente do processo de ETL, no ELT, os dados são carregados diretamente, da maneira que foram extraídos e a transformação é feita por comandos SQL, posteriormente.

In [1]:
import pandas as pd
from sqlalchemy import create_engine, text

# Extração dos dados e Load

In [2]:
# Configuração do Banco de Dados

usuario_db = 'postgres'
senha_db = '254535'
host_db = 'localhost'
porta_db = '5432'
nome_banco_db = 'elt_BD'

# String de Conexão PostgreSQL
DATABASE_URL = f'postgresql://{usuario_db}:{senha_db}@{host_db}:{porta_db}/{nome_banco_db}'

print(f"Tentando conectar em: {host_db}:{porta_db}/{nome_banco_db}...")

try:
    # Criando a engine
    engine = create_engine(DATABASE_URL)

    with engine.connect() as conn: # apenas para teste de conexão
        conn.execute(text("SELECT 1"))

    print("Conexão com PostgreSQL configurada com sucesso!")

except Exception as e:
    print(f"\n ERRO NA CARGA DO BANCO!")
    print(f"Detalhe do erro: {e}")

Tentando conectar em: localhost:5432/elt_BD...
Conexão com PostgreSQL configurada com sucesso!


In [3]:
# Extract & load dos arquivos

arquivos = {
    '2022': 'data/156_2022.csv',
    '2024': 'data/156_2024.csv',
    '2025': 'data/156_2025.csv'
}

# Limpeza inicial para garantir que não haja dados duplicados
with engine.begin() as conn:
    conn.execute(text("DROP TABLE IF EXISTS raw_chamados_156 CASCADE;"))

# Loop de carga
for ano, arquivo in arquivos.items():
    try:
        encoding = 'utf-8' if ano == '2022' else 'latin1'
        sep = ';'
        
        # Lê o CSV como texto, forçando tudo como string para evitar erros de tipo no load inicial
        df_raw = pd.read_csv(arquivo, sep=sep, encoding=encoding, dtype=str)

        # Força todos os nomes das colunas a serem minúsculos para não ocorrer erros nas chamadas
        df_raw.columns = df_raw.columns.str.lower()
        # Coluna de rastreamento
        df_raw['ano_origem'] = ano
        
        # Vai empilhando os dados dos anos
        df_raw.to_sql('raw_chamados_156', con=engine, if_exists='append', index=False)
        
        print(f"Dados de {ano} carregados para a tabela 'raw_chamados_156'.")
        
    except Exception as e:
        print(f"Erro no ano {ano}: {e}")

Dados de 2022 carregados para a tabela 'raw_chamados_156'.
Dados de 2024 carregados para a tabela 'raw_chamados_156'.
Dados de 2025 carregados para a tabela 'raw_chamados_156'.


# Transformação dos dados

**Criando as dimensões**
- Dimensão serviço (gruposervico_codigo, gruposervico_descricao, servico_codigo, servico_descricao)
- Dimensão localização (logradouro, numero, bairro, rpa, latitude, longitude)
- Dimensão situacao (situacao)
- Dimensão tempo (dia, mes, ano, trimestre)

> Durante a execução das *queries* de criação, aplicaremos funções de tratamento e conversão em SQL para garantir a qualidade dos dados finais. 

In [4]:
with engine.begin() as conn:
    # Utilização da extensão unaccent do Postgres para remover acentos
    # Verifica se não há erro na utilização da extensão
    try:
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS unaccent;"))
        print("Extensão unaccent habilitada (limpeza de acentos ativa)")
    except Exception as e:
        print("Extensão unaccent não disponível. A limpeza de acentos será ignorada.")


    # 1. Criação da Dimensão Serviço
    conn.execute(text("""
        DROP TABLE IF EXISTS dim_servico CASCADE;
        
        CREATE TABLE dim_servico AS
        SELECT DISTINCT
            -- Criação do ID - chave primária substituta
            ROW_NUMBER() OVER (ORDER BY servico_descricao) as id_servico,
            
            gruposervico_codigo,
            -- Padronização para caixa alta e remoção de espaços extras
            UPPER(TRIM(gruposervico_descricao)) as gruposervico_descricao,
                      
            servico_codigo,
            UPPER(TRIM(servico_descricao)) as servico_descricao
        
        FROM raw_chamados_156
        WHERE servico_descricao IS NOT NULL;
    """))

    # 2. Criação da Dimensão Localização
    conn.execute(text("""
        DROP TABLE IF EXISTS dim_localizacao CASCADE;
        
        CREATE TABLE dim_localizacao AS
        SELECT DISTINCT
            -- Criação do ID - chave primária substituta
            ROW_NUMBER() OVER (ORDER BY bairro, logradouro) as id_localizacao,
            
            -- Inclusão dos elementos, além da padronização
            UPPER(TRIM(bairro)) as bairro,
            rpa,
            UPPER(TRIM(logradouro)) as logradouro
            
        FROM raw_chamados_156
        WHERE bairro IS NOT NULL;          
    """))

    # 3. Criação da Dimensão Situação
    conn.execute(text("""
        DROP TABLE IF EXISTS dim_situacao CASCADE;
        
        CREATE TABLE dim_situacao AS
        SELECT DISTINCT
            -- Criação do ID - chave primária substituta
            ROW_NUMBER() OVER (ORDER BY situacao) as id_situacao,
            
            -- Inclusão de situação, além da padronização dos dados
            UPPER(TRIM(situacao)) as situacao
            
        FROM raw_chamados_156
        WHERE situacao IS NOT NULL;     
    """))

    # 4. Criação da Dimensão Tempo
    conn.execute(text("""
        DROP TABLE IF EXISTS dim_tempo CASCADE;
        
        CREATE TABLE dim_tempo AS
        WITH todas_datas AS (
            -- Pega datas de solicitação
            SELECT CAST(data_demanda AS DATE) as data_ref 
            FROM raw_chamados_156 
            WHERE data_demanda IS NOT NULL
            UNION
            -- Pega datas de finalização
            SELECT CAST(data_ult_situacao AS DATE) as data_ref 
            FROM raw_chamados_156 
            WHERE data_ult_situacao IS NOT NULL
        )
        SELECT DISTINCT
            -- Criação do ID - chave primária substituta
            ROW_NUMBER() OVER (ORDER BY data_ref) as id_tempo,
            data_ref as data_completa,
            
            -- Extração de atributos de data
            EXTRACT(DAY FROM data_ref) as dia,
            EXTRACT(MONTH FROM data_ref) as mes,
            EXTRACT(YEAR FROM data_ref) as ano,
            EXTRACT(QUARTER FROM data_ref) as trimestre,
            
            -- Nome do dia da semana
            TO_CHAR(data_ref, 'Day') as dia_semana
            
        FROM todas_datas;         
    """))

Extensão unaccent habilitada (limpeza de acentos ativa)


In [None]:
with engine.begin() as conn:
    # Aplicando chaves primárias às dimensões
    conn.execute(text("ALTER TABLE dim_servico ADD PRIMARY KEY (id_servico);"))
    conn.execute(text("ALTER TABLE dim_localizacao ADD PRIMARY KEY (id_localizacao);"))
    conn.execute(text("ALTER TABLE dim_situacao ADD PRIMARY KEY (id_situacao);"))
    conn.execute(text("ALTER TABLE dim_tempo ADD PRIMARY KEY (id_tempo);"))

    #criando indices auxiliares funcionais na tabela raw para tentar melhor performance
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_raw_servico_full ON raw_chamados_156(servico_codigo, (UPPER(TRIM(servico_descricao))));"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_raw_bairro_func ON raw_chamados_156((UPPER(TRIM(bairro))));"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_raw_lograd_func ON raw_chamados_156((UPPER(TRIM(logradouro))));"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_raw_situacao_func ON raw_chamados_156((UPPER(TRIM(situacao))));"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_raw_data_demanda ON raw_chamados_156(data_demanda);"))
    conn.execute(text("CREATE INDEX IF NOT EXISTS idx_raw_data_ult ON raw_chamados_156(data_ult_situacao);"))
    

**A tabela Fato do Esquema Estrela**
- Nesta etapa, unificamos os dados brutos com as chaves das dimensões e calculamos as métricas principais do negócio: volume, taxa de resolução e tempo médio de atendimento.

In [None]:

with engine.begin() as conn:
    conn.execute(text("SET work_mem = '128MB';"))
    
    # 5. Criação da tabela fato Chamado
    conn.execute(text("""
        DROP TABLE IF EXISTS fato_chamados CASCADE;
        
        CREATE TABLE fato_chamados AS
        SELECT 
            -- chaves estrangeiras
            s.id_servico,
            l.id_localizacao,
            sit.id_situacao,
            t_ini.id_tempo as id_tempo_inicio,
            t_fim.id_tempo as id_tempo_fim,
            
            -- Metadados
            r.ano_origem,
            r.situacao as situacao_original, 
            
            -- MÉTRICAS DE NEGÓCIO
            
            -- Resolutividade (1 = Resolvido, 0 = Pendente)
            -- Permite somar para saber o total de atendimentos concluídos
            CASE 
                WHEN UPPER(TRIM(r.situacao)) = 'ATENDIDA' THEN 1 
                ELSE 0 
            END as is_resolvido,
            
            -- Eficiência (tempo de atendimento em dias)
            -- Calcula a diferença apenas para chamados fechados
            CASE 
                WHEN UPPER(TRIM(r.situacao)) = 'ATENDIDA' AND r.data_ult_situacao IS NOT NULL THEN 
                    (CAST(r.data_ult_situacao AS DATE) - CAST(r.data_demanda AS DATE))
                ELSE NULL 
            END as dias_para_conclusao

        FROM raw_chamados_156 r
        
                      
        -- Conectando com todas as dimensões criadas
        
        LEFT JOIN dim_servico s 
            ON r.servico_codigo = s.servico_codigo 
            AND UPPER(TRIM(r.servico_descricao)) = s.servico_descricao
            
        LEFT JOIN dim_localizacao l 
            ON UPPER(TRIM(r.bairro)) = l.bairro 
            AND UPPER(TRIM(r.logradouro)) = l.logradouro
            
        LEFT JOIN dim_situacao sit
            ON UPPER(TRIM(r.situacao)) = sit.situacao
            
        LEFT JOIN dim_tempo t_ini 
            ON CAST(r.data_demanda AS DATE) = t_ini.data_completa
            
        LEFT JOIN dim_tempo t_fim 
            ON CAST(r.data_ult_situacao AS DATE) = t_fim.data_completa;
    """))

In [None]:
df_analise = pd.read_sql("SELECT * FROM fato_chamados LIMIT 20", engine)
display(df_analise)