<a href="https://colab.research.google.com/github/Vaincookie/FundamentalAnalysis/blob/master/%E7%A7%91%E7%A0%94%E4%BB%BB%E5%8A%A1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable


class BP(nn.Module):

    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        # Create the super constructor.
        super(BP, self).__init__()
        # Get the member variables.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # Create the parameter of the input.
        self.W_i = nn.Parameter(torch.Tensor(input_size, hidden_size))
        # Create the parameter of the hidden.
        self.W_h = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the bias_input.
        self.B_h = nn.Parameter(torch.Tensor(hidden_size))

        # Create the parameter of the hidden.
        self.W_o = nn.Parameter(torch.Tensor(hidden_size, output_size))
        # Create the parameter of the bias_output.
        self.B_o = nn.Parameter(torch.Tensor(output_size))
        # Initialize the parameters.
        self.init_weights()

    # Initialize the parameters.
    def init_weights(self):
        stdv = 1.0 / np.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)

    def forward(self, x, init_states=None):

        """
        assumes x.shape represents (batch_size, sequence_size, input_size)
        """
        bs, seq_sz, _ = x.size()
        output = []

        if init_states is None:
            i_t = Variable(torch.zeros(bs, self.hidden_size).to(x.device))
            o_t = Variable(torch.zeros(bs, self.output_size).to(x.device))

        else:
            input_t = init_states
            output_t = init_states

        for t in range(seq_sz):
            x_t = x[:, t, :]

            i_t = torch.tanh(x_t @ self.W_i + self.B_h)
            o_t = i_t @ self.W_o + self.B_o

            output.append(o_t)

        Pre_output = output[-1]

        return Pre_output


In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import random
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
from tqdm import trange

class ChaoticCNN(nn.Module):
    """
     S_t = tanh(x_t @ W_s1 - B_s)
     E_t = sigmoid(L_t @ W_e1 + E_t @ W_e2 - I_t @ W_e3 + S_t @ W_e4 - B_e)
     I_t = sigmoid(L_t @ W_i1 - E_t @ W_i2 - I_t @ W_i3 + S_t @ W_i4 - B_i)
     L_t = (E_t - I_t)* exp(-K * (S_t) ^2) - S_T
     O_t = L_t @ W_o1 - Bo
    """

    def __init__(self, input_size: int, hidden_size :int, output_size :int):
        # Create the super constructor.
        super(ChaoticCNN, self).__init__()
        # Get the member variables.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # Create the parameter of the In bias W_s1.
        self.W_s1 = nn.Parameter(torch.Tensor(input_size, hidden_size))
        # Create the parameter of the In bias B_i.
        self.B_s = nn.Parameter(torch.Tensor(hidden_size))

        # Create the parameter of the Ex weight e1.
        self.W_e1 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the Ex weight e2.
        self.W_e2 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the Ex weight e3.
        self.W_e3 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the Ex weight e4.
        self.W_e4 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the Ex bias B_e.
        self.B_e = nn.Parameter(torch.Tensor(hidden_size))

        # Create the parameter of the In weight i1.
        self.W_i1 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the In weight i2.
        self.W_i2 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the In weight i3.
        self.W_i3 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the In weight i4.
        self.W_i4 = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the In bias B_i.
        self.B_i = nn.Parameter(torch.Tensor(hidden_size))

        # Create the parameter of the In weight W_o1.
        self.W_o1 = nn.Parameter(torch.Tensor(hidden_size, output_size))
        # Create the parameter of the In bias B_i.
        self.B_o = nn.Parameter(torch.Tensor(output_size))

        # Create the parameter of the K.
        self.K = nn.Parameter(torch.FloatTensor(1))
        # Create the parameter of the N.
        self.N = nn.Parameter(torch.FloatTensor(1))

        self.init_weights()



    # Initialize the parameters.
    def init_weights(self):
        stdv = 1.0 / np.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)


    def forward(self ,x ,init_states=None):

        """
        assumes x.shape represents (batch_size, sequence_size, input_size)
        """
        bs, seq_sz, _ = x.size()
        output = []

        if init_states is None:

            E_t = Variable(torch.zeros(bs, self.hidden_size).to(x.device))
            I_t = Variable(torch.zeros(bs, self.hidden_size).to(x.device))
            L_t = Variable(torch.zeros(bs, self.hidden_size).to(x.device))

        else:

            E_t = init_states
            I_t = init_states
            L_t = init_states



        for t in range(seq_sz):
            x_t = x[:, t, :]

            S_t = torch.tanh(x_t @ self.W_s1 - self.B_s)
            E_t = torch.sigmoid(L_t @ self.W_e1 + E_t @ self.W_e2 - I_t @ self.W_e3 + S_t @ self.W_e4 - self.B_e)
            I_t = torch.sigmoid(L_t @ self.W_i1 - E_t @ self.W_i2 - I_t @ self.W_i3 + S_t @ self.W_i4 - self.B_i)
            L_t = (E_t - I_t )* torch.exp(- self.K * torch.pow(S_t, 2)) + S_t
            O_t = L_t @ self.W_o1 - self.B_o

            output.append(O_t.unsqueeze(0))
            # reshape hidden_seq p/ retornar
        hidden_seq = torch.cat(output, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        Pre_output = hidden_seq[:, -1, :]
        return Pre_output, L_t


In [3]:

import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable


class GRU(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super(GRU, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # z_t
        self.W_z = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_z = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_z = nn.Parameter(torch.Tensor(hidden_size))

        # r_t
        self.W_r = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_r = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_r = nn.Parameter(torch.Tensor(hidden_size))

        # h_t
        self.W_h = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_h = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.Tensor(hidden_size))

        # o_t
        self.U_o = nn.Parameter(torch.Tensor(hidden_size, output_size))
        self.b_o = nn.Parameter(torch.Tensor(output_size))

        self.init_weights()

    def init_weights(self):
        stdv = 1.0 / np.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)

    def forward(self, x, init_states=None):

        """
        assumes x.shape represents (batch_size, sequence_size, input_size)
        """
        bs, seq_sz, _ = x.size()
        output = []

        if init_states is None:
            h_t, g_t, o_t = (
                Variable(torch.zeros(bs, self.hidden_size).to(x.device)),
                Variable(torch.zeros(bs, self.hidden_size).to(x.device)),
                Variable(torch.zeros(bs, self.output_size).to(x.device)),
            )
        else:
            h_t, g_t, o_t = init_states

        for t in range(seq_sz):
            x_t = x[:, t, :]

            z_t = torch.sigmoid(x_t @ self.W_z + h_t @ self.U_z + self.b_z)
            r_t = torch.sigmoid(x_t @ self.W_r + h_t @ self.U_r + self.b_r)
            g_t = torch.tanh(x_t @ self.W_h + (r_t * h_t) @ self.U_h + self.b_h)
            h_t = z_t * h_t + (1 - z_t) * g_t
            o_t = h_t @ self.U_o + self.b_o

            output.append(o_t.unsqueeze(0))
        # reshape hidden_seq p/ retornar
        hidden_seq = torch.cat(output, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        Pre_output = hidden_seq[:, -1, :]

        return Pre_output, (o_t, h_t)


In [4]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable


class LSTM(nn.Module):
    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size

        # i_t
        self.W_i = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_i = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.Tensor(hidden_size))

        # f_t
        self.W_f = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_f = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_f = nn.Parameter(torch.Tensor(hidden_size))

        # c_t
        self.W_c = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_c = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_c = nn.Parameter(torch.Tensor(hidden_size))

        # o_t
        self.W_o = nn.Parameter(torch.Tensor(input_size, hidden_size))
        self.U_o = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        self.b_o = nn.Parameter(torch.Tensor(hidden_size))

        # q_t
        self.U_q = nn.Parameter(torch.Tensor(hidden_size, output_size))
        self.b_q = nn.Parameter(torch.Tensor(output_size))

        self.init_weights()

    def init_weights(self):
        stdv = 1.0 / np.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)

    def forward(self, x, init_states=None):

        """
        assumes x.shape represents (batch_size, sequence_size, input_size)
        """
        bs, seq_sz, _ = x.size()
        output = []

        if init_states is None:
            c_t, h_t, q_t = (
                Variable(torch.zeros(bs, self.hidden_size).to(x.device)),
                Variable(torch.zeros(bs, self.hidden_size).to(x.device)),
                Variable(torch.zeros(bs, self.output_size).to(x.device)),
            )
        else:
            h_t, c_t = init_states

        for t in range(seq_sz):
            x_t = x[:, t, :]

            i_t = torch.sigmoid(x_t @ self.W_i + h_t @ self.U_i + self.b_i)
            f_t = torch.sigmoid(x_t @ self.W_f + h_t @ self.U_f + self.b_f)
            o_t = torch.sigmoid(x_t @ self.W_o + h_t @ self.U_o + self.b_o)
            g_t = torch.tanh(x_t @ self.W_c + h_t @ self.U_c + self.b_c)
            c_t = f_t * c_t + i_t * g_t
            h_t = o_t * torch.tanh(c_t)
            q_t = h_t @ self.U_q + self.b_q

            output.append(q_t.unsqueeze(0))
        # reshape hidden_seq p/ retornar
        hidden_seq = torch.cat(output, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        Pre_output = hidden_seq[:, -1, :]

        return Pre_output, q_t, h_t


In [5]:
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable

class RNN(nn.Module):

    def __init__(self, input_size: int, hidden_size: int, output_size :int):
        # Create the super constructor.
        super(RNN, self).__init__()
        # Get the member variables.
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        # Create the parameter of the input.
        self.W_i = nn.Parameter(torch.Tensor(input_size, hidden_size))
        # Create the parameter of the hidden.
        self.W_h = nn.Parameter(torch.Tensor(hidden_size, hidden_size))
        # Create the parameter of the bias_input.
        self.B_h = nn.Parameter(torch.Tensor(hidden_size))

        # Create the parameter of the hidden.
        self.W_o = nn.Parameter(torch.Tensor(hidden_size, output_size))
        # Create the parameter of the bias_output.
        self.B_o = nn.Parameter(torch.Tensor(output_size))
        # Initialize the parameters.
        self.init_weights()



    # Initialize the parameters.
    def init_weights(self):
        stdv = 1.0 / np.sqrt(self.hidden_size)
        for weight in self.parameters():
            weight.data.uniform_(-stdv, stdv)


    def forward(self ,x ,init_states=None):

        """
        assumes x.shape represents (batch_size, sequence_size, input_size)
        """
        bs, seq_sz, _ = x.size()
        # output = torch.zeros(bs, self.output_size).to(x.device)
        output = list()

        if init_states is None:
            i_t = Variable(torch.zeros(bs, self.hidden_size).to(x.device))
            h_t = Variable(torch.zeros(bs, self.output_size).to(x.device))

        else:
            i_t = init_states
            h_t = init_states

        for t in range(seq_sz):
            x_t = x[:, t, :]

            i_t = torch.tanh(x_t @ self.W_i + i_t @ self.W_h + self.B_h)
            h_t = i_t @ self.W_o + self.B_o

            output.append(h_t.unsqueeze(0))
        # reshape hidden_seq p/ retornar
        hidden_seq = torch.cat(output, dim=0)
        hidden_seq = hidden_seq.transpose(0, 1).contiguous()
        Pre_output = hidden_seq[:, -1, :]

        return Pre_output, h_t


In [6]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.autograd import Variable
import torch.optim as optim
import time
from sklearn.metrics import mean_squared_error, mean_absolute_error
import math
from tqdm import trange
import matplotlib.pyplot as plt


new=pd.DataFrame({'product':product,
                  'model':model_name,
                  'epoch':epoch,
                  'learning rate':lr,
                  'hidden_size':hidden_size,
                  'input_size':input_size,
                  'time_steps':time_steps,
                  'mean_squared_error':mse,
                  'root_mean_squared_error':rmse,
                  'mean_absolute_error':mae,
                  },index=[0])   # 自定义索引为：1 ，这里也可以不设置index
new.to_csv("new_record_exp.csv")
product_list = ["AUS200","CHN50","ESP35","EUSTX50","FRA40","GBPUSD","GER30","HKG33","JPN225","NAS100","SPX500","UK100","US30","US2000","USDCNH","USDEUR"]
product = "JPN225"




for par1 in [64,128,256,512]:
  for par2 in [1e-4,1e-5]:
    for par3 in [3,5,10,15,31]:
      for product_item in product_list:
        product = product_item
        data = pd.read_csv('/content/drive/MyDrive/Research_essay/'+product+".csv")
        # 设置随机种子
        torch.manual_seed(1)
        input_size = 9
        hidden_size = par1
        output_size = 1
        lr = par2
        dim_in = 9
        dim_out = 1
        time_steps = par3
        smoothing_window_size = 10000
        epoch = 500

        # 定义训练集
        data = data.sort_values(['trade_date'], ascending=True)
        data = data[['ts_code', 'trade_date', 'bid_close', 'bid_open', 'bid_high', 'bid_low',
              'ask_open', 'ask_close', 'ask_high', 'ask_low', 'tick_qty']]
        data = data.sort_values(['trade_date'], ascending=True)
        data = data.drop(labels=["ts_code",'trade_date'], axis=1)
        train_data = data[int(data.shape[0]*0.2):]
        test_data = data[:int(data.shape[0] * 0.2)]

        scaler = MinMaxScaler()
        train_data = train_data.values.astype('float32')
        test_data = test_data.values.astype('float32')


        train_len = len(train_data)
        remainder = int(train_len) % smoothing_window_size
        for di in range(0, int(train_len) - remainder, smoothing_window_size):
            scaler.fit(train_data[di:di + smoothing_window_size, :])
            train_data[di:di + smoothing_window_size, :] = scaler.transform(train_data[di:di + smoothing_window_size, :])

        di = int(train_len) - remainder
        if (remainder != 0):
            scaler.fit(train_data[di:, :])
            train_data[di:, :] = scaler.transform(train_data[di:, :])
        test_data = scaler.transform(test_data)


        # 根据原始数据集构建矩阵
        def create_dataset(data, time_steps):
            dataX, dataY = [], []
            for i in range(len(data) - time_steps):
                a = data[i:(i + time_steps), :]
                dataX.append(a)
                dataY.append(data[i + time_steps, 0:1])
            return np.array(dataX), np.array(dataY)

        trainX, trainY = create_dataset(train_data, time_steps)
        testX, testY = create_dataset(test_data, time_steps)

        trainX = torch.from_numpy(trainX).to(torch.float32)
        trainY = torch.from_numpy(trainY).to(torch.float32)
        testX = torch.from_numpy(testX).to(torch.float32)
        testY = torch.from_numpy(testY).to(torch.float32)


        for par in [1,2,3,4,5]:
            model_select = par
            if model_select == 1:
                model_name = "BP"
                model = BP(input_size, hidden_size, output_size)
            if model_select == 2:
                model_name = "LSTM"
                model = LSTM(input_size, hidden_size, output_size)
            if model_select == 3:
                model_name = "RNN"
                model = RNN(input_size, hidden_size, output_size)
            if model_select == 4:
                model_name = "GRU"
                model = GRU(input_size, hidden_size, output_size)
            if model_select == 5:
                model_name = "CCNN"
                model = ChaoticCNN(input_size, hidden_size, output_size)

            bp = model
            device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
            bp = bp.to(device)
            trainx = trainX.to(device)
            trainy = trainY.to(device)
            testx = testX.to(device)
            testy = testY.to(device)
            loss_function = nn.MSELoss()
            optimizer = optim.Adam(bp.parameters(), lr)

            list_x = []
            list_y = []
            list_ax = []
            list_ay = []

            begin = time.time()

            for epoch in range(epoch):

                init_states = None
                if(model_select == 1):
                    output = bp(trainx, init_states)
                elif(model_select == 2):
                    output, ht ,qt= bp(trainx, init_states)
                else:
                    output, ht = bp(trainx, init_states)
                loss = loss_function(output, trainy)
                running_loss = loss.item()
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                Loss = []

                list_x.append(epoch)
                list_y.append(loss.item())

                if epoch == 500:
                    list_ax.append(epoch)
                    list_ay.append(loss.item())
                    plt.plot(list_x, list_y, color='r', linewidth=3)
                    plt.scatter(list_ax, list_ay, marker='*', color='r')
                    plt.show()

                if (epoch + 1) % 100 == 0:
                    Loss.append(running_loss)
                    print('Epoch: {}, Loss:{:.5f}, learning rate:{}'.format(epoch + 1, loss.item(), 1e-4))

            pass

            time_elapsed = time.time() - begin
            print('Training Complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))

            bp = bp.eval()
            init_states = None
            if (model_select == 1):
                pre_output = bp(testx, init_states)
            elif (model_select == 2):
                pre_output,pre_qt,pre_ht = bp(testx, init_states)
            else:
                pre_output,pre_ht = bp(testx, init_states)
                # pre_qt = pre_qt.cpu()
                # pre_ht = pre_ht.cpu()
                #ht = ht.cpu()

            output = output.cpu()
            trainx = trainX.cpu()
            trainy = trainY.cpu()
            testx = testX.cpu()
            testy = testY.cpu()
            pre_output = pre_output.cpu()



            pred = pre_output.data.numpy().reshape(testx.shape[0], 1)
            real = testy.reshape(testx.shape[0], 1)

            mse = mean_squared_error(pred, real)
            rmse = math.sqrt(mean_squared_error(pred, real))
            mae = mean_absolute_error(pred, real)
            print('mean_squared_error:%.6f' % mse)
            print('root_mean_squared_error:%.6f' % rmse)
            print('mean_absolute_error:%.6f' % mae)


            record = pd.read_csv("/content/drive/MyDrive/Research_essay/new_record_exp.csv",index_col=0)
            record = record.append([{'product':product,
                              'model':model_name,
                              'epoch':epoch,
                              'learning rate':lr,
                              'hidden_size':hidden_size,
                              'input_size':input_size,
                              'time_steps':time_steps,            
                              'mean_squared_error':mse,
                              'root_mean_squared_error':rmse,
                              'mean_absolute_error':mae,
                                'time':time_elapsed,

                              }],ignore_index=True)
            record.to_csv("/content/drive/MyDrive/Research_essay/new_record_exp.csv")


Epoch: 100, Loss:0.08466, learning rate:0.0001
Epoch: 200, Loss:0.01489, learning rate:0.0001
Epoch: 300, Loss:0.00739, learning rate:0.0001
Epoch: 400, Loss:0.00675, learning rate:0.0001
Epoch: 500, Loss:0.00627, learning rate:0.0001
Training Complete in 0m 1s
mean_squared_error:0.016221
root_mean_squared_error:0.127363
mean_absolute_error:0.096953
Epoch: 100, Loss:0.10911, learning rate:0.0001
Epoch: 200, Loss:0.00352, learning rate:0.0001
Epoch: 300, Loss:0.00104, learning rate:0.0001
Epoch: 400, Loss:0.00097, learning rate:0.0001
Training Complete in 0m 3s
mean_squared_error:0.003788
root_mean_squared_error:0.061543
mean_absolute_error:0.041463
Epoch: 100, Loss:0.01147, learning rate:0.0001
Epoch: 200, Loss:0.00721, learning rate:0.0001
Epoch: 300, Loss:0.00508, learning rate:0.0001
Epoch: 400, Loss:0.00337, learning rate:0.0001
Training Complete in 0m 1s
mean_squared_error:0.003871
root_mean_squared_error:0.062215
mean_absolute_error:0.048069
Epoch: 100, Loss:0.10687, learning rat