<a href="https://colab.research.google.com/github/byrcewang/DL_SS2H/blob/main/Transformer_PyTorch_big.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [27]:
import torch
import torch.nn as nn

# Step 1: Define the Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch of sequences.

    A table of shape ``(1, max_seq_len, d_model)`` is precomputed once: even
    feature indices hold ``sin(pos * w_i)`` and odd feature indices hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension of the inputs.
        max_seq_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_len=512):
        super(PositionalEncoding, self).__init__()
        position = torch.arange(0, max_seq_len).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model))
        angles = position * div_term  # (max_seq_len, ceil(d_model / 2))

        encoding = torch.zeros(1, max_seq_len, d_model)
        encoding[0, :, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine columns.
        encoding[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer follows the module across .to()/.cuda()/.half();
        # non-persistent so that the state_dict stays unchanged (the table is
        # deterministic and was never saved before).
        self.register_buffer("positional_encoding", encoding, persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``. ``seq_len`` must
                not exceed ``max_seq_len``; a longer input fails with a
                shape-mismatch error on the addition.
        """
        # Slice the table so inputs shorter than max_seq_len are supported.
        return x + self.positional_encoding[:, : x.size(1)]

# Step 2: Define the Multi-Head Attention Layer
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across ``num_heads`` heads.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)
        self.fc = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, query_len, d_model)``.
            key: Tensor of shape ``(batch, key_len, d_model)``.
            value: Tensor of shape ``(batch, key_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, num_heads, query_len, key_len)``; positions where it
                equals 0 are excluded from attention.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        batch_size = query.shape[0]
        # Project, then split the feature dim into heads: (batch, heads, len, head_dim).
        Q = self.wq(query).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        K = self.wk(key).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        V = self.wv(value).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)

        # head_dim is a plain Python int, which torch.sqrt does not accept,
        # so the scale is computed with ordinary float arithmetic.
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / (self.head_dim ** 0.5)

        if mask is not None:
            # Use the most negative value of the score dtype so the fill also
            # works for half-precision scores, where -1e20 is not representable.
            energy = energy.masked_fill(mask == 0, torch.finfo(energy.dtype).min)

        attention = torch.nn.functional.softmax(energy, dim=-1)
        x = torch.matmul(attention, V)
        # Merge the heads back: (batch, query_len, d_model).
        x = x.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.num_heads * self.head_dim)
        x = self.fc(x)
        return x

# Step 3: Define the Position-wise Feedforward Layer
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # Expand d_model -> d_ff, then project d_ff -> d_model.
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return ``fc2(relu(fc1(x)))``."""
        hidden = self.fc1(x).relu()
        return self.fc2(hidden)

# Step 4: Define the Encoder Layer
class TransformerEncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sublayers.

    Each sublayer output goes through dropout, is added to the sublayer input
    (residual connection), and the sum is layer-normalized.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sublayer.
        dropout: Dropout probability applied to each sublayer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        self.multihead_attention = MultiHeadAttention(d_model, num_heads)
        self.positionwise_ff = PositionWiseFeedForward(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Encode ``x``.

        Args:
            x: Input tensor whose last dimension is ``d_model``.
            mask: Optional attention mask, passed unchanged to the
                self-attention sublayer. Defaults to ``None`` (no masking),
                matching ``MultiHeadAttention.forward``.
        """
        # Self-attention sublayer: query, key and value are all x.
        attention_output = self.multihead_attention(x, x, x, mask)
        x = x + self.dropout(attention_output)
        x = self.layer_norm1(x)
        # Feed-forward sublayer.
        feed_forward_output = self.positionwise_ff(x)
        x = x + self.dropout(feed_forward_output)
        x = self.layer_norm2(x)
        return x

# Step 5: Define the Transformer Encoder
class TransformerEncoder(nn.Module):
    """Positional encoding followed by a stack of encoder layers.

    Args:
        num_layers: Number of ``TransformerEncoderLayer`` blocks.
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden size of each feed-forward sublayer.
        max_seq_len: Number of positions the positional encoding covers.
        dropout: Dropout probability used inside each layer.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, max_seq_len, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        self.layers = nn.ModuleList([TransformerEncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)

    def forward(self, x, mask=None):
        """Add positional encodings to ``x`` and run it through every layer.

        Args:
            x: Input tensor whose last dimension is ``d_model``.
            mask: Optional attention mask handed to every layer.
        """
        # positional_encoding is a module, not a tensor: it has to be called,
        # since adding it to x directly raises a TypeError.
        x = self.positional_encoding(x)
        for layer in self.layers:
            x = layer(x, mask)
        return x

# Step 6: Create the Big Transformer Model
class BigTransformerModel(nn.Module):
    """Transformer encoder with a mean-pooled linear classification head.

    Args:
        num_layers: Number of encoder layers.
        d_model: Feature size of the inputs.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden size of each feed-forward sublayer.
        max_seq_len: Number of positions the positional encoding covers.
        num_classes: Size of the output (logit) dimension.
        dropout: Dropout probability used inside the encoder layers.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, max_seq_len, num_classes, dropout=0.1):
        super(BigTransformerModel, self).__init__()
        self.encoder = TransformerEncoder(num_layers, d_model, num_heads, d_ff, max_seq_len, dropout)
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x, mask=None):
        """Return class logits for ``x``.

        Args:
            x: Input tensor whose last dimension is ``d_model``; dimension 1
                is averaged away before classification.
            mask: Optional attention mask forwarded to the encoder. Defaults
                to ``None`` so the model can be called without a mask.
        """
        x = self.encoder(x, mask)
        # Global average pooling over dimension 1. NOTE(review): the mean
        # covers every position, including any the mask marks as padding.
        x = x.mean(dim=1)
        x = self.fc(x)
        return x

# Step 7: Instantiate the Model
# Encoder size.
num_layers, d_model, num_heads, d_ff = 6, 1024, 16, 4096
# Longest sequence covered by the positional encoding.
max_seq_len = 512
# Change num_classes to your desired number of output classes.
num_classes = 10
dropout = 0.1

model = BigTransformerModel(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    d_ff=d_ff,
    max_seq_len=max_seq_len,
    num_classes=num_classes,
    dropout=dropout,
)

# Now you can use this model for your specific task, such as text classification or machine translation, by feeding data through it.

In [25]:
import torch
import torch.nn as nn

# Step 1: Define the Positional Encoding
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal positional encodings to a batch of sequences.

    A table of shape ``(1, max_seq_len, d_model)`` is precomputed once: even
    feature indices hold ``sin(pos * w_i)`` and odd feature indices hold
    ``cos(pos * w_i)`` with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        d_model: Size of the feature (last) dimension of the inputs.
        max_seq_len: Number of positions to precompute.
    """

    def __init__(self, d_model, max_seq_len=512):
        super(PositionalEncoding, self).__init__()

        # Position indices 0 .. max_seq_len - 1 as a column vector.
        position = torch.arange(0, max_seq_len).unsqueeze(1)

        # Per-frequency scaling term of the encoding.
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-torch.log(torch.tensor(10000.0)) / d_model))
        angles = position * div_term  # (max_seq_len, ceil(d_model / 2))

        # Encoding table, initialised to zeros.
        encoding = torch.zeros(1, max_seq_len, d_model)

        # Fill even columns with sines and odd columns with cosines. With an
        # odd d_model there is one cosine column fewer than sine columns.
        encoding[0, :, 0::2] = torch.sin(angles)
        encoding[0, :, 1::2] = torch.cos(angles[:, : d_model // 2])

        # A buffer follows the module across .to()/.cuda()/.half();
        # non-persistent so that the state_dict stays unchanged (the table is
        # deterministic and was never saved before).
        self.register_buffer("positional_encoding", encoding, persistent=False)

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        Args:
            x: Tensor of shape ``(batch, seq_len, d_model)``. ``seq_len`` must
                not exceed ``max_seq_len``; a longer input fails with a
                shape-mismatch error on the addition.
        """
        # Slice the table so inputs shorter than max_seq_len are supported.
        return x + self.positional_encoding[:, : x.size(1)]


# Step 2: Define the Multi-Head Attention Layer
class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across ``num_heads`` heads.

    Args:
        d_model: Feature size of the inputs and of the output.
        num_heads: Number of attention heads; must divide ``d_model``.
    """

    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()

        # The model dimension must be divisible by the number of heads.
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        # Number of heads and the feature size handled by each head.
        self.num_heads = num_heads
        self.head_dim = d_model // num_heads

        # Linear projections for queries (Q), keys (K) and values (V).
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)

        # Linear projection applied to the merged heads.
        self.fc = nn.Linear(d_model, d_model)

    def forward(self, query, key, value, mask=None):
        """Attend from ``query`` positions to ``key``/``value`` positions.

        Args:
            query: Tensor of shape ``(batch, query_len, d_model)``.
            key: Tensor of shape ``(batch, key_len, d_model)``.
            value: Tensor of shape ``(batch, key_len, d_model)``.
            mask: Optional tensor broadcastable to
                ``(batch, num_heads, query_len, key_len)``; positions where it
                equals 0 are excluded from attention.

        Returns:
            Tensor of shape ``(batch, query_len, d_model)``.
        """
        batch_size = query.shape[0]

        # Project, then split the feature dim into heads: (batch, heads, len, head_dim).
        Q = self.wq(query).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        K = self.wk(key).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)
        V = self.wv(value).view(batch_size, -1, self.num_heads, self.head_dim).permute(0, 2, 1, 3)

        # Attention scores. head_dim is a plain Python int, which torch.sqrt
        # does not accept, so the scale uses ordinary float arithmetic.
        energy = torch.matmul(Q, K.permute(0, 1, 3, 2)) / (self.head_dim ** 0.5)

        # Masked positions get the most negative value of the score dtype, so
        # the fill also works for half-precision scores, where -1e20 is not
        # representable.
        if mask is not None:
            energy = energy.masked_fill(mask == 0, torch.finfo(energy.dtype).min)

        # Normalise the scores into attention weights.
        attention = torch.nn.functional.softmax(energy, dim=-1)

        # Weighted sum of the values.
        x = torch.matmul(attention, V)

        # Merge the heads back: (batch, query_len, d_model).
        x = x.permute(0, 2, 1, 3).contiguous().view(batch_size, -1, self.num_heads * self.head_dim)

        # Final linear projection.
        x = self.fc(x)

        return x

# Step 3: Define the Position-wise Feedforward Layer
class PositionWiseFeedForward(nn.Module):
    """Two-layer feed-forward network applied to the last dimension.

    Args:
        d_model: Input and output feature size.
        d_ff: Hidden feature size between the two linear layers.
    """

    def __init__(self, d_model, d_ff):
        super().__init__()
        # fc1 expands d_model -> d_ff; fc2 projects d_ff back to d_model.
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        """Return ``fc2(relu(fc1(x)))``."""
        # ReLU between the two linear transformations.
        hidden = self.fc1(x).relu()
        return self.fc2(hidden)

# Step 4: Define the Encoder Layer
class TransformerEncoderLayer(nn.Module):
    """One encoder block: self-attention and feed-forward sublayers.

    Each sublayer output goes through dropout, is added to the sublayer input
    (residual connection), and the sum is layer-normalized.

    Args:
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads.
        d_ff: Hidden size of the feed-forward sublayer.
        dropout: Dropout probability applied to each sublayer output.
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        # Multi-head self-attention over d_model features with num_heads heads.
        self.multihead_attention = MultiHeadAttention(d_model, num_heads)
        # Position-wise feed-forward network with hidden size d_ff.
        self.positionwise_ff = PositionWiseFeedForward(d_model, d_ff)
        # Layer normalization after the attention sublayer.
        self.layer_norm1 = nn.LayerNorm(d_model)
        # Layer normalization after the feed-forward sublayer.
        self.layer_norm2 = nn.LayerNorm(d_model)
        # Dropout applied to each sublayer output before the residual add.
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Encode ``x``.

        Args:
            x: Input tensor whose last dimension is ``d_model``.
            mask: Optional attention mask, passed unchanged to the
                self-attention sublayer. Defaults to ``None`` (no masking),
                matching ``MultiHeadAttention.forward``.
        """
        # Self-attention: query, key and value are all x.
        attention_output = self.multihead_attention(x, x, x, mask)
        # Residual connection around the attention sublayer.
        x = x + self.dropout(attention_output)
        # First layer normalization.
        x = self.layer_norm1(x)
        # Position-wise feed-forward sublayer.
        feed_forward_output = self.positionwise_ff(x)
        # Residual connection around the feed-forward sublayer.
        x = x + self.dropout(feed_forward_output)
        # Second layer normalization.
        x = self.layer_norm2(x)
        return x


# Step 5: Define the Transformer Encoder
class TransformerEncoder(nn.Module):
    """Positional encoding followed by a stack of encoder layers.

    Args:
        num_layers: Number of ``TransformerEncoderLayer`` blocks.
        d_model: Feature size of the inputs and outputs.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden size of each feed-forward sublayer.
        max_seq_len: Number of positions the positional encoding covers.
        dropout: Dropout probability used inside each layer.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, max_seq_len, dropout=0.1):
        super(TransformerEncoder, self).__init__()
        # Stack of num_layers encoder layers.
        self.layers = nn.ModuleList([TransformerEncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        # Positional encoder that injects position information into the input.
        self.positional_encoding = PositionalEncoding(d_model, max_seq_len)

    def forward(self, x, mask=None):
        """Add positional encodings to ``x`` and run it through every layer.

        Args:
            x: Input tensor whose last dimension is ``d_model``.
            mask: Optional attention mask handed to every layer.
        """
        # positional_encoding is a module, not a tensor: it has to be called,
        # since adding it to x directly raises a TypeError.
        x = self.positional_encoding(x)
        # Encode the input with each layer in turn.
        for layer in self.layers:
            x = layer(x, mask)
        return x

# Step 6: Create the Big Transformer Model
class BigTransformerModel(nn.Module):
    """Transformer encoder with a mean-pooled linear classification head.

    Args:
        num_layers: Number of encoder layers.
        d_model: Feature size of the inputs.
        num_heads: Number of attention heads per layer.
        d_ff: Hidden size of each feed-forward sublayer.
        max_seq_len: Number of positions the positional encoding covers.
        num_classes: Size of the output (logit) dimension.
        dropout: Dropout probability used inside the encoder layers.
    """

    def __init__(self, num_layers, d_model, num_heads, d_ff, max_seq_len, num_classes, dropout=0.1):
        super(BigTransformerModel, self).__init__()
        # Transformer encoder that processes the input sequence.
        self.encoder = TransformerEncoder(num_layers, d_model, num_heads, d_ff, max_seq_len, dropout)
        # Linear layer mapping the pooled encoder output to class logits.
        self.fc = nn.Linear(d_model, num_classes)

    def forward(self, x, mask=None):
        """Return class logits for ``x``.

        Args:
            x: Input tensor whose last dimension is ``d_model``; dimension 1
                is averaged away before classification.
            mask: Optional attention mask forwarded to the encoder.
        """
        # The encoder is responsible for the positional encoding. Adding
        # self.encoder.positional_encoding here was an error: it is a module
        # (tensor + module raises TypeError) and would encode positions twice.
        x = self.encoder(x, mask)
        # Global average pooling over dimension 1 gives the final representation.
        x = x.mean(dim=1)
        # Final classification layer.
        x = self.fc(x)
        return x

# Step 7: Instantiate the Model
# Encoder layers, model dimension, attention heads, feed-forward hidden size.
num_layers, d_model, num_heads, d_ff = 6, 1024, 16, 4096
# Maximum sequence length.
max_seq_len = 512
# Change num_classes to your desired number of output classes.
num_classes = 10
# Dropout probability.
dropout = 0.1

model = BigTransformerModel(
    num_layers=num_layers,
    d_model=d_model,
    num_heads=num_heads,
    d_ff=d_ff,
    max_seq_len=max_seq_len,
    num_classes=num_classes,
    dropout=dropout,
)
# You can now feed data through the model for a specific task, such as text classification or machine translation.