In [None]:
%matplotlib inline
import re
import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

from tqdm import tqdm
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical

# Corpus directory: a 'data' folder under the current working directory.
data_dir = os.path.join(os.getcwd(), 'data')

# Show the TensorFlow version and the resolved data path.
print (tf.__version__)
print (data_dir)

In [None]:
def read_ner_data(data_dir:str, file_name:str):
    """Read a sentence-per-block corpus file and split it into token tuples.

    The file is read as UTF-8 text. Sentences are separated by a line that
    contains exactly ``=====``; inside a sentence every line is one token
    whose fields are separated by tabs.

    Args:
        data_dir: Directory that contains the corpus file.
        file_name: Name of the file inside ``data_dir``.

    Returns:
        A list with one entry per non-blank sentence block. Each entry is a
        list of tuples, one tuple of tab-separated fields per line.
    """
    corpus_path = os.path.join(data_dir, file_name)
    with open(corpus_path, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()

    parsed = []
    for chunk in raw_text.split('\n=====\n'):
        body = chunk.strip()
        if not body:
            # Blocks that hold only whitespace (e.g. after a trailing separator) are skipped.
            continue
        parsed.append([tuple(row.split('\t')) for row in body.splitlines()])
    return parsed

# Load the training split; %time reports how long the read and parse take.
# read_ner_data already returns a list, so list(...) only makes a shallow copy of it.
%time train_sents = list(read_ner_data(data_dir, 'train.txt'))
print ('n_Train Data set: %d\n'%len(train_sents))

# Load the test split the same way.
%time test_sents = list(read_ner_data(data_dir, 'test.txt'))
print ('n_Test Data set: %d\n'%len(test_sents))

In [None]:
from pprint import pprint

# Inspect the first training sentence exactly as read_ner_data parsed it:
# a list of per-token tuples.
line = train_sents[0]
pprint (line)
print ('\n==================\n')

# Field 0 of every token tuple is used as the word and field 2 as the BIO tag;
# field 1 is not used. Plain indexing avoids building a NumPy string array
# twice just to pick one column.
sentence = [fields[0] for fields in line]
bio_tags = [fields[2] for fields in line]
print (sentence, '\n', bio_tags)

In [None]:
# Split every training sentence into its word list (field 0) and its BIO tag
# list (field 2). Direct indexing replaces the per-sentence np.array(...) round
# trip: it does the same job without allocating a string array for each
# sentence, and it does not require every token row to have the same width.
sentences = [[fields[0] for fields in sent] for sent in tqdm(train_sents)]
bio_tags = [[fields[2] for fields in sent] for sent in tqdm(train_sents)]

# Show the first sentence and its tags as a sanity check.
print ('\n')
print (sentences[0])
print (bio_tags[0])

In [None]:
# Token count of every training sentence, computed once and shared by the
# summary statistics and the histogram below.
sample_lengths = list(map(len, sentences))

# Longest sentence and mean sentence length.
print('샘플의 최대 길이 : %d' % max(sample_lengths))
print('샘플의 평균 길이 : %f' % (sum(sample_lengths)/len(sentences)))

# Distribution of sentence lengths.
plt.hist(sample_lengths, bins=50)
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.show()

In [None]:
# Build the word and tag vocabularies from the training data only.
input_tokenizer = Tokenizer(oov_token='OOV') # keep every word, but assign index 1 to the 'OOV' token
input_tokenizer.fit_on_texts(sentences)
output_tokenizer = Tokenizer(lower=False) # store the tags internally with their upper case preserved
output_tokenizer.fit_on_texts(bio_tags)

In [None]:
# Encode the training words and tags as integer sequences with the fitted tokenizers.
X_train = input_tokenizer.texts_to_sequences(sentences)
y_train = output_tokenizer.texts_to_sequences(bio_tags)

# Compare the first sentence and its tags before and after encoding.
print ('원본   데이터  : ', sentences[0])
print ('인코딩   데이터: ', X_train[0], '\n')
print ('원본   BIO Tags: ', bio_tags[0])
print ('인코딩 BIO Tags: ', y_train[0])

In [None]:
# Short aliases for the tokenizers' lookup tables (word/tag -> index and index -> word/tag).
word2index = input_tokenizer.word_index
index2word = input_tokenizer.index_word
tags2index = output_tokenizer.word_index
index2tags = output_tokenizer.index_word
# Give index 0 a readable name. index2tags refers to the same object as
# output_tokenizer.index_word (no copy is made above), so this entry is added there too.
index2tags[0] = 'PAD'

# Display the index -> tag table.
index2tags

In [None]:
# Sizes later passed to the Embedding and output Dense layers.
# The +1 presumably accounts for index 0, which is used as the padding value
# rather than assigned to a word or tag — TODO confirm.
vocab_size = len(input_tokenizer.word_index) + 1
tag_size = len(output_tokenizer.word_index) + 1
# Report the vocabulary size, the tag-set size and the index given to the 'OOV' token.
print('단어 집합의 크기 : {}'.format(vocab_size))
print('개체명 태깅 정보 집합의 크기 : {}'.format(tag_size))
print('단어 OOV의 인덱스 : {}'.format(input_tokenizer.word_index['OOV']))


In [None]:
# Bring every encoded sentence and tag sequence to a fixed length, padding at the end ('post').
# NOTE(review): only `padding` is set; `truncating` keeps the library default, so confirm
# which end of a sentence longer than max_seq_len is cut off.
max_seq_len = 75
X_train = pad_sequences(X_train, padding='post', maxlen=max_seq_len)
y_train = pad_sequences(y_train, padding='post', maxlen=max_seq_len)
# Show the padded tag ids of the first sentence.
y_train[0]

In [None]:
# Prepare the test data with the tokenizers that were fitted on the training set.
# Words (field 0) and BIO tags (field 2) are picked by direct indexing instead of a
# per-sentence np.array(...) round trip, and the raw lists get their own names so
# X_test / y_test only ever hold encoded data.
test_sentences = [[fields[0] for fields in sent] for sent in tqdm(test_sents)]
X_test = input_tokenizer.texts_to_sequences(test_sentences)
X_test = pad_sequences(X_test, padding='post', maxlen=max_seq_len)

# NOTE(review): output_tokenizer was created without an oov_token; confirm that every
# tag in the test split also occurs in the training split, otherwise an encoded tag
# sequence may not line up with its tokens.
test_bio_tags = [[fields[2] for fields in sent] for sent in tqdm(test_sents)]
y_test = output_tokenizer.texts_to_sequences(test_bio_tags)
y_test = pad_sequences(y_test, padding='post', maxlen=max_seq_len)

In [None]:
# One-hot encode the padded tag ids into tag_size classes per position.
# NOTE(review): y_train / y_test are overwritten with their own encoded version, so
# running this cell a second time would apply to_categorical to already encoded
# arrays; re-run from the encoding cells instead.
y_train = to_categorical(y_train, num_classes=tag_size)
y_test = to_categorical(y_test, num_classes=tag_size)

# Report the final array shapes of the train and test inputs and labels.
print('훈련 샘플 문장의 크기 : {}'.format(X_train.shape))
print('훈련 샘플 레이블의 크기 : {}'.format(y_train.shape))
print('테스트 샘플 문장의 크기 : {}'.format(X_test.shape))
print('테스트 샘플 레이블의 크기 : {}'.format(y_test.shape))

In [None]:
%%time
# The %%time cell magic above must stay on the first line of the cell; it times the
# whole cell, i.e. model construction plus training.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Bidirectional, TimeDistributed, Embedding
from tensorflow.keras.optimizers import Adam

# Tagger architecture: Embedding -> bidirectional LSTM -> per-timestep softmax over the tags.
model = Sequential()
# mask_zero=True asks the embedding to mask index 0, the value used for padding.
model.add(Embedding(vocab_size, 128, input_length=max_seq_len, mask_zero=True))
# return_sequences=True keeps one output per token, as needed for sequence labelling.
model.add(Bidirectional(LSTM(256, return_sequences=True)))
model.add(TimeDistributed(Dense(tag_size, activation='softmax')))
model.compile(loss='categorical_crossentropy', optimizer=Adam(0.001), metrics=['accuracy'])

# NOTE(review): the test split is passed as validation_data, so the validation metrics
# printed during training are computed on the same data that is evaluated afterwards.
model.fit(X_train, y_train, batch_size=128, epochs=3,  validation_data=(X_test, y_test))

In [None]:
# Evaluate on the test split; returns the compiled loss followed by the accuracy metric.
model.evaluate(X_test, y_test)

In [None]:
from sklearn_crfsuite import scorers, metrics

# Labels to score: every known tag except 'O' and the 'PAD' placeholder.
# Filtering (instead of list.remove) keeps the original order and does not raise
# when one of the two names is absent from the tag table.
labels = [tag for tag in index2tags.values() if tag not in ('O', 'PAD')]

# Gold tags: undo the one-hot encoding and drop the padding positions (index 0).
true_y = np.argmax(y_test, axis=-1)
true_y = [[index2tags[idx] for idx in true_tags if idx != 0] for true_tags in tqdm(true_y)]

# Predicted tags: most probable class per position, cut to the gold length of each
# sentence. Slicing replaces the previous zip-through-a-NumPy-array trick, which
# round-tripped the ids through strings and raised IndexError for a sentence whose
# gold sequence is empty. Assumes the padding sits at the end of each row
# (padding='post' was used when the arrays were built).
prd_y = model.predict(np.array(X_test))
prd_y = np.argmax(prd_y, axis=-1)
prd_y = [
    [index2tags[int(idx)] for idx in prd_tags[:len(true_tags)]]
    for true_tags, prd_tags in tqdm(zip(true_y, prd_y), total=len(true_y))
]


# Weighted F1 over the selected labels, computed on the flattened tag sequences.
f1_score = metrics.flat_f1_score(true_y, prd_y, average='weighted', labels=labels)
print ('\n\n========= Model Validataion =============\n')
print ('F1 score: %0.3f'%f1_score)

# Order the report by the part of the tag after its first character, then by that
# first character, so tags that differ only in their first character are adjacent.
sorted_labels = sorted(
    labels, 
    key=lambda name: (name[1:], name[0])
)

# Per-label precision / recall / F1 table.
print (metrics.flat_classification_report(
    true_y, 
    prd_y, 
    labels=sorted_labels, 
    digits=3
    ))