# 📊 Pipeline de Dados - Acidentes de Trânsito (2019-2021)

## 🛠️ Imports e Configurações Iniciais

Este notebook implementa pipelines ETL e ELT para integração de dados de acidentes de trânsito de três anos consecutivos.

In [8]:
# Instalar pacotes necessários no ambiente do notebook
%pip install pandas numpy sqlalchemy openpyxl

# Bibliotecas para manipulação de dados
import pandas as pd
import numpy as np

# Bibliotecas para banco de dados
import sqlalchemy
from sqlalchemy import create_engine, text

print("✅ Todas as bibliotecas foram importadas com sucesso!")
print(f"📦 Pandas versão: {pd.__version__}")
print(f"📦 SQLAlchemy versão: {sqlalchemy.__version__}")

619.88s - pydevd: Sending message related to process being replaced timed-out after 5 seconds


Collecting pandas
  Obtaining dependency information for pandas from https://files.pythonhosted.org/packages/ec/d3/3c37cb724d76a841f14b8f5fe57e5e3645207cc67370e4f84717e8bb7657/pandas-2.3.1-cp311-cp311-macosx_11_0_arm64.whl.metadata
  Downloading pandas-2.3.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Downloading pandas-2.3.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting numpy
  Obtaining dependency information for numpy from https://files.pythonhosted.org/packages/b7/13/e792d7209261afb0c9f4759ffef6135b35c77c6349a151f488f531d13595/numpy-2.3.2-cp311-cp311-macosx_14_0_arm64.whl.metadata
  Downloading numpy-2.3.2-cp311-cp311-macosx_14_0_arm64.whl.metadata (62 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [14]:
import pandas as pd
import numpy as np

def run_etl_pipeline():
    """
    Executa o pipeline completo de ETL para os dados de acidentes de 2019, 2020 e 2021.
    """
    print("--- Iniciando o processo de ETL ---")

    # --- 1. EXTRAÇÃO (Extract) ---
    # Carregando os três conjuntos de dados a partir dos arquivos CSV.
    try:
        print("Etapa 1: Extraindo dados dos arquivos CSV...")
        df_2019 = pd.read_csv('data/acidentes-2019.csv', delimiter=';')
        df_2020 = pd.read_csv('data/acidentes_2020-novo.csv', delimiter=';')
        df_2021 = pd.read_csv('data/acidentes2021.csv', delimiter=';')
        print("Dados extraídos com sucesso!")
    except FileNotFoundError as e:
        print(f"ERRO: Arquivo não encontrado. Verifique se os nomes dos arquivos estão corretos. Detalhes: {e}")
        return

    # --- 2. TRANSFORMAÇÃO (Transform) ---
    print("\nEtapa 2: Transformando os dados...")

    # 2.1 Padronização dos nomes das colunas
    # O dataset de 2019 tem a coluna 'DATA' em maiúsculo. Vamos padronizar para 'data'.
    df_2019.rename(columns={'DATA': 'data'}, inplace=True)
    print("- Nomes de colunas padronizados.")

    # 2.2 Harmonização das colunas (removendo colunas inconsistentes)
    # Colunas presentes em 2019 mas não em 2020/2021
    cols_to_drop_2019 = ['endereco_cruzamento', 'numero_cruzamento', 'referencia_cruzamento', 'descricao']
    df_2019.drop(columns=[col for col in cols_to_drop_2019 if col in df_2019.columns], inplace=True)

    # Coluna 'descricao' presente em 2020 mas não em 2021
    if 'descricao' in df_2020.columns:
        df_2020.drop(columns=['descricao'], inplace=True)

    # A coluna 'tipo' em 2021 está no lugar de 'descricao', mas para unificar vamos manter apenas as colunas em comum.
    # A base de 2021 não tem a coluna 'descricao', então não é necessário fazer nada.

    print("- Colunas harmonizadas entre os datasets.")

    # Lista dos dataframes para aplicar as transformações em lote
    dataframes = [df_2019, df_2020, df_2021]
    ano_inicio = 2019 # Precisa ser alterado caso entre anos anteriores no data set

    # 2.3 Transformação de data e hora
    for i, df in enumerate(dataframes):
        ano = ano_inicio + i
        # Corrigindo valores de hora inválidos como '24:00:00'
        df['hora'] = df['hora'].str.replace('24:00:00', '00:00:00', regex=False)
        # Combinando 'data' e 'hora' em uma única coluna 'timestamp'
        # 'coerce' transforma datas inválidas em NaT (Not a Time)
        df['timestamp'] = pd.to_datetime(df['data'] + ' ' + df['hora'], errors='coerce')
        # Adicionando a coluna 'ano'
        df['ano'] = ano
        # Removendo as colunas originais
        df.drop(columns=['data', 'hora'], inplace=True)

    print("- Colunas de data e hora convertidas para o formato timestamp.")
    print("- Coluna 'ano' criada.")

    # 2.4 Padronização de tipos de dados numéricos
    colunas_numericas = [
        'auto', 'moto', 'ciclom', 'ciclista', 'pedestre', 'onibus',
        'caminhao', 'viatura', 'outros', 'vitimas', 'vitimasfatais'
    ]

    for df in dataframes:
        for col in colunas_numericas:
            if col in df.columns:
                # Converte para numérico, tratando erros, e preenche NaNs com 0
                df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0)
                # Converte para inteiro
                df[col] = df[col].astype(int)

    # Tratamento específico para 'num_semaforo' que tem tipos diferentes
    for df in dataframes:
        if 'num_semaforo' in df.columns:
            df['num_semaforo'] = pd.to_numeric(df['num_semaforo'], errors='coerce').fillna(0).astype(int)

    print("- Tipos de dados das colunas numéricas padronizados para inteiro.")

    # Tratamento de endereco
    for df in dataframes:
        if 'endereco' in df.columns:
            df['endereco'] = df['endereco'].str.strip().replace(r'^\s*$', np.nan, regex=True).fillna('Desconhecido')
        if 'detalhe_endereco_acidente' in df.columns:
            df['detalhe_endereco_acidente'] = df['detalhe_endereco_acidente'].str.strip().replace(r'^\s*$', np.nan, regex=True).fillna('Desconhecido')
        if 'complemento' in df.columns:
            df['complemento'] = df['complemento'].str.strip().replace(r'^\s*$', np.nan, regex=True).fillna('Desconhecido')
        if 'numero' in df.columns:
            df['numero'] = pd.to_numeric(df['numero'], errors='coerce').fillna(-1).astype(int)

    print("- Endereços padronizados.")

    # --- 3. CARGA (Load) ---
    print("\nEtapa 3: Carregando os dados transformados...")

    # 3.1 Unificação dos DataFrames
    # Concatenando os três dataframes em um só
    df_unificado = pd.concat(dataframes, ignore_index=True)
    print("- DataFrames unificados com sucesso.")

    # Reordenando colunas para colocar 'ano' e 'timestamp' no início
    cols = ['ano', 'timestamp'] + [col for col in df_unificado.columns if col not in ['ano', 'timestamp']]
    df_unificado = df_unificado[cols]

    # 3.2 Salvando o resultado em um novo arquivo CSV
    output_filename = 'acidentes_unificados_2019-2021.csv'
    df_unificado.to_csv(output_filename, index=False, sep=';', encoding='utf-8')

    print(f"\n--- Processo de ETL Concluído! ---")
    print(f"A base de dados unificada foi salva como: '{output_filename}'")
    print(f"Total de registros na base unificada: {len(df_unificado)}")
    print("\nAmostra dos 5 primeiros registros da base final:")
    print(df_unificado.head())
    print("\nInformações da base final:")
    df_unificado.info()

    # 3.3 Inserindo no banco de dados
    print("\nEtapa 3.3: Inserindo os dados no banco de dados...")

    # criando um arquivo .db
    nome_do_banco_de_dados2 = "banco_etl.db"
    nome_tabela = 'acidentes'

    engine = create_engine(f'sqlite:///{nome_do_banco_de_dados2}')

    df_unificado.to_sql(nome_tabela,
                        con=engine,
                        if_exists='replace',
                        index=False)

    print(f"- Dados inseridos com sucesso na tabela '{nome_tabela}' do banco SQLite (arquivo '{nome_do_banco_de_dados2}').")
# Executando a função principal do pipeline
run_etl_pipeline()


--- Iniciando o processo de ETL ---
Etapa 1: Extraindo dados dos arquivos CSV...
Dados extraídos com sucesso!

Etapa 2: Transformando os dados...
- Nomes de colunas padronizados.
- Colunas harmonizadas entre os datasets.
- Colunas de data e hora convertidas para o formato timestamp.
- Coluna 'ano' criada.
- Tipos de dados das colunas numéricas padronizados para inteiro.
- Endereços padronizados.

Etapa 3: Carregando os dados transformados...
- DataFrames unificados com sucesso.

--- Processo de ETL Concluído! ---
A base de dados unificada foi salva como: 'acidentes_unificados_2019-2021.csv'
Total de registros na base unificada: 18534

Amostra dos 5 primeiros registros da base final:
    ano           timestamp natureza_acidente    situacao       bairro  \
0  2019 2019-01-01 00:41:00        SEM VÍTIMA  FINALIZADA        IPSEP   
1  2019 2019-01-01 01:37:00        SEM VÍTIMA  FINALIZADA   BOA VIAGEM   
2  2019 2019-01-01 14:20:00        SEM VÍTIMA   CANCELADA   BOA VIAGEM   
3  2019 2019

# ETAPA ELT



In [19]:
#INICIANDO A SUBIDA DOS DADOS BRUTOS DE 2019 ATÉ 2021, Utilizando as bibliotecas panda e sqlalchemy

# --- CONFIGURAÇÃO---
# Nome dos arquivos CSV corrigidos para corresponder aos arquivos reais
lista_arquivos_csv = ['data/acidentes2019.csv', 'data/acidentes2020.csv', 'data/acidentes2021.csv']

# Nome do arquivo do banco de dados
nome_do_banco_de_dados = 'banco_elt.db'

# Nome da tabela onde os dados brutos serão guardados
nome_da_tabela_consolidada = 'dados_brutos_sinistros'

# Conectar ao banco de dados
engine = create_engine(f'sqlite:///{nome_do_banco_de_dados}')

# Limpar tabela existente se houver problemas de schema
print("🔄 Limpando banco de dados existente para evitar conflitos de schema...")
try:
    with engine.connect() as conn:
        conn.execute(text(f"DROP TABLE IF EXISTS {nome_da_tabela_consolidada}"))
        conn.commit()
except:
    pass

print("--- Iniciando o processo de ELT ---")

for i, nome_arquivo in enumerate(lista_arquivos_csv):
    print(f"Processando arquivo: {nome_arquivo}")
    
    # Lendo os arquivos csv para um dataframe (EXTRACT)
    try:
        df_temp = pd.read_csv(nome_arquivo, sep=';', encoding='utf-8')
        
        # Adicionar uma coluna para saber a origem do dado
        ano = nome_arquivo.replace('acidentes', '').replace('.csv', '').replace('data/', '')
        df_temp['ano_do_dado'] = int(ano)
        
        # CARREGAMENTO (LOAD)
        # Define a estratégia: 'replace' para o primeiro arquivo, 'append' para os demais.
        if i == 0:
            modo_de_insercao = 'replace'
        else:
            modo_de_insercao = 'append'

        df_temp.to_sql(
            nome_da_tabela_consolidada,
            con=engine,
            if_exists=modo_de_insercao,
            index=False
        )
        print(f"- Arquivo {nome_arquivo} carregado com sucesso ({len(df_temp)} registros)")
        
    except FileNotFoundError as e:
        print(f"ERRO: Arquivo {nome_arquivo} não encontrado. Detalhes: {e}")
        continue
    except Exception as e:
        print(f"ERRO: Problema ao processar {nome_arquivo}. Detalhes: {e}")
        continue

# Fazendo as transformações dentro do banco
try:
    with engine.connect() as conn:
        print("\n🔄 Iniciando transformações no banco de dados (ELT)...")
        
        # Verificar se a tabela existe e tem dados
        result = conn.execute(text(f"SELECT COUNT(*) FROM {nome_da_tabela_consolidada}"))
        total_inicial = result.fetchone()[0]
        print(f"Total de registros carregados: {total_inicial}")
        
        if total_inicial == 0:
            print("❌ Nenhum dado foi carregado. Verifique os arquivos CSV.")
        else:
            # Criar coluna TimeStamp
            conn.execute(text("ALTER TABLE dados_brutos_sinistros ADD COLUMN TimeStamp TEXT;"))
            
            # Tratando 2 dados específicos que foram mal preenchidos nos dados de 2019
            conn.execute(text("""
                UPDATE dados_brutos_sinistros
                SET hora = null
                WHERE hora in ('48:00:00', '1049592:00:00');
            """))
            
            # Preencher a coluna timestamp com DATA + hora
            conn.execute(text("""
                UPDATE dados_brutos_sinistros
                SET TimeStamp =
                    coalesce(data, DATA) || ' ' ||
                    CASE
                        WHEN hora = '24:00:00' THEN '00:00:00'
                        WHEN hora IS NULL THEN '00:00:00'
                        ELSE hora
                    END;
            """))
            conn.commit()
            print("- Coluna 'timestamp' preenchida com DATA e hora.")

            # Tratar campo de número, transformando em -1
            conn.execute(text("""
                UPDATE dados_brutos_sinistros
                SET numero = CASE
                    WHEN numero IS NULL OR TRIM(numero) = '' THEN -1
                    ELSE numero
                END;
            """))
            conn.commit()
            print("- Endereço e número tratados.")

            colunas_numericas = ['auto', 'moto', 'ciclom', 'ciclista', 'pedestre',
                                 'onibus', 'caminhao', 'viatura', 'outros',
                                 'vitimas', 'vitimasfatais', 'num_semaforo']

            # Padronizando colunas que são numéricas
            for col in colunas_numericas:
                conn.execute(text(f"""
                    UPDATE dados_brutos_sinistros
                    SET {col} = CASE
                        WHEN TRIM({col}) = '' OR {col} IS NULL THEN 0
                        ELSE CAST({col} as INTEGER)
                    END;
                """))
            conn.commit()
            print("- Colunas numéricas padronizadas.")
            
            # Criando coluna de tipo do acidente
            conn.execute(text("ALTER TABLE dados_brutos_sinistros ADD COLUMN tipo_acidente TEXT;"))
            conn.execute(text("""
                UPDATE dados_brutos_sinistros
                SET tipo_acidente = CASE
                    WHEN ciclista > 0 THEN 'Com ciclista'
                    WHEN pedestre > 0 THEN 'Com pedestre'
                    WHEN vitimasfatais > 0 THEN 'Fatal'
                    ELSE 'Outros'
                END;
            """))
            conn.commit()
            print("- Coluna 'tipo_acidente' criada.")

            # Definindo que caso não haja informação sobre vítimas, colocar como 'SEM VÍTIMA'
            conn.execute(text("""
                UPDATE dados_brutos_sinistros
                SET natureza_acidente = 'SEM VÍTIMA'
                WHERE natureza_acidente IS NULL;
            """))

            # Tratamento de colunas de texto
            colunas_strings = ['bairro', 'endereco', 'detalhe_endereco_acidente', 'complemento', 
                              'endereco_cruzamento', 'conservacao_via', 'ponto_controle', 'situacao_placa',
                              'divisao_via1', 'divisao_via2', 'divisao_via3', 'bairro_cruzamento', 'tipo',
                              'acidente_verificado', 'tempo_clima', 'situacao_semaforo', 'sinalizacao',
                              'sentido_via', 'condicao_via', 'mao_direcao']
            
            for col in colunas_strings:
                conn.execute(text(f"""
                    UPDATE dados_brutos_sinistros
                    SET {col} = COALESCE(NULLIF(TRIM({col}), ''), 'Desconhecido');
                """))
            conn.commit()
            print("- Colunas de texto padronizadas.")

            # Adicionar coluna 'id_linha'
            conn.execute(text("ALTER TABLE dados_brutos_sinistros ADD COLUMN id_linha INTEGER;"))
            conn.execute(text("UPDATE dados_brutos_sinistros SET id_linha = ROWID;"))
            conn.commit()
            print("- Coluna 'id_linha' adicionada e preenchida.")

            # Tratamento da velocidade máxima da via (se a coluna existir)
            try:
                conn.execute(text("""
                    UPDATE dados_brutos_sinistros
                    SET velocidade_max_via = CASE
                        WHEN velocidade_max_via LIKE '%km/h%' THEN TRIM(REPLACE(velocidade_max_via, 'km/h', ''))
                        ELSE velocidade_max_via
                    END;
                """))
                
                conn.execute(text("""
                    UPDATE dados_brutos_sinistros
                    SET velocidade_max_via = CASE
                        WHEN TRIM(velocidade_max_via) GLOB '[0-9]*' THEN CAST(velocidade_max_via AS INTEGER)
                        ELSE -1
                    END;
                """))
                conn.commit()
                print("- Velocidade máxima da via tratada.")
            except Exception as e:
                print(f"- Velocidade máxima da via não encontrada ou erro: {e}")

except Exception as e:
    print(f"Erro durante as transformações: {e}")

# Verificação Final
print("\n--- Verificação Final do Processo ELT ---")

try:
    # Contar o total de registros na tabela consolidada
    total_registros = pd.read_sql(f"SELECT COUNT(*) FROM {nome_da_tabela_consolidada}", engine).iloc[0,0]
    print(f"A tabela final '{nome_da_tabela_consolidada}' contém um total de {total_registros} registros.")

    # Agrupar por ano para confirmar que os dados dos 3 arquivos foram carregados
    print("\nContagem de registros por ano de origem do dado:")
    df_verificacao = pd.read_sql(f'SELECT ano_do_dado, COUNT(*) as total_de_linhas FROM {nome_da_tabela_consolidada} GROUP BY ano_do_dado', engine)
    print(df_verificacao)

    # Selecionar todos os dados
    df_all = pd.read_sql(f'SELECT * FROM {nome_da_tabela_consolidada}', engine)
    print(f"\nDataFrame final criado com {len(df_all)} registros e {len(df_all.columns)} colunas.")
    print("\nPrimeiras 5 linhas:")
    print(df_all.head())
    
except Exception as e:
    print(f"Erro na verificação final: {e}")
    print("Verifique se os dados foram carregados corretamente.")

🔄 Limpando banco de dados existente para evitar conflitos de schema...
--- Iniciando o processo de ELT ---
Processando arquivo: data/acidentes2019.csv
--- Iniciando o processo de ELT ---
Processando arquivo: data/acidentes2019.csv
- Arquivo data/acidentes2019.csv carregado com sucesso (12062 registros)
Processando arquivo: data/acidentes2020.csv
- Arquivo data/acidentes2020.csv carregado com sucesso (4092 registros)
Processando arquivo: data/acidentes2021.csv
- Arquivo data/acidentes2021.csv carregado com sucesso (2380 registros)

🔄 Iniciando transformações no banco de dados (ELT)...
Total de registros carregados: 18534
- Arquivo data/acidentes2019.csv carregado com sucesso (12062 registros)
Processando arquivo: data/acidentes2020.csv
- Arquivo data/acidentes2020.csv carregado com sucesso (4092 registros)
Processando arquivo: data/acidentes2021.csv
- Arquivo data/acidentes2021.csv carregado com sucesso (2380 registros)

🔄 Iniciando transformações no banco de dados (ELT)...
Total de reg

In [None]:
# --- CÓDIGO PARA GERAR O ARQUIVO XLSX ---
# (Execute esta célula após ter executado seu código que cria o DataFrame 'df_all')

# Nome que você deseja para o arquivo Excel de saída
nome_do_arquivo_saida = 'dados_brutos_sinistros_consolidados.xlsx'

print(f"Gerando o arquivo Excel: {nome_do_arquivo_saida}...")

df_all.to_excel(
    nome_do_arquivo_saida,
    sheet_name='Dados de Sinistros',
    index=False
)

print(f"✅ Arquivo '{nome_do_arquivo_saida}' gerado com sucesso!")


Gerando o arquivo Excel: dados_brutos_sinistros_consolidados.xlsx...
✅ Arquivo 'dados_brutos_sinistros_consolidados.xlsx' gerado com sucesso!
