# 自然语言处理部分

利用自然语言处理方法，为文本计算情感得分（sentiment score）。

## 导入库

In [None]:
import pandas as pd
import numpy as np
import os

import torch
print(torch.__version__)

import torch
import torch.nn as nn
import torch.nn.functional as F

from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from time import time
import torchtext
print(torchtext.__version__)

## Read Data


In [None]:
# Load the cleaned training and test CSV files into DataFrames.
# NOTE(review): both paths are absolute and machine-specific; adjust them before running elsewhere.
train_data = pd.read_csv(r"C:\Software\Local Things (Coding)\comp\2025大学生建模比赛\代码\比赛项目代码\处理后数据集\train_cleaned.csv")
test_data = pd.read_csv(r"C:\Software\Local Things (Coding)\comp\2025大学生建模比赛\代码\比赛项目代码\处理后数据集\test_cleaned.csv")
# Print the column index and the shape of each frame as a quick sanity check.
print(f"train_data.columns: {train_data.columns} \n train_data.shape: {train_data.shape}")
print(f"test_data.columns: {test_data.columns} \n test_data.shape: {test_data.shape}")


In [None]:
# Show the first five entries of the 'text_after' and 'text_token' columns.
print(train_data['text_after'][:5])
print(train_data['text_token'][:5])

## Split and Tokenize

In [None]:

# ===============================
# 1. Data preprocessing
# ===============================
# Work on a copy of train_data; the columns read in this cell are
# 'text_token' and 'sentiment'.
df = train_data.copy()

# Turn the labels into a pandas categorical and count the distinct classes.
df['sentiment'] = df['sentiment'].astype('category')
num_classes = len(df['sentiment'].cat.categories)

# Force every token entry to str, then replace each label by its integer
# category code (printing the first rows before and after the encoding).
df['text_token'] = df['text_token'].map(str)
print(f"before encoding: {df['sentiment'][:5]}")
df['sentiment'] = df['sentiment'].cat.codes
print(f"after encoding: {df['sentiment'][:5]}")
print("划分训练、验证、测试集 ...")

# Stratified 80/20 split so both parts keep the label distribution.
x_train, x_test, y_train, y_test = train_test_split(
    df['text_token'].values,
    df['sentiment'].values,
    test_size=0.2,
    random_state=42,
    stratify=df['sentiment'].values,
)

# The first 4122 held-out samples become the validation set; whatever is left
# stays as the test set.
# NOTE(review): 4122 is hard-coded and presumably tuned to the current
# dataset size; a smaller dataset would leave the test set empty.
x_val, x_test = x_test[:4122], x_test[4122:]
y_val, y_test = y_test[:4122], y_test[4122:]


In [None]:

# ===============================
# 2. 文本分词、构建词汇表及序列填充（使用 torchtext）
# ===============================
import torchtext
from torchtext.data import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# Tokenizer: torchtext's "basic_english" tokenizer.
tokenizer = get_tokenizer("basic_english")
maxlen = 100  # fixed sequence length used when padding/truncating

# The vocabulary is built from the `text` column (cast to str first).
# No max_tokens cap is passed, so the vocabulary size is not limited.
# NOTE(review): the train/val/test sequences are tokenized from `text_token`,
# not `text`; confirm that building the vocabulary from `text` is intended.
df['text'] = df['text'].astype(str)
vocab = build_vocab_from_iterator(
    map(tokenizer, df['text'].values),
    specials=["<unk>"],
)
# Tokens missing from the vocabulary resolve to the "<unk>" entry.
# NOTE(review): "<unk>" is the only special token, so it presumably gets
# index 0, the same value pad_sequence pads with by default; confirm whether
# padding and unknown words are meant to share one embedding row.
vocab.set_default_index(vocab["<unk>"])
vocab_size = len(vocab)
# Plain dict mapping each token string to its integer index.
word_index = vocab.get_stoi()

# Convert one text into a list of integer token ids.
def text_to_sequence(text, tokenizer, vocab):
    """Tokenize *text* and look the tokens up in *vocab*.

    Args:
        text: The input string.
        tokenizer: Callable that turns a string into a list of tokens.
        vocab: Callable that maps a list of tokens to a list of indices.

    Returns:
        Whatever ``vocab`` returns for the tokenized text.
    """
    tokens = tokenizer(text)
    return vocab(tokens)

# Encode the train / validation / test texts as lists of token ids.
X_train, X_val, X_test = (
    [text_to_sequence(sample, tokenizer, vocab) for sample in split]
    for split in (x_train, x_val, x_test)
)

# Bring a sequence to exactly `maxlen` items.
def pad_sequence(seq, maxlen=maxlen, padding_value=0):
    """Truncate or right-pad *seq* to *maxlen* items.

    Args:
        seq: List of token ids.
        maxlen: Target length; the default is the module-level ``maxlen``
            captured when this function is defined.
        padding_value: Value appended to sequences shorter than ``maxlen``.

    Returns:
        The first ``maxlen`` items when *seq* is longer than ``maxlen``,
        otherwise a new list consisting of *seq* followed by padding.
    """
    overflow = len(seq) - maxlen
    if overflow > 0:
        return seq[:maxlen]
    # overflow <= 0 here, so -overflow is the number of missing items.
    return seq + [padding_value] * (-overflow)

# Pad/truncate every encoded split to maxlen.
X_train, X_val, X_test = (
    [pad_sequence(ids, maxlen) for ids in split]
    for split in (X_train, X_val, X_test)
)

# Turn the fixed-length id lists into long tensors.
X_train, X_val, X_test = (
    torch.tensor(split, dtype=torch.long)
    for split in (X_train, X_val, X_test)
)

# The labels stay integer class indices (no one-hot encoding): they are
# flattened to 1-D int64 arrays and wrapped as long tensors.
y_train, y_val, y_test = (
    torch.tensor(np.array(labels, dtype=np.int64).flatten(), dtype=torch.long)
    for labels in (y_train, y_val, y_test)
)

print(f"Vocabulary Size: {vocab_size}")
print(f"X_train shape: {X_train.shape}")
print(f"y_train shape: {y_train.shape}")


## Load pretrained model

In [None]:

# ===============================
# 3. Load the pretrained GloVe vectors and build embedding_matrix
# ===============================
pretrained_model = r"C:\Dataset\预训练模型\glove.twitter.27B.200d.txt"
# Every vector in glove.twitter.27B.200d has 200 components.
embedding_dim = 200
embeddings_index = {}
with open(pretrained_model, 'r', encoding='utf-8') as f:
    for line in f:
        # Split from the right: the last `embedding_dim` fields are the
        # vector and whatever precedes them is the token. A plain split()
        # would mis-parse a token that itself contains whitespace and store
        # a vector of the wrong length under the wrong key.
        values = line.rstrip().rsplit(' ', embedding_dim)
        if len(values) != embedding_dim + 1:
            continue  # malformed or truncated line: skip it
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

# One row per vocabulary index; words without a pretrained vector keep an
# all-zero row.
embedding_matrix = np.zeros((vocab_size, embedding_dim))
for word, idx in word_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[idx] = embedding_vector

# Convert to a float32 torch tensor for use as embedding weights.
embedding_matrix = torch.tensor(embedding_matrix, dtype=torch.float)


# Model 1: MLP

## MLP Model Setting

In [None]:

# ===============================
# 4. 定义基于预训练Embedding的 PyTorch 模型
# ===============================
class MLPGlove(nn.Module):
    """MLP text classifier on top of max-pooled pretrained word embeddings.

    Token ids are embedded, max-pooled over the sequence axis and passed
    through two hidden layers (30 and 10 units, each followed by batch
    normalisation and dropout) and a final linear layer.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        embedding_matrix: Tensor copied into the embedding layer; the layer
            stays trainable.
        maxlen: Sequence length; only stored on the instance.
        activation_func: "relu", "elu", "sigmoid" or "tanh"; any other value
            falls back to ReLU.
        num_classes: Size of the output layer. When omitted, the module-level
            ``num_classes`` is used, which is what the original class did.
    """

    def __init__(self, vocab_size, embedding_dim, embedding_matrix, maxlen,
                 activation_func="relu", num_classes=None):
        super().__init__()
        if num_classes is None:
            # Backward compatibility: callers that do not pass the argument
            # keep getting the notebook-level class count.
            num_classes = globals()["num_classes"]

        self.maxlen = maxlen
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Start from the pretrained vectors. To freeze them, additionally set
        # self.embedding.weight.requires_grad = False.
        self.embedding.weight.data.copy_(embedding_matrix)

        # Activation lookup; unknown names fall back to ReLU.
        activations = {
            "relu": F.relu,
            "elu": F.elu,
            "sigmoid": torch.sigmoid,
            "tanh": torch.tanh,
        }
        self.act = activations.get(activation_func, F.relu)

        # Hidden layers applied after the global max pooling.
        self.fc1 = nn.Linear(embedding_dim, 30)
        self.bn1 = nn.BatchNorm1d(30)
        self.dropout1 = nn.Dropout(0.2)

        self.fc2 = nn.Linear(30, 10)
        self.bn2 = nn.BatchNorm1d(10)
        self.dropout2 = nn.Dropout(0.2)

        self.fc3 = nn.Linear(10, num_classes)  # one output per class

    def forward(self, x):
        """Return raw class scores (logits) of shape (batch_size, num_classes).

        Softmax is deliberately NOT applied here: nn.CrossEntropyLoss applies
        log-softmax itself, so feeding it probabilities would normalise
        twice. Apply ``torch.softmax(logits, dim=1)`` at inference time when
        probabilities are needed; argmax is the same either way.
        """
        # x: (batch_size, maxlen)
        x = self.embedding(x)  # -> (batch_size, maxlen, embedding_dim)
        # Global max pooling over the sequence axis -> (batch_size, embedding_dim)
        x, _ = torch.max(x, 1)

        x = self.fc1(x)
        x = self.act(x)
        x = self.bn1(x)
        x = self.dropout1(x)

        x = self.fc2(x)
        x = self.act(x)
        x = self.bn2(x)
        x = self.dropout2(x)

        return self.fc3(x)

# ===============================
# 5. 定义训练函数（含超参数搜索接口）
# ===============================
from torch.utils.data import TensorDataset, DataLoader

from tqdm import tqdm

def mlp_glove_train(activation, optimizer_name, epochs, batch_size):
    """Train a fresh MLPGlove model and record per-epoch accuracy.

    Reads the module-level tensors X_train/y_train and X_val/y_val together
    with vocab_size, embedding_dim, embedding_matrix and maxlen.

    Args:
        activation: Activation name forwarded to MLPGlove.
        optimizer_name: "adam", "sgd" or "rmsprop" (case-insensitive); any
            other name falls back to Adam with default settings.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size used by both data loaders.

    Returns:
        (history, model): ``history`` maps "train_acc" and "val_acc" to lists
        holding one accuracy per epoch; ``model`` is the trained network.
    """
    net = MLPGlove(vocab_size, embedding_dim, embedding_matrix, maxlen, activation_func=activation)

    # Optimizer lookup by lower-cased name; Adam is the fallback.
    optimizer_factories = {
        "adam": lambda params: torch.optim.Adam(params),
        "sgd": lambda params: torch.optim.SGD(params, lr=0.01),
        "rmsprop": lambda params: torch.optim.RMSprop(params, lr=0.01),
    }
    build_optimizer = optimizer_factories.get(optimizer_name.lower(), optimizer_factories["adam"])
    optimizer = build_optimizer(net.parameters())

    # Targets are integer class indices. CrossEntropyLoss applies log-softmax
    # internally, so it expects the model to return raw scores.
    criterion = nn.CrossEntropyLoss()

    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net.to(device)

    train_curve, val_curve = [], []

    for epoch_no in range(1, epochs + 1):
        # ---- training pass ----
        net.train()
        hits = seen = 0
        for inputs, labels in tqdm(train_loader, desc=f"Training Epoch {epoch_no}/{epochs}"):
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            scores = net(inputs)  # (batch_size, num_classes)
            criterion(scores, labels).backward()
            optimizer.step()

            hits += (scores.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)
        train_acc = hits / seen

        # ---- validation pass (no gradients) ----
        net.eval()
        val_hits = val_seen = 0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc=f"Validation Epoch {epoch_no}/{epochs}"):
                inputs = inputs.to(device)
                labels = labels.to(device)
                val_hits += (net(inputs).argmax(dim=1) == labels).sum().item()
                val_seen += labels.size(0)
        val_acc = val_hits / val_seen

        train_curve.append(train_acc)
        val_curve.append(val_acc)
        # Per-epoch progress line.
        print(f"Epoch {epoch_no}/{epochs}: train_acc={train_acc:.4f}, val_acc={val_acc:.4f}")

    return {"train_acc": train_curve, "val_acc": val_curve}, net



## MLP hyperparameters tuning

In [None]:

# ===============================
# 6. Hyperparameter tuning experiments and plots
# ===============================
# Candidate values for each hyperparameter. The search is greedy: one
# hyperparameter is chosen at a time while the others are held fixed.
activation_list = ["relu", "elu", "sigmoid", "tanh"]
optimizer_list  = ["adam", "SGD", "RMSprop"]
epochs_list     = [5, 10, 15, 20]
batchsize_list  = [8, 16, 32, 64, 128]

# 6.1 Pick the activation function (fixed: optimizer="adam", epochs=5, batch_size=16).
sel_activation = {}
for act in activation_list:
    history, model = mlp_glove_train(act, "adam", 5, 16)
    # Score each candidate by its validation accuracy after the last epoch.
    sel_activation[act] = history["val_acc"][-1]
    # Drop the reference to the trained model before the next run.
    del model
best_activation = max(sel_activation, key=sel_activation.get)
print("best activation function is ", best_activation)

# 6.2 Pick the optimizer (fixed: activation=best_activation, epochs=5, batch_size=16).
sel_optimizer = {}
for opt in optimizer_list:
    history, model = mlp_glove_train(best_activation, opt, 5, 16)
    sel_optimizer[opt] = history["val_acc"][-1]
    del model
best_optimizer = max(sel_optimizer, key=sel_optimizer.get)
print("best optimizer is ", best_optimizer)

# 6.3 Effect of the number of epochs on accuracy
# (fixed: activation=best_activation, optimizer=best_optimizer, batch_size=16).
# Every epoch count trains a new model from scratch; only the accuracies
# after the final epoch are kept.
acc_train_epoch = {}
acc_val_epoch = {}
for ep in epochs_list:
    history, model = mlp_glove_train(best_activation, best_optimizer, ep, 16)
    acc_train_epoch[ep] = history["train_acc"][-1]
    acc_val_epoch[ep]   = history["val_acc"][-1]
    del model
# The epoch count with the highest final validation accuracy wins.
best_epoch = max(acc_val_epoch, key=acc_val_epoch.get)
print("best epoch is ", best_epoch)

# Tabulate the results; the epoch counts are cast to str before plotting.
df_epoch_train = pd.DataFrame(list(acc_train_epoch.items()), columns=['Epochs', 'Accuracy'])
df_epoch_val   = pd.DataFrame(list(acc_val_epoch.items()), columns=['Epochs', 'Accuracy'])
df_epoch_train['Epochs'] = df_epoch_train['Epochs'].astype(str)
df_epoch_val['Epochs'] = df_epoch_val['Epochs'].astype(str)

# Train (red) vs. validation (blue) accuracy per epoch setting.
plt.figure()
plt.plot(df_epoch_train['Epochs'], df_epoch_train['Accuracy'], "r--o", label="train")
plt.plot(df_epoch_val['Epochs'], df_epoch_val['Accuracy'], "b--o", label="val")
plt.title("Accuracy vs Epochs")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
# Make sure the output directory exists before saving the figure.
os.makedirs("images", exist_ok=True)
plt.savefig(os.path.join("images", "acc-epoch-glove-case_identifier.png"), bbox_inches='tight', dpi=200)

# 6.4 Effect of the batch size on accuracy
# (fixed: activation=best_activation, optimizer=best_optimizer, epochs=best_epoch).
acc_train_batch = {}
acc_val_batch = {}
for bs in batchsize_list:
    history, model = mlp_glove_train(best_activation, best_optimizer, best_epoch, bs)
    acc_train_batch[bs] = history["train_acc"][-1]
    acc_val_batch[bs]   = history["val_acc"][-1]
    del model
# The batch size with the highest final validation accuracy wins.
best_batch = max(acc_val_batch, key=acc_val_batch.get)
print("best batchsize is ", best_batch)

# Tabulate the results; the batch sizes are cast to str before plotting.
df_batch_train = pd.DataFrame(list(acc_train_batch.items()), columns=['Batchsize', 'Accuracy'])
df_batch_val   = pd.DataFrame(list(acc_val_batch.items()), columns=['Batchsize', 'Accuracy'])
df_batch_train['Batchsize'] = df_batch_train['Batchsize'].astype(str)
df_batch_val['Batchsize'] = df_batch_val['Batchsize'].astype(str)

# Train (red) vs. validation (blue) accuracy per batch size.
plt.figure()
plt.plot(df_batch_train['Batchsize'], df_batch_train['Accuracy'], "r--o", label="train")
plt.plot(df_batch_val['Batchsize'], df_batch_val['Accuracy'], "b--o", label="val")
plt.title("Accuracy vs Batchsize")
plt.xlabel("Batchsize")
plt.ylabel("Accuracy")
plt.legend()
# NOTE(review): assumes the "images" directory already exists; it is not created in this part.
plt.savefig(os.path.join("images", "acc-batch-glove-case_identifier.png"), bbox_inches='tight', dpi=200)

# ===============================
# 7. Evaluate the final model on the test set
# ===============================
# The timer covers both the final training run and the test evaluation.
t0 = time()
history, best_model = mlp_glove_train(best_activation, best_optimizer, best_epoch, best_batch)

from torch.utils.data import TensorDataset, DataLoader
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=best_batch, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
best_model.to(device)
# eval() switches dropout and batch norm to inference behaviour.
best_model.eval()
correct_test, total_test = 0, 0
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = best_model(batch_X)
        # Predicted class = index of the largest output per row.
        preds = torch.argmax(outputs, dim=1)
        correct_test += (preds == batch_y).sum().item()
        total_test += batch_y.size(0)
test_accuracy = correct_test / total_test
print("test accuracy score = ", test_accuracy)
t1 = time()
print("time taken is ", t1 - t0)


## Save & Run All

In [None]:
# Save the parameters (state_dict) of the final MLP model.
# NOTE(review): the path uses a backslash, which is a directory separator
# only on Windows; elsewhere it becomes part of the file name.
os.makedirs('models', exist_ok=True)
torch.save(best_model.state_dict(), r'models\MLP_best_model.pth')
# Hyperparameters noted for this checkpoint:
# activation:sigmoid
# optimizer: RMSprop
# best_epoch: 10
# batch_size: 8

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Hyperparameters set by hand so this cell can run without repeating the
# tuning experiments.
best_activation = 'sigmoid'
best_optimizer = 'RMSprop'
best_epoch = 10
best_batch = 8
# Rebuild the architecture, then load the saved parameters into it.
new_model = MLPGlove(activation_func=best_activation,vocab_size=vocab_size,embedding_dim=embedding_dim,embedding_matrix=embedding_matrix,maxlen=maxlen)
# map_location lets a checkpoint saved from a GPU load on a CPU-only machine
# (and the other way round); without it torch.load tries to restore the
# tensors on the device they were saved from.
new_model.load_state_dict(torch.load(r'models\MLP_best_model.pth', map_location=device))
new_model.to(device)
# Inference mode: disables dropout and uses the batch-norm running statistics.
new_model.eval()


In [None]:
# Display the first three entries of the 'full_text_token' column.
test_data['full_text_token'][:3]

In [None]:
# Build the prediction inputs the same way as the training inputs: cast each
# text to str (a missing value read as NaN would otherwise crash the
# tokenizer), map tokens to ids, then pad/truncate to maxlen.
X_tbp = [text_to_sequence(str(text), tokenizer, vocab) for text in test_data['full_text_token']]

X_tbp = [pad_sequence(seq, maxlen) for seq in X_tbp]
text_sequences = torch.tensor(X_tbp, dtype=torch.long)

pred_dataset = TensorDataset(text_sequences)
pred_loader = DataLoader(pred_dataset, batch_size=best_batch, shuffle=False)

# Collected per-row predictions and class probabilities.
all_preds = []
all_probs = []

with torch.no_grad():
    for batch in pred_loader:
        batch_X = batch[0].to(device)
        outputs = new_model(batch_X)
        # NOTE(review): this softmax assumes the model returns raw logits. If
        # the model's forward already applies softmax, the values stored here
        # are normalised twice (argmax is unaffected); confirm.
        probs = torch.softmax(outputs, dim=1)
        preds = torch.argmax(probs, dim=1)

        all_probs.extend(probs.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

# Attach the results to the original frame.
test_data['prediction'] = all_preds
test_data['probabilities'] = list(np.array(all_probs))  # one probability array per row

# Expand the probabilities into one column per class. The columns are assigned
# by name instead of concatenating a new frame, so columns left by an earlier
# run are overwritten rather than duplicated.
prob_df = pd.DataFrame(
    all_probs,
    columns=[f'prob_class_{i}' for i in range(len(all_probs[0]))],
    index=test_data.index,
)
for col in prob_df.columns:
    test_data[col] = prob_df[col]

# Show the first rows of the result.
print(test_data[['full_text', 'prediction', 'probabilities']].head())


In [None]:
# Write the test frame, including the prediction columns, to CSV without the index.
test_data.to_csv('MLP_prediction.csv', index=False)

# Model 2: CNN

## CNN Model Setting

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class CNNGlove(nn.Module):
    """1-D CNN text classifier initialised from pretrained word vectors.

    Four Conv1d(kernel_size=2) + MaxPool1d(2) stages are followed by two
    fully connected layers.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        embedding_matrix: Tensor copied into the embedding layer; the layer
            stays trainable.
        maxlen: Length of the input sequences; determines the size of the
            first fully connected layer.
        num_classes: Size of the output layer.
        activation_func: "relu", "elu", "sigmoid" or "tanh"; any other value
            falls back to ReLU.

    Raises:
        ValueError: If ``maxlen`` is too short to survive the four
            convolution/pooling stages.
    """

    def __init__(self, vocab_size, embedding_dim, embedding_matrix, maxlen, num_classes, activation_func="relu"):
        super().__init__()
        self.maxlen = maxlen
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Start from the pretrained vectors. To freeze them, additionally set
        # self.embedding.weight.requires_grad = False.
        self.embedding.weight.data.copy_(embedding_matrix)

        # Activation lookup; unknown names fall back to ReLU.
        activations = {
            "relu": F.relu,
            "elu": F.elu,
            "sigmoid": torch.sigmoid,
            "tanh": torch.tanh,
        }
        self.act = activations.get(activation_func, F.relu)

        # Convolution + max-pooling stages.
        self.conv1 = nn.Conv1d(in_channels=embedding_dim, out_channels=32, kernel_size=2)
        self.pool1 = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=32, kernel_size=2)
        self.pool2 = nn.MaxPool1d(2)
        self.conv3 = nn.Conv1d(in_channels=32, out_channels=16, kernel_size=2)
        self.pool3 = nn.MaxPool1d(2)
        self.conv4 = nn.Conv1d(in_channels=16, out_channels=16, kernel_size=2)
        self.pool4 = nn.MaxPool1d(2)

        # Sequence length after the four stages: each kernel-2 convolution
        # (no padding) removes one position and each pool halves the length,
        # rounding down. The previous closed form (maxlen - 8) // 16 only
        # matches this for some lengths (it does for 100, not for 104).
        flat_len = maxlen
        for _ in range(4):
            flat_len = (flat_len - 1) // 2
        if flat_len < 1:
            raise ValueError(
                f"maxlen={maxlen} is too short for four conv/pool stages"
            )

        # Fully connected layers (16 channels come out of the last stage).
        self.fc1 = nn.Linear(16 * flat_len, 64)
        self.fc2 = nn.Linear(64, num_classes)

        # Dropout layers.
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)

    def forward(self, x):
        """Return raw class scores (logits) of shape (batch_size, num_classes).

        Softmax is deliberately NOT applied here: nn.CrossEntropyLoss applies
        log-softmax itself, so feeding it probabilities would normalise
        twice. Apply ``torch.softmax(logits, dim=1)`` at inference time when
        probabilities are needed; argmax is the same either way.
        """
        # x: (batch_size, maxlen)
        x = self.embedding(x)  # -> (batch_size, maxlen, embedding_dim)
        x = x.transpose(1, 2)  # -> (batch_size, embedding_dim, maxlen), as Conv1d expects

        x = self.pool1(self.act(self.conv1(x)))
        x = self.pool2(self.act(self.conv2(x)))
        x = self.dropout1(x)
        x = self.pool3(self.act(self.conv3(x)))
        x = self.pool4(self.act(self.conv4(x)))
        x = self.dropout1(x)

        x = x.view(x.size(0), -1)  # flatten to (batch_size, 16 * flat_len)
        x = self.fc1(x)
        x = self.act(x)
        x = self.dropout2(x)
        return self.fc2(x)
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm

def cnn_glove_train(activation, optimizer_name, epochs, batch_size):
    """Train a fresh CNNGlove model and record per-epoch accuracy.

    Reads the module-level tensors X_train/y_train and X_val/y_val together
    with vocab_size, embedding_dim, embedding_matrix, maxlen and num_classes.

    Args:
        activation: Activation name forwarded to CNNGlove.
        optimizer_name: "adam", "sgd" or "rmsprop" (case-insensitive); any
            other name falls back to Adam with default settings.
        epochs: Number of passes over the training data.
        batch_size: Mini-batch size used by both data loaders.

    Returns:
        (history, model): ``history`` maps "train_acc" and "val_acc" to lists
        holding one accuracy per epoch; ``model`` is the trained network.
    """
    net = CNNGlove(vocab_size, embedding_dim, embedding_matrix, maxlen, num_classes, activation_func=activation)

    # Optimizer lookup by lower-cased name; Adam is the fallback.
    optimizer_factories = {
        "adam": lambda params: torch.optim.Adam(params),
        "sgd": lambda params: torch.optim.SGD(params, lr=0.01),
        "rmsprop": lambda params: torch.optim.RMSprop(params, lr=0.01),
    }
    build_optimizer = optimizer_factories.get(optimizer_name.lower(), optimizer_factories["adam"])
    optimizer = build_optimizer(net.parameters())

    # Targets are integer class indices. CrossEntropyLoss applies log-softmax
    # internally, so it expects the model to return raw scores.
    criterion = nn.CrossEntropyLoss()

    train_loader = DataLoader(TensorDataset(X_train, y_train), batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(TensorDataset(X_val, y_val), batch_size=batch_size, shuffle=False)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net.to(device)

    train_curve, val_curve = [], []

    for epoch_no in range(1, epochs + 1):
        # ---- training pass ----
        net.train()
        hits = seen = 0
        for inputs, labels in tqdm(train_loader, desc=f"Training Epoch {epoch_no}/{epochs}"):
            inputs = inputs.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            scores = net(inputs)  # (batch_size, num_classes)
            criterion(scores, labels).backward()
            optimizer.step()

            hits += (scores.argmax(dim=1) == labels).sum().item()
            seen += labels.size(0)
        train_acc = hits / seen

        # ---- validation pass (no gradients) ----
        net.eval()
        val_hits = val_seen = 0
        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc=f"Validation Epoch {epoch_no}/{epochs}"):
                inputs = inputs.to(device)
                labels = labels.to(device)
                val_hits += (net(inputs).argmax(dim=1) == labels).sum().item()
                val_seen += labels.size(0)
        val_acc = val_hits / val_seen

        train_curve.append(train_acc)
        val_curve.append(val_acc)
        # Per-epoch progress line.
        print(f"Epoch {epoch_no}/{epochs}: train_acc={train_acc:.4f}, val_acc={val_acc:.4f}")

    return {"train_acc": train_curve, "val_acc": val_curve}, net



## CNN hyperparameters tuning

In [None]:
import torch
import torch.optim as optim
import torch.nn.functional as F
import pandas as pd
import matplotlib.pyplot as plt
import os
from time import time
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

# ===============================
# 6. Hyperparameter tuning experiments and plots (CNN)
# ===============================
# Candidate values for each hyperparameter. The search is greedy: one
# hyperparameter is chosen at a time while the others are held fixed.
activation_list = ["relu", "elu", "sigmoid", "tanh"]
optimizer_list  = ["adam", "SGD", "RMSprop"]
epochs_list     = [5, 10, 15, 20]
batchsize_list  = [8, 16, 32, 64, 128]

# 6.1 Pick the activation function (fixed: optimizer="adam", epochs=5, batch_size=16).
sel_activation = {}
for act in activation_list:
    history, model = cnn_glove_train(act, "adam", 5, 16,)
    # Score each candidate by its validation accuracy after the last epoch.
    sel_activation[act] = history["val_acc"][-1]
    # Drop the reference to the trained model before the next run.
    del model
best_activation = max(sel_activation, key=sel_activation.get)
print("best activation function is ", best_activation)

# 6.2 Pick the optimizer (fixed: activation=best_activation, epochs=5, batch_size=16).
sel_optimizer = {}
for opt in optimizer_list:
    history, model = cnn_glove_train(best_activation, opt, 5, 16)
    sel_optimizer[opt] = history["val_acc"][-1]
    del model
best_optimizer = max(sel_optimizer, key=sel_optimizer.get)
print("best optimizer is ", best_optimizer)

# 6.3 Effect of the number of epochs on accuracy
# (fixed: activation=best_activation, optimizer=best_optimizer, batch_size=16).
# Every epoch count trains a new model from scratch; only the accuracies
# after the final epoch are kept.
acc_train_epoch = {}
acc_val_epoch = {}
for ep in epochs_list:
    history, model = cnn_glove_train(best_activation, best_optimizer, ep, 16)
    acc_train_epoch[ep] = history["train_acc"][-1]
    acc_val_epoch[ep]   = history["val_acc"][-1]
    del model
# The epoch count with the highest final validation accuracy wins.
best_epoch = max(acc_val_epoch, key=acc_val_epoch.get)
print("best epoch is ", best_epoch)

# Tabulate the results; the epoch counts are cast to str before plotting.
df_epoch_train = pd.DataFrame(list(acc_train_epoch.items()), columns=['Epochs', 'Accuracy'])
df_epoch_val   = pd.DataFrame(list(acc_val_epoch.items()), columns=['Epochs', 'Accuracy'])
df_epoch_train['Epochs'] = df_epoch_train['Epochs'].astype(str)
df_epoch_val['Epochs'] = df_epoch_val['Epochs'].astype(str)

# Train (red) vs. validation (blue) accuracy per epoch setting.
plt.figure()
plt.plot(df_epoch_train['Epochs'], df_epoch_train['Accuracy'], "r--o", label="train")
plt.plot(df_epoch_val['Epochs'], df_epoch_val['Accuracy'], "b--o", label="val")
plt.title("Accuracy vs Epochs")
plt.xlabel("Epochs")
plt.ylabel("Accuracy")
plt.legend()
os.makedirs("images", exist_ok=True)
# CNN-specific file name: the MLP section saves its epoch plot as
# "acc-epoch-glove-case_identifier.png", and reusing that name here would
# overwrite the MLP figure.
plt.savefig(os.path.join("images", "acc-epoch-glove-cnn.png"), bbox_inches='tight', dpi=200)

# 6.4 Effect of the batch size on accuracy
# (fixed: activation=best_activation, optimizer=best_optimizer, epochs=best_epoch).
acc_train_batch = {}
acc_val_batch = {}
for bs in batchsize_list:
    history, model = cnn_glove_train(best_activation, best_optimizer, best_epoch, bs)
    acc_train_batch[bs] = history["train_acc"][-1]
    acc_val_batch[bs]   = history["val_acc"][-1]
    del model
# The batch size with the highest final validation accuracy wins.
best_batch = max(acc_val_batch, key=acc_val_batch.get)
print("best batchsize is ", best_batch)

# Tabulate the results; the batch sizes are cast to str before plotting.
df_batch_train = pd.DataFrame(list(acc_train_batch.items()), columns=['Batchsize', 'Accuracy'])
df_batch_val   = pd.DataFrame(list(acc_val_batch.items()), columns=['Batchsize', 'Accuracy'])
df_batch_train['Batchsize'] = df_batch_train['Batchsize'].astype(str)
df_batch_val['Batchsize'] = df_batch_val['Batchsize'].astype(str)

# Train (red) vs. validation (blue) accuracy per batch size.
plt.figure()
plt.plot(df_batch_train['Batchsize'], df_batch_train['Accuracy'], "r--o", label="train")
plt.plot(df_batch_val['Batchsize'], df_batch_val['Accuracy'], "b--o", label="val")
plt.title("Accuracy vs Batchsize")
plt.xlabel("Batchsize")
plt.ylabel("Accuracy")
plt.legend()
# Create the output directory here as well so this part does not depend on an
# earlier plot having done it.
os.makedirs("images", exist_ok=True)
# CNN-specific file name: the MLP section saves its batch-size plot as
# "acc-batch-glove-case_identifier.png", and reusing that name here would
# overwrite the MLP figure.
plt.savefig(os.path.join("images", "acc-batch-glove-cnn.png"), bbox_inches='tight', dpi=200)

# ===============================
# 7. Evaluate the final CNN model on the test set
# ===============================
# The timer covers both the final training run and the test evaluation.
t0 = time()
history, best_model = cnn_glove_train(best_activation, best_optimizer, best_epoch, best_batch)

# X_test and y_test are already long tensors, so they are wrapped directly.
# Passing them through torch.tensor(...) again only makes a copy and emits a
# "copy construct from a tensor" UserWarning.
test_dataset = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_dataset, batch_size=best_batch, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
best_model.to(device)
# eval() switches dropout to inference behaviour.
best_model.eval()

correct_test, total_test = 0, 0
with torch.no_grad():
    for batch_X, batch_y in test_loader:
        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
        outputs = best_model(batch_X)
        # Predicted class = index of the largest output per row.
        preds = torch.argmax(outputs, dim=1)
        correct_test += (preds == batch_y).sum().item()
        total_test += batch_y.size(0)

test_accuracy = correct_test / total_test
print("test accuracy score = ", test_accuracy)

t1 = time()
print("time taken is ", t1 - t0)


## Save & Run all

In [None]:
# Save the parameters (state_dict) of the final CNN model.
# NOTE(review): the path uses a backslash, which is a directory separator
# only on Windows; elsewhere it becomes part of the file name.
os.makedirs('models', exist_ok=True)
torch.save(best_model.state_dict(), r'models\CNN_best_model.pth')
# Hyperparameters noted for this checkpoint:
# activation:elu
# optimizer: adam
# best_epoch: 5
# batch_size: 128


In [None]:
# Hyperparameters set by hand so this cell can run without repeating the
# tuning experiments.
best_activation = 'elu'
best_optimizer = 'adam'
best_epoch = 5
best_batch = 128
# Rebuild the architecture, then load the saved parameters into it. The class
# count comes from the notebook-level num_classes (the value the training
# function builds the model with) instead of a hard-coded 3, so the layer
# shapes always match the checkpoint.
new_model = CNNGlove(activation_func=best_activation,vocab_size=vocab_size,embedding_dim=embedding_dim,embedding_matrix=embedding_matrix,maxlen=maxlen,num_classes=num_classes)
# map_location lets a checkpoint saved from a GPU load on a CPU-only machine
# (and the other way round).
new_model.load_state_dict(torch.load(r'models\CNN_best_model.pth', map_location=device))
new_model.to(device)
# Inference mode: disables dropout.
new_model.eval()

In [None]:

# Build the prediction inputs the same way as the training inputs: cast each
# text to str (a missing value read as NaN would otherwise crash the
# tokenizer), map tokens to ids, then pad/truncate to maxlen.
X_tbp = [text_to_sequence(str(text), tokenizer, vocab) for text in test_data['full_text_token']]

X_tbp = [pad_sequence(seq, maxlen) for seq in X_tbp]
text_sequences = torch.tensor(X_tbp, dtype=torch.long)

pred_dataset = TensorDataset(text_sequences)
pred_loader = DataLoader(pred_dataset, batch_size=best_batch, shuffle=False)

# Collected per-row predictions and class probabilities.
all_preds = []
all_probs = []

with torch.no_grad():
    for batch in pred_loader:
        batch_X = batch[0].to(device)
        outputs = new_model(batch_X)
        # NOTE(review): this softmax assumes the model returns raw logits. If
        # the model's forward already applies softmax, the values stored here
        # are normalised twice (argmax is unaffected); confirm.
        probs = torch.softmax(outputs, dim=1)
        preds = torch.argmax(probs, dim=1)

        all_probs.extend(probs.cpu().numpy())
        all_preds.extend(preds.cpu().numpy())

# Attach the results to the original frame.
test_data['prediction'] = all_preds
test_data['probabilities'] = list(np.array(all_probs))  # one probability array per row

# Expand the probabilities into one column per class. The columns are assigned
# by name instead of concatenating a new frame: test_data may already carry
# prob_class_* columns from an earlier prediction run, and pd.concat would
# then leave duplicate column names in the frame and in the exported CSV.
prob_df = pd.DataFrame(
    all_probs,
    columns=[f'prob_class_{i}' for i in range(len(all_probs[0]))],
    index=test_data.index,
)
for col in prob_df.columns:
    test_data[col] = prob_df[col]

# Show the first rows of the result.
print(test_data[['full_text', 'prediction', 'probabilities']].head())


In [None]:
# Write the test frame, including the prediction columns, to CSV without the index.
test_data.to_csv('CNN_prediction.csv', index=False)

# Model 3: BERT