# LSTM 实现中文文本情感分析
给出数据集路径，划分训练集测试集，完成训练，输出准确率，并保存模型

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.utils.data.dataloader as dataloader
import torch.optim as optim
import tqdm
import os
import time
import re
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import jieba
import gensim
import time
from itertools import chain
from sklearn.metrics import accuracy_score

In [2]:
neg_path = 'data/neg.txt'
pos_path = 'data/pos.txt'

In [3]:
# 分词器
def tokenizer(text):
    result = jieba.lcut(text)
    return result

# 预处理数据
# 读取数据
neg_data = open(neg_path, 'r', encoding='utf-8').read().split('\n')
pos_data = open(pos_path, 'r', encoding='utf-8').read().split('\n')

# 分词
neg_data = [tokenizer(x) for x in neg_data]
pos_data = [tokenizer(x) for x in pos_data]

# 划分训练集、测试集
data = np.array(neg_data+pos_data)
label = np.concatenate((np.zeros(len(neg_data)), np.ones(len(pos_data))))
train_X, test_X, train_y, test_y = train_test_split(data, label, test_size=0.2, random_state=5)

# 构建词表
vocab = set(chain(*data))
vocab_size = len(vocab)
word_to_idx = {word: i + 1 for i, word in enumerate(vocab)}
word_to_idx['<unk>'] = 0
idx_to_word = {i + 1: word for i, word in enumerate(vocab)}
idx_to_word[0] = '<unk>'

Building prefix dict from the default dictionary ...
Loading model from cache /var/folders/n6/fr8x2twx7v9b5c7qqz1d_d640000gn/T/jieba.cache
Loading model cost 0.711 seconds.
Prefix dict has been built successfully.
  data = np.array(neg_data+pos_data)


In [4]:
def encode_samples(tokenized_samples, vocab):
    features = []
    for sample in tokenized_samples:
        feature = []
        for token in sample:
            if token in word_to_idx:
                feature.append(word_to_idx[token])
            else:
                feature.append(0)
        features.append(feature)
    return features

def pad_samples(features, maxlen=500, PAD=0):
    padded_features = []
    for feature in features:
        if len(feature) >= maxlen:
            padded_feature = feature[:maxlen]
        else:
            padded_feature = feature
            while(len(padded_feature) < maxlen):
                padded_feature.append(PAD)
        padded_features.append(padded_feature)
    return padded_features

In [5]:
train_features = torch.tensor(pad_samples(encode_samples(train_X, vocab)))
train_labels = torch.tensor([score for score in train_y])
test_features = torch.tensor(pad_samples(encode_samples(test_X, vocab)))
test_labels = torch.tensor([score for score in test_y])

In [6]:
print(train_features)

tensor([[52240, 37623,     0,  ...,     0,     0,     0],
        [14502, 54732,     0,  ...,     0,     0,     0],
        [49312, 32686, 70467,  ...,     0,     0,     0],
        ...,
        [41968, 26860,  2939,  ...,     0,     0,     0],
        [14427, 26860, 55932,  ...,     0,     0,     0],
        [61185, 66795, 50773,  ...,     0,     0,     0]])

In [None]:
# tencent 预训练的词向量文件路径
vec_path = "/share_v3/fangcheng/data/Tencent_AILab_ChineseEmbedding.txt"
# 加载词向量文件
wvmodelwvmodel = gensim.models.KeyedVectors.load_word2vec_format(vec_path, binary=False)

In [None]:
class SentimentNet(nn.Module):
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 bidirectional, labels, **kwargs):
        super(SentimentNet, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=self.num_hiddens,
                               num_layers=num_layers, bidirectional=self.bidirectional,
                               dropout=0)
        if self.bidirectional:
            self.decoder = nn.Linear(num_hiddens * 4, labels)
        else:
            self.decoder = nn.Linear(num_hiddens * 2, labels)

    def forward(self, inputs):
        embeddings = self.embedding(inputs)
        states, hidden = self.encoder(embeddings.permute([1, 0, 2]))
        encoding = torch.cat([states[0], states[-1]], dim=1)
        outputs = self.decoder(encoding)
        return outputs

In [None]:
num_epochs = 5
embed_size = 100
num_hiddens = 100
num_layers = 2
bidirectional = True
batch_size = 64
labels = 2
lr = 0.8
device = device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

net = SentimentNet(vocab_size=(vocab_size+1), embed_size=embed_size,
                   num_hiddens=num_hiddens, num_layers=num_layers,
                   bidirectional=bidirectional,
                   labels=labels)
net.to(device)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=lr)

In [None]:
train_set = torch.utils.data.TensorDataset(train_features, train_labels)
test_set = torch.utils.data.TensorDataset(test_features, test_labels)

train_iter = torch.utils.data.DataLoader(train_set, batch_size=batch_size,
                                         shuffle=True)
test_iter = torch.utils.data.DataLoader(test_set, batch_size=batch_size,
                                        shuffle=False)

In [None]:
for epoch in range(num_epochs):
    start = time.time()
    train_loss, test_losses = 0, 0
    train_acc, test_acc = 0, 0
    n, m = 0, 0
    for feature, label in train_iter:
        n += 1
        net.zero_grad()
        # feature = Variable(feature.cuda())
        # label = Variable(label.cuda())
        score = net(feature)
        # print(score.type)
        # score = score.to(device=device, dtype=torch.long)
        loss = loss_function(score, label)
        loss.backward()
        optimizer.step()
        train_acc += accuracy_score(torch.argmax(score.cpu().data,
                                                 dim=1), label.cpu())
        train_loss += loss
    with torch.no_grad():
        for test_feature, test_label in test_iter:
            m += 1
            # test_feature = test_feature.cuda()
            # test_label = test_label.cuda()
            test_score = net(test_feature)
            test_loss = loss_function(test_score, test_label)
            test_acc += accuracy_score(torch.argmax(test_score.cpu().data,
                                                    dim=1), test_label.cpu())
            test_losses += test_loss
    end = time.time()
    runtime = end - start
    print('epoch: %d, train loss: %.4f, train acc: %.2f, test loss: %.4f, test acc: %.2f, time: %.2f' %
          (epoch, train_loss.data / n, train_acc / n, test_losses.data / m, test_acc / m, runtime))