In [None]:

from backtesting import Backtest, Strategy
from backtesting.test import GOOG, SMA
from backtesting.lib import crossover

import yfinance as yf
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers.schedules import ExponentialDecay

tf.random.set_seed(42)
np.random.seed(42)

In [None]:
class SmaCross(Strategy):
    def init(self):
        price = self.data.Close
        self.ma1 = self.I(SMA, price, 2)
        self.ma2 = self.I(SMA, price, 5)

    def next(self):
        if crossover(self.ma1, self.ma2):
            self.buy()
        elif crossover(self.ma2, self.ma1):
            self.sell()


bt = Backtest(GOOG, SmaCross, commission=0.002, exclusive_orders=True)
stats = bt.run()
bt.plot()

In [None]:
stats

In [None]:
symbol = "^GDAXI"
ticker = yf.Ticker(symbol)

hist_data = ticker.history(period="2y")

train_cutoff_date = "2025-01-02"
test_end_date = "2025-03-31"

train_hist_data = hist_data[hist_data.index < train_cutoff_date]
test_hist_data = hist_data[(hist_data.index >= train_cutoff_date) & 
                           (hist_data.index <= test_end_date)]

train_data = train_hist_data[["Close"]].copy().reset_index()
test_data = test_hist_data[["Close"]].copy().reset_index()

scaler = MinMaxScaler(feature_range=(0, 1))
train_scaled = scaler.fit_transform(train_data[["Close"]])
test_scaled = scaler.transform(test_data[["Close"]])


In [None]:
def create_sequences(data, seq_length):
    X, y = [], []
    for i in range(len(data) - seq_length):
        X.append(data[i : i + seq_length])
        y.append(data[i + seq_length])
    return np.array(X), np.array(y)

seq_length = 20
X_train, y_train = create_sequences(train_scaled, seq_length)
X_test, y_test = create_sequences(test_scaled, seq_length)

model = Sequential([
    LSTM(64, return_sequences=True, input_shape=(seq_length, 1), recurrent_regularizer=l2(0.01)),
    Dropout(0.3),
    LSTM(64, recurrent_regularizer=l2(0.01)),
    Dropout(0.3),
    Dense(32, activation='relu'),
    Dense(1)
])

optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='huber')

early_stopping = EarlyStopping(monitor="val_loss", patience=15, restore_best_weights=True, min_delta=0.0001)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=0.00001)

history = model.fit(
    X_train, y_train,
    epochs=150,
    batch_size=16,
    validation_split=0.2,
    callbacks=[early_stopping, reduce_lr],
    verbose=1
)


In [None]:
train_predict = model.predict(X_train)
test_predict = model.predict(X_test)

train_predict = scaler.inverse_transform(train_predict)
y_train_inv = scaler.inverse_transform(y_train)
test_predict = scaler.inverse_transform(test_predict)
y_test_inv = scaler.inverse_transform(y_test)

train_rmse = np.sqrt(mean_squared_error(y_train_inv, train_predict))
test_rmse = np.sqrt(mean_squared_error(y_test_inv, test_predict))
train_mape = np.mean(np.abs((y_train_inv - train_predict) / y_train_inv)) * 100
test_mape = np.mean(np.abs((y_test_inv - test_predict) / y_test_inv)) * 100

print(f"Train RMSE: {train_rmse:.2f}")
print(f"Test RMSE: {test_rmse:.2f}")
print(f"Train MAPE: {train_mape:.2f}%")
print(f"Test MAPE: {test_mape:.2f}%")

In [None]:
class LSTMOptionStrategy(Strategy):
    def init(self):
        price = self.data.Close
        self.sma20 = self.I(SMA, price, 20)
        self.model = model
        self.scaler = scaler
        self.predictions = self.I(lambda: np.full(len(self.data), np.nan))
        self.buy_signals = self.I(lambda: np.zeros(len(self.data)))
        self.sell_signals = self.I(lambda: np.zeros(len(self.data)))
        
        self.tomorrow_threshold = 0.3
        self.week_avg_threshold = 0.5
        self.days20_avg_threshold = 0.8
        self.tomorrow_sell_threshold = -0.2
        self.week_avg_sell_threshold = -0.4
        self.days20_avg_sell_threshold = -0.6
    
    def get_prediction(self, days_ahead=5):
        if np.isnan(self.sma20[-1]):
            return None
        
        recent_prices = np.array([self.data.Close[-i] for i in range(20, 0, -1)])
        recent_df = pd.DataFrame(recent_prices, columns=["Close"])
        scaled_data = self.scaler.transform(recent_df)
        
        future_predictions = []
        temp_data = scaled_data.copy()
        
        for _ in range(days_ahead):
            X = temp_data[-20:].reshape(1, 20, 1)
            next_pred = self.model.predict(X, verbose=0)[0, 0]
            future_predictions.append(next_pred)
            temp_data = np.vstack([temp_data, next_pred])
        
        future_df = pd.DataFrame(np.array(future_predictions).reshape(-1, 1), 
                               columns=["Close"])
        future_prices = self.scaler.inverse_transform(future_df)
        
        future_20_predictions = []
        if days_ahead < 20:
            days_to_predict = 20 - days_ahead
            for _ in range(days_to_predict):
                X = temp_data[-20:].reshape(1, 20, 1)
                next_pred = self.model.predict(X, verbose=0)[0, 0]
                future_20_predictions.append(next_pred)
                temp_data = np.vstack([temp_data, next_pred])
                
        future_20_df = pd.DataFrame(np.array(future_20_predictions).reshape(-1, 1), 
                                  columns=["Close"])
        future_20_prices = self.scaler.inverse_transform(future_20_df)
        future_20_prices = np.append(future_prices.flatten(), 
                                   future_20_prices.flatten())
        
        return future_prices.flatten(), future_20_prices
    
    def analyze_trend(self, predictions, future_20_prices):
        current_price = self.data.Close[-1]
        
        tomorrow_change = (predictions[0] - current_price) / current_price * 100
        week_avg_change = (sum(predictions) / len(predictions) - current_price) / current_price * 100
        days20_avg_change = (sum(future_20_prices) / len(future_20_prices) - current_price) / current_price * 100
        
        return {
            'tomorrow_change': tomorrow_change,
            'week_avg_change': week_avg_change,
            'days20_avg_change': days20_avg_change
        }
    
    def next(self):
        if np.isnan(self.sma20[-1]):
            return
            
        current_price = self.data.Close[-1]
        preds, future_20_prices = self.get_prediction(days_ahead=5)
        
        if preds is None:
            return
            
        trend = self.analyze_trend(preds, future_20_prices)
        
        if (trend['tomorrow_change'] > self.tomorrow_threshold or
            trend['week_avg_change'] > self.week_avg_threshold or
            trend['days20_avg_change'] > self.days20_avg_threshold):
            
            self.buy_signals[-1] = 1
            self.buy()
                
        elif (trend['tomorrow_change'] < self.tomorrow_sell_threshold or
              trend['week_avg_change'] < self.week_avg_sell_threshold or
              trend['days20_avg_change'] < self.days20_avg_sell_threshold):
            
            self.sell_signals[-1] = 1
            self.sell()


In [None]:
bt = Backtest(test_hist_data, LSTMOptionStrategy, commission=0.002, exclusive_orders=True)
stats = bt.run()
bt.plot()

In [None]:
stats

In [None]:
bt = Backtest(test_hist_data, SmaCross, commission=0.002, exclusive_orders=True)
stats = bt.run()
bt.plot()