In [45]:
# =========================================
#  backtest_v2.py  (meant to be run inside the notebook)
# =========================================
import pathlib
import sys

import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# ------------------------------------------------------------------------------------
# 1) Dynamic paths depending on the model type
# ------------------------------------------------------------------------------------
# The parent of the current working directory is treated as the project root.
# It has to be on sys.path BEFORE anything from `src` is imported; otherwise the
# project imports below only succeed if an earlier cell (hidden kernel state)
# already extended sys.path, and the notebook breaks on "Restart & Run All".
PROJECT_ROOT = pathlib.Path().resolve().parent
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from src import config as cfg  # noqa: E402  (must follow the sys.path bootstrap)
from src.rebalanceo_v2 import (  # noqa: E402
    resolver_optimizacion_v2,   # <-- new
    elegir_w_star_v2            # <-- new
)

# Run parameters read from the project configuration.
MODEL_TYPE = cfg.MODEL_TYPE   # "lstm", "lstm5d", "gru5d" or "xgb"
WIN = cfg.WINDOW
STEP = cfg.REBAL_FREQ
TAU_TURN = 0.4                # turnover parameter, passed to the v2 optimizer as `tau`

model = None
scaler_lstm5d = None

# For every Keras-based model type: the name of the cfg attribute that holds the
# saved-model file name, and the processed data file that goes with it.
_KERAS_SPECS = {
    "lstm":   ("LSTM_MODEL_NAME",   "lstm_data.pkl"),
    "lstm5d": ("LSTM5D_MODEL_NAME", "lstm5d_data.pkl"),
    "gru5d":  ("GRU5D_MODEL_NAME",  "gru5d_data.pkl"),
}

if MODEL_TYPE in _KERAS_SPECS:
    # TensorFlow is imported only when a Keras model is actually requested.
    from tensorflow import keras
    _name_attr, _data_file = _KERAS_SPECS[MODEL_TYPE]
    model = keras.models.load_model(cfg.MODELS / getattr(cfg, _name_attr), compile=False)
    if MODEL_TYPE == "lstm5d":
        # Only the lstm5d model ships with a pre-fitted input scaler.
        scaler_lstm5d = joblib.load(cfg.MODELS / "scaler_X_lstm5d.pkl")
    DATA_PATH = cfg.DATA / "processed" / _data_file
elif MODEL_TYPE == "xgb":
    model = joblib.load(cfg.MODELS / cfg.XGB_MODEL_NAME)
    DATA_PATH = cfg.DATA / "processed" / "xgb_data.pkl"
else:
    raise ValueError(f"Modelo '{MODEL_TYPE}' no soportado")

# ------------------------------------------------------------------------------------
# 2) Market data and features
# ------------------------------------------------------------------------------------
# The processed data bundle provides the ticker list used to select price columns.
lstm_data = joblib.load(DATA_PATH)
tickers = lstm_data["tickers"]
PRICES = pd.read_parquet(cfg.DATA / "raw" / "prices.parquet").sort_index()[tickers]

# Daily log returns; the first row (no previous price) is dropped.
RET = np.log(PRICES.div(PRICES.shift(1))).dropna()

# 5-day rolling sum and standard deviation of returns, and their ratio lagged by
# one row (the 1e-6 keeps the denominator away from zero).
_roll5 = RET.rolling(5)
ret5 = _roll5.sum()
vol5 = _roll5.std()
mom = ret5.div(vol5.add(1e-6)).shift(1)

# Feature matrix: lagged returns next to the lagged ratio, warm-up rows removed.
FEAT = pd.concat([RET.shift(1), mom], axis=1).dropna()

print("üü¢ Features:", FEAT.shape, "|  Window:", WIN, "| Paso rebalanceo:", STEP)

üü¢ Features: (2699, 80) |  Window: 60 | Paso rebalanceo: 10


In [47]:
# ------------------------------------------------------------------------------------
# 3) Per-date rebalance (only the optimizer call differs: v2)
# ------------------------------------------------------------------------------------
def rebalancear_en_fecha_v2(fecha, w_prev):
    """Rebalance the portfolio at ``fecha`` and measure the following ``STEP`` rows.

    Steps:
      1. take the ``WIN`` rows of ``FEAT`` and of ``RET`` that precede ``fecha``;
      2. build ``r_hat`` from the model selected by ``MODEL_TYPE``;
      3. build the sample covariance of the return window (plus a 1e-8 ridge);
      4. call ``resolver_optimizacion_v2`` / ``elegir_w_star_v2`` to get ``w_star``;
      5. apply ``w_star`` to the ``STEP`` rows of ``RET`` starting at ``fecha``.

    Parameters
    ----------
    fecha : label present in both ``FEAT.index`` and ``RET.index``.
    w_prev : previous weights, forwarded unchanged to the optimizer.

    Returns
    -------
    (w_star, ret) : the chosen weights and the sum over the forward window of
        the weighted returns.

    Raises
    ------
    KeyError     if ``fecha`` is not in the indexes.
    ValueError   if fewer than ``WIN`` rows precede ``fecha`` or the model type
                 is unknown.
    RuntimeError if the optimizer yields no portfolio.
    """
    feat_idx = FEAT.index.get_loc(fecha)
    # FEAT loses more leading rows than RET (rolling / shift warm-up followed by
    # dropna), so the same date sits at a different integer position in each
    # frame. Reusing the FEAT position on RET shifted the covariance window and
    # made the "realized" window start before `fecha`, overlapping the features.
    ret_idx = RET.index.get_loc(fecha)

    if feat_idx < WIN or ret_idx < WIN:
        # A negative slice start would silently select the wrong (or no) rows.
        raise ValueError(
            f"Historia insuficiente antes de {fecha}: se necesitan {WIN} filas"
        )

    ventana_feat = FEAT.iloc[feat_idx - WIN:feat_idx]   # WIN rows of features
    ventana_ret = RET.iloc[ret_idx - WIN:ret_idx]       # same dates, for Sigma

    # --------- r_hat depending on the model ------------------------------------------
    if MODEL_TYPE == "lstm":
        # Scaler fitted on the window itself, then a leading batch axis is added.
        X_in = StandardScaler().fit_transform(ventana_feat.values)[None, ...]
        r_hat = model.predict(X_in, verbose=0)[0]

    elif MODEL_TYPE == "lstm5d":
        # Uses the pre-fitted scaler loaded together with the model.
        X_in = scaler_lstm5d.transform(ventana_feat.values).reshape(1, WIN, -1)
        r_hat = model.predict(X_in, verbose=0)[0]

    elif MODEL_TYPE == "gru5d":
        # Rebuild the feature set from unshifted returns plus the lagged
        # 5-day sum/std ratio (per the original note, this mirrors GRU training).
        ret5_ = RET.rolling(5).sum()
        vol5_ = RET.rolling(5).std()
        mom_ = (ret5_ / (vol5_ + 1e-6)).shift(1)
        v_mom = mom_.loc[ventana_ret.index]
        input_block = pd.concat([ventana_ret, v_mom], axis=1)
        X_in = StandardScaler().fit_transform(input_block.values).reshape(1, WIN, -1)
        r_hat = model.predict(X_in, verbose=0)[0]

    elif MODEL_TYPE == "xgb":
        X_in = StandardScaler().fit_transform(ventana_feat.values)
        ultima_fila = X_in[-1].reshape(1, -1)
        # One prediction per asset. The loop previously ran over the number of
        # feature columns, which is larger than the number of tickers and cannot
        # match Sigma / RET below.
        # NOTE(review): assumes `model` is indexable with one regressor per
        # ticker, in `tickers` order — confirm against the training code.
        r_hat = np.array([model[i].predict(ultima_fila)[0]
                          for i in range(len(tickers))])
    else:
        raise ValueError("Modelo no reconocido")

    # --------- covariance matrix ------------------------------------------------------
    # Small ridge on the diagonal added to the sample covariance.
    Sigma = ventana_ret.cov().values + 1e-8*np.eye(len(tickers))

    # --------- evolutionary optimization v2 -------------------------------------------
    res = resolver_optimizacion_v2(r_hat, Sigma, w_prev, tau=TAU_TURN)
    w_star = elegir_w_star_v2(res)
    if w_star is None:
        # Fail with an explicit message instead of an opaque TypeError further down.
        raise RuntimeError(
            f"El optimizador v2 no devolvió una cartera para {fecha}"
        )

    # --------- realized return over the next STEP rows --------------------------------
    realized = RET.iloc[ret_idx:ret_idx + STEP].values @ w_star
    return w_star, realized.sum()



In [49]:
# ------------------------------------------------------------------------------------
# 4) Back-test loop
# ------------------------------------------------------------------------------------
results   = []
failures  = []                                         # one entry per failed rebalance
dates     = FEAT.loc[cfg.START_BACKTEST:].index
w_prev    = np.full(len(tickers), 1/len(tickers))      # initial 1/N portfolio

for i in range(WIN, len(dates)-STEP, STEP):
    fecha = dates[i]
    try:
        w_star, ret = rebalancear_en_fecha_v2(fecha, w_prev)
    except Exception as e:
        # Keep going (best effort), but do not lose the diagnosis: the exception
        # object is stored with its traceback so it can be inspected afterwards,
        # e.g. `raise failures[0]["error"]`. KeyboardInterrupt is not an
        # Exception subclass, so it still stops the loop.
        failures.append({"fecha": fecha, "error": e})
        print(f"‚ùå {fecha.date()} | {type(e).__name__}: {e}")
    else:
        results.append({"fecha": fecha, "ret": ret, "w": w_star})
        w_prev = w_star        # the next rebalance starts from the weights just chosen
        print(f"‚úÖ {fecha.date()} | Ret {ret:+6.2%}")

if failures:
    print(f"Rebalanceos fallidos: {len(failures)} de {len(failures) + len(results)}")



‚ùå 2019-04-24 | 'NoneType' object is not subscriptable
‚ùå 2019-05-10 | 'NoneType' object is not subscriptable
‚ùå 2019-05-30 | 'NoneType' object is not subscriptable


KeyboardInterrupt: 

In [None]:
# ------------------------------------------------------------------------------------
# 5) Save the results DataFrame
# ------------------------------------------------------------------------------------
if not results:
    # With no rows there is no "fecha" column, so set_index would fail with an
    # obscure KeyError; stop here with an explicit message instead.
    raise RuntimeError(
        "No hay resultados que guardar: ningún rebalanceo terminó correctamente"
    )

res_df = (pd.DataFrame(results)
            .set_index("fecha")
            .sort_index())

out_file = cfg.RESULT / f"backtest_{MODEL_TYPE}_v2.pkl"
out_file.parent.mkdir(parents=True, exist_ok=True)   # create the results folder if missing
joblib.dump(res_df, out_file)
print("üíæ Back-test v2 guardado en", out_file)