# Building a Transformer with PyTorch 

## Importing the necessary libraries and modules

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy
from torch.utils.data import DataLoader, TensorDataset

print("the env is done")

the env is done


## Defining the Transformer
### Defining the basic block: Muti-Head Attention, Position-wise Feed-Forward Networks, Positional Encoding

![Muti-Head Attention](picture/5c68a161a918e95e471c130be6befe2b.jpg)
> The Multi-Head Attention mechanism computes the attention between each pair of position in a sequence. It consists of mutiple "attention heads" that capture different aspects of the input sequence

### Multi-head Attention 

In [2]:
class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,num_heads):
        super(MultiHeadAttention,self).__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads #int 除
        
        #initial the linear transformation layers
        self.W_q = nn.Linear(d_model,d_model)  #神经网络线性层，in_feature, out_feature,bias = True
        self.W_k = nn.Linear(d_model,d_model)
        self.W_v = nn.Linear(d_model,d_model)
        self.W_o = nn.Linear(d_model,d_model)
        
    def scaled_dot_product_attention(self,Q,K,V,mask = None):
        attn_scores = torch.matmul(Q, K.transpose(-2,-1)) / math.sqrt(self.d_k)  #矩阵相乘
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        attn_probs = torch.softmax(attn_scores,dim=-1)
        output = torch.matmul(attn_probs,V)
        return output
    
    def splite_heads(self,x):
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1,2) #view()相当于reshape、resize，重新调整Tensor的形状
    
    def combine_heads(self,x):
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1,2).contiguous().view(batch_size, seq_length, self.d_model)
    
    # multi-head self-attention process, empower the model to selectively concentrate on specific parts of the input sequence, maximizing the information extraction and comprehension capabilities
    def forward(self,Q,K,V,mask = None):
        Q = self.splite_heads(self.W_q(Q))
        K = self.splite_heads(self.W_k(K))
        V = self.splite_heads(self.W_v(V))
        
        attn_output = self.scaled_dot_product_attention(Q,K,V,mask)
        output = self.W_o(self.combine_heads(attn_output))
        return output
    

### Position-wise Feed-Forward Networks
> inject the model with the capability to factor in the positional information of input elements when generating predictions

In [3]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self,d_model,d_ff):
        super(PositionWiseFeedForward,self).__init__()
        # 2 linear transformation layers
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff,d_model)
        # ReLU activation function
        self.relu = nn.ReLU()
        
    def forward(self,x):
        return self.fc2(self.relu(self.fc1(x)))

### Positional Encoding
> inject the position information of each token in the input sequence  
> input parameters d_model and max max_seq_length  
> create a tensor for holding positional encoding value  

In [4]:
class PositionalEncoding(nn.Module):
    def __init__(self,d_model, max_seq_length):
        super(PositionalEncoding,self).__init__()
        pe = torch.zeros(max_seq_length,d_model)
        position = torch.arange(0,max_seq_length,dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0,d_model,2).float() * -(math.log(10000.0) / d_model))
        
        pe[:,0::2] = torch.sin(position * div_term)
        pe[:,1::2] = torch.cos(position * div_term)
        
        self.register_buffer('pe',pe.unsqueeze(0))
        
    def forward(self,x):
        return x+self.pe[:,:x.size(1)]

### Building the Encoder Blocks

![Encoder](picture/3b7561ece643ae6717e6d08bda730230.jpg)
> The encoder part of the transformer network

In [5]:
class EncoderLayer(nn.Module):
    def __init__(self,d_module, num_heads, d_ff, dropout):
        super(EncoderLayer,self).__init__()
        self.self_attn = MultiHeadAttention(d_model,num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model,d_ff)
        #标准化层
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        # 在训练阶段按某种概率随即将输入的张量元素随机归零,dropout为随机归零概率
        self.dropout = nn.Dropout(dropout)
        
    def forward(self,x,mask):
        attn_output = self.self_attn(x,x,x,mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x+self.dropout(ff_output))
        return x

### Building the Decoder Blocks

![Decoder](picture/1af95738151cfa3bc742e2f24c212594.jpg)
> The decoder part of the transformer network

In [6]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, fropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model,num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model,d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self,x,enc_output,src_mask, tgt_mask):
        attn_output = self.self_attn(x,x,x,tgt_mask)
        x = self.norm1(x+self.dropout(attn_output))
        attn_output = self.cross_attn(x,enc_output,enc_output, src_mask)
        x = self.norm2(x+self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x+self.dropout(ff_output))
        return x

### Combining the Encoder and Decoder layer to create the complete Transformer network

![Transformer](picture/b5a0843b027328469e07f730bc205b87.jpg)
> The Transformer Network

In [7]:
class Transformer(nn.Module):
    def __init__(self,src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()
        # self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        # self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model, padding_idx=0)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model, padding_idx=0)
        
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)
        
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        
        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        
    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1-torch.triu(torch.ones(1,seq_length,seq_length), diagonal = 1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask
    
    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src,tgt)
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
        
        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)
            
        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)
            
        output = self.fc(dec_output)
        
        return output

## Training the Transformer Model
### Sample Data Preparation
> a toy dataset will be crafted in this example.

In [8]:
src_vocab_size = 5000 # vocabulary size
tgt_vocab_size = 5000 # target language size
d_model = 512 # dimension of input and output model
num_heads = 8 # number of attention head
num_layers = 6 # number of encoder and decoder layer
d_ff = 2048 # dimention of feed-forward network model
max_seq_length = 100 # the maximum sequence length
dropout = 0.1 # the dropout rate used in training

# transformer = Transformer(src_vocal_size, tgt_vocal_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)
transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

# generate random sample data
src_data = torch.randint(1,src_vocab_size,(64, max_seq_length)) #(batch_size, seq_length)
tgt_data = torch.randint(1,tgt_vocab_size,(64, max_seq_length)) #(batch_size, seq_length)

### Train the Model 

In [9]:
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

KeyboardInterrupt: 

### Train the real world data
> import the package and save the text data

In [33]:
from keras.preprocessing import sequence
from keras.datasets import imdb
from matplotlib import pyplot as plt
import pandas as pd

print("env done")

env done


> 数据归一化处理  
> pad_sequences作用，长度大于maxlen部分会被截取的，小于maxlen会填充到maxlen长度

In [42]:
# 1. 设置参数
max_features = 20000  # 仅保留频率最高的20000个单词
maxlen = 64  # 每条评论的最大长度（超过截断，不足填充）
batch_size = 32  # 每批次的样本数量

# 2. 加载 IMDB 数据集
print("Loading data...")
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)

# 3. 标签独热编码（可选：如果需要转换为独热向量）
y_train = pd.get_dummies(y_train)
y_test = pd.get_dummies(y_test)

# 4. 填充序列，使每条评论的长度一致
print("Pad sequences (samples x time)...")
x_train = sequence.pad_sequences(x_train, maxlen=maxlen)
x_test = sequence.pad_sequences(x_test, maxlen=maxlen)

# 打印数据的形状，确保数据大小正确
print('x_train shape:', x_train.shape)  # (25000, 64)
print('y_train shape:', y_train.shape)  # (25000, 2)

Loading data...
Pad sequences (samples x time)...
x_train shape: (25000, 64)
y_train shape: (25000,)


> 数据集转换为Tensor，并创建Dataloader

In [44]:
# 5. 将数据转换为 PyTorch Tensor，并创建 TensorDataset
x_train_tensor = torch.tensor(x_train, dtype=torch.long)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)  # 直接使用 y_train，已为 NumPy 数组
x_test_tensor = torch.tensor(x_test, dtype=torch.long)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)

# 6. 创建 DataLoader，方便批次训练
train_data = TensorDataset(x_train_tensor, y_train_tensor)
test_data = TensorDataset(x_test_tensor, y_test_tensor)

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size)

# print("DataLoader created successfully!")

print("DataLoader created successfully!")

DataLoader created successfully!


> 训练函数、测试函数、验证函数

In [37]:
def train(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for src, tgt in loader:
        src, tgt = src.to(device), tgt.to(device)

        optimizer.zero_grad()
        output = model(src, src)  # 由于是情感分类，输入和目标可以相同
        
        loss = criterion(output.view(-1, 2), tgt.argmax(dim=1))
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
    return total_loss / len(loader)

def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    with torch.no_grad():
        for src, tgt in loader:
            src, tgt = src.to(device), tgt.to(device)
            output = model(src, src)

            loss = criterion(output.view(-1, 2), tgt.argmax(dim=1))
            total_loss += loss.item()

            predictions = output.argmax(dim=2).squeeze()
            correct += (predictions == tgt.argmax(dim=1)).sum().item()

    accuracy = correct / len(loader.dataset)
    return total_loss / len(loader), accuracy

> 定义模型基本参数

In [38]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

src_vocab_size = 20000  # 假设IMDB使用了20,000个词汇
tgt_vocab_size = 2  # 二分类问题：正面(1)或负面(0)
d_model = 128
num_heads = 8
num_layers = 4
d_ff = 512
max_seq_length = 64
dropout = 0.1

# 实例化Transformer模型
# model = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

model = Transformer(
    src_vocab_size=20000,
    tgt_vocab_size=2,  # 二分类任务的输出
    d_model=128,
    num_heads=4,
    num_layers=2,
    d_ff=512,
    max_seq_length=64,
    dropout=0.1
).to(device)

model.to(device)

# 损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [45]:
epochs = 5

for epoch in range(epochs):
    train_loss = train(model, train_loader, criterion, optimizer, device)
    test_loss, test_accuracy = evaluate(model, test_loader, criterion, device)

    print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}')

IndexError: index out of range in self

In [40]:
print(f"最大索引值（训练集）: {x_train.max()}")
print(f"最大索引值（测试集）: {x_test.max()}")
print(f"Embedding 层词汇表大小: {max_features}")

最大索引值（训练集）: 19999
最大索引值（测试集）: 19999
Embedding 层词汇表大小: 20000


In [41]:
print(f"x_train shape: {x_train.shape}")  # 期望为 (25000, maxlen)
print(f"x_test shape: {x_test.shape}")    # 期望为 (25000, maxlen)

x_train shape: (25000, 64)
x_test shape: (25000, 64)


In [20]:
device = torch.device("cpu")

for src, tgt in train_loader:
    print(f"src max: {src.max().item()}, src min: {src.min().item()}")
    print(f"tgt max: {tgt.max().item()}, tgt min: {tgt.min().item()}")
    break  # 只打印一次即可

src max: 19966, src min: 0
tgt max: 1, tgt min: 0
