# Test 5

### Import

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from backtesting import Backtest
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
import os
from datetime import datetime
from lumibot.brokers import Alpaca
import matplotlib.pyplot as plt
from lumibot.backtesting import YahooDataBacktesting
import numpy as np

### Device

In [None]:
# Probe for an accelerator: CUDA first, then Apple MPS, otherwise fall back to the CPU.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)

# The detection result is then unconditionally replaced with "cpu".
device = "cpu"
print(f"Using device: {device}")

### Hyperparameters

In [None]:
# Training configuration
seq_size = 30          # number of consecutive rows per input sequence
batch_size = 16        # samples per optimisation step
num_epochs = 5         # full passes over the training data
learning_rate = 0.001  # optimiser step size

# Network architecture
input_size = 8         # features per timestep fed to the LSTM
hidden_size = 1000     # width of each LSTM layer
num_layers = 6         # number of stacked LSTM layers
dropout = 0.2          # dropout probability inside the LSTM
output_size = 1        # values predicted per sequence

### LSTM Model

In [None]:
class Net(nn.Module):
    """LSTM regressor: a stacked LSTM followed by a linear layer on the last timestep.

    Args:
        input_size: Number of features per timestep.
        output_size: Number of values produced per input sequence.
        hidden_size: Size of the hidden state of every LSTM layer.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied between stacked LSTM layers.
            It is ignored when ``num_layers`` is 1, because there is no
            layer boundary to apply it to.
    """

    def __init__(self, input_size, output_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        # nn.LSTM applies dropout only between stacked layers and emits a
        # UserWarning when a non-zero value is given for a single layer, so
        # the value is zeroed in that case (the computation is unchanged).
        lstm_dropout = dropout if num_layers > 1 else 0.0
        # Attribute names `lstm` and `fc` are part of the saved state_dict keys.
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
            dropout=lstm_dropout,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the prediction for each sequence in a batch-first input.

        Args:
            x: Tensor indexed as (batch, timestep, feature).

        Returns:
            Tensor of shape (batch, output_size).
        """
        sequence_out, _ = self.lstm(x)
        # Only the hidden state of the final timestep feeds the output layer.
        return self.fc(sequence_out[:, -1, :])


### Dataloader

In [None]:
class FinanceDataset(Dataset):
    """Map-style dataset over pre-built input sequences and their targets.

    ``X[i]`` and ``y[i]`` are returned together as float32 tensors.
    """

    def __init__(self, X, y):
        self.X, self.y = X, y

    def __len__(self):
        # One sample per entry of X.
        return len(self.X)

    def __getitem__(self, idx):
        features = torch.tensor(self.X[idx], dtype=torch.float32)
        target = torch.tensor(self.y[idx], dtype=torch.float32)
        return features, target

In [None]:
def create_sequences(data, labels, seq_length):
    """Cut ``data`` into overlapping windows and pair each with the next label.

    Window ``i`` covers ``data[i:i + seq_length]`` and its target is
    ``labels[i + seq_length]``, i.e. the label immediately after the window.

    Args:
        data: Indexable sequence of rows (e.g. a 2-D array).
        labels: Indexable sequence of targets aligned with ``data``.
        seq_length: Number of rows per window.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays holding the windows and their targets.
    """
    window_starts = range(len(data) - seq_length)
    windows = [data[start:start + seq_length] for start in window_starts]
    targets = [labels[start + seq_length] for start in window_starts]
    return np.array(windows), np.array(targets)

### Init

In [None]:
# Initialize model, loss function, optimizer.
# Every value comes from the hyperparameter cell so that editing it there
# actually changes the model and optimiser built here.
net = Net(input_size, output_size, hidden_size, num_layers, dropout=dropout)
criterion = nn.MSELoss()
optimizer = optim.Adam(net.parameters(), lr=learning_rate)

In [None]:
# Data

In [None]:
# Separate scalers for features and target, both fitted on the training data.
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()

# NOTE: read_pickle can execute arbitrary code; only load trusted files.
df = pd.read_pickle("../Data/train_dax_data.pkl")
df["Y_scaled"] = scaler_y.fit_transform(df["Y"].values.reshape(-1, 1))
print(df.dtypes)
# Features: everything except the first two and the last two columns.
# The last column is the "Y_scaled" column added above; presumably the one
# before it is "Y" -- TODO confirm against the pickle's column layout.
X = scaler_X.fit_transform(df.iloc[:, 2:-2])

# Window length and batch size come from the hyperparameter cell instead of
# being re-declared here, so the two cannot silently disagree.
X_sequences, y_sequences = create_sequences(X, df["Y_scaled"].values, seq_size)

dataset = FinanceDataset(X_sequences, y_sequences)
train_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

### Training

In [None]:
# Mean training loss of every epoch, in order.
losses = []

for epoch in range(num_epochs):
    net.train()
    epoch_loss = 0
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = net(inputs)
        # Drop the trailing output dimension so predictions line up with labels.
        loss = criterion(outputs.squeeze(-1), labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()

    # Average the summed batch losses over the number of batches.
    epoch_loss /= len(train_loader)
    losses.append(epoch_loss)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {epoch_loss:.4f}")

# Plot training loss
plt.plot(losses, label='Training Loss')
plt.legend()
plt.show()

# Create the target directory first: without it torch.save would fail only
# after the whole training run has finished, discarding the trained weights.
os.makedirs("../Models", exist_ok=True)
torch.save(net.state_dict(), "../Models/best_model.pt")


### Backtesting

In [None]:
# Location of the weights written by the training cell.
model_path = "../Models/best_model.pt"

# Reload the training data, turn the index into a column and drop the last column.
df = pd.read_pickle("../Data/train_dax_data.pkl").reset_index().iloc[:, :-1]
# Remove the "Date" / "index" columns when they are present.
df = df.drop(columns=["Date", "index"], errors="ignore")
# Keep exactly the eight feature columns, in this order.
df = df[["Open", "High", "Low", "Close", "Adj Close", "Volume", "month", "weekday"]]
display(df)

# Fit a feature scaler on the raw values of those columns.
scaler = MinMaxScaler()
scaler.fit(df.values)

In [None]:
import pandas as pd
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler
import numpy as np

model = Net(input_size, output_size, hidden_size, num_layers)

# Load state_dict only
model.load_state_dict(torch.load(model_path))
model.eval()

df = pd.read_pickle('../Data/train_dax_data.pkl')
df = df[["Open", "High", "Low", "Close", "Adj Close", "Volume", "month", "weekday", "Y"]]

# The freshly loaded frame has no "Y_scaled" column, so the scaled target is
# rebuilt here the same way the data-preparation cell does it (min-max scaling
# of "Y" from this same file), independent of any scaler re-fitted elsewhere.
y_scaled = MinMaxScaler().fit_transform(df["Y"].values.reshape(-1, 1)).ravel()

seq_size = 30
# X is the scaled feature matrix built by the data-preparation cell.
X_sequences, y_sequences = create_sequences(X, y_scaled, seq_size)

dataset = FinanceDataset(X_sequences, y_sequences)
# The loader was never created before being iterated; keep the original order
# (no shuffling) so predictions stay aligned with the rows of the data.
test_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

all_predictions = []
all_labels = []

with torch.no_grad():
    for inputs, labels in test_loader:
        out = model(inputs)

        all_predictions.append(out.numpy())
        all_labels.append(labels.numpy())

all_predictions = np.concatenate(all_predictions)
all_labels = np.concatenate(all_labels)

print(f'Predicted values: {all_predictions.flatten()}')
print(f'Actual values: {all_labels.flatten()}')

# Both columns are still in the min-max scaled space.
output_df = pd.DataFrame({'Predicted': all_predictions.flatten(), 'Actual': all_labels.flatten()})
display(output_df)


In [None]:
os.makedirs("logs", exist_ok=True)
os.makedirs("results", exist_ok=True)

test_data = pd.read_pickle("../Data/test_dax_data.pkl")

# NOTE(review): both scalers are fitted on the test set here, not reused from
# training -- confirm this is intended.
scaler = MinMaxScaler()
scaler.fit(test_data.iloc[:, 2:-1])

scaler_y = MinMaxScaler()
scaler_y.fit(test_data.iloc[:, -1].values.reshape(-1, 1))

model = Net(input_size, output_size, hidden_size, num_layers)
model_path = "../Models/best_model.pt"
model.load_state_dict(torch.load(model_path))
model.eval()

# Credentials are read from the environment; paper trading is enabled.
ALPACA_CREDS = {
    "API_KEY": os.getenv("ALPACA_API_KEY"),
    "API_SECRET": os.getenv("ALPACA_API_SECRET"),
    "PAPER": True,
}

# Strategy setup
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)
broker = Alpaca(ALPACA_CREDS)

# Single source of truth for the strategy parameters: the constructor and the
# backtest call must receive the same settings, so they are defined once and
# each consumer gets its own shallow copy.
strategy_parameters = {
    "symbol": "^GDAXI",
    "cash_at_risk": 0.5,
    "model": model,
    "num_prior_days": 30,
    "dataset": test_data,
    "scaler": scaler,
    "scaler_y": scaler_y,
}

strategy = Backtest(
    name="Test5",
    broker=broker,
    parameters=dict(strategy_parameters),
)

# Run backtest
backtest_results = strategy.backtest(
    YahooDataBacktesting,
    start_date,
    end_date,
    name="Test5",
    parameters=dict(strategy_parameters),
    benchmark_asset="SPY",
    show_plot=True,
    show_tearsheet=True,
)

# Save results
results_path = "results/backtest_results.csv.gz"
pd.DataFrame(backtest_results).to_csv(results_path, index=False, compression="gzip")

# Report the path that was actually written.
print(f"Backtesting complete. Results saved to {results_path}.")

In [None]:
import numpy as np
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, Dataset
import matplotlib.pyplot as plt

# Dataset class for backtesting
class FinanceDataset(Dataset):
    """Sliding-window dataset built lazily from row-aligned features and targets.

    Sample ``i`` is the window ``X[i:i + seq_size]`` paired with the target
    ``y[i + seq_size]``, i.e. the value immediately after the window.

    Args:
        X: Indexable sequence of feature rows.
        y: Indexable sequence of targets aligned with ``X``.
        seq_size: Number of rows per window.
    """

    def __init__(self, X, y, seq_size):
        self.X = X
        self.y = y
        self.seq_size = seq_size

    def __len__(self):
        # Clamp at zero: with fewer rows than seq_size the difference is
        # negative, and len() raises ValueError on a negative __len__.
        return max(0, len(self.X) - self.seq_size)

    def __getitem__(self, idx):
        """Return ``(window, target)`` as float32 tensors.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        size = len(self)
        if idx < 0:
            # Support negative indices; previously they produced an empty
            # window paired with an unrelated target.
            idx += size
        if not 0 <= idx < size:
            raise IndexError("FinanceDataset index out of range")
        return (
            torch.tensor(self.X[idx:idx + self.seq_size], dtype=torch.float32),
            torch.tensor(self.y[idx + self.seq_size], dtype=torch.float32),
        )

# Backtesting function
def backtest_model(model, dataloader, scaler_y, seq_size):
    """Run ``model`` over ``dataloader`` and return results in original units.

    Args:
        model: Module called on every feature batch; put into eval mode here.
        dataloader: Iterable yielding ``(features, targets)`` tensor pairs.
        scaler_y: Object whose ``inverse_transform`` undoes the target scaling.
        seq_size: Not used by this function.

    Returns:
        Tuple ``(predictions, actuals)`` of flat arrays.
    """

    def _to_original_scale(rows):
        # Stack the collected rows into one column, undo the scaling, flatten.
        column = np.array(rows).reshape(-1, 1)
        return scaler_y.inverse_transform(column).flatten()

    model.eval()
    predicted_rows, actual_rows = [], []

    # Inference only: no gradients are needed.
    with torch.no_grad():
        for features, targets in dataloader:
            predicted_rows += list(model(features).numpy())
            actual_rows += list(targets.numpy())

    # Map predictions and actual values back from the scaled space.
    predictions = _to_original_scale(predicted_rows)
    actuals = _to_original_scale(actual_rows)
    return predictions, actuals

# Run the backtest
def run_backtest(test_df, model_path, seq_size):
    """Evaluate the saved model on ``test_df`` and report prediction errors.

    Args:
        test_df: DataFrame whose columns ``[2:-1]`` are the features and whose
            last column is the target.
        model_path: Path to the ``state_dict`` file of a trained ``Net``.
        seq_size: Number of rows per input window.

    Returns:
        DataFrame with the columns "Actual" and "Predicted", both in the
        target's original units.
    """
    # Preprocess the data.
    # NOTE(review): both scalers are fitted on the test frame itself, so they
    # differ from the ones used during training -- confirm this is intended.
    scaler_X = MinMaxScaler()
    scaler_y = MinMaxScaler()

    X_test = test_df.iloc[:, 2:-1]
    y_test = test_df.iloc[:, -1]

    X_test_scaled = scaler_X.fit_transform(X_test)
    y_test_scaled = scaler_y.fit_transform(y_test.values.reshape(-1, 1))

    # Build the sequences; order is kept so results align with the rows.
    test_dataset = FinanceDataset(X_test_scaled, y_test_scaled, seq_size)
    test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False)

    # Load the model. The architecture must match the one that produced the
    # checkpoint, so it is taken from the notebook's hyperparameters rather
    # than hard-coded; a mismatch makes load_state_dict fail.
    model = Net(
        input_size=X_test.shape[1],
        output_size=output_size,
        hidden_size=hidden_size,
        num_layers=num_layers,
    )
    model.load_state_dict(torch.load(model_path))
    model.eval()

    # Run the backtest
    predictions, actuals = backtest_model(model, test_dataloader, scaler_y, seq_size)

    # Visualise the results
    plt.figure(figsize=(14, 7))
    plt.plot(predictions, label="Predicted", color="blue")
    plt.plot(actuals, label="Actual", color="orange")
    plt.title("Backtesting Results")
    plt.legend()
    plt.show()

    # Compute the error statistics
    df_results = pd.DataFrame({"Actual": actuals, "Predicted": predictions})
    mse = ((df_results["Actual"] - df_results["Predicted"]) ** 2).mean()
    mae = np.abs(df_results["Actual"] - df_results["Predicted"]).mean()
    print(f"Mean Squared Error (MSE): {mse:.2f}")
    print(f"Mean Absolute Error (MAE): {mae:.2f}")

    return df_results

# Apply the backtesting function
seq_size = 30  # sequence length
model_path = "../Models/best_model.pt"  # path to the saved model
test_data = pd.read_pickle("../Data/test_dax_data.pkl")  # load the test dataset

results = run_backtest(test_data, model_path, seq_size)
results.to_csv("backtest_results.csv", index=False)  # save the results