In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
from collections import Counter
import matplotlib.pyplot as plt

In [2]:
# Prefer the GPU when one is available; otherwise fall back to the CPU so the
# notebook still runs end-to-end on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# 1. 数据预处理
class Toutiao_News(Dataset):
    """Character-level dataset built from a ``_!_``-delimited news-title file.

    Every line of the file is stripped and split on ``_!_``. Lines with fewer
    than five fields are skipped. For the remaining lines, field 1 is parsed
    as an integer label, field 2 is kept as the category name and field 3 is
    kept as the title text.

    Args:
        file_path: Path of the UTF-8 text file to read.
        max_len: Fixed length (in characters) every title is truncated or
            right-padded to.
        max_vocab: Maximum number of distinct characters kept in the
            vocabulary (most frequent first), not counting the two special
            tokens.

    Each item is a ``(title_tensor, label_tensor)`` pair of ``torch.long``
    tensors: the title as ``max_len`` vocabulary ids and the label as a
    contiguous class index.
    """

    PAD_IDX = 0  # id of '<PAD>', also used to fill short titles
    UNK_IDX = 1  # id of '<UNK>', used for characters outside the vocabulary

    def __init__(self, file_path, max_len=30, max_vocab=5000):
        self.titles = []
        self.labels = []
        self.catgory = []  # (sic) attribute name kept for backward compatibility
        self.max_len = max_len

        # Iterate over the file lazily instead of materialising every line
        # with readlines().
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                parts = line.strip().split('_!_')
                if len(parts) < 5:
                    continue  # skip blank or malformed lines
                self.labels.append(int(parts[1]))  # label field
                self.catgory.append(parts[2])      # category-name field
                self.titles.append(parts[3])       # title field

        # Category name -> raw label code.
        self.cat_dict = dict(zip(self.catgory, self.labels))

        self.build_vocab(max_vocab)

        # Sort the distinct labels so that the label -> class-index mapping
        # does not depend on set iteration order.
        self.label_to_idx = {label: idx for idx, label in enumerate(sorted(set(self.labels)))}
        self.idx_to_label = {idx: label for label, idx in self.label_to_idx.items()}

    def build_vocab(self, max_vocab=5000):
        """Build ``self.vocab`` (character -> id) from the loaded titles.

        Ids 0 and 1 are reserved for ``'<PAD>'`` and ``'<UNK>'``; the
        ``max_vocab`` most frequent characters receive the following ids in
        descending frequency order. Also sets ``self.vocab_size``.
        """
        char_counts = Counter()
        for title in self.titles:
            # Counting title by title avoids building one list of all characters.
            char_counts.update(title)

        self.vocab = {'<PAD>': self.PAD_IDX, '<UNK>': self.UNK_IDX}
        for char, _ in char_counts.most_common(max_vocab):
            self.vocab[char] = len(self.vocab)

        self.vocab_size = len(self.vocab)

    def text_to_tensor(self, text):
        """Encode ``text`` as a ``torch.long`` tensor of exactly ``max_len`` ids.

        Unknown characters map to the ``'<UNK>'`` id; longer texts are
        truncated and shorter ones are right-padded with the ``'<PAD>'`` id.
        """
        indices = [self.vocab.get(char, self.UNK_IDX) for char in text[:self.max_len]]
        indices += [self.PAD_IDX] * (self.max_len - len(indices))
        return torch.tensor(indices, dtype=torch.long)

    def __len__(self):
        """Return the number of loaded titles."""
        return len(self.titles)

    def __getitem__(self, idx):
        """Return ``(title_tensor, label_index_tensor)`` for sample ``idx``."""
        title_tensor = self.text_to_tensor(self.titles[idx])
        label_idx = self.label_to_idx[self.labels[idx]]
        return title_tensor, torch.tensor(label_idx, dtype=torch.long)

In [4]:
# 2. 定义RNN模型
class TextRNN(nn.Module):
    """Embedding -> (multi-layer) LSTM -> dropout -> linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding dimension.
        hidden_dim: LSTM hidden size.
        output_dim: Number of output classes (logits per sample).
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied to the final hidden state and,
            when ``n_layers > 1``, between LSTM layers.

    Index 0 is the padding index of the embedding. ``forward`` treats every
    position holding that index as padding and assumes padding only appears
    at the end of each sequence.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim, n_layers = 1, dropout = 0.5):
        super().__init__()

        # padding_idx=0 keeps the padding embedding at zero and out of the gradient.
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx = 0)
        # nn.LSTM only applies inter-layer dropout when there is more than one
        # layer, so pass 0 for the single-layer case.
        self.rnn = nn.LSTM(embed_dim, hidden_dim, n_layers, batch_first=True, dropout = dropout if n_layers > 1 else 0)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, text):
        """Return class logits of shape ``[batch_size, output_dim]``.

        Args:
            text: ``[batch_size, seq_len]`` integer tensor of token ids,
                right-padded with the embedding's padding index.
        """
        embedded = self.embedding(text)  # [batch_size, seq_len, embed_dim]

        # Number of real (non-padding) tokens per sequence. Clamp to 1 because
        # packing rejects zero-length sequences; lengths must live on the CPU.
        lengths = (text != self.embedding.padding_idx).sum(dim=1).clamp(min=1).cpu()

        # Packing makes the LSTM stop at each sequence's last real token, so
        # the final hidden state is not computed over trailing padding steps.
        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )

        # hidden: [n_layers, batch_size, hidden_dim], in the original batch order.
        _, (hidden, _) = self.rnn(packed)

        last_layer_state = self.dropout(hidden[-1])  # [batch_size, hidden_dim]

        return self.fc(last_layer_state)  # [batch_size, output_dim]

In [5]:
# 3. 训练函数
def train(net, iterator, optimizer, loss, device):
    """Run one training epoch and return ``(mean_loss, mean_accuracy)``.

    Args:
        net: Model called as ``net(text)`` to produce per-class scores.
        iterator: Iterable of ``(text, labels)`` tensor batches.
        optimizer: Optimizer stepped once per batch.
        loss: Criterion called as ``loss(predictions, labels)``. It is assumed
            to return the batch mean, as ``nn.CrossEntropyLoss`` does by default.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        Tuple of Python floats ``(loss, accuracy)`` averaged over all samples
        seen in the epoch. Both are NaN when ``iterator`` yields no samples.
    """
    net.train()
    total_loss = 0.0
    total_correct = 0.0
    n_samples = 0

    for text, labels in iterator:
        # Move the batch to the target device.
        text = text.to(device)
        labels = labels.to(device)
        batch_n = labels.shape[0]

        optimizer.zero_grad()
        predictions = net(text)

        batch_loss = loss(predictions, labels)
        batch_acc = categorical_accuracy(predictions, labels)

        batch_loss.backward()
        optimizer.step()

        # Weight per-batch means by batch size so that a smaller final batch
        # does not count as much as a full one.
        total_loss += batch_loss.item() * batch_n
        total_correct += batch_acc.item() * batch_n
        n_samples += batch_n

    if n_samples == 0:
        # Nothing was processed: the averages are undefined.
        return float('nan'), float('nan')

    return total_loss / n_samples, total_correct / n_samples

In [6]:
# 4. 评估函数
def evaluate(net, iterator, loss, device):
    """Evaluate ``net`` without gradient tracking; return ``(mean_loss, mean_accuracy)``.

    Args:
        net: Model called as ``net(text)`` to produce per-class scores.
        iterator: Iterable of ``(text, labels)`` tensor batches.
        loss: Criterion called as ``loss(predictions, labels)``. It is assumed
            to return the batch mean, as ``nn.CrossEntropyLoss`` does by default.
        device: Device the batch tensors are moved to before the forward pass.

    Returns:
        Tuple of Python floats ``(loss, accuracy)`` averaged over all samples
        seen. Both are NaN when ``iterator`` yields no samples.
    """
    net.eval()
    total_loss = 0.0
    total_correct = 0.0
    n_samples = 0

    with torch.no_grad():
        for text, labels in iterator:
            text = text.to(device)
            labels = labels.to(device)
            batch_n = labels.shape[0]

            predictions = net(text)

            batch_loss = loss(predictions, labels)
            batch_acc = categorical_accuracy(predictions, labels)

            # Weight per-batch means by batch size so that a smaller final
            # batch does not count as much as a full one.
            total_loss += batch_loss.item() * batch_n
            total_correct += batch_acc.item() * batch_n
            n_samples += batch_n

    if n_samples == 0:
        # Nothing was processed: the averages are undefined.
        return float('nan'), float('nan')

    return total_loss / n_samples, total_correct / n_samples


def categorical_accuracy(preds, y):
    """Return the fraction of samples whose highest-scoring class equals ``y``.

    Args:
        preds: Score tensor with the class scores along dimension 1.
        y: Tensor of target class indices, one per sample.

    Returns:
        A tensor holding the number of matches divided by ``y.shape[0]``.
    """
    # Arg-max over the class dimension already yields one index per sample, so
    # it lines up with ``y`` element-wise without any broadcasting.
    predicted_classes = preds.argmax(dim=1)
    hits = predicted_classes.eq(y)
    return hits.sum() / y.shape[0]

In [7]:
def save_model(path='text_rnn.pth'):
    """Save the trained model and its preprocessing metadata to ``path``.

    Reads the notebook-level ``net`` and ``dataset`` objects and writes a
    single checkpoint dictionary with ``torch.save`` containing the model
    weights, the character vocabulary, both label mappings, the category
    dictionary and the sequence length.

    Args:
        path: Destination file of the checkpoint.
    """
    torch.save({
        'model_state_dict': net.state_dict(),
        'vocab': dataset.vocab,
        'label_to_idx': dataset.label_to_idx,
        'idx_to_label': dataset.idx_to_label,
        'cat_dict': dataset.cat_dict,
        # Take the length from the dataset itself: it is the value the tensors
        # were actually built with, even if the notebook-level max_len
        # variable was changed afterwards.
        'max_len': dataset.max_len
    }, path)
    print(f"模型已保存到 {path}")



In [9]:
# 5. Main script: hyperparameters, data split, model and optimizer setup

# Hyperparameters
batch_size = 64
embed_dim = 300
hidden_dim = 128
n_layers = 2
dropout = 0.5
n_epoch = 10
lr = 0.001
max_len = 50
seed = 42

# Seed torch so that weight initialisation, dropout and DataLoader shuffling
# are repeatable across "Restart & Run All".
torch.manual_seed(seed)

# Dataset and train/test split
dataset = Toutiao_News('data/toutiao_cat_data.txt/toutiao_cat_data.txt', max_len)

train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
# A dedicated seeded generator keeps the split identical between runs,
# independent of how much randomness was consumed before this point.
train_data, test_data = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(seed),
)

train_loader = DataLoader(train_data, batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size)

# Model definition
net = TextRNN(
    vocab_size = dataset.vocab_size,
    embed_dim = embed_dim,
    hidden_dim = hidden_dim,
    output_dim = len(dataset.label_to_idx),
    n_layers = n_layers,
    dropout = dropout
).to(device)  # move the model to the selected device

optimizer = optim.Adam(net.parameters(), lr=lr)
loss = nn.CrossEntropyLoss()

# Per-epoch metric histories filled by the training loop
train_losses = []
train_accs = []
test_losses = []
test_accs = []

In [None]:
# Train for n_epoch epochs, evaluating on the held-out split after each one
# and recording the metrics for the curves below.
for epoch in range(n_epoch):
    train_loss, train_acc = train(net, train_loader, optimizer, loss, device)
    test_loss, test_acc = evaluate(net, test_loader, loss, device)

    train_losses.append(train_loss)
    train_accs.append(train_acc)
    test_losses.append(test_loss)
    test_accs.append(test_acc)

    print(f'Epoch: {epoch}')
    print(f'Train Loss: {train_loss:.4f}  Train Acc: {train_acc*100:.4f}%')
    print(f'Test Loss: {test_loss:.4f}  Test Acc: {test_acc*100:.4f}%')

# Plot the training curves: loss on the left, accuracy on the right.
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 5))

ax_loss.plot(train_losses, label='Train Loss')
ax_loss.plot(test_losses, label='Test Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Loss')
ax_loss.legend()

ax_acc.plot(train_accs, label='Train Acc')
ax_acc.plot(test_accs, label='Test Acc')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Accuracy')
ax_acc.legend()

fig.tight_layout()
# Show the figure; closing it here instead would discard it before it is
# rendered in the notebook.
plt.show()

In [None]:
# Write the trained weights together with the vocabulary, label mappings and
# sequence length to disk.
save_model()