In [18]:
import pickle

import numpy as np
from seqeval.metrics import f1_score, classification_report
from tensorflow import keras
import tensorflow as tf

from config import resources as rsc
from config import hyperparams as hparams

In [19]:
def read_file(file):
    """데이터 파일을 파싱

    파일은
        1. 세미콜론으로 시작하는 원본 문장,
        2. 그 다음엔 달러 기호로 시작하는 BIO 태깅된 문장,
        3. 그 이후부터 다음 원본 문장 전까지는 각 토큰의 피처들의 나열(ID, token, POS, BIO)
    이렇게 세 종류의 줄로 이루어져 있다.

    :param file: BIO 토큰 정보를 가진 파일.
    :return: 각 줄의 모든 피처를 열거한 리스트: [feats_sentence1, feats_sentence2, ...]
    """

    # 어떤 라인인지 파악하기 위한 조건
    def is_original_line(cur_i, lines):
        if cur_i >= len(lines) - 1:
            return False
        else:
            cur_line, next_line = lines[cur_i], lines[cur_i + 1]
            cur_first_char = cur_line[0]
            next_first_char = next_line[0]
            return cur_first_char == ";" and next_first_char == '$'

    def is_ner_processed_line(cur_i, lines):
        cur_line = lines[cur_i]
        first_char = cur_line[0]
        return first_char == "$"

    def is_end_of_datapoint(line):
        first_char = line[0]
        return first_char == "\n"

    features_all_lines = []  # 파일 내 모든 문장의 토큰 피처들
    with open(file, 'r', encoding="utf-8") as fin:
        lines = fin.readlines()

        for i, line in enumerate(lines):
            if is_original_line(i, lines):
                features_a_line = []
            elif is_ner_processed_line(i, lines):
                continue
            elif is_end_of_datapoint(line):
                features_all_lines.append(features_a_line)
            else:
                features_a_line.append(line.split())
    return features_all_lines


def preprocess(corpus):
    """데이터 전처리 후 데이터셋 생성.

    :param corpus: `read_file`의 리턴.
    :return: dataset, maxlen, word_size, tag_size. 뒤 세 요소는 `build_model`에 쓰인다.
        dataset: SliceDataset.
            [0]: 인코딩된 고정 길이 토큰 시퀀스,
            [1]: 원-핫 태그. 토큰 시퀀스가 어느 태그에 속하는지 의미.
    """

    def divide_words_tags():
        """tokens, tags 따로 수집.

        샘플별로 token과 BIO 태그를 분리한다.

        :return: (tokens, bio_tags)
        """
        words_all_sentences, tags_all_sentences = [], []
        for info_about_a_sentence in corpus:
            words_a_sentence, tags_a_sentence = [], []

            for features_a_token in info_about_a_sentence:
                word, tag = features_a_token[1], features_a_token[3]
                words_a_sentence.append(word)
                tags_a_sentence.append(tag)

            words_all_sentences.append(words_a_sentence)
            tags_all_sentences.append(tags_a_sentence)
        return words_all_sentences, tags_all_sentences

    def encode_sequences(sequences, **tokenizer_opts):
        """문자열 시퀀스를 정수형으로 인코딩.

        :param sequences: words_all_sentences와 tags_all_sentences 두 가지 중 하나.
        :param tokenizer_opts: 토크나이저 초기화 시 줄 옵션.
            - words_all_sentences면 `oov_token="OOV"`
            - tags_all_sentences면 `lower=False` 부여.
        :return: 토크나이저에 의해 인코딩된 시퀀스. 시퀀스 토큰 공간 크기, 최장 시퀀스 길이 세 가지.

        """
        tokenizer = keras.preprocessing.text.Tokenizer(**tokenizer_opts)
        tokenizer.fit_on_texts(sequences)
        index_word = tokenizer.index_word
        index_word[0] = 'PAD'
        encoded_sequences = tokenizer.texts_to_sequences(sequences)
        vocab_size = len(index_word) + 1
        maxlen = max(map(lambda sequence: len(sequence), encoded_sequences))

        return encoded_sequences, vocab_size, maxlen

    words_all_sentences, tags_all_sentences = divide_words_tags()
    enc_word_sequences, word_size, maxlen = encode_sequences(
        words_all_sentences, oov_token="OOV")
    padded_word_sequences = keras.preprocessing.sequence.pad_sequences(
        enc_word_sequences, maxlen=maxlen, padding="post")

    enc_tag_sequences, tag_size, _ = encode_sequences(
        tags_all_sentences, lower=False)
    padded_tag_sequences = keras.preprocessing.sequence.pad_sequences(
        enc_tag_sequences, maxlen=maxlen, padding="post")

    one_hot_tags = tf.keras.utils.to_categorical(
        padded_tag_sequences, num_classes=tag_size)
    dataset = tf.data.Dataset.from_tensor_slices(
        (padded_word_sequences, one_hot_tags))

    return dataset, maxlen, word_size, tag_size


def build_model(input_len,
                vocab_size: int,
                tag_size: int):
    """NER 모델을 빌드한다.

    입력 shape = (len_seq,). 정수형 인코딩된 토큰 시퀀스를 하나 받는다.
    출력 shape = (tag_space_size,). 토큰 시퀀스가 각 BIO 태그일 확률을 나타낸다.

    :param input_len: 입력 토큰 시퀀스의 길이. 입력된느 모든 시퀀스의 길이는 일정해야 한다.
    :param vocab_size: 토큰 공간의 크기.
    :param tag_size: BIO 태그 공간의 크기.
    :return: 컴파일된 모델.
    """
    input = keras.layers.Input(shape=[input_len, ])
    embedding = keras.layers.Embedding(
        input_dim=vocab_size,
        output_dim=30,
        mask_zero=True,  # 시퀀스를 0으로 패딩했다면 True로 값을 설정해 그 0이 패딩임을 고지.
        input_length=input_len)(input)
    bi_lstm = keras.layers.Bidirectional(
        tf.keras.layers.LSTM(
            200,
            return_sequences=True,
            dropout=.5,
            # T면 [timesteps, batch, feature] F면 [batch, timesteps, feature]의 입력
            # 형상으로 True일 떄가 연산 효율은 더 좋다(기본값 False).
            # time_major= True
            recurrent_dropout=.25))(embedding)
    dense = keras.layers.TimeDistributed(
        keras.layers.Dense(tag_size, activation="softmax"))(bi_lstm)

    bi_lstm_model = keras.Model(inputs=input, outputs=dense)
    bi_lstm_model.compile(loss="categorical_crossentropy",
                          optimizer=tf.keras.optimizers.legacy.Adam(),
                          metrics=["accuracy"])

    return bi_lstm_model


## 데이터 처리

In [43]:
corpus = read_file(rsc.train["ner"])
trainset, maxlen, word_size, tag_size = preprocess(corpus)
trainset = trainset.batch(hparams.ner["batch_size"])

## 모델 빌드

In [45]:
bi_lstm_model = build_model(input_len=maxlen,
                            vocab_size=word_size,
                            tag_size=tag_size)



In [None]:
early_stopping = keras.callbacks.EarlyStopping(monitor="loss",
                                               patience=5,
                                               mode="auto")

bi_lstm_history = bi_lstm_model.fit(trainset,
                                    epochs=hparams.ner["epochs"],
                                    callbacks=[early_stopping])
bi_lstm_model.save(rsc.model["ner"])

Epoch 1/1000


2023-01-20 00:43:54.708377: W tensorflow/tsl/platform/profile_utils/cpu_utils.cc:128] Failed to get CPU frequency: 0 Hz
2023-01-20 00:43:54.714318: I tensorflow/core/grappler/optimizers/custom_graph_optimizer_registry.cc:114] Plugin optimizer for device_type GPU is enabled.


 13/485 [..............................] - ETA: 58:15:31 - loss: 2.2081 - accuracy: 0.7644

## 테스트 데이터로 성능 측정

In [None]:
def postprocess():
    pass


In [None]:
loaded_model = tf.keras.models.load_model(rsc.model["ner"])

In [None]:
def sequences_to_tag(sequences, mapping_dict):
    """"one-hot 결과를 읽을 수 있는 문자로 변환한다.

    각 시퀀스는 토큰별 one-hot 추론 결과를 원소로 담고 있다. [num_seq, len_seq, len_one_hot]

    :param sequences: 추론된 원-핫 형식 태그 콜렉션. 추론 함수의 결과 텐서를 기대한다.
    :param mapping_dict: 인코딩된 태그값에 해당하는 원래 문자 태그의 매핑.
    :return: `mapping_dict`에 의해 문자로 변환된 시퀀스들.
    """
    tags_all_sequences = []
    for sequence in sequences:
        tags_a_sequence = []
        for one_hot_score in sequence:
            most_likely_index = np.argmax(one_hot_score)
            pred_tag = mapping_dict[most_likely_index].replace("PAD", "O")
            tags_a_sequence.append(pred_tag)
        tags_all_sequences.append(tags_a_sequence)

    return tags_all_sequences

In [None]:
# 시간이 걸리므로 pickle 직렬화 보존하자.
decoded_pred_tags = sequences_to_tag(pred_y, tag_index_word)
decoded_test_tags = sequences_to_tag(test_y, tag_index_word)

In [None]:
with open("data/corpus/korean_ner/decoded_pred_tags.pickle", "wb") as fout:
    pickle.dump(decoded_pred_tags, fout)

with open("data/corpus/korean_ner/decoded_test_tags", "wb") as fout:
    pickle.dump(decoded_test_tags, fout)

In [None]:
report = classification_report(decoded_test_tags, decoded_pred_tags)
f1 = f1_score(decoded_test_tags, decoded_pred_tags)

In [None]:
print(report)
print(f1)