In [1]:
import torch
import torch.autograd as autograd # torch中自動計算梯度模塊
import torch.nn as nn             # 神經網絡模塊
import torch.nn.functional as F   # 神經網絡模塊中的常用功能 
import torch.optim as optim       # 模型優化器模塊
import numpy as np

import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [2]:
def get_data_pairs(file, ratio=0.1):
    pairs = []
    entries = open(file, 'r', encoding='utf8').read().strip().split('\n\n')
    num = int(len(entries) * ratio)
    
    for entry in entries:
        sentence, target = [], []
        for line in entry.split('\n'):
            if line.strip() == '': continue
                
            token, pos, bio = line.split('\t')
            sentence.append(token)
            target.append(bio)
        pairs.append((sentence, target))

    return pairs[num:], pairs[:num]

In [3]:
dse_train, dse_test = get_data_pairs('./dataset/dse.txt')
ese_train, ese_test = get_data_pairs('./dataset/ese.txt')
print(len(dse_train), len(ese_train))

13043 13043


In [4]:
def get_dict(pair_data):
    # not normalized
    word_to_ix = {"_UNK": 0, "_PAD": 1} 
    ix_to_word = {0: "_UNK", 1: "_PAD"}
    for sent, tags in pair_data:
        for word in sent:
            if word not in word_to_ix:
                ix = len(word_to_ix)
                word_to_ix[word] = ix
                ix_to_word[ix] = word

    tag_to_ix = {"B": 0, "I": 1, "O": 2} # 手工設定詞性標籤數據字典
    ix_to_tag = {0: "B", 1: "I", 2: "O"}

    return word_to_ix, tag_to_ix, ix_to_word, ix_to_tag

word_to_ix, tag_to_ix, ix_to_word, ix_to_tag = get_dict(dse_train)

print(len(word_to_ix))

22271


In [5]:
# def get_word_vectors():
#     glove = {}
#     glove_path = 'dataset/glove/'

#     for l in open(f'{glove_path}/glove.6B.50d.txt', 'r', encoding='utf8'):
#         line = l.split()
#         glove[line[0]] = np.array(line[1:]).astype(np.float)
        
#     return glove

In [11]:
def sequence_to_ixs(seq, to_ix):
    ixs = [to_ix[w] if w in to_ix else 0 for w in seq]
    tensor = torch.cuda.LongTensor(ixs)
    
    return autograd.Variable(tensor)

def ixs_to_sequence(seq, to_word):
    tokens = [to_word[ix] for ix in seq]
    
    return tokens


def sequence_to_ixs2(seq):
    vocabs = word_vectors.vocab.keys()
    ixs = [word_vectors.vocab[w].index if w in vocabs else 0 for w in seq]
    tensor = torch.cuda.LongTensor(ixs)
    
    return autograd.Variable(tensor)


In [19]:
class LSTMTagger(nn.Module):
 
    def __init__(self, embedding_dim, hidden_dim, 
                 vocab_size, tagset_size, 
                 dropout, num_layers, bidirectional):
        super(LSTMTagger, self).__init__()
        
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

#         self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)
        weights = torch.FloatTensor(word_vectors.syn0)
        self.word_embeddings = nn.Embedding.from_pretrained(weights, freeze=True)
        
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, 
                            dropout=dropout, num_layers=num_layers,
                            bidirectional=bidirectional)
 
        self.hidden2tag = nn.Linear(hidden_dim * (1+int(bidirectional)), tagset_size)
    
        self.hidden = self.init_hidden()
 

    def init_hidden(self):
        return (autograd.Variable(torch.zeros(self.num_layers * (1+int(bidirectional)), 1, self.hidden_dim).cuda()),
                autograd.Variable(torch.zeros(self.num_layers * (1+int(bidirectional)), 1, self.hidden_dim).cuda()))
 

    def forward(self, sentence):
        embeds = self.word_embeddings(sentence)
        
        try:
            lstm_out, self.hidden = self.lstm(
                embeds.view(len(sentence), 1, -1), self.hidden)

            tag_space = self.hidden2tag(lstm_out.view(len(sentence), -1))

            tag_scores = F.log_softmax(tag_space)

            return tag_scores
        except Exception as e:
            print(sentence)
            print(embeds)
            print(e)

In [20]:
embedding_dim = 300
hidden_dim = 100
learning_rate = 0.005
momentum = 0.7
dropout = 0
num_layers = 3
bidirectional = True
epochs = 200
model_path = 'models/standard.model'

# import gensim
# word_vectors = gensim.models.KeyedVectors.load_word2vec_format('/scepter/word_vectors/GoogleNews-vectors-negative300.bin', binary=True)  

# word_vectors = get_word_vectors()
# target_vocabs = word_to_ix.keys()
# weights_matrix = np.zeros((len(target_vocabs), embedding_dim))
# words_found = 0

# for i, word in enumerate(target_vocabs):
#     try: 
#         weights_matrix[i] = glove[word]
#         words_found += 1
#     except KeyError:
#         weights_matrix[i] = np.random.normal(scale=0.6, size=(embedding_dim, ))
        
        
model = LSTMTagger(embedding_dim, hidden_dim, 
                   len(word_to_ix), len(tag_to_ix), 
                   dropout=dropout,
                   num_layers=num_layers,
                   bidirectional=bidirectional)

loss_function = nn.NLLLoss()

# optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum)
optimizer = optim.SGD(filter(lambda p: p.requires_grad, model.parameters()), lr=learning_rate, momentum=momentum)

if torch.cuda.is_available():
    model.cuda()

In [21]:
# TRAIN

for epoch in range(epochs): # 我們要訓練300次，可以根據任務量的大小酌情修改次數。
    for sentence, tags in dse_train:
        
        # 清除網絡先前的梯度值，梯度值是Pytorch的變量才有的數據，Pytorch張量沒有
        model.zero_grad()
        
        # 重新初始化隱藏層數據，避免受之前運行代碼的干擾
        model.hidden = model.init_hidden()
        
        # 準備網絡可以接受的的輸入數據和真實標籤數據，這是一個監督式學習
#         sentence_in = sequence_to_ixs(sentence, word_to_ix)
        sentence_in = sequence_to_ixs2(sentence)        
        targets = sequence_to_ixs(tags, tag_to_ix)

        # 運行我們的模型，直接將模型名作為方法名看待即可
        tag_scores = model(sentence_in)
        
        # 計算損失，反向傳遞梯度及更新模型參數
        loss = loss_function(tag_scores, targets)
        
        loss.backward()
        
        optimizer.step()
    
    if (epoch + 1) % 5 == 0:
        print("epoch: {}, loss: {}".format(epoch+1, loss))
        torch.save(model.state_dict(), model_path)



epoch: 5, loss: 0.0032185656018555164
epoch: 10, loss: 0.0005312477005645633
epoch: 15, loss: 0.00022537367476616055


KeyboardInterrupt: 

In [8]:
model.load_state_dict(torch.load(model_path))

In [13]:
def get_segments(tag_seq):
    segs = []
    start = -1
    for i, y in enumerate(tag_seq):
        if y == "O": 
            if start != -1: segs.append((start, i))
            start = -1
        elif y == "B":
            if start != -1: segs.append((start, i))
            start = i
        elif y == "I":
            if start == -1: start = i
        else:
            print("WRONG!")
    
    if start != -1 and start != len(tag_seq):
        segs.append((start, len(tag_seq)))
        
    return segs


def evaluate(predicts, trues):
    assert len(predicts) == len(trues)

    precision_prop, recall_prop = .0, .0
    precision_bin, recall_bin = 0, 0
    predict_total, true_total = 0, 0
    
    for y_predict, y_true in zip(predicts, trues):
        assert len(y_predict) == len(y_true)

        predict_segs = get_segments(y_predict)
        true_segs = get_segments(y_true)

        predict_count = len(predict_segs)
        true_count = len(true_segs)
        
        predict_total += predict_count
        true_total += true_count
        
        predict_flags = [False for i in range(predict_count)]
        true_flags = [False for i in range(true_count)]

        for t_i, (t_start, t_end) in enumerate(true_segs):
            for p_i, (p_start, p_end) in enumerate(predict_segs):
                assert p_start != p_end

                l_max = t_start if t_start > p_start else p_start
                r_min = t_end   if t_end   < p_end else p_end
                overlap = r_min - l_max if r_min > l_max else 0

                precision_prop += overlap / (p_end - p_start)
                recall_prop += overlap / (t_end - t_start)

                if not predict_flags[p_i]:
                    precision_bin += (overlap > 0)
                    predict_flags[p_i] = True
                if not true_flags[t_i]:
                    recall_bin += (overlap > 0)
                    true_flags[t_i] = True

    precision = precision_bin / predict_total if predict_total != 0 else 1
    recall = recall_bin / true_total
    f1 = (2 * precision * recall) / (precision + recall)    
    binary_overlap = { 'precision': precision, 'recall': recall, 'f1': f1 }
    
    precision = precision_prop / predict_total if predict_total != 0 else 1
    recall = recall_prop / true_total
    f1 = (2 * precision * recall) / (precision + recall)
    proportional_overlap = { 'precision': precision, 'recall': recall, 'f1': f1 }
    
    return { 'binary': binary_overlap, 'proportional': proportional_overlap }

In [14]:
# TEST
y_predicts, y_trues = [], []
for each in dse_test:
    seq, true_targets = each
#     inputs = sequence_to_ixs(seq, word_to_ix)
    inputs = sequence_to_ixs2(seq)
    predict_targets = model(inputs)
    predict_targets = torch.max(predict_targets, 1)[1].cpu().numpy()
    predict_targets = ixs_to_sequence(predict_targets, ix_to_tag)

    y_predicts.append(predict_targets)
    y_trues.append(true_targets)

# 感覺可以實驗 tag by tag
evaluate(y_predicts, y_trues)



{'binary': {'f1': 0.3995487246738379,
  'precision': 0.4397363465160075,
  'recall': 0.36609152288072017},
 'proportional': {'f1': 0.52473112024155,
  'precision': 0.6306407497085462,
  'recall': 0.44927928410674106}}