# IMDB情感分析项目

## 导入必要的库

In [None]:
import os
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
import re

## 数据预处理类

In [None]:
class IMDBDataPreprocessor:
    """Clean, tokenize and index IMDB reviews into fixed-length id tensors.

    Attributes:
        max_len: Number of token ids each encoded review is truncated or
            zero-padded to.
    """

    def __init__(self, max_len=256):
        self.max_len = max_len

    def clean_text(self, text):
        """Return *text* lower-cased with HTML tags and non-letters removed.

        Tags are replaced by a space instead of being deleted so that the
        words on either side of a tag such as ``<br />`` remain separate
        tokens once the result is split on whitespace.
        """
        text = re.sub(r'<[^>]+>', ' ', text)
        text = re.sub(r'[^a-zA-Z\s]', '', text)
        return text.lower().strip()

    def _encode(self, token_lists, word_to_idx):
        """Map token lists to a ``(len(token_lists), max_len)`` long tensor.

        Unknown words become index 1 (``<unk>``); short rows are right-padded
        with index 0 (``<pad>``) and long rows are truncated to ``max_len``.
        """
        rows = []
        for tokens in token_lists:
            ids = [word_to_idx.get(word, 1) for word in tokens[:self.max_len]]
            ids += [0] * (self.max_len - len(ids))
            rows.append(ids)
        # The explicit reshape keeps the result 2-D even when there are no rows.
        return torch.tensor(rows, dtype=torch.long).reshape(len(rows), self.max_len)

    def prepare_glove_data(self, train_path, test_path, val_size=0.2):
        """Load the TSV files and build tensors, vocabulary and embeddings.

        Despite the method name, no pretrained vectors are loaded here: the
        returned embedding matrix is randomly initialised.

        Args:
            train_path: Tab-separated file with ``review`` and ``sentiment``
                columns.
            test_path: Tab-separated file with ``review`` and ``id`` columns.
            val_size: ``test_size`` passed to ``train_test_split`` when the
                validation set is carved out of the training data.

        Returns:
            A dict with keys ``'train'`` and ``'val'`` (each a
            ``(features, labels)`` pair), ``'test'`` (``(features, ids)``),
            ``'word_to_idx'`` and ``'weight_matrix'``.
        """
        # quoting=3 is csv.QUOTE_NONE: quote characters are kept as plain text.
        train_df = pd.read_csv(train_path, sep='\t', quoting=3)
        test_df = pd.read_csv(test_path, sep='\t', quoting=3)

        print("数据预处理中...")

        # Tokenize every review exactly once; the token lists are reused for
        # both vocabulary construction and encoding.
        train_tokens = [self.clean_text(text).split() for text in train_df['review']]
        test_tokens = [self.clean_text(text).split() for text in test_df['review']]

        # The vocabulary covers train and test reviews; ids are assigned in
        # first-seen order after the two reserved entries.
        word_to_idx = {'<pad>': 0, '<unk>': 1}
        for token_lists in (train_tokens, test_tokens):
            for tokens in token_lists:
                for word in tokens:
                    if word not in word_to_idx:
                        word_to_idx[word] = len(word_to_idx)

        train_features = self._encode(train_tokens, word_to_idx)
        test_features = self._encode(test_tokens, word_to_idx)
        train_labels = torch.tensor(train_df['sentiment'].values, dtype=torch.long)

        # Hold out part of the training data for validation (fixed seed).
        train_features, val_features, train_labels, val_labels = train_test_split(
            train_features, train_labels, test_size=val_size, random_state=42
        )

        # Random 300-dimensional word vectors (simplified stand-in).
        vocab_size = len(word_to_idx)
        weight_matrix = torch.randn(vocab_size, 300) * 0.1

        return {
            'train': (train_features, train_labels),
            'val': (val_features, val_labels),
            'test': (test_features, test_df['id'].tolist()),
            'word_to_idx': word_to_idx,
            'weight_matrix': weight_matrix
        }

## 模型定义

In [None]:
class LSTMAttentionModel(nn.Module):
    """LSTM encoder with attention pooling followed by a linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding size (LSTM input size).
        hidden_dim: LSTM hidden size per direction.
        num_layers: Number of stacked LSTM layers.
        num_classes: Size of the output logits.
        weight_matrix: Optional tensor copied into the embedding weights.
        bidirectional: Whether the LSTM runs in both directions.
        dropout: Dropout probability used before the classifier and, when
            ``num_layers > 1``, between LSTM layers.
        pad_idx: Token id treated as padding; those positions receive no
            attention weight. Pass ``None`` to attend over every position.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, num_layers, num_classes,
                 weight_matrix=None, bidirectional=True, dropout=0.3, pad_idx=0):
        super().__init__()

        self.pad_idx = pad_idx

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        if weight_matrix is not None:
            with torch.no_grad():
                self.embedding.weight.copy_(weight_matrix)

        # nn.LSTM only applies dropout between stacked layers and warns when a
        # non-zero value is given for a single layer, so pass 0 in that case.
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers,
                            batch_first=True, bidirectional=bidirectional,
                            dropout=dropout if num_layers > 1 else 0.0)

        feature_dim = hidden_dim * 2 if bidirectional else hidden_dim
        self.attention = nn.Linear(feature_dim, 1)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(feature_dim, num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for token ids *x*."""
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)

        # One attention score per time step.
        scores = self.attention(lstm_out).squeeze(-1)
        if self.pad_idx is not None:
            # Push padded positions to the most negative finite value so that
            # softmax gives them zero weight; a finite fill keeps a row made
            # entirely of padding well-defined (uniform weights, no NaN).
            scores = scores.masked_fill(x == self.pad_idx,
                                        torch.finfo(scores.dtype).min)
        attention_weights = torch.softmax(scores, dim=1)
        context_vector = torch.sum(attention_weights.unsqueeze(-1) * lstm_out, dim=1)

        output = self.dropout(context_vector)
        output = self.fc(output)

        return output

class CNNModel(nn.Module):
    """Text CNN: parallel convolutions, max-over-time pooling, linear head.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding size; also the width of every convolution kernel.
        num_classes: Size of the output logits.
        weight_matrix: Optional tensor copied into the embedding weights.
        num_filters: Output channels of each convolution.
        filter_sizes: Kernel heights (in tokens), one convolution per entry.
        dropout: Dropout probability applied to the pooled features.
    """

    def __init__(self, vocab_size, embed_dim, num_classes, weight_matrix=None,
                 num_filters=100, filter_sizes=(3, 4, 5), dropout=0.3):
        super().__init__()

        # Immutable default plus a private copy, so a list passed by the
        # caller is never shared with (or mutated through) the model.
        filter_sizes = tuple(filter_sizes)
        # Shortest sequence length every convolution can process.
        self.min_seq_len = max(filter_sizes, default=0)

        self.embedding = nn.Embedding(vocab_size, embed_dim)
        if weight_matrix is not None:
            with torch.no_grad():
                self.embedding.weight.copy_(weight_matrix)

        self.convs = nn.ModuleList([
            nn.Conv2d(1, num_filters, (fs, embed_dim)) for fs in filter_sizes
        ])
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(num_filters * len(filter_sizes), num_classes)

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for token ids *x*."""
        # Add a channel dimension: (batch, 1, seq_len, embed_dim).
        embedded = self.embedding(x).unsqueeze(1)

        # Zero-pad along the sequence axis when the input is shorter than the
        # largest kernel; otherwise Conv2d raises for that kernel.
        shortfall = self.min_seq_len - embedded.size(2)
        if shortfall > 0:
            embedded = nn.functional.pad(embedded, (0, 0, 0, shortfall))

        conved = [nn.functional.relu(conv(embedded)).squeeze(3) for conv in self.convs]
        # Max over time: one value per filter.
        pooled = [nn.functional.max_pool1d(feat, feat.size(2)).squeeze(2) for feat in conved]

        cat = self.dropout(torch.cat(pooled, dim=1))
        output = self.fc(cat)

        return output

## 训练器类

In [None]:
class ModelTrainer:
    """Train, evaluate and run inference for a classification model.

    Attributes:
        model: The model, moved to ``device`` on construction.
        device: Device that input batches are moved to.
        model_type: Free-form tag stored on the instance (not read by the
            methods of this class).
    """

    def __init__(self, model, device, model_type='lstm'):
        self.model = model.to(device)
        self.device = device
        self.model_type = model_type

    def train(self, train_loader, val_loader, num_epochs=10, lr=0.001, patience=3):
        """Train with Adam and cross-entropy, early-stopping on validation accuracy.

        Whenever validation accuracy improves, the weights are snapshotted in
        memory and written to ``best_model.pth``. Training stops after
        ``patience`` consecutive epochs without improvement. On return the
        model holds the best snapshot (or its current weights if no epoch
        ever improved on the initial accuracy of 0).

        Returns:
            The best validation accuracy observed.
        """
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.CrossEntropyLoss()

        best_val_acc = 0
        patience_counter = 0
        best_model_state = None

        for epoch in range(num_epochs):
            self.model.train()
            train_loss = 0
            train_preds = []
            train_labels = []

            pbar = tqdm(train_loader, desc=f'Epoch {epoch + 1}/{num_epochs}')
            for inputs, labels in pbar:
                inputs, labels = inputs.to(self.device), labels.to(self.device)

                optimizer.zero_grad()
                logits = self.model(inputs)
                loss = criterion(logits, labels)

                loss.backward()
                optimizer.step()

                train_loss += loss.item()
                preds = torch.argmax(logits, dim=1)
                train_preds.extend(preds.cpu().numpy())
                train_labels.extend(labels.cpu().numpy())

                pbar.set_postfix({'loss': f'{loss.item():.4f}'})

            train_acc = accuracy_score(train_labels, train_preds)
            val_acc = self.evaluate(val_loader)
            avg_train_loss = train_loss / len(train_loader)

            print(f'Epoch {epoch + 1}: Train Loss: {avg_train_loss:.4f}, Train Acc: {train_acc:.4f}, Val Acc: {val_acc:.4f}')

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                patience_counter = 0
                # state_dict() returns tensors that share storage with the
                # live parameters, so a shallow dict copy would keep changing
                # as training continues. Clone every tensor to freeze the
                # snapshot at this epoch.
                best_model_state = {
                    name: tensor.detach().clone()
                    for name, tensor in self.model.state_dict().items()
                }
                torch.save(best_model_state, 'best_model.pth')
                print(f"保存最佳模型，验证准确率: {val_acc:.4f}")
            else:
                patience_counter += 1
                if patience_counter >= patience:
                    print(f'早停于第 {epoch + 1} 轮')
                    break

        # Restore the best weights seen in this run. If nothing was ever
        # saved, keep the current weights: any 'best_model.pth' on disk would
        # come from a different run.
        if best_model_state is not None:
            self.model.load_state_dict(best_model_state)

        return best_val_acc

    def evaluate(self, data_loader):
        """Return the accuracy of the model over ``(inputs, labels)`` batches."""
        self.model.eval()
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for inputs, labels in data_loader:
                inputs = inputs.to(self.device)
                logits = self.model(inputs)

                all_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        return accuracy_score(all_labels, all_preds)

    def predict(self, test_loader):
        """Return the predicted class index for every sample in *test_loader*.

        Each batch may be a bare tensor or a list/tuple whose first element
        holds the inputs.
        """
        self.model.eval()
        predictions = []

        print("开始预测...")

        with torch.no_grad():
            for i, batch in enumerate(test_loader):
                inputs = batch[0] if isinstance(batch, (list, tuple)) else batch

                inputs = inputs.to(self.device)
                logits = self.model(inputs)
                preds = torch.argmax(logits, dim=1).cpu().numpy()
                predictions.extend(preds)

                # Progress report every 50 batches.
                if (i + 1) % 50 == 0:
                    print(f"已处理 {i + 1} 个batch，共 {len(predictions)} 条预测")

        print(f"预测完成，共生成 {len(predictions)} 条预测结果")
        return predictions

## 主执行流程

In [None]:
def main():
    """Run the full pipeline: load data, train, predict and write a CSV.

    Returns early (after printing an error) when either input file is
    missing. Otherwise trains an ``LSTMAttentionModel`` and writes the test
    predictions to ``imdb_predictions.csv``.
    """
    print("IMDB情感分析开始...")

    # Pick the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")

    # Input data locations.
    train_path = "./corpus/imdb/labeledTrainData.tsv"
    test_path = "./corpus/imdb/testData.tsv"

    # Both files are required, so verify both before doing any work; a
    # missing test file would otherwise only fail later inside read_csv.
    missing = [path for path in (train_path, test_path) if not os.path.exists(path)]
    if missing:
        print("错误：请确保数据文件在正确路径")
        print("需要文件: corpus/imdb/labeledTrainData.tsv 和 testData.tsv")
        return

    # Preprocess the reviews.
    preprocessor = IMDBDataPreprocessor(max_len=256)
    data = preprocessor.prepare_glove_data(train_path, test_path)

    # Wrap the tensors in datasets and loaders.
    train_dataset = TensorDataset(data['train'][0], data['train'][1])
    val_dataset = TensorDataset(data['val'][0], data['val'][1])
    test_dataset = TensorDataset(data['test'][0])

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    print(f"数据统计: 训练样本 {len(train_dataset)}, 验证样本 {len(val_dataset)}, 测试样本 {len(test_dataset)}")

    # Build the model.
    vocab_size = len(data['word_to_idx'])
    model = LSTMAttentionModel(
        vocab_size=vocab_size,
        embed_dim=300,
        hidden_dim=128,
        num_layers=2,
        num_classes=2,
        weight_matrix=data['weight_matrix'],
        bidirectional=True
    )

    # Train.
    trainer = ModelTrainer(model, device)
    best_acc = trainer.train(train_loader, val_loader, num_epochs=10)

    # Predict on the test set.
    predictions = trainer.predict(test_loader)

    # Save one row per test id.
    test_ids = data['test'][1]
    result_df = pd.DataFrame({'id': test_ids, 'sentiment': predictions})
    result_df.to_csv('imdb_predictions.csv', index=False)

    print("结果已保存到: imdb_predictions.csv")
    print(f"最佳验证准确率: {best_acc:.4f}")
    print("项目执行完成")

# Run the main function when this code is executed as the main module.
if __name__ == "__main__":
    main()

## 运行代码

执行下面的单元格来运行整个项目：

In [None]:
# Run the main function.
# NOTE(review): main() is also called under the `if __name__ == "__main__"`
# guard right after its definition, so this call may run the whole pipeline a
# second time — confirm that is intended.
main()

## 结果展示

运行完成后，查看预测结果：

In [None]:
# Summarise the predictions written to imdb_predictions.csv.
result_df = pd.read_csv('imdb_predictions.csv')
for summary_line in (
    "预测结果统计:",
    f"总样本数: {len(result_df)}",
    # The mean of the sentiment column is the share of rows labelled 1.
    f"正面评论比例: {result_df['sentiment'].mean():.2%}",
    "情感分布:",
):
    print(summary_line)
# Count of rows per label, ordered by label value.
print(result_df['sentiment'].value_counts().sort_index())