In [1]:
# Bibliotecas a serem utilizadas
import random
import numpy as np
import pandas as pd
from deap import algorithms
from deap import base
from deap import creator
from deap import tools
import math
import os
from sklearn.preprocessing import StandardScaler
import hdbscan
from deap.tools import DeltaPenalty
import multiprocessing
from pathos.multiprocessing import ProcessingPool as Pool

In [2]:
year = 2020
# Caminho da pasta no WSL
folder_path = "/mnt/c/Users/msses/Desktop/ETF/weekly_log_returns"
# Lista os arquivos da pasta que começam com o padrão desejado
arquivos_do_ano = [
    f for f in os.listdir(folder_path)
    if f.startswith(f"weekly_log_returns_{year}_") and f.endswith(".csv")
]

# Carrega e concatena os arquivos em um único DataFrame
df_concatenado = pd.concat(
    [pd.read_csv(os.path.join(folder_path, arquivo)) for arquivo in sorted(arquivos_do_ano)],
    ignore_index=True
)

# Visualiza as primeiras linhas do DataFrame
print(df_concatenado.head())

  ticker  week_start  first_price  last_price  log_return  year  month
0   AAAU  2020-01-06      15.6200     15.5700   -0.003206  2020      1
1   AAAU  2020-01-13      15.4500     15.5300    0.005165  2020      1
2   AAAU  2020-01-20      15.5500     15.6800    0.008325  2020      1
3   AAAU  2020-01-27      15.7900     15.8300    0.002530  2020      1
4   AADR  2020-01-06      50.7008     51.4471    0.014612  2020      1


In [3]:
def filtrar_tickers_completos(df):
    """
    Filtra o DataFrame para manter apenas os tickers com o número máximo de observações.

    Parâmetros:
    - df: DataFrame contendo colunas 'ticker'

    Retorna:
    - df_filtrado: DataFrame apenas com os tickers completos
    """
    contagem_por_ticker = df['ticker'].value_counts()
    n_max = contagem_por_ticker.max()
    tickers_completos = contagem_por_ticker[contagem_por_ticker == n_max].index.tolist()
    df_filtrado = df[df['ticker'].isin(tickers_completos)]


    return df_filtrado

def selecionar_tickers_representativos(df_filtrado, num_tickers):
    """
    Aplica HDBSCAN para selecionar os tickers mais representativos com base em seus log_returns.

    Parâmetros:
    - df_filtrado: DataFrame com tickers completos
    - num_tickers: número de tickers representativos a retornar

    Retorna:
    - df_final: DataFrame apenas com os tickers selecionados
    - tickers_selecionados: lista de tickers representativos
    """
    # Pivotar a matriz para ter um vetor de log_return por ticker
    df_pivot = df_filtrado.pivot_table(
        index='ticker',
        columns=['year', 'month', 'week_start'],
        values='log_return'
    ).fillna(0)

    # Padronizar
    X = StandardScaler().fit_transform(df_pivot)

    # Clusterização
    clusterer = hdbscan.HDBSCAN(min_cluster_size=5)
    cluster_labels = clusterer.fit_predict(X)

    # Associar cluster a ticker
    df_clusters = pd.DataFrame({
        'ticker': df_pivot.index,
        'cluster': cluster_labels
    })

    # Selecionar o maior cluster
    maior_cluster = df_clusters['cluster'].value_counts().idxmax()
    tickers_maior_cluster = df_clusters[df_clusters['cluster'] == maior_cluster]['ticker'].tolist()

    # Selecionar os N primeiros tickers do maior cluster
    tickers_selecionados = tickers_maior_cluster[:num_tickers]
    df_final = df_filtrado[df_filtrado['ticker'].isin(tickers_selecionados)]

    print(f"Tickers selecionados: {tickers_selecionados}")

    return df_final, tickers_selecionados

In [4]:
df = df_concatenado
# =======================
# 1. Filtrar tickers completos
# =======================
contagem_por_ticker = df['ticker'].value_counts()
n_max = contagem_por_ticker.max()

# Filtrar apenas os tickers com número de observações == n_max
tickers_completos = contagem_por_ticker[contagem_por_ticker == n_max].index.tolist()
df_filtrado = df[df['ticker'].isin(tickers_completos)]

print(f"Total de tickers: {df['ticker'].nunique()}")
print(f"Tickers completos restantes: {len(tickers_completos)}")

# =======================
# 2. Aplicar HDBSCAN para selecionar os num_tickers mais representativos
# =======================

# Número desejado de tickers representativos
num_tickers = 30  # ou outro valor que você quiser

# Pivotar para cada linha ser um ticker com colunas de log_returns semanais
df_pivot = df_filtrado.pivot_table(
    index='ticker', 
    columns=['year', 'month', 'week_start'], 
    values='log_return'
).fillna(0)  # Pode usar média ou zero para preencher ausências

# Padronizar os dados
X = StandardScaler().fit_transform(df_pivot)

# Rodar HDBSCAN
clusterer = hdbscan.HDBSCAN(min_cluster_size=5)
cluster_labels = clusterer.fit_predict(X)

# Adicionar os labels ao dataframe original
df_clusters = pd.DataFrame({
    'ticker': df_pivot.index,
    'cluster': cluster_labels
})

# Filtrar o maior cluster (mais representativo)
maior_cluster = df_clusters['cluster'].value_counts().idxmax()
tickers_maior_cluster = df_clusters[df_clusters['cluster'] == maior_cluster]['ticker'].tolist()

# Selecionar os N primeiros tickers do maior cluster
tickers_selecionados = tickers_maior_cluster[:num_tickers]

# Filtrar o DataFrame final
df_final = df_filtrado[df_filtrado['ticker'].isin(tickers_selecionados)]

print(f"Tickers selecionados (representativos): {tickers_selecionados}")
print(len(tickers_selecionados))

Total de tickers: 2223
Tickers completos restantes: 1900
Tickers selecionados (representativos): ['AADR', 'ACES', 'ACIO', 'ACSI', 'ACTX', 'ACWF', 'ACWV', 'ADME', 'ADRE', 'AESR', 'AFK', 'AFLG', 'AFTY', 'AGNG', 'AGQ', 'AIA', 'AIEQ', 'AIQ', 'AIRR', 'AIVI', 'ALTY', 'AMJ', 'AMLP', 'AMUB', 'AMZA', 'ANGL', 'AOA', 'AOK', 'AOR', 'ARGT']
30




In [None]:
# -------- PARAMETROS --------
BETA = 0.98
CVaR_ALPHA = 0.05
NUM_ATIVOS = 30
POP_SIZE = 1000
N_GEN = 500

# -------- PREPARO DOS DADOS --------
# df_final deve conter: 'ticker', 'week_start', 'log_return', 'first_price', 'last_price'
tickers = df_final['ticker'].unique()
df_pivot = df_final.pivot(index='week_start', columns='ticker', values='log_return').fillna(0)
log_returns = df_pivot[tickers].values  # matriz [T x N]
n_semanas, n_ativos = log_returns.shape

# -------- DEFINIÇÃO DAS FUNÇÕES --------
def avaliar(individuo):
    pesos = np.array(individuo[:n_ativos])
    selecionados = np.array(individuo[n_ativos:])
    pesos = pesos * selecionados

    soma_pesos = pesos.sum()
    if soma_pesos > 0:
        pesos = pesos / soma_pesos

    # ---- RETORNO PONDERADO COM BETA ----
    beta_weights = np.array([BETA**(n_semanas - 1 - i) for i in range(n_semanas)])
    portfolio_returns = log_returns @ pesos
    weighted_return = np.sum(portfolio_returns * beta_weights)

    # ---- CVaR ----
    sorted_returns = np.sort(portfolio_returns)
    index_cut = max(1, int(np.floor(CVaR_ALPHA * len(sorted_returns))))
    cvar = -np.mean(sorted_returns[:index_cut])

    return weighted_return, cvar


def feasible(individuo):
    pesos = np.array(individuo[:n_ativos])
    selecionados = np.array(individuo[n_ativos:])
    pesos = pesos * selecionados

    soma_pesos = pesos.sum()
    num_selecionados = selecionados.sum()
    return soma_pesos==1 and num_selecionados>1

def distance(individuo):
    pesos = np.array(individuo[:n_ativos])
    selecionados = np.array(individuo[n_ativos:])
    pesos = pesos * selecionados
    soma_pesos = pesos.sum()

    return abs(1.0 - soma_pesos)

def custom_mutate(individuo, indpb_float=0.8, indpb_bin=0.9):
    # Mutação dos genes de alocação (contínuos)
    for i in range(n_ativos):
        if np.random.rand() < indpb_float:
            individuo[i] = np.random.uniform(0, 1)

    # Mutação dos genes binários (seleção)
    for i in range(n_ativos, 2 * n_ativos):
        if np.random.rand() < indpb_bin:
            individuo[i] = 1 - individuo[i]  # flip 0↔1

    return individuo,
# -------- DEAP SETUP --------
creator.create("FitnessMulti", base.Fitness, weights=(1.0, -1.0))  # max retorno, min CVaR
creator.create("Individual", list, fitness=creator.FitnessMulti)

toolbox = base.Toolbox()

# Inicialização de genes contínuos (alocação)
toolbox.register("attr_float", lambda: np.random.uniform(0, 1))

# Inicialização de genes binários (seleção)
toolbox.register("attr_bin", lambda: np.random.randint(0, 2))

# Criação de indivíduo
toolbox.register("individual", tools.initCycle, creator.Individual,
                 (toolbox.attr_float,) * n_ativos + (toolbox.attr_bin,) * n_ativos, n=1)

toolbox.register("population", tools.initRepeat, list, toolbox.individual)
penalty = (-1e6, 1e6)  

toolbox.register(
    "evaluate",
    DeltaPenalty(feasible, penalty, distance)(avaliar)
)

toolbox.register("mate", tools.cxUniform, indpb=0.05) 
toolbox.register("mutate", custom_mutate)
toolbox.register("select", tools.selNSGA2)
toolbox.register("map", Pool().map)
# -------- EXECUÇÃO --------
pop = toolbox.population(n=POP_SIZE)
def get_adaptive_probs(gen, ngen,
                       cxpb_start=0.3, cxpb_end=0.1,
                       mutpb_start=0.7, mutpb_end=0.9):
    frac = gen / ngen
    cxpb = cxpb_start + frac * (cxpb_end - cxpb_start)
    mutpb = mutpb_start + frac * (mutpb_end - mutpb_start)
    return cxpb, mutpb

# Avaliar população inicial
invalid_ind = [ind for ind in pop if not ind.fitness.valid]
fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
    ind.fitness.values = fit

# Evolução manual
for gen in range(1, N_GEN + 1):
    cxpb, mutpb = get_adaptive_probs(gen, N_GEN)

    offspring = toolbox.select(pop, len(pop))
    offspring = list(map(toolbox.clone, offspring))

    for i in range(1, len(offspring), 2):
        if np.random.rand() < cxpb:
            toolbox.mate(offspring[i - 1], offspring[i])
            del offspring[i - 1].fitness.values
            del offspring[i].fitness.values

    for mutant in offspring:
        if np.random.rand() < mutpb:
            toolbox.mutate(mutant)
            del mutant.fitness.values

    invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
    fitnesses = toolbox.map(toolbox.evaluate, invalid_ind)
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit

    # Injetar diversidade
    num_novos = int(0.05 * POP_SIZE)
    novos_inds = [toolbox.individual() for _ in range(num_novos)]
    fitnesses = toolbox.map(toolbox.evaluate, novos_inds)
    for ind, fit in zip(novos_inds, fitnesses):
        ind.fitness.values = fit

    pop[:] = tools.selNSGA2(pop + offspring, POP_SIZE)

    print(f"Geração {gen} - cxpb: {cxpb:.2f}, mutpb: {mutpb:.2f}")

# Resultado final
melhores = tools.selBest(pop, k=5)
for i, ind in enumerate(melhores):
    print(f"Indivíduo {i + 1} - Retorno: {avaliar(ind)[0]:.5f}, CVaR: {avaliar(ind)[1]:.5f}")

Geração 1 - cxpb: 0.30, mutpb: 0.70
Geração 2 - cxpb: 0.30, mutpb: 0.70
Geração 3 - cxpb: 0.30, mutpb: 0.70
Geração 4 - cxpb: 0.30, mutpb: 0.70
Geração 5 - cxpb: 0.30, mutpb: 0.70
Geração 6 - cxpb: 0.30, mutpb: 0.70
Geração 7 - cxpb: 0.30, mutpb: 0.70
Geração 8 - cxpb: 0.30, mutpb: 0.70
Geração 9 - cxpb: 0.30, mutpb: 0.70
Geração 10 - cxpb: 0.30, mutpb: 0.70
Geração 11 - cxpb: 0.30, mutpb: 0.70
Geração 12 - cxpb: 0.30, mutpb: 0.70
Geração 13 - cxpb: 0.29, mutpb: 0.71
Geração 14 - cxpb: 0.29, mutpb: 0.71
Geração 15 - cxpb: 0.29, mutpb: 0.71
Geração 16 - cxpb: 0.29, mutpb: 0.71
Geração 17 - cxpb: 0.29, mutpb: 0.71
Geração 18 - cxpb: 0.29, mutpb: 0.71
Geração 19 - cxpb: 0.29, mutpb: 0.71
Geração 20 - cxpb: 0.29, mutpb: 0.71
Geração 21 - cxpb: 0.29, mutpb: 0.71
Geração 22 - cxpb: 0.29, mutpb: 0.71
Geração 23 - cxpb: 0.29, mutpb: 0.71
Geração 24 - cxpb: 0.29, mutpb: 0.71
Geração 25 - cxpb: 0.29, mutpb: 0.71
Geração 26 - cxpb: 0.29, mutpb: 0.71
Geração 27 - cxpb: 0.29, mutpb: 0.71
Geração 28

In [8]:
# Selecionar os 5 melhores indivíduos
melhores_inds = tools.selBest(pop, k=5)

for idx, ind in enumerate(melhores_inds, start=1):
    # Separar pesos e seleção
    pesos = np.array(ind[:n_ativos])
    selecionados = np.array(ind[n_ativos:])
    pesos = pesos * selecionados

    # Normalizar os pesos (se soma > 0)
    soma_pesos = pesos.sum()
    if soma_pesos > 0:
        pesos = pesos / soma_pesos

    # Criar dicionário com os ativos selecionados
    portfolio = {
        ticker: peso for ticker, peso, sel in zip(tickers, pesos, selecionados) if sel == 1
    }

    # Imprimir resultados
    print(f"\nIndivíduo {idx} - Retorno: {avaliar(ind)[0]:.5f}, CVaR: {avaliar(ind)[1]:.5f}")
    print("Portfólio (pesos normalizados):")
    for ticker, peso in portfolio.items():
        print(f"  {ticker}: {peso:.4f}")


Indivíduo 1 - Retorno: 0.19667, CVaR: 0.12125
Portfólio (pesos normalizados):
  ACTX: 0.2385
  AFLG: 0.0137
  AGNG: 0.1617
  AIEQ: 0.2729
  AIVI: 0.1944
  AMZA: 0.1187

Indivíduo 2 - Retorno: 0.19667, CVaR: 0.12125
Portfólio (pesos normalizados):
  ACTX: 0.2385
  AFLG: 0.0137
  AGNG: 0.1617
  AIEQ: 0.2729
  AIVI: 0.1944
  AMZA: 0.1187

Indivíduo 3 - Retorno: 0.19667, CVaR: 0.12125
Portfólio (pesos normalizados):
  ACTX: 0.2385
  AFLG: 0.0137
  AGNG: 0.1617
  AIEQ: 0.2729
  AIVI: 0.1944
  AMZA: 0.1187

Indivíduo 4 - Retorno: 0.19667, CVaR: 0.12125
Portfólio (pesos normalizados):
  ACTX: 0.2385
  AFLG: 0.0137
  AGNG: 0.1617
  AIEQ: 0.2729
  AIVI: 0.1944
  AMZA: 0.1187

Indivíduo 5 - Retorno: 0.19667, CVaR: 0.12125
Portfólio (pesos normalizados):
  ACTX: 0.2385
  AFLG: 0.0137
  AGNG: 0.1617
  AIEQ: 0.2729
  AIVI: 0.1944
  AMZA: 0.1187
