<a href="https://colab.research.google.com/github/Lee-gp/NLP/blob/master/cnews_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
import torch
from torch import nn,optim
from tensorflow import keras as kr
import torch.utils.data as Data
from torch.utils.data import DataLoader
import torch.nn.functional as F

In [2]:
# 数据文件设置
from google.colab import drive
import os
drive.mount("/content/drive")
os.chdir("/content/drive/My Drive/Colab Notebooks/BI_core/BI_core_L9")

train_file = './cnews.train.txt'
test_file = './cnews.test.txt'
val_file = './cnews.val.txt'
vocab_file = './cnews.vocab.txt'
sample_file = "./cnews.train.sample.txt"

sample_size = 10
batch_size = 200
epoch = 100

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# 得到采样训练数据
def get_train_sample():
  sample = {}
  #采样数据
  with open(train_file,"r",encoding="UTF-8") as file:
    for line in file.readlines():
      #print(line)
      label,text = line.split("\t")
      if label in sample:
        if len(sample[label]) < sample_size:
          sample[label].append(text)
      else:
        sample[label] = [text]
  #print(sample)
  #保存采样数据
  with open(sample_file,"w",encoding="UTF-8") as file:
    for label,texts in sample.items():
      for text in texts:
        file.write(("{}\t{}".format(label,text)))

In [4]:
# 读取词汇表文件，转化为列表，字典。
def read_words():
  words = []
  with open(vocab_file,'r',encoding='UTF_8',errors='ignore') as file:
    for word in file.readlines():
      word = word.strip()
      if word not in words:
        words.append(word)
  return words,dict(zip(words,range(len(words))))    

In [5]:
# 将类别转化为列表和字典，共10个类别
def read_cates():
  cates = []
  with open(sample_file,"r",encoding="UTF-8") as file:
    for line in file.readlines():
      cate,_ = line.split("\t")
      if cate not in cates:
        cates.append(cate)
  return cates,dict(zip(cates,range(len(cates)))),dict(zip(range(len(cates)),cates))  

In [6]:
# 将文本处理为待训练数据
def process_file(filename,word_to_id,cat_to_id,max_length=200):
  contents,labels = [],[]
  with open(filename,'r',encoding='UTF-8',errors = 'ignore') as file:
    for line in file:
      try:
        label,content = line.strip().split('\t')
        if content:
          contents.append(content)
          labels.append(label)
      except:
        pass       
  data_id,label_id = [],[]
  for i in range(len(contents)):
    #将每句话id化
    data_id.append([word_to_id[x] for x in contents[i] if x in word_to_id])
    label_id.append(cat_to_id[labels[i]])
  #print(data_id)
  #print(label_id)        
  #使用keras提供的pad_sequences将文本pad为固定长度
  x_pad = kr.preprocessing.sequence.pad_sequences(data_id,max_length)
  y_pad = kr.utils.to_categorical(label_id,num_classes = len(cat_to_id))#将标签转换为one_hot表示
  return x_pad,y_pad

In [7]:
# 构建RNN模型
class TextRNN(nn.Module):
  def __init__(self):
    super(TextRNN,self).__init__()
    self.embedding = nn.Embedding(len(words),64) #输入维度4998，word的数量，输出64维
    #双向GRU网络
    self.rnn = nn.GRU(input_size=64,hidden_size=64,num_layers=1,bidirectional=True)
    #因为GRU模型使用了双向向量，输出维度加倍
    self.f1 = nn.Sequential(nn.Linear(128,64),nn.Dropout(0.8),nn.ReLU())
    self.f2 = nn.Sequential(nn.Linear(64,10),nn.Softmax())
        
  def forward(self,x):
    x = self.embedding(x)    
    x,_ = self.rnn(x)
    x = F.dropout(x,p = 0.8)
    #取最后一个时间步的数据
    x = self.f1(x[:,-1,:])
    return self.f2(x)        

In [8]:
# 训练函数
def train(x_train,y_train,x_val,y_val):
  torch_dataset = Data.TensorDataset(x_train,y_train)
  train_loader = DataLoader(dataset=torch_dataset,batch_size=batch_size,shuffle=True,num_workers=3)
  # print(train_loader)
  cuda = torch.device("cuda")
  rnn = TextRNN()
  rnn = rnn.cuda()
  optimizer = torch.optim.Adam(rnn.parameters(),lr=0.001)
  loss_func = nn.MultiLabelSoftMarginLoss()

  for i in range(epoch):
    for step,(b_x,b_y) in enumerate(train_loader):
      #print(step)
      b_x = b_x.cuda()
      b_y = b_y.cuda()
      #print(b_x.detach().cpu().numpy().shape)
      output = rnn(b_x)
      loss = loss_func(output,b_y)

      optimizer.zero_grad()
      loss.backward()
      optimizer.step()

      if step % 500 == 0:
        x_val = x_val.cuda()
        y_val = y_val.cuda()
        out_val = rnn(x_val)
        #print(out_val)
        accuracy = np.mean((torch.argmax(out_val,1) == torch.argmax(y_val,1)).cpu().numpy())
        print("Epoch:{},Step:{},loss:{},accuracy:{}".format(i,step,loss.item(),accuracy))
  return rnn

In [9]:
#模型测试
def test(model,x_test):
  x_test = x_test.cuda()
  out_test = model(x_test)
  #print(out_test)
  class_index = torch.max(out_test,1)[1].data.cpu().numpy()
  category = [id_to_cate[i] for i in class_index]
  print(category[:10])
  return category

In [13]:
print(torch.cuda.is_available())

get_train_sample()
cates,cate_to_id,id_to_cate = read_cates()
words,word_to_id = read_words()
print(len(words))
#准备训练数据
x_train,y_train = process_file(train_file,word_to_id,cate_to_id)
x_val,y_val = process_file(val_file,word_to_id,cate_to_id)
x_test,y_test = process_file(test_file,word_to_id,cate_to_id)
#x_sample,y_sample = process_file(sample_file,word_to_id,cate_to_id)

x_train,y_train = torch.LongTensor(x_train),torch.LongTensor(y_train)
x_val,y_val = torch.LongTensor(x_val),torch.LongTensor(y_val)
x_test,y_test = torch.LongTensor(x_test),torch.LongTensor(y_test)
#x_sample,y_sample = torch.LongTensor(x_sample),torch.LongTensor(y_sample)
#使用小样本预训练
#model = train(x_sample,y_sample,x_val,y_val)
# print(y_val)

True
4998
tensor([[1, 0, 0,  ..., 0, 0, 0],
        [1, 0, 0,  ..., 0, 0, 0],
        [1, 0, 0,  ..., 0, 0, 0],
        ...,
        [0, 0, 0,  ..., 0, 0, 1],
        [0, 0, 0,  ..., 0, 0, 1],
        [0, 0, 0,  ..., 0, 0, 1]])


In [11]:
#使用全量数据训练
model = train(x_train,y_train,x_val,y_val)

  input = module(input)


Epoch:0,Step:0,loss:0.7341630458831787,accuracy:0.1056
Epoch:1,Step:0,loss:0.7196397185325623,accuracy:0.2904
Epoch:2,Step:0,loss:0.7184866666793823,accuracy:0.3162
Epoch:3,Step:0,loss:0.7211172580718994,accuracy:0.3208
Epoch:4,Step:0,loss:0.7169749140739441,accuracy:0.3284
Epoch:5,Step:0,loss:0.7159298658370972,accuracy:0.3348
Epoch:6,Step:0,loss:0.7177308201789856,accuracy:0.3318
Epoch:7,Step:0,loss:0.7192797660827637,accuracy:0.3346
Epoch:8,Step:0,loss:0.7149922847747803,accuracy:0.3352
Epoch:9,Step:0,loss:0.7177204489707947,accuracy:0.3378
Epoch:10,Step:0,loss:0.7143713235855103,accuracy:0.337
Epoch:11,Step:0,loss:0.7150159478187561,accuracy:0.3376
Epoch:12,Step:0,loss:0.7142236232757568,accuracy:0.3452
Epoch:13,Step:0,loss:0.7191746234893799,accuracy:0.3448
Epoch:14,Step:0,loss:0.7102710604667664,accuracy:0.3422
Epoch:15,Step:0,loss:0.7114759087562561,accuracy:0.3392
Epoch:16,Step:0,loss:0.7175334095954895,accuracy:0.3396
Epoch:17,Step:0,loss:0.7117500305175781,accuracy:0.346
Epoc

In [12]:
#获得预测结果
category = test(model,x_test)

['体育', '体育', '体育', '家居', '时政', '体育', '体育', '体育', '体育', '体育']


  input = module(input)
