In [112]:
import math
import os
import time

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from matplotlib import pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, explained_variance_score, \
    mean_absolute_percentage_error
from sklearn.preprocessing import StandardScaler
from torch.utils.data import TensorDataset, DataLoader
from torchinfo import summary

# Seed every random number generator this notebook imports so that a
# fresh-kernel "Run All" reproduces the same numbers.
seed = 1
np.random.seed(seed)  # NumPy's legacy global generator
torch.manual_seed(seed)
# Safe to call on CPU-only machines; PyTorch silently ignores it without CUDA.
torch.cuda.manual_seed_all(seed)

In [113]:
################################################################################
# LSTM Decomposition = FC + LSTMCell
################################################################################

In [114]:
class SVDDecomposedLSTM(nn.Module):
    """Two stacked LSTM cells and a linear head with low-rank factored weights.

    Every weight matrix is stored as three trainable factors ``u``, ``s`` and
    ``v`` and rebuilt on each forward pass as ``u @ diag(s) @ v``. The factors
    are ordinary randomly initialised parameters; nothing in this class runs
    an SVD or constrains the factors to be orthogonal. The LSTM cells are
    called without bias terms; only the linear head has a bias.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the hidden and cell state of both LSTM cells.
        output_size: Number of outputs of the linear head.
        rank_approx: Inner dimension (rank) shared by all factorisations.
    """

    def __init__(self, input_size, hidden_size, output_size, rank_approx):
        super(SVDDecomposedLSTM, self).__init__()
        self.rank_approx = rank_approx
        self.hidden_size = hidden_size

        # Factored weights. Attribute names and registration order are part of
        # the state_dict layout and of the seeded initialisation order, so
        # they are kept exactly as before.
        gate_rows = 4 * hidden_size  # input, forget, cell and output gates stacked

        self.lstm1_ih_u, self.lstm1_ih_s, self.lstm1_ih_v = self._low_rank_factors(
            gate_rows, input_size, rank_approx)
        self.lstm1_hh_u, self.lstm1_hh_s, self.lstm1_hh_v = self._low_rank_factors(
            gate_rows, hidden_size, rank_approx)

        self.lstm2_ih_u, self.lstm2_ih_s, self.lstm2_ih_v = self._low_rank_factors(
            gate_rows, hidden_size, rank_approx)
        self.lstm2_hh_u, self.lstm2_hh_s, self.lstm2_hh_v = self._low_rank_factors(
            gate_rows, hidden_size, rank_approx)

        self.fc_u, self.fc_s, self.fc_v = self._low_rank_factors(
            output_size, hidden_size, rank_approx)

        # Bias of the fully connected layer. It is created as zeros but
        # init_weights() re-draws it together with every other parameter.
        self.fc_bias = nn.Parameter(torch.zeros(output_size))

        self.init_weights()

    @staticmethod
    def _low_rank_factors(out_features, in_features, rank):
        """Allocate the uninitialised (u, s, v) parameters of one factored weight."""
        return (nn.Parameter(torch.empty(out_features, rank)),
                nn.Parameter(torch.empty(rank)),
                nn.Parameter(torch.empty(rank, in_features)))

    @staticmethod
    def _compose(u, s, v):
        """Rebuild ``u @ diag(s) @ v`` without materialising the diagonal matrix."""
        # Scaling row i of v by s[i] is exactly what diag(s) @ v computes.
        return torch.mm(u, s.unsqueeze(1) * v)

    def init_weights(self):
        """Draw every parameter from U(-1/sqrt(hidden_size), 1/sqrt(hidden_size))."""
        # The bound is still computed in float32 so seeded runs give the same
        # initial values as before.
        stdv = (1.0 / torch.sqrt(torch.tensor(self.hidden_size, dtype=torch.float))).item()
        with torch.no_grad():
            for param in self.parameters():
                param.uniform_(-stdv, stdv)

    def forward(self, x, hidden=None):
        """Run the two-cell stack over every time step of ``x``.

        Args:
            x: Tensor of shape ``[batch, seq_len, input_size]`` or, for a
                single step, ``[batch, input_size]``.
            hidden: Optional ``((hx1, cx1), (hx2, cx2))`` carried over from a
                previous call. Zero states are created when omitted.

        Returns:
            ``(out, ((hx1, cx1), (hx2, cx2)))`` where ``out`` has shape
            ``[batch, output_size]`` and is computed from the second cell's
            hidden state after the last time step.

        Raises:
            ValueError: If ``x`` is neither 2-D nor 3-D, or has no time steps.
        """
        if x.dim() == 2:
            x = x.unsqueeze(1)  # treat as a one-step sequence
        elif x.dim() != 3:
            raise ValueError(
                "expected x of shape [batch, seq_len, input_size] or "
                "[batch, input_size], got %s" % (tuple(x.shape),))
        if x.size(1) == 0:
            raise ValueError("x must contain at least one time step")

        batch_size = x.size(0)
        if hidden is None:
            # Lazily create zero states that follow the parameters' dtype and
            # the input's device.
            def zero_state():
                return torch.zeros(batch_size, self.hidden_size,
                                   dtype=self.fc_bias.dtype, device=x.device)

            hidden = ((zero_state(), zero_state()), (zero_state(), zero_state()))

        hx1, cx1 = hidden[0]
        hx2, cx2 = hidden[1]

        # Rebuild the dense weights once; they do not change between steps.
        w_ih1 = self._compose(self.lstm1_ih_u, self.lstm1_ih_s, self.lstm1_ih_v)
        w_hh1 = self._compose(self.lstm1_hh_u, self.lstm1_hh_s, self.lstm1_hh_v)
        w_ih2 = self._compose(self.lstm2_ih_u, self.lstm2_ih_s, self.lstm2_ih_v)
        w_hh2 = self._compose(self.lstm2_hh_u, self.lstm2_hh_s, self.lstm2_hh_v)
        w_fc = self._compose(self.fc_u, self.fc_s, self.fc_v)

        # Step through time explicitly. Flattening [batch, seq, feat] into
        # [batch*seq, feat] would mix time steps into the batch axis and either
        # fail or broadcast the state across steps.
        for x_t in x.unbind(dim=1):
            hx1, cx1 = self.custom_lstm_cell(x_t, (hx1, cx1), w_ih1, w_hh1)
            hx2, cx2 = self.custom_lstm_cell(hx1, (hx2, cx2), w_ih2, w_hh2)

        out = torch.addmm(self.fc_bias, hx2, w_fc.t())

        return out, ((hx1, cx1), (hx2, cx2))

    def custom_lstm_cell(self, input, hidden, w_ih, w_hh, b_ih=None, b_hh=None):
        """Apply one LSTM cell step with explicitly supplied weights.

        Args:
            input: ``[batch, features]`` tensor; a 3-D tensor is flattened to
                2-D over its leading dimensions.
            hidden: ``(hx, cx)`` pair, each ``[batch, hidden_size]``; 3-D
                states are flattened the same way as ``input``.
            w_ih: Input-to-gates weight, ``[4*hidden_size, features]``.
            w_hh: Hidden-to-gates weight, ``[4*hidden_size, hidden_size]``.
            b_ih: Optional input-to-gates bias.
            b_hh: Optional hidden-to-gates bias.

        Returns:
            ``(hy, cy)``: the new hidden and cell states.
        """
        hx, cx = hidden

        # Collapse an extra singleton axis (e.g. [batch, 1, features]) so that
        # every operand below is 2-D. cx is flattened together with hx;
        # otherwise a 3-D cell state would broadcast against the 2-D gates.
        if input.dim() == 3:
            input = input.reshape(-1, input.size(2))
        if hx.dim() == 3:
            hx = hx.reshape(-1, hx.size(2))
        if cx.dim() == 3:
            cx = cx.reshape(-1, cx.size(2))

        # Linear projections of input and previous hidden state: [batch, 4*hidden_size]
        gates = F.linear(input, w_ih, b_ih) + F.linear(hx, w_hh, b_hh)

        # Split into the four gate pre-activations along the feature axis
        ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)

        # Gate non-linearities
        ingate = torch.sigmoid(ingate)
        forgetgate = torch.sigmoid(forgetgate)
        cellgate = torch.tanh(cellgate)
        outgate = torch.sigmoid(outgate)

        cy = (forgetgate * cx) + (ingate * cellgate)
        hy = outgate * torch.tanh(cy)

        return hy, cy

    def count_parameters(self):
        """Return the total number of trainable scalar parameters."""
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [115]:
# Hyper-parameters handed to the SVDDecomposedLSTM constructor, plus the
# device the model and its inputs are moved to.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
input_size, hidden_size, output_size = 7, 128, 1
rank_approx = 35  # inner rank of every factored weight matrix

In [116]:
# Build the model on the selected device and show its parameter summary.
model = SVDDecomposedLSTM(input_size, hidden_size, output_size, rank_approx).to(device)
print(summary(model))

# torch.save does not create missing directories, so make sure the cache
# folder exists; otherwise this cell fails on a fresh checkout.
os.makedirs('modelCache', exist_ok=True)
# NOTE(review): this pickles the whole module object, so loading it later
# needs the SVDDecomposedLSTM class to be importable. Saving
# model.state_dict() would be more portable, but would change what any
# existing loader expects.
torch.save(model,'modelCache/SVD_LSTM.pth')

Layer (type:depth-idx)                   Param #
SVDDecomposedLSTM                        90,056
Total params: 90,056
Trainable params: 90,056
Non-trainable params: 0


In [117]:
# Smoke test: push one random sample through the model and inspect the result.
batch_size = 1  # Number of sequences in the batch
seq_len = 1  # Sequence length
x = torch.randn((batch_size, seq_len, input_size)).to(device)

# Evaluation mode, with gradient tracking disabled for the forward pass.
model.eval()
with torch.no_grad():
    output, _ = model(x)  # the returned hidden state is not needed here

print("Input shape:", x.shape)
print("Output shape:", output.shape)
print("Output:", output)

Input shape: torch.Size([1, 1, 7])
Output shape: torch.Size([1, 1])
Output: tensor([[0.0254]], device='cuda:0')
