In [37]:
import sys; sys.path.append('..'); sys.path.append('../..')
from common.postgresql import PostgresConnector
import pandas as pd
import numpy as np

# Instantiate the project's Postgres connector and bind short aliases to its
# two entry points: `read_sql` (used below to load query results into a frame)
# and `sql` (the connector's execute_sql method).
db = PostgresConnector()
read_sql, sql = db.read_sql, db.execute_sql

# CVM Cadastro Fix
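Strategy: deduplicate each history table -> build a per-fund timeline (spine) of every change date -> merge each table onto the spine -> forward-fill attributes within each fund -> recompute `dt_fim` as the day before the next change.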

In [38]:
# History table (in schema `cvm`) -> the attribute and validity-date columns to
# read from it. The shared keys cnpj_fundo and dt_reg are added to every query
# by the loading step, so they are not repeated here.
TABLES_MAP = {
    "fi_cad_fi_hist_admin": [
        "cnpj_admin", "admin", "dt_ini_admin", "dt_fim_admin",
    ],
    "fi_cad_fi_hist_auditor": [
        "cnpj_auditor", "auditor", "dt_fim_auditor",
    ],
    "fi_cad_fi_hist_classe": ["classe", "dt_fim_classe"],
    "fi_cad_fi_hist_condom": ["condom", "dt_fim_condom"],
    "fi_cad_fi_hist_controlador": [
        "cnpj_controlador", "controlador", "dt_fim_controlador",
    ],
    "fi_cad_fi_hist_custodiante": [
        "cnpj_custodiante", "custodiante", "dt_fim_custodiante",
    ],
    "fi_cad_fi_hist_denom_social": ["denom_social", "dt_fim_denom_social"],
    "fi_cad_fi_hist_denom_comerc": ["denom_comerc", "dt_fim_denom_comerc"],
    "fi_cad_fi_hist_diretor_resp": ["diretor", "dt_fim_diretor"],
    "fi_cad_fi_hist_exclusivo": ["fundo_exclusivo", "dt_fim_st_exclusivo"],
    "fi_cad_fi_hist_fic": ["fundo_cotas", "dt_fim_st_cotas"],
    "fi_cad_fi_hist_gestor": [
        "pf_pj_gestor", "cpf_cnpj_gestor", "gestor", "dt_ini_gestor", "dt_fim_gestor",
    ],
    "fi_cad_fi_hist_publico_alvo": ["publico_alvo", "dt_fim_publico_alvo"],
    "fi_cad_fi_hist_rentab": ["rentab_fundo", "dt_fim_rentab"],
    "fi_cad_fi_hist_sit": ["sit", "dt_ini_sit", "dt_fim_sit"],
    # The two fee tables list no dt_ini_* / dt_fim_* column of their own.
    "fi_cad_fi_hist_taxa_adm": ["taxa_adm", "inf_taxa_adm"],
    "fi_cad_fi_hist_taxa_perfm": ["vl_taxa_perfm", "ds_taxa_perfm"],
    "fi_cad_fi_hist_trib_lprazo": ["trib_lprazo", "dt_fim_st_trib_lprazo"],
}

# Column order of the final consolidated frame: keys and dates first, then the
# attributes grouped by the party or property they describe, dt_fim last.
final_cols = (
    ["cnpj_fundo", "dt_reg", "dt_ini"]
    + ["cnpj_admin", "admin"]
    + ["cnpj_auditor", "auditor"]
    + ["cnpj_custodiante", "custodiante"]
    + ["classe", "condom"]
    + ["cnpj_controlador", "controlador"]
    + ["denom_comerc", "denom_social", "diretor"]
    + ["fundo_exclusivo", "fundo_cotas"]
    + ["pf_pj_gestor", "cpf_cnpj_gestor", "gestor"]
    + ["publico_alvo", "rentab_fundo", "sit"]
    + ["taxa_adm", "inf_taxa_adm", "vl_taxa_perfm", "ds_taxa_perfm", "trib_lprazo"]
    + ["dt_fim"]
)

In [39]:
# Step 1: Load and Deduplicate
def _standardize_and_dedupe(raw):
    """Normalise one history table's date columns and drop superseded snapshots.

    Parameters
    ----------
    raw : DataFrame
        Rows of a single history table. Must contain `cnpj_fundo` and `dt_reg`;
        may contain table-specific `dt_ini_*` / `dt_fim_*` columns.

    Returns
    -------
    DataFrame
        A new frame (the input is not modified) in which
        * any column whose name contains 'dt_ini' / 'dt_fim' is renamed to the
          generic `dt_ini` / `dt_fim`;
        * `dt_ini` falls back to `dt_reg` when the table has no start date;
        * `dt_reg`, `dt_ini`, `dt_fim` are datetimes (unparseable values -> NaT);
        * only the row with the latest `dt_reg` is kept per (cnpj_fundo, dt_ini).
    """
    # Map table-specific names (e.g. dt_ini_admin, dt_fim_admin) to generic ones.
    rename_dict = {}
    for col in raw.columns:
        if 'dt_ini' in col and col != 'dt_ini':
            rename_dict[col] = 'dt_ini'
        if 'dt_fim' in col and col != 'dt_fim':
            rename_dict[col] = 'dt_fim'
    df = raw.rename(columns=rename_dict)

    # Tables without an explicit start date (e.g. auditor): treat the snapshot
    # date dt_reg as the start of validity of that information.
    if 'dt_ini' not in df.columns:
        df['dt_ini'] = df['dt_reg']

    # Convert dates
    for col in ('dt_reg', 'dt_ini', 'dt_fim'):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')

    # DEDUPLICATION LOGIC
    # Keep only the LATEST dt_reg for a given (cnpj_fundo, dt_ini); this removes
    # repeated snapshots of information whose start date has not changed.
    # Assumes (cnpj_fundo, dt_ini) identifies one validity period per attribute;
    # if attributes changed without a new dt_ini, the older values are discarded.
    # The sort is stable so that rows tied on dt_reg keep the order in which the
    # query returned them instead of depending on the sort algorithm.
    return (
        df.sort_values('dt_reg', ascending=False, kind='mergesort')
          .drop_duplicates(subset=['cnpj_fundo', 'dt_ini'], keep='first')
    )


cleaned_tables = {}

for table_name, target_cols in TABLES_MAP.items():
    print(f"Processing {table_name}...")
    # Standard keys first, then the table's own columns. dict.fromkeys removes
    # repeats while preserving order, so the generated SQL is identical on
    # every run (a set would reorder the columns between kernel restarts).
    cols_to_fetch = list(dict.fromkeys(['cnpj_fundo', 'dt_reg'] + target_cols))
    query = f"SELECT {', '.join(cols_to_fetch)} FROM cvm.{table_name}"

    df = _standardize_and_dedupe(read_sql(query))

    cleaned_tables[table_name] = df
    print(f"  -> {len(df)} unique records")

Processing fi_cad_fi_hist_admin...
  -> 82229 unique records
Processing fi_cad_fi_hist_auditor...
  -> 55039 unique records
Processing fi_cad_fi_hist_classe...
  -> 55059 unique records
Processing fi_cad_fi_hist_condom...
  -> 55059 unique records
Processing fi_cad_fi_hist_controlador...
  -> 54978 unique records
Processing fi_cad_fi_hist_custodiante...
  -> 55004 unique records
Processing fi_cad_fi_hist_denom_social...
  -> 59343 unique records
Processing fi_cad_fi_hist_denom_comerc...
  -> 42014 unique records
Processing fi_cad_fi_hist_diretor_resp...
  -> 59279 unique records
Processing fi_cad_fi_hist_exclusivo...
  -> 55054 unique records
Processing fi_cad_fi_hist_fic...
  -> 55059 unique records
Processing fi_cad_fi_hist_gestor...
  -> 94243 unique records
Processing fi_cad_fi_hist_publico_alvo...
  -> 35382 unique records
Processing fi_cad_fi_hist_rentab...
  -> 49986 unique records
Processing fi_cad_fi_hist_sit...
  -> 139363 unique records
Processing fi_cad_fi_hist_taxa_adm...


In [40]:
# Step 2: Create Master Spine (Timeline)
# The spine has exactly one row per (fund, change date): the union of every
# dt_ini found in any of the cleaned history tables.
spine_keys = ['cnpj_fundo', 'dt_ini']

all_spines = [
    df[spine_keys] for df in cleaned_tables.values() if 'dt_ini' in df.columns
]
if not all_spines:
    # pd.concat would fail on an empty list anyway; fail with a clearer message.
    raise ValueError("No cleaned table has a 'dt_ini' column; cannot build the spine")

# One dedup + one sort is enough: the frames only carry the two key columns,
# so de-duplicating on them already guarantees one row per date per fund.
# NOTE(review): rows whose dt_ini is NaT are kept in the spine - confirm intended.
df_spine = (
    pd.concat(all_spines, ignore_index=True)
    .drop_duplicates(subset=spine_keys)
    .sort_values(spine_keys)
    .reset_index(drop=True)
)
print(f"Master Spine Size: {len(df_spine)} rows")

Master Spine Size: 225294 rows


In [41]:
# Step 3: Temporal Merge & Forward Fill
merge_keys = ['cnpj_fundo', 'dt_ini']
df_master = df_spine.copy()

for table_name, df in cleaned_tables.items():
    print(f"Merging {table_name}...")
    # Merging on [cnpj_fundo, dt_ini] places each attribute change exactly on
    # its own date in the spine.
    #
    # Columns that are NOT carried over:
    #   * the merge keys themselves;
    #   * dt_fim, which is recomputed from the spine further down;
    #   * dt_reg, which is only a snapshot date - except for denom_social,
    #     whose dt_reg is kept to serve as the anchor date.
    skip = set(merge_keys) | {'dt_fim'}
    if table_name != 'fi_cad_fi_hist_denom_social':
        skip.add('dt_reg')
    value_cols = [c for c in df.columns if c not in skip]

    df_master = pd.merge(df_master, df[merge_keys + value_cols], on=merge_keys, how='left')

    # A left join against a table that is unique on the keys cannot add rows.
    # If it does, the table was not de-duplicated and every later step would
    # silently work on a multiplied timeline.
    if len(df_master) != len(df_spine):
        raise ValueError(
            f"{table_name} is not unique on {merge_keys}: merge produced "
            f"{len(df_master)} rows for a spine of {len(df_spine)}"
        )

print("Forward Filling...")
# Forward fill is order dependent, so sort explicitly before filling instead of
# relying on the order inherited from the spine.
df_master = df_master.sort_values(merge_keys)

# Forward fill propagates the last valid observation to the next change point.
# groupby().ffill() returns only the filled columns (the grouper key is
# dropped) and would also fill gaps in dt_ini, so both keys are restored from
# the pre-fill frame; the index is preserved, which makes the alignment safe.
df_filled = df_master.groupby('cnpj_fundo').ffill().assign(
    cnpj_fundo=df_master['cnpj_fundo'],
    dt_ini=df_master['dt_ini'],
)

# --- Recalculate dt_fim ---
# A period ends the day before the fund's NEXT period starts. The last period
# of each fund has no successor, so its dt_fim stays NaT (treated as active).
next_dt_ini = df_filled.groupby('cnpj_fundo')['dt_ini'].shift(-1)
df_filled['dt_fim'] = next_dt_ini - pd.Timedelta(days=1)

df_master = df_filled
print("Done. Final Shape:", df_master.shape)

Merging fi_cad_fi_hist_admin...
Merging fi_cad_fi_hist_auditor...
Merging fi_cad_fi_hist_classe...
Merging fi_cad_fi_hist_condom...
Merging fi_cad_fi_hist_controlador...
Merging fi_cad_fi_hist_custodiante...
Merging fi_cad_fi_hist_denom_social...
Merging fi_cad_fi_hist_denom_comerc...
Merging fi_cad_fi_hist_diretor_resp...
Merging fi_cad_fi_hist_exclusivo...
Merging fi_cad_fi_hist_fic...
Merging fi_cad_fi_hist_gestor...
Merging fi_cad_fi_hist_publico_alvo...
Merging fi_cad_fi_hist_rentab...
Merging fi_cad_fi_hist_sit...
Merging fi_cad_fi_hist_taxa_adm...
Merging fi_cad_fi_hist_taxa_perfm...
Merging fi_cad_fi_hist_trib_lprazo...
Forward Filling...
Done. Final Shape: (225294, 30)


In [42]:
# Validation
# Spot-check one fund: select rows whose upper-cased denom_social contains
# 'KINEA NIX' (missing names are treated as empty strings) and show the
# timeline columns that exist in the master frame.
denom_upper = df_master['denom_social'].fillna('').str.upper()
is_target = denom_upper.str.contains('KINEA NIX')
target_fund = df_master[is_target]

cols_view = [
    c
    for c in ['dt_ini', 'dt_fim', 'denom_social', 'sit', 'gestor']
    if c in df_master.columns
]
display(target_fund[cols_view].head(20))

Unnamed: 0,dt_ini,dt_fim,denom_social,sit,gestor
217526,2025-01-17,2025-01-30,KINEA NIX FUNDO DE INVESTIMENTO FINANCEIRO MUL...,FASE PRÉ-OPERACIONAL,KINEA INVESTIMENTOS LTDA.
217527,2025-01-31,NaT,KINEA NIX FUNDO DE INVESTIMENTO FINANCEIRO MUL...,EM FUNCIONAMENTO NORMAL,KINEA INVESTIMENTOS LTDA.


In [44]:
# Final Column Selection
# Keep the columns of final_cols that exist in the master frame, in that order.
available_cols = [c for c in final_cols if c in df_master.columns]

# Report expected columns that are absent instead of dropping them silently.
missing_cols = [c for c in final_cols if c not in df_master.columns]
if missing_cols:
    print(f"WARNING: columns missing from df_master: {missing_cols}")

# .copy() makes df_final independent of df_master, so later edits to it do not
# operate on a slice of the master frame.
df_final = df_master[available_cols].copy()

# Spot-check a single fund. (The former bare `df_final.head()` in the middle of
# the cell was removed: only the last expression of a cell is displayed, so it
# had no effect.)
df_final[df_final['cnpj_fundo'] == '26.218.403/0001-03']

Unnamed: 0,cnpj_fundo,dt_reg,dt_ini,cnpj_admin,admin,cnpj_auditor,auditor,cnpj_custodiante,custodiante,classe,...,gestor,publico_alvo,rentab_fundo,sit,taxa_adm,inf_taxa_adm,vl_taxa_perfm,ds_taxa_perfm,trib_lprazo,dt_fim
119267,26.218.403/0001-03,2016-12-26,2016-12-26,62.418.140/0001-31,INTRAG DTVM LTDA.,61.562.112/0001-20,PRICEWATERHOUSECOOPERS AUDITORES INDEPENDENTES...,60.701.190/0001-04,ITAU UNIBANCO S.A.,Multimercado,...,KINEA INVESTIMENTOS LTDA.,Qualificado,DI de um dia,FASE PRÉ-OPERACIONAL,2.0,,20.0,"O GESTOR RECEBERÁ TAMBÉM TAXA DE PERFORMANCE, ...",S,2016-12-28
119268,26.218.403/0001-03,2016-12-26,2016-12-29,62.418.140/0001-31,INTRAG DTVM LTDA.,61.562.112/0001-20,PRICEWATERHOUSECOOPERS AUDITORES INDEPENDENTES...,60.701.190/0001-04,ITAU UNIBANCO S.A.,Multimercado,...,KINEA INVESTIMENTOS LTDA.,Qualificado,DI de um dia,EM FUNCIONAMENTO NORMAL,2.0,,20.0,"O GESTOR RECEBERÁ TAMBÉM TAXA DE PERFORMANCE, ...",S,NaT
