# Day 5: LSTM 在 IMDB 情感分类上的实现

本讲义演示如何使用 LSTM（长短期记忆）网络在 IMDB 影评数据集上进行**二分类**任务（正面/负面情感）。

**学习目标：**
- 理解 LSTM 的工作原理与应用场景
- 掌握 Hugging Face `datasets` 库用于 NLP 数据加载与预处理
- 实现 Embedding + LSTM + Dense 的端到端文本分类流程
- 观察并分析训练过程中的过拟合现象

**关键概念：**
- **Embedding 层**：将词 ID 映射为稠密向量表示
- **LSTM 层**：捕捉序列中的长期依赖关系
- **动态填充**：根据批次中最长序列进行填充，提高效率
- **BCEWithLogitsLoss**：结合 Sigmoid 激活与二分类交叉熵，数值稳定性更好

## Hugging Face `datasets` 库简介

`datasets` 是 Hugging Face 生态中的**高效数据加载与处理工具**，专为大规模数据集设计。相比传统方法，它具有以下优势：

**核心特性：**
1. **高效存储**：基于 Apache Arrow，支持内存映射，即使数据集超过内存也能使用
2. **丰富 API**：支持 map、filter、select 等高阶操作，用法简洁
3. **自动下载**：自动从 Hugging Face Hub 下载并缓存数据集
4. **流式处理**：支持边下载边处理（对超大数据集有用）

**使用流程：**
1. `load_dataset()`：加载数据集
2. 定义预处理函数并用 `map()` 应用
3. `set_format()`：转换为 PyTorch 张量
4. 与 DataLoader 集成进行批处理

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from datasets import load_dataset
from transformers import AutoTokenizer

In [3]:
# 设置参数
max_features = 10000  # 词汇表大小
maxlen = 200          # 序列最大长度
batch_size = 64       # 批次大小

## `datasets` 库的设计哲学

关键点：
- 数据存储在**磁盘**上（Arrow 格式），需要时才加载到内存
- 支持**高效的切片、索引、过滤**等操作
- `map()` 支持 `batched=True` 以加速数据处理
- 自动处理数据缓存，避免重复处理

这种设计使得即使在资源有限的环境中，也能高效处理数十 GB 级别的数据集。

In [None]:
# 加载 IMDB 数据集
dataset = load_dataset('imdb')
print(f"数据集分割: {dataset.column_names}")
print(f"训练集大小: {len(dataset['train'])}")
print(f"测试集大小: {len(dataset['test'])}")

# 加载 BERT Tokenizer（用于 IMDB 情感分类）
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')

# 定义预处理函数
def preprocess_function(examples):
    """
    对文本进行分词、填充和截断
    
    Args:
        examples: 字典，包含 'text' 和 'label' 键
        
    Returns:
        字典，包含 'input_ids', 'attention_mask' 等
    """
    return tokenizer(
        examples['text'], 
        padding='max_length', 
        truncation=True, 
        max_length=maxlen
    )

# 应用预处理（使用 batched=True 加速）
tokenized_datasets = dataset.map(
    preprocess_function, 
    batched=True,
    desc="Tokenizing dataset"
)

# 后处理：重命名列、移除原始文本
tokenized_datasets = tokenized_datasets.rename_column("label", "labels")
tokenized_datasets = tokenized_datasets.remove_columns(["text"])

# 转换为 PyTorch 张量格式
tokenized_datasets.set_format("torch")

print("\n数据加载和预处理完成。")

README.md:   0%|          | 0.00/7.81k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/21.0M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/20.5M [00:00<?, ?B/s]

unsupervised-00000-of-00001.parquet:   0%|          | 0.00/42.0M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/25000 [00:00<?, ? examples/s]

Generating unsupervised split:   0%|          | 0/50000 [00:00<?, ? examples/s]

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/25000 [00:00<?, ? examples/s]

Map:   0%|          | 0/50000 [00:00<?, ? examples/s]

数据加载和预处理完成。


datasets 提供了强大的数据处理 API，你可以用非常简洁的方式对整个数据集或数据集的某个子集进行转换操作。
- map() 方法： 这是最核心的数据处理方法！你可以定义一个函数，然后用 dataset.map(your_function) 将这个函数应用到数据集的每个样本或每个批次上。这使得数据预处理和特征工程变得非常方便。
- filter() 方法： 根据条件过滤数据集的样本。
- remove_columns() / rename_columns()： 移除或重命名数据集的列。
- select()： 选取数据集的某个子集。
- sort()： 对数据集进行排序。

In [5]:
# 创建 DataLoader
train_dataloader = DataLoader(tokenized_datasets["train"], shuffle=True, batch_size=batch_size)
test_dataloader = DataLoader(tokenized_datasets["test"], batch_size=batch_size)

print("DataLoader 创建完成。")

DataLoader 创建完成。


In [6]:
# 检查一个批次的数据形状
for batch in train_dataloader:
    print(f"文本批次形状 (input_ids): {batch['input_ids'].shape}")
    print(f"标签批次形状 (labels): {batch['labels'].shape}")
    # Hugging Face tokenizer 还会返回 attention_mask 和 token_type_ids (对于 BERT)
    print(f"Attention mask 批次形状: {batch['attention_mask'].shape}")
    if 'token_type_ids' in batch:
        print(f"Token type ids 批次形状: {batch['token_type_ids'].shape}")
    break

文本批次形状 (input_ids): torch.Size([64, 200])
标签批次形状 (labels): torch.Size([64])
Attention mask 批次形状: torch.Size([64, 200])
Token type ids 批次形状: torch.Size([64, 200])


## 构建简单的 LSTM 模型 

搭建一个包含 Embedding 层、LSTM 层和 Dense (Linear) 层的RNN模型。

In [None]:
class SimpleLSTMModel(nn.Module):
    """
    简单 LSTM 二分类模型
    
    架构：
    - Embedding：词 ID -> 稠密向量 [seq_len, embedding_dim]
    - LSTM：单层 LSTM 捕捉序列依赖 [seq_len, hidden_dim]
    - 输出层：LSTM 最后隐藏状态 -> 线性层 -> 二分类输出
    """
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        
        # Embedding 层：将词 ID (0-vocab_size-1) 映射为 embedding_dim 维向量
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # LSTM 层：单向，batch_first=True 使输入为 [batch, seq_len, features]
        self.lstm = nn.LSTM(
            embedding_dim, 
            hidden_dim, 
            num_layers=1, 
            batch_first=True
        )
        
        # 全连接输出层
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, input_ids):
        """
        前向传播
        
        Args:
            input_ids: [batch_size, seq_len]
            
        Returns:
            predictions: [batch_size, output_dim]
        """
        # Embedding: [batch, seq_len] -> [batch, seq_len, embedding_dim]
        embedded = self.embedding(input_ids)
        
        # LSTM: [batch, seq_len, embedding_dim] -> output, (hidden, cell)
        # output: [batch, seq_len, hidden_dim]，每个时间步的输出
        # hidden: [1, batch, hidden_dim]，最后一个时间步的隐藏状态
        output, (hidden, cell) = self.lstm(embedded)
        
        # 取 LSTM 最后一个时间步的隐藏状态
        hidden = hidden.squeeze(0)  # [batch, hidden_dim]
        
        # 全连接层进行分类
        predictions = self.fc(hidden)  # [batch, output_dim]
        return predictions

In [8]:
# 设置模型参数
# 使用 Hugging Face tokenizer 的词汇表大小
vocab_size = tokenizer.vocab_size
embedding_dim = 128 # 嵌入维度适当提升，兼顾性能和效果
hidden_dim = 128    # LSTM 隐藏层维度适当提升，兼顾性能和效果
output_dim = 1      # 输出维度 (二分类)

# 实例化LSTM模型
model = SimpleLSTMModel(vocab_size, embedding_dim, hidden_dim, output_dim)

print("模型构建完成。")
print(model)

模型构建完成。
SimpleLSTMModel(
  (embedding): Embedding(30522, 128)
  (lstm): LSTM(128, 128, batch_first=True)
  (fc): Linear(in_features=128, out_features=1, bias=True)
)


## 训练设置

定义损失函数和优化器。

In [None]:
# 定义损失函数和优化器
criterion = nn.BCEWithLogitsLoss()  # 结合 Sigmoid 和交叉熵，数值稳定
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 移动模型和损失函数到设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
criterion = criterion.to(device)

print(f"使用设备: {device}")
print("损失函数和优化器设置完成。")

使用设备: cpu
损失函数和优化器设置完成。


## 辅助函数定义

**`binary_accuracy()`**：计算二分类准确率
- 将 logits 通过 Sigmoid 转换为概率 [0, 1]
- 四舍五入到 0 或 1，与真实标签比较

**`train()`**：单个 epoch 的训练循环
- 遍历训练集所有 batch
- 计算损失、反向传播、更新参数
- 返回平均损失和准确率

**`evaluate()`**：在验证/测试集上评估
- 不计算梯度（`torch.no_grad()`）
- 返回损失和准确率，用于监控模型性能

In [None]:
def binary_accuracy(preds, y):
    """
    计算二分类准确率
    
    Args:
        preds: [batch_size] logits（原始预测值，未经激活）
        y: [batch_size] 二进制标签 (0 或 1)
        
    Returns:
        准确率 (0.0 - 1.0)
    """
    rounded_preds = torch.round(torch.sigmoid(preds))  # 通过 Sigmoid 转换为概率，再四舍五入
    correct = (rounded_preds == y).float()
    acc = correct.sum() / len(correct)
    return acc

def train(model, iterator, optimizer, criterion, device):
    """
    单个 epoch 的训练
    
    Returns:
        (epoch_loss, epoch_acc)
    """
    epoch_loss = 0
    epoch_acc = 0
    
    model.train()
    
    for batch in iterator:
        input_ids = batch['input_ids'].to(device)
        labels = batch['labels'].to(device)
        
        optimizer.zero_grad()
        
        # 前向传播
        predictions = model(input_ids).squeeze(1)
        loss = criterion(predictions, labels.float())
        
        # 计算准确率
        acc = binary_accuracy(predictions, labels)
        
        # 反向传播
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

def evaluate(model, iterator, criterion, device):
    """
    在验证/测试集上评估
    
    Returns:
        (epoch_loss, epoch_acc)
    """
    epoch_loss = 0
    epoch_acc = 0

    model.eval()
    
    with torch.no_grad():
        for batch in iterator:
            input_ids = batch['input_ids'].to(device)
            labels = batch['labels'].to(device)

            predictions = model(input_ids).squeeze(1)
            loss = criterion(predictions, labels.float())
            acc = binary_accuracy(predictions, labels)

            epoch_loss += loss.item()
            epoch_acc += acc.item()
    
    return epoch_loss / len(iterator), epoch_acc / len(iterator)

In [None]:
train_losses, valid_losses, train_accs, valid_accs = [], [], [], []

epochs = 10 # 不要继续训练了，过拟合严重
best_valid_loss = float('inf')
best_epoch = 0
print("start training...")
for epoch in range(epochs):
    train_loss, train_acc = train(model, train_dataloader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, test_dataloader, criterion)
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    train_accs.append(train_acc)
    valid_accs.append(valid_acc)
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        best_epoch = epoch
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': valid_loss,
        }
        torch.save(checkpoint, f'../model/SimpleLSTM/checkpoint_{epoch+1}.pth')
    print(f'Epoch: {epoch+1:02}')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%')
    print(f'\t Valid. Loss: {valid_loss:.3f} |  Valid. Acc: {valid_acc*100:.2f}%')

start training...
Epoch: 01
	Train Loss: 0.644 | Train Acc: 62.61%
	 Valid. Loss: 0.560 |  Valid. Acc: 73.64%
Epoch: 02
	Train Loss: 0.433 | Train Acc: 80.95%
	 Valid. Loss: 0.417 |  Valid. Acc: 80.87%
Epoch: 03
	Train Loss: 0.291 | Train Acc: 88.50%
	 Valid. Loss: 0.373 |  Valid. Acc: 83.77%
Epoch: 04
	Train Loss: 0.213 | Train Acc: 92.15%
	 Valid. Loss: 0.407 |  Valid. Acc: 84.17%
Epoch: 05
	Train Loss: 0.148 | Train Acc: 95.11%
	 Valid. Loss: 0.432 |  Valid. Acc: 84.06%
Epoch: 06
	Train Loss: 0.103 | Train Acc: 96.88%
	 Valid. Loss: 0.540 |  Valid. Acc: 82.78%
Epoch: 07
	Train Loss: 0.068 | Train Acc: 98.15%
	 Valid. Loss: 0.540 |  Valid. Acc: 83.11%
Epoch: 08
	Train Loss: 0.050 | Train Acc: 98.66%
	 Valid. Loss: 0.696 |  Valid. Acc: 83.07%
Epoch: 09
	Train Loss: 0.040 | Train Acc: 98.95%
	 Valid. Loss: 0.696 |  Valid. Acc: 83.42%
Epoch: 10
	Train Loss: 0.030 | Train Acc: 99.28%
	 Valid. Loss: 0.763 |  Valid. Acc: 83.09%


## 训练与过拟合观察

**现象**：模型在 epoch 3 后开始过拟合
- 训练损失持续下降
- 验证损失开始上升

**原因**：
- 单层 LSTM 容量不足，难以充分学习
- 数据集足够大，但模型过度拟合训练细节
- 缺乏足够的正则化（Dropout 在单层 LSTM 中效果有限）

**可能的改进方向**（暂不实现）：
- 添加更多 Dropout 层
- 增加 LSTM 层数
- 数据增强或加权损失
- Early Stopping

本实验重点是理解流程，暂不做过多优化。

In [19]:
# 加载最优模型并评估
checkpoint = torch.load(f'../model/SimpleLSTM/checkpoint_{best_epoch+1}.pth')

model.load_state_dict(checkpoint['model_state_dict'])

model.eval()

test_loss, test_acc = evaluate(model, test_dataloader, criterion)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

Test Loss: 0.373 | Test Acc: 83.77%


In [20]:
# 单条文本预测函数
def predict_sentiment(model, tokenizer, sentence, device, maxlen=200):
    model.eval()
    tokens = tokenizer(sentence, padding='max_length', truncation=True, max_length=maxlen, return_tensors='pt')
    input_ids = tokens['input_ids'].to(device)
    with torch.no_grad():
        output = model(input_ids)
        prob = torch.sigmoid(output.squeeze(1)).item()
    return prob

# 示例：预测一条影评
sample_text = "This movie was absolutely fantastic! I loved it."
prob = predict_sentiment(model, tokenizer, sample_text, device)
print(f'正面情感概率: {prob:.4f}')
print('预测标签:', 'positive' if prob > 0.5 else 'negative')

正面情感概率: 0.9527
预测标签: positive
