# Modules and Global Variables

In [None]:
from transformers import (
    AutoTokenizer, AutoModelForSequenceClassification, 
)

import torch, copy, json, re, os

In [None]:
# Global label vocabularies and device configuration.

# Every "entity#property" aspect category that the pipeline scores for a sentence.
entity_property_pair = [
    '본품#가격', '본품#다양성', '본품#디자인', '본품#인지도', '본품#일반', '본품#편의성', '본품#품질',
    '브랜드#가격', '브랜드#디자인', '브랜드#인지도', '브랜드#일반', '브랜드#품질',
    '제품 전체#가격', '제품 전체#다양성', '제품 전체#디자인', '제품 전체#인지도', '제품 전체#일반', '제품 전체#편의성', '제품 전체#품질',
    '패키지/구성품#가격', '패키지/구성품#다양성', '패키지/구성품#디자인', '패키지/구성품#일반', '패키지/구성품#편의성', '패키지/구성품#품질'
]

# Anonymisation placeholder tokens. Not referenced by the visible cells;
# presumably they were added to the tokenizer vocabulary at training time — TODO confirm.
more_tokens = ['&name&', '&affiliation&', '&social-security-num&', '&tel-num&', '&card-num&', '&bank-account&', '&num&', '&online-account&']

# Class-id <-> label maps for the aspect-category-detection (True/False) head.
tf_id_to_name = ['True', 'False']
tf_name_to_id = {name: idx for idx, name in enumerate(tf_id_to_name)}

# Class-id <-> label maps for the sentiment (polarity) head.
polarity_id_to_name = ['positive', 'negative', 'neutral']
polarity_name_to_id = {name: idx for idx, name in enumerate(polarity_id_to_name)}

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of visible CUDA devices (0 on a CPU-only machine).
NGPU = torch.cuda.device_count()
# NOTE: this cell used to wrap a variable called `model` in torch.nn.DataParallel
# when NGPU > 1. No `model` exists at this point in the notebook (the models are
# loaded later as `acd_model` / `asc_model`), so the cell raised NameError on any
# multi-GPU host. The wrapping is dropped; inference below runs one example at a time.

len(entity_property_pair)

25

In [None]:
# Project root on the mounted Google Drive; every other path hangs off of it.
ROOT_PATH = '/content/drive/MyDrive/aspect_based_sentiment_analysis'

# Fine-tuned checkpoints for the two pipeline stages.
ACD_CHECKPOINT = f'{ROOT_PATH}/aspect_category_detection/klue_roberta_small/klue_roberta_small_testrun/klue_roberta_small_testrun/checkpoint-75025'
ASC_CHECKPOINT = f'{ROOT_PATH}/aspect_sentiment_classification/klue_roberta_small/klue_roberta_small_testrun/klue_roberta_small_testrun/checkpoint-22358'

# JSONL file holding the competition test split.
TEST_DATA_PATH = f'{ROOT_PATH}/data/NIKL_ABSA_2022_COMPETITION/nikluge-sa-2022-test.jsonl'

# Load Model, Tokenizer, and Collator

In [None]:
# Each checkpoint directory supplies both a tokenizer and a sequence-classification
# model: ACD_CHECKPOINT for the aspect-category stage, ASC_CHECKPOINT for the
# sentiment stage.
acd_tokenizer, asc_tokenizer = (
    AutoTokenizer.from_pretrained(checkpoint)
    for checkpoint in (ACD_CHECKPOINT, ASC_CHECKPOINT)
)
acd_model, asc_model = (
    AutoModelForSequenceClassification.from_pretrained(checkpoint)
    for checkpoint in (ACD_CHECKPOINT, ASC_CHECKPOINT)
)

In [None]:
def jsonload(fname, encoding="utf-8"):
    """Parse the JSON document stored at ``fname`` and return the resulting object.

    Args:
        fname: Path of the JSON file to read.
        encoding: Text encoding used to open the file (UTF-8 by default).

    Returns:
        Whatever ``json.load`` produces for the file's content.
    """
    with open(fname, encoding=encoding) as handle:
        return json.load(handle)

def jsondump(j, fname):
    """Serialise ``j`` as JSON into the file ``fname`` (overwriting it).

    The file is written as UTF-8 and non-ASCII characters are emitted verbatim
    rather than as ``\\uXXXX`` escapes.

    Args:
        j: JSON-serialisable object to write.
        fname: Destination path.
    """
    with open(fname, "w", encoding="UTF8") as sink:
        json.dump(obj=j, fp=sink, ensure_ascii=False)

def jsonlload(fname, encoding="utf-8"):
    """Load a JSON Lines file into a list, one parsed object per non-blank line.

    Args:
        fname: Path of the ``.jsonl`` file to read.
        encoding: Text encoding used to open the file (UTF-8 by default).

    Returns:
        A list with the parsed value of every non-blank line, in file order.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    json_list = []
    with open(fname, encoding=encoding) as f:
        # Iterate the file lazily instead of materialising it with readlines().
        for line in f:
            line = line.strip()
            # Blank lines (e.g. extra newlines at the end of the file) carry no
            # record; passing them to json.loads would raise.
            if not line:
                continue
            json_list.append(json.loads(line))
    return json_list

In [None]:
def _clean_sentence_form(form):
    """Replace emoji with text descriptions, collapse whitespace runs, drop '#'."""
    # demoji is not imported in the notebook's import cell, so it is imported
    # here to keep this function usable on a fresh kernel.
    import demoji

    form = demoji.replace_with_desc(string=form, sep=" ")
    form = re.sub(r'\s+', ' ', form)
    form = re.sub(r'#', '', form)
    return form


def _predict_class_id(tokenizer, model, form, pair):
    """Encode the (form, pair) text pair, run ``model`` once and return the argmax class id."""
    encoded = tokenizer(form, pair, truncation=True, return_tensors="pt")
    encoded = {k: v.to(device) for k, v in encoded.items()}

    with torch.no_grad():
        outputs = model(**encoded)

    # A single example is encoded, so row 0 holds the only prediction.
    return int(outputs['logits'].argmax(-1)[0])


def predict_from_korean_form(acd_tokenizer, asc_tokenizer, acd_model, asc_model, data):
    """Annotate every sentence in ``data`` with predicted (aspect category, polarity) pairs.

    For each sentence and each entry of ``entity_property_pair``, the ACD model
    decides whether the category applies ('True' / 'False' via ``tf_id_to_name``).
    For categories predicted 'True', the ASC model picks a polarity from
    ``polarity_id_to_name``.

    Args:
        acd_tokenizer: Tokenizer called as ``tokenizer(form, pair, ...)`` for the ACD model.
        asc_tokenizer: Tokenizer called the same way for the ASC model.
        acd_model: Model whose output exposes ``['logits']`` over the True/False classes.
        asc_model: Model whose output exposes ``['logits']`` over the polarity classes.
        data: Iterable of dicts, each with a ``'sentence_form'`` entry.

    Returns:
        The same ``data`` object. Each item is modified in place: its
        ``'annotation'`` key is set to a list of ``[pair, polarity]`` entries
        (empty when nothing is detected or the form is not a string).
    """
    acd_model.to(device)
    acd_model.eval()
    asc_model.to(device)
    asc_model.eval()

    for sentence in data:
        sentence['annotation'] = []
        form = sentence['sentence_form']

        # Validate before cleaning: the cleaning helpers require a str, so
        # checking afterwards (as the code used to) could never be reached.
        if not isinstance(form, str):
            print("form type is wrong: ", form)
            continue

        form = _clean_sentence_form(form)

        for pair in entity_property_pair:
            ce_result = tf_id_to_name[_predict_class_id(acd_tokenizer, acd_model, form, pair)]
            if ce_result != 'True':
                continue

            # Only categories the ACD stage accepted are sent to the sentiment stage.
            pc_result = polarity_id_to_name[_predict_class_id(asc_tokenizer, asc_model, form, pair)]
            sentence['annotation'].append([pair, pc_result])

    return data


In [None]:
# Read the test split and annotate a deep copy of it, leaving `test_data` unmodified.
test_data = jsonlload(TEST_DATA_PATH)
pred_data = predict_from_korean_form(
    acd_tokenizer=acd_tokenizer,
    asc_tokenizer=asc_tokenizer,
    acd_model=acd_model,
    asc_model=asc_model,
    data=copy.deepcopy(test_data),
)

# Write the predictions to disk, then read them back from the saved file.
jsondump(j=pred_data, fname='./pred_data.json')
pred_data = jsonload(fname='./pred_data.json')

# Inference Test

In [None]:
# Sample sentences for the manual checks in the following cells. Only the last
# one is used, matching the original cell where the second assignment to `form`
# overwrote the first.
candidate_forms = (
    '패키지에 보니 허브한방추출물과 옷나무 껍질 추출물이 들어갔다고 해서 한방향이 날줄알았는데 제품제형은 투명하고 향은 상큼한 향이랄까요?',
    '최근 북한 미사일 발사 등 도발 및 한미·한미일 대응',
)
form = candidate_forms[-1]

# Aspect category paired with `form` when querying the models.
pair = '패키지/구성품#일반'

In [None]:
# Single-example check of the ACD (aspect category detection) model on (form, pair).

# Put the model on `device` and in eval mode here, so this cell does not depend on
# predict_from_korean_form() having been run earlier in the session.
acd_model.to(device)
acd_model.eval()

acd_encoded = acd_tokenizer(form, pair, truncation=True, return_tensors="pt")
acd_encoded = {k: v.to(device) for k, v in acd_encoded.items()}

with torch.no_grad():
    acd_outputs = acd_model(**acd_encoded)

ce_predictions = acd_outputs['logits'].argmax(-1)
# One example was encoded, so index 0 is its predicted class id.
ce_result = tf_id_to_name[ce_predictions[0].item()]
print(acd_outputs['logits'])
print(ce_predictions)
print(ce_result)

tensor([[-4.7648,  4.7138]])
tensor([1])
False


In [None]:
# Single-example check of the ASC (sentiment classification) model on (form, pair).

# Put the model on `device` and in eval mode here, so this cell does not depend on
# predict_from_korean_form() having been run earlier in the session.
asc_model.to(device)
asc_model.eval()

asc_encoded = asc_tokenizer(form, pair, truncation=True, return_tensors="pt")
asc_encoded = {k: v.to(device) for k, v in asc_encoded.items()}

with torch.no_grad():
    asc_outputs = asc_model(**asc_encoded)

pc_predictions = asc_outputs['logits'].argmax(-1)
# One example was encoded, so index 0 is its predicted class id.
pc_result = polarity_id_to_name[pc_predictions[0].item()]
print(asc_outputs['logits'])
print(pc_predictions)
print(pc_result)

tensor([[-2.4268,  5.6437, -2.8214]])
tensor([1])
negative
