In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
import pandas as pd
import jieba
from tqdm import tqdm
from torch.nn.utils.rnn import pack_padded_sequence
from torch.nn.utils.rnn import pad_sequence
from collections import defaultdict
from torch import optim
from torch.nn import functional as F
import pickle


In [2]:
# 该类用于实现token到索引的映射
class Vocab:

    def __init__(self, tokens=None) -> None:
        # 构造函数
        # tokens：全部的token列表

        self.idx_to_token = list()
        # 将token存成列表，索引直接查找对应的token即可
        self.token_to_idx = dict()
        # 将索引到token的映射关系存成字典，键为索引，值为对应的token

        if tokens is not None:
            # 构造时输入了token的列表
            if "<unk>" not in tokens:
                # 不存在标记
                tokens = tokens + "<unk>"
            for token in tokens:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1
                # 当前该token对应的索引是当下列表的最后一个
            self.unk = self.token_to_idx["<unk>"]

    @classmethod
    def build(cls, data, min_freq=1, reserved_tokens=None, stop_words='cn_sentiment_test/dataset/cn_stopwords.txt'):
        # 构建词表
        # cls：该类本身
        # data: 输入的文本数据
        # min_freq：列入token的最小频率
        # reserved_tokens：额外的标记token
        # stop_words：停用词文件路径
        token_freqs = defaultdict(int)
        stopwords = open(stop_words, encoding="utf-8").read().split('\n')
        for i in tqdm(range(data.shape[0]), desc=f"Building vocab"):
            review_tokens = jieba.lcut(str(data.iloc[i]["review"]))
            for token in review_tokens:
                if token in stopwords:
                    continue
                token_freqs[token] += 1
        # 统计各个token的频率
        uniq_tokens = ["<unk>"] + (reserved_tokens if reserved_tokens else [])
        # 加入额外的token
        uniq_tokens += [token for token, freq in token_freqs.items() \
                        if freq >= min_freq and token != "<unk>"]
        # 全部的token列表
        return cls(uniq_tokens)

    def __len__(self):
        # 返回词表的大小
        return len(self.idx_to_token)

    def __getitem__(self, token):
        # 查找输入token对应的索引，不存在则返回<unk>返回的索引
        return self.token_to_idx.get(token, self.unk)

    def convert_tokens_to_ids(self, tokens):
        # 查找一系列输入标签对应的索引值
        return [self[token] for token in tokens]

    def convert_ids_to_tokens(self, ids):
        # 查找一系列索引值对应的标记
        return [self.idx_to_token[index] for index in ids]



In [3]:
'''数据集构建函数'''


def build_data(data_path: str):
    '''
    Args:
       data_path:待读取本地数据的路径 
    Returns:
       训练集、测试集、词表
    '''
    whole_data = pd.read_csv(data_path,encoding="utf-8")
    # 读取数据为 DataFrame 类型
    vocab = Vocab.build(whole_data)
    # 构建词表

    train_data = [(vocab.convert_tokens_to_ids(sentence), 1) for sentence in
                  whole_data[whole_data["label"] == 1][:30000]["review"]] \
                 + [(vocab.convert_tokens_to_ids(sentence), 0) for sentence in
                    whole_data[whole_data["label"] == 0][:30000]["review"]]
    # 分别取褒贬各30000句作为训练数据，将token映射为对应的索引值

    test_data = [(vocab.convert_tokens_to_ids(sentence), 1) for sentence in
                 whole_data[whole_data["label"] == 1][30000:]["review"]] \
                + [(vocab.convert_tokens_to_ids(sentence), 0) for sentence in
                   whole_data[whole_data["label"] == 0][30000:]["review"]]
    # 其余数据作为测试数据

    return train_data, test_data, vocab



In [4]:

'''声明一个 DataSet 类'''


class MyDataset(Dataset):

    def __init__(self, data) -> None:
        # data：使用词表映射之后的数据
        self.data = data

    def __len__(self):
        # 返回样例的数目
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index]





In [5]:
'''声明一个collate_fn函数，用于对一个批次的样本进行整理'''


def collate_fn(examples):
    # 从独立样本集合中构建各批次的输入输出
    lengths = torch.tensor([len(ex[0]) for ex in examples])
    # 获取每个序列的长度
    inputs = [torch.tensor(ex[0]) for ex in examples]
    # 将输入inputs定义为一个张量的列表，每一个张量为句子对应的索引值序列
    targets = torch.tensor([ex[1] for ex in examples], dtype=torch.long)
    # 目标targets为该批次所有样例输出结果构成的张量
    inputs = pad_sequence(inputs, batch_first=True)
    # 将用pad_sequence对批次类的样本进行补齐
    return inputs, lengths, targets



In [6]:

'''创建一个LSTM类作为模型'''


class LSTM(nn.Module):
    # 基类为nn.Module
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_class):
        # 构造函数
        # vocab_size:词表大小
        # embedding_dim：词向量维度
        # hidden_dim：隐藏层维度
        # num_class:多分类个数
        super(LSTM, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # 词向量层
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        # lstm层
        self.output = nn.Linear(hidden_dim, num_class)
        # 输出层，线性变换

    def forward(self, inputs, lengths):
        # 前向计算函数
        # inputs:输入
        # lengths:打包的序列长度
        # print(f"输入为：{inputs.size()}")
        embeds = self.embedding(inputs)
        # 注意这儿是词向量层，不是词袋词向量层
        # print(f"词向量层输出为：{embeds.size()}")
        x_pack = pack_padded_sequence(embeds, lengths.to('cpu'), batch_first=True, enforce_sorted=False)
        # LSTM需要定长序列，使用该函数将变长序列打包
        # print(f"经过打包为：{x_pack.size()}")
        hidden, (hn, cn) = self.lstm(x_pack)
        # print(f"经过lstm计算后为：{hn.size()}")
        outputs = self.output(hn[-1])
        # print(f"输出层输出为：{outputs.size()}")
        log_probs = F.log_softmax(outputs, dim=-1)
        # print(f"输出概率值为：{probs}")
        # 归一化为概率值
        return log_probs


训练参数的设置
词表的构建
文件的读取

In [7]:

'''训练'''

# 超参数设置

embedding_dim = 128
hidden_dim = 24
batch_size = 1024
num_epoch = 10
num_class = 2

datapath = "cn_sentiment_test/dataset/online_shopping_10_cats.csv"
train_data, test_data, vocab = build_data(datapath)


print("vocab:",vocab)

# 保存vocab到文件
def save_vocab_to_txt(vocab, file_path):
    with open(file_path, 'w', encoding='utf-8') as f:
        for token in vocab.idx_to_token:
            f.write(token + '\n')


save_vocab_to_txt(vocab, 'vocab.txt')

# 加载数据
train_dataset = MyDataset(train_data)
test_dataset = MyDataset(test_data)
train_data_loader = DataLoader(train_dataset, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
test_data_loader = DataLoader(test_dataset, batch_size=1, collate_fn=collate_fn, shuffle=False)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)
print("vocab的大小为:", len(vocab))
model = LSTM(len(vocab), embedding_dim, hidden_dim, num_class)

model.to(device)
# 加载模型

nll_loss = nn.NLLLoss()
# 负对数似然损失

optimizer = optim.Adam(model.parameters(), lr=0.001)
# Adam优化器




Building vocab:   0%|          | 0/62773 [00:00<?, ?it/s]Building prefix dict from the default dictionary ...
Dumping model to file cache C:\Users\10578\AppData\Local\Temp\jieba.cache
Loading model cost 0.585 seconds.
Prefix dict has been built successfully.
Building vocab: 100%|██████████| 62773/62773 [00:35<00:00, 1789.72it/s]


vocab: <__main__.Vocab object at 0x0000023B001F6208>
cuda
vocab的大小为: 67869


模型训练代码，数据已在上一个模块加载完毕，模型训练结束过后直接保存至当前目录下，并命名

In [8]:
 #模型训练
model.train()
for epoch in range(num_epoch):
    total_loss = 0
    for batch in tqdm(train_data_loader, desc=f"Training Epoch {epoch}"):
        inputs, lengths, targets = [x.to(device) for x in batch]
        log_probs = model(inputs, lengths)
        loss = nll_loss(log_probs, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Loss:{total_loss:.2f}")

# 保存模型参数
torch.save(model.state_dict(), 'model_params_onlineShopping.pth')

Training Epoch 0: 100%|██████████| 59/59 [00:07<00:00,  8.05it/s]


Loss:36.58


Training Epoch 1: 100%|██████████| 59/59 [00:06<00:00,  9.06it/s]


Loss:26.30


Training Epoch 2: 100%|██████████| 59/59 [00:06<00:00,  9.01it/s]


Loss:21.91


Training Epoch 3: 100%|██████████| 59/59 [00:06<00:00,  9.09it/s]


Loss:20.64


Training Epoch 4: 100%|██████████| 59/59 [00:06<00:00,  8.99it/s]


Loss:18.66


Training Epoch 5: 100%|██████████| 59/59 [00:06<00:00,  8.74it/s]


Loss:17.68


Training Epoch 6: 100%|██████████| 59/59 [00:06<00:00,  8.98it/s]


Loss:16.83


Training Epoch 7: 100%|██████████| 59/59 [00:06<00:00,  8.85it/s]


Loss:16.25


Training Epoch 8: 100%|██████████| 59/59 [00:06<00:00,  9.21it/s]


Loss:15.66


Training Epoch 9: 100%|██████████| 59/59 [00:06<00:00,  9.00it/s]

Loss:15.25





In [9]:
# 测试
acc = 0
for batch in tqdm(test_data_loader, desc=f"Testing"):
    inputs, lengths, targets = [x.to(device) for x in batch]
    with torch.no_grad():
        output = model(inputs, lengths)
        acc += (output.argmax(dim=1) == targets).sum().item()
print(f"ACC:{acc / len(test_data_loader):.2f}")

Testing: 100%|██████████| 2773/2773 [00:07<00:00, 385.38it/s]

ACC:0.85





若已有模型数据可以跳过上面训练阶段，下方直接载入模型数据

In [10]:
model.load_state_dict(torch.load('model_params_onlineShopping.pth'))

<All keys matched successfully>

输入文本预处理函数

In [11]:
stop_words_path = 'cn_sentiment_test/dataset/cn_stopwords.txt'
with open(stop_words_path, encoding="utf-8") as f:
    stopwords = set(f.read().splitlines())

def preprocess_text(text, stopwords=stopwords):
    # 使用结巴分词对文本进行分词
    tokens = jieba.cut(text)
    # 去除空格并转换为小写
    tokens = [token for token in tokens if token not in stopwords]
    # 去除停用词等其他预处理步骤...
    tokens_data = vocab.convert_tokens_to_ids(tokens)
    # 返回分词后的结果
    return tokens_data


In [13]:

# 预测
def predict_sentiment(text):
    # 加载模型参数
    model.load_state_dict(torch.load('model_params_onlineShopping.pth'))
    model.eval()  # 将模型设置为评估模式
    text_data = preprocess_text(text)
    print("text data:",text_data)
    text_dataset = MyDataset(text_data)
    inputs = torch.LongTensor(text_dataset).unsqueeze(0).to(device)
    lengths = torch.LongTensor([len(text_dataset)]).to(device)
    # lengths = torch.tensor(len(text_dataset)).unsqueeze(0)
    # inputs = [torch.tensor(text_dataset)]
    inputs=pad_sequence(inputs,batch_first=True)
    with torch.no_grad():
        # inputs, lengths, targets= [x.to(device) for x in text_tuple]
        output = model(inputs, lengths)
        predicted_class = output.argmax(dim=1).item() # 获取预测的类别
    print("inputs:", inputs)
    print("lengths", lengths)
    print("predicted:",predicted_class)
    # 返回预测结果
    if predicted_class == 0:
        return "Negative"
    elif predicted_class == 1:
        return "Positive"


输入文本进行测试

In [14]:

while (1):
    text = ""
    while text == "":
        text = input("输入文本:")
    if text == 'quit': break
    prediction = predict_sentiment(text)
    print(f"Text: {text}")
    print(f"Sentiment: {prediction}")

text data: [10565]
inputs: tensor([[10565]], device='cuda:0')
lengths tensor([1], device='cuda:0')
predicted: 1
Text: 帅
Sentiment: Positive
text data: [14212]
inputs: tensor([[14212]], device='cuda:0')
lengths tensor([1], device='cuda:0')
predicted: 0
Text: 丑
Sentiment: Negative
