In [None]:
import numpy as np # linear algebra
import pandas as pd 

# Both splits are header-less, tab-separated files that share one column layout.
_read_opts = dict(sep='\t', names=['ClassNo', 'Sentence', 'ClassName'])
train_df = pd.read_csv("/kaggle/input/alldatas/Traindata.txt", **_read_opts)
test_df = pd.read_csv("/kaggle/input/alldatas/Testdata.txt", **_read_opts)


In [None]:
# Distinct values found in the training split's ClassName column; the bare
# expression on the last line displays them as the cell output.
labels = train_df["ClassName"].unique()
labels

In [None]:
import os
import torch
import re
import copy
import torch.nn as nn
from torch.nn import Embedding,Conv1d
from pathlib import Path

import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader, random_split
from torch import nn,optim
from tqdm.notebook import tqdm

from torch.optim import Adam

In [None]:
embedding_dim = 300  # embedding width; must match the GloVe file loaded below
# word_dict maps each token to an integer id.
# '<pading>' (id 0) is the padding slot and '<unk>' (id 1) stands for unknown
# words, i.e. every token that has no vector in the GloVe table.
word_dict = {'<pading>':0,"<unk>": 1}


# Load the pretrained GloVe vectors of the chosen width. Wider vectors load
# more slowly; the 300-d file is about 1 GB.
glove_df = pd.read_csv(
    "/kaggle/input/alldatas/glove.6B.300d.txt",
    sep=" ",
    quoting=3,  # csv.QUOTE_NONE: quote characters are ordinary token text
    header=None,
    index_col=0,
    # Without this, tokens such as "nan" or "null" are parsed as missing values
    # and end up under a NaN key instead of their own string.
    keep_default_na=False,
)
# Dictionary form: key is the word, value is its vector (one row of the table).
# Zipping the index with the row array avoids transposing the whole frame.
glove_dict = dict(zip(glove_df.index, glove_df.to_numpy()))

In [None]:
def word_tokenize(text: str):
    """
    Split ``text`` into lower-cased tokens and return their ids.

    A token is either a run of word characters or one of ``. , ! ? ; |``.
    As a side effect the module-level ``word_dict`` is extended: a token seen
    for the first time gets a fresh id when it has a GloVe vector, otherwise
    it is mapped to the id of ``"<unk>"``.
    """
    ids = []
    for tok in re.findall(r"[\w]+|[.,!?;|]", text.lower()):
        if tok not in word_dict:
            if tok in glove_dict:
                word_dict[tok] = len(word_dict)
            else:
                word_dict[tok] = word_dict["<unk>"]
        ids.append(word_dict[tok])
    return ids
# Tokenize the training split first, then the test split (ids are handed out
# in order of first appearance, so the order matters).
train_text = train_df["Sentence"].map(str).map(word_tokenize)
test_text = test_df["Sentence"].map(str).map(word_tokenize)
MAX_LENGTH = 1000  # maximum sentence length, in tokens


def _fit_to_max_length(ids):
    """Right-pad with id 0 or truncate so the result holds exactly MAX_LENGTH ids."""
    clipped = ids[:MAX_LENGTH]
    return clipped + [0] * (MAX_LENGTH - len(clipped))


train_text = train_text.apply(_fit_to_max_length)
test_text = test_text.apply(_fit_to_max_length)
# Class name -> integer target.
s2i = {'News':0,'Opinion':1,'Companies and Markets':2}
# s2i = {'Accounting':1,'Chanticleer':2,'Companies and Markets':3,'Computers':4,'Domain Prestige':5,'Education':6,'Features':7,'Financial Services':8,'Life & Leisure':9,'Market Wrap':10,
#       'Marketing & Media':11,'News':12,'Opinion':13,'Perspective':14,'Property':15,'Review':16,'Saleroom':17,'Smart Investor':18,
#       'Stock Tables':19,'Supplement':20,'Weekend Fin':21,'World':0}
for frame in (train_df, test_df):
    frame['ClassName'] = frame['ClassName'].replace(s2i).astype(int)

In [None]:
# Pretrained embedding matrix with one row per id in word_dict; it is fed
# straight into the embedding layer later on.
glove_embeddings = np.zeros((len(word_dict), embedding_dim))
pad_id = word_dict['<pading>']
unk_id = word_dict['<unk>']

# Vector shared by every out-of-vocabulary token. Use the table's own "<unk>"
# entry when it has one; otherwise fall back to the mean of all GloVe vectors
# rather than failing with a KeyError.
unk_vector = glove_dict.get("<unk>")
if unk_vector is None:
    unk_vector = np.zeros(embedding_dim)
    for vec in glove_dict.values():
        unk_vector += vec
    unk_vector /= max(len(glove_dict), 1)
glove_embeddings[unk_id] = unk_vector

for token, idx in word_dict.items():
    # The padding row must stay all-zero (previously it was overwritten with
    # the unknown vector), and the unknown row has already been filled above.
    if idx == pad_id or idx == unk_id:
        continue
    glove_embeddings[idx] = glove_dict.get(token, unk_vector)

In [None]:
class CommentDataset(Dataset):
    """Index-addressable pairs of (token-id sequence, integer label)."""

    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, item):
        """
        Return sample number ``item`` as a dict with two long tensors:
        ``'text_id'`` (the token ids) and ``'label'`` (the target).
        """
        token_ids, target = self.texts[item], self.labels[item]
        return {
            'text_id': torch.tensor(token_ids, dtype=torch.long),
            'label': torch.tensor(target, dtype=torch.long),
        }
def create_data_loader(X, y, batch_size, shuffle=False):
    """
    Wrap token-id sequences and labels in a DataLoader.

    Args:
        X: pandas Series whose values are the token-id lists.
        y: pandas Series of integer labels, aligned with ``X``.
        batch_size: number of samples per batch.
        shuffle: reshuffle the samples every epoch. Defaults to False, which
            keeps the original file order.

    Returns:
        A DataLoader yielding dicts with ``'text_id'`` and ``'label'`` tensors.
    """
    ds = CommentDataset(
        texts=X.values,
        labels=y.values
    )
    return DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle
    )


BATCH_SIZE = 128
# Shuffle the training data so batches do not follow the order of the input
# file; the evaluation loader keeps file order.
train_data_loader = create_data_loader(train_text, train_df['ClassName'], BATCH_SIZE, shuffle=True)
val_data_loader = create_data_loader(test_text, test_df['ClassName'], BATCH_SIZE)

In [None]:
import torch
import torch.nn as nn

class RNN(nn.Module):
    """
    Text classifier: pretrained embeddings -> 2-layer RNN -> linear -> dropout -> linear.

    Args:
        weight: 2-D float tensor of pretrained embeddings, one row per token id.
            The embeddings are fine-tuned during training (``freeze=False``).
        embedding_dim: kept for backward compatibility and not used; the RNN
            input width is taken from ``weight`` so the two can never disagree.
        hidden_size: width of the RNN hidden state and of the hidden linear layer.
        num_classes: number of output logits (defaults to 3).
    """

    def __init__(self, weight, embedding_dim=300, hidden_size=128, num_classes=3):
        super(RNN, self).__init__()
        self.embedding = nn.Embedding.from_pretrained(weight, freeze=False)
        # Size the recurrent layer from the actual embedding width instead of
        # a hard-coded 300, so weights of any dimension work.
        self.rnn = nn.RNN(
            self.embedding.embedding_dim,
            hidden_size,
            batch_first=True,
            num_layers=2,
            dropout=0.4,
        )
        self.linear = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.5)
        self.out = nn.Linear(hidden_size, num_classes)

    def forward(self, X):
        """
        Args:
            X: long tensor of token ids, batch first.

        Returns:
            Unnormalised class scores, one row per input sequence.
        """
        X = self.embedding(X)
        _, hidden = self.rnn(X)
        hidden = hidden[-1]  # hidden state of the last (top) RNN layer
        hidden = self.dropout(self.linear(hidden))
        out = self.out(hidden)
        return out


In [None]:
from collections import defaultdict

# Metric history keyed by name; a key that has not been used yet starts out as
# an empty list, so values can be appended without initialising the key first.
history = defaultdict(list)


In [None]:
import time
from sklearn.metrics import f1_score
test_f1_scores = []
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # use CUDA when available
N_EPOCHS = 100  # number of training epochs
learning_rate = 3e-4  # initial learning rate
model_name = 'TextRNN'  # prefix of the keys written to `history`
# The classifier class defined in this notebook is called `RNN`; calling
# `TextRNN(...)` here raised NameError on a fresh kernel.
model = RNN(torch.from_numpy(glove_embeddings).float())
model = model.to(device)
# Optimizer
optimizer = Adam(model.parameters(), learning_rate)

# Loss
criterion = nn.CrossEntropyLoss().to(device)
best_acc = 0

# Time the whole training run.
train_start_time = time.time()
for epoch in tqdm(range(N_EPOCHS)):
    # Put the model in training mode (enables dropout).
    model.train()
    # Running totals for this epoch.
    epoch_loss = 0
    epoch_acc = 0   # number of correctly classified training samples
    val_number = 0  # number of training samples seen
    for batch in train_data_loader:
        text_id = batch['text_id'].to(device)
        label = batch['label'].to(device)
        predictions = model(text_id)
        loss = criterion(predictions, label)
        epoch_loss += loss.item()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        preds = predictions.max(1)[1]  # index of the highest score per sample
        epoch_acc += ((preds == label).sum().item())
        val_number += label.size(0)
    history[f'{model_name}_train_loss'].append(epoch_loss / len(train_data_loader))
    history[f'{model_name}_train_accuracy'].append(epoch_acc / val_number)
    print(f'第{epoch + 1}轮，训练Loss：', epoch_loss / len(train_data_loader), '，训练准确率：', epoch_acc / val_number)
train_end_time = time.time()
train_total_time = train_end_time - train_start_time
print("训练总运行时间: ", train_total_time, "秒")

In [None]:
# Evaluation pass over the test loader: switch to eval mode (disables dropout)
# and collect loss, accuracy, predictions and true labels.
model.eval()
epoch_loss, epoch_acc, val_number = 0, 0, 0
predictions_test, labels_test = [], []
# Gradients are not needed for evaluation.
with torch.no_grad():
    for batch in val_data_loader:
        text_id = batch['text_id'].to(device)
        label = batch['label'].to(device)
        predictions = model(text_id)
        epoch_loss += criterion(predictions, label).item()
        _, preds = predictions.max(1)
        epoch_acc += (preds == label).sum().item()
        val_number += label.size(0)
        predictions_test.extend(preds.tolist())
        labels_test.extend(label.tolist())

test_loss = epoch_loss / len(val_data_loader)
test_acc = epoch_acc / val_number
history[f'{model_name}_test_loss'].append(test_loss)
history[f'{model_name}_test_accuracy'].append(test_acc)
best_acc = max(best_acc, test_acc)
print('测试Loss：', test_loss, '，测试准确率：', test_acc)
print('-' * 20)

In [None]:
# Show the list stored in `history` under the training-accuracy key.
history['TextRNN_train_accuracy']

In [None]:
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker

plt.style.use('ggplot')

train_loss = history['TextRNN_train_loss']
train_acc = history['TextRNN_train_accuracy']

fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(16, 5), dpi=150)

# Left panel: training loss. Epochs are numbered from 1 on the x axis so the
# curve lines up with the epoch numbers printed during training.
ax_loss.plot(range(1, len(train_loss) + 1), train_loss, label='TextRNN')
ax_loss.set_title('Train Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.xaxis.set_major_locator(ticker.MultipleLocator(4))  # one major tick every 4 epochs
# Legend pinned to the upper-right corner of the axes.
ax_loss.legend(frameon=True, loc='upper right', facecolor='white', bbox_to_anchor=(1.0, 1.0))

# Right panel: training accuracy.
ax_acc.plot(range(1, len(train_acc) + 1), train_acc, label='TextRNN')
ax_acc.set_title('Train Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.xaxis.set_major_locator(ticker.MultipleLocator(4))  # one major tick every 4 epochs
ax_acc.legend(frameon=True, loc='lower right', facecolor='white')

In [None]:
from sklearn.metrics import precision_recall_fscore_support

# y_true / y_pred are the true and predicted labels collected during evaluation.
y_true = labels_test
y_pred = predictions_test

# Report precision, recall and F1 under each averaging scheme in turn.
for average in ('macro', 'weighted', 'micro'):
    precision, recall, f1, _ = precision_recall_fscore_support(y_true, y_pred, average=average)
    prefix = average.capitalize()
    print(f"{prefix} Precision:", precision)
    print(f"{prefix} Recall:", recall)
    print(f"{prefix} F1 Score:", f1)

In [None]:
# Write the model's state_dict (its parameters and buffers, not the module
# object itself) to save_path, which is relative to the working directory.
save_path = 'kaggleRNN100learnrate3e-4'
torch.save(model.state_dict(), save_path)