In [72]:
import tushare as ts
import pandas as pd
import torch
import numpy as np

In [73]:
# Download the daily history for the 'sh' symbol.
data = ts.get_hist_data('sh')
# NOTE(review): get_data() uses rows i-5..i-1 as features for row i, which is
# only meaningful when rows are ordered oldest -> newest. The source is believed
# to return rows newest-first (the original split used the first 100 rows as the
# test set, which is consistent with that) -- confirm against the tushare docs.
# Sorting by the index (presumably the trading date) makes the order explicit.
data = data.sort_index()
# Hold out the 100 most recent rows for testing; everything earlier is training.
data_train = data.iloc[:-100]
data_test = data.iloc[-100:]
# get_data() selects a split by position: 0 -> train, 1 -> test.
data = [data_train, data_test]

In [74]:
import torch.nn as nn

# Hyper-parameters read by StockLSTM.__init__ when a model is constructed.
input_size = 5     # number of features per time step
hidden_size = 16   # width of the LSTM hidden state
num_layers = 1     # number of stacked LSTM layers


class StockLSTM(nn.Module):
    """LSTM followed by a linear layer that emits 2-class log-probabilities.

    The LSTM is built with ``batch_first=True``, so ``forward`` expects input
    shaped ``(batch, seq_len, input_size)``.
    """

    def __init__(self):
        super(StockLSTM, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size,
                            num_layers, batch_first=True)
        self.fc_net = nn.Sequential(
            nn.Linear(hidden_size, 2),
            nn.LogSoftmax(dim=-1))

    def forward(self, x):
        """Return log-probabilities of shape ``(batch, 2)`` for input ``x``.

        Only the LSTM output at the last time step is fed to the classifier.
        """
        # Take the state shape from the layer itself (not the module-level
        # constants, which may have been changed after construction) and the
        # dtype/device from the input, so the initial state always matches
        # both the model and the batch.
        state_shape = (self.lstm.num_layers, x.shape[0], self.lstm.hidden_size)
        h0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(state_shape, dtype=x.dtype, device=x.device)
        o, _ = self.lstm(x, (h0, c0))
        return self.fc_net(o[:, -1, :])

In [75]:
def get_data(train, window=5):
    """Build sliding-window samples and up/down labels from one split.

    Args:
        train: index into the module-level ``data`` sequence selecting the
            frame to read (the notebook calls this with 0 and 1).
        window: number of consecutive rows per sample (default 5, the
            original hard-coded value). Must be at least 1.

    Returns:
        ``(x, y)`` where ``x`` has shape ``(n_samples, window, 5)`` with the
        feature columns ordered volume, open, close, high, low, and ``y`` has
        shape ``(n_samples,)`` with 1 where ``close[i] > close[i-1]`` and 0
        otherwise.

    Raises:
        ValueError: if ``window`` is smaller than 1.

    Side effects:
        Prints the shapes of ``x`` and ``y``.
    """
    if window < 1:
        # With window == 0 the label would compare close[0] with close[-1].
        raise ValueError("window must be >= 1")

    frame = data[train]
    # The order here fixes the feature layout along the last axis of x.
    columns = [frame[name].values
               for name in ('volume', 'open', 'close', 'high', 'low')]
    close = columns[2]
    data_len = len(close)

    x, y = [], []
    # NOTE(review): the upper bound data_len - 1 leaves out the last row even
    # though nothing below reads index i + 1 -- confirm whether intentional.
    for i in range(window, data_len - 1):
        # Rows i-window .. i-1 are the features; row i only supplies the label.
        sample = np.asarray([col[i - window:i] for col in columns]).T
        x.append(sample)
        y.append(1 if close[i] > close[i - 1] else 0)

    x, y = np.asarray(x), np.asarray(y)
    print(x.shape, y.shape)
    return x, y

In [76]:
# Position 0 of `data` is the training frame, position 1 the test frame.
x_train, y_train = get_data(train=0)
x_test, y_test = get_data(train=1)

((503, 5, 5), (503,))
((94, 5, 5), (94,))


In [77]:
# Log of the smallest and largest value in feature column 0 of the training
# windows; stock_dataset uses these bounds to rescale that column.
v_min, v_max = np.log([x_train[:, :, 0].min(), x_train[:, :, 0].max()])

In [78]:
import torch.utils.data as data
class stock_dataset(data.Dataset):
    """Dataset over the module-level train or test windows.

    Feature column 0 is log-transformed and min-max scaled with the
    module-level ``v_min`` / ``v_max`` once, at construction. Feature columns
    1-4 are rescaled per sample in ``__getitem__``.
    """

    def __init__(self, train):
        """Select ``x_train``/``y_train`` when ``train`` is truthy, else the test arrays."""
        features = x_train if train else x_test
        self.y = y_train if train else y_test
        # Work on a private float copy. Normalizing the shared arrays in place
        # (as before) made a second construction apply the log again.
        self.x = np.array(features, dtype=np.float64)
        log_volume = np.log(self.x[:, :, 0])
        self.x[:, :, 0] = (log_volume - v_min) / (v_max - v_min)

    def __getitem__(self, index):
        """Return ``(x, y)`` for one sample; ``x`` is a float32 tensor."""
        # torch.tensor always copies, so the in-place rescaling below can
        # never write back into self.x.
        x = torch.tensor(self.x[index, :, :5], dtype=torch.float32)
        y = self.y[index]
        # Scale columns 1-4 by this sample's own range. The +/-1 margin keeps
        # the denominator non-zero when all values in the window are equal.
        p_min = torch.min(x[:, 1:5]) - 1
        p_max = torch.max(x[:, 1:5]) + 1
        x[:, 1:5] = (x[:, 1:5] - p_min) / (p_max - p_min)
        return x, y

    def __len__(self):
        """Number of samples (length of the label array)."""
        return len(self.y)

In [79]:
import torch.optim as optim
import torch.nn.functional as F
# Mini-batch loaders over the two splits. Only the training loader shuffles;
# the test loader keeps the dataset order.
train_loader = torch.utils.data.DataLoader(
    dataset=stock_dataset(True),
    batch_size=32,
    shuffle=True)
test_loader = torch.utils.data.DataLoader(
    dataset=stock_dataset(False),
    batch_size=32,
    shuffle=False)

In [80]:
def train(model, epoch, optimizer=None, loader=None):
    """Run one pass over the training batches, minimizing NLL loss.

    Args:
        model: module whose output is passed to ``F.nll_loss``.
        epoch: epoch number; accepted for the caller's convenience, not used.
        optimizer: optimizer to step. When omitted, a fresh ``optim.Adam`` is
            created for this call (the original behavior). A fresh optimizer
            starts with empty state, so pass one shared instance across
            epochs to keep Adam's running statistics.
        loader: iterable of ``(x, y)`` batches; defaults to the module-level
            ``train_loader``.

    Returns:
        Mean batch loss as a float, or ``nan`` if the loader yields no batches.
    """
    model.train()
    if optimizer is None:
        optimizer = optim.Adam(model.parameters())
    if loader is None:
        loader = train_loader

    total_loss, n_batches = 0.0, 0
    for x, y in loader:
        y = y.long()  # nll_loss needs integer class targets
        optimizer.zero_grad()
        pred = model(x)
        loss = F.nll_loss(pred, y)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        n_batches += 1
    return total_loss / n_batches if n_batches else float('nan')
def test(model, epoch):
    model.eval()
    conf_mat = np.zeros([2,2])
    for x, y in test_loader:
        y = y.long()
        pred = model(x)
        pred_class = torch.argmax(pred, dim=1).long()
        for i in range(len(pred_class)):
            conf_mat[pred_class[i], y[i]] += 1
    print (conf_mat)

In [81]:
model = StockLSTM()
# The loop used to iterate over range(1, 1), which is empty, so the model was
# never trained or evaluated. The epoch count below is a tunable placeholder.
num_epochs = 10
for epoch in range(1, num_epochs + 1):
    train(model, epoch)
    test(model, epoch)