# Librerías

In [37]:
import pandas as pd
import yfinance as yf
import numpy as np
import matplotlib.pyplot as plt
import cvxpy as cp
import warnings
import statsmodels.api as sm
import seaborn as sns
from scipy.optimize import fsolve
warnings.filterwarnings("ignore")

# Funciones

## Extracción de datos

In [38]:
def get_data(tickers, start_date, end_date, interval):
    """Download closing prices and derive simple period-over-period returns.

    Parameters
    ----------
    tickers : list of str
        Symbols forwarded to ``yf.download``.
    start_date, end_date : str
        Date bounds forwarded to ``yf.download``.
    interval : str
        Bar size forwarded to ``yf.download`` (the notebook uses ``"1mo"``).

    Returns
    -------
    tuple
        ``(closes, simple_returns)``: the ``'Close'`` prices with the timezone
        removed from the index, and their ``pct_change`` with rows containing
        missing values dropped.
    """
    downloaded = yf.download(tickers, start=start_date, end=end_date, interval=interval)
    closes = downloaded['Close'].tz_localize(None)

    several_tickers = len(tickers) > 1
    if several_tickers:
        # Keep the caller's column order, then discard every ticker that has
        # at least one missing price in the window.
        closes = closes[tickers].dropna(axis=1)

    # Simple (not logarithmic) returns.
    simple_returns = closes.pct_change().dropna()
    return closes, simple_returns

def get_variables(returns, tickers):
    """Compute the per-asset statistics consumed by the optimisers.

    Parameters
    ----------
    returns : pandas.DataFrame
        One column of period returns per asset.
    tickers : sequence
        Only its length is used: when it holds exactly one element both
        covariance outputs are ``None``.

    Returns
    -------
    tuple
        ``(mu, sigma, semi_ret, semi_sigma, cov, semi_cov, max_drawdown_df)``
        where ``semi_ret`` is ``returns`` with positive values replaced by 0,
        the ``semi_*`` statistics are computed on ``semi_ret``, and
        ``max_drawdown_df`` is the absolute maximum drawdown of each column.
    """
    downside = np.minimum(returns, 0)

    mu = returns.mean()
    sigma = returns.std()
    semi_sigma = downside.std()

    if len(tickers) == 1:
        cov, semi_cov = None, None
    else:
        cov, semi_cov = returns.cov(), downside.cov()

    # Rebuild a price path starting from a base of 100 to measure drawdowns.
    wealth = (1 + returns).cumprod() * 100

    def _deepest_fall(path):
        # Most negative relative distance from the running peak.
        peak = path.cummax()
        return ((path - peak) / peak).min()

    max_drawdown_df = wealth.apply(_deepest_fall).abs()

    return mu, sigma, downside, semi_sigma, cov, semi_cov, max_drawdown_df

## Métodos de optimización

In [39]:
# Sharpe Portfolio
def get_sharpe_portfolio(assets, mu, cov):
    """Long-only portfolio built from a minimum-variance problem with unit return.

    Minimises ``w' cov w`` subject to ``w >= 0`` and ``w @ mu == 1`` and then
    rescales the solution so the weights add up to one. This is the usual
    convex reformulation of the maximum-Sharpe problem with a zero risk-free
    rate.

    Parameters
    ----------
    assets : sequence
        Labels used as the index of the result.
    mu : array-like
        Expected return per asset; its length sets the number of weights.
    cov : array-like
        Covariance matrix passed to ``cp.quad_form``.

    Returns
    -------
    pandas.DataFrame
        Single column ``"Sharpe"`` with weights rounded to 3 decimals.
    """
    weights = cp.Variable(len(mu))
    variance = cp.quad_form(weights, cov)
    problem = cp.Problem(
        cp.Minimize(variance),
        [weights >= 0, weights @ mu == 1],
    )
    problem.solve()

    # Rescale the unit-return solution into weights that sum to one.
    normalised = weights.value / weights.value.sum()
    return pd.DataFrame(normalised.round(3), index=assets, columns=["Sharpe"])

# Sortino Portfolio
def get_sortino_portfolio(assets, mu, semi_cov):
    """Long-only portfolio minimising the quadratic form of ``semi_cov``.

    Solves ``min w' semi_cov w`` subject to ``w @ mu == 1`` and ``w >= 0`` and
    rescales the solution so the weights add up to one.

    Parameters
    ----------
    assets : sequence
        Labels used as the index of the result.
    mu : array-like
        Expected return per asset; its length sets the number of weights.
    semi_cov : array-like
        Matrix passed to ``cp.quad_form`` (the caller decides what it holds).

    Returns
    -------
    pandas.DataFrame
        Single column ``"Sortino"`` with weights rounded to 4 decimals.
    """
    weights = cp.Variable(len(mu))
    downside_risk = cp.quad_form(weights, semi_cov)
    problem = cp.Problem(
        cp.Minimize(downside_risk),
        [weights @ mu == 1, weights >= 0],
    )
    problem.solve()

    normalised = (weights.value / weights.value.sum()).round(4)
    return pd.DataFrame(normalised, index=assets, columns=["Sortino"])

# Treynor Portfolio
def get_treynor_imputs(assets, returns, index_returns, mu):
    """Build the single-index-model table used by the Treynor portfolio.

    Each asset's returns are regressed by OLS on ``index_returns`` plus a
    constant. The second fitted parameter is taken as the asset's beta.

    Parameters
    ----------
    assets : sequence
        Column labels of ``returns`` to process, in order.
    returns : pandas.DataFrame
        Asset returns, one column per asset.
    index_returns : array-like
        Regressor passed to ``sm.add_constant``.
    mu : pandas.Series
        Mean return per asset; divided element-wise by the list of betas, so
        it presumably follows the same order as ``assets`` (verify at caller).

    Returns
    -------
    pandas.DataFrame
        Columns ``Beta``, ``Var_error``, ``R2``, ``Mu``, ``Coef_treynor`` and
        ``Asset``, sorted by ``Coef_treynor`` in descending order.
    """
    market = sm.add_constant(index_returns)
    fits = [sm.OLS(returns[asset], market).fit() for asset in assets]

    betas = [fit.params.to_list()[1] for fit in fits]
    treynor_df = pd.DataFrame({
        'Beta': betas,
        'Var_error': [fit.resid.var() for fit in fits],
        'R2': [fit.rsquared for fit in fits],
        "Mu": mu,
        "Coef_treynor": mu / betas,
        "Asset": list(assets),
    })

    return treynor_df.sort_values(by="Coef_treynor", ascending=False)

def get_treynor_portfolio(assets, returns, index_returns, mu):
    """Weights from a cut-off rate computed over assets ranked by Treynor ratio.

    Using the table from ``get_treynor_imputs`` (already sorted by
    ``Coef_treynor``), a candidate cut-off is computed cumulatively for every
    rank and the largest one is kept. Assets whose Treynor coefficient does
    not exceed the cut-off get weight 0; the rest are weighted proportionally
    to ``Beta / Var_error * (Coef_treynor - cutoff)``.

    Parameters
    ----------
    assets : sequence
        Asset labels; also the index of the result.
    returns : pandas.DataFrame
        Asset returns, one column per asset.
    index_returns : pandas.Series
        Benchmark returns; its standard deviation feeds the cut-off formula.
    mu : pandas.Series
        Mean return per asset.

    Returns
    -------
    pandas.DataFrame
        Single column ``"Treynor"`` with weights rounded to 4 decimals.
    """
    table = get_treynor_imputs(assets, returns, index_returns, mu)
    beta = table["Beta"]
    resid_var = table["Var_error"]
    market_var = index_returns.std() ** 2

    # Running sums over the ranking, one candidate cut-off per rank.
    cum_reward = np.cumsum(table["Mu"] * beta / resid_var)
    cum_beta_sq = np.cumsum(beta ** 2 / resid_var)
    candidates = market_var * cum_reward / (1 + market_var * cum_beta_sq)
    cutoff = max(candidates)

    raw = np.maximum(beta / resid_var * (table["Coef_treynor"] - cutoff), 0)
    weights = raw / raw.sum()
    return pd.DataFrame(weights.round(4), index=assets, columns=["Treynor"])

# Minimum Variance Portfolio
def get_pmvg_portfolio(assets, mu, cov, robj="No"):
    """Long-only, fully invested minimum-variance portfolio.

    Parameters
    ----------
    assets : sequence
        Labels used as the index of the result.
    mu : array-like
        Expected return per asset; its length sets the number of weights.
    cov : array-like
        Covariance matrix passed to ``cp.quad_form``.
    robj : float or "No", optional
        Target portfolio return. The sentinel ``"No"`` (default) means no
        return constraint is added.

    Returns
    -------
    pandas.DataFrame
        Single column named ``"PMVg"`` without a target or ``"PMVg_obj"`` with
        one, holding weights rounded to 3 decimals.
    """
    weights = cp.Variable(len(mu))
    constraints = [cp.sum(weights) == 1, weights >= 0]

    if robj == "No":
        col_name = "PMVg"
    else:
        constraints.append(weights @ mu == robj)
        col_name = "PMVg_obj"

    problem = cp.Problem(cp.Minimize(cp.quad_form(weights, cov)), constraints)
    problem.solve()
    return pd.DataFrame((weights.value).round(3), index=assets, columns=[col_name])

# Var and CVaR Portfolio
def get_var_cvar_portfolio(assets, returns, mu, cov, alpha=0.05):
    """Long-only, fully invested portfolio that minimises historical CVaR.

    Uses the sample-average formulation in which the VaR level is an auxiliary
    decision variable::

        CVaR = VaR + 1 / (alpha * T) * sum_t max(-r_t @ w - VaR, 0)

    where ``T`` is the number of return observations (rows of ``returns``).

    Parameters
    ----------
    assets : sequence
        Labels used as the index of the result.
    returns : pandas.DataFrame
        Historical returns, one row per period and one column per asset.
    mu : array-like
        Only its length is used, to size the weight vector.
    cov : object
        Unused; kept so the signature matches the other optimisers.
    alpha : float, optional
        Tail probability, strictly between 0 and 1. The default 0.05
        corresponds to the 95 % confidence level.

    Returns
    -------
    pandas.DataFrame
        Single column ``"VaR_CVaR"`` with weights rounded to 3 decimals.

    Raises
    ------
    ValueError
        If ``alpha`` is not strictly between 0 and 1.
    """
    if not 0 < alpha < 1:
        raise ValueError(f"alpha must be in (0, 1), got {alpha!r}")

    scenarios = returns.values
    n_obs = scenarios.shape[0]

    w = cp.Variable(len(mu))
    VaR = cp.Variable()  # solved jointly with the weights

    # The tail excess must be averaged over the observations, not over the
    # assets, otherwise the effective confidence level depends on the ratio
    # between both counts.
    losses = -(scenarios @ w)
    cvar = VaR + cp.sum(cp.pos(losses - VaR)) / (alpha * n_obs)

    problem = cp.Problem(cp.Minimize(cvar), [cp.sum(w) == 1, w >= 0])
    problem.solve()  # default solver chosen by cvxpy

    pf = pd.DataFrame((w.value).round(3), index=assets, columns=["VaR_CVaR"])
    return pf

# Risk Parity Portfolio
def get_risk_parity_portfolio(assets, mu, cov):
    """Risk-parity weights obtained by solving ``cov @ x = b / x``.

    ``b`` is an equal budget ``1/n`` per asset. The root ``x`` is found with
    ``fsolve`` starting from ``b`` and then rescaled to add up to one.

    Parameters
    ----------
    assets : sequence
        Labels used as the index of the result.
    mu : array-like
        Only its length is used, to size the problem.
    cov : array-like
        Covariance matrix supporting ``cov @ x``.

    Returns
    -------
    pandas.DataFrame
        Single column ``"Risk_Parity"`` with weights rounded to 3 decimals.
    """
    n_assets = len(mu)
    budget = np.full(n_assets, 1 / n_assets)

    def residual(x):
        # Zero when each asset's risk term cov @ x equals budget / x.
        return cov @ x - budget / x

    solution = fsolve(residual, budget)
    weights = solution / solution.sum()

    return pd.DataFrame((weights).round(3), index=assets, columns=["Risk_Parity"])

# Portafolio con distribución de pesos igual en caso de error
def get_eq_portfolio(assets):
    """Equal-weight portfolio, used as the fallback when an optimiser fails.

    Parameters
    ----------
    assets : sequence
        Asset labels; must not be empty (``1 / len(assets)`` is evaluated).

    Returns
    -------
    pandas.DataFrame
        Single column ``"EQ"`` indexed by ``assets`` with every weight equal
        to ``1 / len(assets)``.
    """
    count = len(assets)
    share = 1 / count
    return pd.DataFrame(np.full(count, share), index=assets, columns=["EQ"])


## Métricas de portafolios

In [40]:
def get_portfolios_metrics(data, portfolios, returns, index_returns, h=0, esg_info=None,
                           periods_per_year=12):
    """Compute performance and risk metrics for each portfolio column.

    Parameters
    ----------
    data : object
        Unused; kept for backward compatibility with existing callers.
    portfolios : pandas.DataFrame or mapping
        One weight vector per key/column; iterated by key.
    returns : pandas.DataFrame
        Asset returns for the evaluation window, one column per asset.
    index_returns : pandas.Series or pandas.DataFrame
        Benchmark returns used as the regressor to estimate beta.
    h : float, optional
        Threshold return for the Omega ratio. Default 0.
    esg_info : object, optional
        Unused; kept for backward compatibility.
    periods_per_year : int, optional
        Number of return periods in a year, used to annualise mean and
        volatility. Default 12 (monthly data).

    Returns
    -------
    pandas.DataFrame
        One row per portfolio, rounded to 3 decimals. ``Mu``, ``Sigma``,
        ``Mu_anual``, ``Sigma_anual``, ``MaxDrawdown`` and ``Profit_%`` are
        expressed in percent; ``VaR`` and ``CVaR`` are left as fractions.
    """
    df = pd.DataFrame(columns=["Portfolio", "Mu", "Sigma", "Sharpe", "Treynor", "Sortino", "Omega",
                                "Mu_anual", "Sigma_anual", "Beta","VaR", "CVaR", "MaxDrawdown", "Profit_%"])

    # Add the intercept column once; it is the same for every portfolio.
    market = sm.add_constant(index_returns)

    for pf in portfolios:
        ret = returns @ portfolios[pf]
        mu = ret.mean()
        sigma = ret.std()
        sharpe = mu / sigma

        # Beta is the slope of the regression of portfolio on index returns.
        # A failed fit (e.g. misaligned or empty data) yields NaN rather than
        # aborting the whole table.
        try:
            model = sm.OLS(ret, market).fit()
            # Positional access that works for both Series and ndarray params.
            beta = np.asarray(model.params)[1]
            treynor = mu / beta
        except Exception:
            beta = np.nan
            treynor = np.nan

        # Sortino: mean over the dispersion of the negative returns only.
        sortino = mu / (ret[ret < 0].std())

        # Omega: gains above the threshold over shortfalls below it. Excesses
        # are measured relative to h so the ratio is also right for h != 0,
        # and the denominator is a non-negative sum (no losses gives +inf).
        gain = np.maximum(ret - h, 0).sum()
        loss = np.maximum(h - ret, 0).sum()
        omega = gain / loss

        annual_mu = mu * periods_per_year
        annual_sigma = sigma * np.sqrt(periods_per_year)

        # Historical 5 % VaR and the mean loss beyond it, as positive numbers.
        q05 = np.percentile(ret, 5)
        var = -q05
        cvar = -np.mean(ret[ret < q05])

        # Maximum drawdown of the compounded portfolio value.
        wealth = (1 + ret).cumprod()
        running_peak = wealth.cummax()
        max_dd = abs(((wealth - running_peak) / running_peak).min())

        # Cumulative return over the window.
        profit = (1 + ret).prod() - 1

        df.loc[pf] = [pf, mu*100, sigma*100, sharpe, treynor, sortino, omega, annual_mu*100, annual_sigma*100,
                      beta, var, cvar, max_dd*100, profit*100]

    return df.round(3)

## Elección de métodos de optimización

In [41]:
def pf_method(method, filtered_assets, mu_opt, cov_opt, sigma_opt, opt_data, opt_index_data, index, max_drawdown_df):
    """Build the portfolio for ``method``, falling back to equal weights.

    Parameters
    ----------
    method : str
        One of ``"Sharpe"``, ``"Sortino"``, ``"Treynor"``, ``"PMVg"``,
        ``"VaR_CVaR"`` or ``"Risk_Parity"``. Any other value yields the
        equal-weight portfolio.
    filtered_assets : list
        Asset labels to allocate across.
    mu_opt, cov_opt : pandas objects
        Mean returns and covariance of ``filtered_assets``.
    sigma_opt, max_drawdown_df : object
        Unused; kept for backward compatibility with existing callers.
    opt_data : pandas.DataFrame
        Returns of ``filtered_assets`` over the optimisation window.
    opt_index_data : pandas.DataFrame
        Benchmark returns over the same window; column ``index`` is used.
    index : str
        Column label of the benchmark inside ``opt_index_data``.

    Returns
    -------
    pandas.DataFrame
        Single-column weights indexed by ``filtered_assets``.

    Notes
    -----
    The fallback is deliberately silent: any exception raised by an optimiser
    (infeasible problem, solver failure, bad data) results in the equal-weight
    portfolio so a backtest can continue.
    """
    builders = {
        "Sharpe": lambda: get_sharpe_portfolio(filtered_assets, mu_opt, cov_opt),
        # Sortino needs the covariance of the downside returns; passing the
        # full covariance would make it identical to the Sharpe portfolio.
        "Sortino": lambda: get_sortino_portfolio(
            filtered_assets, mu_opt, np.minimum(opt_data, 0).cov()
        ),
        "Treynor": lambda: get_treynor_portfolio(
            filtered_assets, opt_data, opt_index_data[index], mu_opt
        ),
        "PMVg": lambda: get_pmvg_portfolio(filtered_assets, mu_opt, cov_opt, robj="No"),
        "VaR_CVaR": lambda: get_var_cvar_portfolio(filtered_assets, opt_data, mu_opt, cov_opt),
        "Risk_Parity": lambda: get_risk_parity_portfolio(filtered_assets, mu_opt, cov_opt),
    }

    try:
        # An unknown (or unhashable) method fails the lookup and lands in the
        # same fallback as an optimiser error.
        return builders[method]()
    except Exception:
        return get_eq_portfolio(filtered_assets)

# Método de selección de activos
def assets_selection_method(method, mu_opt, sigma_opt, semi_sigma_opt, max_drawdown_df, returns_for_model, len_test, sub_method=None):
    """Pick the subset of assets that the optimiser will allocate across.

    Parameters
    ----------
    method : str
        Selection rule:

        * ``"Sharpe_metric"``: up to 20 assets with the highest ``mu / sigma``.
        * ``"Sortino_metric"``: 3 assets with the highest ``mu / semi_sigma``.
        * ``"Menor_rend_y_vol"``: the 3 assets with the lowest mean return,
          listed from highest to lowest volatility.
        * ``"Mayor_rend_y_vol"``: the 3 assets with the highest mean return,
          listed from highest to lowest volatility.
        * ``"Max_DD"``: the 3 assets with the smallest maximum drawdown.
    mu_opt, sigma_opt, semi_sigma_opt : pandas.Series
        Per-asset mean, standard deviation and downside deviation.
    max_drawdown_df : pandas.Series
        Per-asset maximum drawdown as a non-negative magnitude.
    returns_for_model, len_test, sub_method : object
        Unused; kept for backward compatibility with existing callers.

    Returns
    -------
    list
        Selected asset labels.

    Raises
    ------
    ValueError
        If ``method`` is not one of the rules above.
    """
    if method == "Sharpe_metric":
        ranking = (mu_opt / sigma_opt).sort_values(ascending=False)
        return ranking.head(20).index.tolist()

    if method == "Sortino_metric":
        ranking = (mu_opt / semi_sigma_opt).sort_values(ascending=False)
        return ranking.head(3).index.tolist()

    if method in ("Menor_rend_y_vol", "Mayor_rend_y_vol"):
        by_return = mu_opt.sort_values(ascending=False)
        chosen = by_return.tail(3) if method == "Menor_rend_y_vol" else by_return.head(3)
        # Same three assets, reordered by volatility (highest first).
        by_vol = sigma_opt.loc[chosen.index].sort_values(ascending=False)
        return by_vol.tail(3).index.tolist()

    if method == "Max_DD":
        # Drawdowns are stored as positive magnitudes, so the least severe
        # ones are the smallest values.
        return max_drawdown_df.sort_values(ascending=True).head(3).index.tolist()

    raise ValueError(f"Unknown asset selection method: {method!r}")

# Obtención de datos

## Clases de activos y activos

In [42]:
# Investment universe, grouped by asset class. Every group keeps its own
# module-level list; `assets` is their concatenation in the order shown below.

# Technology & growth / disruptors
tecnologia = [
    "AAPL", "MSFT", "GOOGL", "AMZN", "NVDA", "TSLA",
    "META", "ADBE", "INTC", "ORCL", "CSCO", "NFLX",
]

# Healthcare / biopharma / insurance
salud = [
    "JNJ", "PFE", "MRNA", "UNH", "LLY", "MRK", "ABBV",
]

# Financials
financieros = [
    "JPM", "BAC", "WFC", "GS", "V", "MA", "C", "SCHW",
]

# Consumer (defensive and global cyclical)
consumo = [
    "KO", "PEP", "PG", "MCD", "WMT", "COST",
]

# Energy: oil, gas and integrated companies
energia = [
    "XOM", "CVX", "BP", "EC", "COP",
]

# Industrials / capital goods
industriales = [
    "GE", "MMM", "CAT", "BA", "DE", "HON", "UNP",
]

# Fixed income
bonos = [
    "TLT", "SHY", "IEF", "IGOV", "EMB", "LQD", "HYG", "TIP", "SCHP",
]

# Cryptocurrencies
criptomonedas = [
    "BTC-USD", "ETH-USD", "BNB-USD", "XRP-USD", "ADA-USD", "SOL-USD",
]

# Alternative assets (commodities and REITs are folded into this group)
alternativos = [
    "VNQ", "UNG", "USO", "IYR", "GLD", "PLD", "O", "DLR", "AMT", "SPG",
]

assets = [
    *tecnologia, *salud, *financieros, *consumo, *energia,
    *industriales, *bonos, *criptomonedas, *alternativos,
]

# Standalone groups kept for reference only (currently disabled):
# Commodities
#commodities = ["GLD", "SLV", "PPLT", "PALL", "USO", "UNG", "DBA", "CORN", "SOYB", "WEAT", "DBC"]
# REITs (listed real estate)
#reits = ["PLD", "O", "DLR", "AMT", "SPG"]

## Datos de precios

In [43]:
# Study window and benchmark ticker.
start_date = "2017-12-01"
end_date = "2025-01-01"
index = "^SPX"


def _monthly(tickers):
    """Fetch monthly closes and returns for `tickers` over the study window."""
    return get_data(tickers, start_date, end_date, "1mo")


# Whole universe first, then one download per asset class, then the benchmark.
data, returns = _monthly(assets)
data_tecnologia, returns_tecnologia = _monthly(tecnologia)
data_salud, returns_salud = _monthly(salud)
data_financieros, returns_financieros = _monthly(financieros)
data_consumo, returns_consumo = _monthly(consumo)
data_energia, returns_energia = _monthly(energia)
data_industriales, returns_industriales = _monthly(industriales)
data_bonos, returns_bonos = _monthly(bonos)
data_criptomonedas, returns_criptomonedas = _monthly(criptomonedas)
data_alternativos, returns_alternativos = _monthly(alternativos)
data_index, index_returns = _monthly([index])

# From here on `assets` holds only the tickers that survived the download
# (columns with missing prices were dropped), in alphabetical order.
assets = sorted(data.columns.tolist())


[*********************100%***********************]  70 of 70 completed
[*********************100%***********************]  12 of 12 completed
[*********************100%***********************]  7 of 7 completed
[*********************100%***********************]  8 of 8 completed
[*********************100%***********************]  6 of 6 completed
[*********************100%***********************]  5 of 5 completed
[*********************100%***********************]  7 of 7 completed
[*********************100%***********************]  9 of 9 completed
[*********************100%***********************]  6 of 6 completed
[*********************100%***********************]  10 of 10 completed
[*********************100%***********************]  1 of 1 completed


# Función para división de datos

> La siguiente función permite evaluar distintos modelos de optimización de portafolios según la cantidad de meses de datos usados para optimizar y la cantidad de meses durante los cuales se mantiene el portafolio. En cada rebalanceo se vuelve a optimizar con los datos de los últimos X meses especificados para optimización, y el portafolio resultante se mantiene durante los siguientes Y meses, bajo el supuesto de que se cierran todas las posiciones cada vez que se optimiza y se hace una nueva compra con base en la última optimización.

> También hace un filtrado de activos en base a distintos métodos de selección escogidos

In [44]:
def backtest_portfolio_monthly(close_data, returns, index_returns, opt_len_months, test_len_months, method, index, method_assets):
    """Rolling-window backtest: optimise on past months, hold for the next ones.

    The window starts with the first ``opt_len_months`` months as optimisation
    data. Assets are selected and weighted on that data, the portfolio is then
    evaluated on the following ``test_len_months`` months, and the window
    advances by ``test_len_months``. This repeats while a full test window
    remains.

    The input frames are not modified. A ``"YearMonth"`` column left on them by
    an earlier run is ignored.

    Parameters
    ----------
    close_data : pandas.DataFrame
        Prices indexed by date; only forwarded to the metrics function.
    returns : pandas.DataFrame
        Asset returns indexed by date, one column per asset.
    index_returns : pandas.DataFrame
        Benchmark returns indexed by date; must contain column ``index``.
    opt_len_months, test_len_months : int
        Length in months of the optimisation and holding windows (>= 1).
    method : str
        Optimiser name understood by ``pf_method``.
    index : str
        Column label of the benchmark inside ``index_returns``.
    method_assets : str
        Selection rule understood by ``assets_selection_method``.

    Returns
    -------
    tuple
        ``(pf_metrics_df, pf_returns)``: one metrics row per holding window,
        tagged with the last month (``YearMonth``) and year (``Year``) of that
        window, and the concatenated out-of-sample portfolio returns.

    Raises
    ------
    ValueError
        If either window length is smaller than 1.
    """
    if opt_len_months < 1 or test_len_months < 1:
        raise ValueError("opt_len_months and test_len_months must both be >= 1")

    helper_col = "YearMonth"
    # drop() returns new frames, so the caller's objects are left untouched.
    returns = returns.drop(columns=[helper_col], errors="ignore")
    index_returns = index_returns.drop(columns=[helper_col], errors="ignore")
    close_data = close_data.drop(columns=[helper_col], errors="ignore")

    # Monthly period of every row, kept outside the frames.
    ret_periods = returns.index.to_period('M')
    idx_periods = index_returns.index.to_period('M')
    close_periods = close_data.index.to_period('M')
    unique_months = ret_periods.unique()

    metrics_frames = []
    return_frames = []
    months = []
    years = []

    # Position of the first test month; everything before it is history.
    start = opt_len_months
    while start + test_len_months <= len(unique_months):
        test_months = unique_months[start:start + test_len_months]
        opt_months = unique_months[start - opt_len_months:start]

        in_opt = ret_periods.isin(opt_months)
        in_test = ret_periods.isin(test_months)

        opt_data = returns[in_opt]
        test_data = returns[in_test]
        opt_index_data = index_returns[idx_periods.isin(opt_months)]
        test_index_data = index_returns[idx_periods.isin(test_months)]
        close_test = close_data[close_periods.isin(test_months)]

        # Statistics over every available asset drive the selection step.
        # The ticker list comes from the data itself, not from a global.
        mu_opt, sigma_opt, _, semi_sigma_opt, _, _, max_drawdown_df = get_variables(
            opt_data, list(opt_data.columns)
        )
        filtered_assets = assets_selection_method(
            method_assets, mu_opt, sigma_opt, semi_sigma_opt, max_drawdown_df,
            opt_data, len(test_data), sub_method="GradientBoosting"
        )

        opt_data = opt_data[filtered_assets]
        test_data = test_data[filtered_assets]

        # Recompute the statistics on the selected assets only.
        mu_opt, sigma_opt, _, _, cov_opt, _, max_drawdown_df = get_variables(opt_data, filtered_assets)
        pf = pd.DataFrame(
            pf_method(method, filtered_assets, mu_opt, cov_opt, sigma_opt,
                      opt_data, opt_index_data, index, max_drawdown_df)
        )

        pf_metrics = get_portfolios_metrics(close_test, pf, test_data, test_index_data[index], h=0)

        # Each window is labelled with the last month it covers.
        last_month = test_months[-1]
        months.append(last_month)
        years.append(last_month.year)
        metrics_frames.append(pf_metrics)
        return_frames.append(test_data @ pf.values)

        start += test_len_months

    # Concatenate once at the end; an empty backtest yields empty frames.
    pf_metrics_df = pd.concat(metrics_frames) if metrics_frames else pd.DataFrame()
    pf_returns = pd.concat(return_frames) if return_frames else pd.DataFrame()

    pf_metrics_df["YearMonth"] = months
    pf_metrics_df["Year"] = years

    return pf_metrics_df, pf_returns

# Backtest

In [45]:
# Map each asset-class label to its (prices, returns) pair so the backtest
# below can loop over the classes instead of repeating the call per class.
asset_groups = {
    "Tecnologia":    (data_tecnologia,    returns_tecnologia),
    "Salud":         (data_salud,         returns_salud),
    "Financieros":   (data_financieros,   returns_financieros),
    "Consumo":       (data_consumo,       returns_consumo),
    "Energia":       (data_energia,       returns_energia),
    "Industriales":  (data_industriales,  returns_industriales),
    #"REITs":         (data_reits,         returns_reits),
    "Bonos":         (data_bonos,         returns_bonos),
    #"Commodities":   (data_commodities,   returns_commodities),
    "Criptomonedas": (data_criptomonedas, returns_criptomonedas),
    "Alternativos":  (data_alternativos,  returns_alternativos),
}

# Optimisers to compare; every one is run on every asset class.
methods_list = ["Sharpe", "Sortino", "Treynor", "PMVg", "VaR_CVaR", "Risk_Parity"]

all_method_metrics = []     # one concatenated metrics frame per method
all_pf_returns_long = []    # (optional) long-format returns; not filled in the code shown here

for method in methods_list:
    per_method_metrics = []  # metrics frames of this method only, one per asset class

    for asset_name, (data_, returns_) in asset_groups.items():
        # 12 months to optimise, 12 months held, assets chosen by Sharpe ranking.
        pf_metrics, pf_returns = backtest_portfolio_monthly(
            data_,
            returns_,
            index_returns,
            opt_len_months=12,
            test_len_months=12,
            method=method,
            index=index,
            method_assets="Sharpe_metric"
        )

        # Tag every row with the asset class and method that produced it.
        pf_metrics = pf_metrics.copy()
        pf_metrics["Asset_Class"] = asset_name
        pf_metrics["Method"] = method
        per_method_metrics.append(pf_metrics)

    # Stack the results of all asset classes for this method.
    method_df = pd.concat(per_method_metrics, ignore_index=True)
    all_method_metrics.append(method_df)

# Final table: every method and every asset class in a single frame.
final_df = pd.concat(all_method_metrics, ignore_index=True)


In [46]:
# Persist the combined metrics table in the working directory, once as CSV and
# once as an Excel workbook; the row index is left out of both files.
final_df.to_csv("./portfolios_metrics.csv", index=False)
final_df.to_excel("./portfolios_metrics.xlsx", index=False)
