In [2]:
import pandas as pd
import math
import os
from abc import ABCMeta
from fastparquet import ParquetFile
from transformers import BertTokenizer, TFBertModel
from sklearn.model_selection import train_test_split
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from typing import Dict, List, Tuple
import tensorflow as tf
from tensorflow.python.lib.io.tf_record import TFRecordWriter

In [3]:
from typing import Dict


class LucyNERModelOutputSequenceInfo:
    """Lookup table from model output class id to BIO-style tag string.

    Id 0 is the padding class ("PAD"); id 1 is the non-entity tag ("-");
    ids 2..31 are the B_/I_ tags of the parent entity labels.
    """

    # Ids 1..31 are assigned by position in the tuple below; "PAD" (id 0) is
    # appended last so the iteration order of the table stays 1, 2, ..., 31, 0.
    __output_sequence_info = {
        **dict(enumerate((
            "-", "B_CV", "I_CV", "I_OG", "I_QT", "I_PS", "B_PS", "B_LC", "B_OG", "B_QT",
            "I_AF", "I_DT", "B_DT", "I_TM", "I_LC", "B_AF", "B_TM", "B_AM", "I_EV",
            "I_TI", "B_TI", "B_EV", "I_AM", "B_PT", "I_PT", "B_FD", "B_MT", "I_TR",
            "I_MT", "B_TR", "I_FD",
        ), start=1)),
        0: "PAD",
    }

    @classmethod
    def get_output_sequence_info(cls) -> Dict[int, str]:
        """Return a fresh ``{class_id: tag}`` dict; mutating it does not affect the class table."""
        return dict(cls.__output_sequence_info)


class LucyNERLabelInfo:
    """Static description of the NER label hierarchy.

    Each two-letter parent label (e.g. "PS") maps to a display name and the
    list of fine-grained child labels that roll up into it.
    """

    __label_infos = {
        "PS": {"name": "PERSON", "child_labels": ["PS_NAME", "PS_CHARACTER", "PS_PET"]},
        "FD": {"name": "STUDY_FIELD", "child_labels": [
            "FD_SCIENCE", "FD_SOCIAL_SCIENCE", "FD_MEDICINE", "FD_ART", "FD_HUMANITIES", "FD_OTHERS"]},
        "TR": {"name": "THEORY", "child_labels": [
            "TR_SCIENCE", "TR_SOCIAL_SCIENCE", "TR_MEDICINE", "TR_ART", "TR_HUMANITIES", "TR_OTHERS"]},
        "AF": {"name": "ARTIFACTS", "child_labels": [
            "AF_BUILDING", "AF_CULTURAL_ASSET", "AF_ROAD", "AF_TRANSPORT", "AF_MUSICAL_INSTRUMENT",
            "AF_WEAPON", "AFA_DOCUMENT", "AFA_PERFORMANCE", "AFA_VIDEO", "AFA_ART_CRAFT", "AFA_MUSIC",
            "AFW_SERVICE_PRODUCTS", "AFW_OTHER_PRODUCTS"]},
        "OG": {"name": "ORGANIZATION", "child_labels": [
            "OGG_ECONOMY", "OGG_EDUCATION", "OGG_MILITARY", "OGG_MEDIA", "OGG_SPORTS", "OGG_ART",
            "OGG_MEDICINE", "OGG_RELIGION", "OGG_SCIENCE", "OGG_LIBRARY", "OGG_LAW", "OGG_POLITICS",
            "OGG_FOOD", "OGG_HOTEL", "OGG_OTHERS"]},
        "LC": {"name": "LOCATION", "child_labels": [
            "LCP_COUNTRY", "LCP_PROVINCE", "LCP_COUNTY", "LCP_CITY", "LCP_CAPITALCITY", "LCG_RIVER",
            "LCG_OCEAN", "LCG_BAY", "LCG_MOUNTAIN", "LCG_ISLAND", "LCG_CONTINENT", "LC_SPACE", "LC_OTHERS"]},
        "CV": {"name": "CIVILIZATION", "child_labels": [
            "CV_CULTURE", "CV_TRIBE", "CV_LANGUAGE", "CV_POLICY", "CV_LAW", "CV_CURRENCY", "CV_TAX",
            "CV_FUNDS", "CV_ART", "CV_SPORTS", "CV_SPORTS_POSITION", "CV_SPORTS_INST", "CV_PRIZE",
            "CV_RELATION", "CV_OCCUPATION", "CV_POSITION", "CV_FOOD", "CV_DRINK", "CV_FOOD_STYLE",
            "CV_CLOTHING", "CV_BUILDING_TYPE"]},
        "DT": {"name": "DATE", "child_labels": [
            "DT_DURATION", "DT_DAY", "DT_WEEK", "DT_MONTH", "DT_YEAR", "DT_SEASON", "DT_GEOAGE",
            "DT_DYNASTY", "DT_OTHERS"]},
        "TI": {"name": "TIME", "child_labels": [
            "TI_DURATION", "TI_HOUR", "TI_MINUTE", "TI_SECOND", "TI_OTHERS"]},
        "QT": {"name": "QUANTITY", "child_labels": [
            "QT_AGE", "QT_SIZE", "QT_LENGTH", "QT_COUNT", "QT_MAN_COUNT", "QT_WEIGHT", "QT_PERCENTAGE",
            "QT_SPEED", "QT_TEMPERATURE", "QT_VOLUME", "QT_ORDER", "QT_PRICE", "QT_PHONE", "QT_SPORTS",
            "QT_CHANNEL", "QT_ALBUM", "QT_ADDRESS", "QT_OTHERS"]},
        "EV": {"name": "EVENT", "child_labels": [
            "EV_ACTIVITY", "EV_WAR_REVOLUTION", "EV_SPORTS", "EV_FESTIVAL", "EV_OTHERS"]},
        "AM": {"name": "ANIMAL", "child_labels": [
            "AM_INSECT", "AM_BIRD", "AM_FISH", "AM_MAMMALIA", "AM_AMPHIBIA", "AM_REPTILIA", "AM_TYPE",
            "AM_PART", "AM_OTHERS"]},
        "PT": {"name": "PLANT", "child_labels": [
            "PT_FRUIT", "PT_FLOWER", "PT_TREE", "PT_GRASS", "PT_TYPE", "PT_PART", "PT_OTHERS"]},
        "MT": {"name": "MATERIAL", "child_labels": ["MT_ELEMENT", "MT_METAL", "MT_ROCK", "MT_CHEMICAL"]},
        "TM": {"name": "TERM", "child_labels": [
            "TM_COLOR", "TM_DIRECTION", "TM_CLIMATE", "TM_SHAPE", "TM_CELL_TISSUE_ORGAN", "TMM_DISEASE",
            "TMM_DRUG", "TMI_HW", "TMI_SW", "TMI_SITE", "TMI_EMAIL", "TMI_MODEL", "TMI_SERVICE",
            "TMI_PROJECT", "TMIG_GENRE", "TM_SPORTS"]},
    }

    @classmethod
    def get_label_name_info(cls) -> Dict[str, str]:
        """Return ``{parent_label: display_name}``, e.g. ``{"PS": "PERSON", ...}``."""
        # Each entry is a plain dict, so the name must be read with ["name"];
        # attribute access (.name) raises AttributeError.
        return {label: info["name"] for label, info in cls.__label_infos.items()}

    @classmethod
    def get_child_label_parent_info(cls) -> Dict[str, str]:
        """Return ``{child_label: parent_label}``, e.g. ``{"PS_NAME": "PS", ...}``."""
        child_label_parent_info = {}

        for label, info in cls.__label_infos.items():
            for child_label in info["child_labels"]:
                child_label_parent_info[child_label] = label

        return child_label_parent_info

In [4]:
# Pretrained checkpoint name; used to load the tokenizer and to name the output directory.
model_name = "bert-base-multilingual-cased"

In [5]:
# Fixed sequence length: the feature classes below pad or truncate every example to this many positions.
max_length = 128
# Shared tokenizer instance; the feature classes below read this module-level variable directly.
tokenizer = BertTokenizer.from_pretrained(model_name)

In [6]:
from typing import List, Union, Dict

from transformers import PreTrainedTokenizer


class LucyNERFeatureParser:
    """Encode raw text into fixed-length model inputs with the notebook's shared tokenizer."""

    def __init__(self, max_length: int):
        """Remember the target length and bind the module-level ``tokenizer``.

        Args:
            max_length: Length every encoded sequence is padded/truncated to.
        """
        # NOTE: the tokenizer is not injected; it is read from the global `tokenizer` variable.
        self.__tokenizer: PreTrainedTokenizer = tokenizer
        self.__max_length = max_length

    def __call__(self, texts: Union[List[str], str]):
        """Shorthand for :meth:`featuring`."""
        return self.featuring(texts)

    def featuring(self, texts: Union[List[str], str]):
        """Tokenize ``texts`` and return the tokenizer's encoding.

        The tokenizer is asked for NumPy tensors, special tokens included,
        truncated and padded to exactly ``max_length`` positions.
        """
        encode_options = dict(
            return_tensors='np',
            truncation=True,
            max_length=self.__max_length,
            padding="max_length",
            add_special_tokens=True,
        )
        return self.__tokenizer(texts, **encode_options)

    def parse_input_id_to_token(self, input_ids):
        """Convert vocabulary ids back to their token strings."""
        shared_tokenizer = self.__tokenizer
        return shared_tokenizer.convert_ids_to_tokens(input_ids)

    def get_id_token_vocab(self) -> Dict:
        """Return the tokenizer's ``get_vocab()`` result.

        NOTE(review): per the transformers docs ``get_vocab()`` maps token -> id,
        which reads as the opposite of this method's name; confirm with callers
        before relying on the direction.
        """
        return self.__tokenizer.get_vocab()

    def get_token_id_vocab(self) -> Dict:
        """Return the tokenizer's ``ids_to_tokens`` attribute.

        NOTE(review): presumably an id -> token mapping that only the slow
        ``BertTokenizer`` exposes; verify before swapping tokenizer classes.
        """
        return self.__tokenizer.ids_to_tokens

In [7]:
# Reuse the notebook-wide `max_length` instead of repeating the literal, so the two cannot diverge.
feature_parser = LucyNERFeatureParser(max_length=max_length)

In [8]:
# Smoke test: encode one Korean sentence and display the padded encoding returned by the tokenizer.
feature_parser.featuring(['책으로 세상을 바라보고 책으로 삶을 살찌우는 시간 티비 책방 북소리 북마스터 김혜집니다.'])

{'input_ids': array([[   101,   9739,  11467,   9435,  33654,   9318,  17342,  30005,
         11664,   9739,  11467,   9409,  10622,   9408, 119245,  27355,
         11018,   9485,  18784,   9895,  29455,   9739,  42337,   9366,
         22333,  12692,   9366,  23811,  58823,   8935, 119437,  38696,
         48345,    119,    102,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,      0,      0,      0,
             0,      0,      0,      0,      0,   

In [9]:
class NERSentenceDataFeatureAnalyzer:
    """Turn one annotated sentence into model inputs and a padded label-id sequence.

    The sentence is tokenized word by word, each token is matched against the
    character spans of the named entities, and the resulting B_/I_/- tags are
    converted to class ids and padded to ``max_length``.
    """

    def __init__(self):
        # All collaborators come from notebook-level globals (max_length, tokenizer, label tables).
        self.max_length = max_length
        self.__tokenizer = tokenizer
        self.__child_label_parent_info = LucyNERLabelInfo.get_child_label_parent_info()
        self.__model_feature_parser = LucyNERFeatureParser(max_length=max_length)
        sequence_label_info = LucyNERModelOutputSequenceInfo.get_output_sequence_info()
        # Inverted table: tag string -> class id.
        self.label_sequence_info = {value: key for key, value in sequence_label_info.items()}

    def featuring(self, sentence_info: Dict):
        """Build the feature dict for one sentence.

        Args:
            sentence_info: Mapping with keys ``sentence_id``, ``sentence_text`` and
                ``sentence_ne_infos``. Each NE info is read via the keys ``id``,
                ``label``, ``begin`` and ``end``; presumably ``begin``/``end`` are
                character offsets into ``sentence_text`` with ``end`` exclusive
                (verify against the corpus).

        Returns:
            Dict with ``sentence_id``, ``tokens``, ``token_labels``, ``input_ids``,
            ``attention_mask``, ``token_type_ids`` and ``output_ids``.
        """
        return self.__parse_sentence_info_to_feature(sentence_info)

    def __parse_sentence_info_to_feature(self, sentence_info: Dict) -> Dict:
        sentence_id = sentence_info['sentence_id']
        sentence_text = sentence_info['sentence_text']
        sentence_ne_infos = sentence_info['sentence_ne_infos']

        tokens, token_labels, _ = self.__tokenize_sentence_and_ne_mapping(sentence_text, sentence_ne_infos)

        # Input features
        input_ids, attention_mask, token_type_ids = self.__featuring_model_input(sentence_text)

        # Output features
        output_ids_pad = self.__pad_output_ids(self.__parse_label_to_sequence(token_labels))

        sentence_feature = {
            "sentence_id": sentence_id,
            "tokens": tokens,
            "token_labels": token_labels,
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "token_type_ids": token_type_ids,
            "output_ids": output_ids_pad,
        }

        return sentence_feature

    def __pad_output_ids(self, output_ids: List[int]) -> List[int]:
        """Add the two special-token label slots and pad/truncate to ``max_length``."""
        outside_id = self.label_sequence_info['-']
        pad_id = self.label_sequence_info['PAD']

        # Keep room for the two special-token slots so that, when the sentence
        # is too long, the last label stays aligned with the closing special
        # token that tokenizer-side truncation keeps in the last position.
        body = output_ids[:max(self.max_length - 2, 0)]
        framed = [outside_id] + body + [outside_id]

        # Right-pad with the PAD class, then cut to exactly max_length entries.
        return (framed + [pad_id] * self.max_length)[:self.max_length]

    def __tokenize_sentence_and_ne_mapping(self, sentence_text, sentence_ne_infos):
        """Tokenize the sentence and tag each token with B_/I_/- labels.

        Returns:
            ``(tokens, token_labels, token_ne_ids)`` as three parallel lists.
        """
        # Index the NE annotations by id and by every character position they cover.
        id_ne_vocab = {}
        char_idx_tag_dict = {}

        if sentence_ne_infos is None:
            sentence_ne_infos = []

        for sentence_ne_info in sentence_ne_infos:
            id_ne_vocab[sentence_ne_info['id']] = sentence_ne_info

            for char_idx in range(sentence_ne_info['begin'], sentence_ne_info['end']):
                char_idx_tag_dict[char_idx] = sentence_ne_info

        # Map each token to the id of the NE covering its first character (0 = none).
        # NOTE(review): 0 doubles as the "no entity" sentinel, so an annotation
        # whose id is 0 would be treated as unlabelled - confirm ids start at 1.
        token_list = []
        token_ner_id_list = []

        word_start = 0
        for word in sentence_text.split(' '):
            word_token_list = self.__tokenizer.tokenize(word)
            token_list.extend(word_token_list)

            # Restart the cursor at the true offset of this word. A token whose
            # text length differs from the characters it replaced (e.g. "[UNK]")
            # can still shift offsets inside its own word, but no longer
            # misaligns every following word.
            s_char_idx = word_start
            for token in word_token_list:
                char_tag = char_idx_tag_dict.get(s_char_idx)
                token_ner_id_list.append(char_tag['id'] if char_tag is not None else 0)

                s_char_idx += len(token.replace('##', ''))

            # +1 for the single space consumed by split(' ').
            word_start += len(word) + 1

        # Convert NE ids to B_/I_ tags: the first token of an entity gets B_, the rest I_.
        token_ner_list = []
        before_id = 0
        for token_ner_id in token_ner_id_list:

            if token_ner_id == 0:
                token_ner_list.append('-')
                continue

            id_tag = id_ne_vocab[token_ner_id]
            label = self.__parse_label_child_to_parent(id_tag['label'])

            if before_id == id_tag['id']:
                label = 'I_' + label
            else:
                label = 'B_' + label

            token_ner_list.append(label)

            before_id = id_tag['id']

        return token_list, token_ner_list, token_ner_id_list

    def __featuring_model_input(self, sentence_text: str) -> Tuple[List[int], List[int], List[int]]:
        """Encode the sentence and return ``(input_ids, attention_mask, token_type_ids)`` as lists."""
        model_input = self.__model_feature_parser.featuring(sentence_text)
        # The parser returns batched arrays; take the first (only) row.
        input_ids = model_input["input_ids"][0]
        attention_mask = model_input["attention_mask"][0]
        token_type_ids = model_input["token_type_ids"][0]

        return input_ids.tolist(), attention_mask.tolist(), token_type_ids.tolist()

    def __parse_label_to_sequence(self, labels: List[str]) -> List[int]:
        """Map tag strings to class ids; raises ``KeyError`` for a tag missing from the table."""
        label_sequence_info = self.label_sequence_info

        return [label_sequence_info[label] for label in labels]

    def __parse_label_child_to_parent(self, label: str) -> str:
        """Map a fine-grained child label to its parent label; raises ``KeyError`` if unknown."""
        return self.__child_label_parent_info[label]

In [10]:
class NERFeaturingDataLoader(metaclass=ABCMeta):
    """Base class for sinks that persist featured NER data under an output directory.

    Subclasses override :meth:`load` to write one chunk of data. Instances can
    be used as context managers; the base implementation has nothing to release.
    """

    def __init__(self, output_dir_path: str):
        """Create ``output_dir_path`` (and any missing parents) if needed."""
        self.__create_path_dir(output_dir_path)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Nothing to clean up here; returning None lets exceptions propagate.
        pass

    def load(self, data=None):
        """Persist one chunk of featured data. No-op in the base class.

        ``data`` is accepted (and ignored) so the signature matches the
        subclasses, which all take the chunk as their single argument.
        """
        pass

    def __create_path_dir(self, output_dir_path: str):
        # exist_ok avoids the race between a separate "exists" check and the
        # creation; it raises FileExistsError if the path is an existing file.
        os.makedirs(output_dir_path, exist_ok=True)

In [11]:
class NERFeaturingDataToParquetIDXLoader(NERFeaturingDataLoader):
    """Sink that writes every loaded chunk to its own numbered parquet file.

    Files are named ``0.parquet``, ``1.parquet``, ... inside the output directory,
    in the order the chunks are loaded.
    """

    def __init__(self, output_dir_path: str):
        super().__init__(output_dir_path)
        self.output_dir_path = output_dir_path

        # Index used for the next file name.
        self.file_idx = 0

    def load(self, data: pd.DataFrame):
        """Write ``data`` to the next numbered parquet file, then advance the counter."""
        self.__load_parquet_file(data)
        self.file_idx += 1

    def __load_parquet_file(self, data: pd.DataFrame):
        # The counter is only advanced by load(), after this write has succeeded.
        file_name = f"{self.file_idx}.parquet"
        target_path = os.path.join(self.output_dir_path, file_name)
        data.to_parquet(target_path, engine="fastparquet")


In [12]:
class NERFeaturingDataToTFRecordLoader(NERFeaturingDataLoader):
    """Sink that appends every loaded chunk to a single ``dataset.tfrecord`` file.

    The writer is opened in the constructor and stays open until :meth:`close`
    is called, either directly or by leaving a ``with`` block.
    """

    def __init__(self, output_dir_path: str):
        super().__init__(output_dir_path)

        self.tfrecord_writer = TFRecordWriter(os.path.join(output_dir_path, "dataset.tfrecord"))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Do not return close()'s result: a truthy return value from __exit__
        # would suppress the exception that is being propagated.
        self.close()

    def close(self):
        """Close the underlying TFRecord writer."""
        self.tfrecord_writer.close()

    def load(self, data: pd.DataFrame):
        """Serialize every row of ``data`` as one ``tf.train.Example`` and append it to the file."""
        self.__load_tfrecord(data)

    def __load_tfrecord(self, data: pd.DataFrame):
        # Columns written into each example; every cell is passed to Int64List as-is.
        columns = ('input_ids', 'attention_mask', 'token_type_ids', 'output_ids')
        column_values = [data[column].values.tolist() for column in columns]

        for row in zip(*column_values):
            feature = {
                column: self.__int64_list_feature(values)
                for column, values in zip(columns, row)
            }

            tf_record_example = tf.train.Example(features=tf.train.Features(feature=feature))
            self.tfrecord_writer.write(tf_record_example.SerializeToString())

    def __int64_list_feature(self, value: List):
        """Wrap a list of ints as a ``tf.train.Feature`` holding an ``Int64List``."""
        return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

In [13]:
class DataSpliter:
    """Deterministic, shuffled train/test splitting built on ``train_test_split``."""

    @classmethod
    def split(cls, *arg, test_size=0.1):
        """Split each array in ``arg`` with the same shuffled partition.

        Args:
            *arg: One or more same-length arrays/DataFrames to split.
            test_size: Size of the second part of each split, as accepted by
                ``train_test_split``.

        Returns:
            A tuple ``(first_a, second_a, first_b, second_b, ...)``. For a single
            input this is the usual ``(first, second)`` pair.
        """
        # train_test_split yields two results per input, so return them all
        # rather than unpacking into exactly two names, which fails for more
        # than one input array.
        # The fixed random_state keeps the partition reproducible across runs.
        return tuple(train_test_split(
            *arg,
            shuffle=True, random_state=777, test_size=test_size
        ))

In [14]:
class NERDataFeaturingAndSplitTransfer:
    """ETL driver: read annotated sentences, featurize them, split, and persist the parts."""

    def __init__(self):

        self.feature_analyzer = NERSentenceDataFeatureAnalyzer()

    def etl(self, input_parquet_data_dir_path: str, output_dir_path: str, save_type="tfrecord"):
        """Featurize every row group of the input parquet data and write train/test/valid sets.

        Each row group is split independently: 20% goes to test, then 10% of the
        remainder goes to validation and the rest to train.

        Args:
            input_parquet_data_dir_path: Location handed to ``ParquetFile``.
            output_dir_path: Root directory; ``train_data``, ``test_data`` and
                ``valid_data`` sub-directories are created beneath it.
            save_type: ``"tfrecord"`` or ``"parquet"``.

        Raises:
            ValueError: If ``save_type`` is not one of the supported formats.
        """
        train_data_loader, test_data_loader, valid_data_loader = self.__get_data_loaders(save_type, output_dir_path)

        analysis_columns = ['sentence_id', 'sentence_text', 'sentence_ne_infos']

        # The loaders are context managers; leaving this block closes them even
        # if featurizing fails, so writer-backed loaders are not left open.
        with train_data_loader, test_data_loader, valid_data_loader:
            ner_data_files = ParquetFile(input_parquet_data_dir_path)

            for ner_datas in ner_data_files.iter_row_groups():
                ner_datas_by_analysis = ner_datas[analysis_columns]

                featured_datas = pd.DataFrame(
                    [self.feature_analyzer.featuring(row) for _, row in ner_datas_by_analysis.iterrows()]
                )

                train_datas, test_datas = DataSpliter.split(featured_datas, test_size=0.2)

                train_datas, valid_datas = DataSpliter.split(train_datas, test_size=0.1)

                train_data_loader.load(train_datas)
                test_data_loader.load(test_datas)
                valid_data_loader.load(valid_datas)

    def __get_data_loaders(self, save_type, output_dir_path):
        """Create the (train, test, valid) loaders for ``save_type``."""
        # Validate first so that an unsupported format fails with a clear error
        # before any directory or file is created.
        if save_type == "tfrecord":
            loader_class = NERFeaturingDataToTFRecordLoader
        elif save_type == "parquet":
            loader_class = NERFeaturingDataToParquetIDXLoader
        else:
            raise ValueError(
                f"Unsupported save_type {save_type!r}; expected 'tfrecord' or 'parquet'."
            )

        train_data_loader = loader_class(os.path.join(output_dir_path, "train_data"))
        test_data_loader = loader_class(os.path.join(output_dir_path, "test_data"))
        valid_data_loader = loader_class(os.path.join(output_dir_path, "valid_data"))

        return train_data_loader, test_data_loader, valid_data_loader

In [15]:
# Build the ETL driver; its constructor creates the sentence feature analyzer.
transfer = NERDataFeaturingAndSplitTransfer()

In [16]:
# Run configuration.
# NOTE(review): these are absolute paths on one machine; adjust them before running elsewhere.
extract_parquet_dir_path = '/home/woohyun/NER/data/k_corpus/result/kor_ner_parquet/'
# Output is grouped per checkpoint name so features from different tokenizers do not mix.
output_dir_path = f'/home/woohyun/NER/data/k_corpus/featured_data/{model_name}'
# Passed as `save_type` to etl(): 'parquet' or 'tfrecord'.
save_format = 'parquet'

In [17]:
# Featurize the corpus and write the train/test/valid sets under `output_dir_path`.
transfer.etl(extract_parquet_dir_path, output_dir_path, save_format)