# 任务一：IMDB-10 情感分类

本Notebook旨在完成IMDB电影评论的十分类情感（评分1-10）任务。我们将根据实验要求，实现并对比三种不同的深度学习模型。

**实验方案：**
1.  **模型一：** GloVe词向量 + BiLSTM + 全连接层
2.  **模型二：** BERT-base 嵌入 + BiLSTM + 分类头
3.  **模型三：** 微调 BERT-base

**评价指标：**
- 宏平均F1值 (Macro-F1) (主要)
- 准确率 (Accuracy)
- 均方根误差 (RMSE)

## 1. 环境设置与依赖安装

首先，安装所有必需的Python库（例如：`pip install pandas numpy torch scikit-learn transformers`）。

## 2. 数据加载与预处理

根据任务描述，我们需要从`.txt.ss`文件中加载数据。这些文件可能包含用户ID和产品ID列，需要被忽略。我们将编写一个函数来解析这些文件，提取文本和标签，并进行清理。

**清理步骤：**
1.  移除句子分割符 `<sssss>`。
2.  将标签从 `1-10` 转换为 `0-9` 以便模型训练。

In [None]:
import pandas as pd
import re

def load_imdb_data(file_path):
    """Parse an IMDB ``.txt.ss`` file into a DataFrame of texts and labels.

    Each line is searched for the first run of digits enclosed by double tab
    characters on both sides; that number is the rating and everything after
    it is the review text. Lines without such a field are skipped silently.

    Returns:
        A DataFrame with columns ``text`` (``<sssss>`` markers replaced by a
        space, surrounding whitespace stripped) and ``label`` (rating - 1).
    """
    rating_field = re.compile(r'\t\t(\d+)\t\t')

    def parse(raw_line):
        # Returns (review, label) for a well-formed line, otherwise None.
        found = rating_field.search(raw_line)
        if found is None:
            return None
        review = raw_line[found.end():].strip()
        review = review.replace('<sssss>', ' ').strip()
        return review, int(found.group(1)) - 1

    with open(file_path, 'r', encoding='utf-8') as handle:
        records = [rec for rec in map(parse, handle) if rec is not None]

    return pd.DataFrame({
        'text': [review for review, _ in records],
        'label': [label for _, label in records],
    })

# Paths of the three dataset splits (relative to the working directory)
train_file = 'imdb.train.txt.ss'
dev_file = 'imdb.dev.txt.ss'
test_file = 'imdb.test.txt.ss'

# Load every split into a DataFrame with 'text' and 'label' columns
df_train = load_imdb_data(train_file)
df_val = load_imdb_data(dev_file)
df_test = load_imdb_data(test_file)

# Report the (rows, columns) shape of each split
print(f"训练集大小: {df_train.shape}")
print(f"验证集大小: {df_val.shape}")
print(f"测试集大小: {df_test.shape}")

# Show the first few training rows as a sanity check
print("\n数据样本示例:")
print(df_train.head())

## 3. 模型一：GloVe + BiLSTM

### 3.1. GloVe词向量与数据准备
我们将构建词汇表，加载预训练的GloVe词向量，并创建PyTorch的`Dataset`和`DataLoader`。

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error
import numpy as np

# --- Hyper-parameters and configuration ---
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available
NUM_CLASSES = 10
BATCH_SIZE_LSTM = 64
EMBEDDING_DIM = 300
HIDDEN_DIM = 256
N_LAYERS = 2
DROPOUT = 0.5
EPOCHS = 5
MAX_LEN_LSTM = 512 # sequences are truncated/padded to this length, as the task requires
GLOVE_PATH = r"D:\glove_vectors\glove.840B.300d\glove.840B.300d.txt"

# --- 文本分词器 ---
def tokenizer(text):
    """Lowercase *text* and split it into word tokens.

    Every run of characters other than ASCII letters, digits, apostrophes
    and spaces acts as a token separator. Such runs used to be deleted
    outright, which glued neighbouring words together ("good,bad" became
    "goodbad", and tab- or newline-separated words were merged).

    Returns:
        A list of lowercase tokens; empty when *text* has no word characters.
    """
    text = text.lower()
    # Substitute a space (not the empty string) so adjacent words stay apart.
    text = re.sub(r"[^a-zA-Z0-9' ]+", " ", text)
    return text.split()

# --- Build the vocabulary from the training split ---
print("正在构建词汇表...")
word_counts = Counter()
for text in df_train['text']:
    word_counts.update(tokenizer(text))
# Most frequent words first; equally frequent words keep first-seen order.
vocab = [word for word, _ in word_counts.most_common()]
# Real words start at index 2: 0 and 1 are reserved for padding / unknown.
word_to_idx = dict(zip(vocab, range(2, len(vocab) + 2)))
word_to_idx['<pad>'] = 0
word_to_idx['<unk>'] = 1
VOCAB_SIZE = len(word_to_idx)

# --- Load the pretrained GloVe vectors ---
print("正在加载GloVe词向量...")
# Allocate as float32 up front (the matrix is converted to float32 anyway),
# halving peak memory. Rows of words with no GloVe entry, and the
# <pad>/<unk> rows, stay all-zero.
glove_embeddings = np.zeros((VOCAB_SIZE, EMBEDDING_DIM), dtype=np.float32)
with open(GLOVE_PATH, 'r', encoding='utf-8') as f:
    for line in f:
        parts = line.split()
        # Check the field count BEFORE indexing: a blank line would otherwise
        # raise IndexError on parts[0]. Lines whose token itself contains
        # whitespace also fail this check and are skipped.
        if len(parts) != EMBEDDING_DIM + 1:
            continue
        word = parts[0]
        if word not in word_to_idx:
            continue
        try:
            vector = np.array(parts[1:], dtype=np.float32)
        except ValueError:
            # Non-numeric component: ignore the malformed line.
            continue
        glove_embeddings[word_to_idx[word]] = vector
glove_embeddings = torch.tensor(glove_embeddings, dtype=torch.float32)
print("GloVe加载完成。")

# --- PyTorch Dataset ---
class IMDBDataset(Dataset):
    """Dataset of fixed-length token-id sequences and labels for the LSTM model.

    Args:
        dataframe: DataFrame with a ``text`` and a ``label`` column.
        word_to_idx: Mapping from token to vocabulary index.
        max_len: Length every sequence is truncated or zero-padded to.
    """

    def __init__(self, dataframe, word_to_idx, max_len):
        self.df = dataframe
        self.word_to_idx = word_to_idx
        self.max_len = max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` long tensors for the row at position *idx*."""
        # Positional access: samplers produce positions 0..len-1, which only
        # coincide with index labels (as ``.loc`` needs) for a default index.
        text = self.df['text'].iloc[idx]
        label = self.df['label'].iloc[idx]
        # Unknown words map to index 1 (<unk>); truncate to max_len.
        tokens = [self.word_to_idx.get(word, 1) for word in tokenizer(text)][:self.max_len]
        # Right-pad with index 0 (<pad>) up to max_len.
        tokens.extend([0] * (self.max_len - len(tokens)))

        return (
            torch.tensor(tokens, dtype=torch.long),
            torch.tensor(int(label), dtype=torch.long),
        )

# --- Create the datasets and DataLoaders for the GloVe + BiLSTM model ---
train_dataset = IMDBDataset(df_train, word_to_idx, MAX_LEN_LSTM)
val_dataset = IMDBDataset(df_val, word_to_idx, MAX_LEN_LSTM)
test_dataset = IMDBDataset(df_test, word_to_idx, MAX_LEN_LSTM)

# Only the training loader shuffles; validation/test keep file order.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE_LSTM, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE_LSTM)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE_LSTM)

print("\n数据准备完成。")

### 3.2. 模型一结构定义

In [None]:
class BiLSTMClassifier(nn.Module):
    """Frozen pretrained embeddings -> BiLSTM -> dropout -> linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of output classes.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability (embeddings, between LSTM layers, and
            before the final linear layer).
        pretrained_embeddings: Tensor of shape ``(vocab_size, embedding_dim)``
            copied into the embedding layer, which is then frozen.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout, pretrained_embeddings):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.embedding.weight.data.copy_(pretrained_embeddings)
        self.embedding.weight.requires_grad = False

        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,
                            bidirectional=True, dropout=dropout, batch_first=True)

        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return class logits of shape ``(batch, output_dim)``.

        Args:
            text: Long tensor ``(batch, seq_len)`` of token ids, right-padded
                with the padding index (0).
        """
        embedded = self.dropout(self.embedding(text))

        # Pack to the true lengths so the final hidden states come from the
        # last real token; otherwise the forward direction keeps running
        # over the padding. Lengths are clamped to 1 because packing
        # rejects empty sequences, and must live on the CPU.
        lengths = (text != self.embedding.padding_idx).sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )

        _, (hidden, _) = self.lstm(packed)
        # Last layer's forward (-2) and backward (-1) final states.
        hidden = self.dropout(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1))
        return self.fc(hidden)

# Instantiate model one with the GloVe matrix and move it to the selected device.
model1 = BiLSTMClassifier(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_DIM, NUM_CLASSES, N_LAYERS, DROPOUT, glove_embeddings).to(DEVICE)

### 3.3. 训练与评估（模型一）

In [None]:
def train(model, iterator, optimizer, criterion):
    """Train *model* for one pass over *iterator*.

    Each batch is a ``(token_ids, labels)`` pair that is moved to ``DEVICE``.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for token_ids, targets in iterator:
        token_ids = token_ids.to(DEVICE)
        targets = targets.to(DEVICE)

        optimizer.zero_grad()
        batch_loss = criterion(model(token_ids), targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Evaluate *model* on *iterator* without gradient tracking.

    Each batch is a ``(token_ids, labels)`` pair that is moved to ``DEVICE``.

    Returns:
        A tuple ``(mean_batch_loss, accuracy, macro_f1, rmse)``. RMSE is
        computed on labels shifted back by +1 (the original rating scale).
    """
    model.eval()
    epoch_loss = 0
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for text, labels in iterator:
            text, labels = text.to(DEVICE), labels.to(DEVICE)
            predictions = model(text)
            loss = criterion(predictions, labels)
            epoch_loss += loss.item()
            all_preds.extend(predictions.argmax(1).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    # zero_division=0 matches evaluate_bert_based / evaluate_finetune: a class
    # that is never predicted scores 0, as before, but without a warning.
    f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
    # RMSE on the rating scale: shift the 0-based labels back by one.
    preds_rmse = np.array(all_preds) + 1
    labels_rmse = np.array(all_labels) + 1
    rmse = np.sqrt(mean_squared_error(labels_rmse, preds_rmse))

    return epoch_loss / len(iterator), acc, f1, rmse

# --- Train model one ---
print("开始训练模型一...")
optimizer = optim.Adam(model1.parameters())
criterion = nn.CrossEntropyLoss().to(DEVICE)
# Collects the test metrics of every model for the final comparison table.
results = {}

for epoch in range(EPOCHS):
    train_loss = train(model1, train_loader, optimizer, criterion)
    valid_loss, valid_acc, valid_f1, valid_rmse = evaluate(model1, val_loader, criterion)
    print(f'轮次: {epoch+1:02} | 训练损失: {train_loss:.3f} | 验证损失: {valid_loss:.3f} | 验证Acc: {valid_acc*100:.2f}% | 验证F1: {valid_f1:.3f} | 验证RMSE: {valid_rmse:.3f}')

# --- Evaluate on the test set (using the weights after the last epoch) ---
test_loss, test_acc, test_f1, test_rmse = evaluate(model1, test_loader, criterion)
print(f'\n模型一 测试集结果 -> Acc: {test_acc*100:.2f}% | F1: {test_f1:.3f} | RMSE: {test_rmse:.3f}')
results['模型一 (GloVe+BiLSTM)'] = {'Accuracy': test_acc, 'Macro-F1': test_f1, 'RMSE': test_rmse}

## 4. 模型二 & 三：基于BERT的模型

### 4.1. BERT数据准备
我们将使用Hugging Face的`transformers`库来准备数据。这包括使用BERT的分词器，并为模型创建`Dataset`和`DataLoader`。

In [None]:
from transformers import BertTokenizer

# --- Configuration ---
BERT_MODEL_NAME = 'bert-base-uncased'
MAX_LEN_BERT = 512 # sequences are truncated/padded to this length, as the task requires
# Larger batches make better use of the GPU; lower this value on out-of-memory errors
BATCH_SIZE_BERT = 32 

# BERT tokenizer and model
# Make sure bert-base-uncased has been downloaded locally beforehand,
# for example to 'D:/models/bert-base-uncased'
local_bert_path = 'D:/models/bert-base-uncased' # <--- change this to your own local path

# Fail early with a clear message if the local path does not exist
import os
if not os.path.exists(local_bert_path):
    raise FileNotFoundError(f"BERT模型路径不存在: {local_bert_path}。请先下载模型并修改路径。")

print(f"从本地路径加载BERT分词器: {local_bert_path}")
tokenizer_bert = BertTokenizer.from_pretrained(local_bert_path)
print("分词器加载成功。")

# --- PyTorch Dataset for BERT ---
class IMDBDatasetBERT(Dataset):
    """Dataset that encodes review texts with a BERT tokenizer.

    Args:
        dataframe: DataFrame with a ``text`` and a ``label`` column.
        tokenizer: Object exposing ``encode_plus`` (a Hugging Face tokenizer).
        max_len: Length every encoded sequence is truncated or padded to.
    """

    def __init__(self, dataframe, tokenizer, max_len):
        self.df = dataframe
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for position *idx*."""
        # Positional access: samplers produce positions 0..len-1, which only
        # coincide with index labels (as ``.loc`` needs) for a default index.
        text = self.df['text'].iloc[idx]
        label = self.df['label'].iloc[idx]
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        return {
            # The tokenizer returns a leading batch dimension of 1; flatten it.
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(int(label), dtype=torch.long)
        }

# --- Create the DataLoaders for the BERT-based models ---
# Worker processes and pinned memory speed up loading when a GPU is used.
# On Windows, workers are started with "spawn" and must re-import the dataset
# class; a class defined inside a notebook cannot be imported that way, so
# multi-process loading is disabled there (os.name == 'nt').
use_cuda = torch.cuda.is_available()
dataloader_args = {
    "batch_size": BATCH_SIZE_BERT,
    "num_workers": 4 if use_cuda and os.name != 'nt' else 0,
    "pin_memory": use_cuda,
}

train_dataset_bert = IMDBDatasetBERT(df_train, tokenizer_bert, MAX_LEN_BERT)
val_dataset_bert = IMDBDatasetBERT(df_val, tokenizer_bert, MAX_LEN_BERT)
test_dataset_bert = IMDBDatasetBERT(df_test, tokenizer_bert, MAX_LEN_BERT)

# Only the training loader shuffles.
train_loader_bert = DataLoader(train_dataset_bert, shuffle=True, **dataloader_args)
val_loader_bert = DataLoader(val_dataset_bert, **dataloader_args)
test_loader_bert = DataLoader(test_dataset_bert, **dataloader_args)
print("用于BERT的数据准备完成。")

### 4.2. 模型二：BERT嵌入 + BiLSTM

In [None]:
from transformers import BertModel

class BertBiLSTMClassifier(nn.Module):
    """BERT used as a feature extractor, followed by a BiLSTM and a linear head.

    Args:
        bert: Encoder module; ``bert.config.to_dict()['hidden_size']`` gives
            the feature width and calling it returns the per-token features
            as the first element of its output.
        hidden_dim: LSTM hidden size per direction.
        output_dim: Number of output classes.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability (between LSTM layers and before the head).
    """

    def __init__(self, bert, hidden_dim, output_dim, n_layers, dropout):
        super().__init__()
        self.bert = bert
        embedding_dim = bert.config.to_dict()['hidden_size']
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers,
                            bidirectional=True, dropout=dropout, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, input_ids, attention_mask):
        """Return class logits of shape ``(batch, output_dim)``.

        Args:
            input_ids: Long tensor ``(batch, seq_len)`` of token ids.
            attention_mask: ``(batch, seq_len)`` tensor, 1 for real tokens and
                0 for right-side padding.
        """
        # The encoder is only a feature extractor: no gradients through it.
        with torch.no_grad():
            embedded = self.bert(input_ids=input_ids, attention_mask=attention_mask)[0]

        # Pack to the true lengths so the final hidden states come from the
        # last real token rather than from the padded positions. Lengths are
        # clamped to 1 (packing rejects empty sequences) and must be on CPU.
        lengths = attention_mask.sum(dim=1).clamp(min=1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )

        _, (hidden, _) = self.lstm(packed)
        # Last layer's forward (-2) and backward (-1) final states.
        hidden = self.dropout(torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1))
        return self.fc(hidden)

# --- Instantiate model two ---
# Load the encoder weights from the same local directory as the tokenizer.
# The notebook already requires that directory to exist, so loading by hub
# name here would add an unnecessary dependency on network access or a cache.
bert_model = BertModel.from_pretrained(local_bert_path)
# Freeze every BERT parameter: only the BiLSTM and the linear head are trained.
for param in bert_model.parameters():
    param.requires_grad = False

model2 = BertBiLSTMClassifier(bert_model, HIDDEN_DIM, NUM_CLASSES, N_LAYERS, DROPOUT).to(DEVICE)

### 4.3. 训练与评估（模型二）

In [None]:
from torch.cuda.amp import GradScaler, autocast

def train_bert_based(model, iterator, optimizer, criterion, scaler):
    """Train *model* for one epoch under autocast with gradient scaling.

    Each batch is a dict with ``input_ids``, ``attention_mask`` and
    ``labels`` tensors, which are moved to ``DEVICE``.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for batch in iterator:
        ids = batch['input_ids'].to(DEVICE)
        mask = batch['attention_mask'].to(DEVICE)
        targets = batch['labels'].to(DEVICE)

        optimizer.zero_grad()

        # Forward pass and loss inside the autocast context.
        with autocast():
            batch_loss = criterion(model(ids, mask), targets)

        # Backward on the scaled loss, then step the optimizer through the
        # scaler and refresh the scale factor.
        scaler.scale(batch_loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += batch_loss.item()
    return running_loss / len(iterator)

def evaluate_bert_based(model, iterator, criterion):
    """Evaluate *model* on *iterator* without gradients, under autocast.

    Each batch is a dict with ``input_ids``, ``attention_mask`` and
    ``labels`` tensors, which are moved to ``DEVICE``.

    Returns:
        A tuple ``(mean_batch_loss, accuracy, macro_f1, rmse)``. RMSE is
        computed on labels shifted back by +1 (the original rating scale).
    """
    model.eval()
    total_loss = 0
    predicted, actual = [], []
    with torch.no_grad():
        for batch in iterator:
            ids = batch['input_ids'].to(DEVICE)
            mask = batch['attention_mask'].to(DEVICE)
            targets = batch['labels'].to(DEVICE)

            with autocast():
                logits = model(ids, mask)
                batch_loss = criterion(logits, targets)

            total_loss += batch_loss.item()
            predicted.extend(logits.argmax(1).cpu().numpy())
            actual.extend(targets.cpu().numpy())

    accuracy = accuracy_score(actual, predicted)
    macro_f1 = f1_score(actual, predicted, average='macro', zero_division=0)
    # Shift both arrays back to the rating scale before computing RMSE.
    rmse = np.sqrt(mean_squared_error(np.array(actual) + 1, np.array(predicted) + 1))

    return total_loss / len(iterator), accuracy, macro_f1, rmse

# --- Train model two ---
print("\n开始训练模型二 ")
EPOCHS_BERT = 3 # BERT-based models usually need fewer epochs
optimizer = optim.Adam(model2.parameters())
criterion = nn.CrossEntropyLoss().to(DEVICE)

# GradScaler instance used for mixed-precision training
scaler = GradScaler()

for epoch in range(EPOCHS_BERT):
    train_loss = train_bert_based(model2, train_loader_bert, optimizer, criterion, scaler)
    valid_loss, valid_acc, valid_f1, valid_rmse = evaluate_bert_based(model2, val_loader_bert, criterion)
    print(f'轮次: {epoch+1:02} | 训练损失: {train_loss:.3f} | 验证损失: {valid_loss:.3f} | 验证Acc: {valid_acc*100:.2f}% | 验证F1: {valid_f1:.3f} | 验证RMSE: {valid_rmse:.3f}')

# --- Evaluate on the test set (using the weights after the last epoch) ---
test_loss, test_acc, test_f1, test_rmse = evaluate_bert_based(model2, test_loader_bert, criterion)
print(f'\n模型二 测试集结果 -> Acc: {test_acc*100:.2f}% | F1: {test_f1:.3f} | RMSE: {test_rmse:.3f}')
results['模型二 (BERT嵌入+BiLSTM)'] = {'Accuracy': test_acc, 'Macro-F1': test_f1, 'RMSE': test_rmse}

### 4.4. 模型三：微调BERT

In [None]:
from transformers import BertForSequenceClassification, get_linear_schedule_with_warmup
# transformers.AdamW is deprecated in favour of the PyTorch implementation
# (and may be missing from recent transformers releases), so the name AdamW
# is now bound to torch.optim.AdamW.
from torch.optim import AdamW

# --- Instantiate model three ---
# Load the weights from the same local directory as the tokenizer; the
# notebook already requires that directory to exist.
model3 = BertForSequenceClassification.from_pretrained(
    local_bert_path,
    num_labels=NUM_CLASSES,
).to(DEVICE)

# --- Optimizer and learning-rate scheduler ---
# weight_decay=0.0 keeps the previous optimizer's default; torch.optim.AdamW
# would otherwise apply 0.01.
optimizer = AdamW(model3.parameters(), lr=2e-5, eps=1e-8, weight_decay=0.0)
# One scheduler step per batch: linear decay over all training steps, no warmup.
total_steps = len(train_loader_bert) * EPOCHS_BERT
scheduler = get_linear_schedule_with_warmup(optimizer, num_warmup_steps=0, num_training_steps=total_steps)
criterion = nn.CrossEntropyLoss().to(DEVICE)

### 4.5. 训练与评估（模型三）

In [None]:
from torch.cuda.amp import GradScaler, autocast

def train_finetune(model, iterator, optimizer, scheduler, criterion, scaler):
    """Fine-tune *model* for one epoch under autocast with gradient scaling.

    The loss comes from the model's own output (``outputs.loss``);
    *criterion* is accepted but not used. Gradients are unscaled and clipped
    to a norm of 1.0 before each optimizer step, and *scheduler* is stepped
    once per batch.

    Returns:
        The mean of the per-batch losses.
    """
    model.train()
    running_loss = 0
    for batch in iterator:
        optimizer.zero_grad()
        ids = batch['input_ids'].to(DEVICE)
        mask = batch['attention_mask'].to(DEVICE)
        targets = batch['labels'].to(DEVICE)

        # Forward pass inside the autocast context.
        with autocast():
            batch_loss = model(ids, attention_mask=mask, labels=targets).loss

        running_loss += batch_loss.item()

        scaler.scale(batch_loss).backward()

        # Clipping must see the real gradient magnitudes, so unscale first.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        scaler.step(optimizer)
        scaler.update()
        scheduler.step()

    return running_loss / len(iterator)

def evaluate_finetune(model, iterator, criterion):
    """Evaluate the fine-tuned *model* without gradients, under autocast.

    Loss and logits are read from the model's output; *criterion* is
    accepted but not used.

    Returns:
        A tuple ``(mean_batch_loss, accuracy, macro_f1, rmse)``. RMSE is
        computed on labels shifted back by +1 (the original rating scale).
    """
    model.eval()
    total_loss = 0
    predicted, actual = [], []
    with torch.no_grad():
        for batch in iterator:
            ids = batch['input_ids'].to(DEVICE)
            mask = batch['attention_mask'].to(DEVICE)
            targets = batch['labels'].to(DEVICE)

            with autocast():
                outputs = model(ids, attention_mask=mask, labels=targets)
                batch_loss = outputs.loss
                logits = outputs.logits

            total_loss += batch_loss.item()
            predicted.extend(logits.argmax(1).cpu().numpy())
            actual.extend(targets.cpu().numpy())

    accuracy = accuracy_score(actual, predicted)
    macro_f1 = f1_score(actual, predicted, average='macro', zero_division=0)
    # Shift both arrays back to the rating scale before computing RMSE.
    rmse = np.sqrt(mean_squared_error(np.array(actual) + 1, np.array(predicted) + 1))

    return total_loss / len(iterator), accuracy, macro_f1, rmse

# --- Train model three (mixed precision) ---
print("\n开始训练模型三 (使用混合精度)...")

# Model three gets its own, fresh GradScaler instance
scaler_finetune = GradScaler()

for epoch in range(EPOCHS_BERT):
    train_loss = train_finetune(model3, train_loader_bert, optimizer, scheduler, criterion, scaler_finetune)
    valid_loss, valid_acc, valid_f1, valid_rmse = evaluate_finetune(model3, val_loader_bert, criterion)
    print(f'轮次: {epoch+1:02} | 训练损失: {train_loss:.3f} | 验证损失: {valid_loss:.3f} | 验证Acc: {valid_acc*100:.2f}% | 验证F1: {valid_f1:.3f} | 验证RMSE: {valid_rmse:.3f}')

# --- Evaluate on the test set (using the weights after the last epoch) ---
test_loss, test_acc, test_f1, test_rmse = evaluate_finetune(model3, test_loader_bert, criterion)
print(f'\n模型三 测试集结果 -> Acc: {test_acc*100:.2f}% | F1: {test_f1:.3f} | RMSE: {test_rmse:.3f}')
results['模型三 (微调BERT)'] = {'Accuracy': test_acc, 'Macro-F1': test_f1, 'RMSE': test_rmse}

## 5. 结果汇总与分析

In [None]:
# One row per model, one column per metric.
df_results = pd.DataFrame(results).T

# Render every metric as a display string: accuracy as a percentage,
# Macro-F1 and RMSE with four decimals.
column_formats = {
    'Accuracy': lambda x: f"{x*100:.2f}%",
    'Macro-F1': lambda x: f"{x:.4f}",
    'RMSE': lambda x: f"{x:.4f}",
}
for column, fmt in column_formats.items():
    df_results[column] = df_results[column].apply(fmt)

print("--- IMDB-10测试集最终性能对比 ---")
print(df_results)

### 实验分析

1.  **模型一 (GloVe + BiLSTM):** 这是传统的基线模型。它使用静态的GloVe词向量，无法捕捉词语在不同上下文中的动态含义。尽管BiLSTM可以学习序列信息，但由于输入特征的局限性，其性能通常是三者中最弱的。它的优点是训练速度快，对计算资源要求较低。

2.  **模型二 (BERT嵌入 + BiLSTM):** 此模型通过引入BERT作为特征提取器，获得了显著的性能提升。BERT能够生成上下文相关的词向量，极大地丰富了输入特征的语义信息。顶部的BiLSTM进一步整合这些特征，以适应最终的分类任务。这种方法在性能和资源消耗之间取得了很好的平衡，通常比模型一好得多，但比模型三稍逊一筹。

3.  **模型三 (微调BERT):** 这是当前解决此类任务最主流且性能最强的方法。通过在下游任务数据上对整个BERT模型进行微调，模型的全部参数（从嵌入层到注意力层）都为特定的分类目标进行了优化。这使得模型能够学习到任务特有的语言模式。从结果中可以看出，微调BERT在所有指标上都取得了最佳性能，尤其是在主要的F1值和准确率上。RMSE指标也最低，说明其预测的评分与真实评分的误差最小。其代价是最高的计算资源需求和最长的训练时间。