In [None]:
import yfinance as yf
import torch
from collections import deque

def Load_Dataset(company_list=None, window_size=60):
  """Download daily OHLC history and cut it into sliding-window samples.

  For every ticker, each run of ``window_size + 1`` consecutive rows of the
  downloaded history yields one sample: the first ``window_size`` rows are the
  input window and the row that follows them is the label. Windows advance
  one row at a time and every complete window is emitted, including the last.

  Args:
    company_list: Iterable of Yahoo Finance ticker symbols. ``None`` (the
      default) uses the built-in list of 19 tickers.
    window_size: Number of consecutive rows in each input window.

  Returns:
    A list of ``(window, next_row)`` tuples of float64 tensors. ``window``
    has shape ``(window_size, 4)`` and ``next_row`` has shape ``(4,)``;
    the columns are ordered [Open, High, Low, Close].

  Raises:
    ValueError: If ``window_size`` is smaller than 1.
  """
  if window_size < 1:
    raise ValueError(f"window_size must be >= 1, got {window_size}")

  if company_list is None:
    company_list = [
        'AAPL',
        'GOOG',
        'NVDA',
        'TSLA',
        '005930.KS',
        '000660.KS',
        '035420.KS',
        '035720.KS',
        'MSFT',
        'GOOGL',
        'AMZN',
        'META',
        'AMD',
        'V',
        'BRK-B',
        'JNJ',
        'BABA',
        'TSM',
        'PG',
    ]

  dataset = []
  for company in company_list:
    ticker = yf.Ticker(company)
    data = ticker.history(interval = '1d', period = 'max', auto_adjust = True)

    Open = list(data['Open'])
    Close = list(data['Close'])
    High = list(data['High'])
    Low = list(data['Low'])

    # One sample needs window_size input rows plus one label row.
    if len(Open) < window_size + 1:
      print(f"{company} Doesn't have enough data!")
      continue

    # Build the whole price history once; every sample below is a slice of it.
    rows = torch.tensor(list(zip(Open, High, Low, Close)), dtype = torch.float64)

    # start + window_size is the label row, so the last valid start is
    # len(rows) - window_size - 1. Indexing directly (instead of maintaining a
    # hand-advanced cursor) guarantees no row is skipped and the final window
    # is included.
    for start in range(len(rows) - window_size):
      label_index = start + window_size
      # Both elements are views into `rows`, so no per-sample copy is made.
      dataset.append((rows[start:label_index], rows[label_index]))
  return dataset

### Model structure

* 60*4 input layer
* Convolution layer (Conv1d over the 4 price channels, kernel size 8 — i.e. an 8*4 window; 16 output channels)
* Convolution layer (Conv1d, kernel size 8; 32 output channels)
* Fully connected layer (Linear, size of 40)
* Fully connected layer (Linear, size of 20)
* Fully connected layer (Linear, size of 1)

All hidden layers use ReLU activations; the final output layer is linear (no activation).

In [None]:
import torch
import torch.nn as nn

class Finance_001_Model(nn.Module):
    """Two 1-D convolutions followed by a three-layer fully connected head.

    ``forward`` takes a tensor laid out as ``(batch, 4, 60)`` (channels
    first, as ``nn.Conv1d`` requires) and returns a ``(batch, 1)`` tensor.
    """

    def __init__(self):
        super().__init__()
        sequence_length = 60
        kernel = 8

        # Each un-padded, stride-1 convolution shortens the sequence by kernel - 1.
        self.conv1 = nn.Conv1d(in_channels=4, out_channels=16, kernel_size=kernel)
        length_after_conv1 = sequence_length - kernel + 1
        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=kernel)
        length_after_conv2 = length_after_conv1 - kernel + 1

        # The head consumes the flattened (32 channels x remaining length) features.
        self.dense1 = nn.Linear(32 * length_after_conv2, 40)
        self.dense2 = nn.Linear(40, 20)
        self.dense3 = nn.Linear(20, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Run the network on ``x`` and return one value per batch element."""
        features = x
        for conv in (self.conv1, self.conv2):
            features = self.relu(conv(features))

        # Collapse the channel and length axes into one feature axis.
        flat = features.view(features.size(0), -1)

        hidden = self.relu(self.dense1(flat))
        hidden = self.relu(self.dense2(hidden))
        # No activation on the final layer.
        return self.dense3(hidden)

### Data Loader

In [None]:
class FinanceDataset(torch.utils.data.Dataset):
    """Map-style dataset backed by an in-memory, indexable collection of samples.

    Length and item lookup are delegated unchanged to the wrapped collection.
    """

    def __init__(self, dataset):
        # Keep a reference to the wrapped collection (no copy is made).
        self.dataset = dataset

    def __getitem__(self, idx):
        sample = self.dataset[idx]
        return sample

    def __len__(self):
        sample_count = len(self.dataset)
        return sample_count

### Split the dataset

In [None]:
# Wrap the windowed samples and split them 80% / 10% / 10% into
# train / validation / test subsets.
SPLIT_SEED = 42  # fixed so the split is identical after "Restart Kernel -> Run All"

# The instance gets its own name: rebinding `FinanceDataset` would replace the
# class with an object and make this cell fail the second time it is run.
finance_dataset = FinanceDataset(Load_Dataset())
train_size = int(0.8 * len(finance_dataset))
validation_size = int(0.1 * len(finance_dataset))
# The test subset absorbs the rounding remainder so the three sizes sum to the total.
test_size = len(finance_dataset) - train_size - validation_size

# NOTE(review): the samples are overlapping sliding windows, so a random split
# places near-identical windows in all three subsets. Consider a chronological
# split per ticker if the validation/test numbers are meant to measure
# generalisation to unseen dates.
train_dataset, validation_dataset, test_dataset = torch.utils.data.random_split(
    finance_dataset,
    [train_size, validation_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

print(f"Train size: {len(train_dataset)}")
print(f"Validation size: {len(validation_dataset)}")
print(f"Test size: {len(test_dataset)}")

# Only the training batches need reshuffling every epoch; evaluation order does
# not affect the metrics, so those loaders stay deterministic.
Train_Dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
Validation_Dataloader = torch.utils.data.DataLoader(validation_dataset, batch_size=32, shuffle=False)
Test_Dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False)

Train size: 108816
Validation size: 13602
Test size: 13603


In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# Instantiate the network and move its parameters to the selected device.
model = Finance_001_Model()
model = model.to(device)
print(f"Training on {device}")

Training on cpu


In [None]:
# Mean-squared-error criterion, placed on the same device as the model.
loss = nn.MSELoss()
loss = loss.to(device)

# Adam over all model parameters with a learning rate of 1e-3.
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)

### Train the model

In [None]:
# Train the model, printing the training loss periodically and the mean
# validation MSE after every epoch.
NUM_EPOCHS = 100
LOG_EVERY = 100    # print the training loss every LOG_EVERY batches
TARGET_COLUMN = 3  # label rows are [Open, High, Low, Close]; regress the next Close


def _to_model_batch(batch):
  """Turn a (windows, next_rows) loader batch into (inputs, targets) for the model."""
  windows, next_rows = batch
  # The loaders yield windows as (batch, 60, 4) float64, whereas Conv1d expects
  # (batch, channels=4, length=60) and the model's weights are float32.
  inputs = windows.to(device=device, dtype=torch.float32).permute(0, 2, 1)
  # The model emits one value per sample, so keep a single label column with
  # shape (batch, 1). Passing the full (batch, 4) row would make MSELoss
  # silently broadcast the prediction against all four prices.
  targets = next_rows[:, TARGET_COLUMN:TARGET_COLUMN + 1].to(device=device, dtype=torch.float32)
  return inputs, targets


for epoch in range(NUM_EPOCHS):
  model.train()
  for i, data in enumerate(Train_Dataloader):
    inputs, labels = _to_model_batch(data)
    optimizer.zero_grad()
    outputs = model(inputs)
    loss_value = loss(outputs, labels)
    loss_value.backward()
    optimizer.step()
    if i % LOG_EVERY == 0:
      print(f"Epoch: {epoch + 1}/{NUM_EPOCHS}, Step: {i}, Loss: {loss_value.item():.4f}")

  # This is a regression on continuous prices, so exact-match "accuracy" is
  # meaningless; report the sample-weighted mean validation MSE instead.
  model.eval()
  squared_error_sum = 0.0
  sample_count = 0
  with torch.no_grad():
    for data in Validation_Dataloader:
      inputs, labels = _to_model_batch(data)
      outputs = model(inputs)
      batch_len = labels.size(0)
      # MSELoss averages over the batch; re-weight so a short final batch
      # does not count as much as a full one.
      squared_error_sum += loss(outputs, labels).item() * batch_len
      sample_count += batch_len

  validation_mse = squared_error_sum / sample_count if sample_count else float('nan')
  print(f"Epoch: {epoch + 1}/{NUM_EPOCHS}, Validation MSE: {validation_mse:.4f}")

