In [145]:
import torch
import numpy as np
from gensim.models import KeyedVectors

In [146]:
# 加载词向量模型
word_vector_dict = KeyedVectors.load_word2vec_format(
    "./Dataset/wiki_word2vec_50.bin", binary=True
)

# 确定词向量的维度
vector_size = word_vector_dict.vector_size

In [147]:
# 从文件中读取数据
def load_data(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    sentences = []
    labels = []

    for line in lines:
        parts = line.strip().split()
        label = torch.tensor(float(parts[0]))
        words = parts[1:]

        word_vectors = []

        for word in words:
            if word in word_vector_dict:
                word_vector = word_vector_dict[word]
                word_vectors.append(word_vector)

        word_vectors = np.array(word_vectors)

        if len(word_vectors) > 0:
            sentence_vector = torch.tensor(word_vectors).view(-1, vector_size)
            sentences.append(sentence_vector)
            labels.append(label)
        else:
            print(f"Empty sentence: {line}")

    return {
        "sentences": sentences,
        "labels": labels,
    }

In [148]:
# 加载数据
train_data = load_data("./Dataset/train.txt")
valid_data = load_data("./Dataset/validation.txt")
test_data = load_data("./Dataset/test.txt")

Empty sentence: 0	鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆 鸟爆



In [149]:
# 句子长度的 summary
max_train_len = max([len(sentence) for sentence in train_data["sentences"]])
max_valid_len = max([len(sentence) for sentence in valid_data["sentences"]])
max_test_len = max([len(sentence) for sentence in test_data["sentences"]])
max_len = max(max_train_len, max_valid_len, max_test_len)
print("最大训练集句子长度：", max_train_len)
print("最大验证集句子长度：", max_valid_len)
print("最大测试集句子长度：", max_test_len)
print("最大句子长度：", max_len)

min_train_len = min([len(sentence) for sentence in train_data["sentences"]])
min_valid_len = min([len(sentence) for sentence in valid_data["sentences"]])
min_test_len = min([len(sentence) for sentence in test_data["sentences"]])
min_len = min(min_train_len, min_valid_len, min_test_len)
print("最小训练集句子长度：", min_train_len)
print("最小验证集句子长度：", min_valid_len)
print("最小测试集句子长度：", min_test_len)
print("最小句子长度：", min_len)

average_train_len = np.mean([len(sentence) for sentence in train_data["sentences"]])
average_valid_len = np.mean([len(sentence) for sentence in valid_data["sentences"]])
average_test_len = np.mean([len(sentence) for sentence in test_data["sentences"]])
average_len = np.mean([average_train_len, average_valid_len, average_test_len])
print("平均训练集句子长度：", average_train_len)
print("平均验证集句子长度：", average_valid_len)
print("平均测试集句子长度：", average_test_len)
print("平均句子长度：", average_len)

最大训练集句子长度： 648
最大验证集句子长度： 113
最大测试集句子长度： 81
最大句子长度： 648
最小训练集句子长度： 10
最小验证集句子长度： 11
最小测试集句子长度： 19
最小句子长度： 10
平均训练集句子长度： 43.42051307696154
平均验证集句子长度： 43.61840468999822
平均测试集句子长度： 43.8780487804878
平均句子长度： 43.63898884914919


In [150]:
# 将句子填充到相同长度
def unify_columns(matrix, unified_length):
    col_length, row_length = matrix.size()
    if col_length > unified_length:
        new_matrix = matrix[:unified_length, :]
    elif col_length < unified_length:
        padding = torch.zeros(unified_length - col_length, row_length)
        new_matrix = torch.cat((matrix, padding), dim=0)
    else:
        new_matrix = matrix
    return new_matrix

In [151]:
from torch.utils.data import Dataset

# 定义数据集类
class SentimentDataset(Dataset):
    def __init__(self, sentences, labels, sentences_len):
        self.sentences = sentences
        self.labels = labels
        self.sentence_len = sentences_len

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        sentence = self.sentences[index].float()
        label = self.labels[index]
        # 填充句子到最大长度
        # 转换形状以适配卷积层: (batch_size, embedding_dim, sentence_length)
        output_matrix = unify_columns(sentence, self.sentence_len).permute(1, 0).requires_grad_()
        return output_matrix, label

In [152]:
# 创建Dataset对象
SENTENCE_LEN = 80
train_dataset = SentimentDataset(
    train_data["sentences"], train_data["labels"], sentences_len=SENTENCE_LEN
)
valid_dataset = SentimentDataset(
    valid_data["sentences"], valid_data["labels"], sentences_len=SENTENCE_LEN
)
test_dataset = SentimentDataset(
    test_data["sentences"], test_data["labels"], sentences_len=SENTENCE_LEN
)

print("训练集大小:", len(train_dataset))
print("验证集大小:", len(valid_dataset))
print("测试集大小:", len(test_dataset))

训练集大小: 19997
验证集大小: 5629
测试集大小: 369


In [153]:
from torch.utils.data import DataLoader

# 创建DataLoader
batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [155]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim
import torch.nn.functional as F

In [156]:
# CNN模型
class CNNSentimentClassifier(nn.Module):
    def __init__(self, embedding_dim, num_filters, filter_sizes, output_dim, dropout):
        super(CNNSentimentClassifier, self).__init__()

        # 卷积层列表
        self.conv_layers = nn.ModuleList(
            [
                nn.Conv1d(
                    in_channels=embedding_dim,
                    out_channels=num_filters,
                    kernel_size=size,
                )
                for size in filter_sizes
            ]
        )

        # Dropout层
        self.dropout = nn.Dropout(dropout)

        # 全连接层
        self.fc = nn.Linear(num_filters * len(filter_sizes), output_dim)

        # 初始化卷积层
        for conv_layer in self.conv_layers:
            init.kaiming_normal_(conv_layer.weight, nonlinearity="relu")
            init.zeros_(conv_layer.bias)

        # 初始化全连接层
        init.xavier_normal_(self.fc.weight)
        init.zeros_(self.fc.bias)

    def forward(self, text):

        # 保存所有卷积层的输出
        conv_outputs = []

        # 遍历所有卷积层
        for conv_layer in self.conv_layers:
            # 应用卷积层
            conv_output = conv_layer(text)
            # 应用ReLU激活函数
            conv_output = F.relu(conv_output)
            # 应用最大池化层
            conv_output, _ = torch.max(conv_output, dim=2)
            # 保存卷积层的输出
            conv_outputs.append(conv_output)

        # 将所有卷积层的输出拼接起来
        concat_output = torch.cat(conv_outputs, dim=1)

        # 应用Dropout
        concat_output = self.dropout(concat_output)

        # 应用全连接层
        logits = self.fc(concat_output)

        # 使用sigmoid激活函数
        probs = F.sigmoid(logits)

        return probs

In [157]:
# 超参数
EMBEDDING_DIM = vector_size
NUM_FILTERS = 100
FILTER_SIZES = [3, 5, 7, 9]
OUTPUT_DIM = 1
DROPOUT = 0.5

# 初始化模型
model = CNNSentimentClassifier(
    EMBEDDING_DIM, NUM_FILTERS, FILTER_SIZES, OUTPUT_DIM, DROPOUT
)

# 查看模型结构
print(model)

# 定义损失函数和优化器
loss_function = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters())

CNNSentimentClassifier(
  (conv_layers): ModuleList(
    (0): Conv1d(50, 100, kernel_size=(3,), stride=(1,))
    (1): Conv1d(50, 100, kernel_size=(5,), stride=(1,))
    (2): Conv1d(50, 100, kernel_size=(7,), stride=(1,))
    (3): Conv1d(50, 100, kernel_size=(9,), stride=(1,))
  )
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=400, out_features=1, bias=True)
)


In [158]:
# 定义训练循环
def train_epoch(model, data_loader, loss_function, optimizer):
    model.train()
    total_loss = 0
    for sentences, labels in data_loader:
        # 清除梯度
        optimizer.zero_grad()

        # 前向传播
        probs = model(sentences)

        # 计算损失
        loss = loss_function(probs, labels.unsqueeze(1))

        # 反向传播
        loss.backward()

        # 更新权重
        optimizer.step()

        # 累加损失
        total_loss += loss.item()

    return total_loss / len(data_loader)


# 定义验证循环
def valid_epoch(model, data_loader, loss_function):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for sentences, labels in data_loader:
            # 前向传播
            probs = model(sentences)

            # 计算损失
            loss = loss_function(probs, labels.unsqueeze(1))

            # 累加损失
            total_loss += loss.item()

    return total_loss / len(data_loader)

In [170]:
# 定义准确率计算函数
def accuracy(probs, labels):
    labels = labels.view(-1, 1)
    y_pred = probs >= 0.5
    y = labels == 1
    correct_predictions = (y_pred == y).float()
    correct_predictions = correct_predictions.view(-1)
    accuracy = correct_predictions.sum() / len(labels)
    return accuracy

In [160]:
# 修改训练循环以包括准确率
def train_epoch(model, data_loader, loss_function, optimizer):
    model.train()
    total_loss = 0
    total_accuracy = 0
    for sentences, labels in data_loader:
        optimizer.zero_grad()
        probs = model(sentences)
        loss = loss_function(probs, labels.unsqueeze(1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        batch_accuracy = accuracy(probs, labels)
        total_accuracy += batch_accuracy.item()

    average_loss = total_loss / len(data_loader)
    average_accuracy = total_accuracy / len(data_loader)
    return average_loss, average_accuracy


# 修改验证循环以包括准确率
def valid_epoch(model, data_loader, loss_function):
    model.eval()
    total_loss = 0
    total_accuracy = 0
    with torch.no_grad():
        for sentences, labels in data_loader:
            probs = model(sentences)
            loss = loss_function(probs, labels.unsqueeze(1))
            total_loss += loss.item()
            batch_accuracy = accuracy(probs, labels)
            total_accuracy += batch_accuracy.item()

    average_loss = total_loss / len(data_loader)
    average_accuracy = total_accuracy / len(data_loader)
    return average_loss, average_accuracy


# 开始训练，并打印准确率
num_epochs = 10
for epoch in range(num_epochs):
    train_loss, train_accuracy = train_epoch(
        model, train_loader, loss_function, optimizer
    )
    valid_loss, valid_accuracy = valid_epoch(model, valid_loader, loss_function)
    print(
        f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}, "
        f"Train Accuracy: {train_accuracy:.4f}, Valid Accuracy: {valid_accuracy:.4f}"
    )

Epoch 1/10, Train Loss: 0.6936, Valid Loss: 0.6931, Train Accuracy: 0.5006, Valid Accuracy: 0.5004
Epoch 2/10, Train Loss: 0.6398, Valid Loss: 0.6106, Train Accuracy: 0.6790, Valid Accuracy: 0.7586
Epoch 3/10, Train Loss: 0.6075, Valid Loss: 0.6073, Train Accuracy: 0.7674, Valid Accuracy: 0.7798
Epoch 4/10, Train Loss: 0.5995, Valid Loss: 0.6053, Train Accuracy: 0.7859, Valid Accuracy: 0.7886
Epoch 5/10, Train Loss: 0.5916, Valid Loss: 0.6007, Train Accuracy: 0.8057, Valid Accuracy: 0.7896
Epoch 6/10, Train Loss: 0.5847, Valid Loss: 0.5987, Train Accuracy: 0.8216, Valid Accuracy: 0.7903
Epoch 7/10, Train Loss: 0.5771, Valid Loss: 0.5970, Train Accuracy: 0.8407, Valid Accuracy: 0.7874
Epoch 8/10, Train Loss: 0.5719, Valid Loss: 0.5970, Train Accuracy: 0.8525, Valid Accuracy: 0.8071
Epoch 9/10, Train Loss: 0.5663, Valid Loss: 0.5959, Train Accuracy: 0.8646, Valid Accuracy: 0.8096
Epoch 10/10, Train Loss: 0.5615, Valid Loss: 0.5975, Train Accuracy: 0.8769, Valid Accuracy: 0.8158


In [162]:
# 保存模型
torch.save(model.state_dict(), "cnn_sentiment_model.pth")

In [214]:
def precision_score(probs, labels):
    probs = probs.view(-1)
    y_true = list(labels == 1)
    for i in range(len(y_true)):
        y_true[i] = y_true[i].item()
    print(y_true)
    y_pred = list(probs >= 0.5)
    for i in range(len(y_pred)):
        y_pred[i] = y_pred[i].item()
    print(y_pred)
    true_positives = sum([1 for y, p in zip(y_true, y_pred) if y == True and p == True])
    false_positives = sum(
        [1 for y, p in zip(y_true, y_pred) if y == False and p == True]
    )
    precision = true_positives / (true_positives + false_positives)
    return precision


def recall_score(probs, labels):
    probs = probs.view(-1)
    y_true = list(labels == 1)
    for i in range(len(y_true)):
        y_true[i] = y_true[i].item()

    y_pred = list(probs >= 0.5)
    for i in range(len(y_pred)):
        y_pred[i] = y_pred[i].item()

    true_positives = sum([1 for y, p in zip(y_true, y_pred) if y == True and p == True])
    false_negatives = sum(
        [1 for y, p in zip(y_true, y_pred) if y == True and p == False]
    )
    recall = true_positives / (true_positives + false_negatives)
    return recall

In [172]:
# 加载模型
model.load_state_dict(torch.load("cnn_sentiment_model.pth"))

# 设置模型为评估模式
model.eval()

CNNSentimentClassifier(
  (conv_layers): ModuleList(
    (0): Conv1d(50, 100, kernel_size=(3,), stride=(1,))
    (1): Conv1d(50, 100, kernel_size=(5,), stride=(1,))
    (2): Conv1d(50, 100, kernel_size=(7,), stride=(1,))
    (3): Conv1d(50, 100, kernel_size=(9,), stride=(1,))
  )
  (dropout): Dropout(p=0.5, inplace=False)
  (fc): Linear(in_features=400, out_features=1, bias=True)
)

In [173]:
# 测试循环以评估模型性能
def test_epoch(model, data_loader, loss_function):
    model.eval()
    total_loss = 0
    total_accuracy = 0
    with torch.no_grad():
        for sentences, labels in data_loader:
            probs = model(sentences)
            loss = loss_function(probs, labels.unsqueeze(1))
            total_loss += loss.item()
            batch_accuracy = accuracy(probs, labels)
            total_accuracy += batch_accuracy.item()

    average_loss = total_loss / len(data_loader)
    average_accuracy = total_accuracy / len(data_loader)
    return average_loss, average_accuracy

# 在训练完成后，评估模型在测试集上的性能
test_loss, test_accuracy = test_epoch(model, test_loader, loss_function)
print(f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_accuracy:.4f}")


Test Loss: 0.5927, Test Accuracy: 0.8154


In [215]:
# test
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)

total_loss = 0
total_accuracy = 0
total_correct = 0
total_samples = 0
total_precision = 0
total_recall = 0
with torch.no_grad():  # 关闭梯度计算
    for sentences, labels in test_loader:
        probs = model(sentences)

        batch_loss = loss_function(probs, labels.unsqueeze(1))
        total_loss += batch_loss.item()

        batch_accuracy = accuracy(probs, labels)
        total_accuracy += batch_accuracy.item()

        # 计算精确率、召回率和F-score
        batch_precision = precision_score(probs, labels)
        batch_recall = recall_score(probs, labels)

        total_precision += batch_precision
        total_recall += batch_recall

average_loss = total_loss / len(test_loader)
average_accuracy = total_accuracy / len(test_loader)
average_precision = total_precision / len(test_loader)
average_recall = total_recall / len(test_loader)
average_f1 = 2 * ((average_precision * average_recall) / (average_precision + average_recall))

print(f"Average Loss: {average_loss:.4f}")
print(f"Average Accuracy: {average_accuracy:.4f}")
print(f"Average Precision: {average_precision:.4f}")
print(f"Average Recall: {average_recall:.4f}")
print(f"Average F1 Score: {average_f1:.4f}")

[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, Tru