# Clusterização dos Prospectos de Fundos de Investimentos

## 1. Pipeline Tratamento Dados LLM

A classe abaixo: (i) carrega o dataframe com os dados extraídos via LLM, (ii) inclui coluna classificação ANBIMA, (iii) inclui historico cotacao bolsa, (iv) filtra por tipo de fundo e (v) integra dados LLM Kinea e Concorrência

In [0]:
!pip install yfinance pandas numpy 

In [0]:
from pyspark.sql import DataFrame
from pyspark.sql.functions import regexp_replace, to_date, when, col, coalesce, lit
import pandas as pd
import numpy as np
import yfinance as yf
from pyspark.sql.types import StringType, DateType, FloatType
from datetime import datetime, timedelta
from pyspark.sql import functions as F

class DataPipeline:
    def __init__(self, spark):
        self.spark = spark
        self.df = None
        self.taxa_rf_diaria = None  # para armazenar taxa SELIC diária

    def load_and_merge(self, df1: DataFrame, df2: DataFrame):
        if "tipo" in df1.columns:
            df1 = df1.withColumnRenamed("tipo", "tipo_fundo")
        if "tipo" in df2.columns:
            df2 = df2.withColumnRenamed("tipo", "tipo_fundo")
        self.df = df1.unionByName(df2)
        return self

    def standardize_columns(self):
        self.df = (
            self.df
            .withColumn("cnpj", regexp_replace("cnpj", "[^0-9]", ""))
            .withColumn("data_emissao", to_date("data_emissao", "yyyy-MM-dd"))
            .withColumn("tipo_fundo", when(col("tipo_fundo") == "F.I.I.", "FII").otherwise(col("tipo_fundo")))
        )
        return self

    def add_anbima_classification(self, anbima_table: str):
        df_anbima = (
            self.spark.table(anbima_table)
            .select(col("identificador_classe").alias("cnpj"), col("tipo_anbima"))
        )
        self.df = self.df.join(df_anbima, on="cnpj", how="left")
        return self

    def add_ticker(self, ticker_table: str):
        df_ticker = (
            self.spark.table(ticker_table)
            .select(col("Ticker"), col("CNPJ").alias("cnpj"))
            .withColumn("cnpj", regexp_replace("cnpj", "[^0-9]", ""))
        )
        self.df = self.df.join(df_ticker, on="cnpj", how="left")
        return self

    def drop_duplicates(self):
        self.df = self.df.dropDuplicates(["cnpj", "data_emissao"])
        return self

    def carregar_taxa_selic(self):
        """Busca taxa SELIC média diária do último ano."""
        hoje = datetime.today()
        inicio = hoje - timedelta(days=365)
        selic_url = (
            f"https://api.bcb.gov.br/dados/serie/bcdata.sgs.1178/dados"
            f"?formato=csv&dataInicial={inicio.strftime('%d/%m/%Y')}"
            f"&dataFinal={hoje.strftime('%d/%m/%Y')}"
        )
        try:
            selic_df = pd.read_csv(selic_url, sep=';', decimal=',', parse_dates=['data'], dayfirst=True)
            selic_df.columns = ['Data', 'SELIC']
            selic_df['SELIC'] = pd.to_numeric(selic_df['SELIC'], errors='coerce')
            selic_df.dropna(inplace=True)
            taxa_rf_anual = selic_df['SELIC'].mean() / 100
            self.taxa_rf_diaria = (1 + taxa_rf_anual) ** (1/252) - 1
        except Exception as e:
            print(f"[ERRO] Falha ao buscar SELIC: {e}")
            self.taxa_rf_diaria = 0.0  # fallback para evitar crash

    def calcular_metricas(self, ticker):
        if not isinstance(ticker, str) or ticker.strip() == '':
            return pd.Series([np.nan]*5, index=[
                'volatilidade_historica', 'liquidez_media', 'drawdown_max',
                'retorno_acumulado', 'sharpe_ratio'
            ])

        ticker_yf = ticker.strip()
        if not ticker_yf.endswith('.SA'):
            ticker_yf += ".SA"

        try:
            dados = yf.download(ticker_yf, period="1y", interval="1d", progress=False, auto_adjust=True)
            if dados.empty:
                return pd.Series([np.nan]*5, index=[
                    'volatilidade_historica', 'liquidez_media', 'drawdown_max',
                    'retorno_acumulado', 'sharpe_ratio'
                ])

            close = dados['Close']
            volume = dados['Volume']
            retorno_diario = close.pct_change().dropna()

            # garante float escalar
            volatilidade = float(retorno_diario.std() * np.sqrt(252))
            liquidez = float(volume.mean())
            retorno_acumulado = float((close.iloc[-1] / close.iloc[0]) - 1)
            drawdown_max = float(((close - close.cummax()) / close.cummax()).min())

            taxa_rf = float(self.taxa_rf_diaria or 0)
            excesso_retorno_diario = retorno_diario - taxa_rf
            sharpe_ratio = float((excesso_retorno_diario.mean() * 252) / volatilidade) if volatilidade != 0 else np.nan

            return pd.Series([
                volatilidade, liquidez, drawdown_max, retorno_acumulado, sharpe_ratio
            ], index=[
                'volatilidade_historica', 'liquidez_media', 'drawdown_max',
                'retorno_acumulado', 'sharpe_ratio'
            ])

        except Exception as e:
            print(f"[ERRO] Falha ao calcular métricas para {ticker_yf}: {e}")
            return pd.Series([np.nan]*5, index=[
                'volatilidade_historica', 'liquidez_media', 'drawdown_max',
                'retorno_acumulado', 'sharpe_ratio'
            ])

    def add_market_metrics(self):
        """Adiciona métricas de mercado para cada ticker (inclui Sharpe Ratio)."""
        # Pega SELIC uma vez só
        self.carregar_taxa_selic()

        tickers = [
            row['Ticker']
            for row in self.df.select('Ticker').filter(col("Ticker").isNotNull()).distinct().collect()
            if str(row['Ticker']).strip() != ''
        ]

        lista_metricas = []
        for t in tickers:
            serie = self.calcular_metricas(t)
            d = serie.to_dict()
            d['Ticker'] = t
            lista_metricas.append(d)

        df_metricas = pd.DataFrame(lista_metricas)
        for col_name in ['volatilidade_historica', 'liquidez_media', 'drawdown_max', 'retorno_acumulado', 'sharpe_ratio']:
            df_metricas[col_name] = df_metricas[col_name].astype(float)
        df_metricas['Ticker'] = df_metricas['Ticker'].astype(str)
        df_metricas = df_metricas.where(pd.notnull(df_metricas), None)

        df_metricas_spark = self.spark.createDataFrame(df_metricas)
        self.df = self.df.join(df_metricas_spark, on='Ticker', how='left')
        return self
    
    def add_valor_cotado_atual(self):
        """
        Calcula o valor cotado atual de cada fundo com Ticker
        e adiciona uma coluna 'valor_cotado_atual' no DataFrame.
        """

        # Lista de tickers únicos
        tickers = [
            row['Ticker']
            for row in self.df.select('Ticker').filter(col("Ticker").isNotNull()).distinct().collect()
            if str(row['Ticker']).strip() != ''
        ]

        # Busca valores cotados atuais
        lista_valores = []
        for t in tickers:
            ticker_yf = t.strip()
            if not ticker_yf.endswith(".SA"):
                ticker_yf += ".SA"
            try:
                dados = yf.download(ticker_yf, period="5d", interval="1d", progress=False, auto_adjust=True)
                if dados.empty:
                    valor_atual = None
                else:
                    valor_atual = float(dados['Close'].iloc[-1])
                lista_valores.append({"Ticker": t, "valor_cotado_atual": valor_atual})
            except Exception as e:
                print(f"[ERRO] Falha ao obter valor atual para {t}: {e}")
                lista_valores.append({"Ticker": t, "valor_cotado_atual": None})

        # Cria DataFrame Spark com valores cotados
        df_valores_spark = self.spark.createDataFrame(pd.DataFrame(lista_valores))

        # Faz join com o DataFrame principal
        self.df = self.df.join(df_valores_spark, on="Ticker", how="left")

        return self

    def filter_by_type(self, tipo):
        return self.df.filter(self.df["tipo_fundo"] == tipo)

    def get_df(self):
        return self.df
    
    def cast_fundo_columns(self):
        schema_cast = {
            "tipo_fundo": StringType(),
            "cnpj": StringType(),
            "data_emissao": DateType(),
            "qt_emissoes": FloatType(),
            "nome_fundo": StringType(),
            "valor_cota_emissao": FloatType(),
            "direito_preferencia_sobras_montante_adicional": StringType(),
            "taxa_distribuicao_emissao": FloatType(),
            "tabela_ativos_fundo": StringType(),
            "sumario_experiencia_socios": StringType(),
            "quantidade_cotas_emissao": FloatType(),
            "quantidade_cotas_adicionais_emissao": FloatType(),
            "publico_alvo": StringType(),
            "obs_publico_alvo": StringType(),
            "procuracao_AGE": StringType(),
            "planilha_custos": StringType(),
            "ordenar_fatores_risco": StringType(),
            "montante_minimo_emissao": FloatType(),
            "investimento_minimo_cpf_cnpj": FloatType(),
            "investimento_minimo_inst": FloatType(),
            'investimento_maximo_cpf_cnpj': FloatType(),
            'investimento_maximo_inst': FloatType(),
            'percentual_oferta_institucional': FloatType(),
            'volume_base_emissao': FloatType()
        }

        for col_name, dtype in schema_cast.items():
            if col_name in self.df.columns:
                self.df = self.df.withColumn(col_name, col(col_name).cast(dtype))

        if "quantidade_cotas_emissao" in self.df.columns and "quantidade_cotas_adicionais_emissao" in self.df.columns:
            self.df = self.df.withColumn(
                "quantidade_cotas_totais",
                coalesce(col("quantidade_cotas_emissao"), lit(0)) + coalesce(col("quantidade_cotas_adicionais_emissao"), lit(0))
            )
        return self
    
    def analisar_potencial_nova_emissao(self):
            """
            Para fundos com ticker, verifica se valor_cotado_atual > valor_cota_emissao.
            Adiciona coluna 'potencial_nova_emissao' (True/False).
            """
            self.df = self.df.withColumn(
                "potencial_nova_emissao",
                F.when(
                    (F.col("Ticker").isNotNull()) & (F.col("valor_cotado_atual") > F.col("valor_cota_emissao")),
                    True
                ).otherwise(False)
            )
            return self


Aplicando a classe para os tipos de fundos trabalhados (FIDC, FII e FIP):

In [0]:
from pyspark.sql.functions import lit

df1 = spark.table("desafio_kinea.prospecto_fundos.extracao_prospectos_kinea_v3")
df2 = spark.table("desafio_kinea.prospecto_fundos.extracao_prospectos_v3")

# adicionando colunas artificiais - alterar futuramente
df1 = df1.withColumn("tipo_gestor", lit("KINEA"))
df2 = df2.withColumn("tipo_gestor", lit("CONCORRENTE"))

pipeline = DataPipeline(spark)

df_final = (
    pipeline
    .load_and_merge(df1, df2)
    .standardize_columns()
    .cast_fundo_columns()
    .add_anbima_classification("desafio_kinea.prospecto_fundos.anbima_dev")
    .add_ticker("desafio_kinea.prospecto_fundos.mercado_fiis")
    .drop_duplicates()
    .add_market_metrics()
    .add_valor_cotado_atual()
    .analisar_potencial_nova_emissao()
    .get_df()  
)

df_fii = df_final.filter(col("tipo_fundo") == "FII")
df_fip = df_final.filter(col("tipo_fundo") == "FIP")
df_fidc = df_final.filter(col("tipo_fundo") == "FIDC")

In [0]:
display(df_fii)

## 2. Modelo de Clusterização 

In [0]:
!pip install sentence-transformers

In [0]:
from pyspark.sql.functions import col, coalesce, lit

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import RobustScaler
from sklearn.decomposition import PCA
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sentence_transformers import SentenceTransformer
import warnings

# Suprime warnings específicos do threadpoolctl
warnings.filterwarnings("ignore", category=UserWarning, module='threadpoolctl')

class ClusterizadorFundos:
    def __init__(self, cols_numericas, cols_categoricas, modelo_embeddings='paraphrase-MiniLM-L6-v2'):
        # Inicializa atributos com listas de colunas e modelo de embeddings
        self.cols_numericas = cols_numericas
        self.cols_categoricas = cols_categoricas
        self.model = SentenceTransformer(modelo_embeddings)
        self.scaler = RobustScaler()
        self.pca = None
        self.kmeans = None
        self.embedding_cols = []
        self.df_final = None

    def _processar_categoricas(self, df_spark):
        """
        Processa as colunas categóricas do DataFrame Spark:
        - Seleciona as colunas categóricas e trata valores nulos.
        - Gera embeddings para cada coluna categórica usando SentenceTransformer.
        - Reduz a dimensionalidade dos embeddings com PCA.
        - Retorna um DataFrame pandas com as colunas de embeddings.
        """
        df_cat = df_spark.select('cnpj', 'data_emissao', 'tipo_fundo', *self.cols_categoricas)
        for col_name in self.cols_categoricas:
            df_cat = df_cat.withColumn(col_name, coalesce(col(col_name), lit('desconhecido')))
        df_cat_pd = df_cat.toPandas().drop_duplicates(subset=['cnpj', 'data_emissao'])

        embedding_cols_local = []
        # Gera embeddings para cada coluna categórica e reduz dimensionalidade com PCA
        for col_name in self.cols_categoricas:
            texts = df_cat_pd[col_name].astype(str).fillna('desconhecido').tolist()
            embeddings = self.model.encode(texts, batch_size=32, show_progress_bar=False)
            pca_emb = PCA(n_components=min(10, embeddings.shape[1]), random_state=42)
            emb_reduced = pca_emb.fit_transform(embeddings)
            for i in range(emb_reduced.shape[1]):
                col_emb_name = f'{col_name}_emb_{i}'
                df_cat_pd[col_emb_name] = emb_reduced[:, i]
                embedding_cols_local.append(col_emb_name)
        self.embedding_cols = embedding_cols_local

        # Retorna DataFrame sem as colunas categóricas originais
        return df_cat_pd.drop(columns=self.cols_categoricas)

    def preparar_dados(self, df_spark):
        """
        Prepara os dados para clusterização:
        - Preenche valores nulos e converte colunas numéricas para float.
        - Processa colunas categóricas e numéricas, mesclando ambas.
        - Normaliza os dados e aplica PCA para redução de dimensionalidade.
        - Retorna DataFrame mesclado e matriz reduzida pelo PCA.
        """
        df_spark = df_spark.fillna(0)
        for c in self.cols_numericas:
            df_spark = df_spark.withColumn(c, col(c).cast("float"))

        # Processa colunas categóricas e numéricas, mesclando ambas
        df_cat_pd = self._processar_categoricas(df_spark)
        df_num_pd = df_spark.select(['cnpj', 'data_emissao', 'tipo_fundo'] + self.cols_numericas).toPandas()
        df_merged = df_num_pd.merge(df_cat_pd, on=['cnpj', 'data_emissao', 'tipo_fundo'], how='left')

        # Preenche valores nulos nas colunas de interesse
        for col_name in self.cols_numericas + self.embedding_cols:
            if col_name in df_merged.columns:
                df_merged[col_name] = df_merged[col_name].fillna(0)

        # Normaliza os dados e aplica PCA para redução de dimensionalidade
        X = df_merged[self.cols_numericas + self.embedding_cols].values
        X_scaled = self.scaler.fit_transform(X)

        n_features = X_scaled.shape[1]
        if n_features < 2:
            n_components_pca = n_features
        else:
            n_components_pca = 0.9
        self.pca = PCA(n_components=n_components_pca, random_state=42)
        X_pca = self.pca.fit_transform(X_scaled)

        return df_merged, X_pca

    def escolher_k_automatico(self, X_pca, k_min=2, k_max=10, tipo_fundo=None):
        """
        Determina automaticamente o número ótimo de clusters (k):
        - Calcula SSE (inércia) e Silhouette Score para diferentes valores de k.
        - Plota gráficos do método do cotovelo e silhueta.
        - Retorna o valor de k considerado ótimo.
        """
        sse = []
        sil_scores = []
        k_range = range(k_min, k_max + 1)

        for k in k_range:
            kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
            labels = kmeans.fit_predict(X_pca)
            sse.append(kmeans.inertia_)
            sil_scores.append(silhouette_score(X_pca, labels))

        # Plota gráficos do método do cotovelo e silhueta
        plt.figure(figsize=(12,5))
        plt.subplot(1, 2, 1)
        plt.plot(k_range, sse, marker='o')
        titulo_cotovelo = "Método do Cotovelo"
        if tipo_fundo:
            titulo_cotovelo += f" - {tipo_fundo}"
        plt.title(titulo_cotovelo)
        plt.xlabel("Número de Clusters (k)")
        plt.ylabel("SSE (Inércia)")

        plt.subplot(1, 2, 2)
        plt.plot(k_range, sil_scores, marker='o', color='orange')
        titulo_silhueta = "Coeficiente de Silhueta"
        if tipo_fundo:
            titulo_silhueta += f" - {tipo_fundo}"
        plt.title(titulo_silhueta)
        plt.xlabel("Número de Clusters (k)")
        plt.ylabel("Silhouette Score")
        plt.tight_layout()
        plt.show()

        # Define k ótimo com base nos métodos do cotovelo e silhueta
        k_otimo_inercia = k_range[np.argmin(np.gradient(np.gradient(sse)))]
        k_otimo_sil = k_range[np.argmax(sil_scores)]

        if k_otimo_inercia == k_otimo_sil:
            return k_otimo_inercia
        else:
            return round((k_otimo_inercia + k_otimo_sil) / 2)

    def aplicar_kmeans(self, df_merged, X_pca, k, tipo_fundo=None):
        """
        Aplica o algoritmo KMeans para clusterização:
        - Ajusta o modelo KMeans com o número de clusters k.
        - Adiciona colunas de identificação de cluster ao DataFrame.
        - Plota os clusters se houver mais de uma dimensão.
        - Retorna o DataFrame com os clusters atribuídos.
        """
        self.kmeans = KMeans(n_clusters=k, random_state=42, n_init=20)
        clusters = self.kmeans.fit_predict(X_pca)
        df_merged['cluster_id'] = clusters
        df_merged['cluster_id_full'] = df_merged['tipo_fundo'].astype(str) + "_" + df_merged['cluster_id'].astype(str)
        
        # Plota os clusters se houver mais de uma dimensão
        if X_pca.shape[1] > 1:
            plt.figure(figsize=(8,6))
            sns.scatterplot(x=X_pca[:,0], y=X_pca[:,1], hue=clusters, palette='Set1', s=60)
            titulo_cluster = f"Clusters K-Means (k={k}) + PCA + Embeddings"
            if tipo_fundo:
                titulo_cluster += f" - {tipo_fundo}"
            plt.title(titulo_cluster)
            plt.xlabel("PC1")
            plt.ylabel("PC2")
            plt.legend(title='Cluster')
            plt.show()
        
        return df_merged

    def clusterizar_todos_tipos(self, df_spark, tipos, k_por_tipo=None):
        """
        Realiza a clusterização para cada tipo de fundo especificado:
        - Para cada tipo, prepara os dados, define k (manual ou automaticamente), aplica KMeans.
        - Retorna um dicionário com os resultados em pandas e Spark DataFrame para cada tipo.
        """
        resultados = {}
        for tipo in tipos:
            try:
                df_tipo = df_spark.filter(df_spark["tipo_fundo"] == tipo)
                df_merge, X_pca = self.preparar_dados(df_tipo)

                # Define k manualmente ou automaticamente
                if k_por_tipo and tipo in k_por_tipo:
                    k = k_por_tipo[tipo]
                else:
                    k = self.escolher_k_automatico(X_pca, tipo_fundo=tipo)

                df_result = self.aplicar_kmeans(df_merge, X_pca, k, tipo_fundo=tipo)
                resultados[tipo] = df_result

                # Adiciona cluster_id e cluster_id_full ao DataFrame Spark de cada tipo
                df_tipo_pd = df_result[['cnpj', 'data_emissao', 'cluster_id', 'cluster_id_full']]
                df_tipo_spark = df_tipo.join(
                    spark.createDataFrame(df_tipo_pd),
                    on=['cnpj', 'data_emissao'],
                    how='left'
                )
                resultados[f"{tipo}_spark"] = df_tipo_spark

            except Exception:
                continue

        return resultados

### 2.1. Parâmetros de Clusterização

In [0]:
# Lista de colunas numéricas relevantes para clusterização dos fundos
cols_num = [
    'valor_cota_emissao',                  # Valor da cota na emissão
    'taxa_distribuicao_emissao',           # Taxa de distribuição na emissão
    'quantidade_cotas_totais',             # Quantidade de cotas totais emitidas
    'investimento_minimo_cpf_cnpj',        # Investimento mínimo para CPF/CNPJ
    'investimento_minimo_inst',            # Investimento mínimo para institucionais
    'investimento_maximo_cpf_cnpj',        # Investimento máximo para CPF/CNPJ
    'investimento_maximo_inst',            # Investimento máximo para institucionais
    'percentual_oferta_institucional',     # Percentual da oferta para institucionais
    'montante_minimo_emissao',             # Montante mínimo da emissão
    'volume_base_emissao',                 # Volume base da emissão
    'chamada_capital_ipca',                # Chamada de capital indexada ao IPCA
    'volatilidade_historica', 
    'liquidez_media', 
    'drawdown_max', 
    'retorno_acumulado', 
    'sharpe_ratio'
]

# Lista de colunas categóricas relevantes para clusterização dos fundos
cols_cat = [
    'tabela_ativos_fundo',                 # Tabela de ativos do fundo
    'publico_alvo',                        # Público alvo do fundo
    'ordenar_fatores_risco',               # Fatores de risco ordenados
    'diluicao_economica_novas_emissoes',   # Diluição econômica em novas emissões
    'criterio_rateio',                     # Critério de rateio
    'tipo_anbima'                          # Tipo ANBIMA do fundo
]

### 2.2. Aplicando Classe de Cluster

In [0]:
# Inicializa o clusterizador com colunas numéricas e categóricas
clusterizador = ClusterizadorFundos(cols_num, cols_cat)

# Define os tipos de fundos a serem clusterizados
tipos_fundos = ['FIDC', 'FII', 'FIP']

# Define manualmente o número de clusters para cada tipo de fundo (ajustado conforme Elbow e Silluette Methods)
# Caso contrário, o modelo define automaticamente a média das metodologias
k_manual = {'FII': 5, 'FIDC': 5, 'FIP': 5}

# Realiza a clusterização para cada tipo de fundo usando os valores de k definidos
resultados = clusterizador.clusterizar_todos_tipos(df_final, tipos_fundos, k_por_tipo=k_manual)

In [0]:
for tipo in tipos_fundos:
    print(f'Visualizando: {tipo}')
    display(resultados[tipo])

In [0]:
# Salvando os resultados no Schema

from pyspark.sql.functions import col

def select_cluster_cols(df_cluster):
    """Seleciona apenas as colunas relevantes para o join de clusters."""
    return df_cluster.select("cnpj", "data_emissao", "cluster_id", "cluster_id_full")

def bind_clusters(df_base, df_cluster):
    """Faz join entre o DataFrame base e o de clusterização."""
    return df_base.join(
        select_cluster_cols(df_cluster),
        on=["cnpj", "data_emissao"],
        how="left"
    )

# Lista de tipos de fundo a serem processados
tipos_fundos = ['FIDC', 'FII', 'FIP']

dfs_bind = {}
for tipo in tipos_fundos:
    # Recupera o DataFrame base para o tipo de fundo
    df_base = globals()[f"df_{tipo.lower()}"]
    # Recupera o DataFrame de clusters para o tipo de fundo
    df_cluster = resultados[f"{tipo}_spark"]

    # Realiza o join entre o DataFrame base e o de clusters
    df_bind = bind_clusters(df_base, df_cluster)
    dfs_bind[tipo] = df_bind

    # Cria variável global para o DataFrame resultante do join
    globals()[f"df_{tipo.lower()}_bind"] = df_bind

    # Salva o DataFrame resultante no schema especificado, substituindo se já existir
    tabela = f"desafio_kinea.prospecto_fundos.resultados_cluster_{tipo.lower()}"
    df_bind.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable(tabela)