<a href="https://colab.research.google.com/github/chongzicbo/nlp-ml-dl-notes/blob/master/code/textclassification/text_cnn.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# 基于TextCnn的中文文本分类


In [33]:
import torch
from torch import nn
from torch.nn import functional as F
import math
from torch.utils.data import Dataset,DataLoader
import numpy as np
import random
from sklearn.model_selection import train_test_split
import pandas as pd
import re
from tensorflow.keras.preprocessing import sequence

# 1.数据预处理及准备训练和测试数据

In [34]:
maxlen=300
batch_size=128

## 1.1 文本预处理

In [35]:
def textToChars(filePath):
    """
    读取文本文件并进行处理
    :param filePath:文件路径
    :return:
    """
    lines = []
    df = pd.read_excel(filePath)
    for index, row in df.iterrows():
        row = re.sub("[^\u4e00-\u9fa5]", "", str(row))  # 只保留中文
        lines.append(list(row))
    return lines


def getWordIndex(vocabPath):
    """
    获取word2Index,index2Word
    :param vocabPath:词汇文件
    :return:
    """
    word2Index = {}
    with open(vocabPath, encoding="utf-8") as f:
        for line in f.readlines():
            word2Index[line.strip()] = len(word2Index)
    index2Word = dict(zip(word2Index.values(), word2Index.keys()))
    return word2Index, index2Word


def lodaData(posFile, negFile, word2Index):
    """
    获取训练数据
    :param posFile:正样本文件
    :param negFile:负样本文件
    :param word2Index:
    :return:
    """
    posLines = textToChars(posFile)
    negLines = textToChars(negFile)
    posIndexLines = [[word2Index[word] if word2Index.get(word) else 0 for word in line] for line in posLines]
    negIndexLines = [[word2Index[word] if word2Index.get(word) else 0 for word in line] for line in negLines]
    lines = posIndexLines + negIndexLines
    print("训练样本和测试样本共：%d 个"%(len(lines)))
    # lens = [len(line) for line in lines]
    labels = [1] * len(posIndexLines) + [0] * len(negIndexLines)
    padSequences = sequence.pad_sequences(lines, maxlen=maxlen, padding="post", truncating="post")
    X_train,X_test,y_train,y_test=train_test_split(padSequences,np.array(labels),test_size=0.2,random_state=42)
    return X_train,X_test,y_train,y_test

In [36]:
vocabPath="/content/drive/My Drive/data/vocab.txt"
negFilePath="/content/drive/My Drive/data/text_classify/sentiment/neg.xls"
posFilePath="/content/drive/My Drive/data/text_classify/sentiment/pos.xls"
word2Index, index2Word=getWordIndex(vocabPath)
X_train,X_test,y_train,y_test=lodaData(posFile=posFilePath,negFile=negFilePath,word2Index=word2Index)
print(X_train.shape,X_test.shape,y_train.shape,y_test.shape)

训练样本和测试样本共：21103 个
(16882, 300) (4221, 300) (16882,) (4221,)


## 1.2  自定义Dataset

In [37]:
class MyDataset(Dataset):

  def __init__(self,features,labels):
    """
    features:文本向量化后的特征
    labels:标签向量 
    """
    self.features=features
    self.labels=labels

  def __len__(self):
    return self.features.shape[0]

  def __getitem__(self,index):
    return self.features[index],self.labels[index]

    
train_dataset=MyDataset(X_train,y_train)
test_dataset=MyDataset(X_test,y_test)
train_dataloader=DataLoader(train_dataset,batch_size=batch_size,shuffle=True)
test_dataloader=DataLoader(test_dataset,batch_size=batch_size,shuffle=False)

In [38]:
for features,labels in train_dataloader:
  print(features.shape,labels.shape)
  break

torch.Size([128, 300]) torch.Size([128])


# 2.网络搭建

In [39]:
class TextCnn(nn.Module):
    def __init__(self, param: dict):
        super(TextCnn, self).__init__()
        input_channel = 1  # input channel size
        output_channel = param["output_channel"]  # output channel size
        kernel_size = param["kernel_size"]
        vocab_size = param["vocab_size"]
        embedding_dim = param["embedding_dim"]
        dropout = param["dropout"]
        class_num = param["class_num"]
        self.param = param
        self.embedding = nn.Embedding(vocab_size, embedding_dim,padding_idx=0)
        self.conv1 = nn.Conv2d(input_channel, output_channel, (kernel_size[0], embedding_dim))
        self.conv2 = nn.Conv2d(input_channel, output_channel, (kernel_size[1], embedding_dim))
        self.conv3 = nn.Conv2d(input_channel, output_channel, (kernel_size[2], embedding_dim))
        self.dropout = nn.Dropout(dropout)
        self.fc1 = nn.Linear(len(kernel_size) * output_channel, class_num)

    def init_embedding(self, embedding_matrix):
        self.embedding.weight = nn.Parameter(torch.Tensor(embedding_matrix))

    @staticmethod
    def conv_pool(x, conv):
        """
        卷积+池化
        :param x:[batch_size,1,sequence_length,embedding_dim]
        :param conv:
        :return:
        """
        x = conv(x)  # 卷积， [batch_size,output_channel,h_out,1]
        x = F.relu((x.squeeze(3)))  # 去掉最后一维,[batch_size,output_channel,h_out]
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # [batch_size,output_channel]
        return x

    def forward(self, x):
        """
        前向传播
        :param x:[batch_size,sequence_length]
        :return:
        """
        x = self.embedding(x)  # [batch_size,sequence_length,embedding_dim]
        x = x.unsqueeze(1)  # 增加一个channel维度 [batch_size,1,sequence_length,embedding_dim]
        x1 = self.conv_pool(x, self.conv1)  # [batch_size,output_channel]
        x2 = self.conv_pool(x, self.conv2)  # [batch_size,output_channel]
        x3 = self.conv_pool(x, self.conv3)  # [batch_size,output_channel]
        x = torch.cat((x1, x2, x3), 1)  # [batch_size,output_channel*3]
        x = self.dropout(x)
        logit = F.log_softmax(self.fc1(x), dim=1)
        return logit

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
                if m.bias is not None:
                    m.bias.data.zero_()

            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                m.weight.data.normal_(0, 0.01)
                m.bias.data.zero_()


# 3.模型训练

In [40]:
textCNNParams={
    "vocab_size":len(word2Index),
    "embedding_dim":100,
    "class_num":2,
    "output_channel":4,
    "kernel_size":[3,4,5],
    "dropout":0.2
}

In [41]:
net=TextCnn(textCNNParams)
# net.init_weights()

In [42]:
net.cuda()

TextCnn(
  (embedding): Embedding(21128, 100, padding_idx=0)
  (conv1): Conv2d(1, 4, kernel_size=(3, 100), stride=(1, 1))
  (conv2): Conv2d(1, 4, kernel_size=(4, 100), stride=(1, 1))
  (conv3): Conv2d(1, 4, kernel_size=(5, 100), stride=(1, 1))
  (dropout): Dropout(p=0.2, inplace=False)
  (fc1): Linear(in_features=12, out_features=2, bias=True)
)

In [43]:
optimizer=torch.optim.SGD(net.parameters(),lr=0.01)
criterion=nn.NLLLoss()

In [44]:
for epoch in range(10):
  total_train_loss=[]
  net.train()
  for i,(feature,label) in enumerate(train_dataloader):
    feature=feature.cuda()
    label=label.cuda()
    y_pred=net(feature.long()) #前向计算
    loss=criterion(y_pred,label) #计算损失
    optimizer.zero_grad() #清除梯度
    loss.backward() #计算梯度，误差回传
    optimizer.step() #更新参数
    total_train_loss.append(loss.data.item())
  total_valid_loss=[]
  pred_true_labels=0
  net.eval()
  for i,(feature_test,label_test) in enumerate(test_dataloader):
    feature_test=feature_test.cuda()
    label_test=label_test.cuda()
    with torch.no_grad():
      pred_test=net(feature_test.long())
      test_loss=criterion(pred_test,label_test)
      total_valid_loss.append(test_loss.data.item())
      # accu=torch.sum((torch.argmax(pred_test,dim=1)==label_test)).data.item()/feature_test.shape[0]
      pred_true_labels+=torch.sum(torch.argmax(pred_test,dim=1)==label_test).data.item()
      
  print("epoch:{},train_loss:{},test_loss:{},accuracy:{}".format(epoch,np.mean(total_train_loss),np.mean(total_valid_loss),pred_true_labels/len(test_dataset)))

epoch:0,train_loss:0.11022299110437885,test_loss:0.005435514842357599,accuracy:1.0
epoch:1,train_loss:0.025626315403673234,test_loss:0.0016127865140636761,accuracy:1.0
epoch:2,train_loss:0.01653201541877493,test_loss:0.0007606070474580382,accuracy:1.0
epoch:3,train_loss:0.014247172505058574,test_loss:0.00043912876263317287,accuracy:1.0
epoch:4,train_loss:0.01097160065370245,test_loss:0.0002832797701668107,accuracy:1.0
epoch:5,train_loss:0.010533176803481623,test_loss:0.00022185035579075867,accuracy:1.0
epoch:6,train_loss:0.007795979900313823,test_loss:0.0001584696825981761,accuracy:1.0
epoch:7,train_loss:0.006695934636588914,test_loss:0.00011782544364299004,accuracy:1.0
epoch:8,train_loss:0.0052681467575232755,test_loss:9.00564286205212e-05,accuracy:1.0
epoch:9,train_loss:0.00692182230660833,test_loss:8.227612234997761e-05,accuracy:1.0


# 4.模型测试

In [57]:
def predict_one(sentence,net,word2Index):
  sentence=re.sub("[^\u4e00-\u9fa5]", "", str(sentence))  # 只保留中文
  print(sentence)
  sentence=[word2Index[word] if word2Index.get(word) else 0 for word in sentence]
  sentence=sentence+[0]*(maxlen-len(sentence)) if len(sentence)<maxlen else sentence[0:300]
  print(sentence)
  sentence=torch.tensor(np.array(sentence)).view(-1,len(sentence)).cuda()
  label=torch.argmax(net(sentence),dim=1).data.item()
  print(label)


In [61]:
sentence="一次很不爽的购物，页面上说是第二天能到货，结果货是从陕西发出的，卖家完全知道第二天根本到不了货。多处提到送货入户还有100%送货入户也没有兑现，与客服联系多日，还是把皮球踢到快递公司。算是一个教训吧。"
predict_one(sentence,net,word2Index)

一次很不爽的购物页面上说是第二天能到货结果货是从陕西发出的卖家完全知道第二天根本到不了货多处提到送货入户还有送货入户也没有兑现与客服联系多日还是把皮球踢到快递公司算是一个教训吧
[671, 3613, 2523, 679, 4272, 4638, 6579, 4289, 7552, 7481, 677, 6432, 3221, 5018, 753, 1921, 5543, 1168, 6573, 5310, 3362, 6573, 3221, 794, 7362, 6205, 1355, 1139, 4638, 1297, 2157, 2130, 1059, 4761, 6887, 5018, 753, 1921, 3418, 3315, 1168, 679, 749, 6573, 1914, 1905, 2990, 1168, 6843, 6573, 1057, 2787, 6820, 3300, 6843, 6573, 1057, 2787, 738, 3766, 3300, 1050, 4385, 680, 2145, 3302, 5468, 5143, 1914, 3189, 6820, 3221, 2828, 4649, 4413, 6677, 1168, 2571, 6853, 1062, 1385, 5050, 3221, 671, 702, 3136, 6378, 1416, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0