In [2]:
# ============================================
# Security Market Line (SML) — CAPM
# Snowflake → IBEX + activos → β (excesos, OLS) → SML anual (geom.)
# Formatos: MENSUAL (β mensual, anualiza E[Ri]) y ANUAL (β anual)
# Estética uniforme:
#   - SML desde β=0 → arranca en Rf
#   - Ȳ = E[Rm] (geom) con etiqueta a la derecha
#   - Caja FÓRMULA (arriba-derecha, dentro del gráfico)
#   - Caja CLASES TP/FP/FN/TN (abajo-derecha, dentro del gráfico)
#   - Encabezado visible indicando ANUAL / MENSUAL
# ============================================

import pandas as pd
import numpy as np
import snowflake.connector
import statsmodels.api as sm
import plotly.graph_objects as go

# ============
# 0) CONFIG
# ============
CONFIG = dict(
    start_date   = '2020-01-01',
    end_date     = '2024-12-31',
    ticker_ibex  = '^IBEX',
    rf_anual     = 0.03,               # 3% anual
    tickers      = [
        "IBE.MC","ITX.MC","TEF.MC","BBVA.MC","SAN.MC","REP.MC",
        "AENA.MC","IAG.MC","ENG.MC","ACS.MC","FER.MC","CABK.MC",
        "ELE.MC","MAP.MC"
    ],
    # Conexión Snowflake
    sf = dict(
        user='TFMGRUPO4',
        password='TFMgrupo4ucm01_01#',
        account='WYNIFVB-YE01854',
        warehouse='COMPUTE_WH',
        database='YAHOO_FINANCE',
        schema='MACHINE_LEARNING',
        role='ACCOUNTADMIN'
    )
)

# Frecuencias disponibles y cómo remuestrear precios:
FREQ_PRESETS = {
    # β mensual; anualizamos expectativas con geom periods_per_year=12
    'mensual': dict(
        resample_rule='ME',           # fin de mes
        resample_agg='mean',          # 'mean' o 'last' (cámbialo si prefieres fin de mes estricto)
        periods_per_year=12,
        freq_label='mensual',
        periodo_label='meses'
    ),
    # β anual; expectativas ya son anuales (periods_per_year=1)
    'anual': dict(
        resample_rule='Y',            # fin de año
        resample_agg='last_valid',    # último precio válido del año
        periods_per_year=1,
        freq_label='anual',
        periodo_label='años'
    )
}

# ==========================
# 1) UTILIDADES DE DATOS
# ==========================
def connect_snowflake(sf_cfg):
    return snowflake.connector.connect(**sf_cfg)

def fetch_ibex(conn, ticker_ibex, start_date, end_date):
    q = f"""
        SELECT DATE, STOCKCLOSE
        FROM INDEX_TOTALES
        WHERE TICKER = '{ticker_ibex}'
          AND DATE BETWEEN '{start_date}' AND '{end_date}'
        ORDER BY DATE
    """
    cur = conn.cursor(); cur.execute(q)
    df = pd.DataFrame(cur.fetchall(), columns=['DATE','IBEX'])
    cur.close()
    df['DATE'] = pd.to_datetime(df['DATE'])
    df['IBEX'] = pd.to_numeric(df['IBEX'], errors='coerce').astype(float)
    return df.set_index('DATE').sort_index()

def fetch_ticker(conn, tk, start_date, end_date):
    q = f"""
        SELECT FECHA, CLOSE
        FROM TICKERS_INDEX
        WHERE TICKER = '{tk}'
          AND FECHA BETWEEN '{start_date}' AND '{end_date}'
        ORDER BY FECHA
    """
    cur = conn.cursor(); cur.execute(q)
    df = pd.DataFrame(cur.fetchall(), columns=['FECHA', tk])
    cur.close()
    df['FECHA'] = pd.to_datetime(df['FECHA'])
    df[tk] = pd.to_numeric(df[tk], errors='coerce').astype(float)
    return df.set_index('FECHA').sort_index()

def build_panel(conn, cfg):
    df_ibex = fetch_ibex(conn, cfg['ticker_ibex'], cfg['start_date'], cfg['end_date'])
    dfs = {'IBEX': df_ibex['IBEX']}
    for tk in cfg['tickers']:
        df_tk = fetch_ticker(conn, tk, cfg['start_date'], cfg['end_date'])
        dfs[tk] = df_tk[tk]
    return pd.concat(dfs, axis=1, join='outer').sort_index()

def last_valid(series: pd.Series):
    s = series.dropna()
    return s.iloc[-1] if not s.empty else np.nan

def resample_prices(panel, preset):
    rule = preset['resample_rule']
    agg  = preset['resample_agg']
    if agg == 'mean':
        px = panel.resample(rule).mean()
    elif agg == 'last':
        px = panel.resample(rule).last()
    elif agg == 'last_valid':
        px = panel.resample(rule).apply(last_valid)
    else:
        raise ValueError("resample_agg no soportado")
    return px

def returns_from_prices(panel_resampled):
    # pct_change sin forward fill para evitar mezclas
    return panel_resampled.pct_change(fill_method=None)

def rf_period_from_annual(rf_anual, periods_per_year):
    if periods_per_year == 1:
        return rf_anual
    return (1 + rf_anual)**(1/periods_per_year) - 1

# ==========================
# 2) UTILIDADES ESTADÍSTICAS
# ==========================
def geometric_annualized(r: pd.Series, periods_per_year=12):
    r = r.dropna()
    n = r.shape[0]
    if n == 0:
        return np.nan
    gross = (1 + r).prod()
    return gross**(periods_per_year / n) - 1

def estimate_beta_alpha_r2(ri: pd.Series, rm: pd.Series, rf_period: float):
    """
    OLS en EXCESOS con constante:
        (Ri - Rf) = α + β (Rm - Rf) + ε
    """
    pair = pd.concat([rm, ri], axis=1, join='inner').dropna()
    pair.columns = ['Rm', 'Ri']
    nobs = pair.shape[0]
    if nobs < 3:
        return None, None, None, nobs
    X = pair['Rm'] - rf_period
    y = pair['Ri'] - rf_period
    Xc = sm.add_constant(X)
    res = sm.OLS(y, Xc).fit()
    beta  = float(res.params.get('Rm', np.nan))
    alpha = float(res.params.get('const', np.nan))
    r2    = float(res.rsquared)
    return beta, alpha, r2, nobs

def build_points(rets, tickers, rf_anual, preset):
    """Calcula E[Rm] geom anual, β/α/R² por activo y mispricing; clasifica TP/FP/FN/TN."""
    periods_per_year = preset['periods_per_year']
    rf_p = rf_period_from_annual(rf_anual, periods_per_year)

    # E[Rm] geom anual sobre retornos de mercado a la frecuencia de β
    E_Rm_ann = geometric_annualized(rets['IBEX'], periods_per_year=periods_per_year)
    if pd.isna(E_Rm_ann):
        raise RuntimeError("No se pudo estimar E[Rm] geométrico (IBEX).")
    market_premium = E_Rm_ann - rf_anual

    rows = []
    for tk in tickers:
        beta, alpha, r2, nobs = estimate_beta_alpha_r2(rets[tk], rets['IBEX'], rf_p)
        if beta is None:
            print(f"[AVISO] {tk}: observaciones insuficientes (n={nobs}). Se omite.")
            continue
        pair = pd.concat([rets['IBEX'], rets[tk]], axis=1, join='inner').dropna()
        # E[Ri] anual geométrico
        E_Ri_ann = geometric_annualized(pair[tk], periods_per_year=periods_per_year)
        E_Ri_capm = rf_anual + beta * (E_Rm_ann - rf_anual)
        mispricing = E_Ri_ann - E_Ri_capm
        rows.append({
            'Ticker': tk,
            'Beta': beta,
            'Alpha_excesos': alpha,
            'R2': r2,
            'N_obs': nobs,
            'E_Ri_ann_geom': E_Ri_ann,
            'E_Ri_CAPM': E_Ri_capm,
            'Mispricing': mispricing
        })

    df_points = pd.DataFrame(rows).sort_values('Beta').reset_index(drop=True)
    if df_points.empty:
        raise RuntimeError("No hay puntos para graficar; revisa cobertura y fechas.")

    # Clasificación TP / FP / FN / TN (respecto a E[Rm] y SML)
    above_mean = df_points['E_Ri_ann_geom'] >= E_Rm_ann
    above_sml  = df_points['E_Ri_ann_geom'] >= df_points['E_Ri_CAPM']
    def _class_row(am, asml):
        if am and asml:        return 'TP'
        if am and not asml:    return 'FP'
        if (not am) and asml:  return 'FN'
        return 'TN'
    df_points['Class'] = [_class_row(am, asml) for am, asml in zip(above_mean, above_sml)]

    return df_points, E_Rm_ann, market_premium

# ==========================
# 3) PLOTTING SML (Plotly)
# ==========================
CLASS_COLOR  = {'TP':'#2ca02c', 'FP':'#ff7f0e', 'FN':'#1f77b4', 'TN':'#d62728'}
CLASS_SYMBOL = {'TP':'circle',  'FP':'diamond', 'FN':'triangle-up', 'TN':'x'}

def plot_sml(df_points, E_Rm_ann, rf_anual, market_premium,
             cfg, preset, *, mode_key, show_labels=True):
    beta_min = 0.0
    beta_max = max(1.5, float(df_points['Beta'].max() + 0.2))
    betas_line = np.linspace(beta_min, beta_max, 200)
    # Línea SML ANCLADA en Rf
    E_R_line = rf_anual + market_premium * betas_line

    y_vals = np.concatenate([
        df_points['E_Ri_ann_geom'].values,
        [rf_anual, E_Rm_ann, E_R_line.min(), E_R_line.max()]
    ])
    y_min = float(np.nanmin(y_vals)) - 0.03
    y_max = float(np.nanmax(y_vals)) + 0.03

    fig = go.Figure()

    # SML
    fig.add_trace(go.Scatter(
        x=betas_line, y=E_R_line, mode='lines',
        line=dict(width=3),
        showlegend=False, name='SML'
    ))

    # Ȳ = E[Rm] (geom)
    fig.add_hline(
        y=E_Rm_ann, line_dash="dash", line_width=2,
        annotation_text=f"Ȳ = E[Rm] {E_Rm_ann:.2%}",
        annotation_position="top right", annotation_yshift=6
    )

    # Rf (β=0)
    fig.add_trace(go.Scatter(
        x=[0], y=[rf_anual], mode='markers',
        marker=dict(symbol='x', size=12, line=dict(width=1.5)),
        showlegend=False, name='Rf',
        hovertemplate="<b>Activo sin riesgo</b><br>β=0.00<br>Rendimiento= %{y:.2%}<extra></extra>"
    ))

    # Mercado (β=1)
    fig.add_trace(go.Scatter(
        x=[1], y=[E_Rm_ann], mode='markers',
        marker=dict(size=12, line=dict(width=1.5)),
        showlegend=False, name='Mercado',
        hovertemplate="<b>Mercado</b><br>β=1.00<br>E[Rm]= %{y:.2%}<extra></extra>"
    ))

    # Puntos por clase
    periodo_label = preset['periodo_label']
    for cls in ['TP','FP','FN','TN']:
        sub = df_points[df_points['Class'] == cls]
        if sub.empty: 
            continue
        fig.add_trace(go.Scatter(
            x=sub['Beta'], y=sub['E_Ri_ann_geom'], mode='markers',
            marker=dict(
                size=10, symbol=CLASS_SYMBOL[cls],
                color=CLASS_COLOR[cls],
                line=dict(width=1, color='rgba(0,0,0,0.45)')
            ),
            showlegend=False, name=f'Activos {cls}',
            hovertemplate=(
                "<b>%{customdata[0]}</b><br>"
                f"Clase = {cls} "
                "(%{customdata[8]})<br>"
                f"n = %{{customdata[7]}} {periodo_label}<br>"
                "β = %{x:.3f} | E[Ri] (geom, anual) = %{y:.2%}<br>"
                "E[Rm] = %{customdata[1]:.2%} | Rf = %{customdata[2]:.2%}<br>"
                "<b>CAPM</b>: E[Ri]_CAPM = Rf + β·(E[Rm]−Rf) = "
                "%{customdata[2]:.2%} + %{x:.3f}·(%{customdata[1]:.2%} − %{customdata[2]:.2%}) "
                "= %{customdata[3]:.2%}<br>"
                "α (excesos) = %{customdata[4]:.2%} | R² = %{customdata[5]:.3f}<br>"
                "Mispricing = E[Ri] − E[Ri]_CAPM = %{customdata[6]:.2%}"
                "<extra></extra>"
            ),
            customdata=np.stack([
                sub['Ticker'],
                np.full(len(sub), E_Rm_ann),
                np.full(len(sub), rf_anual),
                sub['E_Ri_CAPM'],
                sub['Alpha_excesos'],
                sub['R2'],
                sub['Mispricing'],
                sub['N_obs'],
                np.where(
                    (sub['E_Ri_ann_geom'] >= E_Rm_ann) & (sub['E_Ri_ann_geom'] >= sub['E_Ri_CAPM']),
                    "↑media & ↑SML",
                    np.where(
                        (sub['E_Ri_ann_geom'] >= E_Rm_ann) & (sub['E_Ri_ann_geom'] < sub['E_Ri_CAPM']),
                        "↑media & ↓SML",
                        np.where(
                            (sub['E_Ri_ann_geom'] < E_Rm_ann) & (sub['E_Ri_ann_geom'] >= sub['E_Ri_CAPM']),
                            "↓media & ↑SML",
                            "↓media & ↓SML"
                        )
                    )
                )
            ], axis=-1)
        ))

    # Etiquetas conmutables (si show_labels=True)
    if show_labels:
        fig.add_trace(go.Scatter(
            x=df_points['Beta'], y=df_points['E_Ri_ann_geom'],
            mode='text', text=df_points['Ticker'],
            textposition='top center', textfont=dict(size=10),
            showlegend=False, name='Etiquetas'
        ))

    # -------- Encabezado visual del modo (ANUAL / MENSUAL) --------
    mode_badge = preset['freq_label'].upper()
    fig.add_annotation(
        xref="paper", yref="paper",
        x=0.01, y=0.98, xanchor="left", yanchor="top",
        text=f"<b>{mode_badge}</b>",
        showarrow=False, align="left",
        bordercolor="rgba(0,0,0,0.15)", borderwidth=1,
        bgcolor="rgba(255,255,255,0.90)", font=dict(size=12),
        borderpad=6
    )

    # Caja FÓRMULA (arriba-derecha, dentro del plot)
    nota_formula = (
        "CAPM:  E[Ri] = Rf + β·(E[Rm]−Rf)"
        f"<br>Rf = {rf_anual:.2%} · E[Rm] (geom) = {E_Rm_ann:.2%}"
        f"<br>Prima de mercado = {market_premium:.2%}"
    )
    fig.add_annotation(
        xref="paper", yref="paper",
        x=0.99, y=0.98, xanchor="right", yanchor="top",
        text=nota_formula, showarrow=False, align="right",
        bordercolor="rgba(0,0,0,0.15)", borderwidth=1,
        bgcolor="rgba(255,255,255,0.90)", font=dict(size=11),
        borderpad=6
    )

    # Caja CLASES (abajo-derecha, dentro del plot)
    cnt = df_points['Class'].value_counts().to_dict()
    nota_clases = (
        "<b>Clases</b>: "
        f"<span style='color:{CLASS_COLOR['TP']}'>■ TP</span> "
        f"<span style='color:{CLASS_COLOR['FP']}'>■ FP</span> "
        f"<span style='color:{CLASS_COLOR['FN']}'>■ FN</span> "
        f"<span style='color:{CLASS_COLOR['TN']}'>■ TN</span>"
        f"<br>TP={cnt.get('TP',0)} · FP={cnt.get('FP',0)} · FN={cnt.get('FN',0)} · TN={cnt.get('TN',0)}"
    )
    fig.add_annotation(
        xref="paper", yref="paper",
        x=0.98, y=0.06, xanchor="right", yanchor="bottom",
        text=nota_clases, showarrow=False, align="right",
        bordercolor="rgba(0,0,0,0.15)", borderwidth=1,
        bgcolor="rgba(255,255,255,0.90)", font=dict(size=11),
        borderpad=6
    )

    # Título con modo destacado: “SML (CAPM) — ANUAL / MENSUAL …”
    title_prefix = f"SML (CAPM) — {mode_badge}"
    fig.update_layout(
        template='simple_white',
        title=(f"{title_prefix}:  E[Ri] = Rf + β(E[Rm]−Rf)  ·  "
               f"Periodo: {cfg['start_date']} → {cfg['end_date']}  ·  Frecuencia β: {preset['freq_label']}"),
        xaxis_title="Beta (β)",
        yaxis_title="Rendimiento esperado ANUAL",
        margin=dict(l=40, r=60, t=90, b=80),
        hovermode='closest',
        showlegend=False
    )
    fig.update_xaxes(
        range=[beta_min, beta_max],
        zeroline=True, zerolinewidth=1, zerolinecolor="#B0B0B0",
        showline=True, linecolor="#888", dtick=0.25
    )
    fig.update_yaxes(
        range=[y_min, y_max],
        zeroline=True, zerolinewidth=1, zerolinecolor="#B0B0B0",
        showline=True, linecolor="#888", tickformat=".0%"
    )
    fig.show()
    return fig

# ==========================
# 4) PIPELINE PRINCIPAL
# ==========================
def run_sml(mode='mensual', cfg=CONFIG):
    """
    mode ∈ {'mensual','anual'}
      - mensual: β mensual (excesos con Rf mensual), E[Ri] anual geom (periods_per_year=12)
      - anual  : β anual (excesos con Rf anual),   E[Ri] anual geom (periods_per_year=1)
    """
    preset = FREQ_PRESETS[mode]

    # Encabezado en consola
    print("\n" + "="*78)
    print(f"SML — {preset['freq_label'].upper()}  |  Periodo: {cfg['start_date']} → {cfg['end_date']}")
    print("="*78)

    conn = connect_snowflake(cfg['sf'])
    try:
        panel = build_panel(conn, cfg)
    finally:
        conn.close()

    # 1) Remuestreo de precios (según frecuencia)
    panel_rs = resample_prices(panel, preset)
    # 2) Retornos a la frecuencia
    rets = returns_from_prices(panel_rs)

    # 3) Puntos SML (β/α/R², E[Rm], mispricing y clases)
    df_points, E_Rm_ann, market_premium = build_points(rets, cfg['tickers'], cfg['rf_anual'], preset)

    # 4) Gráfica
    plot_sml(df_points, E_Rm_ann, cfg['rf_anual'], market_premium, cfg, preset,
             mode_key=mode, show_labels=True)

    return df_points

# ==========================
# 5) EJEMPLOS DE EJECUCIÓN
# ==========================
# --- ANUAL (2020–2024)
df_anual = run_sml(mode='anual', cfg=CONFIG)

# --- MENSUAL (mismo rango; β mensual, anualiza E[Ri])
#     Si prefieres usar fin de mes en lugar de media mensual:
#     FREQ_PRESETS['mensual']['resample_agg'] = 'last'
df_mensual = run_sml(mode='mensual', cfg=CONFIG)

# --- (Opcional) Exportar resultados
# df_anual.to_csv("SML_anual_df_points.csv", index=False)
# df_mensual.to_csv("SML_mensual_df_points.csv", index=False)



SML — ANUAL  |  Periodo: 2020-01-01 → 2024-12-31
[AVISO] SAN.MC: observaciones insuficientes (n=0). Se omite.
[AVISO] IAG.MC: observaciones insuficientes (n=0). Se omite.
[AVISO] ENG.MC: observaciones insuficientes (n=0). Se omite.



'Y' is deprecated and will be removed in a future version, please use 'YE' instead.




SML — MENSUAL  |  Periodo: 2020-01-01 → 2024-12-31
[AVISO] SAN.MC: observaciones insuficientes (n=0). Se omite.
[AVISO] IAG.MC: observaciones insuficientes (n=0). Se omite.
[AVISO] ENG.MC: observaciones insuficientes (n=0). Se omite.
