In [None]:
import numpy as np
import torch
import torch.nn as nn
from torch.nn import Parameter
import torch.optim as optim

from typing import *
from pathlib import Path
from enum import IntEnum
class Dim(IntEnum):
    batch = 0
    seq = 1
    feature = 2

In [None]:
# 參數配置
csv_path = '../data/nanrui.csv'
dim_per_time = 4
time_col_count = 2
target_dim_index = 0
hidden_size = 256

train_len = 17434
val_len = 5811
test_len = 5811

In [None]:
class MogLSTM(nn.Module):
    def __init__(self, input_sz: int, hidden_sz: int, mog_iterations: int):
        super().__init__()
        self.input_size = input_sz
        self.hidden_size = hidden_sz
        self.mog_iterations = mog_iterations
        #Define/initialize all tensors   
        self.Wih = Parameter(torch.Tensor(input_sz, hidden_sz * 4))
        self.Whh = Parameter(torch.Tensor(hidden_sz, hidden_sz * 4))
        self.bih = Parameter(torch.Tensor(hidden_sz * 4))
        self.bhh = Parameter(torch.Tensor(hidden_sz * 4))
        #Mogrifiers
        self.Q = Parameter(torch.Tensor(hidden_sz,input_sz))
        self.R = Parameter(torch.Tensor(input_sz,hidden_sz))

        self.init_weights()
    
    def init_weights(self):
        for p in self.parameters():
            if p.data.ndimension() >= 2:
                nn.init.xavier_uniform_(p.data)
            else:
                nn.init.zeros_(p.data)

    def mogrify(self,xt,ht):
        for i in range(1,self.mog_iterations+1):
            if (i % 2 == 0):
                ht = (2*torch.sigmoid(xt @ self.R)) * ht
            else:
                xt = (2*torch.sigmoid(ht @ self.Q)) * xt
        return xt, ht

    #Define forward pass through all LSTM cells across all timesteps.
    #By using PyTorch functions, we get backpropagation for free.
    def forward(self, x: torch.Tensor, 
                init_states: Optional[Tuple[torch.Tensor, torch.Tensor]]=None
               ) -> Tuple[torch.Tensor, torch.Tensor]:
        """Assumes x is of shape (batch, sequence, feature)"""
        batch_sz, seq_sz, _ = x.size()
        hidden_seq = []
        #ht and Ct start as the previous states and end as the output states in each loop below
        if init_states is None:
            ht = torch.zeros((batch_sz,self.hidden_size)).to(x.device)
            Ct = torch.zeros((batch_sz,self.hidden_size)).to(x.device)
        else:
            ht, Ct = init_states
        for t in range(seq_sz): # iterate over the time steps
            xt = x[:, t, :]
            xt, ht = self.mogrify(xt,ht) #mogrification
            gates = (xt @ self.Wih + self.bih) + (ht @ self.Whh + self.bhh)
            ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)

            ### The LSTM Cell!
            ft = torch.sigmoid(forgetgate)
            it = torch.sigmoid(ingate)
            Ct_candidate = torch.tanh(cellgate)
            ot = torch.sigmoid(outgate)
            #outputs
            Ct = (ft * Ct) + (it * Ct_candidate)
            ht = ot * torch.tanh(Ct)

        return ht, Ct

In [None]:
#sanity testing
#note that our hidden_sz is also our defined output size for each LSTM cell.
batch_sz, seq_len, feat_sz, hidden_sz = 4, 12, 1, 10
arr = torch.randn(batch_sz, seq_len, feat_sz)
lstm = MogLSTM(feat_sz, hidden_sz,5)
hn, cn = lstm(arr)
hn.shape #shape should be batch_sz x hidden_sz = 4x10

In [None]:
class LSTM(nn.Module):
    def __init__(self, encoder, hidden_layer_size=hidden_size, output_size=1):
        super().__init__()
        self.encoder = encoder
        
        self.hidden_layer_size = hidden_layer_size

        self.linear = nn.Linear(hidden_layer_size, output_size)

    def forward(self, input_seq):
        lstm_out, self.hidden_cell = self.encoder(input_seq)
        predictions = self.linear(lstm_out)
        return predictions.squeeze(1)

model = LSTM(MogLSTM(dim_per_time,hidden_size,5))
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
print(model)

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [None]:
# 数据读取
df = pd.read_csv(csv_path)
df.head()

In [None]:
# 数据预处理
data = np.array(df)
data = np.array(data[:, time_col_count:], dtype='float')  # 数据删除时间列

In [None]:
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(data[:train_len])
all_data_normalized = scaler.transform(data)
print(all_data_normalized.shape)

In [None]:
# 数据生成器函数
def generator(data, lookback, delay, min_index, max_index, shuffle=False, batch_size=128, step=6):
    if max_index is None:
        max_index = len(data) - delay - 1
    i = min_index + lookback
    while 1:
        if shuffle:
            rows = np.random.randint(min_index + lookback, max_index, size=batch_size)
        else:
            if i + batch_size >= max_index:
                i = min_index + lookback
            rows = np.arange(i, min(i + batch_size, max_index))
            i += len(rows)

        samples = np.zeros((len(rows), lookback // step, data.shape[-1]))
        targets = np.zeros((len(rows),))
        for j, row in enumerate(rows):
            indices = range(rows[j] - lookback, rows[j], step)
            samples[j] = data[indices]
            targets[j] = data[rows[j] + delay][target_dim_index]
        yield samples, targets, [None]

In [None]:
lookback = 5
step = 1
delay = 0
batch_size = 128
train_gen = generator(all_data_normalized,
                      lookback=lookback,
                      delay=delay,
                      min_index=0,
                      max_index=train_len,
                      shuffle=True,
                      step=step,
                      batch_size=batch_size)

# 准备数据
X = []
y = []

howmanybatch = (train_len-lookback)//batch_size  # 需要多少个batch
for train_one in train_gen:
    X.append(train_one[0])
    y.append(train_one[1])
    howmanybatch = howmanybatch - 1
    if howmanybatch == 0:
        break

In [None]:
print(np.array(X).shape)
print(np.array(y).shape)

In [None]:
epochs = 10

for epoch in range(epochs):
    for i in range(len(X)):
        optimizer.zero_grad()

        y_pred = model(torch.FloatTensor(X[i]))

        single_loss = loss_function(y_pred, torch.FloatTensor(y[i]))
        single_loss.backward()
        optimizer.step()

    print(f'epoch: {epoch:3} loss: {single_loss.item():10.8f}')

print(f'epoch: {epoch:3} loss: {single_loss.item():10.10f}')

In [None]:
model.eval()

In [None]:
test_gen = generator(all_data_normalized,
                     lookback=lookback,
                     delay=delay,
                     min_index=train_len+val_len,
                     max_index=None,
                     step=step,
                     batch_size=batch_size)

# 准备数据
X = []
y = []

howmanybatch = (test_len - lookback) // batch_size  # 需要预测多少个batch
for test_one in test_gen:
    X.append(test_one[0])
    y.append(test_one[1])
    howmanybatch = howmanybatch - 1
    if howmanybatch == 0:
        break

test_X = np.vstack(X)
test_y = np.hstack(y)

In [None]:
test_X.shape

In [None]:
test_y.shape

In [None]:
with torch.no_grad():
    predict_y = model(torch.FloatTensor(test_X))

In [None]:
predict_y.shape

In [None]:
test_y = scaler.inverse_transform(np.repeat(test_y.reshape(-1,1), dim_per_time, axis=1))[:,target_dim_index]

In [None]:
predict_y = scaler.inverse_transform(np.repeat(predict_y.detach().numpy().reshape(-1,1), dim_per_time, axis=1))[:,target_dim_index]

In [None]:
from sklearn.metrics import mean_squared_error,mean_absolute_error,r2_score
from math import sqrt

In [None]:
# 误差评估
print('mae : ' + str(mean_absolute_error(test_y, predict_y)))
print('rmse : ' + str(sqrt(mean_squared_error(test_y, predict_y))))
print('r2 : ' + str(r2_score(test_y,predict_y)))

# 预测结果部分展现
plt.figure(figsize=(6, 3))
plt.plot(test_y, label='Actual')
plt.plot(predict_y, label='MogLSTM')
plt.xlabel('Time')
plt.ylabel('Temperature')
plt.legend()
plt.show()