<a href="https://colab.research.google.com/github/codefunny/LearnAI/blob/main/Transformer%E5%AD%A6%E4%B9%A0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import matplotlib.pyplot as plt

In [None]:

def scaled_dot_product_attention(Q, K, V, mask=None):
    """
    缩放点积注意力计算。

    参数:
        Q: 查询矩阵 (batch_size, seq_len_q, embed_size)
        K: 键矩阵 (batch_size, seq_len_k, embed_size)
        V: 值矩阵 (batch_size, seq_len_v, embed_size)
        mask: 掩码矩阵，用于屏蔽不应该关注的位置 (可选)

    返回:
        output: 注意力加权后的输出矩阵
        attention_weights: 注意力权重矩阵
    """
    embed_size = Q.size(-1)  # embed_size

    # 计算点积并进行缩放
    scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(embed_size)

    # 如果提供了掩码矩阵，则将掩码对应位置的分数设为 -inf
    if mask is not None:
        scores = scores.masked_fill(mask == 0, float('-inf'))

    # 对缩放后的分数应用 Softmax 函数，得到注意力权重
    attention_weights = F.softmax(scores, dim=-1)

    # 加权求和，计算输出
    output = torch.matmul(attention_weights, V)

    return output, attention_weights

### 示例

In [None]:
# 示例参数
batch_size = 2
num_heads = 2
seq_len_q = 3  # 查询序列长度
seq_len_k = 3  # 键序列长度
head_dim = 4

# 模拟查询矩阵 Q 和键值矩阵 K, V
Q = torch.randn(batch_size, num_heads, seq_len_q, head_dim)
K = torch.randn(batch_size, num_heads, seq_len_k, head_dim)
V = torch.randn(batch_size, num_heads, seq_len_k, head_dim)

# 生成下三角掩码矩阵 (1, 1, seq_len_q, seq_len_k)，通过广播应用到所有头
mask = torch.tril(torch.ones(seq_len_q, seq_len_k)).unsqueeze(0).unsqueeze(0)  # mask.shape (seq_len_q, seq_len_k) -> (1, 1, seq_len_q, seq_len_k)

# 执行缩放点积注意力，并应用下三角掩码
output, attn_weights = scaled_dot_product_attention(Q, K, V, mask)

# 打印结果
print("掩码矩阵 (下三角):")
print(mask[0, 0])

print(output)

print("\n注意力权重矩阵:")
print(attn_weights)

掩码矩阵 (下三角):
tensor([[1., 0., 0.],
        [1., 1., 0.],
        [1., 1., 1.]])
tensor([[[[-0.4591, -1.0087,  0.6157, -1.8339],
          [-0.3931, -1.0311,  0.6526, -1.4015],
          [ 0.1126, -0.8771,  1.0635,  0.3316]],

         [[ 1.8850,  0.3640, -0.1149, -1.5179],
          [-0.3112, -1.1621, -0.1893, -1.3541],
          [ 0.0956, -0.4182, -0.3832, -1.4330]]],


        [[[-1.2407, -0.9829,  1.3195, -0.0941],
          [-0.8491,  0.1760,  1.1789, -0.3993],
          [-0.5403,  0.5056,  1.0416, -0.4845]],

         [[-1.0030, -0.6392, -0.2104,  0.8032],
          [-0.6119, -0.5840, -0.0379,  0.9288],
          [ 0.3210, -0.4257,  0.5015,  0.5630]]]])

注意力权重矩阵:
tensor([[[[1.0000, 0.0000, 0.0000],
          [0.8920, 0.1080, 0.0000],
          [0.2739, 0.2091, 0.5170]],

         [[1.0000, 0.0000, 0.0000],
          [0.2740, 0.7260, 0.0000],
          [0.3469, 0.2742, 0.3789]]],


        [[[1.0000, 0.0000, 0.0000],
          [0.4466, 0.5534, 0.0000],
          [0.2543, 0.6648, 0.0

### MHA

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, h):
        """
        多头注意力机制：每个头单独定义线性层。

        参数:
            d_model: 输入序列的嵌入维度。
            h: 注意力头的数量。
        """
        super(MultiHeadAttention, self).__init__()
        assert d_model % h == 0, "d_model 必须能被 h 整除。"

        self.d_model = d_model
        self.h = h

        # “共享”的 Q, K, V 线性层
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)

        # 输出线性层，将多头拼接后的输出映射回 d_model
        self.fc_out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        """
        前向传播函数。

        参数:
            q: 查询矩阵 (batch_size, seq_len_q, d_model)
            k: 键矩阵 (batch_size, seq_len_k, d_model)
            v: 值矩阵 (batch_size, seq_len_v, d_model)
            mask: 掩码矩阵 (batch_size, 1, seq_len_q, seq_len_k)

        返回:
            out: 注意力加权后的输出
            attention_weights: 注意力权重矩阵
        """
        batch_size = q.size(0)

        # 获取查询和键值的序列长度
        seq_len_q = q.size(1)
        seq_len_k = k.size(1)

        # 将线性变换后的“共享”矩阵拆分为多头，调整维度为 (batch_size, h, seq_len, d_k)
        # d_k 就是每个注意力头的维度
        Q = self.w_q(q).view(batch_size, seq_len_q, self.h, -1).transpose(1, 2)
        K = self.w_k(k).view(batch_size, seq_len_k, self.h, -1).transpose(1, 2)
        V = self.w_v(v).view(batch_size, seq_len_k, self.h, -1).transpose(1, 2)

        # 执行缩放点积注意力
        scaled_attention, _ = scaled_dot_product_attention(Q, K, V, mask)

        # 合并多头并还原为 (batch_size, seq_len_q, d_model)
        concat_out = scaled_attention.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)

        # 通过输出线性层
        out = self.fc_out(concat_out)  # (batch_size, seq_len_q, d_model)

        return out

### FNN

In [None]:
class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff, dropout=0.1):
        """
        位置前馈网络。

        参数:
            d_model: 输入和输出向量的维度
            d_ff: FFN 隐藏层的维度，或者说中间层
            dropout: 随机失活率（Dropout），即随机屏蔽部分神经元的输出，用于防止过拟合

        （实际上论文并没有确切地提到在这个模块使用 dropout，所以注释）
        """
        super(PositionwiseFeedForward, self).__init__()
        self.w_1 = nn.Linear(d_model, d_ff)  # 第一个线性层
        self.w_2 = nn.Linear(d_ff, d_model)  # 第二个线性层
        #self.dropout = nn.Dropout(dropout)   # Dropout 层

    def forward(self, x):
        # 先经过第一个线性层和 ReLU，然后经过第二个线性层
        return self.w_2(self.w_1(x).relu())  #self.w_2(self.dropout(self.w_1(x).relu()))


### ResNet

In [None]:
class ResidualConnection(nn.Module):
    def __init__(self, dropout=0.1):
        """
        残差连接，用于在每个子层后添加残差连接和 Dropout。

        参数:
            dropout: Dropout 概率，用于在残差连接前应用于子层输出，防止过拟合。
        """
        super(ResidualConnection, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        """
        前向传播函数。

        参数:
            x: 残差连接的输入张量，形状为 (batch_size, seq_len, d_model)。
            sublayer: 子层模块的函数，多头注意力或前馈网络。

        返回:
            经过残差连接和 Dropout 处理后的张量，形状为 (batch_size, seq_len, d_model)。
        """
        # 将子层输出应用 dropout，然后与输入相加（参见论文 5.4 的表述或者本文「呈现」部分）
        return x + self.dropout(sublayer(x))

### LayerNorm

In [None]:
class LayerNorm(nn.Module):
    def __init__(self, feature_size, epsilon=1e-9):
        """
        层归一化，用于对最后一个维度进行归一化。

        参数:
            feature_size: 输入特征的维度大小，即归一化的特征维度。
            epsilon: 防止除零的小常数。
        """
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(feature_size))  # 可学习缩放参数，初始值为 1
        self.beta = nn.Parameter(torch.zeros(feature_size))  # 可学习偏移参数，初始值为 0
        self.epsilon = epsilon

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        return self.gamma * (x - mean) / (std + self.epsilon) + self.beta

In [None]:
class SublayerConnection(nn.Module):
    def __init__(self, feature_size, dropout=0.1, epsilon=1e-9):
        """
        子层连接，包括残差连接和层归一化，应用于 Transformer 的每个子层。

        参数:
            feature_size: 输入特征的维度大小，即归一化的特征维度。
            dropout: 残差连接中的 Dropout 概率。
            epsilon: 防止除零的小常数。
        """
        super(SublayerConnection, self).__init__()
        self.residual = ResidualConnection(dropout)  # 使用 ResidualConnection 进行残差连接
        self.norm = LayerNorm(feature_size, epsilon)  # 层归一化

    def forward(self, x, sublayer):
        # 将子层输出应用 dropout 后经过残差连接后再进行归一化，可见本文「呈现」部分
        return self.norm(self.residual(x, sublayer))

# 或者直接在 AddNorm 里面实现残差连接
class SublayerConnection1(nn.Module):
    """
        子层连接的另一种实现方式，残差连接直接在该模块中实现。

        参数:
            feature_size: 输入特征的维度大小，即归一化的特征维度。
            dropout: 残差连接中的 Dropout 概率。
            epsilon: 防止除零的小常数。
        """
    def __init__(self, feature_size, dropout=0.1, epsilon=1e-9):
        super(SublayerConnection, self).__init__()
        self.norm = LayerNorm(feature_size, epsilon)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x, sublayer):
        # 将子层输出应用 dropout 后经过残差连接后再进行归一化，可见本文「呈现」部分
        return self.norm(x + self.dropout(sublayer(x)))

### Embedding

In [None]:
class Embeddings(nn.Module):
    """
    嵌入，将 token ID 转换为固定维度的嵌入向量，并进行缩放。

    参数:
        vocab_size: 词汇表大小。
        d_model: 嵌入向量的维度。
    """
    def __init__(self, vocab_size, d_model):
        super(Embeddings, self).__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        self.scale_factor = math.sqrt(d_model)

    def forward(self, x):
        """
        前向传播函数。

        参数:
            x: 输入张量，形状为 (batch_size, seq_len)，其中每个元素是 token ID。

        返回:
            缩放后的嵌入向量，形状为 (batch_size, seq_len, d_model)。
        """
        return self.embed(x) * self.scale_factor

### torch的Transformer

In [None]:
import torch.optim as optim

class TransformerModel(nn.Module):
    def __init__(self, input_dim, model_dim, num_heads, num_layers, output_dim):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(input_dim, model_dim)
        self.positional_encoding = nn.Parameter(torch.zeros(1, 1000, model_dim))  # 假设序列长度最大为1000
        self.transformer = nn.Transformer(d_model=model_dim, nhead=num_heads, num_encoder_layers=num_layers)
        self.fc = nn.Linear(model_dim, output_dim)

    def forward(self, src, tgt):
        src_seq_length, tgt_seq_length = src.size(1), tgt.size(1)
        src = self.embedding(src) + self.positional_encoding[:, :src_seq_length, :]
        tgt = self.embedding(tgt) + self.positional_encoding[:, :tgt_seq_length, :]
        transformer_output = self.transformer(src, tgt)
        output = self.fc(transformer_output)
        return output

# 超参数
input_dim = 10000  # 词汇表大小
model_dim = 512    # 模型维度
num_heads = 8      # 多头注意力头数
num_layers = 6     # 编码器和解码器层数
output_dim = 10000 # 输出维度（通常与词汇表大小相同）

# 初始化模型、损失函数和优化器
model = TransformerModel(input_dim, model_dim, num_heads, num_layers, output_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 假设输入数据
src = torch.randint(0, input_dim, (10, 32))  # (序列长度, 批量大小)
tgt = torch.randint(0, input_dim, (20, 32))  # (序列长度, 批量大小)

# 前向传播
output = model(src, tgt)

# 计算损失
loss = criterion(output.view(-1, output_dim), tgt.view(-1))

# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()

print("Loss:", loss.item())



Loss: 9.415475845336914


In [None]:
import os
import requests
import math
import tiktoken
import torch
import torch.nn as nn
from torch.nn import functional as F

# Hyperparameters
batch_size = 4  # How many batches per training step
context_length = 16  # Length of the token chunk each batch
d_model = 64  # The size of our model token embeddings
num_blocks = 8  # Number of transformer blocks
num_heads = 4  # Number of heads in Multi-head attention
learning_rate = 1e-3  # 0.001
dropout = 0.1  # Dropout rate
max_iters = 5000  # Total of training iterations <- Change this to smaller number for testing
eval_interval = 50  # How often to evaluate
eval_iters = 20  # Number of iterations to average for evaluation
device = 'cuda' if torch.cuda.is_available() else 'cpu'  # Use GPU if it's available.
TORCH_SEED = 1337
torch.manual_seed(TORCH_SEED)

import os
import requests
from io import StringIO

# 定义文件存储路径
file_path = 'data/sales_textbook.txt'

# 检查文件是否存在，如果不存在则从URL下载
if not os.path.exists('data'):
    os.makedirs('data')  # 如果目录不存在，则创建它
url = 'https://huggingface.co/datasets/goendalf666/sales-textbook_for_convincing_and_selling/raw/main/sales_textbook.txt'
response = requests.get(url)

# 确保请求成功
if response.status_code == 200:
    # 将内容写入文件
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(response.text)
else:
    print(f"Failed to download the file. Status code: {response.status_code}")

# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as f:
    text = f.read()

# 现在你可以使用变量`text`进行进一步的处理

# Using TikToken (Same as GPT3) to tokenize the source text
encoding = tiktoken.get_encoding("cl100k_base")
tokenized_text = encoding.encode(text)
max_token_value = max(tokenized_text) + 1  # the maximum value of the tokenized numbers
tokenized_text = torch.tensor(tokenized_text, dtype=torch.long, device=device)  # put tokenized text into tensor

# Split train and validation
split_idx = int(len(tokenized_text) * 0.9)
train_data = tokenized_text[:split_idx]
val_data = tokenized_text[split_idx:]


# Define Feed Forward Network
class FeedForward(nn.Module):
    def __init__(self):
        super().__init__()
        self.d_model = d_model
        self.dropout = dropout
        self.ffn = nn.Sequential(
            nn.Linear(in_features=self.d_model, out_features=self.d_model * 4),
            nn.ReLU(),
            nn.Linear(in_features=self.d_model * 4, out_features=self.d_model),
            nn.Dropout(dropout),
        )

    def forward(self, x):
        return self.ffn(x)


# Define Scaled Dot Product Attention
class Attention(nn.Module):
    def __init__(self, head_size: int):
        super().__init__()
        self.d_model = d_model
        self.head_size = head_size
        self.context_length = context_length
        self.dropout = dropout

        self.key_layer = nn.Linear(in_features=self.d_model, out_features=self.head_size, bias=False)
        self.query_layer = nn.Linear(in_features=self.d_model, out_features=self.head_size, bias=False)
        self.value_layer = nn.Linear(in_features=self.d_model, out_features=self.head_size, bias=False)
        self.register_buffer('tril', torch.tril(
            torch.ones((self.context_length, self.context_length))))  # Lower triangular mask
        self.dropout_layer = nn.Dropout(self.dropout)

    def forward(self, x):
        B, T, C = x.shape  # Batch size, Time steps(current context_length), Channels(dimensions)
        assert T <= self.context_length
        assert C == self.d_model
        q = self.query_layer(x)
        k = self.key_layer(x)
        v = self.value_layer(x)

        # Scaled dot product attention: Q @ K^T / sqrt(d_k)
        weights = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
        # Apply masked attention
        weights = weights.masked_fill(self.tril[:T, :T] == 0, float('-inf'))
        weights = F.softmax(input=weights, dim=-1)
        weights = self.dropout_layer(weights)

        # Apply dot product attention: weights @ V
        out = weights @ v
        return out


class MultiHeadAttention(nn.Module):
    def __init__(self, head_size: int):
        super().__init__()
        self.num_heads = num_heads
        self.head_size = head_size
        self.d_model = d_model
        self.context_length = context_length
        self.dropout = dropout

        self.heads = nn.ModuleList([Attention(head_size=self.head_size) for _ in range(self.num_heads)])
        self.projection_layer = nn.Linear(in_features=self.d_model, out_features=self.d_model)
        self.dropout_layer = nn.Dropout(dropout)

    def forward(self, x):
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.projection_layer(out)
        out = self.dropout_layer(out)
        return out


class TransformerBlock(nn.Module):

    def __init__(self, num_heads: int):
        super().__init__()
        self.d_model = d_model
        self.context_length = context_length
        self.head_size = d_model // num_heads  # head size should be divisible by d_model
        self.num_heads = num_heads
        self.dropout = dropout

        self.multi_head_attention_layer = MultiHeadAttention(head_size=self.head_size)
        self.feed_forward_layer = FeedForward()
        self.layer_norm_1 = nn.LayerNorm(normalized_shape=self.d_model)
        self.layer_norm_2 = nn.LayerNorm(normalized_shape=self.d_model)

    def forward(self, x):
        # Note: The order of the operations is different from the original Transformer paper
        # The order here is: LayerNorm -> Multi-head attention -> LayerNorm -> Feed forward
        x = x + self.multi_head_attention_layer(self.layer_norm_1(x))  # Residual connection
        x = x + self.feed_forward_layer(self.layer_norm_2(x))  # Residual connection
        return x


class TransformerLanguageModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.d_model = d_model
        self.context_length = context_length
        self.num_heads = num_heads
        self.num_blocks = num_blocks
        self.dropout = dropout
        self.max_token_value = max_token_value
        # Set up token embedding look-up table
        self.token_embedding_lookup_table = nn.Embedding(num_embeddings=self.max_token_value + 1, embedding_dim=self.d_model)

        # Run all the transformer blocks
        # Different from original paper, here we add a final layer norm after all the blocks
        self.transformer_blocks = nn.Sequential(*(
                [TransformerBlock(num_heads=self.num_heads) for _ in range(self.num_blocks)] +
                [nn.LayerNorm(self.d_model)]
        ))
        self.language_model_out_linear_layer = nn.Linear(in_features=self.d_model, out_features=self.max_token_value)

    def forward(self, idx, targets=None):
        B, T = idx.shape
        """
        # Set up position embedding look-up table
        # following the same approach as the original Transformer paper (Sine and Cosine functions)
        """
        position_encoding_lookup_table = torch.zeros(self.context_length, self.d_model)
        position = torch.arange(0, self.context_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, self.d_model, 2).float() * (-math.log(10000.0) / self.d_model))
        position_encoding_lookup_table[:, 0::2] = torch.sin(position * div_term)
        position_encoding_lookup_table[:, 1::2] = torch.cos(position * div_term)
        # change position_encoding_lookup_table from (context_length, d_model) to (T, d_model)
        position_embedding = position_encoding_lookup_table[:T, :].to(device)
        x = self.token_embedding_lookup_table(idx) + position_embedding
        x = self.transformer_blocks(x)
        # The "logits" are the output values of our model before applying softmax
        logits = self.language_model_out_linear_layer(x)

        if targets is not None:
            B, T, C = logits.shape
            logits_reshaped = logits.view(B * T, C)
            targets_reshaped = targets.view(B * T)
            loss = F.cross_entropy(input=logits_reshaped, target=targets_reshaped)
        else:
            loss = None
        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx is (B,T) array of indices in the current context
        for _ in range(max_new_tokens):
            # Crop idx to the max size of our positional embeddings table
            idx_crop = idx[:, -self.context_length:]
            # Get predictions
            logits, loss = self(idx_crop)
            # Get the last time step from logits where the dimensions of the logits are (B,T,C)
            logits_last_timestep = logits[:, -1, :]
            # Apply softmax to get probabilities
            probs = F.softmax(input=logits_last_timestep, dim=-1)
            # Sample from the probabilities' distribution.
            idx_next = torch.multinomial(input=probs, num_samples=1)
            # Append the sampled indexes idx_next to idx
            idx = torch.cat((idx, idx_next), dim=1)
        return idx


# Initialize the model
model = TransformerLanguageModel()
model = model.to(device)


# Get input embedding batch
def get_batch(split: str):
    data = train_data if split == 'train' else val_data
    idxs = torch.randint(low=0, high=len(data) - context_length, size=(batch_size,))
    x = torch.stack([data[idx:idx + context_length] for idx in idxs]).to(device)
    y = torch.stack([data[idx + 1:idx + context_length + 1] for idx in idxs]).to(device)
    return x, y


# Calculate loss
@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'valid']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            x_batch, y_batch = get_batch(split)
            logits, loss = model(x_batch, y_batch)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out


# Use AdamW optimizer
optimizer = torch.optim.AdamW(params=model.parameters(), lr=learning_rate)
tracked_losses = list()
for step in range(max_iters):
    if step % eval_iters == 0 or step == max_iters - 1:
        losses = estimate_loss()
        tracked_losses.append(losses)
        print('Step:', step, 'Training Loss:', round(losses['train'].item(), 3), 'Validation Loss:',
              round(losses['valid'].item(), 3))

    xb, yb = get_batch('train')
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# Save the model state dictionary
torch.save(model.state_dict(), 'model-ckpt.pt')

# Generate
model.eval()
start = 'The salesperson'
start_ids = encoding.encode(start)
x = (torch.tensor(start_ids, dtype=torch.long, device=device)[None, ...])
y = model.generate(x, max_new_tokens=100)
print('---------------')
print(encoding.decode(y[0].tolist()))
print('---------------')


Step: 0 Training Loss: 11.663 Validation Loss: 11.716
Step: 20 Training Loss: 10.356 Validation Loss: 10.419
Step: 40 Training Loss: 8.815 Validation Loss: 8.968
Step: 60 Training Loss: 7.44 Validation Loss: 7.818
Step: 80 Training Loss: 6.88 Validation Loss: 7.135
Step: 100 Training Loss: 6.586 Validation Loss: 7.249
Step: 120 Training Loss: 6.472 Validation Loss: 7.065
Step: 140 Training Loss: 6.44 Validation Loss: 6.948
Step: 160 Training Loss: 6.265 Validation Loss: 6.967
Step: 180 Training Loss: 6.174 Validation Loss: 6.738
Step: 200 Training Loss: 5.971 Validation Loss: 6.584
Step: 220 Training Loss: 5.981 Validation Loss: 6.623
Step: 240 Training Loss: 5.969 Validation Loss: 6.41
Step: 260 Training Loss: 5.717 Validation Loss: 6.495
Step: 280 Training Loss: 5.787 Validation Loss: 6.472
Step: 300 Training Loss: 5.727 Validation Loss: 6.365
Step: 320 Training Loss: 5.727 Validation Loss: 6.362
Step: 340 Training Loss: 5.426 Validation Loss: 6.461
Step: 360 Training Loss: 5.519 Val

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):

    def __init__(self):
        super(Net, self).__init__()
        # 1 input image channel, 6 output channels, 5x5 square convolution
        # kernel
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        # Max pooling over a (2, 2) window
        x = F.max_pool2d(F.relu(self.conv1(x)), (2, 2))
        # If the size is a square you can only specify a single number
        x = F.max_pool2d(F.relu(self.conv2(x)), 2)
        x = x.view(-1, self.num_flat_features(x))
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def num_flat_features(self, x):
        size = x.size()[1:]  # all dimensions except the batch dimension
        num_features = 1
        for s in size:
            num_features *= s
        return num_features


net = Net()
print(net)

Net(
  (conv1): Conv2d(1, 6, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=400, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=10, bias=True)
)


In [1]:
import torch

class TinyModel(torch.nn.Module):

    def __init__(self):
        super(TinyModel, self).__init__()

        self.linear1 = torch.nn.Linear(100, 200)
        self.activation = torch.nn.ReLU()
        self.linear2 = torch.nn.Linear(200, 10)
        self.softmax = torch.nn.Softmax()

    def forward(self, x):
        x = self.linear1(x)
        x = self.activation(x)
        x = self.linear2(x)
        x = self.softmax(x)
        return x

tinymodel = TinyModel()

print('The model:')
print(tinymodel)

print('\n\nJust one layer:')
print(tinymodel.linear2)

print('\n\nModel params:')
for param in tinymodel.parameters():
    print(param)

print('\n\nLayer params:')
for param in tinymodel.linear2.parameters():
    print(param)

The model:
TinyModel(
  (linear1): Linear(in_features=100, out_features=200, bias=True)
  (activation): ReLU()
  (linear2): Linear(in_features=200, out_features=10, bias=True)
  (softmax): Softmax(dim=None)
)


Just one layer:
Linear(in_features=200, out_features=10, bias=True)


Model params:
Parameter containing:
tensor([[-0.0403, -0.0693,  0.0942,  ...,  0.0951, -0.0187, -0.0477],
        [ 0.0795,  0.0372, -0.0085,  ...,  0.0404, -0.0603,  0.0163],
        [-0.0342, -0.0100, -0.0471,  ...,  0.0469,  0.0002,  0.0008],
        ...,
        [-0.0955,  0.0739, -0.0546,  ...,  0.0451,  0.0991, -0.0012],
        [ 0.0887,  0.0788, -0.0866,  ..., -0.0784,  0.0213, -0.0839],
        [ 0.0620,  0.0985,  0.0109,  ...,  0.0228,  0.0461, -0.0881]],
       requires_grad=True)
Parameter containing:
tensor([ 0.0268,  0.0848, -0.0829,  0.0575, -0.0998,  0.0298, -0.0403,  0.0734,
         0.0233, -0.0540,  0.0040,  0.0764, -0.0218,  0.0583,  0.0378,  0.0225,
        -0.0548, -0.0593, -0.0393, -0.07

In [1]:
from transformers import MarianMTModel, MarianTokenizer
import torch

# 1. 加载中→英翻译的模型和tokenizer
model_name = "Helsinki-NLP/opus-mt-zh-en"
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# 2. 输入中文句子
chinese_sentence = "今天天气很好。"

# 3. 编码输入
inputs = tokenizer(chinese_sentence, return_tensors="pt", padding=True)

# 4. 推理（翻译）
translated = model.generate(**inputs)

# 5. 解码输出
english_translation = tokenizer.decode(translated[0], skip_special_tokens=True)

print("中文：", chinese_sentence)
print("翻译：", english_translation)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/44.0 [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/805k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/807k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.62M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.39k [00:00<?, ?B/s]



pytorch_model.bin:   0%|          | 0.00/312M [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/312M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/293 [00:00<?, ?B/s]

中文： 今天天气很好。
翻译： It's a nice day.


In [3]:
print(inputs)
print(translated)


{'input_ids': tensor([[ 4394, 17559,  4665,     9,     0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1]])}
tensor([[65000,    82,    21,    22,    12,  4492,  1062,     5,     0]])


In [6]:
# Python Code
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import random
from collections import defaultdict

# -----------------------------
# Positional Encoding
# -----------------------------
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2, dtype=torch.float) * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))  # Register buffer for model state dict

    def forward(self, x):
        return x + self.pe[:, :x.size(1)].to(x.device)

# -----------------------------
# Multi-head Attention
# -----------------------------
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0
        self.d_k = d_model // num_heads
        self.num_heads = num_heads
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask=None):
        B, L, D = q.size()
        H = self.num_heads

        def split_heads(x):
            return x.view(B, L, H, self.d_k).transpose(1, 2)

        q, k, v = split_heads(self.q_linear(q)), split_heads(self.k_linear(k)), split_heads(self.v_linear(v))

        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, float('-inf'))  # Avoid overflow
        attn = torch.softmax(scores, dim=-1)
        context = torch.matmul(attn, v)

        context = context.transpose(1, 2).contiguous().view(B, L, D)
        return self.out(context)

# -----------------------------
# Transformer Blocks
# -----------------------------
class TransformerBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        x2 = self.attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(x2))
        x2 = self.ff(x)
        x = self.norm2(x + self.dropout(x2))
        return x

# -----------------------------
# DecoderBlock with cross-attention
# -----------------------------
class DecoderBlock(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.norm1 = nn.LayerNorm(d_model)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.norm2 = nn.LayerNorm(d_model)
        self.ff = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, enc_out, tgt_mask=None, memory_mask=None):
        x2 = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(x2))
        x2 = self.cross_attn(x, enc_out, enc_out, memory_mask)
        x = self.norm2(x + self.dropout(x2))
        x2 = self.ff(x)
        x = self.norm3(x + self.dropout(x2))
        return x

# -----------------------------
# Transformer Model
# -----------------------------
class Transformer(nn.Module):
    def __init__(self, src_vocab, tgt_vocab, d_model=128, num_heads=4, d_ff=512, num_layers=2, dropout=0.1):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Embedding(src_vocab, d_model),
            PositionalEncoding(d_model),
            *[TransformerBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.decoder = nn.Sequential(
            nn.Embedding(tgt_vocab, d_model),
            PositionalEncoding(d_model),
            *[DecoderBlock(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)]
        )
        self.out = nn.Linear(d_model, tgt_vocab)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        enc_out = self.encoder(src, src_mask)
        dec_out = self.decoder(tgt, enc_out, tgt_mask, src_mask)
        return self.out(dec_out)


# -----------------------------
# Data Preparation
# -----------------------------
train_pairs = [
    ("我 爱 你", "i love you"),
    ("你 好", "hello"),
    ("天 气 很 好", "the weather is good"),
    ("我 是 学生", "i am a student")
]

def build_vocab(sentences):
    token_freq = defaultdict(int)
    for s in sentences:
        for token in s.strip().split():
            token_freq[token] += 1
    vocab = {"<pad>": 0, "<bos>": 1, "<eos>": 2, "<unk>": 3}
    for token in token_freq:
        vocab[token] = len(vocab)
    return vocab

src_vocab = build_vocab([src for src, _ in train_pairs])
tgt_vocab = build_vocab([tgt for _, tgt in train_pairs])
inv_tgt_vocab = {i: t for t, i in tgt_vocab.items()}

def encode_sentence(sentence, vocab, max_len=10, add_bos=False, add_eos=True):
    tokens = sentence.strip().split()
    ids = []
    if add_bos: ids.append(vocab["<bos>"])
    ids += [vocab.get(t, vocab["<unk>"]) for t in tokens]
    if add_eos: ids.append(vocab["<eos>"])
    ids += [vocab["<pad>"]] * (max_len - len(ids))
    return torch.tensor(ids[:max_len])

# -----------------------------
# Training Loop
# -----------------------------
model = Transformer(len(src_vocab), len(tgt_vocab))
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

EPOCHS = 1000
for epoch in range(1, EPOCHS + 1):
    model.train()
    src_sent, tgt_sent = random.choice(train_pairs)
    src_ids = encode_sentence(src_sent, src_vocab, add_eos=True).unsqueeze(0)
    tgt_ids = encode_sentence(tgt_sent, tgt_vocab, add_bos=True, add_eos=True).unsqueeze(0)

    src_input = src_ids
    tgt_input = tgt_ids[:, :-1]
    tgt_output = tgt_ids[:, 1:]

    logits = model(src_input, tgt_input)
    loss = F.cross_entropy(logits.view(-1, logits.size(-1)), tgt_output.view(-1), ignore_index=tgt_vocab["<pad>"])

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if epoch % 200 == 0:
        print(f"Epoch {epoch} | Loss: {loss.item():.4f}")

# -----------------------------
# Inference Function
# -----------------------------
def translate(sentence, model, src_vocab, tgt_vocab, inv_tgt_vocab, max_len=10):
    model.eval()
    src_ids = encode_sentence(sentence, src_vocab).unsqueeze(0)
    tgt_ids = torch.tensor([[tgt_vocab["<bos>"]]])
    for _ in range(max_len):
        out = model(src_ids, tgt_ids)
        next_token = out[0, -1].argmax().item()
        tgt_ids = torch.cat([tgt_ids, torch.tensor([[next_token]])], dim=1)
        if next_token == tgt_vocab["<eos>"]:
            break
    tokens = [inv_tgt_vocab.get(i, "<unk>") for i in tgt_ids.squeeze().tolist()]
    return " ".join(tokens[1:-1])

print(translate("我 爱 你", model, src_vocab, tgt_vocab, inv_tgt_vocab))
print(translate("天 气 很 好", model, src_vocab, tgt_vocab, inv_tgt_vocab))

TypeError: Sequential.forward() takes 2 positional arguments but 3 were given