##### 数据构成
总共 100,000 条评论，其中包括：

- 25,000 条标注的训练集（train/）

- 25,000 条标注的测试集（test/）

- 50,000 条无标注的评论（train/unsup/）

##### 标签定义

评分 ≤ 4 的评论标为负面（neg），评分 ≥ 7 的评论标为正面（pos）。中性评分（5–6 分）的评论被排除在标注数据之外。

##### 文件结构

train/pos/, train/neg/, test/pos/, test/neg/, train/unsup/ 五个文件夹，每条评论是一个 .txt 文件，命名格式为 [id]_[rating].txt

urls_*.txt：每条评论对应的 IMDb 页面链接（按行号对应）

*.feat：已经生成的词袋特征（BoW），采用 LIBSVM 格式，配合 imdb.vocab 可映射词索引

imdb.vocab：词汇表

imdbEr.txt：每个词的期望评分值，反映其平均情感极性

##### 数据预处理原则
每部电影最多只收录 30 条评论，以避免同一部电影的评论过多而给数据集带来偏差

训练集与测试集中的电影完全不重叠

In [6]:
import os
import torch
import random
from torch.utils.data import Dataset, DataLoader
from collections import Counter

# Seed every RNG this script draws from so that runs are repeatable:
# torch's generator and the stdlib `random` module (the latter is what
# random.shuffle uses when the loaded datasets are shuffled).
random.seed(42)
torch.manual_seed(42)

# Dataset locations
DATA_DIR = './data_循环'
TRAIN_DIR = os.path.join(DATA_DIR, 'train')
TEST_DIR = os.path.join(DATA_DIR, 'test')

def tokenizer(text):
    """Split *text* into tokens on runs of whitespace.

    The text is not lowercased and punctuation is left attached to words.

    Args:
        text: The raw string to tokenize.

    Returns:
        A list of whitespace-delimited substrings (empty for blank input).
    """
    tokens = text.split()
    return tokens

# Load labelled IMDb reviews
def load_imdb_data(directory):
    """Read the labelled reviews stored under ``directory``.

    Args:
        directory: Folder containing a ``pos`` and a ``neg`` sub-folder,
            each holding one review per ``.txt`` file.

    Returns:
        A list of ``(text, label)`` tuples where label is 1 for ``pos`` and
        0 for ``neg``. All ``pos`` reviews come first, then ``neg``; within
        each label the reviews are ordered by file name.

    Raises:
        OSError: If a label folder or a review file cannot be read.
    """
    data = []
    for label in ['pos', 'neg']:
        label_id = 1 if label == 'pos' else 0
        path = os.path.join(directory, label)
        # os.listdir returns entries in arbitrary, filesystem-dependent
        # order; sorting makes the result identical on every machine so a
        # seeded shuffle afterwards is actually reproducible.
        for fname in sorted(os.listdir(path)):
            if not fname.endswith('.txt'):
                continue
            with open(os.path.join(path, fname), 'r', encoding='utf-8') as f:
                data.append((f.read(), label_id))
    return data

# Build the vocabulary
def build_vocab(data, tokenizer, min_freq=2):
    """Map tokens to integer ids, most frequent first.

    Args:
        data: Iterable of ``(text, label)`` pairs; labels are ignored.
        tokenizer: Callable turning a text into an iterable of tokens.
        min_freq: Minimum number of occurrences a token needs to be kept.

    Returns:
        A dict with ``'<pad>'`` at 0 and ``'<unk>'`` at 1, followed by the
        kept tokens numbered from 2 in order of decreasing frequency.
        Tokens with equal frequency keep first-seen order.
    """
    counter = Counter()
    for text, _ in data:
        counter.update(tokenizer(text))  # accumulate token frequencies

    # Special symbols own the first two ids.
    vocab = {'<pad>': 0, '<unk>': 1}
    for token, freq in counter.most_common():
        if freq < min_freq:
            # most_common() is sorted by descending count, so every
            # remaining token is below the threshold as well.
            break
        if token in vocab:
            # A literal '<pad>'/'<unk>' in the corpus must not steal the
            # reserved index 0/1 from the special symbol.
            continue
        vocab[token] = len(vocab)
    return vocab

# Dataset class
class IMDBDataset(Dataset):
    """Dataset that turns ``(text, label)`` pairs into index tensors.

    Args:
        data: Sequence of ``(text, label)`` pairs.
        tokenizer: Callable turning a text into an iterable of tokens.
        vocab: Dict mapping token to integer id; must contain ``'<unk>'``,
            which is used for tokens that are not in the dict.
    """

    def __init__(self, data, tokenizer, vocab):
        self.data = data
        self.tokenizer = tokenizer
        self.vocab = vocab

    def __len__(self):
        """Return the number of examples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(token_ids, label)`` tensors for example ``idx``."""
        text, label = self.data[idx]
        unk_idx = self.vocab['<unk>']
        indices = [self.vocab.get(token, unk_idx) for token in self.tokenizer(text)]
        # Explicit dtype: torch.tensor([]) would default to float32, so a
        # review with no tokens would otherwise produce a float tensor that
        # cannot be used as embedding indices.
        return torch.tensor(indices, dtype=torch.long), torch.tensor(label)

# Collate function for the DataLoader
def collate_batch(batch):
    """Merge a list of ``(token_ids, label)`` examples into batch tensors.

    Args:
        batch: Sequence of ``(token_ids, label)`` pairs, where ``token_ids``
            is a 1-D tensor of variable length.

    Returns:
        A tuple ``(padded, lengths, labels)``: the sequences right-padded
        with 0 into one batch-first tensor, the original length of each
        sequence, and the labels as a single tensor.
    """
    sequences, targets = zip(*batch)
    seq_lengths = torch.tensor(list(map(len, sequences)))
    padded = torch.nn.utils.rnn.pad_sequence(
        sequences, batch_first=True, padding_value=0
    )
    return padded, seq_lengths, torch.tensor(targets)

# Model definition
class BiLSTMClassifier(torch.nn.Module):
    """Single-layer bidirectional LSTM followed by a sigmoid output layer.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Size of each token embedding.
        hidden_dim: Hidden size of the LSTM, per direction.
        num_classes: Number of output units (1 for binary classification).
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes=1):
        super().__init__()
        # Index 0 is the padding id; its embedding stays at zero.
        self.embedding = torch.nn.Embedding(vocab_size, embed_dim, padding_idx=0)
        self.lstm = torch.nn.LSTM(embed_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.dropout = torch.nn.Dropout(0.5)
        # Both directions are concatenated, hence hidden_dim * 2 inputs.
        self.fc = torch.nn.Linear(hidden_dim * 2, num_classes)
        self.sigmoid = torch.nn.Sigmoid()

    def forward(self, x, lengths):
        """Return sigmoid probabilities for a padded batch.

        Args:
            x: Batch-first tensor of token ids, padded with 0.
            lengths: Unpadded length of each row of ``x``.

        Returns:
            Probabilities of shape ``(batch,)`` when ``num_classes`` is 1,
            otherwise ``(batch, num_classes)``.
        """
        embedded = self.embedding(x)
        # Packing makes the LSTM skip padded positions; lengths must live on
        # the CPU for pack_padded_sequence.
        packed = torch.nn.utils.rnn.pack_padded_sequence(
            embedded, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        _, (hidden, _) = self.lstm(packed)
        # Last two entries are the final forward and backward hidden states.
        hidden = torch.cat((hidden[-2, :, :], hidden[-1, :, :]), dim=1)
        out = self.dropout(hidden)
        # Squeeze only the class dimension: a bare squeeze() would also drop
        # the batch dimension for a batch of one, yielding a 0-d tensor that
        # no longer matches the shape of the targets.
        return self.sigmoid(self.fc(out)).squeeze(-1)

# Accuracy metric
def binary_accuracy(preds, y):
    """Return the fraction of predictions that match the targets.

    Args:
        preds: Tensor of scores; values of 0.5 or more count as class 1.
        y: Tensor of 0/1 targets, compared element-wise with the
            thresholded predictions.

    Returns:
        A 0-d float tensor holding the mean accuracy.
    """
    hard_preds = preds.ge(0.5).float()
    hits = hard_preds == y
    return hits.float().mean()

# Training function
def train_model(model, dataloader, optimizer, criterion):
    """Train ``model`` for one pass over ``dataloader``.

    Args:
        model: Module called as ``model(x, lengths)``.
        dataloader: Iterable yielding ``(x, lengths, y)`` batches.
        optimizer: Optimizer updating the parameters of ``model``.
        criterion: Loss called as ``criterion(output, y)``. The per-sample
            averaging below assumes it returns the mean over the batch.

    Returns:
        ``(mean_loss, mean_accuracy)`` averaged over all samples seen.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.train()
    # Take the device from the model itself instead of relying on a
    # module-level ``device`` variable being defined before this is called.
    device = next(model.parameters()).device
    total_loss, total_acc, total_samples = 0.0, 0.0, 0
    for x, lengths, y in dataloader:
        x, lengths, y = x.to(device), lengths.to(device), y.float().to(device)
        optimizer.zero_grad()
        output = model(x, lengths)
        loss = criterion(output, y)
        acc = binary_accuracy(output, y)
        loss.backward()
        optimizer.step()
        # Weight each batch by its size so a short final batch does not
        # count as much as a full one in the epoch averages.
        batch_size = y.size(0)
        total_loss += loss.item() * batch_size
        total_acc += acc.item() * batch_size
        total_samples += batch_size
    return total_loss / total_samples, total_acc / total_samples

# Evaluation function
def evaluate_model(model, dataloader, criterion):
    """Evaluate ``model`` on ``dataloader`` without updating its weights.

    Args:
        model: Module called as ``model(x, lengths)``.
        dataloader: Iterable yielding ``(x, lengths, y)`` batches.
        criterion: Loss called as ``criterion(output, y)``. The per-sample
            averaging below assumes it returns the mean over the batch.

    Returns:
        ``(mean_loss, mean_accuracy)`` averaged over all samples seen.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.eval()
    # Take the device from the model itself instead of relying on a
    # module-level ``device`` variable being defined before this is called.
    device = next(model.parameters()).device
    total_loss, total_acc, total_samples = 0.0, 0.0, 0
    with torch.no_grad():
        for x, lengths, y in dataloader:
            x, lengths, y = x.to(device), lengths.to(device), y.float().to(device)
            output = model(x, lengths)
            loss = criterion(output, y)
            acc = binary_accuracy(output, y)
            # Weight each batch by its size so a short final batch does not
            # count as much as a full one in the averages.
            batch_size = y.size(0)
            total_loss += loss.item() * batch_size
            total_acc += acc.item() * batch_size
            total_samples += batch_size
    return total_loss / total_samples, total_acc / total_samples

# === Main program ===

# Read both splits from disk and shuffle the examples
train_data = load_imdb_data(TRAIN_DIR)
test_data = load_imdb_data(TEST_DIR)
random.shuffle(train_data)
random.shuffle(test_data)

# The vocabulary is built from the training split only
vocab = build_vocab(train_data, tokenizer)

# Hyper-parameters and target device
BATCH_SIZE = 32
EMBED_DIM = 100
HIDDEN_DIM = 128
EPOCHS = 5
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Datasets and loaders (only the training loader reshuffles each epoch)
train_dataset = IMDBDataset(train_data, tokenizer, vocab)
test_dataset = IMDBDataset(test_data, tokenizer, vocab)
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_batch,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    collate_fn=collate_batch,
)

# Model, optimizer and loss
model = BiLSTMClassifier(len(vocab), EMBED_DIM, HIDDEN_DIM).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = torch.nn.BCELoss()

# Train, evaluating on the test split after every epoch
for epoch in range(EPOCHS):
    train_loss, train_acc = train_model(model, train_loader, optimizer, criterion)
    test_loss, test_acc = evaluate_model(model, test_loader, criterion)
    print(f"Epoch {epoch+1} | Train Loss: {train_loss:.4f}, Acc: {train_acc:.4f} | Test Loss: {test_loss:.4f}, Acc: {test_acc:.4f}")


KeyboardInterrupt: 