In [1]:
"""
任务二：用 PyTorch 实现 LSTM / Stacked LSTM / BiLSTM（使用 Keras 的 IMDB 数据集）
保存模型并打印评估报告（accuracy, precision, recall, f1）
"""

import os
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from keras.datasets import imdb
from keras.preprocessing.sequence import pad_sequences

from sklearn.metrics import accuracy_score, precision_recall_fscore_support, classification_report, confusion_matrix


In [2]:
# -----------------------
# Hyperparameters
# -----------------------
SEED = 42                     # default seed used by set_seed()
vocab_size = 20_000           # passed as num_words to the IMDB loader and as the embedding size
maxlen = 200                  # sequence length handed to pad_sequences
embed_dim = hidden_dim = 128  # embedding width and LSTM hidden width
num_layers_stacked = 2        # number of layers in the stacked LSTM
batch_size = 64
epochs = 5
lr = 0.001
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
save_dir = "saved_models_task2"
os.makedirs(save_dir, exist_ok=True)  # checkpoint directory; no error if it already exists

# -----------------------
# Fix the random seeds
# -----------------------
def set_seed(seed=SEED):
    """Seed the Python, NumPy and PyTorch random number generators.

    Args:
        seed: Integer seed applied to every generator (defaults to ``SEED``).
    """
    for seeder in (random.seed, np.random.seed, torch.manual_seed):
        seeder(seed)
    # Only touch the CUDA generators when a GPU is actually available.
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

set_seed()


In [3]:
# -----------------------
# Load and preprocess the data (Keras IMDB)
# -----------------------
print("Loading IMDB dataset (keras)...")
(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=vocab_size)
# Bring every review to the common length `maxlen`.
x_train = pad_sequences(x_train, maxlen=maxlen)
x_test = pad_sequences(x_test, maxlen=maxlen)

# Convert to int64 PyTorch tensors (token ids for the embedding, 0/1 labels).
x_train_t = torch.tensor(x_train, dtype=torch.long)
y_train_t = torch.tensor(y_train, dtype=torch.long)
x_test_t = torch.tensor(x_test, dtype=torch.long)
y_test_t = torch.tensor(y_test, dtype=torch.long)

class IMDBDataset(Dataset):
    """Map-style dataset that pairs each sample in ``x`` with its label in ``y``."""

    def __init__(self, x, y):
        # Both containers are stored as given; nothing is copied or validated.
        self.x, self.y = x, y

    def __len__(self):
        """Return the number of samples, as reported by ``x``."""
        return len(self.x)

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at position ``idx``."""
        sample, label = self.x[idx], self.y[idx]
        return sample, label

# Training batches are reshuffled on every pass; test batches keep a fixed order.
train_loader = DataLoader(
    IMDBDataset(x_train_t, y_train_t),
    batch_size=batch_size,
    shuffle=True,
)
test_loader = DataLoader(
    IMDBDataset(x_test_t, y_test_t),
    batch_size=batch_size,
    shuffle=False,
)


Loading IMDB dataset (keras)...


In [4]:


# -----------------------
# Model definitions (forward returns raw logits; no sigmoid is applied).
# The loss used with them is BCEWithLogitsLoss.
# -----------------------
class LSTMClassifier(nn.Module):
    """Single-layer, unidirectional LSTM that emits one logit per sequence.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Width of each token embedding.
        hidden_dim: Width of the LSTM hidden state.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()
        # Index 0 is declared as the padding index of the embedding table.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=embed_dim, padding_idx=0
        )
        self.lstm = nn.LSTM(input_size=embed_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=1)

    def forward(self, x):
        """Map token ids of shape (B, T) to logits of shape (B,)."""
        sequence_out, _ = self.lstm(self.embedding(x))   # (B, T, H)
        final_step = sequence_out[:, -1]                 # output at the last time step, (B, H)
        return self.fc(final_step).squeeze(1)            # (B, 1) -> (B,)


class StackedLSTMClassifier(nn.Module):
    """Multi-layer (stacked) unidirectional LSTM that emits one logit per sequence.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Width of each token embedding.
        hidden_dim: Width of the LSTM hidden state.
        num_layers: Number of LSTM layers stacked on top of each other.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers=2):
        super().__init__()
        # Index 0 is declared as the padding index of the embedding table.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=embed_dim, padding_idx=0
        )
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=1)

    def forward(self, x):
        """Map token ids of shape (B, T) to logits of shape (B,)."""
        top_layer_out, _ = self.lstm(self.embedding(x))  # (B, T, H)
        final_step = top_layer_out[:, -1]                # output at the last time step, (B, H)
        return self.fc(final_step).squeeze(1)            # (B, 1) -> (B,)


class BiLSTMClassifier(nn.Module):
    """Single-layer bidirectional LSTM that emits one logit per sequence.

    The classifier head consumes the final hidden state of the forward
    direction concatenated with the final hidden state of the backward
    direction, so both halves of the feature summarise the whole sequence.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Width of each token embedding.
        hidden_dim: Width of the hidden state of each direction.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, 1)

    def forward(self, x):
        """Map token ids of shape (B, T) to logits of shape (B,)."""
        emb = self.embedding(x)                       # (B, T, E)
        _, (h_n, _) = self.lstm(emb)                  # h_n: (2, B, H) -> [forward, backward]
        # Do NOT take out[:, -1, :] here: at the last time step the backward
        # direction has processed only the final token, so its half of the
        # feature would ignore the rest of the review. h_n holds each
        # direction's state after it has read the full sequence
        # (forward == out[:, -1, :H], backward == out[:, 0, H:]).
        last = torch.cat((h_n[-2], h_n[-1]), dim=1)   # (B, 2H)
        logits = self.fc(last).squeeze(1)             # (B,)
        return logits


In [5]:
# -----------------------
# Training and evaluation functions
# -----------------------
def train_one_epoch(model, loader, optimizer, criterion):
    """Run one optimisation pass over ``loader`` and return the mean loss per sample.

    Args:
        model: Module mapping a batch of inputs to one logit per sample.
        loader: Iterable yielding ``(inputs, labels)`` tensor batches.
        optimizer: Optimizer that updates ``model``'s parameters.
        criterion: Loss called as ``criterion(logits, float_labels)``.
            Assumed to return a batch-mean loss (the default reduction) —
            confirm if a custom criterion is passed.

    Returns:
        The sample-weighted average of the batch losses, or ``nan`` when the
        loader yielded no samples.
    """
    model.train()
    total_loss = 0.0
    n_seen = 0
    for x_batch, y_batch in loader:
        x_batch = x_batch.to(device)
        y_batch = y_batch.float().to(device)  # labels are cast to float before the loss

        optimizer.zero_grad()
        logits = model(x_batch)
        loss = criterion(logits, y_batch)
        loss.backward()
        optimizer.step()

        # Weight each batch by its size so a smaller final batch is averaged correctly.
        batch_n = x_batch.size(0)
        total_loss += loss.item() * batch_n
        n_seen += batch_n

    # Divide by the samples actually processed rather than len(loader.dataset):
    # the two differ when the loader uses drop_last or a sampler over a subset.
    return total_loss / n_seen if n_seen else float("nan")


def evaluate_model(model, loader):
    """Score ``model`` on every batch of ``loader`` and summarise the predictions.

    A sample is predicted as class 1 when the sigmoid of its logit is at
    least 0.5, otherwise as class 0.

    Args:
        model: Module mapping a batch of inputs to one logit per sample.
        loader: Iterable yielding ``(inputs, labels)`` tensor batches.

    Returns:
        Dict with keys ``accuracy``, ``precision``, ``recall``, ``f1``
        (binary-averaged), ``report`` (text classification report) and
        ``cm`` (confusion matrix).
    """
    model.eval()
    pred_chunks, label_chunks = [], []
    with torch.no_grad():  # inference only, no gradients needed
        for inputs, targets in loader:
            scores = torch.sigmoid(model(inputs.to(device))).cpu().numpy()
            pred_chunks.append((scores >= 0.5).astype(int))
            label_chunks.append(targets.numpy())

    y_pred = np.concatenate(pred_chunks)
    y_true = np.concatenate(label_chunks)

    precision, recall, f1_value, _ = precision_recall_fscore_support(
        y_true, y_pred, average='binary', zero_division=0
    )
    return {
        "accuracy": accuracy_score(y_true, y_pred),
        "precision": precision,
        "recall": recall,
        "f1": f1_value,
        "report": classification_report(y_true, y_pred, digits=4),
        "cm": confusion_matrix(y_true, y_pred),
    }


In [6]:
# -----------------------
# Generic train / evaluate / save pipeline
# -----------------------
def run_experiment(model_class, model_name, model_kwargs, epochs=epochs, save=True):
    """Train one model, track its best epoch by F1, report it and optionally save it.

    Args:
        model_class: Class instantiated as ``model_class(**model_kwargs)``.
        model_name: Label used in log lines and in the checkpoint file name.
        model_kwargs: Keyword arguments for ``model_class``.
        epochs: Number of training epochs.
        save: When true, write the best checkpoint to
            ``<save_dir>/<model_name>_best.pth``.

    Returns:
        Dict with keys ``model_state``, ``optimizer_state``, ``epoch`` and
        ``metrics`` describing the best epoch, or ``None`` if no epoch ran.
    """
    import copy  # local import: only needed to snapshot the best checkpoint

    print("\n" + "="*60)
    print(f"Experiment: {model_name} | params: {model_kwargs}")
    model = model_class(**model_kwargs).to(device)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    best_f1 = float("-inf")
    best_state = None

    for epoch in range(1, epochs+1):
        train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
        # NOTE(review): the best epoch is selected on the test loader, so the
        # reported "best" score is optimistic; a held-out validation split
        # would be needed for an unbiased estimate.
        metrics = evaluate_model(model, test_loader)
        print(f"[{model_name}] Epoch {epoch}/{epochs}  TrainLoss: {train_loss:.4f}  TestAcc: {metrics['accuracy']:.4f}  F1: {metrics['f1']:.4f}")

        # Keep the best model (by F1). The first epoch is always recorded so
        # that a run whose F1 never rises above 0 still has a state to report.
        if best_state is None or metrics["f1"] > best_f1:
            best_f1 = metrics["f1"]
            best_state = {
                # state_dict() returns references to the live tensors, which
                # later epochs update in place; deep-copy to freeze this epoch.
                "model_state": copy.deepcopy(model.state_dict()),
                "optimizer_state": copy.deepcopy(optimizer.state_dict()),
                "epoch": epoch,
                "metrics": metrics
            }

    if best_state is None:
        # Only reachable when epochs < 1: nothing was trained or evaluated.
        print(f"No epochs were run for {model_name}; nothing to report or save.")
        return None

    print(f"\nBest test F1 for {model_name}: {best_f1:.4f}")
    print("Classification report (best epoch):\n")
    print(best_state["metrics"]["report"])
    print("Confusion matrix:\n", best_state["metrics"]["cm"])

    if save:
        save_path = os.path.join(save_dir, f"{model_name}_best.pth")
        torch.save(best_state, save_path)
        print(f"Saved best model to: {save_path}")

    return best_state


In [7]:



# -----------------------
# main: run the three models one after another
# -----------------------
if __name__ == "__main__":
    # Single-layer LSTM
    lstm_kwargs = dict(vocab_size=vocab_size, embed_dim=embed_dim, hidden_dim=hidden_dim)
    best_lstm = run_experiment(LSTMClassifier, "LSTM_single", lstm_kwargs)

    # Stacked LSTM: same settings plus the layer count (num_layers_stacked)
    stacked_kwargs = {**lstm_kwargs, "num_layers": num_layers_stacked}
    best_stacked = run_experiment(StackedLSTMClassifier, "LSTM_stacked", stacked_kwargs)

    # BiLSTM: same settings as the single-layer model
    bilstm_kwargs = dict(lstm_kwargs)
    best_bilstm = run_experiment(BiLSTMClassifier, "BiLSTM", bilstm_kwargs)

    print("\nAll experiments finished.")



Experiment: LSTM_single | params: {'vocab_size': 20000, 'embed_dim': 128, 'hidden_dim': 128}
[LSTM_single] Epoch 1/5  TrainLoss: 0.5814  TestAcc: 0.7706  F1: 0.7528
[LSTM_single] Epoch 2/5  TrainLoss: 0.5071  TestAcc: 0.7108  F1: 0.6815
[LSTM_single] Epoch 3/5  TrainLoss: 0.3993  TestAcc: 0.8242  F1: 0.8189
[LSTM_single] Epoch 4/5  TrainLoss: 0.4311  TestAcc: 0.7552  F1: 0.7388
[LSTM_single] Epoch 5/5  TrainLoss: 0.3760  TestAcc: 0.8250  F1: 0.8119

Best test F1 for LSTM_single: 0.8189
Classification report (best epoch):

              precision    recall  f1-score   support

           0     0.8064    0.8533    0.8292     12500
           1     0.8442    0.7951    0.8189     12500

    accuracy                         0.8242     25000
   macro avg     0.8253    0.8242    0.8241     25000
weighted avg     0.8253    0.8242    0.8241     25000

Confusion matrix:
 [[10666  1834]
 [ 2561  9939]]
Saved best model to: saved_models_task2\LSTM_single_best.pth

Experiment: LSTM_stacked | param