## Paquetes Necesarios

In [1]:
from __future__ import annotations

import glob
import os
from typing import Dict, Tuple, List, Optional

import numpy as np
import pandas as pd
from scipy.stats import norm, kstest, kendalltau, t as tdist

from statsmodels.distributions.copula.api import (
    GumbelCopula,
    StudentTCopula,
)

df = pd.read_csv('../data/clean/datos_limpios_log.csv')

## Cargar modelos marginales

In [None]:
def cargar_cola(name: str, carpeta: str) -> Dict:
    pattern = os.path.join(carpeta, f"{name}_tail_*.csv")
    files = glob.glob(pattern)
    if not files:
        raise FileNotFoundError(f"No se encontró tail para {name} en {carpeta}")

    ruta = files[0]
    dfp = pd.read_csv(ruta)

    if dfp.empty:
        raise ValueError(f"CSV de tail vacío: {ruta}")

    model = str(dfp.loc[0, "model_name"]).strip()
    u = float(dfp.loc[0, "u_opt"])
    p_u = float(dfp.loc[0, "p_u"])

    param_cols = [c for c in dfp.columns if "param" in c.lower()]
    param_cols = sorted(param_cols,
                        key=lambda s: int("".join(filter(str.isdigit, s)) or 0))
    params = tuple(float(dfp.loc[0, c]) for c in param_cols)

    return {"model": model, "u": u, "p_u": p_u, "params": params}

## Obtener CDF

In [3]:
def F_exc_tail(y: float, model: str, params: Tuple[float, ...]) -> float:

    if y <= 0:
        return 0.0

    m = model.lower()

    if m == "gpd":
        sigma, xi = params
        if abs(xi) < 1e-12:
            return 1.0 - np.exp(-y / sigma)
        base = 1.0 + xi * y / sigma
        if base <= 0:
            return 1.0
        return 1.0 - base ** (-1.0 / xi)

    if m == "pareto":
        xm, k = params
        return 1.0 - (xm / (xm + y)) ** k

    if m == "burr":
        c, k, lam = params
        return 1.0 - (1.0 + (y / lam) ** c) ** (-k)

    if m.startswith("ln") or m.startswith("lognormal"):
        mu, sig = params
        return float(norm.cdf((np.log(y) - mu) / sig))

    raise ValueError(f"Modelo de cola no implementado en F_exc_tail: {model}")


def F_hybrid(x: float,
             body_sample: np.ndarray,
             tail: Dict,
             a: float = 1.0,
             b: float = 2.0) -> float:

    u = tail["u"]
    p_u = tail["p_u"]

    if x <= u:
        sample = np.asarray(body_sample)
        sample = sample[~np.isnan(sample)]
        if sample.size == 0:
            return np.nan
        s = np.sort(sample)
        r = np.searchsorted(s, x, side="right")
        return (r + a) / (len(s) + b)

    y = x - u
    Fexc = F_exc_tail(y, tail["model"], tail["params"])
    return 1.0 - p_u * (1.0 - Fexc)

## Pasar a Unif con prueba KS

In [4]:
def ks_test_uniform(U: np.ndarray) -> Tuple[float, float]:

    stat, pval = kstest(U, "uniform")
    return float(stat), float(pval)

In [5]:
def U_variable(series: pd.Series,
               name: str,
               carpeta_tail: str,
               clip_eps: float = 1e-6) -> Tuple[np.ndarray, np.ndarray, Dict, float, float]:

    tail = cargar_cola(name, carpeta_tail)

    vals = series.to_numpy(dtype=float)
    body_sample = vals[vals <= tail["u"]]

    U = np.array(
        [F_hybrid(float(x), body_sample, tail) for x in vals],
        dtype=float
    )
    U = np.clip(U, clip_eps, 1.0 - clip_eps)

    ks_stat, ks_p = ks_test_uniform(U)
    return U, body_sample, tail, ks_stat, ks_p

## Ajuste de Cópulas

In [6]:
def fit_copulas(U1: np.ndarray,
                U2: np.ndarray) -> Dict:

    U1 = np.asarray(U1, float)
    U2 = np.asarray(U2, float)
    mask = np.isfinite(U1) & np.isfinite(U2)
    data = np.column_stack([U1[mask], U2[mask]])

    if data.shape[0] < 5:
        raise ValueError("Muy pocos datos para ajustar cópulas")

    # ============================
    # 1) Kendall tau (una sola vez)
    # ============================
    tau, _ = kendalltau(data[:, 0], data[:, 1])
    if not np.isfinite(tau):
        tau = 0.0

    # ============================
    # 2) Gumbel
    # ============================
    tau_g = np.clip(tau, 1e-6, 0.999)
    theta = 1.0 / (1.0 - tau_g)  # θ de Gumbel
    gcop = GumbelCopula(theta=theta)

    ll_g = float(np.sum(gcop.logpdf(data)))
    aic_g = -2.0 * ll_g + 2.0 * 1  # k=1

    # ============================
    # 3) t-Student
    # ============================
    tcop = StudentTCopula()

    # Estimar parámetros internos (incluye correlación y nu)
    params = tcop.fit(data)   # <-- ESTA ES LA CLAVE

    # loglik
    ll_t = float(np.sum(tcop.logpdf(data)))  # sin argumentos nombrados

    # parámetros: correlación y nu están dentro de params
    rho_t = params["corr"][0, 1]
    nu_t  = params["df"]

    R = params["corr"]
    nu = float(params["df"])

    aic_t = -2.0 * ll_t + 2.0 * 2  # parámetros = rho + nu


    # ============================
    # 4) Modelo ganador
    # ============================
    winner = "t-student" if aic_t < aic_g else "gumbel"

    return {
        "tau": float(tau),
        "gumbel": {
            "theta": float(theta),
            "ll": ll_g,
            "aic": aic_g
        },
        "t": {
            "R": R,
            "nu": float(nu),
            "ll": ll_t,
            "aic": aic_t
        },
        "winner": winner
    }


## Simular la Cola

In [7]:
def simulate_copula(fit: Dict,
                    n_sims: int = 50_000,
                    random_state: Optional[int] = None) -> np.ndarray:

    rng = np.random.default_rng(random_state)

    if fit["winner"] == "gumbel":
        theta = fit["gumbel"]["theta"]
        cop = GumbelCopula(theta=theta)
        if hasattr(cop, "random"):
            U = cop.random(n_sims, random_state=random_state)
        else:
            U = cop.rvs(n_sims, random_state=random_state)
        U = np.asarray(U, float)

    else:
        R = np.asarray(fit["t"]["R"], float)
        nu = float(fit["t"]["nu"])
        k = R.shape[0]

        L = np.linalg.cholesky(R + 1e-12 * np.eye(k))

        g = rng.standard_normal(size=(n_sims, k))
        z = g @ L.T

        w = rng.chisquare(df=nu, size=n_sims) / nu
        t_samples = z / np.sqrt(w[:, None])

        U = tdist.cdf(t_samples, df=nu)

    return np.clip(U, 1e-12, 1.0 - 1e-12)

## Cuantil

In [None]:
def Q_hybrid(alpha: float,
             body_sample: np.ndarray,
             tail: Dict) -> float:

    alpha = float(np.clip(alpha, 1e-12, 1.0 - 1e-12))
    u = tail["u"]
    p_u = tail["p_u"]

    sample = np.asarray(body_sample)
    sample = sample[~np.isnan(sample)]

    if alpha <= (1.0 - p_u) or sample.size == 0:
        return float(np.quantile(sample, alpha))

    alpha_exc = (alpha - (1.0 - p_u)) / p_u
    alpha_exc = float(np.clip(alpha_exc, 1e-12, 1.0 - 1e-12))

    m = tail["model"].lower()
    params = tail["params"]

    if m == "gpd":
        sigma, xi = params
        if abs(xi) < 1e-12:
            q_exc = -sigma * np.log1p(-alpha_exc)
        else:
            q_exc = (sigma / xi) * (np.power(1.0 - alpha_exc, -xi) - 1.0)

    elif m == "pareto":
        xm, k = params
        q_exc = xm * (np.power(1.0 - alpha_exc, -1.0 / k) - 1.0)

    elif m == "burr":
        c, k, lam = params
        inner = np.power(1.0 - alpha_exc, -1.0 / k) - 1.0
        inner = max(inner, 1e-18)
        q_exc = lam * (inner ** (1.0 / c))

    elif m.startswith("ln") or m.startswith("lognormal"):
        mu, sig = params
        z = norm.ppf(alpha_exc)
        q_exc = np.exp(mu + sig * z)

    else:
        raise ValueError(f"Modelo no implementado en Q_hybrid: {tail['model']}")

    return float(u + q_exc)

## VaR y CVaR para parejas

In [9]:
def simulate_joint_losses(U_sim: np.ndarray,
                          body1: np.ndarray,
                          tail1: Dict,
                          body2: np.ndarray,
                          tail2: Dict) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Recibe:
        U_sim: matriz (N x 2) de uniformes conjuntas (salida de simulate_copula)
        body1, tail1: muestra de cuerpo + cola para la variable 1
        body2, tail2: idem para la variable 2

    Devuelve:
        X1, X2, S = X1 + X2
    """
    U1 = U_sim[:, 0]
    U2 = U_sim[:, 1]

    X1 = np.array([Q_hybrid(float(a), body1, tail1) for a in U1], dtype=float)
    X2 = np.array([Q_hybrid(float(a), body2, tail2) for a in U2], dtype=float)
    S = X1 + X2
    return X1, X2, S


# ============================================================
# 8. VAR Y CVAR DE UNA SERIE
# ============================================================

def var_cvar(S: np.ndarray, alpha: float) -> Tuple[float, float]:
    """
    Calcula VaR y CVaR (TVaR) empíricos para una muestra S.

    VaR = cuantil alpha
    CVaR = media condicional S > VaR
    """
    S = np.asarray(S, float)
    S = S[np.isfinite(S)]

    if S.size == 0:
        return np.nan, np.nan

    q = float(np.quantile(S, alpha))
    tail = S[S > q]
    cvar = float(tail.mean()) if tail.size > 0 else q
    return q, cvar

## Pipe completo

In [10]:
def dependencia_y_riesgo(df: pd.DataFrame,
                         col1: str,
                         val1: str,
                         carpeta1: str,
                         col2: str,
                         val2: str,
                         carpeta2: str,
                         n_sims: int = 50_000,
                         alphas: Tuple[float, ...] = (0.95, 0.99),
                         random_state: Optional[int] = None) -> Optional[Dict]:
    """
    Pipeline completo para una pareja (val1 de col1) vs (val2 de col2):

    1) filtra df por col1 == val1 y col2 == val2
    2) obtiene U1, U2 via F_hybrid + cola EVT
    3) KS test de uniformidad para U1 y U2
    4) ajusta cópulas Gumbel y t-Student y elige la mejor
    5) simula U_sim ~ copula ganadora
    6) aplica Q_hybrid para obtener X1, X2 y S = X1+X2
    7) calcula VaR y CVaR para cada alpha en alphas

    Devuelve un diccionario con resultados o None si no hay datos suficientes.
    """
    df_pair = df[(df[col1] == val1) & (df[col2] == val2)].copy()
    if df_pair.shape[0] < 20:
        # muy pocos datos
        return None

    # Marginal 1
    U1, body1, tail1, ks1_stat, ks1_p = U_variable(
        df_pair["total"], name=str(val1), carpeta_tail=carpeta1
    )

    # Marginal 2
    U2, body2, tail2, ks2_stat, ks2_p = U_variable(
        df_pair["total"], name=str(val2), carpeta_tail=carpeta2
    )

    # Ajuste de cópulas
    fit = fit_copulas(U1, U2)

    # Simulación conjunta en espacio U
    U_sim = simulate_copula(fit, n_sims=n_sims, random_state=random_state)

    # Simulación de pérdidas
    X1, X2, S = simulate_joint_losses(U_sim, body1, tail1, body2, tail2)

    res = {
        "col1": col1,
        "val1": val1,
        "col2": col2,
        "val2": val2,
        "n_obs": int(df_pair.shape[0]),
        "ks1_stat": ks1_stat,
        "ks1_p": ks1_p,
        "ks2_stat": ks2_stat,
        "ks2_p": ks2_p,
        "tau": fit["tau"],
        "best_copula": fit["winner"],
        "gumbel_aic": fit["gumbel"]["aic"],
        "t_aic": fit["t"]["aic"],
    }

    for a in alphas:
        VaR, CVaR_ = var_cvar(S, a)
        res[f"VaR({a})"] = VaR
        res[f"CVaR({a})"] = CVaR_

    return res

In [11]:
def todas_dependencias(
    df: pd.DataFrame,
    provincias: List[str],
    categorias: List[str],
    sectores: List[str],
    carpeta_prov: str = "../res/provincia",
    carpeta_cat: str = "../res/categoria",
    carpeta_sec: str = "../res/sector",
    n_sims: int = 50_000,
    alphas: Tuple[float, ...] = (0.95, 0.99),
    random_state: Optional[int] = 42,
) -> pd.DataFrame:
    """
    Ejecuta dependencia_y_riesgo para:

        1) provincia - sector
        2) provincia - categoria
        3) categoria - sector

    y devuelve un DataFrame con todos los resultados.
    """
    resultados = []

    # 1) provincia - sector
    for p in provincias:
        for s in sectores:
            r = dependencia_y_riesgo(
                df, "provincia", p, carpeta_prov,
                "sector", s, carpeta_sec,
                n_sims=n_sims, alphas=alphas, random_state=random_state
            )
            if r is not None:
                r["tipo_par"] = "provincia-sector"
                resultados.append(r)

    # 2) provincia - categoria
    for p in provincias:
        for c in categorias:
            r = dependencia_y_riesgo(
                df, "provincia", p, carpeta_prov,
                "categoria", c, carpeta_cat,
                n_sims=n_sims, alphas=alphas, random_state=random_state
            )
            if r is not None:
                r["tipo_par"] = "provincia-categoria"
                resultados.append(r)

    # 3) categoria - sector
    for c in categorias:
        for s in sectores:
            r = dependencia_y_riesgo(
                df, "categoria", c, carpeta_cat,
                "sector", s, carpeta_sec,
                n_sims=n_sims, alphas=alphas, random_state=random_state
            )
            if r is not None:
                r["tipo_par"] = "categoria-sector"
                resultados.append(r)

    if not resultados:
        return pd.DataFrame()

    return pd.DataFrame(resultados)

# Uso

In [12]:
# Supongamos que df ya está cargado y limpio
provincias = sorted(df["provincia"].dropna().unique())
categorias = sorted(df["categoria"].dropna().unique())
sectores   = sorted(df["sector"].dropna().unique())

tabla = todas_dependencias(
    df,
    provincias=provincias,
    categorias=categorias,
    sectores=sectores,
    carpeta_prov="../res/provincias",
    carpeta_cat="../res/categorias",
    carpeta_sec="../res/sectores",
    n_sims=200_000,
    alphas=(0.95, 0.99),
    random_state=123
)

tabla.head()


  return np.power(-np.log(t), theta)
  return - theta * (-np.log(t))**(theta - 1) / t
  logpdfv = np.sum(np.log(np.abs(phi_d1(u, *args))), axis)
  d2 = (phi**(2 / th) + (th - 1) * phi**(1 / th)) / (phi**2 * th**2)
  d2 = (phi**(2 / th) + (th - 1) * phi**(1 / th)) / (phi**2 * th**2)
  d2 = (phi**(2 / th) + (th - 1) * phi**(1 / th)) / (phi**2 * th**2)
  d2 = (phi**(2 / th) + (th - 1) * phi**(1 / th)) / (phi**2 * th**2)
  logpdfv += np.log(np.abs(psi_d(psi, *args)))


AttributeError: 'StudentTCopula' object has no attribute 'fit'