In [6]:
import math
import json
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Tuple

# Tente habilitar KMeans; se não existir, a heurística continua funcionando
try:
    from sklearn.cluster import KMeans
    HAS_SK = True
except Exception:
    HAS_SK = False

In [None]:
# ---------------------- Dataset de exemplo (mais dados) ----------------------
# 5 trimestres por ticker (Q1 2023 ... Q1 2024)
entrada: List[Dict[str, Any]] = [
    # AAPL
    {"ativo":"AAPL","nome":"Apple Inc.","setor":"Tecnologia","cotacao":150.20,"trimestre":"Q1 2023","dividendos_pagos":0.24,"lucro_liquido":24000000000},
    {"ativo":"AAPL","nome":"Apple Inc.","setor":"Tecnologia","cotacao":175.50,"trimestre":"Q2 2023","dividendos_pagos":0.24,"lucro_liquido":19000000000},
    {"ativo":"AAPL","nome":"Apple Inc.","setor":"Tecnologia","cotacao":180.10,"trimestre":"Q3 2023","dividendos_pagos":0.24,"lucro_liquido":23000000000},
    {"ativo":"AAPL","nome":"Apple Inc.","setor":"Tecnologia","cotacao":190.00,"trimestre":"Q4 2023","dividendos_pagos":0.24,"lucro_liquido":30000000000},
    {"ativo":"AAPL","nome":"Apple Inc.","setor":"Tecnologia","cotacao":190.23,"trimestre":"Q1 2024","dividendos_pagos":0.24,"lucro_liquido":25000000000},

    # NVDA
    {"ativo":"NVDA","nome":"NVIDIA Corporation","setor":"Tecnologia / Semicondutores","cotacao":230.00,"trimestre":"Q1 2023","dividendos_pagos":0.04,"lucro_liquido":2000000000},
    {"ativo":"NVDA","nome":"NVIDIA Corporation","setor":"Tecnologia / Semicondutores","cotacao":300.40,"trimestre":"Q2 2023","dividendos_pagos":0.04,"lucro_liquido":6000000000},
    {"ativo":"NVDA","nome":"NVIDIA Corporation","setor":"Tecnologia / Semicondutores","cotacao":480.30,"trimestre":"Q3 2023","dividendos_pagos":0.04,"lucro_liquido":9000000000},
    {"ativo":"NVDA","nome":"NVIDIA Corporation","setor":"Tecnologia / Semicondutores","cotacao":650.15,"trimestre":"Q4 2023","dividendos_pagos":0.04,"lucro_liquido":11000000000},
    {"ativo":"NVDA","nome":"NVIDIA Corporation","setor":"Tecnologia / Semicondutores","cotacao":880.12,"trimestre":"Q1 2024","dividendos_pagos":0.04,"lucro_liquido":12000000000},

    # KO
    {"ativo":"KO","nome":"The Coca-Cola Company","setor":"Bebidas / Consumo","cotacao":60.00,"trimestre":"Q1 2023","dividendos_pagos":0.46,"lucro_liquido":3000000000},
    {"ativo":"KO","nome":"The Coca-Cola Company","setor":"Bebidas / Consumo","cotacao":62.10,"trimestre":"Q2 2023","dividendos_pagos":0.46,"lucro_liquido":3200000000},
    {"ativo":"KO","nome":"The Coca-Cola Company","setor":"Bebidas / Consumo","cotacao":58.75,"trimestre":"Q3 2023","dividendos_pagos":0.46,"lucro_liquido":3100000000},
    {"ativo":"KO","nome":"The Coca-Cola Company","setor":"Bebidas / Consumo","cotacao":60.50,"trimestre":"Q4 2023","dividendos_pagos":0.46,"lucro_liquido":3400000000},
    {"ativo":"KO","nome":"The Coca-Cola Company","setor":"Bebidas / Consumo","cotacao":61.45,"trimestre":"Q1 2024","dividendos_pagos":0.46,"lucro_liquido":3300000000},

    # META
    {"ativo":"META","nome":"Meta Platforms, Inc.","setor":"Tecnologia / Mídia Social","cotacao":140.00,"trimestre":"Q1 2023","dividendos_pagos":0.00,"lucro_liquido":5000000000},
    {"ativo":"META","nome":"Meta Platforms, Inc.","setor":"Tecnologia / Mídia Social","cotacao":220.00,"trimestre":"Q2 2023","dividendos_pagos":0.00,"lucro_liquido":7000000000},
    {"ativo":"META","nome":"Meta Platforms, Inc.","setor":"Tecnologia / Mídia Social","cotacao":310.00,"trimestre":"Q3 2023","dividendos_pagos":0.00,"lucro_liquido":10000000000},
    {"ativo":"META","nome":"Meta Platforms, Inc.","setor":"Tecnologia / Mídia Social","cotacao":400.00,"trimestre":"Q4 2023","dividendos_pagos":0.54,"lucro_liquido":14000000000},
    {"ativo":"META","nome":"Meta Platforms, Inc.","setor":"Tecnologia / Mídia Social","cotacao":490.37,"trimestre":"Q1 2024","dividendos_pagos":0.54,"lucro_liquido":12000000000},
]

In [12]:
# ---------------------- Utils ----------------------
def _parse_quarter_to_key(q: str) -> int:
    """Converte 'Q1 2024' ou '2024 Q1' em ano*4+tri, para ordenar no tempo."""
    try:
        q = str(q).strip().upper().replace("º","")
        parts = q.split()
        if len(parts) == 2:
            if parts[0].startswith("Q"):
                tri = int(parts[0].replace("Q",""))
                ano = int(parts[1])
            else:
                ano = int(parts[0])
                tri = int(parts[1].replace("Q",""))
            tri = max(1, min(4, tri))
            return ano*4 + tri
    except Exception:
        pass
    return 0

def _safe_pct_change(first: float, last: float) -> float:
    if first is None or last is None or first == 0:
        return 0.0
    try:
        return (last - first) / float(first)
    except Exception:
        return 0.0

def _compute_drawdown(prices: np.ndarray) -> float:
    """Drawdown do último ponto versus pico histórico."""
    if prices.size == 0:
        return 0.0
    running_max = np.maximum.accumulate(prices)
    peak = running_max[-1] if running_max[-1] != 0 else prices[-1]
    if peak == 0:
        return 0.0
    return max(0.0, 1.0 - (prices[-1] / peak))

def _log_return_volatility(prices: np.ndarray) -> float:
    """Volatilidade de retornos log entre observações."""
    if prices.size <= 1:
        return 0.0
    rets = np.diff(np.log(prices + 1e-9))
    return float(np.std(rets))

def _min_max_scale(arr: np.ndarray, out_min: float, out_max: float) -> np.ndarray:
    a_min, a_max = float(np.min(arr)), float(np.max(arr))
    if math.isclose(a_min, a_max):
        return np.full_like(arr, (out_min + out_max)/2.0)
    return (arr - a_min) / (a_max - a_min) * (out_max - out_min) + out_min

In [13]:
# ---------------------- Core: feature engineering ----------------------
def build_features(registros: List[Dict[str, Any]]) -> pd.DataFrame:
    df = pd.DataFrame(registros).copy()
    if df.empty:
        return pd.DataFrame()
    df["tri_key"] = df["trimestre"].map(_parse_quarter_to_key)
    df = df.sort_values(["ativo","tri_key"]).reset_index(drop=True)

    feats = []
    for ativo, g in df.groupby("ativo"):
        g = g.sort_values("tri_key")
        prices = g["cotacao"].astype(float).to_numpy()
        incomes = g["lucro_liquido"].astype(float).to_numpy()
        dividends = g["dividendos_pagos"].astype(float).to_numpy()
        name = g["nome"].iloc[-1]
        setor = g["setor"].iloc[-1]

        last_price = float(prices[-1]) if prices.size else 0.0
        mean_price = float(np.mean(prices)) if prices.size else last_price

        # Dividend yield TTM aprox.: média dos últimos 4 trimestres * 4 / preço atual
        div_ttm = float(np.mean(dividends[-4:])) * 4.0 if dividends.size else 0.0
        dividend_yield = (div_ttm / last_price) if last_price > 0 else 0.0

        vol = _log_return_volatility(prices)
        dd = _compute_drawdown(prices)
        momentum = (last_price / mean_price - 1.0) if mean_price > 0 else 0.0
        income_growth = _safe_pct_change(float(incomes[0]) if incomes.size else 0.0,
                                         float(incomes[-1]) if incomes.size else 0.0)

        feats.append({
            "ticker": ativo,
            "nome": name,
            "setor": setor,
            "last_price": last_price,
            "dividend_yield": dividend_yield,
            "vol": vol,
            "drawdown": dd,
            "momentum": momentum,
            "income_growth": income_growth
        })
    return pd.DataFrame(feats)


In [14]:
# ---------------------- Risco: Heurística ----------------------
def label_risk_heuristic(fdf: pd.DataFrame) -> pd.Series:
    risk_score = (
        0.6 * fdf["vol"].to_numpy() +
        0.4 * fdf["drawdown"].to_numpy() -
        0.2 * fdf["dividend_yield"].to_numpy() -
        0.1 * fdf["income_growth"].to_numpy()
    )
    if len(risk_score) == 1:
        q1 = q2 = risk_score[0]
    else:
        q1 = float(np.quantile(risk_score, 1/3))
        q2 = float(np.quantile(risk_score, 2/3))

    labels = []
    for s in risk_score:
        if s <= q1:
            labels.append("baixo")
        elif s <= q2:
            labels.append("médio")
        else:
            labels.append("alto")
    return pd.Series(labels, index=fdf.index)

In [15]:
# ---------------------- Risco: KMeans opcional ----------------------
def label_risk_kmeans(fdf: pd.DataFrame, random_state: int = 42) -> pd.Series:
    if not HAS_SK or len(fdf) < 3:
        # Fallback para heurística se não tiver sklearn ou poucos pontos
        return label_risk_heuristic(fdf)

    X = fdf[["vol","drawdown","dividend_yield","income_growth"]].to_numpy()
    km = KMeans(n_clusters=3, n_init="auto", random_state=random_state)
    clusters = km.fit_predict(X)

    # Mapeia clusters -> ("baixo","médio","alto") por um escore de risco
    # (maior vol+dd e menor yield/growth => risco maior)
    centroid_scores = []
    for k in range(3):
        c = km.cluster_centers_[k]
        c_vol, c_dd, c_yield, c_growth = c
        score = 0.6*c_vol + 0.4*c_dd - 0.2*c_yield - 0.1*c_growth
        centroid_scores.append((k, score))
    centroid_scores.sort(key=lambda x: x[1])  # baixo -> alto

    # menor score = "baixo", intermediário = "médio", maior = "alto"
    mapping = {centroid_scores[0][0]: "baixo",
               centroid_scores[1][0]: "médio",
               centroid_scores[2][0]: "alto"}
    return pd.Series([mapping[c] for c in clusters], index=fdf.index)

In [16]:
# ---------------------- Ranking + Decisão ----------------------
def rank_and_decide(fdf: pd.DataFrame, risco_labels: pd.Series) -> pd.DataFrame:
    score = (
        0.5 * fdf["momentum"].to_numpy() +
        0.3 * fdf["income_growth"].to_numpy() +
        0.2 * fdf["dividend_yield"].to_numpy() -
        0.4 * fdf["vol"].to_numpy() -
        0.2 * fdf["drawdown"].to_numpy()
    )
    rent = _min_max_scale(score, 5.0, 12.0)  # mapeia para 5%-12% (didático)

    decisao = []
    for r, risco in zip(rent, risco_labels.tolist()):
        if r >= 9.0 and risco in ("baixo","médio"):
            decisao.append("comprar")
        elif r >= 6.0:
            decisao.append("manter")
        else:
            decisao.append("vender" if risco == "alto" else "manter")

    out = fdf.copy()
    out["risco"] = risco_labels
    out["rentabilidade_estimada"] = rent
    out["decisao_investimento"] = decisao
    out["preco_alvo"] = out["last_price"] * (1.0 + out["rentabilidade_estimada"]/100.0)

    out = out.sort_values("rentabilidade_estimada", ascending=False).reset_index(drop=True)
    out["ranking"] = np.arange(1, len(out)+1)
    return out


In [17]:
# ---------------------- Pipeline principal ----------------------
def classificar_ativos(
    registros: List[Dict[str, Any]],
    risk_method: str = "heuristic"  # "heuristic" | "kmeans"
) -> Tuple[pd.DataFrame, List[Dict[str, Any]]]:
    fdf = build_features(registros)
    if fdf.empty:
        return fdf, []

    if risk_method == "kmeans":
        risco = label_risk_kmeans(fdf)
    else:
        risco = label_risk_heuristic(fdf)

    out = rank_and_decide(fdf, risco)

    # Formata saída no estilo pedido
    saida: List[Dict[str, Any]] = []
    for _, row in out.iterrows():
        saida.append({
            "ticker": row["ticker"],
            "nome": row["nome"],
            "cotacao": round(float(row["last_price"]), 2),
            "dividend_yield": round(float(row["dividend_yield"]), 4),  # fração (0.0123 = 1.23%)
            "preco_alvo": round(float(row["preco_alvo"]), 2),
            "decisao_investimento": row["decisao_investimento"],
            "rentabilidade_estimada": round(float(row["rentabilidade_estimada"]), 2),  # %
            "risco": row["risco"],
            "ranking": int(row["ranking"]),
        })
    return out, saida

In [None]:
# ---------------------- Execução de exemplo ----------------------
out_df, saida = classificar_ativos(entrada, risk_method="heuristic")  # troque para "kmeans" se quiser
display(out_df[["ticker","nome","last_price","dividend_yield","vol","drawdown","momentum","income_growth","risco","rentabilidade_estimada","preco_alvo","decisao_investimento","ranking"]])

print("\nPrévia JSON:")
print(json.dumps(saida, ensure_ascii=False, indent=2)[:800], "...\n")

# Exporte se quiser baixar do Colab
out_df.to_csv("ranking_ativos.csv", index=False)
with open("saida_modelo.json","w",encoding="utf-8") as f:
    json.dump(saida, f, ensure_ascii=False, indent=2)

print("Arquivos salvos: ranking_ativos.csv, sa... (saida_modelo.json)")