In [101]:
import requests
import pandas as pd
import numpy as np
from datetime import date
import pandas_market_calendars as mcal
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [102]:
# Filtro de datas
start_date = pd.Timestamp(date(date.today().year, 5, 30))
end_date = pd.Timestamp(date.today())

# Calendário de feriados da b3
B3 = mcal.get_calendar('B3')
B3_holidays = B3.holidays()

# Filtro correto por range
feriados = [h for h in B3_holidays.holidays if start_date <= h <= end_date]

# Define um CustomBusinessDay com esses feriados
bday_brasil = pd.offsets.CustomBusinessDay(holidays=feriados)

# data d-2
d2_date = (date.today()-2*bday_brasil).strftime('%Y-%m-%d')
# d2_date = '2025-06-02'

date_range = pd.date_range(start=start_date, end=d2_date, freq='B')
date_range_no_holidays = date_range[~date_range.isin(feriados)]

Configuração de API e funções que coletam os dados

In [103]:
# User configs for API authentication
api_acces_id = "89297662e92386720e192e56ffdc0d5e.access"
api_secret = "b8b3cfabf25982a64a1074360f83b0dc143aa5bd75560abf5c901b0977364de4"
api_password = "juNTr1QbtbY9NZ8ACrMF"
user_name = "alberto.coppola@perenneinvestimentos.com.br"

columns_filter = ['portfolio_id','overview_date', 'instrument_name', 'instrument_id','book_name', 'instrument_type', 'quantity', 'price', 'asset_value','exposure_value','dtd_ativo_fin']


# Retorna um dataframe a partir da parametros
def fetch_data(url: str, params: dict, access_id, secret, user_name, api_password):
    # trocar para a URL do ambiente desejado
    base_url = "https://perenne.bluedeck.com.br/api"
    url_token = "auth/token"
    
    # trocar o e-mail e a senha de aplicação
    data = {"username": user_name, "password": api_password}

    # trocar o ID e o secret
    client_headers = {
        "CF-Access-Client-Id": access_id,
        "CF-Access-Client-Secret": secret
    }

    # realiza request
    response = requests.post(f"{base_url}/{url_token}", data=data, headers=client_headers)

    # Converte em json
    json_response = response.json()

    # token de acesso
    access_token = json_response['access_token']
    token_type = json_response['token_type']
    # expiration_dt = json_response['expires_at']

    # Definindo headers para realização de chamadas
    client_header = {
        "CF-Access-Client-Id": access_id,
        "CF-Access-Client-Secret": secret
    }
    token_header = {"Authorization": f"{token_type} {access_token}"}
    headers = {**client_header, **token_header}

    response_api = requests.post(f"{base_url}/{url}", headers=headers, json=params)
    try:
        response_json = response_api.json()['objects']
    except:
        response_json = response_api.json()['variations']
    
    return pd.DataFrame(response_json)

Busco todas posições de ativos

In [104]:
# URL de posições
url = 'portfolio_position/positions/get'

# define parametros de busca
params = {
    "start_date": "2025-06-25",
    "end_date": d2_date,
    # "instrument_position_aggregation": 3
    "portfolio_group_ids": [1],
    # "portfolio_ids": [1175,1517]  # ID do Fundo,
    # "book_depth": 5,
}

df_positions = fetch_data(url, params, api_acces_id, api_secret, user_name, api_password)

# Salva o de-para de portfolio_id e nome
funds_nav = df_positions.loc[['portfolio_id','date','net_asset_value']].T.drop_duplicates()
funds_nav.rename(columns={"net_asset_value":"portfolio_nav"},inplace=True)

Busco todos os fundos que são explodidos e geridos internamente

In [105]:
# define URL de acesso
url_funds = 'portfolio_registration/portfolio_group/get'

# define parametros de busca
params_funds = {"get_composition": True}

# Identifica todos os fundos do sistema para explodi-los
all_funds = fetch_data(url_funds, params_funds, api_acces_id, api_secret, user_name, api_password)

all_funds = list(all_funds['8'].loc['composition_names'].values())

funds_name = df_positions.loc[['portfolio_id','name']].T.drop_duplicates()
funds_name.to_csv("funds_name.csv", index=False)

In [106]:
missing_funds = [name for name in all_funds if name not in df_positions.loc['name'].unique()]
print(missing_funds)

['Carteira 4']


Construção do CSV de posições e custos

In [107]:
registros = pd.DataFrame()
costs_df = pd.DataFrame()

# Iterar pelas colunas de posição (ids)
for data_col in df_positions.columns:

    try:
        # Pega os dados do id (coluna), vira Series
        id_col = df_positions[data_col]

        # Pega o portfolio_id
        portfolio_id = id_col['portfolio_id']

        # Pula o portfolio consolidado para evitar duplicidade 
        if portfolio_id == 49:
            continue

        # Pega o dicionário de instrument_positions e transforma em df
        instrument_positions = pd.DataFrame(id_col['instrument_positions'])

        # Pega o dicionário de custos e transforma em df
        costs_position = pd.DataFrame(id_col['financial_transaction_positions'])

        # Concatena o portfolio id
        instrument_positions['portfolio_id'] = portfolio_id

        # Busca a lista de pnl dentro da coluna Attribution e concatena pro df de instrument_position
        pnl_ = []
        for _, row in instrument_positions.iterrows():
            try:
                pnl_.append(np.float64(row.attribution['total']['financial_value']))
            except:
                pnl_.append(0.0)

        instrument_positions['dtd_ativo_fin'] = pnl_

        # Faz o mesmo pra custos
        pnl_ = []
        for _, row in costs_position.iterrows():
            try:
                pnl_.append(np.float64(row.attribution['total']['financial_value']))
            except:
                pnl_.append(0.0)

        costs_position['dtd_custos_fin'] = pnl_

        # Concatena a data nos custos
        costs_position['overview_date'] = instrument_positions.overview_date.unique()[0]
        costs_position['origin_portfolio_id'] = portfolio_id

        # Concatena no df de registros
        registros = pd.concat([registros,instrument_positions])

        # Concatena no df de custos
        costs_df = pd.concat([costs_df,costs_position])
    
    except Exception as e:
        print(f"Erro ao processar data {data_col}: {e}")
    

# Filtro pelas colunas de interesse e transforma quantidade, preço e aum em float
registros = registros[columns_filter]
registros.loc[:,['quantity','asset_value','exposure_value','price','dtd_ativo_fin']] = registros[['quantity','asset_value','exposure_value','price','dtd_ativo_fin']].astype(np.float64)

# Transforma a coluna de custos em float e o id em int
costs_df['financial_value']=costs_df['financial_value'].astype(float)


"""
----------- Filtro de contingencia enquanto não arruma as categorias no sistema ---------

        soma as quantidades e notional pegando o primeiro book name para categorização
"""
registros = registros.groupby(['overview_date','portfolio_id','instrument_name']).agg({
    'instrument_id':'first',
    'book_name':'first',
    'instrument_type':'first',
    'quantity':'sum',
    'price':'first',
    'asset_value':'sum',
    'exposure_value':'sum',
    'dtd_ativo_fin': 'sum'
}).reset_index()

# Adiciono uma coluna com o NAV do fundo para o calculo de PNL
registros = pd.merge(registros,funds_nav,left_on=['portfolio_id','overview_date'],right_on=['portfolio_id','date']).drop(columns='date')
registros.rename(columns={"net_asset_value":"portfolio_nav"},inplace=True)

# Filtro as colunas de custos
costs_df = costs_df[['financial_value','attribution','book_name','category_name','origin_portfolio_id','origin_accounting_transaction_id','dtd_custos_fin','overview_date']]


In [108]:
# reg = registros[['portfolio_id','instrument_name','book_name']].drop_duplicates()
# duplicados = reg.groupby('instrument_name')['book_name'].nunique().reset_index()
# duplicados = duplicados[duplicados['book_name'] > 1]['instrument_name']

# # 2. Filtro final: ou está nos duplicados, ou é Padrão
# df_filtrado = reg[
#     (reg['instrument_name'].isin(duplicados)) |
#     (reg['book_name'] == 'Padrão')
# ]

# df_filtrado = df_filtrado.merge(funds_name,on='portfolio_id',how='left')
# df_filtrado[df_filtrado['name'].isin(all_funds)]

Flatten o csv de posições em um novo csv 'explodido'

In [109]:
def explodir_portfolio(portfolio_id, data, todas_posicoes, todos_custos, visitados=None, notional = None, portfolio_origem_id=None, nivel = 0):
    """
    - portfolio_id: o portfólio que estamos processando
    - data: a data da posição
    - todas_posicoes: DataFrame com todas as posições
    - multiplicador: proporção da posição herdada
    """
    
    if visitados is None:
        visitados = set()
        # print("STARTING: ",portfolio_id)
    if portfolio_id in visitados:
        return [], pd.DataFrame()

    visitados.add(portfolio_id)

    # Filtra as posições desse portfolio na data
    posicoes = todas_posicoes[
        (todas_posicoes['overview_date'] == data) &
        (todas_posicoes['portfolio_id'] == portfolio_id)
    ]

    # Filtra os custos do portfolio na data
    custos = todos_custos[
        (todos_custos['overview_date'] == data) &
        (todos_custos['origin_portfolio_id'] == portfolio_id)
    ]
    

    # Calculo do AUM total do fundo, soma ativos + custos
    aum = posicoes.asset_value.sum() + custos.financial_value.sum()
    # print(aum)
    if notional is None:
        notional = aum
    # print(notional)
    # Calculo um multiplicador para proporcionalizar as posições
    mult = notional/aum

    if portfolio_origem_id is None:
        portfolio_origem_id = portfolio_id
        # exposure = 1
    else:
        # print("Multiplicando por: ", round(exposure*100,4), "%")
        posicoes.loc[:,['quantity','asset_value','exposure_value','dtd_ativo_fin']] = posicoes[['quantity','asset_value','exposure_value','dtd_ativo_fin']] * mult
        custos.loc[:,['financial_value','dtd_custos_fin']] = custos.loc[:,['financial_value','dtd_custos_fin']] * mult

    # Seta o portfolio_id para a origem e cria na tabela de custos
    posicoes.loc[:,['portfolio_id']] = portfolio_origem_id
    custos.loc[:,['root_portfolio']] = portfolio_origem_id

    resultados = []

    for _, row in posicoes.iterrows():
        row_portfolio_id = row['instrument_id']
        if row.instrument_name in (all_funds):  # Checa se a linha é um fundo
            # print(row.instrument_name)
            # É um fundo investido, explodir recursivamente
            sub_resultados, resultado_custo = explodir_portfolio(
                row_portfolio_id,
                data,
                todas_posicoes,
                todos_custos,
                visitados=visitados,
                notional = np.float64(row.asset_value),
                portfolio_origem_id=portfolio_origem_id,
                nivel = nivel + 1
            )
            resultados += sub_resultados
            # Concatena no df de custos
            custos = pd.concat([custos,resultado_custo])
        else:
            novo = row.copy()
            novo['portfolio_origem'] = portfolio_id
            novo["nivel"] = nivel
            resultados.append(novo)
    
    return resultados, custos

todas_explodidas = pd.DataFrame()
todos_custos_explodidos = pd.DataFrame()

datas = registros.overview_date.unique()
portfolios = registros['portfolio_id'].unique()

for data in datas:
    for portfolio in portfolios:
        explodido, custo = explodir_portfolio(portfolio, data, registros, costs_df)
        todas_explodidas = pd.concat([todas_explodidas,pd.DataFrame(explodido)])
        todos_custos_explodidos = pd.concat([todos_custos_explodidos,custo])
df_explodido = pd.DataFrame(todas_explodidas)

# Subscrevo o portfolio_nav para referenciar ao portfolio_id original
df_explodido = pd.merge(df_explodido.drop(columns=['portfolio_nav']),funds_nav,left_on=['portfolio_id','overview_date'],right_on=['portfolio_id','date'],how='left').drop(columns=['date'])

todos_custos_explodidos = todos_custos_explodidos.groupby(['overview_date','root_portfolio','origin_portfolio_id','category_name']).agg({
    'book_name':'first',
    'financial_value':'sum',
    'dtd_custos_fin':'sum'
}).reset_index()

df_explodido['pct_exposicao'] = df_explodido['exposure_value'].astype(float) / df_explodido['portfolio_nav'].astype(float)

  mult = notional/aum
  mult = notional/aum


Tratamento de exceção

In [119]:
# Caixas Fundos viram CPR
filtro = df_explodido['book_name'] == 'Caixa >> Caixa Fundos'
df_explodido.loc[filtro, 'book_name'] = 'Caixas e Provisionamentos'

Quebra os grupos em colunas

In [123]:
# Split da coluna pelo separador " >> "
df_split = df_explodido['book_name'].str.split(' >> ', expand=True)

# Renomeia as colunas de acordo com a quantidade de splits encontrados
df_split.columns = [f'grupo_{i+1}' for i in range(df_split.shape[1])]

# Junta com o df original (se quiser)
pos_to_export = pd.concat([df_explodido, df_split], axis=1)

pos_to_export.drop(columns='instrument_name',inplace=True)
pos_to_export['overview_date'] = pd.to_datetime(pos_to_export['overview_date'])


Exportações

Concatena a tabela no csv de posições e custos, reprocessando as datas coincidentes

In [124]:
try:
    main_csv_position = pd.read_csv("portfolio_positions_exploded.csv",parse_dates=['overview_date'])
    main_csv_costs = pd.read_csv("portfolio_costs_exploded.csv",parse_dates=['overview_date'])

    main_csv_position.to_csv("portfolio_positions_exploded_backup.csv", index=False)
    main_csv_costs.to_csv("portfolio_costs_exploded_backup.csv", index=False)

    main_csv_portfoliopnl = pd.read_csv("portfolio_pnl_history.csv",parse_dates=['date'])



    """ Para posições: """
    # Primeiro: remove do main_csv_position todas as linhas que têm datas que existem no pos_to_export
    main_csv_filtrado = main_csv_position[~main_csv_position['overview_date'].isin(pos_to_export['overview_date'])]

    # Segundo: concatena os dois
    df_resultado = pd.concat([main_csv_filtrado, pos_to_export], ignore_index=True)

    # Terceiro: opcional - reordenar por data, se quiser
    df_resultado = df_resultado.sort_values('overview_date').reset_index(drop=True)

    #Exporta CSV
    df_resultado.to_csv("portfolio_positions_exploded.csv", index=False)


    """ Para custos: """
    # Primeiro: remove do main_csv_costs todas as linhas que têm datas que existem no todos_custos_explodidos
    main_csv_filtrado = main_csv_costs[~main_csv_costs['overview_date'].isin(todos_custos_explodidos['overview_date'])]

    # Segundo: concatena os dois
    todos_custos_explodidos.loc[:,'book_name'] = 'Caixas e Provisionamentos'
    df_resultado = pd.concat([main_csv_filtrado, todos_custos_explodidos], ignore_index=True)

    # Terceiro: opcional - reordenar por data, se quiser
    df_resultado = df_resultado.sort_values('overview_date').reset_index(drop=True)

    #Exporta CSV
    df_resultado.to_csv("portfolio_costs_exploded.csv", index=False)

    """ Para o pnl: """
    df_portfoliopnl = df_positions.loc[['date','portfolio_id','profitability_in_day','profitability_in_month','profitability_in_year']].T
    df_portfoliopnl.date = pd.to_datetime(df_portfoliopnl.date)
    
    # Primeiro: remove do main_csv_portfoliopnl todas as linhas que têm datas que existem no df_portfoliopnl
    main_csv_filtrado = main_csv_portfoliopnl[~main_csv_portfoliopnl['date'].isin(df_portfoliopnl['date'])]

    # Segundo: concatena os dois
    df_resultado = pd.concat([main_csv_filtrado, df_portfoliopnl], ignore_index=True)

    # Terceiro: opcional - reordenar por data, se quiser
    df_resultado = df_resultado.sort_values('date').reset_index(drop=True)

    #Exporta CSV
    df_resultado.to_csv("portfolio_pnl_history.csv", index=False)
except Exception as e:
    print(f"Error: {e}")
    print("Not processed.")
    #Exporta CSV
    # pos_to_export.to_csv("portfolio_positions_exploded.csv", index=False)
    # todos_custos_explodidos.loc[:,'book_name'] = 'CPR'
    # todos_custos_explodidos.to_csv("portfolio_costs_exploded.csv", index=False)

    # #Exporta pnl diário de cada fundo fechado
    # df_positions.loc[['date','portfolio_id','profitability_in_day','profitability_in_month','profitability_in_year']].T.to_csv("portfolio_pnl_history.csv", index=False)

In [113]:
# Lista para armazenar os pares (Investidor, Investida)
# relacoes = []

# # Iterar pelas colunas (fundos investidores)
# for investidor in df_positions.columns:
#     linha = pd.DataFrame(df_positions.loc['instrument_positions', investidor])  # obtém o dicionário da linha 'instrument_positions' e transforma em Dataframe
#     # linha = linha[linha.instrument_type==3] # filtra para só buscar posições em portfolios
#     for investida in linha.instrument_name:
#         relacoes.append((df_positions[investidor].loc['name'],investida))
    

# # Criar DataFrame final
# df_relacoes = pd.DataFrame(relacoes, columns=['Cliente', 'Carteira'])

In [114]:
funds_nav.to_csv("funds_nav.csv", index=False)

# df_relacoes.to_csv('funds.csv')

In [115]:
try:
    df_existente = pd.read_csv("instruments.csv")
except FileNotFoundError:
    df_explodido[['instrument_id','instrument_name']].drop_duplicates().to_csv("instruments.csv", index=False)

# Gera df com os instrumentos do novo arquivo
df_novos = df_explodido[['instrument_id', 'instrument_name']].drop_duplicates()

# Junta os dois, mas só adiciona os que NÃO estão no existente
df_final = pd.concat([df_existente, df_novos]).drop_duplicates(subset=['instrument_id'])

# Salva o resultado atualizado
df_final.to_csv("instruments.csv", index=False)