In [52]:
import numpy as np
import pandas as pd
import pandas_datareader.data as pdr
import matplotlib.pyplot as plt
import datetime
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [53]:
num=1
fn = ['DRB동일.csv', 'KEC.csv', 'SK아이이테크놀로지.csv', '기신정기.csv', '동양피스톤.csv', '두산퓨얼셀.csv', '디와이파워.csv', '삼아알미늄.csv', '티와이홀딩스.csv', '화승코퍼레이션.csv']

df = pd.read_csv('data/aside/'+fn[num], index_col = 'Date', parse_dates=True)
df['Mid']=(df['Low']+df['High'])/2

cut_line = len(df['Mid'])-round(len(df['Mid'])/10)

train_data = df['Mid'].values[:cut_line].reshape(-1,1).astype('float')
test_data = df['Mid'].values[cut_line:].reshape(-1,1).astype('float')


from sklearn.preprocessing import minmax_scale as mm

train_data = mm(train_data)
test_data = mm(test_data)

In [54]:
def create_seq(data, seq_length):
    x = []
    y = []
    for i in range(len(data)-seq_length):
        x.append(data[i:i+seq_length])
        y.append(data[i+seq_length])
    x = np.array(x)
    y = np.array(y)
    return x, y

In [55]:
x_train, y_train = create_seq(train_data, 7)
x_test, y_test = create_seq(test_data, 7)

In [56]:
x_train, y_train, x_test, y_test = map(lambda data:torch.from_numpy(data).float(), [x_train, y_train, x_test, y_test])

In [57]:
y_train[:2]

tensor([[0.6802],
        [0.6828]])

In [58]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # device

In [62]:
class CoronaVirusPredictor(nn.Module):

    def __init__(self, n_features, n_hidden, seq_len, n_layers=2):
        super(CoronaVirusPredictor, self).__init__()

        self.n_hidden = n_hidden
        self.seq_len = seq_len
        self.n_layers = n_layers

        self.lstm = nn.LSTM(
          input_size=n_features,
          hidden_size=n_hidden,
          num_layers=n_layers,
          dropout=0.5
        )

        self.linear = nn.Linear(in_features=n_hidden, out_features=1)

    def reset_hidden_state(self):
        self.hidden = (
            torch.zeros(self.n_layers, self.seq_len, self.n_hidden),
            torch.zeros(self.n_layers, self.seq_len, self.n_hidden)
        )

    def forward(self, sequences):
        lstm_out, self.hidden = self.lstm(
          sequences.view(len(sequences), self.seq_len, -1),
          self.hidden
        )
        last_time_step = \
          lstm_out.view(self.seq_len, len(sequences), self.n_hidden)[-1]
        y_pred = self.linear(last_time_step)
        return y_pred

In [63]:
def train_model(
  model, 
  train_data, 
  train_labels, 
  test_data=None, 
  test_labels=None
):
    loss_fn = torch.nn.MSELoss(reduction='sum')

    optimiser = torch.optim.Adam(model.parameters(), lr=1e-3)
    num_epochs = 60

    train_hist = np.zeros(num_epochs)
    test_hist = np.zeros(num_epochs)

    for t in range(num_epochs):
        model.reset_hidden_state()

        y_pred = model(x_train)

        loss = loss_fn(y_pred.float(), y_train)

        if test_data is not None:
            with torch.no_grad():
                y_test_pred = model(x_test)
                test_loss = loss_fn(y_test_pred.float(), y_test)
            test_hist[t] = test_loss.item()

            if t % 10 == 0:  
                print(f'Epoch {t} train loss: {loss.item()} test loss: {test_loss.item()}')
        elif t % 10 == 0:
            print(f'Epoch {t} train loss: {loss.item()}')

        train_hist[t] = loss.item()
    
        optimiser.zero_grad()

        loss.backward()

        optimiser.step()
  
    return model.eval(), train_hist, test_hist

In [64]:
model = CoronaVirusPredictor(
  n_features=1, 
  n_hidden=512, 
  seq_len=7, 
  n_layers=2
)
model, train_hist, test_hist = train_model(
  model, 
  x_train, 
  y_train, 
  x_test, 
  y_test
)

Epoch 0 train loss: 378.59039306640625 test loss: 100.20598602294922
Epoch 10 train loss: 173.80374145507812 test loss: 42.656028747558594
Epoch 20 train loss: 175.6184844970703 test loss: 44.23836898803711
Epoch 30 train loss: 173.88369750976562 test loss: 38.23432540893555
Epoch 40 train loss: 173.3882598876953 test loss: 38.02839279174805
Epoch 50 train loss: 173.27777099609375 test loss: 36.86834716796875


In [None]:
# plt.rc('font', family='NanumGothic') # use when korean text output is needed

from sklearn.preprocessing import MinMaxScaler as MMS

scaler = MMS()
scaler.fit(df['Mid'].values[cut_line:].reshape(-1,1).astype('float'))
data_predict = model(x_test.to(device)).data.detach().cpu().numpy()
data_predict = scaler.inverse_transform(data_predict) #reverse transformation

history_cut = 0 # length of latest history to view. Set to 0 to see whole test set.

plt.figure(figsize=(7.5,4.5)) #plotting
plt.plot(df['Mid'].values[cut_line:].reshape(-1,1).astype('float')[cut_line:][-history_cut:], label='actual price', marker='o') #actual plot
plt.plot(data_predict[cut_line:][-history_cut:], label='predicted price', marker='v') #predicted plot
plt.title("Company I".split(".")[0])
plt.legend(loc='upper left')
plt.show() 