In [None]:
from pathlib import Path
import pandas as pd
import requests
import zipfile
import warnings
import re

warnings.filterwarnings('ignore')

In [None]:
# ─── 1. Definição dos date_ranges ────────────────────────────
# 1.1. Intervalo mensal: de janeiro/2017 a dezembro/2024 (formato "YYYYMM")
date_range_monthly = (
    pd.date_range(start='2024-01-01', end='2024-12-31', freq='M')
      .strftime("%Y%m")
      .tolist()
)

# 1.2. Intervalo trimestral: de março/2017 a dezembro/2024 (formato "YYYYMM")
date_range_quarterly = (
    pd.date_range(start='2024-01-01', end='2024-12-31', freq='Q')
      .strftime("%Y%m")
      .tolist()
)

In [None]:
# ─── 2. Definição e criação de diretórios principais ───────────────────────────
base_dir    = Path('..')                
dir_inputs  = base_dir / 'Input'        
dir_outputs = base_dir / 'Output'       

for d in (dir_inputs, dir_outputs):
    d.mkdir(parents=True, exist_ok=True)

In [None]:
# ─── 3. Pipeline COSIF Individual ──────────────────────────────────────────────

# 3.1. Diretório para baixar/armazenar COSIF Individual
cosif_ind_dir = dir_inputs / 'COSIF' / 'individual'
cosif_ind_dir.mkdir(parents=True, exist_ok=True)

# 3.2. Download e extração dos arquivos mensais (raw, sem filtros)
for date in date_range_monthly:
    subfolder = cosif_ind_dir / date
    subfolder.mkdir(exist_ok=True)

    if any(subfolder.glob('*.csv')):
        continue

    downloaded = False
    used_suffix = None

    for suffix in ('BANCOS.csv', 'BANCOS.zip', 'BANCOS.csv.zip'):
        url = f"https://www.bcb.gov.br/content/estabilidadefinanceira/cosif/Bancos/{date}{suffix}"
        local_file = cosif_ind_dir / f"{date}{suffix}"

        if local_file.exists():
            downloaded = True
            used_suffix = suffix
            break

        resp = requests.get(url, timeout=30)
        content_type = resp.headers.get('Content-Type', '').lower()
        is_html_error = (
            resp.status_code != 200
            or b'<!DOCTYPE html' in resp.content[:20]
            or 'text/html' in content_type
        )

        if not is_html_error:
            with open(local_file, 'wb') as f:
                f.write(resp.content)
            downloaded = True
            used_suffix = suffix
            break

    if downloaded and used_suffix:
        if used_suffix.endswith('.zip'):
            zip_path = cosif_ind_dir / f"{date}{used_suffix}"
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(subfolder)
        else:
            csv_path = cosif_ind_dir / f"{date}{used_suffix}"
            destino = subfolder / csv_path.name
            if not destino.exists():
                csv_path.rename(destino)

# 3.3. Carregar TODOS os CSVs individuais em um DataFrame único (raw)
df_list = []
for date in date_range_monthly:
    subfolder = cosif_ind_dir / date
    if not subfolder.exists():
        continue

    csv_paths = list(subfolder.glob(f"{date}BANCOS*.csv"))
    for csv_path in csv_paths:
        temp_df = pd.read_csv(
            csv_path,
            header=3,
            encoding='unicode_escape',
            on_bad_lines='skip',
            sep=';',
            decimal=','
        )
        df_list.append(temp_df)

df_cosif_individual_full = pd.concat(df_list, ignore_index=True)

# 3.4. Exportar raw Individual para Parquet
df_cosif_individual_full.to_parquet(
    dir_outputs / 'df_cosif_individual_full.parquet',
    index=False
)

# 3.5. (Opcional) Dicionário de contas Individual em Excel
account_name = (
    df_cosif_individual_full
    .sort_values('CONTA')
    .drop_duplicates(subset=['CONTA'], keep='last')
    [['CONTA', 'NOME_CONTA']]
)
def remove_illegal(text: str) -> str:
    return re.sub(r'[\x00-\x1F\x7F]', '', text)

account_name['NOME_CONTA'] = account_name['NOME_CONTA'].astype(str).apply(remove_illegal)
account_name.to_excel(dir_outputs / 'cosif_account_name.xlsx', index=False)

In [None]:
# ─── 4. Pipeline COSIF Prudencial ──────────────────────────────────────────────

# 4.1. Diretório para baixar/armazenar COSIF Prudencial
cosif_prud_dir = dir_inputs / 'COSIF' / 'prudencial'
cosif_prud_dir.mkdir(parents=True, exist_ok=True)

# 4.2. Download e extração dos arquivos mensais (raw, sem filtros)
for date in date_range_monthly:
    subfolder = cosif_prud_dir / date
    subfolder.mkdir(exist_ok=True)

    if any(subfolder.glob('*.csv')):
        continue

    downloaded = False
    used_suffix = None

    for suffix in ('BLOPRUDENCIAL.csv', 'BLOPRUDENCIAL.zip', 'BLOPRUDENCIAL.csv.zip'):
        url = f"https://www.bcb.gov.br/content/estabilidadefinanceira/cosif/Conglomerados-prudenciais/{date}{suffix}"
        local_file = cosif_prud_dir / f"{date}{suffix}"

        if local_file.exists():
            downloaded = True
            used_suffix = suffix
            break

        resp = requests.get(url, timeout=30)
        content_type = resp.headers.get('Content-Type', '').lower()
        is_html_error = (
            resp.status_code != 200
            or b'<!DOCTYPE html' in resp.content[:20]
            or 'text/html' in content_type
        )

        if not is_html_error:
            with open(local_file, 'wb') as f:
                f.write(resp.content)
            downloaded = True
            used_suffix = suffix
            break

    if downloaded and used_suffix:
        if used_suffix.endswith('.zip'):
            zip_path = cosif_prud_dir / f"{date}{used_suffix}"
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                zip_ref.extractall(subfolder)
        else:
            csv_path = cosif_prud_dir / f"{date}{used_suffix}"
            destino = subfolder / csv_path.name
            if not destino.exists():
                csv_path.rename(destino)

# 4.3. Carregar TODOS os CSVs Prudencial em um DataFrame único (raw)
df_prud_list = []
for date in date_range_monthly:
    subfolder = cosif_prud_dir / date
    if not subfolder.exists():
        continue

    csv_paths = list(subfolder.glob(f"{date}BLOPRUDENCIAL*.csv"))
    for csv_path in csv_paths:
        temp_df = pd.read_csv(
            csv_path,
            header=3,
            encoding='unicode_escape',
            on_bad_lines='skip',
            sep=';',
            decimal=','
        )
        df_prud_list.append(temp_df)

df_cosif_prudencial_full = pd.concat(df_prud_list, ignore_index=True)

# 4.4. Exportar raw Prudencial para Parquet
df_cosif_prudencial_full.to_parquet(
    dir_outputs / 'df_cosif_prudencial_full.parquet',
    index=False
)

# 4.5. (Opcional) Dicionário de contas Prudencial em Excel
account_prud = (
    df_cosif_prudencial_full
    .sort_values('CONTA')
    .drop_duplicates(subset=['CONTA'], keep='last')
    [['CONTA', 'NOME_CONTA']]
)
account_prud['NOME_CONTA'] = account_prud['NOME_CONTA'].astype(str).apply(remove_illegal)
account_prud.to_excel(dir_outputs / 'cosif_prudencial_account_name.xlsx', index=False)

In [None]:
# ─── 5. Pipeline IFDATA (valores + cadastro) – VERSÃO ATUALIZADA PARA TODOS TIPOS DE INSTITUIÇÃO ─────────────────────────────────
# 5.1. Diretório para baixar/armazenar IFDATA de VALORES
ifdata_dir = dir_inputs / 'IFDATA'
ifdata_dir.mkdir(parents=True, exist_ok=True)

# 5.2. Download dos CSVs “IfDataValores” (métricas) PARA TODOS OS TIPOS DE INSTITUIÇÃO
# Vamos iterar sobre cada trimestre e cada tipo de instituição (1 a 4), usando Relatorio='T' para trazer todos os relatórios
tipos_instit = [1, 2, 3]  # TipoInstituicao disponíveis na API
for date in date_range_quarterly:
    for tipo in tipos_instit:
        url_val = (
            "https://olinda.bcb.gov.br/olinda/servico/IFDATA/versao/v1/odata/"
            f"IfDataValores(AnoMes=@AnoMes,TipoInstituicao=@TipoInstituicao,Relatorio=@Relatorio)?"
            f"@AnoMes={date}&@TipoInstituicao={tipo}&@Relatorio='T'"
            "&$format=text/csv"
        )
        output_csv_val = ifdata_dir / f"{date}_{tipo}.csv"
        if not output_csv_val.exists():
            resp = requests.get(url_val, timeout=60)
            with open(output_csv_val, 'wb') as f:
                f.write(resp.content)

# 5.3. Leitura de TODOS os CSVs “IfDataValores” em um DataFrame único
df_if_list = []
for date in date_range_quarterly:
    for tipo in tipos_instit:
        path = ifdata_dir / f"{date}_{tipo}.csv"
        if path.exists() and path.stat().st_size > 0:
            temp = pd.read_csv(
                path,
                encoding='unicode_escape',
                on_bad_lines='skip',
                sep=',',
                decimal=','
            )
            # Garantir que a coluna TipoInstituicao reflita corretamente o tipo que baixamos
            if 'TipoInstituicao' not in temp.columns:
                temp['TipoInstituicao'] = tipo
            df_if_list.append(temp)
df_if_prudencial_full = pd.concat(df_if_list, ignore_index=True)


# ─── 5.4. Pipeline IFDATA_CADASTRO ─────────────────────────────────────────────
# 5.4.1. Cria diretório para baixar/armazenar “IfDataCadastro”
ifdata_cadastro_dir = dir_inputs / 'IFDATA_Cadastro'
ifdata_cadastro_dir.mkdir(parents=True, exist_ok=True)

# 5.4.2. Download dos CSVs “IfDataCadastro” da API
for date in date_range_monthly:
    url_cadastro = (
        f"https://olinda.bcb.gov.br/olinda/servico/IFDATA/versao/v1/odata/"
        f"IfDataCadastro(AnoMes=@AnoMes)?@AnoMes={date}&$format=text/csv"
    )
    output_csv_cad = ifdata_cadastro_dir / f"IfDataCadastro_{date}.csv"
    if not output_csv_cad.exists():
        resp = requests.get(url_cadastro, timeout=60)
        with open(output_csv_cad, 'wb') as f:
            f.write(resp.content)

# 5.4.3. Leitura de TODOS os CSVs “IfDataCadastro” em um DataFrame único
df_if_cad_list = []
for date in date_range_monthly:
    path = ifdata_cadastro_dir / f"IfDataCadastro_{date}.csv"
    if path.exists() and path.stat().st_size > 0:
        temp_cad = pd.read_csv(
            path,
            encoding='utf-8',
            on_bad_lines='skip',
            sep=',',
            decimal='.'
        )
        df_if_cad_list.append(temp_cad)
if df_if_cad_list:
    df_if_cadastro_full = pd.concat(df_if_cad_list, ignore_index=True)
else:
    df_if_cadastro_full = pd.DataFrame()  # Se nenhum CSV existir, DataFrame vazio

# 5.4.4. Ajustes iniciais em df_if_cadastro_full
if not df_if_cadastro_full.empty:
    df_if_cadastro_full = df_if_cadastro_full.rename(columns={
        'Data': 'DATA',
        'CodConglomeradoPrudencial': 'COD_CONGL'
    })
    df_if_cadastro_full['DATA'] = pd.to_numeric(df_if_cadastro_full['DATA'], errors='coerce').astype(int)
    df_if_cadastro_full['COD_CONGL'] = df_if_cadastro_full['COD_CONGL'].astype(str)

    # Remover caracteres não imprimíveis das colunas textuais de cadastro
    def remove_illegal(text: str) -> str:
        return re.sub(r'[\x00-\x1F\x7F]', '', str(text))
    for col in ['NomeInstituicao','SegmentoTb','Atividade','Uf','Municipio','Situacao']:
        if col in df_if_cadastro_full.columns:
            df_if_cadastro_full[col] = df_if_cadastro_full[col].astype(str).apply(remove_illegal)

# ─── 5.5. Padronização de colunas em df_if_prudencial_full (valores) ─────────────
def map_coluna(c):
    return {
        79647: "Tier_1",
        79648: "Tier_2",
        79649: "Capital",
        79650: "CRWA",
        79651: "MRWA",
        79656: "ORWA",
        79664: "BIS",
        79665: "RWA"
    }.get(c, f"Conta_{c}")

df_if_prudencial_full['NomeColuna'] = df_if_prudencial_full['Conta'].apply(map_coluna)

# 5.5.1. (Opcional) Dicionário de contas IFDATA em Excel
account_name_ifdata = (
    df_if_prudencial_full
    .sort_values('Conta')
    .drop_duplicates(subset=['Conta'], keep='last')
    [['Conta', 'NomeColuna']]
)
account_name_ifdata['NomeColuna'] = account_name_ifdata['NomeColuna'].astype(str).apply(remove_illegal)
account_name_ifdata.to_excel(dir_outputs / 'if_account_name.xlsx', index=False)

# ─── 5.6. Ajuste final de nomes e tipos em df_if_prudencial_full (valores) ─────────
df_if_prudencial_full = df_if_prudencial_full.rename(columns={
    'CodInst': 'COD_CONGL',
    'AnoMes': 'DATA'
})
df_if_prudencial_full['DATA'] = pd.to_numeric(df_if_prudencial_full['DATA'], errors='coerce').astype(int)
df_if_prudencial_full['COD_CONGL'] = df_if_prudencial_full['COD_CONGL'].astype(str)

# ─── 5.7. Merge de “IfDataValores” (df_if_prudencial_full) com “IfDataCadastro” ────
if not df_if_cadastro_full.empty:
    df_if_combined = pd.merge(
        df_if_prudencial_full,
        df_if_cadastro_full,
        on=['DATA', 'COD_CONGL'],
        how='left',
        validate='many_to_many'   # antes era 'many_to_one'
    )
else:
    df_if_combined = df_if_prudencial_full.copy()

# 5.7.1. Verificar colunas resultantes
print("Colunas após merge Valores + Cadastro (IFDATA):")
print(df_if_combined.columns.tolist())
print("Shape df_if_combined:", df_if_combined.shape)

# ─── 5.8. Correção de encoding de texto em df_if_combined ─────────────────────────
def corrigir_encoding(coluna):
    return coluna.astype(str).str.encode('latin1', errors='ignore').str.decode('utf-8', errors='ignore')

colunas_texto_if = ['NomeRelatorio','Grupo','NomeColuna','DescricaoColuna']
for col in colunas_texto_if:
    if col in df_if_combined.columns:
        df_if_combined[col] = corrigir_encoding(df_if_combined[col])

# ─── 5.9. Exportar DataFrame unificado de IFDATA para Parquet ────────────────────
df_if_combined.to_parquet(
    dir_outputs / 'df_if_prudencial_full.parquet',
    index=False
)


In [None]:
# ─── 6. Combinações finais (sem filtros) ───────────────────────────────────────

# 6.1. Assegurar que 'DATA' exista em df_cosif_prudencial_full (a partir de '#DATA_BASE')
if '#DATA_BASE' in df_cosif_prudencial_full.columns:
    df_cosif_prudencial_full.rename(columns={'#DATA_BASE': 'DATA'}, inplace=True)
df_cosif_prudencial_full['DATA'] = (
    pd.to_datetime(df_cosif_prudencial_full['DATA'].astype(str), format='%Y%m')
      .dt.strftime('%Y%m')
      .astype(int)
)

# Agora sim podemos mapear (raw, sem drop_duplicates)
df_prud_mapping = df_cosif_prudencial_full[['DATA', 'CNPJ', 'COD_CONGL', 'NOME_CONGL']]


# 6.2. Pivot opcional do Individual (wide)
# Primeiro, cria 'DATA' a partir de '#DATA_BASE' em df_cosif_individual_full
if '#DATA_BASE' in df_cosif_individual_full.columns:
    df_cosif_individual_full.rename(columns={'#DATA_BASE': 'DATA'}, inplace=True)
df_cosif_individual_full['DATA'] = (
    pd.to_datetime(df_cosif_individual_full['DATA'].astype(str), format='%Y%m')
      .dt.strftime('%Y%m')
      .astype(int)
)

df_cosif_individual = (
    df_cosif_individual_full
    .pivot_table(
        index=['DATA', 'CNPJ', 'NOME_INSTITUICAO', 'TAXONOMIA'],
        columns='CONTA',
        values='SALDO'
    )
    .reset_index()
)

df_cosif_individual.to_parquet(
    dir_outputs / 'df_cosif_individual_wide.parquet',
    index=False
)


# 6.3. Merge Individual (wide) + Prudencial (mapping)
df_cosif_full = df_cosif_individual.merge(
    df_prud_mapping,
    on=['DATA', 'CNPJ'],
    how='left'
)

df_cosif_full.to_parquet(
    dir_outputs / 'df_cosif_full.parquet',
    index=False
)


# 6.4. Merge final com IFDATA (raw)
# Certificar que df_if_combined já tem 'DATA' como inteiro
if 'DATA' in df_if_combined.columns:
    df_if_combined['DATA'] = pd.to_numeric(df_if_combined['DATA'], errors='coerce')

df_final = df_cosif_full.merge(
    df_if_combined,
    on=['DATA', 'COD_CONGL'],
    how='left'
)

df_final.to_parquet(
    dir_outputs / 'df_final.parquet',
    index=False
)

df_final.info()


In [None]:
# ===== Cell N+1: Preparar df_ind_pre a partir de df_cosif_individual_full =====

# Partindo de df_cosif_individual_full (já tem 'DATA' formatado como int)
df_ind_pre = (
    df_cosif_individual_full
    .assign(
        CNPJ=lambda d: d['CNPJ'].astype(str).str.replace(r'[^0-9]', '', regex=True)
    )
    .drop(columns=['COD_CONGL', 'NOME_CONGL'], errors='ignore')
)

print("Colunas de df_ind_pre:", df_ind_pre.columns.tolist())
print("Shape de df_ind_pre:", df_ind_pre.shape)

In [None]:
# ===== Cell N+2: Preparar df_prud_mapping a partir de df_cosif_prudencial_full =====

# Partindo de df_cosif_prudencial_full (já tem 'DATA' formatado como int)
df_pru_pre = (
    df_cosif_prudencial_full
    .assign(
        CNPJ=lambda d: d['CNPJ'].astype(str).str.replace(r'[^0-9]', '', regex=True),
        COD_CONGL=lambda d: d['COD_CONGL'].astype(str)
    )
)

df_prud_mapping = (
    df_pru_pre[['DATA', 'CNPJ', 'COD_CONGL', 'NOME_CONGL']]
    .drop_duplicates(subset=['DATA', 'CNPJ'])
    .reset_index(drop=True)
)

print("Colunas de df_prud_mapping:", df_prud_mapping.columns.tolist())
print("Shape de df_prud_mapping:", df_prud_mapping.shape)

In [None]:
# ===== Cell N+3: Criar df_cosif_full_corrigido (merge Individual + Prudencial) =====

df_cosif_full_corrigido = pd.merge(
    df_ind_pre,
    df_prud_mapping,
    on=['DATA', 'CNPJ'],
    how='left',
    validate='many_to_one'
)

print("Shape COSIF Full corrigido:", df_cosif_full_corrigido.shape)
print(
    "Linhas sem COD_CONGL (valores NaN em COD_CONGL):",
    df_cosif_full_corrigido['COD_CONGL'].isna().sum()
)

# Salvar parquet corrigido
df_cosif_full_corrigido.to_parquet(
    dir_outputs / 'df_cosif_full_corrigido.parquet',
    index=False
)

In [None]:
# ===== Cell N+4: Preparar df_if_pre a partir de df_if_combined =====

df_if_pre = (
    df_if_combined
    .assign(
        DATA=lambda d: pd.to_numeric(d['DATA'], errors='coerce').astype(int),
        COD_CONGL=lambda d: d['COD_CONGL'].astype(str)
    )
)

print("Colunas de df_if_pre:", df_if_pre.columns.tolist())
print("Shape de df_if_pre:", df_if_pre.shape)

In [None]:
# ===== Cell N+5: Criar df_final_corrigido (merge COSIF Full corrigido + IFDATA) =====

df_final_corrigido = pd.merge(
    df_cosif_full_corrigido.assign(COD_CONGL=lambda d: d['COD_CONGL'].astype(str)),
    df_if_pre,
    on=['DATA', 'COD_CONGL'],
    how='left',
    validate='many_to_many'
)

print("Shape DF Final corrigido:", df_final_corrigido.shape)
print(
    "Linhas sem informação de IFDATA (TipoInstituicao == NaN):",
    df_final_corrigido['TipoInstituicao'].isna().sum()
)

# Salvar parquet corrigido
df_final_corrigido.to_parquet(
    dir_outputs / 'df_final_corrigido.parquet',
    index=False
)

In [None]:
# ===== Cell N+6 (opcional): Informações finais dos DataFrames corrigidos =====

print("\n=== Resumo Final ===")
print("df_cosif_full_corrigido:")
df_cosif_full_corrigido.info()

print("\ndf_final_corrigido:")
df_final_corrigido.info()