# 以LSTM演算法預測股價

## 載入相關套件 

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchtext
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt

## 判斷GPU是否存在

In [2]:
# Use the CUDA device when PyTorch reports one is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## 載入資料

In [3]:
# Alternative data set kept for reference:
#   ./nlp_data/AMZN_2006-01-01_to_2018-01-01.csv
# Column labels 0 and 1 of the transposed frame are renamed to "Date" and "Quantity".
rename_columns = {0: "Date", 1: "Quantity"}
# Read the CSV without a header row, transpose it so the file's rows become
# columns, apply the column names, and drop the first transposed row.
# NOTE(review): 'ANSI' appears to be resolvable as a codec alias only on
# Windows - confirm the file's real encoding before running elsewhere.
df = (
    pd.read_csv('./nlp_data/data.csv', encoding='ANSI', header=None)
    .T
    .rename(columns=rename_columns)
    .iloc[1:, :]
)
print(df.shape)
df.head()

FileNotFoundError: [Errno 2] No such file or directory: './nlp_data/data.csv'

In [None]:
# Display the last rows of the loaded frame.
df.tail()

## 繪圖

In [None]:
# Evaluate the type of the Quantity entry labelled 1; the value is discarded
# because this is not the last expression of the cell.
type(df["Quantity"][1])
# Drop rows containing missing values, then convert Quantity to a numeric dtype.
df = df.dropna().assign(Quantity=lambda frame: pd.to_numeric(frame["Quantity"]))

print(df["Quantity"])

In [None]:
# Index the rows by the Date column and plot the Quantity series without a legend.
df2 = df.set_index('Date')
df2.Quantity.plot(legend=None)
# Rotate the x tick labels by 30 degrees (the trailing semicolon is part of the original cell).
plt.xticks(rotation=30);


In [None]:
# Number of rows in the date-indexed frame.
len(df2)

## 實驗一 : 直接透過給定的前幾天預測下一天

In [None]:
from sklearn.preprocessing import MinMaxScaler
look_back = 5 # the previous N periods form X; the period that follows them is Y

def create_dataset(data1, look_back):
    """Build sliding-window samples: the previous N periods are X, the next one is Y.

    Prints ``data1.shape``, then for every start index ``i`` in
    ``range(len(data1) - look_back - 1)`` pairs the window
    ``data1[i:i + look_back]`` with the target ``data1[i + look_back]``.
    Because of the ``- 1`` in the range, the last window that would still
    fit in ``data1`` is not produced.

    Returns:
        A tuple ``(x, y)`` of ``torch.Tensor`` objects built from the stacked
        windows and the stacked targets.
    """
    print(data1.shape)
    sample_count = len(data1) - look_back - 1
    windows = [data1[start:start + look_back] for start in range(sample_count)]
    targets = [data1[start + look_back] for start in range(sample_count)]
    return torch.Tensor(np.array(windows)), torch.Tensor(np.array(targets))

# Take the Quantity column as a 2-D float32 array (one column).
dataset = df2[['Quantity']].values.astype('float32')
#dataset = df2.values.astype('float32')

# Split sizes: the first 67% of the rows are used for training.
train_size = int(len(dataset) * 0.67)
test_size = len(dataset) - train_size

# Min-max scaling. The scaler is fitted on the training rows only, so the
# minimum/maximum of the test period do not leak into training; the fitted
# transform is then applied to the whole series.
scaler = MinMaxScaler()
scaler.fit(dataset[:train_size])
dataset = scaler.transform(dataset)

# The test slice starts look_back rows before train_size so that its first
# window predicts the first row after the training rows.
train_data, test_data = dataset[0:train_size,:], dataset[train_size-look_back:len(dataset),:]

trainX, trainY = create_dataset(train_data, look_back)
testX, testY = create_dataset(test_data, look_back)
dataset.shape, trainY.shape

In [None]:
# Display the shapes of the training and test inputs/targets.
trainX.shape, trainY.shape, testX.shape, testY.shape

## 建立模型

In [None]:
# Hyper-parameters passed to TimeSeriesModel when the model is created below.
hidden_size = 5
num_layers = 1
# Size of the third axis of testX; TimeSeriesModel reads this as the LSTM input size.
feature_number = testX.shape[2]
class TimeSeriesModel(nn.Module):
    """LSTM followed by a linear layer that maps the last time step to one value.

    Args:
        look_back: Window length. Accepted for call compatibility; the
            network itself does not use it.
        hidden_size: Number of features in the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        input_size: Number of features per time step. When omitted, the
            module-level ``feature_number`` is used, which is what the
            constructor always did before this parameter existed.
    """

    def __init__(self, look_back, hidden_size=4, num_layers=1, input_size=None):
        super().__init__()
        if input_size is None:
            # Backward-compatible fallback to the notebook-level global.
            input_size = feature_number
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.LSTM(input_size, self.hidden_size,
                           num_layers=self.num_layers, batch_first=True)
        self.fc = nn.Linear(self.hidden_size, 1)
        self.init_weights()

    def init_weights(self):
        """Initialise the linear layer: weights uniform in [-0.5, 0.5], bias zero."""
        initrange = 0.5
        nn.init.uniform_(self.fc.weight, -initrange, initrange)
        nn.init.zeros_(self.fc.bias)

    def forward(self, x):
        """Return one prediction per sample.

        Args:
            x: Input indexed as (batch, time step, feature); the LSTM is
                created with ``batch_first=True``.

        Returns:
            Tensor with one row per sample and a single column.
        """
        # Zero initial hidden/cell states, created on the input's device and
        # with the input's dtype so they always match ``x``.
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h_0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        c_0 = torch.zeros(state_shape, device=x.device, dtype=x.dtype)
        out, _ = self.rnn(x, (h_0, c_0))

        # Take the output of the last time step and make it two-dimensional.
        flatten_output = out[:, -1, :].reshape(-1, self.hidden_size)
        return self.fc(flatten_output)

# Create the model with the hyper-parameters above and move it to the selected device.
model = TimeSeriesModel(look_back, hidden_size=hidden_size, num_layers=num_layers).to(device)

In [None]:
# Display the model's structure.
model

## 模型訓練

In [None]:
# Number of optimisation steps run by train() and the learning rate given to Adam.
num_epochs = 2000
learning_rate = 0.001

def train(trainX, trainY):
    """Fit the module-level ``model`` on the whole training set.

    Runs ``num_epochs`` full-batch steps with an MSE loss and the Adam
    optimiser (``learning_rate``, weight decay 1e-6). Prints the output
    shape on the first epoch and the loss every 100 epochs.

    Args:
        trainX: Training inputs passed to ``model``.
        trainY: Training targets compared with the model output.
    """
    criterion = torch.nn.MSELoss()  # MSE
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=0.000001)
    # The data does not change between epochs, so move it to the device once
    # instead of on every iteration.
    trainX, trainY = trainX.to(device), trainY.to(device)
    model.train()
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        outputs = model(trainX)
        if epoch == 0:
            print(outputs.shape)
        loss = criterion(outputs, trainY)
        loss.backward()
        optimizer.step()
        if epoch % 100 == 0:
            print(f"Epoch: {epoch}, loss: {loss.item():.10}")

# Train the model on the training windows.
train(trainX, trainY)

## 模型評估

In [None]:
# Switch to evaluation mode and predict on the training and test windows.
model.eval()
# Gradients are not needed for inference; disabling autograd avoids building
# a computation graph for these forward passes.
with torch.no_grad():
    trainPredict = model(trainX.to(device)).cpu().numpy()
    testPredict = model(testX.to(device)).cpu().numpy()
trainPredict.shape

In [None]:
# Compare the shapes of the training targets and the training predictions.
trainY.shape, trainPredict.shape

In [None]:
from sklearn.metrics import mean_squared_error
import math


def _rmse(actual, predicted):
    """Return the root-mean-squared error of the flattened predictions against actual."""
    return math.sqrt(mean_squared_error(actual, predicted.reshape(-1)))


# Undo the min-max scaling of the training data and predictions.
print(trainPredict.shape)
trainPredict = scaler.inverse_transform(trainPredict)
print(trainY.shape)
trainY_actual = scaler.inverse_transform(trainY)

# Undo the min-max scaling of the test data and predictions.
testPredict = scaler.inverse_transform(testPredict)
testY_actual = scaler.inverse_transform(testY.reshape(-1, 1))

# Compute and report the RMSE of both splits.
trainScore = _rmse(trainY_actual, trainPredict)
print(f'Train RMSE: {trainScore:.2f}')
testScore = _rmse(testY_actual, testPredict)
print(f'Test RMSE:  {testScore:.2f}')

## 繪製實際資料和預測資料的圖表

In [None]:
# Display the shape of the scaled data set.
dataset.shape

In [None]:
# Training predictions: a NaN-filled array shaped like the data set, with the
# predictions written starting at row look_back.
trainPredictPlot = np.full_like(dataset, np.nan)
trainPredictPlot[look_back:look_back + len(trainPredict), :] = trainPredict
#trainPredictPlot[:look_back] = 0
print(trainPredictPlot.shape)
# Test predictions: written so that they end one row before the end of the array.
testPredictPlot = np.full_like(dataset, np.nan)
testPredictPlot[-(testPredict.shape[0] + 1):-1, :] = testPredict

print(testPredictPlot.shape)
# Plot the actual values together with both sets of predictions.
plt.figure(figsize=(12, 6))
for _series, _label in (
    (scaler.inverse_transform(dataset), 'Actual'),
    (trainPredictPlot, 'train predict'),
    (testPredictPlot, 'test predict'),
):
    plt.plot(_series, label=_label)
plt.xticks(rotation=30)
plt.xlabel("Date")
plt.legend()
plt.show()


In [None]:
# Zoomed-in plot of rows min_x .. min_x + dist.
min_x = 120
dist = 30

plt.figure(figsize=(12,6))
print(trainPredictPlot.shape[0])
actual_values = scaler.inverse_transform(dataset)
plt.plot(actual_values, label='Actual')
plt.plot(trainPredictPlot, label='train predict')
plt.plot(testPredictPlot, label='test predict')
plt.xticks(rotation=30)
plt.xlim(min_x, min_x + dist)

# Derive the y-limits from everything visible in the window, on the original
# (un-scaled) value range. The prediction arrays are NaN outside their own
# region, so only finite values are considered; this replaces the former
# bare try/except fallback that was triggered by NaN limits.
visible_rows = slice(min_x, min_x + dist)
visible_values = np.concatenate([
    np.ravel(actual_values[visible_rows]),
    np.ravel(trainPredictPlot[visible_rows]),
    np.ravel(testPredictPlot[visible_rows]),
])
visible_values = visible_values[np.isfinite(visible_values)]
if visible_values.size:
    plt.ylim(visible_values.min() - 100, visible_values.max() + 100)
plt.xlabel("Date")
plt.legend()
plt.show()

In [None]:
# Zoomed-in plot of rows min_x .. min_x + dist, with y-limits taken from the
# test predictions inside that window.
min_x = 120
dist = 30

plt.figure(figsize=(12,6))
print(trainPredictPlot.shape[0])
plt.plot(scaler.inverse_transform(dataset), label='Actual')
plt.plot(trainPredictPlot, label='train predict')
plt.plot(testPredictPlot, label='test predict')
plt.xticks(rotation=30)
plt.xlim(min_x, min_x + dist)

# testPredictPlot is NaN wherever no test prediction was written, so plain
# min/max over the window can yield NaN limits. Use the finite values only
# and keep matplotlib's automatic limits when the window holds none.
window_values = np.ravel(testPredictPlot[min_x:min_x + dist])
window_values = window_values[np.isfinite(window_values)]
if window_values.size:
    plt.ylim(window_values.min() - 100, window_values.max() + 1000)
plt.xlabel("Date")
plt.legend()
plt.show()