# LSTM Trading Strategy Implementation
This notebook implements an LSTM-based trading strategy using TensorFlow/Keras and VectorBT for backtesting.

## 1. Environment Setup

In [None]:
# Install required packages
!pip install vectorbt scikit-learn tensorflow seaborn

## 2. Import Libraries

In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import vectorbt as vbt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dropout, Activation, Dense, LSTM
from tensorflow.keras.models import Sequential
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.preprocessing import MinMaxScaler

# Configure settings
vbt.settings.array_wrapper['freq'] = '1s'
sns.set(style='whitegrid', context='notebook')
plt.rcParams['figure.figsize'] = (12, 6)

# Set random seeds for reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

## 3. Define Helper Functions

In [None]:
def create_trading_decisions(price_data, strategy_type, y_hat_last, start_index):
    """Create trading decisions based on momentum strategy."""
    decisions = pd.Series(0, index=price_data.index, dtype=int)
    
    if strategy_type == 'momentum':
        test_end_index = start_index + len(y_hat_last)
        if test_end_index > len(price_data):
            test_end_index = len(price_data)
            y_hat_last = y_hat_last[:len(price_data) - start_index]
        
        test_indices = price_data.index[start_index:test_end_index]
        prev_prices = price_data['midPrice'].shift(1).loc[test_indices]
        
        buy_signals = y_hat_last > prev_prices.values
        sell_signals = y_hat_last < prev_prices.values
        
        decisions.loc[test_indices] = np.select(
            [buy_signals, sell_signals],
            [1, -1],
            default=0
        )
    
    return decisions

def mean_absolute_percentage_error(y_true, y_pred):
    """Custom MAPE metric implementation for TensorFlow."""
    y_true = tf.cast(y_true, tf.float32)
    y_pred = tf.cast(y_pred, tf.float32)
    
    epsilon = 1e-4
    y_true_safe = tf.clip_by_value(y_true, epsilon, float('inf'))
    
    percentage_errors = tf.abs((y_true_safe - y_pred) / y_true_safe) * 100
    max_percentage = 1000.0
    percentage_errors_clipped = tf.clip_by_value(percentage_errors, 0.0, max_percentage)
    
    return tf.reduce_mean(percentage_errors_clipped)

def numpy_mape(y_true, y_pred):
    """Custom MAPE implementation for NumPy with debugging info."""
    y_true, y_pred = np.array(y_true), np.array(y_pred)
    
    print(f"MAPE Debug - y_true range: {np.min(y_true):.6f} to {np.max(y_true):.6f}")
    print(f"MAPE Debug - y_pred range: {np.min(y_pred):.6f} to {np.max(y_pred):.6f}")
    print(f"MAPE Debug - y_true mean: {np.mean(y_true):.6f}")
    print(f"MAPE Debug - y_pred mean: {np.mean(y_pred):.6f}")
    
    epsilon = 1e-6
    y_true_safe = np.clip(y_true, epsilon, None)
    
    percentage_errors = np.abs((y_true_safe - y_pred) / y_true_safe) * 100
    max_percentage = 1000.0
    percentage_errors_clipped = np.clip(percentage_errors, 0.0, max_percentage)
    
    print(f"MAPE Debug - Percentage errors range: {np.min(percentage_errors_clipped):.2f}% to {np.max(percentage_errors_clipped):.2f}%")
    print(f"MAPE Debug - Percentage errors mean: {np.mean(percentage_errors_clipped):.2f}%")
    print(f"MAPE Debug - Number of clipped values: {np.sum(percentage_errors > max_percentage)}")
    
    return np.mean(percentage_errors_clipped)

def to_sequences(data, seq_len):
    """Convert 2D array into sequences of specified length."""
    d = []
    for index in range(len(data) - seq_len):
        d.append(data[index: index + seq_len])
    return np.array(d)

def preprocess(data_raw, seq_len, train_split):
    """Preprocess data into sequences and split into train/test sets."""
    data = to_sequences(data_raw, seq_len)
    num_train = int(train_split * data.shape[0])

    # X: sequences of length (SEQ_LEN - 10)
    # y: 10th step after the X sequence ends
    X_train = data[:num_train, :-10, :]
    y_train = data[:num_train, -10, :]

    X_test = data[num_train:, :-10, :]
    y_test = data[num_train:, -10, :]

    return X_train, y_train, X_test, y_test

## 4. Main Execution Pipeline

In [None]:
def main():
    # GPU Configuration
    if tf.config.list_physical_devices('GPU'):
        print("GPU detected - configuring...")
        try:
            tf.config.set_logical_device_configuration(
                tf.config.list_physical_devices('GPU')[0],
                [tf.config.LogicalDeviceConfiguration(memory_limit=1024)]
            )
            device_name = "/device:GPU:0"
        except RuntimeError as e:
            print(e)
            device_name = "/device:CPU:0"
    else:
        print("No GPU detected - using CPU")
        device_name = "/device:CPU:0"
        
    print(f"Using device: {device_name}")

    # Load and prepare data
    csv_path = "data/2025-08-06-AKBNK-10.csv"
    df = pd.read_csv(csv_path, sep=';', parse_dates=['DateTime'])
    
    # Feature engineering
    df['midPrice'] = (df['Level 1 Bid Price'] + df['Level 1 Ask Price']) / 2
    
    feature_columns = [
        'Depth Ratio', 'Last Price', 'Total Bid Volume', ' Total Ask Volume',
        'Level 1 Bid Price', 'Level 1 Bid Volume', 'Level 1 Ask Price', 'Level 1 Ask Volume',
        'Level 2 Bid Price', 'Level 2 Bid Volume', 'Level 2 Ask Price', 'Level 2 Ask Volume',
        'Level 3 Bid Price', 'Level 3 Bid Volume', 'Level 3 Ask Price', 'Level 3 Ask Volume',
        'Level 4 Bid Price', 'Level 4 Bid Volume', 'Level 4 Ask Price', 'Level 4 Ask Volume',
        'Level 5 Bid Price', 'Level 5 Bid Volume', 'Level 5 Ask Price', 'Level 5 Ask Volume',
        'midPrice'
    ]
    
    print(f"\nSelected {len(feature_columns)} features")
    
    # Handle missing values
    feature_data = df[feature_columns].values
    if np.isnan(feature_data).any():
        print("NaN values found - forward filling...")
        feature_data = pd.DataFrame(feature_data, columns=feature_columns).fillna(method='ffill').values
    
    # Normalize features
    scaler = MinMaxScaler()
    scaled_features = scaler.fit_transform(feature_data)
    
    # Sequence preprocessing
    SEQ_LEN = 300
    X_train, y_train, X_test, y_test = preprocess(scaled_features, SEQ_LEN, train_split=0.9)
    
    print(f"\nData shapes:\nX_train: {X_train.shape}\ny_train: {y_train.shape}")
    print(f"X_test: {X_test.shape}\ny_test: {y_test.shape}")
    
    # Model architecture
    DROPOUT = 0.2
    WINDOW_SIZE = SEQ_LEN - 10
    N_FEATURES = scaled_features.shape[1]

    model = Sequential([
        LSTM(WINDOW_SIZE, return_sequences=True, input_shape=(WINDOW_SIZE, N_FEATURES)),
        Dropout(DROPOUT),
        LSTM(WINDOW_SIZE, return_sequences=True),
        Dropout(DROPOUT),
        LSTM(WINDOW_SIZE, return_sequences=False),
        Dropout(DROPOUT),
        Dense(N_FEATURES),
        Activation('linear')
    ])
    
    # Callbacks
    lr_scheduler = keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss', 
        factor=0.5, 
        patience=3, 
        min_lr=1e-3
    )
    
    checkpoint_filepath = 'best_model.h5'
    model_checkpoint = ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_loss',
        save_best_only=True,
        verbose=1
    )

    # Compile and train
    model.compile(
        loss='mean_squared_error',
        optimizer=Adam(learning_rate=1e-4),
        metrics=[mean_absolute_percentage_error]
    )
    
    print("\nTraining model...")
    history = model.fit(
        X_train,
        y_train,
        epochs=350,
        batch_size=500,
        shuffle=False,
        validation_split=0.2,
        callbacks=[lr_scheduler, model_checkpoint]
    )
    
    # Load best model
    if os.path.exists(checkpoint_filepath):
        best_model = keras.models.load_model(
            checkpoint_filepath, 
            custom_objects={'mean_absolute_percentage_error': mean_absolute_percentage_error}
        )
    else:
        best_model = model
    
    # Evaluate
    test_loss, test_mape = best_model.evaluate(X_test, y_test, verbose=0)
    print(f"\nTest Metrics:\nLoss: {test_loss:.4f}\nMAPE: {test_mape:.2f}%")
    
    # Plot training history
    plt.figure(figsize=(12, 4))
    plt.plot(history.history['loss'], label='Train Loss')
    plt.plot(history.history['val_loss'], label='Validation Loss')
    plt.title('Model Loss')
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.legend()
    plt.grid(alpha=0.3)
    plt.show()
    
    # Make predictions
    y_hat = best_model.predict(X_test)
    
    # Inverse transform predictions
    y_test_inverse = scaler.inverse_transform(y_test)
    y_hat_inverse = scaler.inverse_transform(y_hat)
    
    # Extract midPrice predictions
    mid_price_idx = feature_columns.index('midPrice')
    y_test_mid = y_test_inverse[:, mid_price_idx]
    y_hat_mid = y_hat_inverse[:, mid_price_idx]
    
    # Plot predictions vs actual
    plt.figure(figsize=(12, 6))
    plt.plot(y_test_mid, label="Actual", color='green', alpha=0.7)
    plt.plot(y_hat_mid, label="Predicted", color='red', alpha=0.7)
    plt.title('Mid Price Predictions')
    plt.xlabel('Time Steps')
    plt.ylabel('Price')
    plt.legend()
    plt.grid(alpha=0.3)
    plt.show()
    
    # Calculate metrics
    mse = np.mean((y_test_mid - y_hat_mid) ** 2)
    mae = np.mean(np.abs(y_test_mid - y_hat_mid))
    rmse = np.sqrt(mse)
    mape_val = numpy_mape(y_test_mid, y_hat_mid)
    
    print(f"\nPrediction Metrics:")
    print(f"MSE: {mse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAPE: {mape_val:.2f}%")
    
    # Generate trading signals
    start_idx = SEQ_LEN + int(0.9 * (len(df) - SEQ_LEN))
    decisions = create_trading_decisions(df, 'momentum', y_hat_mid, start_idx)
    
    # Backtest with VectorBT
    print("\nRunning backtest...")
    weights = pd.DataFrame(decisions).div(pd.DataFrame(decisions).abs().sum(axis=1), axis=0).fillna(0)
    
    pf = vbt.Portfolio.from_orders(
        close=df['midPrice'],
        size=weights,
        size_type='targetpercent',
        freq='1s',
        init_cash=100,
        cash_sharing=True
    )
    
    # Display backtest results
    orders = pf.orders
    print(f"\nTrades executed:\nBuy: {orders.buy.count()}\nSell: {orders.sell.count()}")
    
    full_stats = pf.stats()
    print("\nPerformance Metrics:")
    print(f"Total Return: {full_stats['Total Return [%]']:.2f}%")
    print(f"Sharpe Ratio: {full_stats['Sharpe Ratio']:.2f}")
    print(f"Max Drawdown: {full_stats['Max Drawdown [%]']:.2f}%")
    
    # Plot portfolio value
    pf.value().plot(title='Portfolio Value')
    plt.show()
    
    # Show detailed portfolio plot
    pf.plot().show()

# Run the main function
if __name__ == "__main__":
    main()