# 使用 Transformer 实现中文文本摘要
本笔记演示如何使用 Hugging Face Transformers 的中文摘要模型（Pegasus）进行：
- 单条文本摘要
- 批量文本摘要
并提供常用参数（max_length/min_length/do_sample）的用法示例；代码强制使用第 0 块 GPU，因此需要可用的 CUDA 环境。

In [None]:
# 可选：安装/升级依赖（首次使用或版本较旧时执行）
%pip install -q --upgrade transformers sentencepiece accelerate

In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, pipeline

# Use the smaller, faster-to-download Chinese summarization checkpoint;
# switch to the 523M variant for stronger results.
model_name = "IDEA-CCNL/Randeng-Pegasus-238M-Summary-Chinese"  # or "IDEA-CCNL/Randeng-Pegasus-523M-Summary-Chinese"

# Require CUDA: fail fast when no GPU is visible, then select GPU 0.
assert torch.cuda.is_available(), "未检测到 CUDA，请在具有 GPU 的环境中运行或安装支持 CUDA 的 PyTorch。"
torch.cuda.set_device(0)

# NOTE(review): trust_remote_code=True allows the checkpoint repository to run
# its own Python code at load time -- keep it only for repositories you trust.
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True, trust_remote_code=True)
model = AutoModelForSeq2SeqLM.from_pretrained(model_name, trust_remote_code=True).to("cuda")

device = 0  # pin the pipeline to GPU 0
summarizer = pipeline("summarization", model=model, tokenizer=tokenizer, device=device)
# Bare expression: the notebook displays the chosen device index.
device

In [None]:
# Single-text Chinese summarization example.
# The input is one string assembled from adjacent string literals.
text = (
    "人工智能（Artificial Intelligence，缩写为 AI）是研究、开发用于模拟、延伸和扩展人的智能的理论、方法、技术及应用系统的一门新技术科学。"
    "自 2012 年深度学习兴起以来，语音识别、计算机视觉、自然语言处理等领域取得了突破性进展。"
    "近年来，大语言模型（LLM）在通用文本理解与生成能力上表现突出，推动了搜索、问答、写作辅助等应用的发展。"
)

# The pipeline returns a list of dicts; take the first result's "summary_text".
summary = summarizer(
    text, max_length=128, min_length=20, do_sample=False
)[0]["summary_text"]
print("摘要:\n", summary)

# Batch summarization (optional): pass a list of texts, get one result dict per text.
texts = [
    "特斯拉宣布在上海建设新的储能超级工厂，生产其超大型商业储能产品 Megapack，预计年产量达一万台。此次投资将进一步完善特斯拉在中国的产业布局。",
    "我国将加快推进算力基础设施建设，完善东数西算等工程布局，促进算力资源高效互联，满足 AI 训练与推理的巨大需求。",
]
summaries = summarizer(texts, max_length=128, min_length=15, do_sample=False)
print("\n批量摘要：")
for i, r in enumerate(summaries):
    print(f"[{i}]", r["summary_text"])

# 从零实现 Transformer 翻译（含多头自注意力、FFN、残差+LayerNorm、位置编码）
本节将在不依赖高层封装的情况下，用 PyTorch 实现一个最小可运行的 Transformer，并在一个玩具中英平行语料上训练与测试。
注意：这是教学/演示规模（小模型+小数据），目的是跑通流程与理解结构。

In [37]:
import math
import random
from typing import List, Tuple
import torch
import torch.nn as nn
import torch.nn.functional as F

# Seed every RNG this notebook draws from: torch (weight init, dropout) and
# Python's `random` (used by make_batch to shuffle the training pairs).
# Seeding only torch left the batch order different on every run.
torch.manual_seed(42)
random.seed(42)

# Require CUDA: fail fast when no GPU is visible, then select GPU 0.
assert torch.cuda.is_available(), "未检测到 CUDA，请在具有 GPU 的环境中运行或安装支持 CUDA 的 PyTorch。"
torch.cuda.set_device(0)
device_torch = torch.device("cuda")
# NOTE(review): cuDNN autotuning may pick non-deterministic kernels, so runs
# can still differ slightly despite the seeds above -- confirm if exact
# reproducibility matters.
torch.backends.cudnn.benchmark = True

# Model hyperparameters.
d_model = 128      # embedding / hidden width
n_head = 4         # attention heads (must divide d_model)
d_ff = 256         # inner width of the position-wise feed-forward block
num_layers = 2     # encoder layers and decoder layers
dropout = 0.1
max_len = 64       # every sequence is truncated/padded to this many positions

# Training hyperparameters.
batch_size = 8
num_epochs = 30
lr = 3e-4

# Reserved vocabulary ids; build_vocab places the matching tokens at these indices.
PAD, BOS, EOS, UNK = 0, 1, 2, 3

In [38]:
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to embeddings, then apply dropout.

    The table is precomputed for ``max_len`` positions and stored as a
    non-trainable buffer named ``pe`` of shape ``(max_len, d_model)``.
    Even feature columns hold sines and odd columns hold cosines.

    Args:
        d_model: Feature width of the embeddings (even or odd).
        max_len: Number of positions to precompute.
        dropout: Dropout probability applied after the addition.
    """

    def __init__(self, d_model: int, max_len: int, dropout: float = 0.0):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the angles to fit instead of failing on assignment.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x: torch.Tensor):  # x: (B, T, C)
        """Return ``dropout(x + pe[:T])`` for an input of shape ``(B, T, C)``.

        Raises:
            ValueError: If ``T`` exceeds the number of precomputed positions.
        """
        T = x.size(1)
        if T > self.pe.size(0):
            # Fail with an explicit message rather than a broadcast shape error.
            raise ValueError(
                f"sequence length {T} exceeds positional encoding max_len {self.pe.size(0)}"
            )
        x = x + self.pe[:T, :].unsqueeze(0)
        return self.dropout(x)

class MultiHeadAttention(nn.Module):
    """Scaled dot-product attention split across ``n_head`` heads.

    Queries, keys and values are each projected by their own linear layer,
    split into heads of width ``d_model // n_head``, attended, merged back
    and passed through an output projection.

    Args:
        d_model: Feature width of inputs and outputs; must be divisible by ``n_head``.
        n_head: Number of attention heads.
        dropout: Dropout probability applied to the attention weights.
    """

    def __init__(self, d_model: int, n_head: int, dropout: float = 0.1):
        super().__init__()
        assert d_model % n_head == 0
        self.d_model = d_model
        self.n_head = n_head
        self.head_dim = d_model // n_head
        self.w_q = nn.Linear(d_model, d_model)
        self.w_k = nn.Linear(d_model, d_model)
        self.w_v = nn.Linear(d_model, d_model)
        self.w_o = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(dropout)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, q, k, v, attn_mask: torch.Tensor = None):
        """Attend from ``q`` over ``k``/``v``.

        Args:
            q: Queries of shape ``(B, Tq, C)``.
            k: Keys of shape ``(B, Tk, C)``.
            v: Values of shape ``(B, Tk, C)``.
            attn_mask: Optional mask broadcastable to ``(B, H, Tq, Tk)``;
                non-zero keeps a position, zero hides it.

        Returns:
            Tensor of shape ``(B, Tq, C)``.
        """
        B, Tq, C = q.shape
        Tk = k.size(1)
        H, D = self.n_head, self.head_dim
        q = self.w_q(q).view(B, Tq, H, D).permute(0, 2, 1, 3)  # (B,H,Tq,D)
        k = self.w_k(k).view(B, Tk, H, D).permute(0, 2, 1, 3)  # (B,H,Tk,D)
        v = self.w_v(v).view(B, Tk, H, D).permute(0, 2, 1, 3)  # (B,H,Tk,D)
        scores = (q @ k.transpose(-2, -1)) / math.sqrt(D)  # (B,H,Tq,Tk)
        if attn_mask is not None:
            # Use the dtype's most negative finite value rather than -inf:
            # hidden positions still get zero weight, but a query row whose
            # keys are ALL hidden yields a uniform distribution instead of NaN.
            scores = scores.masked_fill(attn_mask == 0, torch.finfo(scores.dtype).min)
        attn = self.softmax(scores)
        attn = self.dropout(attn)
        context = attn @ v  # (B,H,Tq,D)
        context = context.permute(0, 2, 1, 3).contiguous().view(B, Tq, C)  # (B,Tq,C)
        out = self.w_o(context)
        return out

class PositionwiseFFN(nn.Module):
    """Position-wise feed-forward block: Linear -> GELU -> Dropout -> Linear.

    Args:
        d_model: Input and output feature width.
        d_ff: Width of the hidden layer.
        dropout: Dropout probability applied after the activation.
    """

    def __init__(self, d_model: int, d_ff: int, dropout: float = 0.1):
        super().__init__()
        # fc1 is created before fc2 so seeded initialisation and the
        # parameter order stay the same.
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.dropout = nn.Dropout(dropout)
        self.act = nn.GELU()

    def forward(self, x):
        """Expand to ``d_ff``, activate, drop, and project back to ``d_model``."""
        hidden = self.fc1(x)
        hidden = self.act(hidden)
        hidden = self.dropout(hidden)
        return self.fc2(hidden)

class ResidualAddNorm(nn.Module):
    """Residual connection followed by layer normalisation (post-norm).

    Computes ``LayerNorm(x + Dropout(sublayer_out))``.

    Args:
        d_model: Feature width normalised by the LayerNorm.
        dropout: Dropout probability applied to the sub-layer output.
    """

    def __init__(self, d_model: int, dropout: float = 0.1):
        super().__init__()
        self.dropout = nn.Dropout(dropout)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, x, sublayer_out):
        """Add the (dropped-out) sub-layer output to ``x`` and normalise."""
        residual = x + self.dropout(sublayer_out)
        return self.norm(residual)

In [39]:
class EncoderLayer(nn.Module):
    """One encoder layer: self-attention then feed-forward, each wrapped in
    a residual-add-and-norm step."""

    def __init__(self, d_model, n_head, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head, dropout)
        self.ffn = PositionwiseFFN(d_model, d_ff, dropout)
        self.norm1 = ResidualAddNorm(d_model, dropout)
        self.norm2 = ResidualAddNorm(d_model, dropout)

    def forward(self, x, src_mask=None):  # x: (B, S, C)
        """Run the layer on ``x``; ``src_mask`` is forwarded to self-attention."""
        attended = self.self_attn(x, x, x, attn_mask=src_mask)
        x = self.norm1(x, attended)
        return self.norm2(x, self.ffn(x))

class DecoderLayer(nn.Module):
    """One decoder layer: self-attention, cross-attention over the encoder
    memory, then feed-forward, each wrapped in a residual-add-and-norm step."""

    def __init__(self, d_model, n_head, d_ff, dropout):
        super().__init__()
        self.self_attn = MultiHeadAttention(d_model, n_head, dropout)
        self.cross_attn = MultiHeadAttention(d_model, n_head, dropout)
        self.ffn = PositionwiseFFN(d_model, d_ff, dropout)
        self.norm1 = ResidualAddNorm(d_model, dropout)
        self.norm2 = ResidualAddNorm(d_model, dropout)
        self.norm3 = ResidualAddNorm(d_model, dropout)

    def forward(self, y, memory, tgt_mask=None, memory_mask=None):  # y:(B,T,C)
        """Run the layer on ``y``.

        ``tgt_mask`` is passed to self-attention; ``memory_mask`` is passed to
        the cross-attention whose keys and values are ``memory``.
        """
        self_attended = self.self_attn(y, y, y, attn_mask=tgt_mask)
        y = self.norm1(y, self_attended)
        cross_attended = self.cross_attn(y, memory, memory, attn_mask=memory_mask)
        y = self.norm2(y, cross_attended)
        return self.norm3(y, self.ffn(y))

class Encoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of EncoderLayer.

    The positional table is sized by the module-level ``max_len``; embeddings
    for the ``PAD`` id are registered as the padding index.
    """

    def __init__(self, vocab_size, d_model, n_head, d_ff, num_layers, dropout):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, d_model, padding_idx=PAD)
        self.pos = PositionalEncoding(d_model, max_len, dropout)
        stack = [EncoderLayer(d_model, n_head, d_ff, dropout) for _ in range(num_layers)]
        self.layers = nn.ModuleList(stack)

    def forward(self, src, src_mask=None):  # src:(B,S)
        """Encode token ids ``src``; ``src_mask`` is handed to every layer."""
        # Embeddings are scaled by sqrt(d_model) before positions are added.
        scale = math.sqrt(self.emb.embedding_dim)
        hidden = self.pos(self.emb(src) * scale)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden, src_mask)
        return hidden

class Decoder(nn.Module):
    """Token embedding + positional encoding followed by a stack of DecoderLayer.

    The positional table is sized by the module-level ``max_len``; embeddings
    for the ``PAD`` id are registered as the padding index.
    """

    def __init__(self, vocab_size, d_model, n_head, d_ff, num_layers, dropout):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, d_model, padding_idx=PAD)
        self.pos = PositionalEncoding(d_model, max_len, dropout)
        stack = [DecoderLayer(d_model, n_head, d_ff, dropout) for _ in range(num_layers)]
        self.layers = nn.ModuleList(stack)

    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None):  # tgt:(B,T)
        """Decode token ids ``tgt`` against the encoder output ``memory``."""
        # Embeddings are scaled by sqrt(d_model) before positions are added.
        scale = math.sqrt(self.emb.embedding_dim)
        hidden = self.pos(self.emb(tgt) * scale)
        for decoder_layer in self.layers:
            hidden = decoder_layer(hidden, memory, tgt_mask, memory_mask)
        return hidden

class Transformer(nn.Module):
    """Encoder-decoder model with a linear projection onto the target vocabulary."""

    def __init__(self, src_vocab, tgt_vocab, d_model, n_head, d_ff, num_layers, dropout):
        super().__init__()
        shared = (d_model, n_head, d_ff, num_layers, dropout)
        self.encoder = Encoder(src_vocab, *shared)
        self.decoder = Decoder(tgt_vocab, *shared)
        self.generator = nn.Linear(d_model, tgt_vocab)

    def forward(self, src, tgt_inp, src_mask=None, tgt_mask=None, memory_mask=None):
        """Return target-vocabulary logits of shape ``(B, T, V)``.

        ``src`` is encoded with ``src_mask``; ``tgt_inp`` is decoded against the
        encoder output using ``tgt_mask`` and ``memory_mask``.
        """
        memory = self.encoder(src, src_mask)  # (B,S,C)
        decoded = self.decoder(tgt_inp, memory, tgt_mask, memory_mask)  # (B,T,C)
        return self.generator(decoded)  # (B,T,V)

In [40]:
# 简单分词与词表（英：按空格；中：按字符）
def tokenize_en(s: str) -> List[str]:
    """Lower-case ``s`` and split it on runs of whitespace."""
    lowered = s.lower()
    # split() with no separator already ignores leading/trailing whitespace.
    return lowered.split()
def tokenize_zh(s: str) -> List[str]:
    """Split ``s`` into single characters after trimming outer whitespace.

    Character-level tokens keep the demo simple; inner whitespace is kept
    as its own token.
    """
    trimmed = s.strip()
    return [*trimmed]



def build_vocab(token_lists: List[List[str]], min_freq: int = 1):
    """Build token<->id lookup tables from tokenised sentences.

    Ids 0-3 are reserved for ``<pad>``, ``<bos>``, ``<eos>`` and ``<unk>``.
    Remaining tokens are numbered in order of first appearance, keeping only
    those seen at least ``min_freq`` times. A corpus token spelled like one
    of the reserved tokens maps to the reserved id and is not added again.

    Args:
        token_lists: One list of tokens per sentence.
        min_freq: Minimum corpus frequency for a token to enter the vocabulary.

    Returns:
        ``(stoi, itos)``: a dict from token to id, and the list of tokens by id.
    """
    from collections import Counter

    counter = Counter()
    for tokens in token_lists:
        counter.update(tokens)
    itos = ["<pad>", "<bos>", "<eos>", "<unk>"]
    stoi = {tok: i for i, tok in enumerate(itos)}
    for tok, freq in counter.items():
        # Membership is tested on the dict (O(1)); scanning the growing list
        # here made vocabulary construction quadratic in vocabulary size.
        if freq >= min_freq and tok not in stoi:
            stoi[tok] = len(itos)
            itos.append(tok)
    return stoi, itos

def encode(tokens: List[str], stoi: dict, add_bos: bool, add_eos: bool, max_len: int):
    """Map tokens to ids, optionally framed by BOS/EOS, truncated to ``max_len``.

    Unknown tokens map to ``UNK``. Room for the requested BOS/EOS markers is
    reserved first and the tokens are cut to whatever budget remains, so the
    result is never longer than ``max_len`` as long as ``max_len`` can hold
    the markers themselves.

    Args:
        tokens: Tokens of one sentence.
        stoi: Token-to-id lookup.
        add_bos: Prepend ``BOS`` when true.
        add_eos: Append ``EOS`` when true.
        max_len: Maximum length of the returned list, markers included.

    Returns:
        List of integer ids.
    """
    from itertools import islice

    # Number of ordinary tokens that fit next to the requested markers.
    # Clamped at zero: previously one token was always emitted before the
    # length check, overshooting max_len when the markers alone filled it.
    budget = max(max_len - (1 if add_bos else 0) - (1 if add_eos else 0), 0)
    ids = [BOS] if add_bos else []
    ids.extend(stoi.get(tok, UNK) for tok in islice(tokens, budget))
    if add_eos:
        ids.append(EOS)
    return ids

def pad_to_len(ids: List[int], max_len: int, pad_id: int = PAD):
    """Return a new list: ``ids`` right-padded with ``pad_id`` up to ``max_len``.

    When ``ids`` already has ``max_len`` or more items, no padding is added
    and nothing is truncated.
    """
    missing = max_len - len(ids)
    # A non-positive repeat count yields an empty list, i.e. no padding.
    filler = [pad_id] * missing
    return ids + filler

def decode(ids: List[int], itos: List[str]):
    """Map ids back to tokens, dropping PAD/BOS/EOS.

    Ids at or beyond ``len(itos)`` are rendered as the literal ``"<unk>"``.
    """
    skipped = (PAD, BOS, EOS)
    vocab_size = len(itos)
    return [
        itos[i] if i < vocab_size else "<unk>"
        for i in ids
        if i not in skipped
    ]

In [41]:
# 从 data/test.csv 读取平行语料（英->中），自动识别列名
from pathlib import Path
import pandas as pd

# The corpus is expected at ./data/test.csv, relative to the current working directory.
data_dir = Path.cwd() / "data"
csv_path = data_dir / "test.csv"

def load_pairs_from_csv(p: Path):
    """Read English/Chinese sentence pairs from a CSV file.

    Column names are trimmed and lower-cased, then matched against a list of
    common (English, Chinese) name pairs. When none match, the first two
    columns are used and a notice is printed. Rows where either side is empty
    after trimming are dropped.

    Args:
        p: Path of the CSV file.

    Returns:
        ``(pairs, (col_en, col_zh), n)``: the list of ``(en, zh)`` string
        tuples, the column names that were used, and the number of pairs kept.

    Raises:
        FileNotFoundError: If ``p`` does not exist.
        ValueError: If no known column pair matches and the file has fewer
            than two columns.
    """
    if not p.exists():
        raise FileNotFoundError(f"未找到数据文件: {p}")

    # dtype=str keeps every present cell as text; empty cells still arrive as
    # NaN and are turned into "" by fillna below.
    df = pd.read_csv(p, encoding="utf-8", dtype=str)
    # Normalise the header so matching ignores case and stray spaces.
    df.columns = [str(c).strip().lower() for c in df.columns]

    known_pairs = (
        ("en", "zh"),
        ("english", "chinese"),
        ("source", "target"),
        ("src", "tgt"),
        ("text_en", "text_zh"),
        ("eng", "chn"),
    )
    matched = next(
        ((a, b) for a, b in known_pairs if a in df.columns and b in df.columns),
        None,
    )
    if matched is not None:
        col_en, col_zh = matched
    elif len(df.columns) >= 2:
        col_en, col_zh = df.columns[:2]
        print(f"未匹配到常见列名，默认使用前两列：{col_en}, {col_zh}")
    else:
        raise ValueError("CSV 至少需要两列（英文、中文）")

    def _as_clean_text(column):
        # Missing cells become "", everything is text, outer whitespace removed.
        return df[column].fillna("").astype(str).str.strip()

    en_series = _as_clean_text(col_en)
    zh_series = _as_clean_text(col_zh)
    keep = (en_series != "") & (zh_series != "")
    sentence_pairs = list(zip(en_series[keep].tolist(), zh_series[keep].tolist()))
    return sentence_pairs, (col_en, col_zh), int(keep.sum())

# Load the sentence pairs and report how many were read and which columns were used.
pairs, used_cols, n_rows = load_pairs_from_csv(csv_path)
print(f"已从 {csv_path} 读取 {n_rows} 对句子，列: {used_cols}")

# Build the vocabularies: whitespace tokens for English, single characters for Chinese.
src_tokens_list = [tokenize_en(s) for s, _ in pairs]
tgt_tokens_list = [tokenize_zh(t) for _, t in pairs]
src_stoi, src_itos = build_vocab(src_tokens_list)
tgt_stoi, tgt_itos = build_vocab(tgt_tokens_list)

src_vocab_size = len(src_itos)
tgt_vocab_size = len(tgt_itos)
# Bare expression: the notebook displays (source vocab size, target vocab size, pair count).
src_vocab_size, tgt_vocab_size, len(pairs)

已从 d:\xry\yanjiusheng\1\DAMOXING\lab1\data\test.csv 读取 8549 对句子，列: ('en', 'zh')


(17579, 3003, 8549)

In [42]:
# 批处理与掩码
def make_batch(pairs: List[Tuple[str, str]], batch_size: int, shuffle: bool = True):
    """Yield mini-batches of padded id tensors ``(src, tgt_inp, tgt_out)``.

    Every sequence is encoded with the module-level vocabularies and padded
    to the module-level ``max_len``; tensors are created on ``device_torch``.
    The last batch may hold fewer than ``batch_size`` rows.

    Args:
        pairs: ``(english, chinese)`` sentence pairs.
        batch_size: Number of pairs per batch.
        shuffle: Visit the pairs in a random order (Python ``random``) when true.
    """
    order = list(range(len(pairs)))
    if shuffle:
        random.shuffle(order)
    for start in range(0, len(order), batch_size):
        src_rows, tgt_inp_rows, tgt_out_rows = [], [], []
        for j in order[start:start + batch_size]:
            en_text, zh_text = pairs[j]
            src_ids = encode(tokenize_en(en_text), src_stoi, add_bos=False, add_eos=True, max_len=max_len)
            tgt_ids = encode(tokenize_zh(zh_text), tgt_stoi, add_bos=True, add_eos=True, max_len=max_len)
            src_rows.append(pad_to_len(src_ids, max_len))
            # Teacher forcing: the decoder input is <bos> y1 ... yn and the
            # expected output is the same sequence shifted left: y1 ... yn <eos>.
            tgt_inp_rows.append(pad_to_len(tgt_ids[:-1], max_len))
            tgt_out_rows.append(pad_to_len(tgt_ids[1:], max_len))
        yield tuple(
            torch.tensor(rows, dtype=torch.long, device=device_torch)
            for rows in (src_rows, tgt_inp_rows, tgt_out_rows)
        )

def subsequent_mask(sz: int, device=None):
    """Build a causal (lower-triangular) keep-mask of shape ``(1, 1, sz, sz)``.

    Entry ``[0, 0, i, j]`` is 1 when ``j <= i`` (position ``i`` may look at
    position ``j``) and 0 otherwise. The dtype is ``torch.uint8``.

    Args:
        sz: Sequence length.
        device: Device to create the mask on. Defaults to the module-level
            ``device_torch``, which was previously the only option.
    """
    if device is None:
        device = device_torch
    mask = torch.tril(torch.ones((sz, sz), dtype=torch.uint8, device=device))
    return mask.view(1, 1, sz, sz)

def padding_mask(batch_ids: torch.Tensor, pad_id: int = PAD):
    """Return a boolean keep-mask that is False wherever ``batch_ids == pad_id``.

    Input of shape ``(B, T)`` gives a mask of shape ``(B, 1, 1, T)``.
    """
    keep = batch_ids.ne(pad_id)
    # Insert two singleton axes after the batch axis: (B, T) -> (B, 1, 1, T).
    return keep[:, None, None, :]

def combine_masks(*masks):
    """AND together every mask that is not ``None``.

    Masks use non-zero/True for "keep" and zero/False for "hide". Returns
    ``None`` when no mask is given, and the mask itself when only one is.
    """
    present = [m for m in masks if m is not None]
    if not present:
        return None
    merged = present[0]
    for extra in present[1:]:
        merged = merged & extra
    return merged

In [43]:
# Build the network, the loss and the optimizer.
# NOTE: this rebinds `model`, the name the summarization cell used for the Pegasus model.
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, num_layers, dropout).to(device_torch)
# Target positions equal to PAD are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

def train_one_epoch():
    """Run one pass over ``pairs`` and return the mean loss per batch.

    Uses the module-level ``model``, ``criterion`` and ``optimizer``. Gradients
    are clipped to a total norm of 1.0 before each optimizer step. Returns 0.0
    when there are no batches.
    """
    model.train()
    running_loss, n_steps = 0.0, 0
    for src, tgt_inp, tgt_out in make_batch(pairs, batch_size, shuffle=True):
        n_rows = src.size(0)
        tgt_len = tgt_inp.size(1)

        src_mask = padding_mask(src)  # (B,1,1,S)
        # Decoder self-attention may only see earlier, non-padding positions.
        tgt_mask = combine_masks(padding_mask(tgt_inp), subsequent_mask(tgt_len))  # (B,1,T,T)
        # Cross-attention reuses the source padding mask; it broadcasts over T.
        logits = model(src, tgt_inp, src_mask=src_mask, tgt_mask=tgt_mask, memory_mask=src_mask)  # (B,T,V)

        n_positions = n_rows * tgt_len
        loss = criterion(logits.view(n_positions, -1), tgt_out.view(n_positions))

        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        running_loss += loss.item()
        n_steps += 1
    return running_loss / max(n_steps, 1)

In [44]:
# 简单贪心解码
@torch.no_grad()
def greedy_decode(model, src_sentence: str, max_new_tokens: int = 40):
    """Translate one English sentence by greedy (arg-max) decoding.

    The source is tokenised, encoded and padded to the module-level
    ``max_len``; the decoder then emits one token at a time until it produces
    ``EOS`` or the step limit is reached. The model is switched to eval mode
    and left there.

    Args:
        model: A ``Transformer`` exposing ``encoder``, ``decoder`` and ``generator``.
        src_sentence: English input text.
        max_new_tokens: Upper bound on generated tokens. Values above
            ``max_len`` are clamped to ``max_len``.

    Returns:
        The predicted Chinese string (character tokens joined with no separator).
    """
    model.eval()
    src_ids = encode(tokenize_en(src_sentence), src_stoi, add_bos=False, add_eos=True, max_len=max_len)
    src = torch.tensor([pad_to_len(src_ids, max_len)], device=device_torch)
    src_mask = padding_mask(src)  # (1,1,1,S)
    memory = model.encoder(src, src_mask)  # (1,S,C)
    ys = torch.tensor([[BOS]], device=device_torch)  # (1,1)
    # The decoder input grows by one token per step, and the positional table
    # holds only max_len rows; clamping avoids a shape error on longer requests.
    steps = min(max_new_tokens, max_len)
    for _ in range(steps):
        tgt_mask = combine_masks(padding_mask(ys), subsequent_mask(ys.size(1)))  # (1,1,T,T)
        out = model.decoder(ys, memory, tgt_mask, src_mask)  # (1,T,C)
        # Only the last position is needed to pick the next token.
        logits = model.generator(out[:, -1:, :])  # (1,1,V)
        next_token = logits.argmax(dim=-1)[:, -1]  # (1,)
        ys = torch.cat([ys, next_token.unsqueeze(1)], dim=1)  # (1,T+1)
        if next_token.item() == EOS:
            break
    pred_ids = ys[0].tolist()[1:]  # drop the leading BOS
    pred_tokens = decode(pred_ids, tgt_itos)
    return "".join(pred_tokens)  # character-level Chinese: join directly

# Train for num_epochs epochs; print the mean loss on epoch 1 and on every 5th epoch.
for epoch in range(1, num_epochs + 1):
    loss = train_one_epoch()
    if epoch % 5 == 0 or epoch == 1:
        print(f"epoch {epoch:02d} | loss {loss:.4f}")


epoch 01 | loss 5.5394
epoch 05 | loss 4.1338
epoch 05 | loss 4.1338
epoch 10 | loss 3.7267
epoch 10 | loss 3.7267
epoch 15 | loss 3.4788
epoch 15 | loss 3.4788
epoch 20 | loss 3.2947
epoch 20 | loss 3.2947
epoch 25 | loss 3.1506
epoch 25 | loss 3.1506
epoch 30 | loss 3.0298
epoch 30 | loss 3.0298


In [45]:
# Translate a few short English inputs with the trained model and print each result.
tests = [
    "are",
    "hello",
    "how are you",
    "see you",
    "what is name",
 ]
# Numbering in the printed output starts at 1.
for i, s in enumerate(tests, 1):
    zh = greedy_decode(model, s)
    print(f"这是第{i}条输出：{s} -> {zh}")

这是第1条输出：are -> 这是其实。
这是第2条输出：hello -> T道了。
这是第3条输出：how are you -> 你怎么做？
这是第4条输出：see you -> 你看到了。
这是第5条输出：what is name -> 这是什么？
