In [1]:
import json
import os
import random

import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
from optuna.visualization import plot_optimization_history
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, balanced_accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
SEED = 1234
# Metric read from the "True" (trend-change) row of the classification report;
# can be switched to "precision", "recall" or "f1-score".
SCORE = "f1-score"

In [3]:
def set_seeds(seed):
    """Seed Python's ``random`` module and NumPy's global RNG with the same value.

    Args:
        seed: Value forwarded unchanged to ``random.seed`` and ``np.random.seed``.
    """
    for seed_fn in (random.seed, np.random.seed):
        seed_fn(seed)

Trend Changes Score

In [4]:
def _trend_change_flags(labels) -> np.ndarray:
    """
    Flag every position whose label differs from the label that follows it.

    Args:
        labels: Ordered sequence of labels.

    Returns:
        np.ndarray: Boolean array with one element fewer than ``labels``;
        element ``i`` is True when ``labels[i] != labels[i + 1]``.
    """
    values = np.asarray(labels)
    return values[:-1] != values[1:]


def _trend_change_report(y_test, y_pred, output_dict: bool):
    """
    Build the classification report that compares true and predicted trend changes.

    Args:
        y_test: True labels, in chronological order.
        y_pred: Predicted labels, aligned with ``y_test``.
        output_dict (bool): Forwarded to ``classification_report``.

    Returns:
        The value returned by ``classification_report`` (dict or str).
    """
    return classification_report(
        _trend_change_flags(y_test),
        _trend_change_flags(y_pred),
        # Pin both classes so the "False" and "True" rows always exist, even
        # when one of them never occurs in either array.
        labels=[False, True],
        digits=4,
        output_dict=output_dict,
        zero_division=0,
    )


def get_trend_changes_report_dict(y_test: np.ndarray, y_pred: np.ndarray) -> dict:
    """
    Build the trend-change classification report as a dictionary.

    A "trend change" is a position whose label differs from the next label;
    the report scores how well the predicted changes match the true ones.

    Args:
        y_test (np.ndarray): True labels.
        y_pred (np.ndarray): Predicted labels.

    Returns:
        dict: ``classification_report`` output (``output_dict=True``) with the
        per-class rows keyed ``"False"`` and ``"True"``.
    """
    return _trend_change_report(y_test, y_pred, output_dict=True)


def trend_changes_score(y_test: np.ndarray, y_pred: np.ndarray) -> str:
    """
    Build the trend-change classification report as printable text.

    Args:
        y_test (np.ndarray): True labels.
        y_pred (np.ndarray): Predicted labels.

    Returns:
        str: Text report (4 digits) for the ``False`` / ``True`` trend-change classes.
    """
    return _trend_change_report(y_test, y_pred, output_dict=False)


def trend_changes_true(y_test: np.ndarray, y_pred: np.ndarray) -> float:
    """
    Return a single metric for the positive ("True") trend-change class.

    The metric is selected by the module-level ``SCORE`` constant.

    Args:
        y_test (np.ndarray): True labels.
        y_pred (np.ndarray): Predicted labels.

    Returns:
        float: ``report["True"][SCORE]`` from the trend-change report.
    """
    report = _trend_change_report(y_test, y_pred, output_dict=True)
    return report["True"][SCORE]

Carga de datos

In [5]:
# Load the three data splits and separate the feature matrix from the label
def _read_split(csv_path):
    """Read one split; return (DataFrame, feature values, target values)."""
    frame = pd.read_csv(csv_path, parse_dates=["date"])
    features = frame.drop(columns=["date", "target_trend"]).values
    target = frame["target_trend"].values
    return frame, features, target


train, X_train, y_train = _read_split("../../../data/post_cleaning/training_set.csv")
val, X_val, y_val = _read_split("../../../data/post_cleaning/validation_set.csv")
test_set, X_test, y_test = _read_split("../../../data/post_cleaning/test_set.csv")

IMPORTANTE: Hay que sumar 1 a la columna objetivo (`target_trend`) para pasar de las etiquetas (-1, 0, 1) a (0, 1, 2), porque la etiqueta -1 no funciona en la función de pérdida (LOSS).

In [6]:
# Shift the labels by +1 so every class id is non-negative.
# Each array is rebuilt from its source DataFrame instead of being incremented
# in place, so re-running this cell cannot shift the labels a second time.
y_train = train["target_trend"].values + 1
y_val = val["target_trend"].values + 1
# y_test gets the same shift so test labels and model predictions share one scale.
y_test = test_set["target_trend"].values + 1

Integración de la métrica trend_changes_true en la función objetivo de Optuna

In [7]:
def objective(trial):
    """
    Optuna objective: fit a scaled logistic regression and score trend changes.

    Samples ``penalty``, ``C`` and (for elasticnet only) ``l1_ratio``, fits the
    pipeline on the training split and evaluates it on the validation split.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        float: Value of ``trend_changes_true`` on the validation predictions.
    """
    set_seeds(SEED)  # Reset the seeds before every trial
    penalty = trial.suggest_categorical("penalty", ["l1", "l2", "elasticnet"])
    C = trial.suggest_float("C", 1e-3, 50, log=True)
    # l1_ratio is only sampled when the elasticnet penalty is selected
    l1_ratio = trial.suggest_float("l1_ratio", 0.1, 0.9) if penalty == "elasticnet" else None

    model = Pipeline([
        ("scaler", StandardScaler()),
        # multi_class is intentionally not passed: it is deprecated since
        # scikit-learn 1.5, and with the saga solver the default already fits
        # a multinomial model for targets with three or more classes.
        ("clf", LogisticRegression(
            solver="saga",
            penalty=penalty,
            C=C,
            l1_ratio=l1_ratio,
            max_iter=800,
            random_state=SEED
        ))
    ])

    model.fit(X_train, y_train)
    y_val_pred = model.predict(X_val)
    return trend_changes_true(y_val, y_val_pred)

Ejecucion de estudio con Optuna

In [8]:
# Maximize the objective over 100 trials using a TPE sampler seeded with SEED
tpe_sampler = optuna.samplers.TPESampler(seed=SEED)
study = optuna.create_study(direction="maximize", sampler=tpe_sampler)
study.optimize(objective, n_trials=100)

[I 2025-08-24 15:43:28,830] A new study created in memory with name: no-name-7eaa8c53-21b2-4d41-a62b-c17258b59178


[I 2025-08-24 15:43:30,315] Trial 0 finished with value: 0.36619718309859156 and parameters: {'penalty': 'l2', 'C': 4.9020352232191025}. Best is trial 0 with value: 0.36619718309859156.
[I 2025-08-24 15:43:32,149] Trial 1 finished with value: 0.3611111111111111 and parameters: {'penalty': 'l1', 'C': 5.8610214330700705}. Best is trial 0 with value: 0.36619718309859156.
[I 2025-08-24 15:43:32,567] Trial 2 finished with value: 0.29850746268656714 and parameters: {'penalty': 'l1', 'C': 0.22602738844381381}. Best is trial 0 with value: 0.36619718309859156.
[I 2025-08-24 15:43:32,893] Trial 3 finished with value: 0.3582089552238806 and parameters: {'penalty': 'l2', 'C': 0.43355484601640604}. Best is trial 0 with value: 0.36619718309859156.
[I 2025-08-24 15:43:34,826] Trial 4 finished with value: 0.3611111111111111 and parameters: {'penalty': 'elasticnet', 'C': 14.044441691902366, 'l1_ratio': 0.3919087871210979}. Best is trial 0 with value: 0.36619718309859156.
[I 2025-08-24 15:43:36,767] Tri

In [9]:
# Plot the optimization history of the study
plot_optimization_history(study)

In [10]:
# Keep the best hyperparameters for later cells and report them with the best score
best_params = study.best_params
print("Mejores hiperparámetros encontrados:")
print(best_params)
print(f"Mejor score de {SCORE}: {study.best_value:.4f}")

Mejores hiperparámetros encontrados:
{'penalty': 'l2', 'C': 22.178146342804567}
Mejor score de f1-score: 0.3836


GUARDAR EN JSON

In [11]:
# Append the best hyperparameters and their objective value to the JSON history file
history = []
if os.path.exists("best_hyperparams.json"):
    try:
        with open("best_hyperparams.json", "r") as f:
            loaded_history = json.load(f)
    except (json.JSONDecodeError, ValueError):
        # Unreadable file: start a fresh history
        loaded_history = []
    # Only a JSON list can be appended to; any other payload (e.g. a single
    # object) is treated like an unreadable file instead of crashing on append.
    history = loaded_history if isinstance(loaded_history, list) else []

# Store parameters and value together in one entry
history.append({
    "params": study.best_params,
    "value": study.best_value
})

with open("best_hyperparams.json", "w") as f:
    json.dump(history, f, indent=2)

CARGAR HIPERPARAMETROS DESDE JSON

In [62]:
# # Load the history of hyperparameters and values
# with open("best_hyperparams.json", "r") as f:
#     history = json.load(f)

# # Pick the last (most recent) entry
# best_params = history[-1]["params"]
# best_value = history[-1]["value"]

# # To list every entry:
# for i, entry in enumerate(history):
#     print(f"Hiperparámetros #{i+1}: {entry['params']}, Valor: {entry['value']}")

# # To pick a specific entry (by index):
# # best_params = history[indice_que_quieras]["params"]
# # best_value = history[indice_que_quieras]["value"]

In [14]:
# Train the final model with the best hyperparameters found by the study
set_seeds(SEED)
best_params = study.best_params
# l1_ratio only exists in best_params when the elasticnet penalty was selected
final_l1_ratio = best_params["l1_ratio"] if best_params["penalty"] == "elasticnet" else None
final_model = Pipeline([
    ("scaler", StandardScaler()),
    # multi_class is intentionally not passed: it is deprecated since
    # scikit-learn 1.5, and with the saga solver the default already fits
    # a multinomial model for targets with three or more classes.
    ("clf", LogisticRegression(
        solver="saga",
        penalty=best_params["penalty"],
        C=best_params["C"],
        l1_ratio=final_l1_ratio,
        max_iter=800,
        random_state=SEED
    ))
])

final_model.fit(X_train, y_train)
y_val_pred = final_model.predict(X_val)
print("Trend Change F1 Score:\n", trend_changes_score(y_val, y_val_pred))

Trend Change F1 Score:
               precision    recall  f1-score   support

       False     0.9213    0.9035    0.9123       259
        True     0.3590    0.4118    0.3836        34

    accuracy                         0.8464       293
   macro avg     0.6401    0.6576    0.6479       293
weighted avg     0.8560    0.8464    0.8509       293




The max_iter was reached which means the coef_ did not converge



In [15]:
# Full validation report: precision, recall and F1 per class
report = classification_report(y_val, y_val_pred, digits=4)
# The model evaluated here is the logistic-regression pipeline, so label it as such
print("Logistic Regression Report:\n", report)
print("Balanced accuracy:", balanced_accuracy_score(y_val, y_val_pred))

LightGBM Report:
               precision    recall  f1-score   support

           0     0.8774    0.9894    0.9300        94
           1     0.6667    0.3902    0.4923        41
           2     0.9146    0.9434    0.9288       159

    accuracy                         0.8810       294
   macro avg     0.8196    0.7743    0.7837       294
weighted avg     0.8681    0.8810    0.8683       294

Balanced accuracy: 0.7743339436605927


In [13]:
# Predict on the test split and print its trend-change report
y_pred_test = final_model.predict(X_test)
test_trend_report = trend_changes_score(y_test, y_pred_test)
print("Trend Change F1 Score:\n", test_trend_report)

Trend Change F1 Score:
               precision    recall  f1-score   support

       False     0.9198    0.9163    0.9181       263
        True     0.3125    0.3226    0.3175        31

    accuracy                         0.8537       294
   macro avg     0.6162    0.6195    0.6178       294
weighted avg     0.8558    0.8537    0.8548       294



In [16]:
# --- Export and compare model metrics (validation split) ---

# 1. Name of the current model and path of the shared comparison file
model_name = 'Regresion Logistic'
output_file = '../../../score_models/model_comparison_metrics.csv'

# 2. Standard classification report
# y_val and y_val_pred are both on the shifted label scale (0, 1, 2)
report_dict = classification_report(y_val, y_val_pred, output_dict=True, zero_division=0)
precision = report_dict['macro avg']['precision']
recall = report_dict['macro avg']['recall']
f1_score = report_dict['macro avg']['f1-score']


# 3. Trend-change report (metrics of the "True" class)
report = get_trend_changes_report_dict(y_val, y_val_pred)
trend_change_precision = report['True']['precision']
trend_change_recall = report['True']['recall']
trend_change_f1_score = report['True']['f1-score']

# 4. Collect the new metrics
new_metrics = {
    'precision': precision,
    'recall': recall,
    'f1_score': f1_score,
    'trend_change_precision': trend_change_precision,
    'trend_change_recall': trend_change_recall,
    'trend_change_f1_score': trend_change_f1_score
}

# 5. Load, update and save the comparison DataFrame
try:
    # Try to load the existing file
    comparison_df = pd.read_csv(output_file, index_col='model')
    # If it exists, update or add the row of the current model
    comparison_df.loc[model_name] = new_metrics
except FileNotFoundError:
    # Otherwise create a new DataFrame from the current metrics
    comparison_df = pd.DataFrame([new_metrics], index=[model_name])

# Make sure the target directory exists so the first export does not fail
os.makedirs(os.path.dirname(output_file), exist_ok=True)

# Save the updated DataFrame to the CSV
comparison_df.to_csv(output_file, index_label='model')

In [None]:
# --- Data preparation for backtesting ---

# Work on a copy of the original test_set, which holds the prices and dates
backtest_df = test_set.copy()
# The model was trained on labels shifted by +1 (0, 1, 2). Undo the shift so the
# signal is back on the (-1, 0, 1) scale that the trading rules below expect.
backtest_df['signal'] = np.asarray(y_pred_test) - 1

# Price column: the last column whose name contains 'open_d'.
# NOTE(review): presumably the most recent day of the sliding window - confirm
# against the data-cleaning notebook and adjust the pattern if the name differs.
price_candidates = [col for col in test_set.columns if 'open_d' in col]
if not price_candidates:
    raise ValueError("test_set has no column containing 'open_d' to use as price")
price_col = price_candidates[-1]


# --- Backtesting simulation ---

initial_capital = 10000.0
cash = initial_capital
position = 0.0  # Units of the asset (BTC) currently held
portfolio_values = []

for price, signal in zip(backtest_df[price_col], backtest_df['signal']):
    # Trading rules: go all-in on a buy signal, liquidate everything on a sell signal
    if signal == 1 and cash > 0:  # Buy signal
        position = cash / price
        cash = 0.0
    elif signal == -1 and position > 0:  # Sell signal
        cash = position * price
        position = 0.0

    # Mark the portfolio to market at the current price
    portfolio_values.append(cash + position * price)

backtest_df['portfolio_value'] = portfolio_values


# --- Performance metrics and visualization ---

# 1. Performance metrics
first_price = backtest_df[price_col].iloc[0]
last_price = backtest_df[price_col].iloc[-1]
final_portfolio_value = backtest_df['portfolio_value'].iloc[-1]
total_return_pct = (final_portfolio_value - initial_capital) / initial_capital * 100
buy_and_hold_return_pct = (last_price - first_price) / first_price * 100

print("--- Resultados del Backtesting ---")
print(f"Capital Inicial: ${initial_capital:,.2f}")
print(f"Valor Final del Portafolio: ${final_portfolio_value:,.2f}")
print(f"Retorno Total de la Estrategia: {total_return_pct:.2f}%")
print(f"Retorno de Comprar y Mantener (Buy & Hold): {buy_and_hold_return_pct:.2f}%")

# 2. Plot the results
plt.figure(figsize=(14, 7))
plt.plot(backtest_df['date'], backtest_df['portfolio_value'], label='Estrategia del Modelo', color='blue')

# Rescale the price to the initial capital so it is comparable with the strategy
buy_and_hold_values = (backtest_df[price_col] / first_price) * initial_capital
plt.plot(backtest_df['date'], buy_and_hold_values, label='Comprar y Mantener (Buy & Hold)', color='orange', linestyle='--')

plt.title('Rendimiento de la Estrategia vs. Buy & Hold')
plt.xlabel('Fecha')
plt.ylabel('Valor del Portafolio ($)')
plt.legend()
plt.grid(True)
plt.show()