In [3]:
import sys
sys.path.append('/opt/ml/ocr/ner/ner_api/app')

from __future__ import absolute_import, division, print_function, unicode_literals
import json
import pickle
import torch
import os
from gluonnlp.data import SentencepieceTokenizer
from model.net import KobertCRF
from data_utils.utils import Config
from data_utils.vocab_tokenizer import Tokenizer
from data_utils.vocab_tokenizer import Vocabulary
from data_utils.pad_sequence import keras_pad_fn
from pathlib import Path

import warnings
warnings.filterwarnings(action='ignore')


2022-06-04 05:09:49.095091: W tensorflow/stream_executor/platform/default/dso_loader.cc:59] Could not load dynamic library 'libcudart.so.10.1'; dlerror: libcudart.so.10.1: cannot open shared object file: No such file or directory
2022-06-04 05:09:49.095133: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
  warn('"Twitter" has changed to "Okt" since KoNLPy v0.4.5.')


In [4]:
class DecoderFromNamedEntitySequence():
    """Convert a predicted tag-id sequence into a list of named-entity spans.

    Only the first sequence of each batched argument is decoded (index ``[0]``).
    """

    def __init__(self, tokenizer, index_to_ner):
        """
        Args:
            tokenizer: object exposing ``decode_token_ids(list_of_input_ids)``;
                the first element of its return value is used as the token
                strings of the sequence.
            index_to_ner: mapping from predicted tag id to tag string
                (tags containing ``"B-"`` open an entity, ``"I-<tag>"``
                continues it, anything else closes it).
        """
        self.tokenizer = tokenizer
        self.index_to_ner = index_to_ner

    @staticmethod
    def _append_entity(list_of_ner_word, entity_word, entity_tag):
        """Append the pending entity to the result list if one is open."""
        if entity_word != "" and entity_tag != "":
            # "▁" is replaced by a space (presumably the SentencePiece
            # word-boundary marker -- confirm against the tokenizer).
            list_of_ner_word.append({"word": entity_word.replace("▁", " "), "tag": entity_tag})

    def __call__(self, list_of_input_ids, list_of_pred_ids):
        """Decode one tagged sequence into entity dictionaries.

        Args:
            list_of_input_ids: batch of token-id sequences; passed unchanged to
                ``tokenizer.decode_token_ids``.
            list_of_pred_ids: batch of predicted tag-id sequences; only
                ``list_of_pred_ids[0]`` is read.

        Returns:
            List of ``{"word": str, "tag": str}`` dictionaries in order of
            appearance. ``tag`` is the last three characters of the ``B-`` tag.
        """
        input_token = self.tokenizer.decode_token_ids(list_of_input_ids)[0]
        pred_ner_tag = [self.index_to_ner[pred_id] for pred_id in list_of_pred_ids[0]]

        list_of_ner_word = []
        entity_word, entity_tag = "", ""
        for i, pred_ner_tag_str in enumerate(pred_ner_tag):
            if "B-" in pred_ner_tag_str:
                # A new entity starts: emit whatever was open first, even when
                # it has the same type (two adjacent PER entities, for
                # example), so the earlier one is not silently overwritten.
                self._append_entity(list_of_ner_word, entity_word, entity_tag)
                entity_tag = pred_ner_tag_str[-3:]
                entity_word = input_token[i]
            elif entity_tag != "" and "I-" + entity_tag in pred_ner_tag_str:
                # Continuation of the currently open entity.
                entity_word += input_token[i]
            else:
                # Any other tag (including an I- tag with no matching open
                # entity) closes the current entity.
                self._append_entity(list_of_ner_word, entity_word, entity_tag)
                entity_word, entity_tag = "", ""

        # Emit an entity that is still open when the sequence ends.
        self._append_entity(list_of_ner_word, entity_word, entity_tag)
        return list_of_ner_word

## Namecard Test

In [5]:
# Project root: the directory the notebook kernel was started in.
ROOT_PATH = Path.cwd()

In [12]:
from typing import List, Tuple, Dict

# Model configuration is read from <ROOT_PATH>/config/config.json.
model_dir = ROOT_PATH / 'config'
model_config = Config(json_path=model_dir / 'config.json')

# Tokenizer model and fine-tuned checkpoint, kept as plain strings.
tok_path = str(ROOT_PATH / 'app/ptr_lm_model/tokenizer_78b3253a26.model')
checkpoint_path = str(ROOT_PATH / 'checkpoints/best-epoch-12-step-1000-acc-0.961.bin')

# Use CUDA when available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [13]:
def get_vocab_tokenizer(tok_path: str, vocab_path=None) -> Tuple[SentencepieceTokenizer, Vocabulary]:
    """Build the SentencePiece tokenizer and load the pickled vocabulary.

    Args:
        tok_path: path of the SentencePiece model file handed to
            ``SentencepieceTokenizer``.
        vocab_path: path of the pickled vocabulary. When omitted, falls back
            to ``model_dir / "vocab.pkl"`` using the notebook-level
            ``model_dir`` (the original behaviour).

    Returns:
        ``(ptr_tokenizer, vocab)``; ``vocab`` is whatever object the pickle
        file contains (expected to be a ``Vocabulary`` -- not verified here).
    """
    ptr_tokenizer = SentencepieceTokenizer(tok_path)

    if vocab_path is None:
        vocab_path = model_dir / "vocab.pkl"

    # SECURITY: pickle.load can execute arbitrary code; only load vocabulary
    # files that come from a trusted source.
    with open(vocab_path, 'rb') as f:
        vocab = pickle.load(f)

    return ptr_tokenizer, vocab

In [14]:
def get_ner_to_index(model_dir: Path) -> Tuple[Dict, Dict]:
    """Load the tag-to-index mapping and build its inverse.

    Args:
        model_dir: directory that contains ``ner_to_index.json``. Either a
            ``pathlib.Path`` or a plain ``str`` is accepted.

    Returns:
        ``(ner_to_index, index_to_ner)`` where ``ner_to_index`` is the parsed
        JSON object and ``index_to_ner`` maps each value back to its key.
    """
    # Path() lets callers pass a plain string; the "/" join would otherwise
    # raise TypeError for str operands.
    mapping_path = Path(model_dir) / "ner_to_index.json"
    with open(mapping_path, 'rb') as f:
        ner_to_index = json.load(f)

    index_to_ner = {v: k for k, v in ner_to_index.items()}
    return ner_to_index, index_to_ner

In [15]:
def get_model(model_config: Config, ner_to_index: Dict, vocab: Vocabulary, checkpoint_path: str = None):
    """Instantiate ``KobertCRF`` and optionally restore weights from a checkpoint.

    Args:
        model_config: configuration object passed to ``KobertCRF``.
        ner_to_index: tag-to-index mapping; its length is the number of classes.
        vocab: vocabulary passed to ``KobertCRF``.
        checkpoint_path: path of a checkpoint readable by ``torch.load`` that
            contains a ``'model_state_dict'`` entry. When falsy, the model is
            returned without loading a checkpoint.

    Returns:
        The model, moved to the notebook-level ``device``.
    """
    model = KobertCRF(config=model_config, num_classes=len(ner_to_index), vocab=vocab)
    model_dict = model.state_dict()

    if checkpoint_path:
        # Load onto the CPU first; the model is moved to `device` below.
        checkpoint = torch.load(checkpoint_path, map_location=torch.device('cpu'))
        convert_keys = {}
        for k, v in checkpoint['model_state_dict'].items():
            # Drop "module." from the key (presumably the prefix added by
            # nn.DataParallel -- confirm against the training script).
            new_key_name = k.replace("module.", '')
            if new_key_name not in model_dict:
                # Parameters unknown to the current model are skipped.
                print("{} is not in model_dict".format(new_key_name))
                continue
            convert_keys[new_key_name] = v

        model.load_state_dict(convert_keys)

    model.to(device)
    return model

In [16]:
def get_ner_result(all_text_list: List) -> List:
    """Run NER on each text and keep only the PER and ORG entities.

    Relies on the notebook-level ``tokenizer``, ``model``, ``decoder_from_res``
    and ``device`` objects.

    Args:
        all_text_list: list of input strings; each one is processed as its own
            single-item batch.

    Returns:
        One list per input text, each containing the ``{"word", "tag"}``
        dictionaries whose tag is ``'PER'`` or ``'ORG'``.
    """
    wanted_tags = ('PER', 'ORG')
    ner_result = []

    # Inference only: disable autograd so no computation graph is built.
    with torch.no_grad():
        for input_text in all_text_list:
            list_of_input_ids = tokenizer.list_of_string_to_list_of_cls_sep_token_ids([input_text])
            x_input = torch.tensor(list_of_input_ids).long().to(device)
            list_of_pred_ids = model(x_input)

            list_of_ner_word = decoder_from_res(list_of_input_ids=list_of_input_ids, list_of_pred_ids=list_of_pred_ids)
            ner_result.append([word for word in list_of_ner_word if word['tag'] in wanted_tags])

    return ner_result


In [17]:
# Label maps (tag <-> index) read from the model directory.
ner_to_index, index_to_ner = get_ner_to_index(model_dir)

# SentencePiece tokenizer + vocabulary, wrapped by the project Tokenizer.
ptr_tokenizer, vocab = get_vocab_tokenizer(tok_path)
tokenizer = Tokenizer(
    vocab=vocab,
    split_fn=ptr_tokenizer,
    pad_fn=keras_pad_fn,
    maxlen=model_config.maxlen,
)

# Restore the model from the checkpoint and switch it to evaluation mode.
model = get_model(model_config, ner_to_index, vocab, checkpoint_path)
model.eval()

# Decoder that turns predicted tag ids back into entity words.
decoder_from_res = DecoderFromNamedEntitySequence(tokenizer=tokenizer, index_to_ner=index_to_ner)

In [18]:
# Sample inputs: one string per name card.
all_text_list = [
    '김민수 삼성전자 tom 서울특별시 박민수 영등포구 123-45 010-1234-5687',
    '경기도 수원시 장안구 010-5456-5654 이지연 LG전자',
]

In [27]:
# Run NER on every sample text; one list of entity dicts is produced per text.
ner_result = get_ner_result(all_text_list)

In [28]:
# Display the extracted entities for each sample text.
ner_result

[[{'word': ' 김민수', 'tag': 'PER'},
  {'word': ' 삼성전자', 'tag': 'ORG'},
  {'word': ' 박민수', 'tag': 'PER'}],
 [{'word': ' 이지연', 'tag': 'PER'}, {'word': ' LG전자', 'tag': 'ORG'}]]

In [29]:
def get_result_list(ner_result: List) -> List:
    """Group the entity words of each name card by tag.

    Args:
        ner_result: one list per name card, each holding dictionaries with a
            ``'word'`` and a ``'tag'`` key.

    Returns:
        One ``{'PER': [...], 'ORG': [...]}`` dictionary per name card, with the
        words in their original order. Entities with any other tag are ignored.
    """
    result_list = []

    for namecard in ner_result:
        dict_per_namecard = {'PER': [], 'ORG': []}
        for word_and_tag in namecard:
            # Look the fields up by key rather than unpacking .values(), which
            # depends on key insertion order and on there being exactly two keys.
            tag = word_and_tag['tag']
            if tag in dict_per_namecard:
                dict_per_namecard[tag].append(word_and_tag['word'])

        result_list.append(dict_per_namecard)

    return result_list

In [30]:
# Group words by tag for each name card. Note: despite its name, the value is
# a list holding one {'PER': [...], 'ORG': [...]} dict per name card.
result_dict = get_result_list(ner_result)

In [31]:
# Display the grouped PER/ORG words for each name card.
result_dict

[{'PER': [' 김민수', ' 박민수'], 'ORG': [' 삼성전자']},
 {'PER': [' 이지연'], 'ORG': [' LG전자']}]