In [49]:
!pip install yfinance



## Stock DATASET

In [50]:
from torch.utils.data import Dataset, DataLoader
from typing import Union
import torch
import yfinance as yf
import pandas as pd
import numpy as np
import os

In [86]:
class StockPriceDataset(Dataset):
    def __init__(self, start_date: str="jj-mm-aaaa", end_date: str="jj-mm-aaaa", 
                 interval: int=1, nb_samples: int=20, transform=None,
                 file_dir: str="data/", csv_file: str=None):

        # If a local data file must be loaded:
        if csv_file is not None:
            self.root_dir = file_dir
            self.filename = csv_file
            with open(os.path.join(file_dir,csv_file), 'r') as file:
                data = pd.read_csv(file, sep=',', header='infer')

        else: # Data must be loaded on an online database:
            dataset = yf.download('^GSPC', start=start_date, end=end_date, interval=interval)

        self.data = dataset
        self.nb_samples = nb_samples
        self.transform = transform

    def __len__(self) -> int:
        return len(self.data)//self.nb_samples

    def __getitem__(self, index) -> Union[torch.Tensor, float]:
        # Load one sample more than nb_samples for normalizing, transform
        sample = self.data['Close'][index*self.nb_samples:(index+1)*self.nb_samples+1]
        # sample = self.data['Close'][index:index+self.nb_samples+1]
        sample = torch.tensor(sample)
        if self.transform:
            sample = self.transform(sample)[1:]
        else:
            sample = sample[1:]
        
        label = sample[-1] # label is the last elem of sample

        sample = sample[:-1] # removes label from sample
        return sample, label

def normalize_by_last_unknown_price(sample: torch.Tensor) -> torch.Tensor:
    """Divides the whole stock price sample by the last unknown price w_{p*t-1}"""
    last_price = sample[0] # w_{pt-1}
    return sample/last_price


if __name__ == "__main__":

    # download S&P data from yahoo finance
    START_DATE = '1950-01-03'
    END_DATE = '2021-11-16'
    INTERVAL = '1d'
    nb_samples = 15

    dataset = StockPriceDataset(START_DATE, END_DATE, INTERVAL, nb_samples,
                                transform=normalize_by_last_unknown_price)

    dataloader = DataLoader(dataset, batch_size=16)
    print(len(dataloader))
    # for i_batch, batch in enumerate(dataloader):
    #     print("i_batch = {}, batch = {}".format(i_batch, batch))


[*********************100%***********************]  1 of 1 completed
76


## Modèle

In [52]:
from torch.nn import LSTM, Module, Dropout, ModuleList

In [67]:
class StockAI(Module):
     
    def __init__(self, input_size, lstm_size, num_layers, keep_prob) -> None:
        super().__init__()
        self.input_size = input_size
        self.lstm_size = lstm_size
        self.num_layers = num_layers
        self.lstm_list = ModuleList([LSTM(self.input_size, self.lstm_size) for _ in range(self.num_layers)])
        self.dropout_list = ModuleList([Dropout(1-keep_prob) for _ in range(self.num_layers)])

    
    def forward(self,x):
        y = x
        for i in range(self.num_layers):
            y = self.lstm_list[i](x)
            y = self.dropout_list[i](x)
        return y
    

## Configuration

In [87]:
class StockAIConfig():
    config = {
        "model":{
            "input_size": 1,
            "lstm_size": 128,
            "num_layers": 1,
            "keep_prob": 0.8
        },

        "dataset_train":{
            "start_date": '1950-01-03',
            "end_date": '2008-11-16',
            "interval_date": '1d',
            "nb_samples":15,
            "batch_size": 16,
            "shuffle":False
        },
        "dataset_test":{
            "start_date": '2008-11-17',
            "end_date": '2021-11-16',
            "interval_date": '1d',
            "nb_samples":15,
            "batch_size": 16,
            "shuffle":False
        },

        "learning":{
            "num_steps": 30,
            "init_lr": 1e-03,
            "lr_decay": 0.99,
            "init_epoch": 5,
            "max_epoch": 50
        }   
    }

##Prediction : Train / Test

In [88]:
import math
import torch
from torch.utils.data import DataLoader
from torch.nn import MSELoss
from torch.optim import RMSprop

In [89]:
# Model config
config = StockAIConfig().config

# CUDA for PyTorch
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if use_cuda else "cpu")
torch.backends.cudnn.benchmark = True

# Init of the Dataset_train
dataset_train = StockPriceDataset(config["dataset_train"]["start_date"], 
                            config["dataset_train"]["end_date"],
                            config["dataset_train"]["interval_date"], 
                            config["dataset_train"]["nb_samples"],
                            transform=normalize_by_last_unknown_price)

# Init dataloader of the Dataset_train
dataloader_train = DataLoader(dataset_train, config["dataset_train"]["batch_size"], config["dataset_train"]["shuffle"], drop_last=True)

# Init of the Dataset_test
dataset_test = StockPriceDataset(config["dataset_test"]["start_date"], 
                            config["dataset_test"]["end_date"],
                            config["dataset_test"]["interval_date"], 
                            config["dataset_test"]["nb_samples"],
                            transform=normalize_by_last_unknown_price)

# Init dataloader of Dataset_test
dataloader_test = DataLoader(dataset_test, config["dataset_test"]["batch_size"], config["dataset_test"]["shuffle"], drop_last=True)

# Init of the model
model = StockAI(config["model"]["input_size"],
                config["model"]["lstm_size"],
                config["model"]["num_layers"],
                config["model"]["keep_prob"])

# Learning rate to use along the epochs
learning_rates = [config["learning"]["init_lr"] * (config["learning"]["lr_decay"] ** max(float(i + 1 - config["learning"]["init_epoch"]), 0.0)) for i in range(config["learning"]["max_epoch"])]

# Loss
loss_fn = MSELoss()
optimizer = RMSprop(model.parameters(), lr=learning_rates[0], eps=1e-08)

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [90]:
len(dataloader_train)

61

In [91]:
# Learning
for epoch_step in range(config["learning"]["max_epoch"]):
    lr = learning_rates[epoch_step]
    print(f"Running for epoch {epoch_step}...")
    for i_batch, batch in enumerate(dataloader_train):
        x, y = batch
        x = torch.unsqueeze(x, -1).float()
        y = y.float()
        x, y = x.to(device), y.to(device)
        y_pred = model.forward(x)
        loss = torch.autograd.Variable(loss_fn(y_pred, y), requires_grad=True)

        if i_batch%10==0:
            print(f"step: {i_batch}, loss = {loss}")

        # Zero gradients, perform a backward pass, and update the weights.
        optimizer.zero_grad()
        # loss.backward()
        optimizer.step()

Running for epoch 0...
step: 0, loss = 0.23164567351341248
step: 10, loss = 0.26553449034690857
step: 20, loss = 0.26878464221954346
step: 30, loss = 0.2568964660167694


  return F.mse_loss(input, target, reduction=self.reduction)


step: 40, loss = 0.25350311398506165
step: 50, loss = 0.2149365395307541
step: 60, loss = 0.25484320521354675
Running for epoch 1...
step: 0, loss = 0.23554246127605438
step: 10, loss = 0.2621947228908539
step: 20, loss = 0.2974683940410614
step: 30, loss = 0.26550665497779846
step: 40, loss = 0.23632611334323883
step: 50, loss = 0.27096685767173767
step: 60, loss = 0.2420056313276291
Running for epoch 2...
step: 0, loss = 0.2443443387746811
step: 10, loss = 0.25730183720588684
step: 20, loss = 0.24456095695495605
step: 30, loss = 0.277869313955307
step: 40, loss = 0.24509792029857635
step: 50, loss = 0.26694878935813904
step: 60, loss = 0.21770481765270233
Running for epoch 3...
step: 0, loss = 0.2101908177137375
step: 10, loss = 0.22040104866027832
step: 20, loss = 0.26846617460250854
step: 30, loss = 0.23177795112133026
step: 40, loss = 0.29501789808273315
step: 50, loss = 0.24020805954933167
step: 60, loss = 0.2422550469636917
Running for epoch 4...
step: 0, loss = 0.25276321172714

In [92]:
#test
runnning_mape = 0
for i_batch, batch in enumerate(dataloader_test):
        x, y = batch
        x = torch.unsqueeze(x, -1).float()
        y = y.float()
        x, y = x.to(device), y.to(device)
        y_pred = model.forward(x)
        error = torch.mean(torch.abs((y - y_pred) / y))
        runnning_mape += error

mape = runnning_mape / len(dataloader_test)
print("",mape)

 tensor(0.4035)
