In [1]:
# Manipulação e análise de dados
import pandas as pd
import numpy as np

# Validação de dados
import pandera as pa
from pandera import Column, DataFrameSchema, Check

# Requisições e manipulação de arquivos
import requests
import zipfile
import os
import io
from pathlib import Path

# Utilitários
from functools import reduce

In [4]:
# -------------------------------
# Função para coletar dados do Banco Central (SGS)
# -------------------------------
def coleta_bcb(codigo, nome):
    url = f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.{codigo}/dados?formato=json&dataInicial=01/01/2015&dataFinal=31/12/2024"
    r = requests.get(url)
    dados = pd.DataFrame(r.json())
    dados["data"] = pd.to_datetime(dados["data"], dayfirst=True)
    dados["valor"] = dados["valor"].astype(float)
    dados.rename(columns={"valor": nome}, inplace=True)
    return dados

# -------------------------------
# Coletar Inadimplências
# -------------------------------
inad_pj_tot = coleta_bcb(21083, "inad_pj_tot")                                          # Inadimplência da carteira de crédito das Pessoas Jurídicas_PJ _Total
inad_pf_tot = coleta_bcb(21084, "inad_pf_tot")                                          # Inadimplência da carteira de crédito das Pessoas Físicas_PF_Total
inad_rd_pf_cr_rur_tot = coleta_bcb(21148, "inad_rd_pf_cr_rur_tot")
inad_rd_pj_cr_rur_tot = coleta_bcb(21136, "inad_rd_pj_cr_rur_tot")

# Lista de Inadimplências:
lista_inad = [
    inad_pj_tot,
    inad_pf_tot,
    inad_rd_pf_cr_rur_tot,
    inad_rd_pj_cr_rur_tot
]

# Juntar em um único dataframe
df_inadimplencia = reduce(
    lambda left, right: pd.merge(left, right, on="data", how="outer"),
    lista_inad
)

# Ordenar por 'data'
df_inadimplencia = df_inadimplencia.sort_values("data").reset_index(drop=True)
df_inadimplencia["data"] = df_inadimplencia["data"].dt.strftime("%m/%Y")

In [5]:
# -------------------------------
# Coletar outras variáveis
# -------------------------------
selic = coleta_bcb(11, "selic")                                 # Taxa Selic
ipca = coleta_bcb(4449, "ipca")                                 # Inflação IPCA
cdi = coleta_bcb(12, "cdi")                                     # CDI (média diária)
spread_pf = coleta_bcb(20785, "spread_pf")                      # Spread bancário PF
spread_pj = coleta_bcb(20784, "spread_pj")                      # Spread bancário PJ
spread_tot = coleta_bcb(20783, "spread_tot")                    # Spread bancário total
dolar = coleta_bcb(1, "dolar_ptax")                             # Taxa de câmbio - Livre - Dólar americano (venda)
icms_petroleo = coleta_bcb(7694, "icms_petroleo")               # Arrecadação de ICMS (Fluxos) - Petróleo - Total Nacional 
soja_triturada_expt = coleta_bcb(2971, "soja_triturada_expt")   # Exportações - Soja mesmo triturada _ US$
soja_residuo_expt = coleta_bcb(2959, "soja_residuo_expt")       # Exportações - Farelo e resíduos da extração de óleo de soja _ US$
soja_oleo_expt = coleta_bcb(2959, "soja_oleo_expt")             # Exportações - Óleo de soja em bruto _ US$
milho_grao_expt = coleta_bcb(20213, "milho_grao_expt")          # Exportações - Milhos em grãos _ US$


# Lista de Variáveis Econômicas_periodicidade mensal:
lista_var_econ_mensal = [
    ipca,
    spread_pf,
    spread_pj,
    spread_tot,
    icms_petroleo,
    soja_triturada_expt,
    soja_residuo_expt,
    soja_oleo_expt,
    milho_grao_expt
]

# Lista de Variáveis Econômicas_periodicidade diária:
lista_var_econ_diaria = [
    selic,
    cdi,
    dolar
]

# Juntar variáveis mensais:
df_econ_mensal = reduce(
    lambda left, right: pd.merge(left, right, on="data", how="outer"),
    lista_var_econ_mensal
)

# Juntar variáveis diárias:
df_econ_diaria = reduce(
    lambda left, right: pd.merge(left, right, on="data", how="outer"),
    lista_var_econ_diaria
)

# Converter datas:
df_econ_mensal["data"] = pd.to_datetime(df_econ_mensal["data"])
df_econ_mensal["data"] = df_econ_mensal["data"].dt.strftime("%m/%Y")
df_econ_diaria["data"] = pd.to_datetime(df_econ_diaria["data"])

# Transformar diárias em mensais (média mensal):
df_econ_diaria = df_econ_diaria.resample("ME", on="data").mean().reset_index()
df_econ_diaria["data"] = df_econ_diaria["data"].dt.strftime("%m/%Y")

# Juntar em um único dataframe:
df_economico = pd.merge(df_econ_mensal, df_econ_diaria, on="data", how="outer")
df_economico = df_economico.sort_values("data").reset_index(drop=True)


In [6]:
# -------------------------------
# Cletar dados do IBGE (SIDRA)
# -------------------------------

# URL da API para Rendimento médio real habitual da população ocupada (PNAD Contínua)
url_renda = "https://apisidra.ibge.gov.br/values/t/7437/n1/all/v/10750/p/last%2010/c11308/49204"

# Fazendo a requisição e conversão
response_renda = requests.get(url_renda)
dados_renda = response_renda.json()

# Transformando em DataFrame
df_renda = pd.DataFrame(dados_renda)

# Exclusão das variáveis desnecessárias e renomear as variáveis úteis
df_renda = df_renda.iloc[1:].reset_index(drop=True)
df_renda = df_renda[["D2C", "D3C", "MN", "V"]]
df_renda.rename(columns={
    "D2C": "variavel",
    "D3C": "data",
    "MN": "unid_medida",
    "V": "valor"
    }, inplace=True)

# limpar valores inválidos
df_renda["valor"] = (
    df_renda["valor"]
    .replace(["...", "", "nan", "NaN", None, pd.NA], np.nan)
    .astype(float)
)

lista_nomes_variaveis_renda = {
    "10750": "rendimento_medio_mensal_reais"
}

# Renda anual → expandir para meses
df_renda["data"] = pd.to_datetime(df_renda["data"], format="%Y")
df_renda = df_renda.pivot_table(index="data", columns="variavel", values="valor", aggfunc="mean").reset_index()

# Renomear colunas usando o dicionário
df_renda.rename(columns=lista_nomes_variaveis_renda, inplace=True)

df_renda_mensal = df_renda.set_index("data").resample("ME").ffill().reset_index()
df_renda_mensal["data"] = df_renda_mensal["data"].dt.strftime("%m/%Y")



# URL da API para dados de Trabalho do IBGE
url_trabalho = f"https://apisidra.ibge.gov.br/values/t/4093/n1/all/v/1641,4088,4090,4092,4094,4096,4099,12466/p/all"

response_trabalho = requests.get(url_trabalho)
dados_trabalho = response_trabalho.json()

# Transformando em DataFrame
df_trabalho = pd.DataFrame(dados_trabalho)

# Exclusão das variáveis desnecessárias e renomear as variáveis úteis
df_trabalho = df_trabalho.iloc[1:].reset_index(drop=True)
df_trabalho = df_trabalho[["D2C", "D3C", "MN", "V"]]
df_trabalho.rename(columns={
    "D2C": "variavel",
    "D3C": "data",
    "MN": "unid_medida",
    "V": "valor"
    }, inplace=True)

def converte_trimestre(valor):
    valor = str(valor)
    # garante que tem 6 dígitos (AAAA + TT)
    if len(valor) != 6 or not valor.isdigit():
        return pd.NaT
    ano = int(valor[:4])
    trimestre = int(valor[4:])
    # trimestre → mês inicial
    mes = {1: 1, 2: 4, 3: 7, 4: 10}.get(trimestre)
    # se trimestre for inválido
    if mes is None:
        return pd.NaT
    return pd.Timestamp(year=ano, month=mes, day=1)


# limpar valores inválidos
df_trabalho["data"] = df_trabalho["data"].replace(["...", "", "nan", None, pd.NA], np.nan)
df_trabalho = df_trabalho[df_trabalho["data"].str.match(r"^\d{6}$", na=False)]

# Limpar a coluna 'valor'
df_trabalho["valor"] = (
    df_trabalho["valor"]
    .replace(["...", "", "nan", "NaN", None, pd.NA], np.nan)
    .astype(float)
)

# Converter trimestre
df_trabalho["data"] = df_trabalho["data"].apply(converte_trimestre)

# Pivotar ANTES do resample
df_trabalho = df_trabalho.pivot_table(index="data", columns="variavel", values="valor", aggfunc="mean").reset_index()

lista_nomes_variaveis_trabalho = {
    "1641": "pessoas_14mais_mil",
    "4088": "p14m_forca_trabalho_mil",
    "4090": "p14m_ocupadas_mil",
    "4092": "p14m_desocupadas_mil",
    "4094": "p14m_fora_forca_trabalho_mil",
    "4096": "tx_foca_trabalho_p14m_pct",
    "4099": "tx_desocupacao_p14m_pct",
    "12466": "tx_informalidade_p14m_ocupadas_pct"
}

# Renomear colunas
df_trabalho.rename(columns=lista_nomes_variaveis_trabalho, inplace=True)

# expandir para meses
df_trabalho_mensal = (df_trabalho.set_index("data").resample("ME").ffill().reset_index())
df_trabalho_mensal["data"] = df_trabalho_mensal["data"].dt.strftime("%m/%Y")

# Juntar em um único dataframe:
df_ibge = pd.merge(df_renda_mensal, df_trabalho_mensal, on="data", how="outer")
df_ibge = df_ibge.sort_values("data").reset_index(drop=True)

In [7]:
# Dicionário com nome → dataframe
dfs_para_salvar = {
    "inadimplencia": df_inadimplencia,
    "economico": df_economico,
    "ibge": df_ibge
}

# Loop para salvar todos
for nome, df in dfs_para_salvar.items():
    caminho = f"../data/raw/df_{nome}.csv"
    df.to_csv(caminho, index=False)
    print(f"Salvo: {caminho}")

Salvo: ../data/raw/df_inadimplencia.csv
Salvo: ../data/raw/df_economico.csv
Salvo: ../data/raw/df_ibge.csv


In [8]:
# -------------------------------
# Cletar dados do INMET
# -------------------------------

zip_path = "C:/Users/pedro/projeto_inadimplencia/data/raw/INPE.zip"

dfs = []

with zipfile.ZipFile(zip_path, 'r') as z:
    arquivos = [f for f in z.namelist() if f.lower().endswith(".csv")]

    print(f"Total de arquivos CSV encontrados: {len(arquivos)}")

    for i, arquivo in enumerate(arquivos, start=1):
        print(f"Lendo arquivo {i}/{len(arquivos)}: {arquivo}")

        with z.open(arquivo) as f:
            # Ler o arquivo inteiro como texto
            conteudo = f.read().decode("latin1")  # encoding típico do INMET

            # Quebrar em linhas
            linhas = conteudo.splitlines()

            # Encontrar a linha onde começa a tabela (a que contém "Data Medicao")
            inicio_tabela = None
            for idx, linha in enumerate(linhas):
                if "Data Medicao" in linha:
                    inicio_tabela = idx
                    break

            if inicio_tabela is None:
                print(f"⚠ Não encontrei tabela em {arquivo}")
                continue

            # Extrair apenas a parte tabular
            tabela = "\n".join(linhas[inicio_tabela:])

            # Ler a tabela com pandas
            df = pd.read_csv(
                io.StringIO(tabela),
                sep=";",              # separador é ;
                decimal=",",           # vírgula decimal
                encoding="latin1"
            )

            df["arquivo_origem"] = arquivo
            dfs.append(df)

# Concatenar tudo
df_inmet = pd.concat(dfs, ignore_index=True)

Total de arquivos CSV encontrados: 322
Lendo arquivo 1/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82704_M_2014-01-01_2016-01-29.csv
Lendo arquivo 2/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82915_M_2014-01-01_2023-08-14.csv
Lendo arquivo 3/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82807_M_2014-01-01_2025-12-26.csv
Lendo arquivo 4/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82113_M_2014-01-01_2025-04-24.csv
Lendo arquivo 5/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82410_M_2014-01-01_2025-04-27.csv
Lendo arquivo 6/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82425_M_2014-01-01_2016-01-29.csv
Lendo arquivo 7/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82326_M_2014-01-01_2025-12-26.csv
Lendo arquivo 8/322: $2a$10$iB76aNQvei789VcEXoSH.94OovXJv3GWpaZFK7vBSt2h3mIIbT6/dados_82610_M_2014-01-01_2025-12-26.csv
L

In [9]:
# Renomear colunas
rename_map = {
    "Data Medicao": "data",
    "INSOLACAO TOTAL, MENSAL(h)": "insolacao_total_h",
    "PRECIPITACAO TOTAL, MENSAL(mm)": "precipitacao_total_mm",
    "TEMPERATURA MAXIMA MEDIA, MENSAL(Â°C)": "temp_max_media_c",
    "TEMPERATURA MINIMA MEDIA, MENSAL(Â°C)": "temp_min_media_c",
    "UMIDADE RELATIVA DO AR, MEDIA MENSAL(%)": "umidade_media_pct",
    "VENTO, VELOCIDADE MAXIMA MENSAL(m/s)": "vento_vel_max_ms",
    "VENTO, VELOCIDADE MEDIA MENSAL(m/s)": "vento_vel_media_ms"
}

df_inmet = df_inmet.rename(columns=rename_map)

# Remover colunas indesejáveis
df_inmet = df_inmet.drop(columns=[
    "NEBULOSIDADE, MEDIA MENSAL(dÃ©cimos)",
    "Unnamed: 9",
    "arquivo_origem",
    "Unnamed: 1"
], errors="ignore")

# Converter data
df_inmet["data"] = pd.to_datetime(df_inmet["data"], errors="coerce")
df_inmet["data"] = df_inmet["data"].dt.strftime("%m/%Y")

# Juntar os dados como média mensal
df_mensal = df_inmet.groupby("data", as_index=False).mean(numeric_only=True).round(2)
df_inmet = df_mensal.sort_values("data").reset_index(drop=True)

In [10]:
# Salvar
output_path = "C:/Users/pedro/projeto_inadimplencia/data/raw/df_inmet.csv"
os.makedirs("C:/Users/pedro/projeto_inadimplencia/data/raw/", exist_ok=True)
df_inmet.to_csv(output_path, index=False, encoding="utf-8")

print(f"\nArquivo final salvo em: {output_path}")


Arquivo final salvo em: C:/Users/pedro/projeto_inadimplencia/data/raw/df_inmet.csv


In [11]:
# Configurações
BASE_PATH = Path("C:/Users/pedro/projeto_inadimplencia/data/raw")
BASE_PATH.mkdir(parents=True, exist_ok=True)

def criar_dataset_eventos(data_inicio="2015-01-01", data_fim="2024-12-31"):
    print("--- Gerando Dataset de Eventos Políticos ---")
    
    # 1. Criar range de datas mensal
    dates = pd.date_range(start=data_inicio, end=data_fim, freq="MS") # MS = Month Start
    df = pd.DataFrame({"data": dates})
    
    # 2. Inicializar Flags (0 para tudo)
    df["flag_pandemia"] = 0
    df["flag_greve_caminhoneiros"] = 0
    df["flag_eleicao_presidencial"] = 0
    df["flag_joesley_day"] = 0
    df["flag_impeachment"] = 0
    
    # 3. Definir Períodos Presidenciais (Categorical)
    # Útil para agrupar análises: "Qual foi a média de inadimplência no Gov Temer?"
    df["presidente"] = "Indefinido"
    
    mask_dilma = (df["data"] < "2016-09-01")
    mask_temer = (df["data"] >= "2016-09-01") & (df["data"] < "2019-01-01")
    mask_bolsonaro = (df["data"] >= "2019-01-01") & (df["data"] < "2023-01-01")
    mask_lula3 = (df["data"] >= "2023-01-01")
    
    df.loc[mask_dilma, "presidente"] = "Dilma"
    df.loc[mask_temer, "presidente"] = "Temer"
    df.loc[mask_bolsonaro, "presidente"] = "Bolsonaro"
    df.loc[mask_lula3, "presidente"] = "Lula"

    # 4. Mapear Eventos de Choque (Dummies)
    
    # Pandemia (OMS declara em Mar/2020 - Fim da emergência no BR em Mai/2022)
    # Impacto econômico severo
    df.loc[(df["data"] >= "2020-03-01") & (df["data"] <= "2022-05-01"), "flag_pandemia"] = 1
    
    # Greve dos Caminhoneiros (Maio de 2018)
    # Parou o país e gerou inflação imediata
    df.loc[df["data"] == "2018-05-01", "flag_greve_caminhoneiros"] = 1
    
    # Joesley Day (Maio de 2017)
    # Circuit Breaker na bolsa, disparada do dólar
    df.loc[df["data"] == "2017-05-01", "flag_joesley_day"] = 1
    
    # Processo de Impeachment (Acolhimento Dez/15 até Afastamento definitivo Ago/16)
    # Incerteza política extrema
    df.loc[(df["data"] >= "2015-12-01") & (df["data"] <= "2016-08-01"), "flag_impeachment"] = 1

    # Eleições Presidenciais (Mês do 1º e 2º turno - Outubro)
    # Volatilidade de mercado
    anos_eleitorais = ["2018-10-01", "2022-10-01"]
    df.loc[df["data"].isin(anos_eleitorais), "flag_eleicao_presidencial"] = 1

    return df

# ==============================================================================
# VALIDAÇÃO (PANDERA)
# ==============================================================================
def validar_e_salvar_eventos(df):
    # Schema: Garante que as flags sejam binárias (0 ou 1) e data esteja ok
    schema = DataFrameSchema({
        "data": Column(pd.Timestamp),
        "presidente": Column(str, checks=pa.Check.isin(["Dilma", "Temer", "Bolsonaro", "Lula"])),
        "flag_pandemia": Column(int, checks=pa.Check.isin([0, 1])),
        "flag_greve_caminhoneiros": Column(int, checks=pa.Check.isin([0, 1])),
        "flag_eleicao_presidencial": Column(int, checks=pa.Check.isin([0, 1])),
        "flag_impeachment": Column(int, checks=pa.Check.isin([0, 1])),
    }, strict=False) # strict=False aceita colunas extras se houver

    try:
        schema.validate(df)
        
        # Formatação final para CSV (String MM/YYYY) igual aos outros datasets
        df_save = df.copy()
        df_save["data"] = df_save["data"].dt.strftime("%m/%Y")
        
        caminho = BASE_PATH / "df_eventos_politicos.csv"
        df_save.to_csv(caminho, index=False, encoding="utf-8")
        print(f"✅ Sucesso! Arquivo salvo em: {caminho}")
        
    except pa.errors.SchemaErrors as err:
        print("❌ Erro na validação dos eventos:")
        print(err.failure_cases)

# ==============================================================================
# EXECUÇÃO
# ==============================================================================
if __name__ == "__main__":
    df_eventos = criar_dataset_eventos()
    validar_e_salvar_eventos(df_eventos)

--- Gerando Dataset de Eventos Políticos ---
✅ Sucesso! Arquivo salvo em: C:\Users\pedro\projeto_inadimplencia\data\raw\df_eventos_politicos.csv


  df.loc[df["data"].isin(anos_eleitorais), "flag_eleicao_presidencial"] = 1
top-level pandera module will be **removed in a future version of pandera**.
If you're using pandera to validate pandas objects, we highly recommend updating
your import:

```
# old import
import pandera as pa

# new import
import pandera.pandas as pa
```

If you're using pandera to validate objects from other compatible libraries
like pyspark or polars, see the supported libraries section of the documentation
for more information on how to import pandera:

https://pandera.readthedocs.io/en/stable/supported_libraries.html


```
```

