In [5]:
import tensorflow as tf
import numpy as np

# 加载IMDB数据集，num_words=10000表示使用前10000个最常用的词
(train_data, train_labels), (test_data, test_labels) = tf.keras.datasets.imdb.load_data(num_words=10000)

# 创建反向映射
word_index = tf.keras.datasets.imdb.get_word_index()
index_word = {index: word for word, index in word_index.items()}

# 将每个评论的整数列表转换为单词列表
train_texts = [' '.join([index_word.get(i - 3, '?') for i in review]) for review in train_data]
test_texts = [' '.join([index_word.get(i - 3, '?') for i in review]) for review in test_data]

# 合并训练集和测试集文本
texts = train_texts + test_texts
labels = np.array(train_labels.tolist() + test_labels.tolist())


In [6]:
import re
from nltk.corpus import stopwords
import nltk

# 下载 NLTK 停用词库（如果尚未下载）
nltk.download('stopwords')

# 获取英文停用词
stop_words = set(stopwords.words('english'))

# 定义文本清理函数
def clean_text(text):
    # 转小写
    text = text.lower()
    # 去除标点符号和非字母字符
    text = re.sub(r'[^a-z\s]', '', text)
    # 按空格分词
    words = text.split()
    # 去除停用词
    cleaned_words = [word for word in words if word not in stop_words]
    # 重新组合为字符串
    return ' '.join(cleaned_words)

# 清理文本
cleaned_texts = [clean_text(text) for text in texts]


[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Administrator\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [7]:
from gensim.models import Word2Vec
from gensim.models.word2vec import LineSentence


# 保存清理后的文本到文件
with open("../txt/cleaned_texts.txt", 'w') as f:
    for text in cleaned_texts:
        f.write(text + '\n')

# 训练Word2Vec模型
word2vec_model = Word2Vec(LineSentence('../txt/cleaned_texts.txt'), vector_size=50, window=10, min_count=1, workers=4)
word2vec_model.save("word2vec_50dim.model")

# 查看词向量示例
print(word2vec_model.wv['good'])


[ 2.5568116  -0.5148969  -0.30211067 -0.6525784  -0.7846254  -0.07693809
  2.7421577  -3.8669763   3.233074   -0.05970954 -0.15017028  0.16839494
  1.3168415  -1.35753     1.1577648   1.1919928  -1.2399462  -0.38931683
 -0.58445    -0.08187722 -2.61016     1.9360516  -1.0797453   0.2688658
 -1.2063103  -0.5822842   1.4312816   4.364593    0.5273557   0.0417451
 -1.8826184  -1.4872185  -0.37934548 -0.08250141  1.116712   -1.1693203
 -1.6756998  -2.7791555   0.55821395  0.85290354 -1.491963    5.5283957
  1.3680674  -1.787526   -2.5441642  -1.9117925  -1.3347747   2.600239
  1.1327598  -5.540544  ]


In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split  # 添加这个导入

# 检查是否可以使用 GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 假设 word2vec_model 已经加载好
embedding_size = 50  # 假设使用50维的词向量
sequence_length = 100  # 统一句子长度为100

# 将文本转化为整数索引
def text_to_indices(text, model, sequence_length=100):
    sentence_indices = []
    for word in text.split():
        if word in model.wv:
            # 获取词的索引
            sentence_indices.append(model.wv.key_to_index[word])
        else:
            sentence_indices.append(0)  # 如果词不在词汇表中，使用0（通常代表未知词）
    
    # 填充或截断到固定长度
    if len(sentence_indices) < sequence_length:
        sentence_indices.extend([0] * (sequence_length - len(sentence_indices)))
    else:
        sentence_indices = sentence_indices[:sequence_length]
    
    return np.array(sentence_indices)

# 将所有文本转换为索引
inputs = np.array([text_to_indices(text, word2vec_model, sequence_length) for text in cleaned_texts])

# 假设labels已经定义
inputs_tensor = torch.LongTensor(inputs).to(device)
labels_tensor = torch.LongTensor(labels).to(device)

# 检查文本和标签的数量是否一致
print(len(inputs_tensor) == len(labels_tensor))  # 确保数据和标签长度一致

# 划分训练集和验证集
X_train, X_val, y_train, y_val = train_test_split(inputs_tensor, labels_tensor, test_size=0.2, random_state=42)

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

True


In [9]:
class TransformerModel(nn.Module):
    def __init__(self, embedding_size, num_classes, num_heads, num_layers, hidden_dim, dropout=0.1):
        super(TransformerModel, self).__init__()

        # 将 NumPy 数组转换为 PyTorch tensor
        word2vec_tensor = torch.tensor(word2vec_model.wv.vectors, dtype=torch.float32).to(device)

        # 嵌入层
        self.embedding = nn.Embedding.from_pretrained(word2vec_tensor, freeze=False)  # 使用预训练词向量

        # Transformer模型，设置batch_first=True
        self.transformer = nn.Transformer(d_model=embedding_size, nhead=num_heads,
                                          num_encoder_layers=num_layers, dim_feedforward=hidden_dim,
                                          dropout=dropout, batch_first=True)
        # 分类头
        self.fc = nn.Linear(embedding_size, num_classes)

    def forward(self, x):
        # x.shape: (batch_size, seq_len)
        x = self.embedding(x)  # 将输入转为嵌入，输出形状为 (batch_size, seq_len, embedding_size)

        # 检查 Transformer 输入的维度是否正确
        assert x.size(-1) == self.transformer.d_model, f"Input dimension {x.size(-1)} does not match d_model {self.transformer.d_model}"

        # Transformer模型
        # 注意：这里的输入 x 和目标 x 是相同的，符合自编码器结构
        transformer_output = self.transformer(x, x)  # 使用自编码器结构，源和目标相同

        # 取Transformer输出的最后一个时间步的输出
        x = transformer_output[:, -1, :]  # 形状变为 (batch_size, embedding_size)

        # 分类层
        x = self.fc(x)  # 直接通过分类层

        return x

In [15]:
# 训练并验证函数
def train_and_validate(model, train_loader, val_loader, epochs=10, lr=0.001):
    optimizer = optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)  # 学习率衰减

    for epoch in range(epochs):
        model.train()
        total_loss = 0
        for batch in train_loader:
            inputs, labels = batch
            inputs, labels = inputs.to(device), labels.to(device)  # 将输入和标签移到设备上
            optimizer.zero_grad()
            outputs = model(inputs)  # 前向传播
            loss = loss_fn(outputs, labels)  # 计算损失
            loss.backward()  # 反向传播
            optimizer.step()  # 更新参数
            total_loss += loss.item()

        # 验证阶段
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in val_loader:
                inputs, labels = batch
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

        accuracy = 100 * correct / total
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {total_loss/len(train_loader):.4f}, Val Accuracy: {accuracy:.2f}%")

        scheduler.step()  # 更新学习率

In [16]:
# 选择模型并开始训练
transformer_model = TransformerModel(
    embedding_size=embedding_size,
    num_classes=2,
    num_heads=10,
    num_layers=2,
    hidden_dim=256
).to(device)

train_and_validate(transformer_model, train_loader, val_loader, epochs=10, lr=0.001)

# 保存模型
torch.save(transformer_model.state_dict(), 'model/transformer_model.pth')

Epoch [1/10], Loss: 0.3572, Val Accuracy: 86.42%
Epoch [2/10], Loss: 0.2975, Val Accuracy: 86.95%
Epoch [3/10], Loss: 0.2585, Val Accuracy: 87.38%
Epoch [4/10], Loss: 0.2026, Val Accuracy: 87.40%
Epoch [5/10], Loss: 0.1883, Val Accuracy: 87.31%
Epoch [6/10], Loss: 0.1800, Val Accuracy: 87.06%
Epoch [7/10], Loss: 0.1713, Val Accuracy: 87.12%
Epoch [8/10], Loss: 0.1690, Val Accuracy: 87.14%
Epoch [9/10], Loss: 0.1687, Val Accuracy: 87.02%
Epoch [10/10], Loss: 0.1678, Val Accuracy: 87.04%


RuntimeError: Parent directory model does not exist.

In [23]:
# 加载训练好的Transformer模型
transformer_model.load_state_dict(torch.load('../model/transformer_model.pth', weights_only=True))
transformer_model.eval()  # 切换为评估模式

correct = 0
total = 0
with torch.no_grad():  # 不需要计算梯度
    for batch in val_loader:
        inputs, labels = batch
        inputs, labels = inputs.to(device), labels.to(device)  # 确保输入和标签在正确的设备上
        outputs = transformer_model(inputs)  # 获取模型输出

        # 获取预测类别
        _, predicted = torch.max(outputs, 1)
        print(predicted.shape)
        # 统计正确预测的数量
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

# 计算准确率
accuracy = 100 * correct / total
print(f"Accuracy: {accuracy:.2f}%")


torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64])
torch.Size([64