<img style="float: right; margin: 5px 5px 20px 20px;" src="https://upload.wikimedia.org/wikipedia/commons/d/db/Logo_ITESO_normal.jpg" width="100px" height="75px"/>

# 003 Deep learning

### Microstructures and trading systems

> **Evelin Ramirez, Pedro Gael Rayas**

## Resumen

## Introducción 

## Metodología y código

### 0. Importar librerías 

In [41]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import optuna
import ta
import logging
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, Input
from tensorflow.keras.optimizers import Adam

### 1. Carga de datos 

* Se utiliza el archivo CSV 'aapl_5m_train.csv' proporcionado por el profesor, que contiene datos históricos de las acciones de Apple registrados cada 5 minutos. Los datos abarcan el período comprendido desde el 04 de enero de 2021 a las 14:30:00 hasta el 30 de diciembre de 2022 a las 21:00:00.

In [44]:
# Carga de datos
data = pd.read_csv('aapl_5m_train.csv').dropna()
print(f"Tamaño inicial de data: {len(data)}")

Tamaño inicial de data: 39160


### 2. Preparación y Entrenamiento del Modelo LSTM

Se normalizan las variables y se generan secuencias temporales mediante una ventana de lookback para capturar la dinámica de los precios y los indicadores. Posteriormente, se construye y entrena un modelo LSTM con dos capas (50 unidades cada una), intercalando capas de dropout para reducir el sobreajuste, y se utiliza la salida para clasificar la dirección del cambio en el precio.

In [47]:
# Función para preparar datos para LSTM
def prepare_lstm_data(dataset, lookback=20):
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(dataset[["Close", "RSI", "BB", "MACD", "MACD_signal"]])
    X, y = [], []
    for i in range(lookback, len(scaled_data)):
        X.append(scaled_data[i-lookback:i])
        y.append(dataset[["BUY_SIGNAL", "SELL_SIGNAL"]])
    X = np.array(X)
    y = np.array(y)
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    return X_train, y_train, X_test, y_test, scaler

In [49]:
# Función para crear y entrenar el modelo LSTM
def train_lstm(X_train, y_train, lookback=20):
    model = Sequential()
    model.add(Input(shape=(lookback, 5)))
    model.add(LSTM(50, return_sequences=True))
    model.add(Dropout(0.2))
    model.add(LSTM(50))
    model.add(Dropout(0.2))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(X_train, y_train, epochs=3, batch_size=32, verbose=0)
    return model

### 3. Cálculo de Métricas de Desempeño

Esta función evalúa la eficiencia de la estrategia de trading a través de los siguientes pasos:

* Se convierte la evolución del portafolio en retornos porcentuales.
* Se calcula el Sharpe Ratio dividiendo la media de los retornos (ajustada por una tasa libre de riesgo) entre su desviación estándar y anualizando el resultado.
* El Sortino Ratio se obtiene de forma similar, pero usando solo los retornos negativos para medir el riesgo a la baja.
* El Calmar Ratio se determina dividiendo el retorno anualizado entre el máximo drawdown, representando la peor caída acumulada del portafolio.
* Se calcula el porcentaje de operaciones ganadoras sobre el total de operaciones para obtener el Win/Loss Percentage.
* Esta metodología permite evaluar tanto la rentabilidad como la gestión del riesgo de la estrategia.

### 4. Preprocessing data 

In [53]:
# Prepare_lstm_data is defined
def prepare_lstm_data(dataset, lookback=20, features=None):
    if features is None:
        features = ["Close", "RSI", "BB", "MACD", "MACD_signal"]
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(dataset[features])
    X, y = [], []
    for i in range(lookback, len(scaled_data)):
        X.append(scaled_data[i - lookback:i])
        y.append(1 if scaled_data[i, 0] > scaled_data[i - 1, 0] else 0)  # Binary classification based on Close
    X, y = np.array(X), np.array(y)
    
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    
    return X_train, y_train, X_test, y_test, scaler

# Updated train_lstm with Input layer
def train_lstm(X_train, y_train, lookback, units=50):
    model = Sequential()
    model.add(Input(shape=(X_train.shape[1], X_train.shape[2])))
    model.add(LSTM(units, return_sequences=False))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(X_train, y_train, epochs=3, batch_size=32, verbose=0)
    return model

# Updated calculate_metrics to handle NaN
def calculate_metrics(portfolio_value, wins, losses):
    returns = pd.Series(portfolio_value).pct_change().dropna()
    mean_ret = returns.mean()
    std_ret = returns.std()
    if std_ret == 0 or np.isnan(mean_ret) or np.isnan(std_ret):
        return {"Sharpe": -float('inf')}  # Penalize invalid cases
    sharpe = (mean_ret / std_ret) * np.sqrt(252)  # Annualized Sharpe Ratio
    return {"Sharpe": sharpe}


In [55]:
# Optimization for RSI

# Objective function for RSI
def objective_rsi(trial, data, verbose=False):
    rsi_window = trial.suggest_int("rsi_window", 10, 100)
    rsi_lower = trial.suggest_int("rsi_lower", 5, 35)
    rsi_upper = trial.suggest_int("rsi_upper", 65, 95)
    stop_loss = trial.suggest_float("stop_loss", 0.01, 0.2)
    take_profit = trial.suggest_float("take_profit", 0.01, 0.2)
    n_shares = trial.suggest_categorical("n_shares", [1000, 2000, 3000, 3500, 4000])
    lookback = trial.suggest_int("lookback", 10, 50)

    dataset = data.copy()
    rsi = ta.momentum.RSIIndicator(dataset.Close, window=rsi_window)
    dataset["RSI"] = rsi.rsi()

    X_train, y_train, X_test, y_test, scaler = prepare_lstm_data(dataset.dropna(), lookback=lookback, features=["Close", "RSI"])
    model = train_lstm(X_train, y_train, lookback=lookback)

    scaled_data = scaler.transform(dataset[["Close", "RSI"]])
    X_full = [scaled_data[i - lookback:i] for i in range(lookback, len(scaled_data))]
    X_full = np.array(X_full)
    lstm_preds = (model.predict(X_full, verbose=0) > 0.5).astype(int).flatten()

    dataset = dataset.iloc[lookback:].reset_index(drop=True)
    dataset["LSTM_BUY"] = pd.Series(lstm_preds) == 1
    dataset["LSTM_SELL"] = pd.Series(lstm_preds) == 0

    dataset["RSI_BUY"] = dataset["RSI"] < rsi_lower
    dataset["RSI_SELL"] = dataset["RSI"] > rsi_upper

    dataset["BUY_SIGNAL"] = dataset["RSI_BUY"]
    dataset["SELL_SIGNAL"] = dataset["RSI_SELL"]

    dataset = dataset.dropna()
    capital = 1000000
    com = 0.5 / 100
    portfolio_value = [capital]
    active_long_pos = None
    active_short_pos = None
    wins = 0
    losses = 0

    for i, row in dataset.iterrows():
        if active_long_pos:
            if row.Close < active_long_pos["stop_loss"]:
                pnl = row.Close * n_shares * (1 - com) - active_long_pos["cost"]
                capital += active_long_pos["cost"] + pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_long_pos = None
            elif row.Close > active_long_pos["take_profit"]:
                pnl = row.Close * n_shares * (1 - com) - active_long_pos["cost"]
                capital += active_long_pos["cost"] + pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_long_pos = None

        if active_short_pos:
            if row.Close > active_short_pos["stop_loss"]:
                pnl = active_short_pos["revenue"] - row.Close * n_shares * (1 + com)
                capital += pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_short_pos = None
            elif row.Close < active_short_pos["take_profit"]:
                pnl = active_short_pos["revenue"] - row.Close * n_shares * (1 + com)
                capital += pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_short_pos = None

        if row["BUY_SIGNAL"] and active_long_pos is None and active_short_pos is None:
            cost = row.Close * n_shares * (1 + com)
            if capital > cost:
                capital -= cost
                active_long_pos = {
                    "datetime": row.Datetime,
                    "cost": cost,
                    "take_profit": row.Close * (1 + take_profit),
                    "stop_loss": row.Close * (1 - stop_loss)
                }

        if row["SELL_SIGNAL"] and active_short_pos is None and active_long_pos is None:
            revenue = row.Close * n_shares * (1 - com)
            capital += revenue
            active_short_pos = {
                "datetime": row.Datetime,
                "revenue": revenue,
                "take_profit": row.Close * (1 - take_profit),
                "stop_loss": row.Close * (1 + stop_loss)
            }

        long_value = row.Close * n_shares if active_long_pos else 0
        short_value = (active_short_pos["revenue"] - row.Close * n_shares) if active_short_pos else 0
        portfolio_value.append(capital + long_value + short_value)

    if len(portfolio_value) <= 1:
        return -float('inf')
    metrics = calculate_metrics(portfolio_value, wins, losses)
    return metrics["Sharpe"]


In [None]:
# Running the optimization
study_rsi = optuna.create_study(direction="maximize")
study_rsi.optimize(lambda trial: objective_rsi(trial, data), n_trials=50)
print("Mejores parámetros RSI:", study_rsi.best_params)

[I 2025-03-27 19:28:44,457] A new study created in memory with name: no-name-163c3fec-4f61-4981-aa8a-78df37e212ac
[I 2025-03-27 19:28:50,866] Trial 0 finished with value: 0.1672539133304027 and parameters: {'rsi_window': 76, 'rsi_lower': 21, 'rsi_upper': 67, 'stop_loss': 0.13915718780040712, 'take_profit': 0.1639530694448835, 'n_shares': 1000, 'lookback': 10}. Best is trial 0 with value: 0.1672539133304027.
[I 2025-03-27 19:29:07,735] Trial 1 finished with value: -inf and parameters: {'rsi_window': 87, 'rsi_lower': 23, 'rsi_upper': 82, 'stop_loss': 0.07332686369830341, 'take_profit': 0.1497731486192994, 'n_shares': 2000, 'lookback': 39}. Best is trial 0 with value: 0.1672539133304027.


In [59]:
# Prepare_lstm_data is defined
#Optimization for Bollinger Bands
def prepare_lstm_data(dataset, lookback=20, features=None):
    if features is None:
        features = ["Close", "RSI", "BB", "MACD", "MACD_signal"]
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(dataset[features])
    X, y = [], []
    for i in range(lookback, len(scaled_data)):
        X.append(scaled_data[i - lookback:i])
        y.append(1 if scaled_data[i, 0] > scaled_data[i - 1, 0] else 0)  # Binary classification based on Close
    X, y = np.array(X), np.array(y)
    
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    
    return X_train, y_train, X_test, y_test, scaler

# Updated train_lstm with Input layer
def train_lstm(X_train, y_train, lookback, units=50):
    model = Sequential()
    model.add(Input(shape=(X_train.shape[1], X_train.shape[2])))
    model.add(LSTM(units, return_sequences=False))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(X_train, y_train, epochs=3, batch_size=32, verbose=0)
    return model

# Updated calculate_metrics to handle NaN
def calculate_metrics(portfolio_value, wins, losses):
    returns = pd.Series(portfolio_value).pct_change().dropna()
    mean_ret = returns.mean()
    std_ret = returns.std()
    if std_ret == 0 or np.isnan(mean_ret) or np.isnan(std_ret):
        return {"Sharpe": -float('inf')}  # Penalize invalid cases
    sharpe = (mean_ret / std_ret) * np.sqrt(252)  # Annualized Sharpe Ratio
    return {"Sharpe": sharpe}

In [61]:
# Objective function for Bollinger Bands
def objective_bb(trial, data, verbose=False):
    # Hiperparámetros específicos de BB
    bb_window = trial.suggest_int("bb_window", 10, 30)
    bb_window_dev = trial.suggest_float("bb_window_dev", 1.5, 3.0)
    
    # Hiperparámetros generales de trading
    stop_loss = trial.suggest_float("stop_loss", 0.01, 0.2)
    take_profit = trial.suggest_float("take_profit", 0.01, 0.2)
    n_shares = trial.suggest_categorical("n_shares", [1000, 2000, 3000, 3500, 4000])
    lookback = trial.suggest_int("lookback", 10, 50)

    dataset = data.copy()
    bb = ta.volatility.BollingerBands(dataset.Close, window=bb_window, window_dev=bb_window_dev)
    dataset["BB"] = bb.bollinger_mavg()
    dataset["BB_lower"] = bb.bollinger_lband()
    dataset["BB_upper"] = bb.bollinger_hband()

    X_train, y_train, X_test, y_test, scaler = prepare_lstm_data(dataset.dropna(), lookback=lookback, features=["Close", "BB"])
    model = train_lstm(X_train, y_train, lookback=lookback)

    scaled_data = scaler.transform(dataset[["Close", "BB"]])
    X_full = [scaled_data[i - lookback:i] for i in range(lookback, len(scaled_data))]
    X_full = np.array(X_full)
    lstm_preds = (model.predict(X_full, verbose=0) > 0.5).astype(int).flatten()

    dataset = dataset.iloc[lookback:].reset_index(drop=True)
    dataset["LSTM_BUY"] = pd.Series(lstm_preds) == 1
    dataset["LSTM_SELL"] = pd.Series(lstm_preds) == 0

    dataset["BB_BUY"] = dataset.Close < dataset["BB_lower"]
    dataset["BB_SELL"] = dataset.Close > dataset["BB_upper"]

    dataset["BUY_SIGNAL"] = dataset["BB_BUY"]
    dataset["SELL_SIGNAL"] = dataset["BB_SELL"]

    dataset = dataset.dropna()
    capital = 1000000
    com = 0.5 / 100
    portfolio_value = [capital]
    active_long_pos = None
    active_short_pos = None
    wins = 0
    losses = 0

    for i, row in dataset.iterrows():
        if active_long_pos:
            if row.Close < active_long_pos["stop_loss"]:
                pnl = row.Close * n_shares * (1 - com) - active_long_pos["cost"]
                capital += active_long_pos["cost"] + pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_long_pos = None
            elif row.Close > active_long_pos["take_profit"]:
                pnl = row.Close * n_shares * (1 - com) - active_long_pos["cost"]
                capital += active_long_pos["cost"] + pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_long_pos = None

        if active_short_pos:
            if row.Close > active_short_pos["stop_loss"]:
                pnl = active_short_pos["revenue"] - row.Close * n_shares * (1 + com)
                capital += pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_short_pos = None
            elif row.Close < active_short_pos["take_profit"]:
                pnl = active_short_pos["revenue"] - row.Close * n_shares * (1 + com)
                capital += pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_short_pos = None

        if row["BUY_SIGNAL"] and active_long_pos is None and active_short_pos is None:
            cost = row.Close * n_shares * (1 + com)
            if capital > cost:
                capital -= cost
                active_long_pos = {
                    "datetime": row.Datetime,
                    "cost": cost,
                    "take_profit": row.Close * (1 + take_profit),
                    "stop_loss": row.Close * (1 - stop_loss)
                }

        if row["SELL_SIGNAL"] and active_short_pos is None and active_long_pos is None:
            revenue = row.Close * n_shares * (1 - com)
            capital += revenue
            active_short_pos = {
                "datetime": row.Datetime,
                "revenue": revenue,
                "take_profit": row.Close * (1 - take_profit),
                "stop_loss": row.Close * (1 + stop_loss)
            }

        long_value = row.Close * n_shares if active_long_pos else 0
        short_value = (active_short_pos["revenue"] - row.Close * n_shares) if active_short_pos else 0
        portfolio_value.append(capital + long_value + short_value)

    if len(portfolio_value) <= 1:
        return -float('inf')
    metrics = calculate_metrics(portfolio_value, wins, losses)
    return metrics["Sharpe"]


In [63]:
# Run the optimization
study_bb = optuna.create_study(direction="maximize")
study_bb.optimize(lambda trial: objective_bb(trial, data), n_trials=50)
print("Mejores parámetros BB:", study_bb.best_params)

[I 2025-03-27 19:27:06,495] A new study created in memory with name: no-name-e1ab3475-4d02-439b-be73-5aa1fea73e05
[W 2025-03-27 19:27:09,389] Trial 0 failed with parameters: {'bb_window': 14, 'bb_window_dev': 2.38699335707287, 'stop_loss': 0.18297057346367818, 'take_profit': 0.02095814166217433, 'n_shares': 3000, 'lookback': 33} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/var/folders/46/p3w40sm501lfx91hwfw5sfmh0000gp/T/ipykernel_63405/1195347147.py", line 3, in <lambda>
    study_bb.optimize(lambda trial: objective_bb(trial, data), n_trials=50)
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/46/p3w40sm501lfx91hwfw5sfmh0000gp/T/ipykernel_63405/1799593870.py", line 20, in objective_bb
    model = train_lstm(X_train, y_train, lookbac

KeyboardInterrupt: 

In [65]:
# Prepare_lstm_data is defined as before
def prepare_lstm_data(dataset, lookback=20, features=None):
    if features is None:
        features = ["Close", "RSI", "BB", "MACD", "MACD_signal"]
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(dataset[features])
    X, y = [], []
    for i in range(lookback, len(scaled_data)):
        X.append(scaled_data[i - lookback:i])
        y.append(1 if scaled_data[i, 0] > scaled_data[i - 1, 0] else 0)  # Binary classification based on Close
    X, y = np.array(X), np.array(y)
    
    train_size = int(len(X) * 0.8)
    X_train, X_test = X[:train_size], X[train_size:]
    y_train, y_test = y[:train_size], y[train_size:]
    
    return X_train, y_train, X_test, y_test, scaler

# Updated train_lstm with Input layer
def train_lstm(X_train, y_train, lookback, units=50):
    model = Sequential()
    model.add(Input(shape=(X_train.shape[1], X_train.shape[2])))
    model.add(LSTM(units, return_sequences=False))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(optimizer=Adam(learning_rate=0.001), loss='binary_crossentropy', metrics=['accuracy'])
    model.fit(X_train, y_train, epochs=3, batch_size=32, verbose=0)
    return model

# Updated calculate_metrics to handle NaN
def calculate_metrics(portfolio_value, wins, losses):
    returns = pd.Series(portfolio_value).pct_change().dropna()
    mean_ret = returns.mean()
    std_ret = returns.std()
    if std_ret == 0 or np.isnan(mean_ret) or np.isnan(std_ret):
        return {"Sharpe": -float('inf')}  # Penalize invalid cases
    sharpe = (mean_ret / std_ret) * np.sqrt(252)  # Annualized Sharpe Ratio
    return {"Sharpe": sharpe}

In [67]:
# Optimization for MACD

# Objective function for MACD
def objective_macd(trial, data, verbose=False):
    # Hiperparámetros específicos de MACD
    macd_window_slow = trial.suggest_int("macd_window_slow", 20, 40)
    macd_window_fast = trial.suggest_int("macd_window_fast", 5, 20)
    macd_window_sign = trial.suggest_int("macd_window_sign", 5, 15)
    
    # Hiperparámetros generales de trading
    stop_loss = trial.suggest_float("stop_loss", 0.01, 0.2)
    take_profit = trial.suggest_float("take_profit", 0.01, 0.2)
    n_shares = trial.suggest_categorical("n_shares", [1000, 2000, 3000, 3500, 4000])
    lookback = trial.suggest_int("lookback", 10, 50)

    dataset = data.copy()
    macd = ta.trend.MACD(dataset.Close, window_slow=macd_window_slow, window_fast=macd_window_fast, window_sign=macd_window_sign)
    dataset["MACD"] = macd.macd()
    dataset["MACD_signal"] = macd.macd_signal()

    X_train, y_train, X_test, y_test, scaler = prepare_lstm_data(dataset.dropna(), lookback=lookback, features=["Close", "MACD", "MACD_signal"])
    model = train_lstm(X_train, y_train, lookback=lookback)

    scaled_data = scaler.transform(dataset[["Close", "MACD", "MACD_signal"]])
    X_full = [scaled_data[i - lookback:i] for i in range(lookback, len(scaled_data))]
    X_full = np.array(X_full)
    lstm_preds = (model.predict(X_full, verbose=0) > 0.5).astype(int).flatten()

    dataset = dataset.iloc[lookback:].reset_index(drop=True)
    dataset["LSTM_BUY"] = pd.Series(lstm_preds) == 1
    dataset["LSTM_SELL"] = pd.Series(lstm_preds) == 0

    dataset["MACD_BUY"] = dataset["MACD"] > dataset["MACD_signal"]
    dataset["MACD_SELL"] = dataset["MACD"] < dataset["MACD_signal"]

    dataset["BUY_SIGNAL"] = dataset["MACD_BUY"]
    dataset["SELL_SIGNAL"] = dataset["MACD_SELL"]

    dataset = dataset.dropna()
    capital = 1000000
    com = 0.5 / 100
    portfolio_value = [capital]
    active_long_pos = None
    active_short_pos = None
    wins = 0
    losses = 0

    for i, row in dataset.iterrows():
        if active_long_pos:
            if row.Close < active_long_pos["stop_loss"]:
                pnl = row.Close * n_shares * (1 - com) - active_long_pos["cost"]
                capital += active_long_pos["cost"] + pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_long_pos = None
            elif row.Close > active_long_pos["take_profit"]:
                pnl = row.Close * n_shares * (1 - com) - active_long_pos["cost"]
                capital += active_long_pos["cost"] + pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_long_pos = None

        if active_short_pos:
            if row.Close > active_short_pos["stop_loss"]:
                pnl = active_short_pos["revenue"] - row.Close * n_shares * (1 + com)
                capital += pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_short_pos = None
            elif row.Close < active_short_pos["take_profit"]:
                pnl = active_short_pos["revenue"] - row.Close * n_shares * (1 + com)
                capital += pnl
                if pnl > 0:
                    wins += 1
                else:
                    losses += 1
                active_short_pos = None

        if row["BUY_SIGNAL"] and active_long_pos is None and active_short_pos is None:
            cost = row.Close * n_shares * (1 + com)
            if capital > cost:
                capital -= cost
                active_long_pos = {
                    "datetime": row.Datetime,
                    "cost": cost,
                    "take_profit": row.Close * (1 + take_profit),
                    "stop_loss": row.Close * (1 - stop_loss)
                }

        if row["SELL_SIGNAL"] and active_short_pos is None and active_long_pos is None:
            revenue = row.Close * n_shares * (1 - com)
            capital += revenue
            active_short_pos = {
                "datetime": row.Datetime,
                "revenue": revenue,
                "take_profit": row.Close * (1 - take_profit),
                "stop_loss": row.Close * (1 + stop_loss)
            }

        long_value = row.Close * n_shares if active_long_pos else 0
        short_value = (active_short_pos["revenue"] - row.Close * n_shares) if active_short_pos else 0
        portfolio_value.append(capital + long_value + short_value)

    if len(portfolio_value) <= 1:
        return -float('inf')
    metrics = calculate_metrics(portfolio_value, wins, losses)
    return metrics["Sharpe"]


In [69]:
# Run the optimization
study_macd = optuna.create_study(direction="maximize")
study_macd.optimize(lambda trial: objective_macd(trial, data), n_trials=50)
print("Mejores parámetros MACD:", study_macd.best_params)

[I 2025-03-27 19:28:26,392] A new study created in memory with name: no-name-9cdc9168-c418-4100-9a16-682fc3dbfa5d
[W 2025-03-27 19:28:34,875] Trial 0 failed with parameters: {'macd_window_slow': 26, 'macd_window_fast': 5, 'macd_window_sign': 6, 'stop_loss': 0.1316624422138451, 'take_profit': 0.06642994117520674, 'n_shares': 3500, 'lookback': 35} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "/opt/anaconda3/lib/python3.12/site-packages/optuna/study/_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "/var/folders/46/p3w40sm501lfx91hwfw5sfmh0000gp/T/ipykernel_63405/431582906.py", line 3, in <lambda>
    study_macd.optimize(lambda trial: objective_macd(trial, data), n_trials=50)
                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/var/folders/46/p3w40sm501lfx91hwfw5sfmh0000gp/T/ipykernel_63405/1796038692.py", line 22, in objective_macd
    model = train_lstm

KeyboardInterrupt: 

In [None]:
def preprocess(data: pd.DataFrame, *rsi_window: int, ...) -> pd.DataFrame:
    """Addd TA columns 

    Args:
        data (pd.DataFrame): Original data from csv

    Returns:
        pd.DataFrame: New data frame with RSI, BB and MACD
    """
    dataset = data.copy()
    rsi = ta.momentum.RSIIndicator(dataset.Close, window=rsi_window)
    ...
    dataset['RSI'] = rsi.rsi()
    dataset['BB_MAVG'] = bb.bollinger_mavg()
    dataset['BB_LOWER'] = bb.bollinger_lband()
    dataset['BB_UPPER'] = bb.bollinger_hband()
    dataset['MACD'] = macd.macd()
    dataset['MACD_SIGNAL'] = macd.macd_signal()


    # Signals
    dataset['RSI_BUY'] = dataset['RSI'] < 25
    dataset['RSI_SELL'] = dataset['RSI'] > 75

    dataset['BB_BUY'] = bb.bollinger_lband_indicator().astype(bool)
    dataset['BB_SELL'] = bb.bollinger_hband_indicator().astype(bool)

    dataset['MACD_BUY'] = dataset['MACD'] > dataset['MACD_SIGNAL']
    dataset['MACD_SELL'] = dataset['MACD'] < dataset['MACD_SIGNAL']

    dataset = dataset.dropna()
    dataset.head()
    return dataset

In [30]:
# Función para calcular métricas
def calculate_metrics(portfolio_value, wins, losses):
    portfolio_series = pd.Series(portfolio_value)
    if len(portfolio_series) < 2:
        print("Error: portfolio_value tiene menos de 2 elementos")
        return {"Sharpe": 0, "Sortino": 0, "Calmar": 0, "Win_Loss_Percent": 0}
    
    returns = portfolio_series.pct_change().dropna()
    if len(returns) == 0:
        print("Error: No hay retornos válidos")
        return {"Sharpe": 0, "Sortino": 0, "Calmar": 0, "Win_Loss_Percent": 0}
    
    risk_free_rate = 0.0
    sharpe = (returns.mean() - risk_free_rate) / returns.std() * np.sqrt(252 * 12)
    downside_returns = returns[returns < 0]
    sortino = (returns.mean() - risk_free_rate) / downside_returns.std() * np.sqrt(252 * 12) if len(downside_returns) > 0 else 0
    cumulative_returns = (portfolio_series / portfolio_series.iloc[0] - 1)
    max_drawdown = abs(cumulative_returns.min()) if cumulative_returns.min() != 0 else 1e-10
    annual_return = (portfolio_series.iloc[-1] / portfolio_series.iloc[0] - 1) * (252 * 12 / len(returns))
    calmar = annual_return / max_drawdown
    total_trades = wins + losses
    win_loss = wins / total_trades * 100 if total_trades > 0 else 0
    
    return {
        "Sharpe": sharpe if not np.isnan(sharpe) else 0,
        "Sortino": sortino if not np.isnan(sortino) else 0,
        "Calmar": calmar if not np.isnan(calmar) else 0,
        "Win_Loss_Percent": win_loss
    }

### 4. Función Objetivo para Optimización

Esta función se utiliza para evaluar la estrategia de trading en cada prueba de Optuna. En resumen, realiza los siguientes pasos:

Generación de Hiperparámetros:
* Se definen rangos para parámetros críticos (ventana y umbrales del RSI, stop loss, take profit, número de acciones y lookback) utilizando las sugerencias de Optuna.

Cálculo de Indicadores Técnicos:
* Se calculan los indicadores RSI, Bollinger Bands y MACD, y se añaden al dataset.

Preparación y Entrenamiento del Modelo LSTM:
* Se preparan los datos para el modelo (normalización y creación de secuencias temporales) y se entrena un modelo LSTM que se utiliza para predecir la dirección del movimiento del precio (compra/venta).

Generación de Señales de Trading:
* Se combinan las señales del modelo LSTM con las generadas por los indicadores técnicos para definir las órdenes de compra y venta.

Simulación de Trading (Backtesting):
* Se ejecuta un backtesting que simula operaciones (tanto long como short), actualizando el capital, el valor del portafolio y registrando las operaciones ganadoras y perdedoras.

Cálculo del Desempeño:
* Se calculan métricas clave (como el Sharpe Ratio, entre otras) a partir de la evolución del portafolio. El Sharpe ratio se devuelve como valor objetivo para la optimización.

Esta función permite iterar sobre distintas combinaciones de hiperparámetros para encontrar la configuración que maximice el rendimiento ajustado por riesgo de la estrategia.

In [None]:
import optuna

# Crear y ejecutar el estudio
study = optuna.create_study(direction="maximize")
study.optimize(lambda trial: objective_fun(trial, data, verbose=False), n_trials=50)

# Mostrar los mejores parámetros y el mejor valor
print(f"Mejor valor (Sharpe Ratio): {study.best_value}")
print(f"Mejores parámetros: {study.best_params}")

### 5. Optimización de Hiperparámetros con Optuna

Esta parte del código se encarga de optimizar los hiperparámetros de la estrategia de trading utilizando Optuna. Primero, se crea un estudio cuyo objetivo es maximizar la función de rendimiento (en este caso, el Sharpe ratio). Luego, se ejecuta la optimización realizando 50 pruebas (trials), en cada una de las cuales se evalúa la función objetivo 'objective_fun' con distintas combinaciones de hiperparámetros. Esto permite identificar la configuración que proporciona el mejor rendimiento ajustado por riesgo.

In [None]:
# Optimización
study = optuna.create_study(direction="maximize")
study.optimize(lambda trial: objective_fun(trial, data), n_trials=50)

### 6. Backtesting Final con Mejores Parámetros

En esta sección se utiliza la mejor configuración de hiperparámetros obtenida en la optimización para ejecutar un backtesting completo de la estrategia. Se calculan nuevamente los indicadores técnicos (RSI, MACD y Bollinger Bands) aplicando los parámetros óptimos, se preparan los datos y se entrena el modelo LSTM. A partir de las predicciones del modelo y las señales generadas por los indicadores, se simula la ejecución de operaciones (compras y ventas cortas), actualizando el capital y registrando las operaciones ganadoras y perdedoras. Finalmente, se calcula el valor final del portafolio y se obtienen las métricas de desempeño para evaluar la eficacia de la estrategia.

In [None]:
# Backtesting final con mejores parámetros
best_params = study.best_params
rsi = ta.momentum.RSIIndicator(data.Close, window=best_params["rsi_window"])
macd = ta.trend.MACD(data.Close, window_slow=26, window_fast=12, window_sign=9)
bb = ta.volatility.BollingerBands(data.Close, window=15, window_dev=2)  # Calculamos BB aquí

dataset = data.copy()
dataset["RSI"] = rsi.rsi()
dataset["MACD"] = macd.macd()
dataset["MACD_signal"] = macd.macd_signal()
dataset["BB"] = bb.bollinger_mavg()  # Añadimos la media móvil de BB al dataset

X_train, y_train, X_test, y_test, scaler = prepare_lstm_data(dataset.dropna(), lookback=best_params["lookback"])
model = train_lstm(X_train, y_train, lookback=best_params["lookback"])
scaled_data = scaler.transform(dataset[["Close", "RSI", "BB", "MACD", "MACD_signal"]])
X_full = [scaled_data[i-best_params["lookback"]:i] for i in range(best_params["lookback"], len(scaled_data))]
X_full = np.array(X_full)
lstm_preds = (model.predict(X_full, verbose=0) > 0.5).astype(int).flatten()

dataset = dataset.iloc[best_params["lookback"]:].reset_index(drop=True)
dataset["LSTM_BUY"] = pd.Series(lstm_preds) == 1
dataset["LSTM_SELL"] = pd.Series(lstm_preds) == 0

# Recalculamos BB para las señales después de cortar dataset
bb = ta.volatility.BollingerBands(dataset.Close, window=15, window_dev=2)
dataset["RSI_BUY"] = dataset["RSI"] < best_params["rsi_lower"]
dataset["RSI_SELL"] = dataset["RSI"] > best_params["rsi_upper"]
dataset["BB_BUY"] = dataset.Close < bb.bollinger_lband()
dataset["BB_SELL"] = dataset.Close > bb.bollinger_hband()
dataset["MACD_BUY"] = dataset["MACD"] > dataset["MACD_signal"]
dataset["MACD_SELL"] = dataset["MACD"] < dataset["MACD_signal"]

dataset["BUY_SIGNAL"] = (dataset["LSTM_BUY"] | dataset["RSI_BUY"] | dataset["BB_BUY"] | dataset["MACD_BUY"])
dataset["SELL_SIGNAL"] = (dataset["LSTM_SELL"] | dataset["RSI_SELL"] | dataset["BB_SELL"] | dataset["MACD_SELL"])

dataset = dataset.dropna()

capital = 1000000
com = 0.125 / 100
portfolio_value = [capital]
active_long_pos = None
active_short_pos = None
wins = 0
losses = 0

for i, row in dataset.iterrows():
    if active_long_pos:
        if row.Close < active_long_pos["stop_loss"]:
            pnl = row.Close * best_params["n_shares"] * (1 - com) - active_long_pos["cost"]
            capital += active_long_pos["cost"] + pnl
            if pnl > 0:
                wins += 1
            else:
                losses += 1
            active_long_pos = None
        elif row.Close > active_long_pos["take_profit"]:
            pnl = row.Close * best_params["n_shares"] * (1 - com) - active_long_pos["cost"]
            capital += active_long_pos["cost"] + pnl
            if pnl > 0:
                wins += 1
            else:
                losses += 1
            active_long_pos = None

    if active_short_pos:
        if row.Close > active_short_pos["stop_loss"]:
            pnl = active_short_pos["revenue"] - row.Close * best_params["n_shares"] * (1 + com)
            capital += active_short_pos["revenue"] + pnl
            if pnl > 0:
                wins += 1
            else:
                losses += 1
            active_short_pos = None
        elif row.Close < active_short_pos["take_profit"]:
            pnl = active_short_pos["revenue"] - row.Close * best_params["n_shares"] * (1 + com)
            capital += active_short_pos["revenue"] + pnl
            if pnl > 0:
                wins += 1
            else:
                losses += 1
            active_short_pos = None

    if row["BUY_SIGNAL"] and active_long_pos is None and active_short_pos is None:
        cost = row.Close * best_params["n_shares"] * (1 + com)
        if capital > cost:
            capital -= cost
            active_long_pos = {
                "datetime": row.Datetime,
                "cost": cost,
                "take_profit": row.Close * (1 + best_params["take_profit"]),
                "stop_loss": row.Close * (1 - best_params["stop_loss"])
            }

    if row["SELL_SIGNAL"] and active_short_pos is None and active_long_pos is None:
        revenue = row.Close * best_params["n_shares"] * (1 - com)
        capital += revenue
        active_short_pos = {
            "datetime": row.Datetime,
            "revenue": revenue,
            "take_profit": row.Close * (1 - best_params["take_profit"]),
            "stop_loss": row.Close * (1 + best_params["stop_loss"])
        }

    long_value = row.Close * best_params["n_shares"] if active_long_pos else 0
    short_value = (active_short_pos["revenue"] - row.Close * best_params["n_shares"]) if active_short_pos else 0
    portfolio_value.append(capital + long_value + short_value)

# Calcular métricas finales
final_metrics = calculate_metrics(portfolio_value, wins, losses)

### 7. Visualización y Resultados Finales

Esta sección del código muestra el desempeño final de la estrategia. Se imprime el valor final del portafolio, los mejores parámetros obtenidos y las métricas de rendimiento (Sharpe Ratio, Sortino Ratio, Calmar Ratio y Win/Loss Percentage). Además, se genera una gráfica que compara la evolución del portafolio con el precio de cierre de las acciones de Apple, facilitando la interpretación visual de los resultados.

In [None]:
# Mostrar resultados
print("Mejor valor del portafolio:", portfolio_value[-1])
print("Mejores parámetros:", best_params)
print("Métricas finales:")
print(f"Sharpe Ratio: {final_metrics['Sharpe']:.2f}")
print(f"Sortino Ratio: {final_metrics['Sortino']:.2f}")
print(f"Calmar Ratio: {final_metrics['Calmar']:.2f}")
print(f"Win/Loss Percentage: {final_metrics['Win_Loss_Percent']:.2f}%")

# Graficar
plt.figure(figsize=(12, 6))
plt.plot(portfolio_value, label="Portfolio Value")
plt.legend()
plt.twinx().plot(data.Close, c="orange", label="AAPL Close")
plt.legend(loc="upper right")
plt.show()

### Conclusiones 
