### 📌 Passo 2: Criar a Tabela Fato

In [4]:
# 🔹 Instalar pacotes necessários (caso não estejam instalados)
!pip install --upgrade pandas psycopg2 sqlalchemy python-dotenv pymysql

# 🔹 Importações necessárias
import pandas as pd
import psycopg2
import pymysql
from sqlalchemy import create_engine
import os
import time
from dotenv import load_dotenv

# 🔹 Carregar variáveis de ambiente do .env
load_dotenv()

# 🔹 Configuração do PostgreSQL (Staging Area)
DB_USER_PG = os.getenv("DB_USER_PG")
DB_PASSWORD_PG = os.getenv("DB_PASSWORD_PG")
DB_HOST_PG = os.getenv("DB_HOST_PG")
DB_PORT_PG = os.getenv("DB_PORT_PG")
DB_NAME_PG = os.getenv("DB_NAME_PG")

DATABASE_URL_PG = f"postgresql://{DB_USER_PG}:{DB_PASSWORD_PG}@{DB_HOST_PG}:{DB_PORT_PG}/{DB_NAME_PG}?sslmode=require"

# 🔹 Configuração do MySQL (Destino)
DB_USER_MYSQL = os.getenv("DB_USER_MYSQL")
DB_PASSWORD_MYSQL = os.getenv("DB_PASSWORD_MYSQL")
DB_HOST_MYSQL = os.getenv("DB_HOST_MYSQL")
DB_PORT_MYSQL = os.getenv("DB_PORT_MYSQL")
DB_NAME_MYSQL = os.getenv("DB_NAME_MYSQL")

DATABASE_URL_MYSQL = f"mysql+pymysql://{DB_USER_MYSQL}:{DB_PASSWORD_MYSQL}@{DB_HOST_MYSQL}:{DB_PORT_MYSQL}/{DB_NAME_MYSQL}"

# 🔹 Função para criar conexão com PostgreSQL
def get_pg_connection():
    retries = 3
    for i in range(retries):
        try:
            conn = psycopg2.connect(
                dbname=DB_NAME_PG, user=DB_USER_PG, password=DB_PASSWORD_PG,
                host=DB_HOST_PG, port=DB_PORT_PG, connect_timeout=60, sslmode='require'
            )
            print("✅ Conexão com PostgreSQL estabelecida!")
            return conn
        except psycopg2.OperationalError as e:
            print(f"⚠️ Tentativa {i+1} falhou: {e}")
            time.sleep(5)
    raise Exception("❌ Não foi possível conectar ao PostgreSQL após várias tentativas.")

conn_pg = get_pg_connection()
engine_pg = create_engine(DATABASE_URL_PG)

# 🔹 Criar conexão com MySQL
engine_mysql = create_engine(DATABASE_URL_MYSQL, pool_size=5, max_overflow=10, connect_args={"connect_timeout": 60})

conn_mysql = pymysql.connect(
    host=DB_HOST_MYSQL, user=DB_USER_MYSQL, password=DB_PASSWORD_MYSQL,
    database=DB_NAME_MYSQL, port=int(DB_PORT_MYSQL)
)
cursor_mysql = conn_mysql.cursor()

# 🔹 Carregar dados da Staging Area
df_stg = pd.read_sql("SELECT * FROM stg_acidentes", con=engine_pg)

# 🔹 Verificar se a tabela fato_acidentes existe no MySQL
cursor_mysql.execute("""
    SELECT COUNT(*) 
    FROM information_schema.tables 
    WHERE table_schema = %s AND table_name = 'fato_acidentes'
""", (DB_NAME_MYSQL,))
table_exists = cursor_mysql.fetchone()[0] > 0

if table_exists:
    print("✅ Tabela fato_acidentes encontrada! Excluindo dados antigos...")
    cursor_mysql.execute("DELETE FROM fato_acidentes;")
    conn_mysql.commit()
else:
    print("⚠️ Tabela fato_acidentes não encontrada! Criando tabela...")
    cursor_mysql.execute("""
        CREATE TABLE fato_acidentes (
            sk_acidente INT AUTO_INCREMENT PRIMARY KEY,
            id VARCHAR(36),
            data DATE,
            horario TIME,
            km FLOAT,
            sk_tipo_ocorrencia VARCHAR(255),
            sk_sentido VARCHAR(255),
            sk_trecho VARCHAR(255),
            sk_rodovia INT,
            sk_tipo_veiculo VARCHAR(255),
            sk_situacao_acidente VARCHAR(255),
            quantidade_veiculos INT,
            quantidade_vitimas INT
        );
    """)
    conn_mysql.commit()
    print("✅ Tabela fato_acidentes criada com sucesso!")

# 🔹 Transformação das tabelas fato
veiculos_cols = ["automovel", "bicicleta", "moto", "caminhao", "onibus", "outros", 
                 "tracao_animal", "transporte_de_cargas_especiais", "trator_maquinas", "utilitarios"]

situacao_cols = ["ilesos", "levemente_feridos", "moderadamente_feridos", "gravemente_feridos", "mortos"]

# Criação dos DataFrames de veículos e situações
df_fato_veiculos = df_stg.melt(
    id_vars=["id", "data", "horario", "km", "tipo_de_ocorrencia", "sentido", "trecho", "concessionaria", "tipo_de_acidente"],
    value_vars=veiculos_cols, var_name="tipo_veiculo", value_name="quantidade_veiculos"
)

df_fato_situacoes = df_stg.melt(
    id_vars=["id", "data", "horario", "km", "tipo_de_ocorrencia", "sentido", "trecho", "concessionaria", "tipo_de_acidente"],
    value_vars=situacao_cols, var_name="tipo", value_name="quantidade_vitimas"
)

# Converter os valores para numéricos e filtrar registros com quantidade > 0
df_fato_veiculos["quantidade_veiculos"] = pd.to_numeric(df_fato_veiculos["quantidade_veiculos"], errors='coerce')
df_fato_situacoes["quantidade_vitimas"] = pd.to_numeric(df_fato_situacoes["quantidade_vitimas"], errors='coerce')

df_fato_veiculos = df_fato_veiculos[df_fato_veiculos["quantidade_veiculos"] > 0]
df_fato_situacoes = df_fato_situacoes[df_fato_situacoes["quantidade_vitimas"] > 0]

# Concatenar ambos os DataFrames
df_fato_final = pd.concat([df_fato_veiculos, df_fato_situacoes], ignore_index=True)

# 🔹 Adicionar surrogate key à tabela fato
df_fato_final["sk_acidente"] = range(1, len(df_fato_final) + 1)

# 🔹 Remapeamento dos nomes das colunas para que correspondam à tabela destino
df_fato_final.rename(columns={
    "tipo_de_ocorrencia": "sk_tipo_ocorrencia",
    "sentido": "sk_sentido",
    "trecho": "sk_trecho",
    "tipo_veiculo": "sk_tipo_veiculo",
    "tipo": "sk_situacao_acidente"
}, inplace=True)

# Remover colunas que não estão no esquema destino
df_fato_final.drop(columns=["concessionaria", "tipo_de_acidente"], inplace=True)

# Adicionar a coluna 'sk_rodovia', pois ela existe na tabela, mas não está no DataFrame original
df_fato_final["sk_rodovia"] = None

# Reorganizar as colunas na mesma ordem definida na tabela MySQL
df_fato_final = df_fato_final[[
    "sk_acidente", "id", "data", "horario", "km", 
    "sk_tipo_ocorrencia", "sk_sentido", "sk_trecho", "sk_rodovia",
    "sk_tipo_veiculo", "sk_situacao_acidente", "quantidade_veiculos", "quantidade_vitimas"
]]

# 🔹 Inserir os dados na tabela fato_acidentes no MySQL
df_fato_final.to_sql("fato_acidentes", con=engine_mysql, if_exists="append", index=False)
print("✅ Dados inseridos com sucesso na tabela fato_acidentes!")

# 🔹 Fechar conexões
cursor_mysql.close()
conn_mysql.close()
conn_pg.close()
print("✅ Conexões fechadas.")
