In [1]:
import torch
import torch.nn.functional as F
import torch.nn as nn

In [2]:
# 实例化LSTM对象
# 第一个参数: input_size(输入张量维度)
# 第二个参数: hidden_size(隐藏层维度)
# 第三个参数: num_layer(隐层层的层数)

input_size = 5
hidden_size = 6
num_layer = 2
seq_len = 4
batch_size = 3
lstm = nn.LSTM(input_size, hidden_size, num_layer, bidirectional=False)
# 初始化张量x
# 第一个参数: seq_len(输入序列长度)
# 第二个参数: batch_size(批次样本数)
# 第三个参数: input_size(输入张量维度)
input1 = torch.randn(seq_len, batch_size, input_size)
# 初始化隐层张量h0,细胞状态c0
# 第一个参数: num_layer*方向数(隐层层的层数*方向数)
# 第二个参数: batch_size(批次样本数)
# 第三个参数: hidden_size(隐藏层维度)
h0 = torch.randn(num_layer, batch_size, hidden_size)
c0 = torch.randn(num_layer, batch_size, hidden_size)

outputs, (hn, cn) = lstm(input1, (h0, c0))
print(outputs.shape)
print(hn.shape)
print(cn.shape)

torch.Size([4, 3, 6])
torch.Size([2, 3, 6])
torch.Size([2, 3, 6])


In [6]:
# 使用示例
# input_size = 10  # 输入维度
# hidden_size = 20  # 隐藏层维度
# layer_size = 2    # LSTM层数
# bidirectional = True  # 双向LSTM

# lstm = MyLSTM(input_size, hidden_size, layer_size, bidirectional)

# # 模拟输入：假设序列长度为5，batch size为3，输入维度为10
# seq_len = 5
# batch_size = 3
# inputs = torch.randn(seq_len, batch_size, input_size)

# # 初始化隐藏状态和记忆单元，双向LSTM每层有两个方向
# h_0 = [torch.zeros(batch_size, hidden_size) for _ in range(layer_size * (2 if bidirectional else 1))]
# c_0 = [torch.zeros(batch_size, hidden_size) for _ in range(layer_size * (2 if bidirectional else 1))]

# # 前向传播
# output, (h_n, c_n) = lstm(inputs, (h_0, c_0))

# print("输出：", output)


In [4]:
# 使用示例
# input_size = 10  # 输入维度
# hidden_size = 20  # 隐藏层维度
# layer_size = 2    # LSTM层数

# input_size = 5
# hidden_size = 6
# layer_size = 2
# seq_len = 1
# batch_size = 3

# bidirectional = True  # 双向LSTM

# lstm = MyLSTM(input_size, hidden_size, layer_size, bidirectional)

# # 模拟输入：假设序列长度为5，batch size为3，输入维度为10
# # seq_len = 5
# batch_size = 3
# inputs = torch.randn(seq_len, batch_size, input_size)

# # 初始化隐藏状态和记忆单元，双向LSTM每层有两个方向
# h_0 = [torch.zeros(batch_size, hidden_size) for _ in range(layer_size * (2 if bidirectional else 1))]
# c_0 = [torch.zeros(batch_size, hidden_size) for _ in range(layer_size * (2 if bidirectional else 1))]

# # 前向传播
# output, (h_n, c_n) = lstm(inputs, (h_0, c_0))

# print("输出：", output)


In [2]:
# import torch
# import torch.nn as nn

class MyLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, layer_size=1, bidirectional=False):
        super(MyLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.layer_size = layer_size
        self.bidirectional = bidirectional
        self.num_directions = 2 if bidirectional else 1

        # 初始化每一层的LSTM
        self.lstm_layers = nn.ModuleList()

        for layer in range(layer_size):
            if layer == 0:
                in_size = input_size
            else:
                in_size = hidden_size * self.num_directions
            
            # 添加一层LSTM
            self.lstm_layers.append(self._create_lstm_layer(in_size, layer))

    def _create_lstm_layer(self, input_size, layer_idx):
        # 一个LSTM层中包含正向和（可能有）反向
        layer = nn.ModuleDict()
        # 给每一层的forward和backward加上唯一的层索引
        layer[f'forward_{layer_idx}'] = self._create_lstm_unit(input_size)
        if self.bidirectional:
            layer[f'backward_{layer_idx}'] = self._create_lstm_unit(input_size)
        return layer

    def _create_lstm_unit(self, input_size):
        # LSTM 单元的各个门
        unit = nn.ModuleDict({
            'W_f': nn.Linear(input_size + self.hidden_size, self.hidden_size),
            'W_i': nn.Linear(input_size + self.hidden_size, self.hidden_size),
            'W_o': nn.Linear(input_size + self.hidden_size, self.hidden_size),
            'W_c': nn.Linear(input_size + self.hidden_size, self.hidden_size)
        })
        return unit

    def forward(self, x, h0=None, C0=None):
        batch_size = x.size(0)
        seq_len = x.size(1)
        
        # 初始化隐藏状态和细胞状态
        if h0 is None:
            h0 = torch.zeros(self.layer_size * self.num_directions, batch_size, self.hidden_size).to(x.device)
        if C0 is None:
            C0 = torch.zeros(self.layer_size * self.num_directions, batch_size, self.hidden_size).to(x.device)

        h_n, C_n = [], []
        
        for layer_idx, lstm_layer in enumerate(self.lstm_layers):
            h_layer = []
            C_layer = []
            output_fwd, output_bwd = [], []

            # 正向传播
            h_fwd = h0[layer_idx * self.num_directions]
            C_fwd = C0[layer_idx * self.num_directions]
            for t in range(seq_len):
                h_fwd, C_fwd = self._lstm_step(lstm_layer[f'forward_{layer_idx}'], x[:, t, :], h_fwd, C_fwd)
                output_fwd.append(h_fwd.unsqueeze(1))

            output_fwd = torch.cat(output_fwd, dim=1)

            if self.bidirectional:
                # 反向传播
                h_bwd = h0[layer_idx * self.num_directions + 1]
                C_bwd = C0[layer_idx * self.num_directions + 1]
                for t in reversed(range(seq_len)):
                    h_bwd, C_bwd = self._lstm_step(lstm_layer[f'backward_{layer_idx}'], x[:, t, :], h_bwd, C_bwd)
                    output_bwd.append(h_bwd.unsqueeze(1))
                
                output_bwd = torch.cat(output_bwd, dim=1)
                output = torch.cat([output_fwd, output_bwd], dim=2)  # 双向拼接
            else:
                output = output_fwd

            # 更新输入 x 为输出
            x = output
            h_layer.append(h_fwd)
            C_layer.append(C_fwd)

            if self.bidirectional:
                h_layer.append(h_bwd)
                C_layer.append(C_bwd)

            h_n.append(torch.stack(h_layer))
            C_n.append(torch.stack(C_layer))

        h_n = torch.stack(h_n, dim=0)  # 堆叠隐藏状态
        C_n = torch.stack(C_n, dim=0)  # 堆叠细胞状态
        
        return output, (h_n, C_n)

    def _lstm_step(self, unit, x_t, h_prev, C_prev):
        concat = torch.cat((h_prev, x_t), dim=1)

        f_t = torch.sigmoid(unit['W_f'](concat))
        i_t = torch.sigmoid(unit['W_i'](concat))
        C_tilde = torch.tanh(unit['W_c'](concat))
        o_t = torch.sigmoid(unit['W_o'](concat))

        C_t = f_t * C_prev + i_t * C_tilde
        h_t = o_t * torch.tanh(C_t)

        return h_t, C_t


In [3]:
# 测试MyLSTM
input_size = 5
hidden_size = 6
layer_size = 2
seq_len = 4
batch_size = 3
bidirectional = False

# 假设输入为 (batch_size, seq_len, input_size)
x = torch.randn(batch_size, seq_len, input_size)  # batch_size=4, seq_len=6, input_size=5

# 实例化自定义 LSTM
my_lstm = MyLSTM(input_size, hidden_size, layer_size, bidirectional)

# 执行前向传播
output, (h_n, C_n) = my_lstm(x)
print("output shape:", output.shape)  # (batch_size, seq_len, hidden_size * num_directions)
print("h_n shape:", h_n.shape)  # (layer_size * num_directions, batch_size, hidden_size)
print("C_n shape:", C_n.shape)  # (layer_size * num_directions, batch_size, hidden_size)

output shape: torch.Size([3, 4, 6])
h_n shape: torch.Size([2, 1, 3, 6])
C_n shape: torch.Size([2, 1, 3, 6])


In [None]:
input_size = 5
hidden_size = 6
num_layer = 2
seq_len = 1
batch_size = 3

In [2]:
# 目前报错，暂时没有找到原因
class My_LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layer, bidirectional=False):
        super(My_LSTM, self).__init__()
        # 定义参数
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layer = num_layer
        self.bidirectional = bidirectional
        self.num_directions = 2 if self.bidirectional else 1

        # 构建子模块
        self.lstm_layer = nn.ModuleList()
        for layer in range(self.num_layer):
            # 修正接口，对于非第一层，输入的接口大小与前一层有关，且大小为input_size*self.num_directions
            if layer == 0:
                in_size = input_size
            else:
                in_size = input_size*self.num_directions
            # 添加lstm_layer层
            self.lstm_layer.append(self._create_lstm_layer(in_size, layer))

    # 创建lstm层--根据传入数据的特征维度和创建对应层的序号
    def _create_lstm_layer(self, input_size, layer_idx):
        # 创建双向lstm层--单双向为了方便调用，采用键值对方式构建
        layer = nn.ModuleDict()
        layer[f"forward_{layer_idx}"] = self._create_lstm_unit(input_size)
        if self.bidirectional:
            # 双向
            layer[f"backward_{layer_idx}"] = self._create_lstm_unit(input_size)
        return layer

    def _create_lstm_unit(self, input_size):
        # 创建lstm层的参数
        unit = nn.ModuleDict({
            "w_f": nn.Linear(input_size + self.hidden_size, self.hidden_size),
            "w_i": nn.Linear(input_size + self.hidden_size, self.hidden_size),
            "w_c": nn.Linear(input_size + self.hidden_size, self.hidden_size),
            "w_o": nn.Linear(input_size + self.hidden_size, self.hidden_size)
        })
        return unit

    # 前向传播
    def forward(self, inputs, h_0=None, c_0=None):
        batch_size = inputs.size(0)
        seq_len = inputs.size(1)

        if h_0 is None:
            h_0 = torch.zeros(self.num_layer*self.num_directions, batch_size, self.hidden_size)
        if c_0 is None:
            c_0 = torch.zeros(self.num_layer*self.num_directions, batch_size, self.hidden_size)

        # 记录隐藏状态和细胞状态
        h_n, c_n = [], []
        # 先对层进行循环，在每一层中对时间片循环
        # 对层循环的过程中，按照键值对方式取层模块
        for layer_idx, lstm_layer in enumerate(self.lstm_layer):
            h_layer = []
            c_layer = []
            output_fwd, output_bwd = [], []

            # 正向传播
            h_fwd = h_0[layer_idx*self.num_directions]
            c_fwd = c_0[layer_idx*self.num_directions]
            for t in range(seq_len):
                h_fwd, c_fwd = self._lstm_step(lstm_layer[f"forward_{layer_idx}"], inputs[:, t, :], h_fwd, c_fwd)
                output_fwd.append(h_fwd.unsqueeze(1))
            output_fwd = torch.cat(output_fwd, dim=1)

            if self.bidirectional:
                # 如果是双向的，那么准备计算反向过程
                h_bwd = h_0[layer_idx * self.num_directions + 1]
                c_bwd = c_0[layer_idx * self.num_directions + 1]
                for t in reversed(range(seq_len)):
                    h_bwd, c_bwd = self._lstm_step(lstm_layer[f"backward_{layer_idx}"], inputs[:, t, :], h_bwd, c_bwd)
                    output_bwd.append(h_bwd.unsqueeze(1))
                output_bwd = torch.cat(output_bwd, dim=1)
                # 对output进行双向拼接
                output = torch.cat([output_fwd, output_bwd], dim=2)
            else:
                output = output_fwd

            # 进入下一层，对inputs进行更新
            inputs = output
            h_layer.append(h_fwd)
            c_layer.append(c_fwd)
            if self.bidirectional:
                h_layer.append(h_bwd)
                c_layer.append(c_bwd)
            h_n.append(torch.stack(h_layer))
            c_n.append(torch.stack(c_layer))
        h_n = torch.stack(h_n, dim=0)
        c_n = torch.stack(c_n, dim=0)
        return output, (h_n, c_n)


    def _lstm_step(self, unit, inputs, h_pre, c_pre):
        concat = torch.cat((h_pre, inputs), dim=1)
        f_t = torch.sigmoid(unit["w_f"](concat))
        i_t = torch.sigmoid(unit["w_i"](concat))
        c_t_title = torch.tanh(unit["w_c"](concat))
        o_t = torch.sigmoid(unit["w_o"](concat))
        
        c_t = f_t * c_pre + i_t * c_t_title
        h_t = o_t * torch.tanh(c_t)

        return h_t, c_t
        
    

In [4]:
# 测试My_LSTM
input_size = 5
hidden_size = 6
layer_size = 2
seq_len = 4
batch_size = 3
bidirectional = False

# 假设输入为 (batch_size, seq_len, input_size)
x = torch.randn(batch_size, seq_len, input_size)  # batch_size=4, seq_len=6, input_size=5

# 实例化自定义 LSTM
my_lstm = My_LSTM(input_size, hidden_size, layer_size, bidirectional)

# 执行前向传播
output, (h_n, C_n) = my_lstm(x)
print("output shape:", output.shape)  # (batch_size, seq_len, hidden_size * num_directions)
print("h_n shape:", h_n.shape)  # (layer_size * num_directions, batch_size, hidden_size)
print("C_n shape:", C_n.shape)  # (layer_size * num_directions, batch_size, hidden_size)