In [5]:
import os, sys

import warnings
warnings.filterwarnings("ignore")

from pathlib import Path
sys.path.append(str(Path.cwd().resolve().parent))

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import math
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import r2_score, mean_absolute_percentage_error

from preprocess import VN30, TARGETS, preprocess_v2

In [2]:
# Ticker handled by this notebook: it is passed to preprocess_v2 and is
# interpolated into the checkpoint file name further down.
symbol = "ACB"

# Bước 1: Tiền xử lý dữ liệu

In [4]:
# Build the three loaders iterated by the training / evaluation cells, plus
# the scaler whose inverse_transform is applied to the test predictions.
# NOTE(review): the meaning of the 'rnn' mode is defined in preprocess.py —
# presumably it selects sliding-window sequence samples; confirm there.
(train_loader,
 valid_loader,
 test_loader,
 scaler) = preprocess_v2(symbol, 'rnn', verbose=True)

Train shape: torch.Size([1094, 30, 4]), torch.Size([1094, 4])
Valid shape: torch.Size([121, 30, 4]), torch.Size([121, 4])


In [7]:
class PositionalEncoding(nn.Module):
    """Fixed sinusoidal positional encoding for sequence-first tensors.

    A table of shape (max_len, 1, d_model) is precomputed once and stored as
    the buffer ``pe``. Even feature columns hold sines and odd columns hold
    cosines of ``position * 10000 ** (-2i / d_model)``.

    Args:
        d_model: Width of the feature dimension. Both even and odd widths are
            supported.
        max_len: Number of positions to precompute; ``forward`` cannot handle
            sequences longer than this.
    """

    def __init__(self, d_model: int, max_len: int = 5000):
        super().__init__()
        position = torch.arange(0, max_len).unsqueeze(1).float()   # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() *
                             -(math.log(10000.0) / d_model))        # (ceil(d_model / 2),)
        angles = position * div_term                                # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)                          # (max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angle table; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(angles[:, :d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(1))                 # (max_len, 1, d_model)

    def forward(self, x: torch.Tensor):
        """Add the encoding for the first ``x.size(0)`` positions to ``x``.

        Args:
            x: Tensor laid out as (seq_len, batch, d_model).

        Returns:
            Tensor of the same shape as ``x``.

        Raises:
            RuntimeError: If ``x`` has more time steps than ``max_len``.
        """
        seq_len = x.size(0)
        # Fail loudly instead of relying on a broadcast error (or, when
        # max_len == 1, silently reusing position 0 for every time step).
        if seq_len > self.pe.size(0):
            raise RuntimeError(
                f"sequence length {seq_len} exceeds the positional table "
                f"size max_len={self.pe.size(0)}"
            )
        return x + self.pe[:seq_len]

class Transformer(nn.Module):
    """Encoder-only Transformer that maps a window to one feature vector.

    The input window is linearly embedded, given sinusoidal positions, passed
    through a stack of encoder layers, and the representation at the final
    time step is projected back to ``input_dim`` values.
    """

    def __init__(
        self,
        input_dim: int,
        model_dim: int = 64,
        num_heads: int = 4,
        num_layers: int = 2,
        dim_feedforward: int = 128,
        dropout: float = 0.1,
        seq_len: int = 30
    ):
        super().__init__()

        # Embedding: raw features -> model_dim.
        self.input_proj = nn.Linear(input_dim, model_dim)

        # Positional table holds exactly `seq_len` positions.
        self.pos_encoder = PositionalEncoding(model_dim, max_len=seq_len)

        # Encoder stack operating on (seq_len, batch, model_dim) tensors.
        layer_config = dict(
            d_model=model_dim,
            nhead=num_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=False,
        )
        self.transformer = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(**layer_config),
            num_layers=num_layers,
        )

        # Regression head: model_dim -> input_dim.
        self.output_layer = nn.Linear(model_dim, input_dim)

    def forward(self, x: torch.Tensor):
        """Predict one ``input_dim``-sized vector per sample.

        Args:
            x: Tensor laid out as (batch, seq_len, input_dim).

        Returns:
            Tensor of shape (batch, input_dim).
        """
        embedded = self.input_proj(x)                    # (batch, seq_len, model_dim)
        seq_first = embedded.permute(1, 0, 2)            # (seq_len, batch, model_dim)
        encoded = self.transformer(self.pos_encoder(seq_first))
        final_step = encoded[-1, :, :]                   # (batch, model_dim)
        return self.output_layer(final_step)

In [8]:
# Seed PyTorch's global generator before any weights are created so that
# parameter initialisation (and anything else drawing from that generator,
# such as dropout on CPU) repeats across "Restart Kernel -> Run All".
SEED = 42
torch.manual_seed(SEED)

model = Transformer(
    input_dim=4,      # 4 features: open, high, low, close
    model_dim=64,
    num_heads=4,
    num_layers=2,
    dim_feedforward=128,
    dropout=0.1,
    seq_len=30
)
# Adam, with the learning rate multiplied by 0.8 after every 10 scheduler steps.
optimizer = optim.Adam(model.parameters(), lr=1e-3)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.8)
# Smooth L1 loss with its default reduction.
criterion = nn.SmoothL1Loss()

In [9]:
best_val_loss = float('inf')
n_epochs = 200

# torch.save does not create missing parent directories, so make sure the
# checkpoint folder exists before the first improvement is written.
checkpoint_path = f'checkpoints/tft_{symbol}.pth'
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

for epoch in range(1, n_epochs + 1):
    # --- train ---
    model.train()
    train_loss = 0.0
    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        preds = model(X_batch)
        loss = criterion(preds, y_batch)
        loss.backward()
        optimizer.step()
        # Weight each batch loss by its size so the epoch figure is a
        # per-sample average even when the last batch is smaller.
        train_loss += loss.item() * X_batch.size(0)
    train_loss /= len(train_loader.dataset)

    # --- validation ---
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for X_batch, y_batch in valid_loader:
            preds = model(X_batch)
            val_loss += criterion(preds, y_batch).item() * X_batch.size(0)
    val_loss /= len(valid_loader.dataset)

    # Advance the LR schedule once per epoch; the LR logged below is therefore
    # the value that the next epoch will use.
    scheduler.step()

    # --- checkpoint ---
    # Keep only the weights with the lowest validation loss seen so far.
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), checkpoint_path)

    if epoch % 10 == 0 or epoch == n_epochs:
        print(f"Epoch {epoch:3d}/{n_epochs}: "
              f"Train Loss = {train_loss:.6f}, "
              f"Valid Loss = {val_loss:.6f}, "
              f"Best Val Loss = {best_val_loss:.6f}, "
              f"LR = {optimizer.param_groups[0]['lr']:.6f}")

Epoch  10/200: Train Loss = 0.003191, Valid Loss = 0.003000, Best Val Loss = 0.001134, LR = 0.000800
Epoch  20/200: Train Loss = 0.001885, Valid Loss = 0.000633, Best Val Loss = 0.000633, LR = 0.000640
Epoch  30/200: Train Loss = 0.001823, Valid Loss = 0.000721, Best Val Loss = 0.000614, LR = 0.000512
Epoch  40/200: Train Loss = 0.001551, Valid Loss = 0.000748, Best Val Loss = 0.000614, LR = 0.000410
Epoch  50/200: Train Loss = 0.001390, Valid Loss = 0.000748, Best Val Loss = 0.000614, LR = 0.000328
Epoch  60/200: Train Loss = 0.001176, Valid Loss = 0.000671, Best Val Loss = 0.000614, LR = 0.000262
Epoch  70/200: Train Loss = 0.001237, Valid Loss = 0.000635, Best Val Loss = 0.000614, LR = 0.000210
Epoch  80/200: Train Loss = 0.001214, Valid Loss = 0.000681, Best Val Loss = 0.000614, LR = 0.000168
Epoch  90/200: Train Loss = 0.001107, Valid Loss = 0.000624, Best Val Loss = 0.000614, LR = 0.000134
Epoch 100/200: Train Loss = 0.001076, Valid Loss = 0.000642, Best Val Loss = 0.000614, LR =

In [None]:
# Restore the weights with the best validation loss saved during training.
model.load_state_dict(torch.load(f'checkpoints/tft_{symbol}.pth', map_location='cpu'))
model.eval()

# Collect predictions and labels over the whole test set.
all_preds   = []
all_targets = []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        all_preds.append(model(X_batch).cpu().numpy())
        all_targets.append(y_batch.cpu().numpy())

# Stack the per-batch arrays; each has one column per feature (4 here).
all_preds   = np.vstack(all_preds)     # (n_samples, n_features)
all_targets = np.vstack(all_targets)   # (n_samples, n_features)

# Inverse scaling, so the metrics are computed in the scaler's original units.
all_preds_inv   = scaler.inverse_transform(all_preds)
all_targets_inv = scaler.inverse_transform(all_targets)

# Compute metrics (both averaged uniformly over the output columns).
# NOTE(review): the recorded run reports a strongly negative R² alongside a
# MAPE of roughly 12%. Worth checking per-column scores and whether the test
# targets fall outside the range the scaler was fitted on — TODO confirm.
r2   = r2_score(all_targets_inv, all_preds_inv, multioutput='uniform_average')
mape = mean_absolute_percentage_error(all_targets_inv, all_preds_inv) * 100

print(f"Test R²: {r2:.4f}")
print(f"Test MAPE: {mape:.4f}%")

Test R²: -8.1745
Test MAPE: 12.3678%
