In [1]:
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pickle

In [14]:
# Parameters
FILE_PATH = './Dataset/EURUSD/EURUSD_M30_features+label_v.2.1.csv'
COLUMNS = ['Close', 'SMA200', 'SMA50', 'RSI14']
LABEL = 'signal'
SEQ_LENGTH = 20
NUM_LAYERS = 4
NUM_HEADS = 4
BATCH_SIZE = 1024
EPOCHS = 50
DROPOUT = 0.4
LEARNING_RATE = 0.01
RANDOM_STATE = 13
TRAIN_SHUFFLE = True
TEST_SHUFFLE = True
TRAIN_TEST_RATIO = 0.2
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
# Load and preprocess data
data = pd.read_csv(FILE_PATH)
data = data[COLUMNS + [LABEL]]
data[COLUMNS] = data[COLUMNS].round(4)

In [4]:
# Normalize features
scaler = MinMaxScaler()
data[COLUMNS] = scaler.fit_transform(data[COLUMNS])

In [5]:
# Adjust labels
data[LABEL] = data[LABEL] - 1

In [6]:
# Create sequences
class ForexDataset(Dataset):
    def __init__(self, data, seq_length):
        self.features = []
        self.labels = []
        for i in range(len(data) - seq_length):
            self.features.append(data.iloc[i:i + seq_length][COLUMNS].values)
            self.labels.append(data.iloc[i + seq_length][LABEL])
        self.features = torch.tensor(self.features, dtype=torch.float32)
        self.labels = torch.tensor(self.labels, dtype=torch.long)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]


In [7]:
# Split data
train_data, test_data = train_test_split(data, test_size=TRAIN_TEST_RATIO, random_state=RANDOM_STATE)

In [8]:
train_dataset = ForexDataset(train_data, SEQ_LENGTH)
test_dataset = ForexDataset(test_data, SEQ_LENGTH)

  self.features = torch.tensor(self.features, dtype=torch.float32)


In [9]:
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=TRAIN_SHUFFLE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=TEST_SHUFFLE)

In [10]:
# Transformer model with normalization layers
class TransformerModel(nn.Module):
    def __init__(self, input_dim, num_classes, seq_length, num_layers, num_heads, dropout):
        super(TransformerModel, self).__init__()
        self.norm1 = nn.LayerNorm(input_dim)
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=input_dim, nhead=num_heads, dropout=dropout, batch_first=True
        )
        self.transformer = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.norm2 = nn.LayerNorm(seq_length * input_dim)
        self.fc = nn.Linear(seq_length * input_dim, num_classes)

    def forward(self, x):
        x = self.norm1(x)
        x = self.transformer(x)
        x = x.flatten(start_dim=1)
        x = self.norm2(x)
        return self.fc(x)

In [15]:
model = TransformerModel(input_dim=len(COLUMNS), num_classes=len(set(data[LABEL])), seq_length=SEQ_LENGTH, num_layers=NUM_LAYERS, num_heads=NUM_HEADS, dropout=DROPOUT).to(DEVICE)

In [16]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [None]:
# Training loop
for epoch in range(EPOCHS):
    model.train()
    train_loss, train_preds, train_labels = 0, [], []
    for features, labels in train_loader:
        features, labels = features.to(DEVICE), labels.to(DEVICE)
        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        train_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
        train_labels.extend(labels.cpu().numpy())

    train_acc = accuracy_score(train_labels, train_preds)
    train_f1 = f1_score(train_labels, train_preds, average='weighted')

    model.eval()
    test_preds, test_labels = [], []
    with torch.no_grad():
        for features, labels in test_loader:
            features, labels = features.to(DEVICE), labels.to(DEVICE)
            outputs = model(features)
            test_preds.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            test_labels.extend(labels.cpu().numpy())

    test_acc = accuracy_score(test_labels, test_preds)
    test_f1 = f1_score(test_labels, test_preds, average='weighted')

    print(f"Epoch {epoch + 1}/{EPOCHS}, "
          f"Train Loss: {train_loss:.4f}, "
          f"Train Acc: {train_acc:.4f}, Train F1: {train_f1:.4f}, "
          f"Test Acc: {test_acc:.4f}, Test F1: {test_f1:.4f}")


Epoch 1/50, Train Loss: 164.3073, Train Acc: 0.4989, Train F1: 0.3403, Test Acc: 0.4997, Test F1: 0.3330
Epoch 2/50, Train Loss: 163.4619, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 3/50, Train Loss: 163.4409, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 4/50, Train Loss: 163.2125, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 5/50, Train Loss: 163.1773, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 6/50, Train Loss: 163.1577, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 7/50, Train Loss: 163.0540, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 8/50, Train Loss: 163.1432, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 9/50, Train Loss: 163.1567, Train Acc: 0.5020, Train F1: 0.3356, Test Acc: 0.4997, Test F1: 0.3330
Epoch 10/50, Train Loss: 163.1103, Train Acc: 0.5020, T