In [2]:
# 一、导入数据
# -- coding: utf-8 --
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch import nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import jieba
import torch
import warnings
import os

warnings.filterwarnings("ignore")  # 忽略警告信息

# 加载语料库文件，并导入数据
neg = pd.read_excel('data/neg.xls', header=None)
pos = pd.read_excel('data/pos.xls', header=None)

pos.head()

# 分词处理
word_cut = lambda x: jieba.lcut(str(x))
pos['words'] = pos[0].apply(word_cut)
neg['words'] = neg[0].apply(word_cut)

# 使用 1 表示积极情绪，0 表示消极情绪，并完成数组拼接
texts = np.concatenate((pos['words'], neg['words']))
labels = np.concatenate((np.ones(len(pos)), np.zeros(len(neg))))

# 准备训练数据
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.2, train_size=0.1)

# 二、构建 Bi-LSTM 模型
class BiLSTM(nn.Module):
    def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes):
        super(BiLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_dim * 2, num_classes)  # 双向 LSTM 的输出维度是 hidden_dim * 2

    def forward(self, x):
        # Embedding layer
        embedded = self.embedding(x)  # (batch_size, seq_len, embed_dim)

        # LSTM layer
        lstm_out, _ = self.lstm(embedded)  # (batch_size, seq_len, hidden_dim * 2)

        # Fully connected layer
        # 取最后一个时间步的输出
        output = self.fc(lstm_out[:, -1, :])  # (batch_size, num_classes)
        return output

# 三、数据预处理
# 构建词汇表
vocab = set()
for text in texts:
    vocab.update(text)
vocab = sorted(vocab)
vocab_size = len(vocab) + 1  # 加1是为了留出一个索引给填充符

# 将文本转换为索引
def text_to_indices(text):
    return [vocab.index(word) + 1 for word in text]  # 加1是为了避免索引为0

train_indices = [text_to_indices(text) for text in train_texts]
val_indices = [text_to_indices(text) for text in val_texts]

# 填充序列
def collate_batch(batch):
    texts, labels = zip(*batch)
    texts = pad_sequence([torch.tensor(text, dtype=torch.long) for text in texts], batch_first=True)
    labels = torch.tensor(labels, dtype=torch.long)
    return texts, labels

# 将数据转换为 PyTorch 数据集
class SentimentDataset(Dataset):
    def __init__(self, indices, labels):
        self.indices = indices
        self.labels = labels

    def __getitem__(self, idx):
        return self.indices[idx], self.labels[idx]

    def __len__(self):
        return len(self.labels)

train_dataset = SentimentDataset(train_indices, train_labels)
val_dataset = SentimentDataset(val_indices, val_labels)

# 创建数据加载器
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_batch)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=collate_batch)

# 四、训练 Bi-LSTM 模型
# 设置超参数
embed_dim = 64  # 减小嵌入维度
hidden_dim = 128  # 减小隐藏层维度
num_classes = 2
learning_rate = 2e-3

# 初始化模型、优化器和学习率调度器
device = torch.device("cpu")  # 切换到 CPU
model = BiLSTM(vocab_size, embed_dim, hidden_dim, num_classes).to(device)
optimizer = AdamW(model.parameters(), lr=learning_rate)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=2, verbose=True, min_lr=1e-5)
criterion = nn.CrossEntropyLoss()

# 训练函数
def train(model, train_loader, val_loader, epochs=10):
    best_loss = float('inf')
    history = []  # 用于存储每个 epoch 的指标

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for texts, labels in train_loader:
            texts, labels = texts.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(texts)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}, Training Loss: {total_loss / len(train_loader):.4f}")

        # 验证阶段
        model.eval()
        val_loss = 0
        all_labels = []
        all_predictions = []
        with torch.no_grad():
            for texts, labels in val_loader:
                texts, labels = texts.to(device), labels.to(device)
                outputs = model(texts)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predictions = torch.max(outputs, 1)
                all_labels.extend(labels.cpu().numpy())
                all_predictions.extend(predictions.cpu().numpy())

        # 计算验证集上的指标
        accuracy = accuracy_score(all_labels, all_predictions)
        precision = precision_score(all_labels, all_predictions, average='binary')
        recall = recall_score(all_labels, all_predictions, average='binary')
        f1 = f1_score(all_labels, all_predictions, average='binary')

        # 保存指标到历史记录
        history.append({
            "Epoch": epoch + 1,
            "Training Loss": total_loss / len(train_loader),
            "Validation Loss": val_loss / len(val_loader),
            "Accuracy": accuracy,
            "Precision": precision,
            "Recall": recall,
            "F1 Score": f1
        })

        print(f"Epoch {epoch+1}, Validation Loss: {val_loss / len(val_loader):.4f}, "
              f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, "
              f"Recall: {recall:.4f}, F1 Score: {f1:.4f}")

        # 保存最佳模型
        if val_loss < best_loss:
            best_loss = val_loss
            torch.save(model.state_dict(), 'bi_lstm_model.pth')
            print("Best model saved!")

        # 调整学习率
        scheduler.step(val_loss)

    # 创建表格并打印
    df = pd.DataFrame(history)
    print("\nTraining History:")
    print(df.to_string(index=False))

# 开始训练
train(model, train_loader, val_loader, epochs=3)

# 五、情感预测
# 加载训练好的模型
model.load_state_dict(torch.load('bi_lstm_model.pth'))
model.eval()

# 对电影评论进行情感判断
def bi_lstm_predict(string):
    # 对输入文本进行分词
    words = jieba.lcut(str(string))
    indices = text_to_indices(words)
    indices = torch.tensor(indices).unsqueeze(0).to(device)  # 添加 batch 维度

    with torch.no_grad():
        outputs = model(indices)
        _, predicted_class = torch.max(outputs, 1)

    # 输出结果
    sentiment = '积极' if predicted_class.item() == 1 else '消极'
    print(f"{string} [{sentiment}]")
    return sentiment

# 测试预测
string = '还不错，符合需求'
pred_result = bi_lstm_predict(string)
print(f"预测结果: {pred_result}")

Epoch 1, Training Loss: 0.6879
Epoch 1, Validation Loss: 0.6876, Accuracy: 0.5384, Precision: 0.6858, Recall: 0.1624, F1 Score: 0.2626
Best model saved!
Epoch 2, Training Loss: 0.6920
Epoch 2, Validation Loss: 0.6811, Accuracy: 0.5741, Precision: 0.7085, Recall: 0.2695, F1 Score: 0.3905
Best model saved!
Epoch 3, Training Loss: 0.6793
Epoch 3, Validation Loss: 0.6839, Accuracy: 0.5090, Precision: 0.5079, Recall: 0.9588, F1 Score: 0.6641

Training History:
 Epoch  Training Loss  Validation Loss  Accuracy  Precision   Recall  F1 Score
     1       0.687855         0.687609  0.538370   0.685771 0.162377  0.262580
     2       0.692025         0.681118  0.574135   0.708487 0.269537  0.390508
     3       0.679287         0.683928  0.509000   0.507933 0.958821  0.664074
还不错，符合需求 [积极]
预测结果: 积极
