In [1]:
import numpy as np
import torch
from torch import nn, optim
import torch.nn.functional as F
import sys
import torchtext.vocab as Vocab
import codecs
import collections
import torch.utils.data as Data
# Run on the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

### 1、导入原始数据

In [3]:
def load_sentence(data_path):
    '''
    Read a corpus file with one token per line and blank lines between sentences.

    Every non-blank line is split on whitespace into a list of columns. A blank
    line closes the current sentence. A sentence is dropped when the first
    column of its first line contains 'DOCSTART'.

    @params:
        data_path: path of the UTF-8 encoded corpus file
    @return:
        sentences: list of sentences, each a list of per-token column lists
    '''
    def is_real_sentence(rows):
        # Skip empty buffers and document-separator pseudo sentences.
        return len(rows) > 0 and 'DOCSTART' not in rows[0][0]

    sentences = []
    sentence = []
    # The context manager guarantees the file is closed, even if reading fails.
    with codecs.open(data_path, 'r', 'utf8') as corpus:
        for line in corpus:
            line = line.rstrip()
            if line:
                sentence.append(line.split())
                continue
            # Blank line: the sentence collected so far is complete.
            if is_real_sentence(sentence):
                sentences.append(sentence)
            sentence = []
    # The file may end without a trailing blank line.
    if is_real_sentence(sentence):
        sentences.append(sentence)
    return sentences

### 2、制作数据词典

In [4]:
def get_vocab(sentences):
    '''
    Build the corpus vocabulary from the first column of every token row.

    @params:
        sentences: list of sentences, each a list of token rows whose first
            element is the word
    @return:
        vocab: the `Vocab.Vocab` built from the word frequency counter
        size: `len(vocab)`
    '''
    # Count every word occurrence in corpus order.
    word_counts = collections.Counter(
        token[0] for sentence in sentences for token in sentence
    )
    vocab = Vocab.Vocab(word_counts)
    return vocab, len(vocab)

### 3、制作数据索引

In [27]:
def get_index(sentences, vocab, tag_to_idx):
    '''
    Convert sentences into zero-padded index tensors of words and tags.

    @params:
        sentences: list of sentences; each token row holds the word at
            position 0 and its tag at position 1
        vocab: object whose `stoi` mapping turns a word into its index
        tag_to_idx: mapping from tag string to tag index
    @return:
        features: LongTensor (num_sentences, max_len) of word indices
        labels: LongTensor (num_sentences, max_len) of tag indices
        Both are right-padded with 0 up to the longest sentence. For an empty
        `sentences` list both have shape (0, 0).
    '''
    if len(sentences) == 0:
        # max() over an empty corpus would raise ValueError; return empty tensors instead.
        return (torch.empty((0, 0), dtype=torch.long),
                torch.empty((0, 0), dtype=torch.long))

    max_l = max(len(sentence) for sentence in sentences)

    def pad(x):
        # Right-pad an index list with 0 up to the longest sentence length.
        # NOTE(review): 0 is used for both words and tags; confirm index 0 is
        # the padding entry of `vocab` as well as of `tag_to_idx`.
        return x + [0] * (max_l - len(x))

    tokenized_data = []
    tags_sentence = []
    for sentence in sentences:
        tokenized_sentence = []
        sentence_tag = []
        for w in sentence:
            tokenized_sentence.append(vocab.stoi[w[0]])  # stoi: word -> index
            sentence_tag.append(tag_to_idx[w[1]])
        tokenized_data.append(pad(tokenized_sentence))
        tags_sentence.append(pad(sentence_tag))
    features = torch.tensor(tokenized_data, dtype=torch.long)
    labels = torch.tensor(tags_sentence, dtype=torch.long)
    return features, labels
# features, labels = get_index(NER_sentences, NER_vocab, tag_to_idx)

### 4、导入词向量

In [6]:
def load_pretrained_embedding(words, pretrained_vocab):
    '''
    Build an embedding matrix for `words` from a table of pre-trained vectors.

    @params:
        words: words to look up, given as an itos (index to string) list
        pretrained_vocab: pre-trained vectors exposing `stoi` (word -> row
            index) and `vectors` (2-D tensor, one row per word)
    @return:
        embed: tensor of shape (len(words), vector width); rows of words that
            are missing from `pretrained_vocab.stoi` stay all-zero
    '''
    # Take the width from the pre-trained table instead of hard-coding 100,
    # so vectors of any dimensionality can be loaded.
    embed_dim = pretrained_vocab.vectors.shape[1]
    embed = torch.zeros(len(words), embed_dim)  # one row per corpus word
    oov_count = 0  # number of out-of-vocabulary words
    for i, word in enumerate(words):
        try:
            idx = pretrained_vocab.stoi[word]
            embed[i, :] = pretrained_vocab.vectors[idx]
        except KeyError:
            # Word has no pre-trained vector: keep its zero row and count it.
            oov_count += 1
    if oov_count > 0:
        print("There are %d oov words." % oov_count)
    return embed

### 5、搭建模型

In [7]:
class RNNModel(nn.Module):
    '''
    Sequence tagger: embedding layer -> LSTM -> linear classifier per token.

    @params:
        input_dim: number of rows of the embedding table (vocabulary size)
        embed_size: width of each word embedding
        hidden_dim: hidden size of the LSTM (per direction)
        out_dim: number of output classes
        BiFlag: whether the LSTM is bidirectional
    '''
    def __init__(self, input_dim, embed_size, hidden_dim, out_dim, BiFlag=False):
        super(RNNModel, self).__init__()
        self.input_dim = input_dim
        # A bidirectional LSTM concatenates both directions, doubling the feature width.
        self.hidden_dim = hidden_dim * (2 if BiFlag else 1)
        # Word embedding layer
        self.embedding = nn.Embedding(input_dim, embed_size)
        # batch_first=True: the batches fed to this model are (batch_size, num_steps).
        # With the default time-major layout the LSTM would recur over the batch
        # axis, i.e. across different sentences instead of along each sentence.
        self.rnn_layer = nn.LSTM(input_size=embed_size, hidden_size=hidden_dim,
                                 bidirectional=BiFlag, batch_first=True)
        # Linear classification layer
        self.dense = nn.Linear(self.hidden_dim, out_dim)

    def forward(self, inputs, state=None):
        '''
        @params:
            inputs: LongTensor of word indices, shape (batch_size, num_steps)
            state: initial LSTM state `(h, c)`, or None to start from zeros
        @return:
            outputs: class scores of shape (batch_size * num_steps, out_dim),
                ordered sentence by sentence so they line up with `labels.view(-1)`
            state: the LSTM state after the last step
        '''
        # The embedding layer requires a LongTensor input.
        x = self.embedding(inputs)  # shape: (batch_size, num_steps, embed_size)
        hiddens, state = self.rnn_layer(x, state)  # hiddens: per-step outputs; state: latest (h, c)
        # Flatten the 3-D output to 2-D so the linear layer scores every token.
        hiddens = hiddens.reshape(-1, hiddens.shape[-1])  # (batch_size * num_steps, hidden width)
        outputs = self.dense(hiddens)
        return outputs, state

### 6、启动训练

In [None]:
def train_NER(model, num_epochs, train_data):
    '''
    Train `model` with SGD (lr=0.01) and cross-entropy, printing the mean loss of every epoch.

    @params:
        model: module called as `model(x, state)` that returns `(output, state)`,
            where `output` holds one row of class scores per entry of `y.view(-1)`
        num_epochs: number of passes over `train_data`
        train_data: iterable yielding `(x, y)` tensor batches
    '''
    # CrossEntropyLoss requires class labels that start at 0.
    loss = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)
    model.to(device)
    for epoch in range(num_epochs):
        l_sum = []
        for x, y in train_data:
            # The model was moved to `device`, so the batch has to follow it.
            x, y = x.to(device), y.to(device)
            # Every batch starts from a fresh state (None): the batches hold
            # unrelated sentences, so the final state of one batch must not
            # seed the next one.
            output, _ = model(x, None)
            l = loss(output, y.view(-1))
            optimizer.zero_grad()
            l.backward()
            optimizer.step()
            l_sum.append(float(l))
        print("epoch %d , loss %f"%(epoch, np.mean(l_sum)))

In [9]:
# Tag inventory; index 0 is the <PAD> tag, matching the 0 used to pad label sequences.
tag_to_idx = {
    "<PAD>": 0,
    "O": 1,
    "B-LOC": 2,
    "I-LOC": 3,
    "B-PER": 4,
    "I-PER": 5,
    "B-ORG": 6,
    "I-ORG": 7,
}
# Reverse lookup, keyed by the index rendered as a string.
idx_to_tag = {str(index): tag for tag, index in tag_to_idx.items()}
NER_sentences = load_sentence("./dataset/example.train")
NER_vocab, vocab_size = get_vocab(NER_sentences)  # corpus vocabulary and its size
tags_size = len(tag_to_idx)  # number of target tags
NER_source, NER_target = get_index(NER_sentences, NER_vocab, tag_to_idx)

In [14]:
# Load the pre-trained vectors and align them with the corpus vocabulary.
cache_dir = "./dataset"
glove_vocab = Vocab.Vectors(name='wiki_100.utf8', cache=cache_dir)
vocab_vec = load_pretrained_embedding(NER_vocab.itos, glove_vocab)

# Train on the first 1000 sentences, in shuffled mini-batches of 50.
train_set = Data.TensorDataset(NER_source[:1000], NER_target[:1000])
train_iter = Data.DataLoader(train_set, batch_size=50, shuffle=True)

# 100-dimensional embeddings, 128 hidden units, one output per tag.
model = RNNModel(input_dim=vocab_size, embed_size=100, hidden_dim=128, out_dim=tags_size)
model.embedding.weight.data.copy_(vocab_vec)
# The embeddings are loaded pre-trained, so they are frozen rather than updated.
model.embedding.weight.requires_grad = False

There are 12 oov words.


In [16]:
# Train the tagger for 5 epochs on the mini-batch loader.
train_NER(model, num_epochs=5, train_data=train_iter)

In [72]:
# Rebuild the training loader: same first-1000-sentence subset, shuffled batches of 50.
train_set = Data.TensorDataset(NER_source[:1000], NER_target[:1000])
train_iter = Data.DataLoader(train_set, batch_size=50, shuffle=True)
# Uncomment to inspect the shape and contents of the first batch:
# for x,y in train_iter:
#     print(x.shape)
#     print(x)
#     break

torch.Size([50, 574])
tensor([[ 454,  585,  199,  ...,    0,    0,    0],
        [  19,  347,   96,  ...,    0,    0,    0],
        [ 784, 1192,  511,  ...,    0,    0,    0],
        ...,
        [  24,   38,  230,  ...,    0,    0,    0],
        [1874,   95,  278,  ...,    0,    0,    0],
        [  95,  185,   38,  ...,    0,    0,    0]])


2.0