In [1]:
import numpy as np
import torch
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
from torch import nn
import torch.optim as optim

In [2]:


# Example forex data (EUR/USD at 15-minute intervals).
# The data is expected as a numpy array of shape (number of observations, 1).
# Dummy data from a seeded generator so that Restart & Run All reproduces the
# same series; replace this with your real forex data.
rng = np.random.default_rng(42)
forex_data = rng.random((1000, 1))

# Scale the data (MinMaxScaler with its default settings).
# NOTE(review): the scaler is fitted on the complete series, i.e. also on the
# observations that later end up in the validation and test splits. With real
# data, fit it on the training portion only to avoid look-ahead leakage.
scaler = MinMaxScaler()
forex_data = scaler.fit_transform(forex_data)

# Parameters
sequence_length = 96  # 1 day of 15-minute intervals
forecast_horizon = 1  # predict 1 time step (15 minutes) into the future

def create_sequences(data, seq_length, horizon):
    """Cut a series into overlapping input windows and their follow-up targets.

    Window ``i`` is ``data[i:i + seq_length]``; its target is the ``horizon``
    observations that come directly after that window.

    Args:
        data: Sliceable sequence of observations (e.g. a numpy array).
        seq_length: Number of observations in each input window.
        horizon: Number of observations in each target.

    Returns:
        Tuple ``(inputs, targets)`` of numpy arrays with one entry per window.
    """
    n_windows = len(data) - seq_length - horizon + 1
    # Index of the first observation after each input window
    cut_points = [start + seq_length for start in range(n_windows)]

    inputs = [data[cut - seq_length:cut] for cut in cut_points]
    targets = [data[cut:cut + horizon] for cut in cut_points]
    return np.array(inputs), np.array(targets)

# Build the input sequences and target values, converting both numpy arrays
# straight into float32 PyTorch tensors
X, y = (
    torch.tensor(array, dtype=torch.float32)
    for array in create_sequences(forex_data, sequence_length, forecast_horizon)
)


In [3]:

# Split into training, validation and test sets by position:
# the first 70 % of the sequences, the next 15 %, and whatever remains.
train_size = int(0.7 * len(X))
val_size = int(0.15 * len(X))
test_size = len(X) - train_size - val_size

X_train, X_val, X_test = (
    X[:train_size],
    X[train_size:train_size + val_size],
    X[train_size + val_size:],
)
y_train, y_val, y_test = (
    y[:train_size],
    y[train_size:train_size + val_size],
    y[train_size + val_size:],
)

# Datasets and DataLoaders for PyTorch
train_dataset, val_dataset, test_dataset = (
    TensorDataset(features, labels)
    for features, labels in ((X_train, y_train), (X_val, y_val), (X_test, y_test))
)

# Only the training loader shuffles; validation and test keep their order
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(val_dataset, shuffle=False, batch_size=32)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

print(
    f"Train size: {len(train_loader.dataset)}",
    f"Validation size: {len(val_loader.dataset)}",
    f"Test size: {len(test_loader.dataset)}",
    sep="\n",
)


Train size: 632
Validation size: 135
Test size: 137


In [4]:

# Hyperparameters

# Data
input_size = 1  # dimension of the forex data
sequence_length = 96  # length of the input sequence (1 day at 15-minute spacing)

# Model
hidden_size = 64  # dimension of the hidden state
latent_space_dim = 32  # dimension of the latent space
num_layers = 3  # number of BiLSTM layers
k = 3  # time-delay length
d = 2  # order of the nonlinear features

# Training
batch_size = 16
num_epochs = 1
learning_rate = 1e-3


In [53]:
class VARModule(nn.Module):
    """Builds lagged linear and pairwise-product features for every time step.

    For a time step ``t`` the linear feature is a learned linear map of the
    ``k`` preceding observations ``x[t-k], ..., x[t-1]`` (flattened), and the
    nonlinear feature is a learned linear map of all products
    ``x[t-i] * x[t-j]`` with ``1 <= i <= j <= d``.

    Args:
        input_size: Number of features per time step.
        k: Number of lagged observations used for the linear features.
        d: Largest lag used for the pairwise products.
    """

    def __init__(self, input_size, k, d):
        super().__init__()
        self.k = k
        self.d = d
        self.linear_transform = nn.Linear(input_size * k, input_size * k)
        # One product term per lag pair (i, j) with 1 <= i <= j <= d.
        n_pairs = d * (d + 1) // 2
        self.nonlinear_transform = nn.Linear(input_size * n_pairs, input_size * d)

    def forward(self, x):
        """Compute the features for all time steps that have enough history.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tuple ``(linear_features, nonlinear_features)`` of shapes
            ``(batch, steps, input_size * k)`` and
            ``(batch, steps, input_size * d)`` with
            ``steps = seq_len - max(k, d)``.

        Raises:
            ValueError: If the sequence is too short to yield a single step.
        """
        _, seq_len, _ = x.size()

        # The first usable step needs k lags for the linear part and d lags for
        # the products. Starting at k alone would let ``t - i`` go negative for
        # d > k and silently read values from the END of the sequence.
        start = max(self.k, self.d)
        if seq_len <= start:
            raise ValueError(
                f"sequence length {seq_len} is too short for k={self.k}, d={self.d}; "
                f"need at least {start + 1} time steps"
            )

        # Slice number `offset` holds x[t - k + offset] for every t in
        # [start, seq_len); concatenating the k slices along the feature axis
        # reproduces the flattened window x[t-k:t] for all steps at once.
        lagged = torch.cat(
            [x[:, start - self.k + offset:seq_len - self.k + offset, :]
             for offset in range(self.k)],
            dim=-1,
        )
        linear_features = self.linear_transform(lagged)

        # Same trick for the products: the slice shifted by i holds x[t - i].
        products = torch.cat(
            [x[:, start - i:seq_len - i, :] * x[:, start - j:seq_len - j, :]
             for i in range(1, self.d + 1)
             for j in range(i, self.d + 1)],
            dim=-1,
        )
        nonlinear_features = self.nonlinear_transform(products)

        return linear_features, nonlinear_features


In [52]:
class BiLSTMModule(nn.Module):
    """Thin wrapper around a batch-first, bidirectional ``nn.LSTM``.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden state size per direction.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout between stacked layers. It is only handed to
            ``nn.LSTM`` when there is more than one layer, because a single
            layer has no in-between position and PyTorch warns about it.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout=0.2):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.bilstm = nn.LSTM(
            self.input_size,
            self.hidden_size,
            self.num_layers,
            batch_first=True,
            bidirectional=True,
            dropout=dropout if num_layers > 1 else 0.0,
        )

    def forward(self, x, h_c=None):
        """Run the BiLSTM over a batch of sequences.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.
            h_c: Optional ``(h_0, c_0)`` tuple, each of shape
                ``(2 * num_layers, batch, hidden_size)``. Zeros are used when
                it is omitted.

        Returns:
            Tuple ``(out, (h_n, c_n))`` where ``out`` has shape
            ``(batch, seq_len, 2 * hidden_size)``.
        """
        if h_c is None:
            # A bidirectional LSTM keeps one state per layer AND direction.
            # new_zeros inherits dtype and device from x.
            state_shape = (2 * self.num_layers, x.size(0), self.hidden_size)
            h_c = (x.new_zeros(state_shape), x.new_zeros(state_shape))

        self.bilstm.flatten_parameters()
        out, h_c = self.bilstm(x, h_c)
        # The returned state is deliberately NOT stored on self.hidden_size:
        # that attribute is the integer hyperparameter used to size the state.
        return out, h_c

In [51]:
class DAFA_Modell(nn.Module):
    """VAR feature extractor followed by a stack of BiLSTM layers and a linear head.

    The VAR module turns the input into linear and nonlinear lag features.
    The first BiLSTM layer reads both feature sets concatenated; every later
    layer reads the previous layer's output concatenated with the linear
    features again. A final linear layer maps each time step back to
    ``input_size`` values.

    Args:
        input_size: Number of features per time step.
        hidden_size: Hidden state size per LSTM direction.
        latent_space_dim: Stored on the instance but not used by any layer here.
        num_layers: Number of stacked single-layer BiLSTM modules.
        k: Lag count passed to the VAR module (linear features).
        d: Maximum product lag passed to the VAR module (nonlinear features).
    """

    def __init__(self, input_size, hidden_size, latent_space_dim, num_layers, k, d):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.latent_space_dim = latent_space_dim
        self.num_layers = num_layers
        self.k = k
        self.d = d

        self.var_module = VARModule(input_size, k, d)

        # Feature widths produced by VARModule's two linear maps.
        linear_dim = input_size * k
        nonlinear_dim = input_size * d
        # Layer 0 sees [nonlinear, linear]; every later layer sees
        # [previous BiLSTM output (2 * hidden_size), linear].
        layer_input_dims = [nonlinear_dim + linear_dim]
        layer_input_dims += [2 * hidden_size + linear_dim] * (num_layers - 1)
        self.bilstm_layers = nn.ModuleList(
            [BiLSTMModule(dim, self.hidden_size, 1) for dim in layer_input_dims[:num_layers]]
        )
        self.fc = nn.Linear(hidden_size * 2, input_size)  # Bidirectional LSTM doubles the hidden size

    def forward(self, x):
        """Return one ``input_size``-wide output per time step of the VAR features.

        Note that the output therefore has as many time steps as the VAR
        module emits, not just the forecast horizon.
        """
        linear_features, nonlinear_features = self.var_module(x)
        out = nonlinear_features
        for layer in self.bilstm_layers:
            out, _ = layer(torch.cat([out, linear_features], dim=-1))
        return self.fc(out)


# Backward-compatible alias: the notebook instantiates the model under this name.
DAFA_BiLSTM = DAFA_Modell
        
      

In [55]:
def loss_function(recon_x, x):
    """Return the mean squared error between ``recon_x`` and ``x``.

    Args:
        recon_x: Model output tensor.
        x: Reference tensor to compare against.

    Returns:
        Scalar tensor holding the mean of the squared element-wise differences.
    """
    return nn.functional.mse_loss(recon_x, x)

In [56]:

# Hyperparameters
# NOTE(review): this cell assigns the same names and values as the
# hyperparameter cell further up; keep the two in sync or drop one of them.

# Training setup
learning_rate = 1e-3
num_epochs = 1
batch_size = 16

# Input
sequence_length = 96  # length of the input sequence (1 day at 15-minute spacing)
input_size = 1  # dimension of the forex data

# Network
num_layers = 3  # number of BiLSTM layers
hidden_size = 64  # dimension of the hidden state
latent_space_dim = 32  # dimension of the latent space
k = 3  # time-delay length
d = 2  # order of the nonlinear features


In [None]:
def train_model(model, train_loader, criterion, optimizer, num_epochs):
    """Train ``model`` for ``num_epochs`` passes over ``train_loader``.

    Args:
        model: Module called as ``model(inputs)``.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Callable ``criterion(outputs, targets)`` returning a scalar loss.
        optimizer: Optimizer over the model's parameters.
        num_epochs: Number of passes over the loader.

    Returns:
        List with the sample-weighted mean training loss of every epoch.

    Raises:
        ValueError: If an epoch yields no samples at all.
    """
    model.train()  # Set model to training mode
    epoch_losses = []

    for epoch in range(num_epochs):
        running_loss = 0.0
        samples_seen = 0

        for inputs, targets in train_loader:
            optimizer.zero_grad()  # Zero the parameter gradients

            # Forward pass
            outputs = model(inputs)
            # A sequence model may emit one prediction per time step while the
            # targets only cover the last few steps. Compare the trailing steps
            # instead of letting the loss silently broadcast the targets over
            # every time step.
            if (
                outputs.dim() == 3
                and targets.dim() == 3
                and 0 < targets.size(1) < outputs.size(1)
            ):
                outputs = outputs[:, -targets.size(1):, :]
            loss = criterion(outputs, targets)

            # Backward and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            samples_seen += inputs.size(0)

        if samples_seen == 0:
            raise ValueError("train_loader yielded no samples; nothing to train on")

        # Average over the samples actually processed, so a loader that drops
        # its last incomplete batch does not skew the reported loss.
        epoch_loss = running_loss / samples_seen
        epoch_losses.append(epoch_loss)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')

    print('Training complete')
    return epoch_losses

In [58]:

# Initialise the model, the optimizer and the loss function
torch.manual_seed(42)  # seed PyTorch's global RNG so weight initialisation is reproducible
model = DAFA_Modell(input_size, hidden_size, latent_space_dim, num_layers, k, d)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = loss_function

