# Transformer 代码实践


In [50]:
import torch
from torch import nn
import torch.nn.functional as F
import math

In [51]:
# 设计词嵌入类
class TokenEmbedding(nn.Embedding):
    def __init__(self, vocab_size, dim):
        super(TokenEmbedding, self).__init__(
            vocab_size,
            dim,
            padding_idx=1,
        )

In [52]:
# 设计位置嵌入
class PositionalEmbedding(nn.Module):
    def __init__(self, dim, max_len, device):
        super(PositionalEmbedding, self).__init__()
        self.encoding = torch.zeros(max_len, dim, device=device)
        # 禁用向后传播
        self.encoding.requires_grad = False
        pos = torch.arange(0, max.len, device=device)
        pos = pos.float().unsqueeze(dim=1)
        _2i = torch.arange(0, dim, step=2, device=device).float()
        self.encoding[:, 0::2] = torch.sin(pos / (10000 ** (_2i / dim)))
        self.encoding[:, 1::2] = torch.cos(pos / (10000 ** (_2i / dim)))

    def forward(self, x):
        batch_size, seq_len = x.size()
        return self.encoding[:seq_len, :]

In [53]:
# 设计transformer编码
class TransformerEmbedding(nn.Module):
    def __init__(self, vocab_size, dim, max_len, drop_prod, device):
        super(TransformerEmbedding, self).__init__()
        self.tok_emb = TokenEmbedding(vocab_size, dim)
        self.pos_emb = PositionalEmbedding(dim, max_len, device)
        self.drop_out = nn.Dropout(p=drop_prod)

    def forward(self, x: torch.Tensor):
        tok_emb = self.tok_emb(x)
        pos_emb = self.pos_emb(x)
        return self.drop_out(tok_emb + pos_emb)

In [54]:
# 多头注意力机制
class MultiHeadAttention(nn.Module):
    def __init__(self, dim, n_head):
        super(MultiHeadAttention, self).__init__()
        self.dim = dim
        self.n_head = n_head
        self.w_q = nn.Linear(dim, dim)
        self.w_k = nn.Linear(dim, dim)
        self.w_v = nn.Linear(dim, dim)
        self.w_combine = nn.Linear(dim, dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, mask=None):
        batch, time, dimension = q.shape
        n_d = dimension // self.n_head
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)
        q = q.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)
        k = k.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)
        v = v.view(batch, time, self.n_head, n_d).permute(0, 2, 1, 3)
        score = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(n_d)

        if mask is not None:
            score = score.masked_fill(mask == 0, -10000)

        score = self.softmax(score)
        score = torch.matmul(score, v)
        score = score.permute(0, 2, 1, 3).contiguous().view(batch, time, dimension)

        return self.w_combine(score)

In [55]:
# layernorm
class LayerNorm(nn.Module):
    def __init__(self, dim, eps=1e-12):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(dim))
        self.beta = nn.Parameter(torch.zeros(dim))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(-1, keepdim=True)
        var = x.var(-1, unbiased=False, keepdim=True)
        out = (x - mean) / torch.sqrt(var + self.eps)
        out = self.gamma * out + self.beta
        return out

In [56]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, dim, hidden, dropout=0.1):
        super(PositionwiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(dim, hidden)
        self.fc2 = nn.Linear(hidden, dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [57]:
class EncoderLayer(nn.Module):
    def __init__(self, dim, ffn_hidden, n_head, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(dim, n_head)
        self.norm1 = LayerNorm(dim)
        self.dropout1 = nn.Dropout(dropout)
        self.ffn = PositionwiseFeedForward(dim, ffn_hidden, dropout)
        self.norm2 = LayerNorm(dim)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        res_x = x
        x = self.attention(x, x, x, mask)
        x = self.dropout1(x)
        x = self.norm1(res_x + x)
        res_x = x
        x = self.ffn(x)
        x = self.dropout2(x)
        x = self.norm2(res_x + x)
        return x

In [58]:
class Encoder(nn.Module):
    def __init__(
        self,
        enc_voc_size: int,
        max_len: int,
        d_model: int,
        ffn_hidden: int,
        n_head: int,
        n_layer: int,
        dropout: float = 0.1,
        device: torch.device = torch.device("cpu"),
    ):
        super(Encoder, self).__init__()
        self.embedding = TransformerEmbedding(
            enc_voc_size, d_model, max_len, dropout, device
        )
        self.layers = nn.ModuleList(
            [EncoderLayer(d_model, ffn_hidden, n_head, dropout) for _ in range(n_layer)]
        )

    def forward(self, x: torch.Tensor, s_mask: torch.Tensor) -> torch.Tensor:
        x = self.embedding(x)
        for layer in self.layers:
            x = layer(x, s_mask)
        return x

In [59]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model: int, ffn_hidden: int, n_head: int, drop_prob: float):
        super(DecoderLayer, self).__init__()
        self.attention1 = MultiHeadAttention(d_model, n_head)
        self.norm1 = LayerNorm(d_model)
        self.dropout1 = nn.Dropout(drop_prob)
        self.cross_attention = MultiHeadAttention(d_model, n_head)
        self.norm2 = LayerNorm(d_model)
        self.dropout2 = nn.Dropout(drop_prob)
        self.ffn = PositionwiseFeedForward(d_model, ffn_hidden, drop_prob)
        self.norm3 = LayerNorm(d_model)
        self.dropout3 = nn.Dropout(drop_prob)

    def forward(
        self,
        dec: torch.Tensor,
        enc: torch.Tensor,
        t_mask: torch.Tensor,
        s_mask: torch.Tensor,
    ) -> torch.Tensor:
        _x = dec
        x = self.attention1(dec, dec, dec, t_mask)
        x = self.dropout1(x)
        x = self.norm1(x + _x)
        _x = x
        x = self.cross_attention(x, enc, enc, s_mask)
        x = self.dropout2(x)
        x = self.norm2(x + _x)
        _x = x
        x = self.ffn(x)
        x = self.dropout3(x)
        x = self.norm3(x + _x)
        return x

In [60]:
class Decoder(nn.Module):
    def __init__(
        self,
        dec_voc_size: int,
        max_len: int,
        d_model: int,
        ffn_hidden: int,
        n_head: int,
        n_layer: int,
        drop_prob: float,
        device: torch.device = torch.device("cpu"),
    ):
        super(Decoder, self).__init__()
        self.embedding = TransformerEmbedding(
            dec_voc_size, d_model, max_len, drop_prob, device
        )
        self.layers = nn.ModuleList(
            [
                DecoderLayer(d_model, ffn_hidden, n_head, drop_prob)
                for _ in range(n_layer)
            ]
        )
        self.fc = nn.Linear(d_model, dec_voc_size)

    def forward(
        self,
        dec: torch.Tensor,
        enc: torch.Tensor,
        t_mask: torch.Tensor,
        s_mask: torch.Tensor,
    ) -> torch.Tensor:
        dec = self.embedding(dec)
        for layer in self.layers:
            dec = layer(dec, enc, t_mask, s_mask)
        dec = self.fc(dec)
        return dec

In [61]:
import torch
import torch.nn as nn
import torch.optim as optim

# 假设你已经定义了 Encoder, Decoder, TransformerEmbedding, MultiHeadAttention, LayerNorm, PositionwiseFeedForward 类


class Transformer(nn.Module):
    def __init__(
        self,
        src_pad_idx: int,
        trg_pad_idx: int,
        enc_voc_size: int,
        dec_voc_size: int,
        d_model: int,
        max_len: int,
        n_heads: int,
        ffn_hidden: int,
        n_layers: int,
        drop_prob: float,
        device: torch.device = torch.device("cpu"),
    ):
        super(Transformer, self).__init__()
        self.encoder = Encoder(
            enc_voc_size,
            max_len,
            d_model,
            ffn_hidden,
            n_heads,
            n_layers,
            drop_prob,
            device,
        )
        self.decoder = Decoder(
            dec_voc_size,
            max_len,
            d_model,
            ffn_hidden,
            n_heads,
            n_layers,
            drop_prob,
            device,
        )
        self.src_pad_idx = src_pad_idx
        self.trg_pad_idx = trg_pad_idx
        self.device = device

    def make_pad_mask(
        self, q: torch.Tensor, k: torch.Tensor, pad_idx_q: int, pad_idx_k: int
    ) -> torch.Tensor:
        len_q, len_k = q.size(1), k.size(1)
        q = q.ne(pad_idx_q).unsqueeze(1).unsqueeze(3)
        q = q.repeat(1, 1, 1, len_k)
        k = k.ne(pad_idx_k).unsqueeze(1).unsqueeze(2)
        k = k.repeat(1, 1, len_q, 1)
        mask = q & k
        return mask

    def make_casual_mask(self, q: torch.Tensor, k: torch.Tensor) -> torch.Tensor:
        len_q, len_k = q.size(1), k.size(1)
        mask = (
            torch.triu(torch.ones(len_q, len_k), diagonal=1)
            .type(torch.BoolTensor)
            .to(self.device)
        )
        return mask

    def forward(self, src: torch.Tensor, trg: torch.Tensor) -> torch.Tensor:
        src_mask = self.make_pad_mask(src, src, self.src_pad_idx, self.src_pad_idx)
        trg_mask = self.make_pad_mask(
            trg, trg, self.trg_pad_idx, self.trg_pad_idx
        ) * self.make_casual_mask(trg, trg)
        enc = self.encoder(src, src_mask)
        out = self.decoder(trg, enc, trg_mask, src_mask)
        return out

In [63]:
import torch
import torch.nn as nn
import torch.optim as optim

# 假设你已经定义了 Encoder, Decoder, TransformerEmbedding, MultiHeadAttention, LayerNorm, PositionwiseFeedForward 类

# 超参数
src_pad_idx = 0
trg_pad_idx = 0
enc_voc_size = 10000
dec_voc_size = 10000
d_model = 512
max_len = 100
n_heads = 8
ffn_hidden = 2048
n_layers = 6
drop_prob = 0.1
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 初始化模型
model = Transformer(
    src_pad_idx,
    trg_pad_idx,
    enc_voc_size,
    dec_voc_size,
    d_model,
    max_len,
    n_heads,
    ffn_hidden,
    n_layers,
    drop_prob,
    device,
).to(device)

# 准备输入数据（示例数据）
src = torch.randint(0, enc_voc_size, (32, max_len)).to(
    device
)  # (batch_size, src_seq_len)
trg = torch.randint(0, dec_voc_size, (32, max_len)).to(
    device
)  # (batch_size, trg_seq_len)

# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss(ignore_index=trg_pad_idx)
optimizer = optim.Adam(model.parameters(), lr=0.0001)

# 训练模型（示例训练循环）
model.train()
for epoch in range(10):  # 训练10个epoch
    optimizer.zero_grad()
    output = model(src, trg[:, :-1])  # 输入目标序列的前n-1个token
    loss = criterion(
        output.view(-1, dec_voc_size), trg[:, 1:].contiguous().view(-1)
    )  # 计算损失
    loss.backward()
    optimizer.step()
    print(f"Epoch {epoch+1}, Loss: {loss.item()}")

# 评估模型（示例评估）
model.eval()
with torch.no_grad():
    output = model(src, trg[:, :-1])
    loss = criterion(output.view(-1, dec_voc_size), trg[:, 1:].contiguous().view(-1))
    print(f"Validation Loss: {loss.item()}")

AttributeError: 'builtin_function_or_method' object has no attribute 'len'