In [None]:
import random
import json
from tqdm import tqdm
import unicodedata
from collections import OrderedDict
import os

import torch
from torch.utils.data import DataLoader

import onnx
import onnxruntime as ort
from onnxruntime.quantization import quantize_dynamic, QuantType

In [None]:
# local modules
from ner_tokenizer_bio import NER_tokenizer_BIO
from bert_for_token_classification_pl import BertForTokenClassification_pl

In [None]:
BEST_MODEL_PATH     = './model/epoch=4-step=660.ckpt'
TOKENIZER_PATH      = './model/iot-nlu-tokenizer'
ONNX_FILE_PATH      = './model/iot-nlu.onnx'
ONNX_INT8_FILE_PATH = './model/iot-nlu-ui8.onnx'

# インテントの種類数 (None=0, LED_ON=1, LED_OFF=2, READ_THERMO=3, OPEN=4, CLOSE=5, SET_TEMP=6)
NUM_INTENT_LABELS = 7

# スロットの種類数 (COL=1, COLLTDEV=2, LOC=3, ONOFFDEV=4, OPENABLE=5, TEMPDEV=6, TEMPERTURE_NUM=7, THMDEV=8)
NUM_ENTITY_TYPE   = 8

In [None]:
# トークナイザのロード
# 固有表現のカテゴリーの数`num_entity_type`を入力に入れる必要がある。
tokenizer = NER_tokenizer_BIO.from_pretrained(
    TOKENIZER_PATH,
    num_entity_type=NUM_ENTITY_TYPE
)

In [None]:
# export plain onnx to ui8 onnx
quantize_dynamic(
    ONNX_FILE_PATH,
    ONNX_INT8_FILE_PATH,
    weight_type=QuantType.QUInt8,
)

def print_sizel(file_path):
    print('Size (MB):', os.path.getsize(file_path)/1e6)

print_sizel(ONNX_FILE_PATH)
print_sizel(ONNX_INT8_FILE_PATH)

In [None]:
# データのロード
dataset = json.load(open('data/nlp_data.json','r'))

# カテゴリーをラベルに変更、文字列の正規化する。
for sample in dataset:
    sample['text'] = unicodedata.normalize('NFKC', sample['text'])

# データセットの分割
random.shuffle(dataset)
dataset = dataset[:10000]
n       = len(dataset)
n_train = int(n*0.6)
n_val   = int(n*0.2)
dataset_train = dataset[:n_train]
dataset_val   = dataset[n_train:n_train+n_val]
dataset_test  = dataset[n_train+n_val:]

In [None]:
def create_dataset(tokenizer, dataset, max_length):
    """
    データセットをデータローダに入力できる形に整形。
    """
    dataset_for_loader = []
    for sample in dataset:
        text = sample['text']
        entities = sample['entities']
        encoding = tokenizer.encode_plus_tagged(
            text, entities, max_length=max_length
        )
        encoding['intent_label'] = sample['intent']
        encoding = { k: torch.tensor(v) for k, v in encoding.items() }
        dataset_for_loader.append(encoding)
    return dataset_for_loader

# データセットの作成
max_length = 128
dataset_train_for_loader = create_dataset(
    tokenizer, dataset_train, max_length
)
dataset_val_for_loader = create_dataset(
    tokenizer, dataset_val, max_length
)

# データローダの作成
dataloader_train = DataLoader(
    dataset_train_for_loader, batch_size=32, shuffle=True
)
dataloader_val  = DataLoader(dataset_val_for_loader, batch_size=256)

In [None]:
def evaluate_model(entities_list, entities_predicted_list, type_id=None):
    """
    正解と予測を比較し、モデルの固有表現抽出の性能を評価する。
    type_idがNoneのときは、全ての固有表現のタイプに対して評価する。
    type_idが整数を指定すると、その固有表現のタイプのIDに対して評価を行う。
    """
    num_entities    = 0 # 固有表現(正解)の個数
    num_predictions = 0 # BERTにより予測された固有表現の個数
    num_correct     = 0 # BERTにより予測のうち正解であった固有表現の数
    indices_incorrect = []
    
    # それぞれの文章で予測と正解を比較。
    # 予測は文章中の位置とタイプIDが一致すれば正解とみなす。
    counter = 0
    for entities, entities_predicted \
        in zip(entities_list, entities_predicted_list):

        if type_id:
            entities = [ e for e in entities if e['type_id'] == type_id ]
            entities_predicted = [ 
                e for e in entities_predicted if e['type_id'] == type_id
            ]
            
        get_span_type = lambda e: (e['span'][0], e['type_id'])
        set_entities = set( get_span_type(e) for e in entities )
        set_entities_predicted = \
            set( get_span_type(e) for e in entities_predicted )

        num_entities += len(entities)
        num_predictions += len(entities_predicted)
        num_correct += len( set_entities & set_entities_predicted )
        
        # debug
        if(len(set_entities) != len( set_entities & set_entities_predicted )):
            indices_incorrect.append(counter)
        #    print(set_entities)
        #    print(set_entities_predicted)

        counter += 1
    
    # 指標を計算
    precision = num_correct/num_predictions # 適合率
    recall = num_correct/num_entities # 再現率
    f_value = 2*precision*recall/(precision+recall) # F値

    result = {
        'num_entities': num_entities,
        'num_predictions': num_predictions,
        'num_correct': num_correct,
        'precision': precision,
        'recall': recall,
        'f_value': f_value
    }

    #print(indices_incorrect)
    return result

def run_model(session, dataset):
    intents_list            = [] # 正解インテントを追加している
    intents_predicted_list  = [] # 分類されたインテントを追加していく
    entities_list           = [] # 正解の固有表現を追加していく
    entities_predicted_list = [] # 抽出された固有表現を追加していく

    for sample in tqdm(dataset):
        text = sample['text']
        encoding, spans = tokenizer.encode_plus_untagged(
            text, return_tensors='pt'
        )
        encoding = { k: v.cpu() for k, v in encoding.items() } 
        inputs = {
            "input_ids"      : encoding["input_ids"     ].cpu().numpy(),
            "attention_mask" : encoding["attention_mask"].cpu().numpy(),
            "token_type_ids" : encoding["token_type_ids"].cpu().numpy()
        }

        with torch.no_grad():
            total_loss, logits_intent, logits_slot = session.run( None, inputs)
            scores_intent = logits_intent
            scores_slots  = logits_slot[0]

        # 分類スコアを固有表現に変換する
        entities_predicted = tokenizer.convert_bert_output_to_entities(
            text, scores_slots, spans
        )

        intents_list.append(sample['intent'])
        intents_predicted_list.append(scores_intent.argmax(-1)[0])
        entities_list.append(sample['entities'])
        entities_predicted_list.append( entities_predicted )
    
    return intents_list, intents_predicted_list, entities_list, entities_predicted_list

def run_onnx_evaluation(session, dataset):
    outputs = run_model(session, dataset)
    intents_list            = outputs[0]
    intents_predicted_list  = outputs[1]
    entities_list           = outputs[2]
    entities_predicted_list = outputs[3]
    
    # インテント分類スコア
    counter = 0.0
    for pred, truth in zip(intents_predicted_list, intents_list):
        counter += float(pred == truth)
    print('intent classification accuracy = ', counter/len(intents_list))
    # 固有表現抽出スコア
    print(evaluate_model(entities_list, entities_predicted_list))

In [None]:
sess_fp32 = ort.InferenceSession(
    ONNX_FILE_PATH,
    providers=['CUDAExecutionProvider']
)
run_onnx_evaluation(sess_fp32, dataset_test)
del sess_fp32

In [None]:
sess_i8 = ort.InferenceSession(
    ONNX_INT8_FILE_PATH,
    providers=['TensorrtExecutionProvider']
)
run_onnx_evaluation(sess_i8, dataset_test)
del sess_i8

In [None]:
text = unicodedata.normalize('NFKC', '会議室にある黄色い電灯の火を点灯してくださいな')
encoding, spans = tokenizer.encode_plus_untagged(
    text, return_tensors='pt'
)
encoding = { k: v.cpu() for k, v in encoding.items() } 

inputs = {
    "input_ids": encoding["input_ids"].cpu().numpy(),
    "attention_mask": encoding["attention_mask"].cpu().numpy(),
    "token_type_ids": encoding["token_type_ids"].cpu().numpy()
}

ort_session = ort.InferenceSession(
    ONNX_INT8_FILE_PATH,
    providers=['CUDAExecutionProvider']
)
total_loss, logits_intent, logits_slot = ort_session.run( None, inputs)
scores_intent = logits_intent
scores_slots  = logits_slot[0]

# Intent 分類スコアを Intent に変換する
intent = scores_intent.argmax(-1)[0]
# Slot 分類スコアを固有表現に変換する
entities_predicted = tokenizer.convert_bert_output_to_entities(
    text, scores_slots, spans
)

print("入力",text)
print("予測 intent  :", intent)
print("予測 entities:", json.dumps(entities_predicted, indent=2, ensure_ascii=False))