# Coleta e Tratamento de Variáveis Econômicas e Climáticas

**Projeto:** "Modelagem Preditiva de Risco de Crédito sob *Stress* Macroeconômico: uma abordade comparativa en *Ensemble Trees* e Regressão Regularizada"<br>
**Autor:** Pedro Rogério Pereira Júnior (Pereira Júnior, P. R.)<br>
**Data:** Dezembro/2025<br>

## 1. Objetivo
Este notebook realiza o processo de **ETL (Extract, Transform, Load)** para consolidar uma base de dados analítica. Os dados são coletados de fontes oficiais públicas:
* **Banco Central do Brasil (BCB/SGS):** Inadimplência, Selic, IPCA, Dólar, etc.
* **IBGE (SIDRA):** Dados de emprego (PNAD) e rendimento.
* **INMET:** Dados climáticos históricos (precipitação, temperatura, etc.).

O objetivo final é gerar arquivos `.csv` padronizados na pasta `data/raw` para a modelagem subsequente.

In [1]:
# --- 1. Configurações Iniciais e Bibliotecas ---
import pandas as pd
import numpy as np
import pandera as pa
from pandera import Column, DataFrameSchema, Check
import requests
import zipfile
import os
import io
import warnings
from pathlib import Path
from functools import reduce

# Ignorar warnings de conexões seguras para manter o log limpo
warnings.filterwarnings("ignore")

# CONFIGURAÇÃO DE DIRETÓRIOS
BASE_PATH = Path("../data/raw")
BASE_PATH.mkdir(parents=True, exist_ok=True)

print(f"Diretório de saída configurado: {BASE_PATH.resolve()}")

Diretório de saída configurado: C:\Users\pedro\projeto_inadimplencia\data\raw


## 2. Coleta de Dados do Banco Central (SGS)
Utilizamos a API do Sistema Gerenciador de Séries Temporais (SGS) para capturar indicadores macroeconômicos e de crédito.

In [2]:
# --- Função de Coleta BCB ---
def coleta_bcb(codigo: int, nome: str) -> pd.DataFrame:
    """
    Coleta uma série temporal do Banco Central (SGS) e retorna um DataFrame formatado.
    
    Args:
        codigo (int): Código da série no SGS.
        nome (str): Nome que a coluna de valor receberá.
    """
    url = f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.{codigo}/dados?formato=json&dataInicial=01/01/2015&dataFinal=31/12/2024"
    try:
        r = requests.get(url)
        r.raise_for_status() # Alerta se houver erro na requisição (404, 500)
        dados = pd.DataFrame(r.json())
        dados["data"] = pd.to_datetime(dados["data"], dayfirst=True)
        dados["valor"] = pd.to_numeric(dados["valor"], errors="coerce")
        dados.rename(columns={"valor": nome}, inplace=True)
        return dados
    except Exception as e:
        print(f"Erro ao coletar série {codigo}: {e}")
        return pd.DataFrame()

print("Iniciando coleta do Banco Central...")

# 1. Inadimplência
inad_pj_tot = coleta_bcb(21083, "inad_pj_tot")
inad_pf_tot = coleta_bcb(21084, "inad_pf_tot")
inad_rd_pf_cr_rur_tot = coleta_bcb(21148, "inad_rd_pf_cr_rur_tot")
inad_rd_pj_cr_rur_tot = coleta_bcb(21136, "inad_rd_pj_cr_rur_tot")

df_inadimplencia = reduce(lambda left, right: pd.merge(left, right, on="data", how="outer"), 
                          [inad_pj_tot, inad_pf_tot, inad_rd_pf_cr_rur_tot, inad_rd_pj_cr_rur_tot])
df_inadimplencia = df_inadimplencia.sort_values("data").reset_index(drop=True)
df_inadimplencia["data_str"] = df_inadimplencia["data"].dt.strftime("%m/%Y") # Mantendo coluna original data para ordenação

# 2. Variáveis Econômicas Diversas
selic = coleta_bcb(11, "selic")
ipca = coleta_bcb(4449, "ipca")
cdi = coleta_bcb(12, "cdi")
spread_pf = coleta_bcb(20785, "spread_pf")
spread_pj = coleta_bcb(20784, "spread_pj")
spread_tot = coleta_bcb(20783, "spread_tot")
dolar = coleta_bcb(1, "dolar_ptax")
icms_petroleo = coleta_bcb(7694, "icms_petroleo")
soja_triturada = coleta_bcb(2971, "soja_triturada_expt")
soja_residuo = coleta_bcb(2959, "soja_residuo_expt")
soja_oleo = coleta_bcb(2959, "soja_oleo_expt")
milho_grao = coleta_bcb(20213, "milho_grao_expt")

# Consolidar Mensais
lista_mensal = [ipca, spread_pf, spread_pj, spread_tot, icms_petroleo, soja_triturada, soja_residuo, soja_oleo, milho_grao]
df_econ_mensal = reduce(lambda left, right: pd.merge(left, right, on="data", how="outer"), lista_mensal)

# Consolidar Diárias (Transformando em média mensal)
lista_diaria = [selic, cdi, dolar]
df_econ_diaria_raw = reduce(lambda left, right: pd.merge(left, right, on="data", how="outer"), lista_diaria)
df_econ_diaria = df_econ_diaria_raw.set_index("data").resample("ME").mean().reset_index()

# Merge Final Economia
df_economico = pd.merge(df_econ_mensal, df_econ_diaria, on="data", how="outer").sort_values("data").reset_index(drop=True)
df_economico["data_str"] = df_economico["data"].dt.strftime("%m/%Y")

print("Dados do BCB coletados e consolidados.")

Iniciando coleta do Banco Central...
Dados do BCB coletados e consolidados.


## 3. Coleta de Dados do IBGE (SIDRA)
Extração de dados da PNAD Contínua (Emprego) e Rendimento médio.
* **Tratamento:** As datas vêm no formato "trimestre móvel" ou ano, exigindo conversão para frequência mensal.

In [3]:
# --- Coleta IBGE ---
print("Iniciando coleta do IBGE...")

# A. Renda
url_renda = "https://apisidra.ibge.gov.br/values/t/7437/n1/all/v/10750/p/last%2010/c11308/49204"
df_renda = pd.DataFrame(requests.get(url_renda).json()).iloc[1:]
df_renda = df_renda[["D3C", "V"]].rename(columns={"D3C": "data", "V": "rendimento_medio_mensal_reais"})
df_renda["rendimento_medio_mensal_reais"] = pd.to_numeric(df_renda["rendimento_medio_mensal_reais"], errors="coerce")
df_renda["data"] = pd.to_datetime(df_renda["data"], format="%Y")
# Resample anual para mensal (preenchimento forward)
df_renda_mensal = df_renda.set_index("data").resample("ME").ffill().reset_index()

# B. Trabalho (PNAD)
url_trabalho = "https://apisidra.ibge.gov.br/values/t/4093/n1/all/v/1641,4088,4090,4092,4094,4096,4099,12466/p/all"
df_trabalho = pd.DataFrame(requests.get(url_trabalho).json()).iloc[1:]
df_trabalho = df_trabalho[["D2C", "D3C", "V"]].rename(columns={"D2C": "variavel", "D3C": "data", "V": "valor"})

def clean_ibge_trimestre(date_str):
    if len(str(date_str)) == 6 and str(date_str).isdigit():
        ano = int(str(date_str)[:4])
        trim = int(str(date_str)[4:])
        mes_map = {1: 1, 2: 4, 3: 7, 4: 10}
        return pd.Timestamp(year=ano, month=mes_map.get(trim, 1), day=1)
    return pd.NaT

df_trabalho["data"] = df_trabalho["data"].apply(clean_ibge_trimestre)
df_trabalho["valor"] = pd.to_numeric(df_trabalho["valor"], errors="coerce")
df_trabalho = df_trabalho.dropna(subset=["data"]).pivot_table(index="data", columns="variavel", values="valor").reset_index()

# Renomear colunas
cols_map = {
    "1641": "pessoas_14mais_mil", "4088": "p14m_forca_trabalho_mil", 
    "4090": "p14m_ocupadas_mil", "4092": "p14m_desocupadas_mil",
    "4094": "p14m_fora_forca_trabalho_mil", "4096": "tx_foca_trabalho_p14m_pct",
    "4099": "tx_desocupacao_p14m_pct", "12466": "tx_informalidade_p14m_ocupadas_pct"
}
df_trabalho.rename(columns=cols_map, inplace=True)
df_trabalho_mensal = df_trabalho.set_index("data").resample("ME").ffill().reset_index()

# Consolidar IBGE
df_ibge = pd.merge(df_renda_mensal, df_trabalho_mensal, on="data", how="outer").sort_values("data")
df_ibge["data_str"] = df_ibge["data"].dt.strftime("%m/%Y")

print("Dados IBGE coletados.")

Iniciando coleta do IBGE...
Dados IBGE coletados.


## 4. Processamento de Dados Climáticos (INMET)
Os dados climáticos são volumosos e fornecidos em arquivos `.zip`.
* **Estratégia:** Iterar sobre os arquivos CSV dentro do zip sem extraí-los fisicamente para o disco (leitura em memória), otimizando o processamento.
* **Tratamento:** Identificação dinâmica do início da tabela de dados e cálculo de médias mensais.

In [5]:
# --- Coleta INMET ---
zip_path = BASE_PATH / "INMET.zip" 

print(f"Processando dados climáticos de: {zip_path}")

if not zip_path.exists():
    print(f"ALERTA: Arquivo {zip_path} não encontrado. Pule esta etapa se não tiver os dados locais.")
else:
    dfs_inmet = []
    with zipfile.ZipFile(zip_path, 'r') as z:
        csvs = [f for f in z.namelist() if f.lower().endswith(".csv")]
        print(f"Total de estações/arquivos encontrados: {len(csvs)}")
        
        for i, arquivo in enumerate(csvs):
            if i % 50 == 0: print(f"Processando {i}/{len(csvs)}...") # Log menos verboso
            
            with z.open(arquivo) as f:
                content = f.read().decode("latin1")
                lines = content.splitlines()
                start_row = next((idx for idx, l in enumerate(lines) if "Data Medicao" in l), None)
                
                if start_row is not None:
                    tabela = "\n".join(lines[start_row:])
                    df_temp = pd.read_csv(io.StringIO(tabela), sep=";", decimal=",", encoding="latin1")
                    dfs_inmet.append(df_temp)

    # Concatenação e Limpeza
    if dfs_inmet:
        df_inmet_full = pd.concat(dfs_inmet, ignore_index=True)
        
        map_cols = {
            "Data Medicao": "data", "INSOLACAO TOTAL, MENSAL(h)": "insolacao_total_h",
            "PRECIPITACAO TOTAL, MENSAL(mm)": "precipitacao_total_mm",
            "TEMPERATURA MAXIMA MEDIA, MENSAL(Â°C)": "temp_max_media_c",
            "UMIDADE RELATIVA DO AR, MEDIA MENSAL(%)": "umidade_media_pct"
        }
        
        df_inmet_clean = df_inmet_full.rename(columns=map_cols)[list(map_cols.values())]
        df_inmet_clean["data"] = pd.to_datetime(df_inmet_clean["data"], errors="coerce")
        
        # Agrupamento Mensal (Média de todas as estações do Brasil para simplificação macro)
        df_inmet_final = df_inmet_clean.groupby("data").mean().reset_index().round(2)
        df_inmet_final["data_str"] = df_inmet_final["data"].dt.strftime("%m/%Y")
        
        print("Dados Climáticos processados.")
    else:
        df_inmet_final = pd.DataFrame()

Processando dados climáticos de: ..\data\raw\INMET.zip
Total de estações/arquivos encontrados: 322
Processando 0/322...
Processando 50/322...
Processando 100/322...
Processando 150/322...
Processando 200/322...
Processando 250/322...
Processando 300/322...
Dados Climáticos processados.


## 5. Engenharia de Variáveis: Eventos Políticos e Calendário
Criação de variáveis *dummy* (binárias) para marcar períodos históricos relevantes que podem explicar choques na inadimplência, como:
* Pandemia de COVID-19.
* Greve dos Caminhoneiros.
* Trocas de mandatos presidenciais.

In [6]:
# --- Dataset de Eventos ---
def criar_eventos(start="2015-01-01", end="2024-12-31"):
    dates = pd.date_range(start=start, end=end, freq="MS")
    df = pd.DataFrame({"data": dates})
    
    # Flags Iniciais
    df["flag_pandemia"] = 0
    df["flag_greve_caminhoneiros"] = 0
    df["presidente"] = "Indefinido"
    
    # Regras de Negócio
    df.loc[(df["data"] >= "2020-03-01") & (df["data"] <= "2022-05-01"), "flag_pandemia"] = 1
    df.loc[df["data"] == "2018-05-01", "flag_greve_caminhoneiros"] = 1
    
    # Presidentes
    df.loc[df["data"] < "2016-09-01", "presidente"] = "Dilma"
    df.loc[(df["data"] >= "2016-09-01") & (df["data"] < "2019-01-01"), "presidente"] = "Temer"
    df.loc[(df["data"] >= "2019-01-01") & (df["data"] < "2023-01-01"), "presidente"] = "Bolsonaro"
    df.loc[df["data"] >= "2023-01-01", "presidente"] = "Lula"
    
    return df

df_eventos = criar_eventos()

# Validação Pandera
schema_eventos = DataFrameSchema({
    "data": Column(pd.Timestamp),
    "presidente": Column(str, checks=pa.Check.isin(["Dilma", "Temer", "Bolsonaro", "Lula", "Indefinido"])),
    "flag_pandemia": Column(int, checks=pa.Check.isin([0, 1])),
}, strict=False)

try:
    schema_eventos.validate(df_eventos)
    print("✅ Validação de Eventos: Sucesso")
    df_eventos["data_str"] = df_eventos["data"].dt.strftime("%m/%Y")
except Exception as e:
    print(f"❌ Erro na validação: {e}")

✅ Validação de Eventos: Sucesso


## 6. Exportação dos Dados
Salvando os DataFrames processados na pasta `data/raw` para uso nos próximos notebooks.

In [7]:
dfs_to_save = {
    "inadimplencia": df_inadimplencia,
    "economico": df_economico,
    "ibge": df_ibge,
    "inmet": df_inmet_final,
    "eventos_politicos": df_eventos
}

for nome, df in dfs_to_save.items():
    if not df.empty:
        # Removemos a coluna auxiliar 'data' (timestamp) se quisermos apenas a string, 
        # ou mantemos ambas. Aqui vou salvar limpo, mas mantendo data datetime para sorting se necessario no load
        path = BASE_PATH / f"df_{nome}.csv"
        df.to_csv(path, index=False)
        print(f"Salvo: {path} | Shape: {df.shape}")
    else:
        print(f"Ignorado (Vazio): {nome}")

Salvo: ..\data\raw\df_inadimplencia.csv | Shape: (120, 6)
Salvo: ..\data\raw\df_economico.csv | Shape: (240, 14)
Salvo: ..\data\raw\df_ibge.csv | Shape: (163, 11)
Salvo: ..\data\raw\df_inmet.csv | Shape: (144, 6)
Salvo: ..\data\raw\df_eventos_politicos.csv | Shape: (120, 5)
