In [54]:
import numpy as np # type: ignore
import pandas as pd # type: ignore
import matplotlib.pyplot as plt # type: ignore
import yfinance as yf # type: ignore

# Load financial data

In [55]:
# Fetch the price history for Apple (ticker AAPL) from Yahoo Finance.
data = yf.download(tickers='AAPL')

[*********************100%***********************]  1 of 1 completed


In [56]:
# Preview the first five rows of the downloaded frame.
data.head(n=5)

Price,Adj Close,Close,High,Low,Open,Volume
Ticker,AAPL,AAPL,AAPL,AAPL,AAPL,AAPL
Date,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
1980-12-12,0.098834,0.128348,0.128906,0.128348,0.128348,469033600
1980-12-15,0.093678,0.121652,0.12221,0.121652,0.12221,175884800
1980-12-16,0.086802,0.112723,0.113281,0.112723,0.113281,105728000
1980-12-17,0.088951,0.115513,0.116071,0.115513,0.115513,86441600
1980-12-18,0.09153,0.118862,0.11942,0.118862,0.118862,73449600


In [57]:
# Convert the DataFrame to a plain ndarray for windowing.
# The name `data` is rebound in place, so guard the conversion: without the
# isinstance check, re-running this cell fails because an ndarray has no
# `.to_numpy` method.
if isinstance(data, pd.DataFrame):
    data = data.to_numpy()
data.shape

(11089, 6)

In [58]:
def create_sequence(data: np.ndarray, seq_len: int = 8) -> tuple[np.ndarray, np.ndarray]:
    """Slice a series into sliding input windows and their next-step targets.

    Window ``i`` is ``data[i:i + seq_len]`` and its target is the entry that
    immediately follows it, ``data[i + seq_len]``.

    Args:
        data: Array-like whose first axis is the step axis. Any trailing axes
            are carried through unchanged.
        seq_len: Number of consecutive steps in each input window. Must be
            non-negative; ``0`` yields empty windows.

    Returns:
        A tuple ``(X, y)`` of freshly allocated arrays where ``X`` has shape
        ``(n, seq_len, *data.shape[1:])`` and ``y`` has shape
        ``(n, *data.shape[1:])``, with ``n = max(len(data) - seq_len, 0)``.
        When the series is too short to form a single window, both arrays are
        empty but keep these shapes.

    Raises:
        ValueError: If ``seq_len`` is negative.
    """
    if seq_len < 0:
        raise ValueError(f"seq_len must be non-negative, got {seq_len}")

    data = np.asarray(data)
    n_windows = max(len(data) - seq_len, 0)

    # Row i of `steps` holds the indices i, i+1, ..., i+seq_len-1, so a single
    # fancy-indexing operation gathers every window at once (and copies).
    steps = np.arange(n_windows)[:, None] + np.arange(seq_len)
    X = data[steps]

    # Targets are the entries right after each window; copy so the result does
    # not alias the caller's array.
    y = data[seq_len:seq_len + n_windows].copy()
    return X, y

In [59]:
# Use a window length of 1: each sample is a single row paired with the row
# that follows it.
X, y = create_sequence(data, 1)
print(f'X: {X.shape}, y: {y.shape}')

X: (11088, 1, 6), y: (11088, 6)


In [60]:
from sklearn.model_selection import train_test_split

# Chronological hold-out: the rows are ordered by date, so keep the first 80%
# for training and the last 20% for testing. The default (shuffle=True) would
# mix future rows into the training set and produce a different split on
# every run.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Model setup, training and evaluation

In [61]:
from torch_transformer import Transformer
import torch

# Hyperparameters passed positionally to the project-local Transformer.
# NOTE(review): the model takes source/target *vocabulary sizes*, which
# suggests it expects integer token ids, whereas X/y here are continuous price
# features. That mismatch is presumably behind the RuntimeError raised by the
# training cell -- confirm against torch_transformer.
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f'Available device: {device}')

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
transformer = transformer.to(device)

# Convert the splits to float32 tensors on the selected device in one step.
# torch.as_tensor accepts both ndarrays and existing tensors, so this cell can
# be re-run after the names have already been rebound to device tensors
# (the legacy torch.Tensor(...) constructor is not meant for that).
X_train, X_test, y_train, y_test = (
    torch.as_tensor(split, dtype=torch.float32, device=device)
    for split in (X_train, X_test, y_train, y_test)
)

# Generate random sample data
#src_data = torch.randint(1, src_vocab_size, (64, max_seq_length)).to(device)  # (batch_size, seq_length)
#tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length)).to(device)  # (batch_size, seq_length)

Available device: cuda


In [62]:
import torch.nn as nn
import torch.optim as optim

# Cross-entropy over the target vocabulary; targets equal to 0 are excluded
# from the loss via ignore_index.
# NOTE(review): cross-entropy expects class-index targets, but y_train is
# built from continuous price rows -- a regression loss may be what is
# intended here. Confirm.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

# Full-batch training: every epoch runs one forward/backward pass over the
# whole training split.
# NOTE(review): as written this cell raises the RuntimeError shown in its
# output (size 6 vs 512 at dimension 2) on the forward call.
for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(X_train, y_train)
    # Flatten predictions to (-1, tgt_vocab_size) and targets to 1-D.
    # NOTE(review): the target drops its first column (y_train[:, 1:]) while
    # the model is fed the full y_train -- verify the two are meant to line up.
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), y_train[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

RuntimeError: The size of tensor a (6) must match the size of tensor b (512) at non-singleton dimension 2

In [None]:
# Switch the model to evaluation mode before scoring the held-out split.
transformer.eval()

# Generate random sample validation data
#val_src_data = torch.randint(1, src_vocab_size, (64, max_seq_length)).to(device)  # (batch_size, seq_length)
#val_tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length)).to(device)  # (batch_size, seq_length)

# Gradients are not needed for evaluation.
with torch.no_grad():

    val_output = transformer(X_test, y_test)
    # NOTE(review): the target here is the full y_test, whereas the training
    # cell slices its target with [:, 1:] -- confirm which form is intended.
    val_loss = criterion(val_output.contiguous().view(-1, tgt_vocab_size), y_test.contiguous().view(-1))
    print(f"Validation Loss: {val_loss.item()}")

Validation Loss: 8.693814277648926


In [None]:
# Report whether a CUDA device is visible to PyTorch.
print(f"{torch.cuda.is_available()}")

True
