In [27]:
import pandas as pd
from pathlib import Path
import os
from sqlalchemy import create_engine, text
from sqlalchemy.types import VARCHAR, INTEGER, FLOAT, TIMESTAMP, TEXT
from dotenv import load_dotenv
import logging

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

load_dotenv()

DB_URL = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}/{os.getenv('DB_NAME')}?sslmode={os.getenv('DB_SSL_MODE')}"

required_vars = ['DB_USER', 'DB_PASSWORD', 'DB_HOST', 'DB_NAME', 'DB_SSL_MODE']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
    raise ValueError(f"Variáveis faltando no .env: {missing_vars}")

BASE_DIR = Path.cwd()
BRONZE_DIR = BASE_DIR / "layer-bronze"

engine = create_engine(
    DB_URL,
    pool_size=5,
    max_overflow=10,
    pool_pre_ping=True,
    connect_args={
        "connect_timeout": 10,
        "keepalives": 1,
        "keepalives_idle": 30,
        "keepalives_interval": 10,
    }
)

try:
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
    logger.info("Conexão com o banco de dados testada com sucesso")
except Exception as e:
    logger.error(f"Falha na conexão com o banco: {str(e)}")
    raise

2025-03-24 16:23:29,474 - __main__ - INFO - Conexão com o banco de dados testada com sucesso


In [28]:
def load_data() -> pd.DataFrame:
    arquivos = [
        "BancoVDE 2022.xlsx",
        "BancoVDE 2023.xlsx", 
        "BancoVDE 2024.xlsx"
    ]
    
    dfs = []
    
    for arquivo in arquivos:
        caminho = BRONZE_DIR / arquivo
        if not caminho.exists():
            logger.warning(f"Arquivo não encontrado: {caminho}")
            continue
            
        try:
            ano = arquivo.split()[-1].replace(".xlsx", "")
            
            df = pd.read_excel(caminho)
            
            df["ano"] = int(ano)
            
            numeric_cols = ['feminino', 'masculino', 'nao_informado', 'total_vitima', 'total', 'total_peso']
            for col in numeric_cols:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col].astype(str).str.replace(',', '.'), errors='coerce')
                    df[col] = df[col].fillna(0)
                    if col != 'total_peso':
                        df[col] = df[col].astype('int64')  
            
            text_cols = ['uf', 'municipio', 'evento', 'agente', 'arma', 'faixa_etaria', 'abrangencia', 'formulario']
            for col in text_cols:
                if col in df.columns:
                    df[col] = df[col].astype('string')
            
            dfs.append(df)
            logger.info(f" {arquivo} carregado | Registros: {len(df):,}")
            
            logger.debug(f"Amostra de dados:\n{df.head(2)}")
            logger.debug(f"Tipos de dados:\n{df.dtypes}")
            
        except Exception as e:
            logger.error(f" Erro ao processar {arquivo}: {str(e)}", exc_info=True)
            continue
    
    if not dfs:
        raise ValueError("Nenhum dado válido foi carregado")
    
    return pd.concat(dfs, ignore_index=True)

In [None]:
def save_to_db(df: pd.DataFrame, table_name: str = "ocorrencias_pmdf"):
    """Salva dados no PostgreSQL com tratamento de tipos robusto"""
    try:
        df = df.rename(columns=str.lower)  
        
        cat_cols = df.select_dtypes(include=['category']).columns
        for col in cat_cols:
            df[col] = df[col].astype('string')
        
        date_cols = [col for col in df.columns if 'data' in col]
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors='coerce')
        
        from sqlalchemy.types import VARCHAR, INTEGER, FLOAT, TIMESTAMP, TEXT
        
        dtype = {
            'uf': TEXT(),
            'municipio': TEXT(),
            'evento': TEXT(),
            'data_referencia': TIMESTAMP(),
            'agente': TEXT(),
            'arma': TEXT(),
            'faixa_etaria': TEXT(),
            'feminino': INTEGER(),
            'masculino': INTEGER(),
            'nao_informado': INTEGER(),
            'total_vitima': INTEGER(),
            'total': INTEGER(),
            'total_peso': FLOAT(),
            'abrangencia': TEXT(),
            'formulario': TEXT(),
            'ano': INTEGER()
        }
        
        dtype = {k: v for k, v in dtype.items() if k in df.columns}
        
        df.to_sql(
            name=table_name,
            con=engine,
            if_exists='replace',
            index=False,
            chunksize=1000,
            dtype=dtype,
            method='multi' 
        )
        logger.info(f"Dados salvos na tabela '{table_name}' ({len(df):,} registros)")
        
        with engine.begin() as conn:
            conn.execute(text(f"""
                CREATE INDEX IF NOT EXISTS idx_{table_name}_ano ON {table_name} (ano);
                CREATE INDEX IF NOT EXISTS idx_{table_name}_evento ON {table_name} (evento);
                CREATE INDEX IF NOT EXISTS idx_{table_name}_data ON {table_name} (data_referencia);
            """))
        logger.info("🔍 Índices criados com sucesso")
        
    except Exception as e:
        logger.error(f"Falha ao salvar no banco: {str(e)}", exc_info=True)
        raise

In [None]:
if __name__ == "__main__":
    try:
        logger.info("Iniciando pipeline de dados...")
        
        logger.info("\n Fase de extração (pode levar alguns minutos)...")
        df_final = load_data()
        
        logger.info("\n Validação dos dados:")
        logger.info(f"• Total de registros: {len(df_final):,}")
        logger.info(f"• Anos presentes: {sorted(df_final['ano'].unique())}")
        
        logger.info("\n Valores nulos por coluna:")
        for col, count in df_final.isnull().sum().items():
            if count > 0:
                logger.warning(f"⚠️ {col}: {count} valores nulos")
        
        logger.info("\n Tipos de dados finais:")
        logger.info(df_final.dtypes)
        
        logger.info("\n Fase de carregamento no banco de dados...")
        save_to_db(df_final)
        
        logger.info("\n Verificação final no banco:")
        with engine.connect() as conn:
            result = conn.execute(text("SELECT COUNT(*) FROM ocorrencias_pmdf"))
            total = result.scalar()
            logger.info(f"Total de registros na tabela: {total:,}")
            
            result = conn.execute(text("""
                SELECT ano, COUNT(*) as qtd 
                FROM ocorrencias_pmdf 
                GROUP BY ano 
                ORDER BY ano
            """))
            for row in result:
                logger.info(f"• Ano {row.ano}: {row.qtd:,} registros")
        
        logger.info("\n✅ Processo concluído com sucesso!")
        
    except Exception as e:
        logger.error(f"\n ERRO NO PROCESSO: {str(e)}", exc_info=True)
        raise

2025-03-24 16:23:29,551 - __main__ - INFO - Iniciando pipeline de dados...
2025-03-24 16:23:29,553 - __main__ - INFO - 
 Fase de extração (pode levar alguns minutos)...
2025-03-24 16:23:49,614 - __main__ - INFO -  BancoVDE 2022.xlsx carregado | Registros: 691,012
2025-03-24 16:24:08,264 - __main__ - INFO -  BancoVDE 2023.xlsx carregado | Registros: 691,012
2025-03-24 16:24:27,042 - __main__ - INFO -  BancoVDE 2024.xlsx carregado | Registros: 691,012
2025-03-24 16:24:27,960 - __main__ - INFO - 
 Validação dos dados:
2025-03-24 16:24:27,961 - __main__ - INFO - • Total de registros: 2,073,036
2025-03-24 16:24:27,984 - __main__ - INFO - • Anos presentes: [np.int64(2022), np.int64(2023), np.int64(2024)]
2025-03-24 16:24:27,985 - __main__ - INFO - 
 Valores nulos por coluna:
2025-03-24 16:24:28,229 - __main__ - INFO - 
 Tipos de dados finais:
2025-03-24 16:24:28,230 - __main__ - INFO - uf                 string[python]
municipio          string[python]
evento             string[python]
data_

In [None]:
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT column_name, data_type 
        FROM information_schema.columns 
        WHERE table_name = 'ocorrencias_pmdf'
    """))
    logger.info("\n Estrutura da tabela no banco:")
    for row in result:
        logger.info(f"• {row.column_name}: {row.data_type}")

2025-03-24 16:39:18,065 - __main__ - INFO - 
 Estrutura da tabela no banco:
2025-03-24 16:39:18,067 - __main__ - INFO - • ano: integer
2025-03-24 16:39:18,072 - __main__ - INFO - • data_referencia: timestamp without time zone
2025-03-24 16:39:18,075 - __main__ - INFO - • feminino: integer
2025-03-24 16:39:18,078 - __main__ - INFO - • masculino: integer
2025-03-24 16:39:18,080 - __main__ - INFO - • nao_informado: integer
2025-03-24 16:39:18,082 - __main__ - INFO - • total_vitima: integer
2025-03-24 16:39:18,083 - __main__ - INFO - • total: integer
2025-03-24 16:39:18,084 - __main__ - INFO - • total_peso: double precision
2025-03-24 16:39:18,086 - __main__ - INFO - • formulario: text
2025-03-24 16:39:18,087 - __main__ - INFO - • municipio: text
2025-03-24 16:39:18,090 - __main__ - INFO - • evento: text
2025-03-24 16:39:18,091 - __main__ - INFO - • uf: text
2025-03-24 16:39:18,093 - __main__ - INFO - • agente: text
2025-03-24 16:39:18,094 - __main__ - INFO - • arma: text
2025-03-24 16:39: