分类架构：文本 -> Token -> Embedding -> LSTM -> 序列表示 -> 分类器 -> Loss

LSTM层的输出形状：

>output, (h_n, c_n) = lstm(embeddings)

output：[batch_size, seq_len, hidden_size]

h_n：[num_layers * num_directions, batch_size, hidden_size]

分类模型中，我们用哪个向量做分类

| 方法             | 描述                    |
| -------------- | --------------------- |
| 最后一个 time step | `output[:, -1, :]`    |
| 最后一层 h_n       | `h_n[-1]`             |
| Mean Pooling   | `mean(output, dim=1)` |


## 模型结构图

In [2]:
Input Text
   ↓
Token IDs
   ↓
Embedding Layer
   ↓
LSTM Encoder
   ↓
Last Hidden State (h_n)
   ↓
Fully Connected
   ↓
Softmax
   ↓
Prediction


[batch, seq]
   ↓
[batch, seq, emb]
   ↓
[batch, seq, hidden]
   ↓
[batch, hidden]
   ↓
[batch, class]


SyntaxError: invalid syntax (909900121.py, line 1)

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import random

#构造一个模拟文本分类数据集
class TextClassificationDataset(Dataset):
    def __init__(self, num_samples=10000, vocab_size=5000, max_len=20):

        #用一个list保存所有样本
        #每一个元素是一个(input_ids, label)二元组
        self.samples = []

        for _ in range(num_samples):
            #随机生成一个句子长度，下限为5
            seq_len = random.randint(5, max_len)
            #生成一个长度为seq_len的token序列，在[a, b)间采样
            input_ids = torch.randint(1, vocab_size, (seq_len,))
            #生成一个标签(0或1)，(1,)表示shape是[1], 再用.item()取出python标量
            label = torch.randint(0, 1, (1,)).item()
            #将(input_ids, label)作为一个样本加入samples
            self.samples.append((input_ids, label))

    #返回数据集大小
    #DataLoader会在内部调用这个函数    
    def __len__(self):
        return len(self.samples)
    
    #根据索引idx取出一个样本
    def __getitem__(self, idx):
        return self.samples[idx]
    
#padding + collate
def collate_fn(batch):
    """
    batch: List[(input_ids, label)]
    """
    input_ids_list, labels = zip(*batch)

    lengths = [len(x) for x in input_ids_list]
    max_len = max(lengths)

    padded_inputs = []
    #对每个句子进行padding，确保每个句子长度一致
    for x in input_ids_list:
        #计算需要padding的长度
        pad_len = max_len - len(x)
        #使用torch.cat拼接原始句子和pad_len的0向量
        padded = torch.cat([x, torch.zeros(pad_len, dtype=torch.long)])

        padded_inputs.append(padded)

    #将padded_inputs转换成一个tensor
    #padded_inputs是一个列表，其中的每一个元素是一个tensor
    #使用tensor.stack将它们堆叠成一个大的tensor，形状是[B,L]
    padded_inputs = torch.stack(padded_inputs)  #[B,L]

    #将label转换成tensor格式
    labels = torch.tensor(labels)   #[B]

    return padded_inputs, labels

class LSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_size, num_classes):
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, embed_dim)

        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_size,
            batch_first=True
        )

        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, input_ids):
        """
        input_ids: [batch_size, seq_len]
        """
        embedings = self.embedding(input_ids)       #[B, L, E]
        _, (h_n, _) = self.lstm(embedings)          #h_n: [1, B, H]
        last_hidden = h_n[-1]
        logits = self.fc(last_hidden)

        return logits
    
def train():
    vocab_size = 5000
    embed_dim = 128
    hidden_size = 256
    num_classes = 2
    batch_size = 32
    lr = 1e-3
    epochs = 5

    #数据
    train_dataset = TextClassificationDataset()
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=collate_fn
    )

    #模型，优化器
    model = LSTMClassifier(
        vocab_size=vocab_size,
        embed_dim=embed_dim,
        hidden_size=hidden_size,
        num_classes=num_classes
    )

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    #训练
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0

        for input_ids, labels in train_loader:
            logits = model(input_ids)
            loss = criterion(logits, labels)

            optimizer.zero_grad
            loss.backward()
            optimizer.step

            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs} | Loss: {total_loss:.4f}")

if __name__ == "__main__":
    train()

Epoch 1/5 | Loss: 207.0214
Epoch 2/5 | Loss: 207.0007
Epoch 3/5 | Loss: 207.0101
Epoch 4/5 | Loss: 207.0195
Epoch 5/5 | Loss: 207.0379


## LSTM + Attention 文本分类结构
结构流程：
### 1.Embedding 层
输入：[batch_size, seq_len] 的 token id

输出：[batch_size, seq_len, embed_dim]

### 2.BiLSTM 层
输入：embedding

输出：H，形状 [batch_size, seq_len, hidden_dim * 2]

用双向 LSTM，这样 H 每个位置有前后文信息。

### 3.Attention 层
我们用一种最常见的 Additive Attention（Bahdanau-style 简化版）：

- 先把 H 通过一个线性 + tanh 得到“注意力打分”：
$$u_{t}=\tanh \left(W h_{t}+b\right), \quad u_{t} \in \mathbb{R}^{d_{a t t}}$$

- 再和一个可学习向量 v 做内积得到标量分数：
$$e_{t}=v^{\top} u_{t}$$

- 对 e_t 做 softmax 得到权重：
$$\alpha_{t}=\frac{\exp \left(e_{t}\right)}{\sum_{k=1}^{T} \exp \left(e_{k}\right)}$$

- 最后句向量
$$c=\sum_{t=1}^{T} \alpha_{t} h_{t}$$

实现时：

H : [B, T, H_dim]

u : [B, T, att_dim]

e : [B, T]

α : [B, T]

c : [B, H_dim]

### 4.分类层
输入：c [B, H_dim]

过全连接 + softmax / sigmoid：

$$\hat{y}=\operatorname{softmax}\left(W_{c} c+b_{c}\right)$$

In [1]:
from datasets import load_dataset
#from sklearn.model_selection import train_test_split

raw_datasets = load_dataset("imdb")     #二分类：0=neg， 1=pos

print(raw_datasets)

train_valid = raw_datasets["train"]
test_dataset = raw_datasets["test"]

#拆成20000train和5000valid
train_valid = train_valid.train_test_split(test_size=5000, seed=42)
train_dataset = train_valid["train"]
valid_dataset = train_valid["test"]

print(len(train_dataset), len(valid_dataset), len(test_dataset))

  from .autonotebook import tqdm as notebook_tqdm
'(MaxRetryError("HTTPSConnectionPool(host='huggingface.co', port=443): Max retries exceeded with url: /datasets/imdb/resolve/main/README.md (Caused by ConnectTimeoutError(<HTTPSConnection(host='huggingface.co', port=443) at 0x1f31bd6bf40>, 'Connection to huggingface.co timed out. (connect timeout=10)'))"), '(Request ID: d0278d29-3170-42e9-a21a-8d8f7d3756bb)')' thrown while requesting HEAD https://huggingface.co/datasets/imdb/resolve/main/README.md
Retrying in 1s [Retry 1/5].


DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    unsupervised: Dataset({
        features: ['text', 'label'],
        num_rows: 50000
    })
})
20000 5000 25000


写一个几件英文分词，实际项目中可以换成更好的分词器

In [13]:
from collections import Counter
import re

def simple_tokenizer(text):
    text = text.lower()
    #r表示原始字符串raw string, 避免/这种转义带来的麻烦
    #+表示前面的模式至少出现一次，可以连续多次
    #可在一起的意思是，匹配text里连续的小写英文字母字符串，也就是按英文单词切分
    tokens = re.findall(r"[a-z]+", text)
    return tokens

min_freq = 5 #词频小于5的当<unk>
counter = Counter()

for example in train_dataset:
    tokens = simple_tokenizer(example["text"])
    counter.update(tokens)

#special tokens
PAD = "<pad>"
UNK = "<unk>"

#<pad>用于补齐长度，编号0；<unk>表示未知词，编号1
vocab = {PAD: 0, UNK: 1}

#counter.items()返回(单词，词频)
for word, freq in counter.items():
    if freq >= min_freq:
        vocab[word] = len(vocab)

vocab_size = len(vocab)
print("vocab_size= ", vocab_size)

#encode: 把一条原始文本转换成固定长度的token id序列
def encode(text, vocab, max_len=256):
    tokens = simple_tokenizer(text)
    #把token转换成对应的词表id，如果token不在词表中，则用<UNK>的id
    ids = [vocab.get(tok, vocab[UNK]) for tok in tokens]

    if len(ids) < max_len:
        pad_len = max_len - len(ids)
        ids += [vocab[PAD]] * pad_len
    
    else:
        ids = ids[:max_len]

    return ids


vocab_size=  25954


In [3]:
import torch
from torch.utils.data import Dataset, DataLoader

class TextClassificationDataset(Dataset):
    def __init__(self, hf_dataset, vocab, max_len=256):
        self.dataset = hf_dataset
        self.vocab = vocab
        self.max_len = max_len

    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        text = self.dataset[idx]["text"]
        label = self.dataset[idx]["label"]

        input_ids = encode(text, vocab=self.vocab, max_len=self.max_len)
        input_ids = torch.tensor(input_ids, dtype=torch.long)

        return input_ids, label
    
max_len = 256
train_data = TextClassificationDataset(train_dataset, vocab, max_len=max_len)
valid_data = TextClassificationDataset(valid_dataset, vocab, max_len=max_len)
test_data = TextClassificationDataset(test_dataset, vocab, max_len=max_len)

batch_size = 64
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=batch_size)
test_loader = DataLoader(test_data, batch_size=batch_size)

### 基线模型：BiLSTM 文本分类

这个版本没有 Attention，就用 “最后一步 hidden state” 当句向量。

In [4]:
import torch.nn as nn
import torch.nn.functional as F

class BiLSTMClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, 
                 num_classes=2, num_layers=1, 
                 bidirectional=True, dropout=0.5, pad_idx=0):
        super().__init__()

        #输入：[B, T]
        #输出：[B, T, embed_dim]
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)

        #输出 H: [B, T, hidden_dim * 2]
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            bidirectional=True,
            batch_first=True
        )

        self.dropout = nn.Dropout(dropout)

        lstm_output_dim = hidden_dim * (2 if bidirectional else 1)

        self.fc = nn.Linear(lstm_output_dim, num_classes)

    def forward(self, input_ids):
        """
        input_ids: [B, T]
        """
        emb = self.embedding(input_ids)     #[B, T, E]
        H, (h_n, c_n) = self.lstm(emb)      #H: [B, T, 2H]

        #取最后一个时间步的hidden state
        #H[:, -1, :] : [B, 2H]
        last_hidden = H[:, -1, :]

        out = self.dropout(last_hidden)
        logits = self.fc(out)

        return logits

### BiLSTM + Attention 文本分类

In [5]:
class LSTMAttentionClassifier(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim,
                 num_classes=2, num_layers=1, bidirectional=True,
                 dropout=0.5, pad_idx=0, attn_dim=128):
        super().__init__()
        #输入：[B, T]
        #输出：[B, T, embed_dim]
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_idx)

       #输出 H: [B, T, hidden_dim * 2]
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            bidirectional=bidirectional,
            batch_first=True
        )

        self.dropout = nn.Dropout(dropout)
        lstm_out_dim = hidden_dim * (2 if bidirectional else 1)

        #attention
        self.atten_w = nn.Linear(lstm_out_dim, attn_dim, bias=True)
        self.atten_v = nn.Linear(attn_dim, 1, bias=False)

        #分类器
        self.fc = nn.Linear(lstm_out_dim, num_classes)
        self.pad_idx = pad_idx

    def attention(self, H, mask=None):
        """
        H: [B, T, H_dim]
        mask: [B, T]，padding 位置为 0, 真实token为1
        """
        u = torch.tanh(self.atten_w(H))     #[B, T, atten_dim]
        #线性映射成标量打分
        e = self.atten_v(u).squeeze(-1)     #[B, T]

        #mask掉padding的位置
        if mask is not None:
            e = e.masked_fill(mask==0, float("-inf"))

        #softmax归一化得到attention的权重分布
        alpha = F.softmax(e, dim=-1)        #[B, T]

        alpha_expanded = alpha.unsqueeze(-1)

        #加权隐藏状态
        weighted_H = H * alpha_expanded     #[B, T, H_dim]

        #沿时间求和得到上下文向量
        context = weighted_H.sum(dim=1)     #[B, H_dim]

        return context, alpha
    
    def forward(self, input_ids):
        """
        input_ids: [B, T]
        """
        emb = self.embedding(input_ids)     #[B, T, E]
        H, _ = self.lstm(emb)               #[B, T, H_dim]

        #标记真实token的位置
        mask = (input_ids != self.pad_idx).long() #[B, T]

        context, atten_weights = self.attention(H, mask)
        out = self.dropout(context)
        logits = self.fc(out)               #[B, num_classes]

        return logits, atten_weights

### 训练 + 评估 + 指标对比表

In [6]:
import torch
import torch.optim as optim
from sklearn.metrics import f1_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def train_one_epoch(model, dataloader, optimizer, criterion):
    model.train()
    total_loss, total_correct, total_examples = 0, 0, 0

    for input_ids, labels in dataloader:
        input_ids = input_ids.to(device)
        labels = labels.to(device)

        #区分两种模型，有无attention
        outputs = model(input_ids)
        if isinstance(outputs, tuple):
            logits, _ = outputs
        else:
            logits = outputs

        loss = criterion(logits, labels)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * input_ids.size(0)
        preds = logits.argmax(dim=-1)
        total_correct += (preds == labels).sum().item()
        total_examples += input_ids.size(0)

    avg_loss = total_loss / total_examples
    avg_acc = total_correct / total_examples

    return avg_loss, avg_acc

@torch.no_grad()
def evaluate(model, dataloader, criterion):
    model.eval()
    total_loss, total_correct, total_examples = 0, 0, 0
    all_preds, all_labels = [], []

    for input_ids, labels in dataloader:
        input_ids = input_ids.to(device)
        labels = labels.to(device)

        outputs = model(input_ids)
        if isinstance(outputs, tuple):
            logits, _ = outputs
        else:
            logits = outputs

        loss = criterion(logits, labels)
        total_loss += loss.item() * input_ids.size(0)
        preds = logits.argmax(dim=-1)
        total_correct += (preds == labels).sum().item()
        total_examples += input_ids.size(0)

        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(labels.cpu().tolist())
    
    avg_loss = total_loss / total_examples
    avg_acc = total_correct / total_examples
    f1 = f1_score(all_labels, all_preds, average="macro")

    return avg_loss, avg_acc, f1

In [7]:
embed_dim = 128
hidden_dim = 128
num_classes = 2
pad_idx = vocab[PAD]
num_epochs = 5

baseline_model = BiLSTMClassifier(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    hidden_dim=hidden_dim,
    num_classes=num_classes,
    num_layers=1,
    bidirectional=True,
    dropout=0.5,
    pad_idx=pad_idx
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(baseline_model.parameters(), lr=1e-3)

for epoch in range(num_epochs):
    train_loss, train_acc = train_one_epoch(baseline_model, train_loader, optimizer, criterion)
    val_loss, val_acc, val_f1 = evaluate(baseline_model, valid_loader, criterion)
    print(f"[BiLSTM] Epoch {epoch+1}: "
          f"train_loss: {train_loss:.4f}, train_acc: {train_acc:.4f}, "
          f"valid_loss: {val_loss:.4f}, valid_acc: {val_acc:.4f}, valid_f1: {val_f1:.4f}")
    
#最后在test数据集上评估
test_loss, test_acc, test_f1 = evaluate(baseline_model, test_loader, criterion)
print(f"[BiLSTM] Test: loss= {test_loss:.4f}, acc= {test_acc:.4f}, f1= {test_f1:.4f}")

[BiLSTM] Epoch 1: train_loss: 0.6941, train_acc: 0.5039, valid_loss: 0.6955, valid_acc: 0.4994, valid_f1: 0.4294
[BiLSTM] Epoch 2: train_loss: 0.6911, train_acc: 0.5311, valid_loss: 0.6960, valid_acc: 0.5044, valid_f1: 0.4401
[BiLSTM] Epoch 3: train_loss: 0.6858, train_acc: 0.5363, valid_loss: 0.7049, valid_acc: 0.5058, valid_f1: 0.4077
[BiLSTM] Epoch 4: train_loss: 0.6783, train_acc: 0.5498, valid_loss: 0.6982, valid_acc: 0.5198, valid_f1: 0.4760
[BiLSTM] Epoch 5: train_loss: 0.6710, train_acc: 0.5508, valid_loss: 0.7107, valid_acc: 0.4970, valid_f1: 0.4195
[BiLSTM] Test: loss= 0.7026, acc= 0.5084, f1= 0.4267


In [8]:
attn_model = LSTMAttentionClassifier(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    hidden_dim=hidden_dim,
    num_classes=num_classes,
    num_layers=1,
    bidirectional=True,
    dropout=0.5,
    pad_idx=pad_idx,
    attn_dim=128
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(attn_model.parameters(), lr=1e-3)

for epoch in range(1, num_epochs + 1):
    train_loss, train_acc = train_one_epoch(attn_model, train_loader, optimizer, criterion)
    val_loss, val_acc, val_f1 = evaluate(attn_model, valid_loader, criterion)
    print(f"[BiLSTM+Attn] Epoch {epoch}: "
          f"train_loss={train_loss:.4f}, train_acc={train_acc:.4f}, "
          f"val_loss={val_loss:.4f}, val_acc={val_acc:.4f}, val_f1={val_f1:.4f}")

test_loss, test_acc, test_f1 = evaluate(attn_model, test_loader, criterion)
print(f"[BiLSTM+Attn] Test: loss={test_loss:.4f}, acc={test_acc:.4f}, f1={test_f1:.4f}")

[BiLSTM+Attn] Epoch 1: train_loss=0.4868, train_acc=0.7499, val_loss=0.3692, val_acc=0.8360, val_f1=0.8356
[BiLSTM+Attn] Epoch 2: train_loss=0.2997, train_acc=0.8742, val_loss=0.3170, val_acc=0.8646, val_f1=0.8643
[BiLSTM+Attn] Epoch 3: train_loss=0.2082, train_acc=0.9182, val_loss=0.3002, val_acc=0.8768, val_f1=0.8767
[BiLSTM+Attn] Epoch 4: train_loss=0.1256, train_acc=0.9564, val_loss=0.3484, val_acc=0.8732, val_f1=0.8731
[BiLSTM+Attn] Epoch 5: train_loss=0.0653, train_acc=0.9783, val_loss=0.4549, val_acc=0.8732, val_f1=0.8732
[BiLSTM+Attn] Test: loss=0.5245, acc=0.8522, f1=0.8522


### Attention的权重

In [23]:
def show_attention_one_sample(model, dataset, idx, top_k=30):
    """
    model: 已训练好的 LSTM + Attention 模型
    dataset: TextClassificationDataset（自定义的数据集）
    idx: 要查看的样本在 dataset 中的索引
    top_k: 最多显示前多少个 token（防止句子太长）
    """
    model.eval()
    #从dataset中提取第idx的样本
    #input_ids: [T], label: 标量
    input_ids, label = dataset[idx]

    #增加batch长度，把[T] -> [1, T]
    input_ids = input_ids.unsqueeze(0).to(device)   #[1, T]

    with torch.no_grad():
        #logits: [1, num_classes]
        #attn_weights: [1, T](每个时间步的attention权重)
        logits, attn_weights = model(input_ids)

    #去掉batch长度：[1, T] -> [T]
    attn_weights = attn_weights.squeeze(0).cpu().numpy()    #[T]

    #同时去掉batch维度：[1, T] -> [T]
    ids = input_ids.squeeze(0).cpu().tolist()

    #把padding的token去掉
    valid_pairs = [(i, w) for i, w in zip(ids, attn_weights) if i != vocab[PAD]]

    #避免句子太长，只显示前top_k个token
    valid_pairs = valid_pairs[:top_k]

    tokens, weights = [], []

    for i, w in valid_pairs:
        tokens.append(id2word.get(i, UNK))
        weights.append(w)

    #将token和attention权重一一对应打印出来
    for tok, w in zip(tokens, weights):
        #用“#”的长度来表示attention权重的大小
        bar = "#" * int(w * 1000)

        #左边是token(固定长度为15)
        print(f"{tok:15s} | {bar} ({w:.3f})")

id2word = {idx: word for word, idx in vocab.items()}
show_attention_one_sample(attn_model, valid_data, idx=0)

there           |  (0.000)
is              |  (0.000)
no              | ##### (0.005)
relation        |  (0.000)
at              |  (0.000)
all             |  (0.000)
between         |  (0.001)
<unk>           |  (0.000)
and             |  (0.000)
<unk>           |  (0.000)
but             |  (0.000)
the             |  (0.000)
fact            |  (0.000)
that            |  (0.000)
both            | ### (0.004)
are             | ####### (0.007)
police          | ############################### (0.031)
series          | ####################### (0.024)
about           | #################### (0.020)
violent         | ################# (0.017)
crimes          |  (0.000)
<unk>           |  (0.000)
looks           | # (0.001)
<unk>           |  (0.000)
<unk>           |  (0.000)
looks           | # (0.002)
classic         |  (0.000)
<unk>           |  (0.000)
plots           |  (0.001)
are             | ### (0.003)
