In [2]:
import torch
import torch.nn as nn
import math

In [4]:
#定义Multi_head_self_attention、前向神经网络、Transformer Encoder层，最后封装成完整的Transformer

class MHselfattention(nn.Module):
    def __init__(self, dk, dv, num_heads, dmodel):
        super(MHselfattention, self).__init__()
        self.wq = nn.Parameter(torch.randn((num_heads, dmodel, dk)), requires_grad=True)
        self.wk = nn.Parameter(torch.randn((num_heads, dmodel, dk)), requires_grad=True)
        self.wv = nn.Parameter(torch.randn((num_heads, dmodel, dv)), requires_grad=True)
        self.output_linear = nn.Linear(num_heads * dv, dmodel)
        self.scale = math.sqrt(dk)
        self.num_heads = num_heads
        
    def forward(self, x):
        batch_size, seq_len, dmodel = x.size()
        q_splits = torch.stack([torch.matmul(x, self.wq[i]) for i in range(self.num_heads)], dim=0)
        k_splits = torch.stack([torch.matmul(x, self.wk[i]) for i in range(self.num_heads)], dim=0)
        v_splits = torch.stack([torch.matmul(x, self.wv[i]) for i in range(self.num_heads)], dim=0)
        outputs = []
        for i in range(self.num_heads):
            q = q_splits[i]
            k = k_splits[i]
            v = v_splits[i]
            k_trans = k.transpose(-2, -1)
            scores = torch.matmul(q, k_trans) / self.scale
            weights = torch.softmax(scores, dim=1)
            output = torch.matmul(weights, v)
            outputs.append(output)
        concat_outputs = torch.cat(outputs, dim=-1)
        final_output = self.output_linear(concat_outputs)
        return final_output

# multi_head_self_attn = MHselfattention(dk, dv, mum_heads, dmodel)
# x.shape = (batch_size, N, dmodel=512)
# output_multi_head_self_attn = multi_head_self_attn(x)
# output_multi_head_self_attn.shape = (batch_size, N, dmodel=512)

class FeedForward(nn.Module):
    def __init__(self, dmodel, dff, dropout_rate=0.1):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(dmodel, dff)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.linear2 = nn.Linear(dff, dmodel)
    
    def forward(self, x): #输入x的shape=(batch_size, N, dmodel=512)
        x = self.linear1(x) # shape=(batch_size, N, dff)
        x = self.relu(x) # 激活函数不变
        x = self.dropout(x) # dropout不变
        x = self.linear2(x) # shape=(batch_size, N, dmodel)
        return x # 也就是最后输出的x的shape=(batch_size, N, dmodel)

# MHselfattention的output作为FeedForward的input
# feed_forwawrd = FeedForward(dmodel=512, dff, dropout_rate=0.1)
# x.shape = (batch_size, N, dmodel=512)
# output_feedforward = feed_forward(x)
# 也就是最后输出的x的shape=(batch_size, N, dmodel=512)

class TransformerEncoderLayer(nn.Module):
    def __init__(self, dk, dv, num_heads, dmodel, dff, dropout_rate=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.mha = MHselfattention(dk, dv, num_heads, dmodel)
        self.ffn = FeedForward(dmodel, dff, dropout_rate)
        self.layernorm1 = nn.LayerNorm(dmodel)
        self.layernorm2 = nn.LayerNorm(dmodel)
        self.dropout = nn.Dropout(dropout_rate)
        
    def forward(self, x):
        attn_output = self.mha(x) # 先做一次 Multi-Head Attention
        x = self.layernorm1(x + attn_output) # 然后Add & Norm标准化
        ffn_output = self.ffn(x)# 前向传播一次，即经过一次定义的FeedForward
        x = self.layernorm2(x + ffn_output) # 第二次Add & Norm标准化
        return x # 最后的输出shape还是(batch_size, N, dmodel=512)

class TransformerEncoder(nn.Module):
    def __init__(self, num_layers, dk, dv, num_heads, dmodel, dff, dropout_rate=0.1):
        super(TransformerEncoder, self).__init__()
        self.encoder_layers = nn.ModuleList([TransformerEncoderLayer(dk, dv, num_heads, dmodel, dff, dropout_rate) for _ in range(num_layers)])
    
    def forward(self, x):
        for layer in self.encoder_layers:
            x = layer(x)
        return x


In [6]:
# 调用示例

# 定义超参
batch_size = 5
seq_len = 10
dmodel = 16
dk = 8
dv = 8
num_heads = 4
dff = 64
num_layers = 2
dropout_rate = 0.1

# 创建输入数据
x = torch.randn(batch_size, seq_len, dmodel)

# 初始化 Transformer Encoder
encoder = TransformerEncoder(num_layers=num_layers, dk=dk, dv=dv, num_heads=num_heads, dmodel=dmodel, dff=dff, dropout_rate=dropout_rate)

# 前向传播
output = encoder(x)
print(f'Output shape: {output.shape}')

Output shape: torch.Size([5, 10, 16])


In [14]:
def train(x, y_true, num_epochs, model, optimizer, loss_function, device):
    """
    训练 Transformer Encoder 模型
    
    参数：
    - x (Tensor): 输入张量，形状为 (batch_size, N, dmodel)
    - y_true (Tensor): 目标张量，形状为 (batch_size, N, dmodel)
    - num_epochs (int): 训练的轮数
    - model (nn.Module): Transformer Encoder 模型
    - optimizer (torch.optim.Optimizer): 优化器
    - loss_function (nn.Module): 损失函数
    
    返回：
    - final_loss (Tensor): 最终损失值
    - time (int): 总的训练步骤数
    """
    model.to(device) # 将模型和训练数据部署到其他设备上
    x = x.to(device) # 同上
    y_true = y_true.to(device) # 同上
    
    time = 0
    for epoch in range(num_epochs):
        model.train()  # 设定模型为训练模式
        optimizer.zero_grad()  # 清除之前的梯度
        output = model(x) # 前向传播
        # 确保输出的维度与 y_true 匹配
        if output.shape != y_true.shape:
            raise ValueError(f"Output shape {output.shape} does not match target shape {y_true.shape}")

        loss = loss_function(output, y_true) # 计算损失
        loss.backward() # 反向传播
        optimizer.step() # 更新参数
        time += 1
        
        # 打印损失
        if (epoch + 1) % 10 == 0:  # 每 10 个 epoch 打印一次损失
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
    
    return loss, time

In [16]:
# 定义超参、损失函数、优化器，调用训练函数进行训练

# 定义模型参数
batch_size = 20
N = 10
dmodel = 512
dk = 512
dv = 512
num_heads = 8
dff = 64
num_layers = 6
dropout_rate = 0.1
num_epochs = 100

#定义训练设备，我用的是macbook，没有cuda就只能用mps加速
device = torch.device('cuda' if torch.cuda.is_available() else 'mps')

# 创建输入数据和目标数据
x = torch.randn(batch_size, N, dmodel)  # 假设的输入数据
y_true = torch.randn(batch_size, N, dmodel)  # 假设的目标数据

# 初始化 Transformer Encoder 模型
model = TransformerEncoder(num_layers=num_layers, dk=dk, dv=dv, num_heads=num_heads, dmodel=dmodel, dff=dff, dropout_rate=dropout_rate)

# 定义优化器和损失函数
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_function = nn.MSELoss()

# 运行训练
final_loss, final_time = train(x, y_true, num_epochs, model=model, optimizer=optimizer, loss_function=loss_function, device=device)

# 打印最终结果
print(f'Final Loss: {final_loss.item():.4f}')
print(f'Total Time (steps): {final_time}')

Epoch [10/100], Loss: 1.6358
Epoch [20/100], Loss: 1.3809
Epoch [30/100], Loss: 1.2020
Epoch [40/100], Loss: 1.0895
Epoch [50/100], Loss: 1.0303
Epoch [60/100], Loss: 1.0071
Epoch [70/100], Loss: 1.0012
Epoch [80/100], Loss: 1.0016
Epoch [90/100], Loss: 1.0010
Epoch [100/100], Loss: 1.0010
Final Loss: 1.0010
Total Time (steps): 100
