# Preparing


## pip installs and import

In [None]:
!pip install torch

In [None]:
!pip install transformers

In [None]:
!pip install datasets

In [None]:
!pip install sentencepiece

In [None]:
!pip install scikit-learn

In [None]:
import json
import os

import torch
import torch.nn as nn
from tqdm import trange
from transformers import AutoTokenizer
from torch.utils.data import DataLoader, TensorDataset
from transformers import get_linear_schedule_with_warmup
from transformers import AdamW
from datasets import load_metric
from sklearn.metrics import f1_score
import pandas as pd
import copy
import numpy as np

from transformers import ElectraModel, ElectraTokenizer
from transformers import AutoModel, ElectraTokenizer
from collections import Counter

import re

## git clone and file download

In [None]:
!git clone https://github.com/HappyBusDay/Korean_ABSA.git

# Config

In [None]:
PADDING_TOKEN = 1
S_OPEN_TOKEN = 0
S_CLOSE_TOKEN = 2

batch_size = 32

# pretrained_models
model_ELECTRA = 'kykim/electra-kor-base'
model_RoBERTa = 'xlm-roberta-base'
model_DeBERTa = "lighthouse/mdeberta-v3-base-kor-further"

entity_property_pair = [    # 카테고리의 수 = 25개
     '패키지/구성품#다양성','본품#인지도','브랜드#디자인',
     '패키지/구성품#편의성','제품 전체#디자인', '제품 전체#품질',
     '패키지/구성품#품질','패키지/구성품#일반','본품#일반',
     '패키지/구성품#디자인','본품#편의성','브랜드#품질',
     '브랜드#인지도','본품#다양성','본품#디자인',
     '제품 전체#다양성','본품#품질','제품 전체#인지도',
     '패키지/구성품#가격','본품#가격','제품 전체#가격',
     '브랜드#가격','브랜드#일반','제품 전체#일반','제품 전체#편의성'
     ]

tf_id_to_name = ['True', 'False']
tf_name_to_id = {tf_id_to_name[i]: i for i in range(len(tf_id_to_name))}

polarity_id_to_name = ['positive', 'negative', 'neutral']
polarity_name_to_id = {polarity_id_to_name[i]: i for i in range(len(polarity_id_to_name))}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

special_tokens_dict = {
    'additional_special_tokens': ['&name&', '&affiliation&', '&social-security-num&', '&tel-num&', '&card-num&', '&bank-account&', '&num&', '&online-account&']
}

# functions and classes

## (func) jsonload, jsondump, jsonlload

In [None]:
def jsonload(fname, encoding="utf-8"):
    with open(fname, encoding=encoding) as f:
        j = json.load(f)

    return j

# json 개체를 파일이름으로 깔끔하게 저장
def jsondump(j, fname):
    with open(fname, "w", encoding="UTF8") as f:
        json.dump(j, f, ensure_ascii=False)

# jsonl 파일 읽어서 list에 저장
def jsonlload(fname, encoding="utf-8"):
    json_list = []
    with open(fname, encoding=encoding) as f:
        for line in f.readlines():
            json_list.append(json.loads(line))
    return json_list

## (func) tokenize_and_align_labels

In [None]:
def tokenize_and_align_labels(tokenizer, form, annotations, max_len):

    entity_property_data_dict = {
        'input_ids': [],
        'attention_mask': [],
        'label': []
    }
    polarity_data_dict = {
        'input_ids': [],
        'attention_mask': [],
        'label': []
    }

    for pair in entity_property_pair:
        isPairInOpinion = False
        if pd.isna(form):
            break
        tokenized_data = tokenizer(form, pair, padding='max_length', max_length=max_len, truncation=True)
        for annotation in annotations:
            entity_property = annotation[0]
            polarity = annotation[2]

            if polarity == '------------':
                continue

            if entity_property == pair:
                entity_property_data_dict['input_ids'].append(tokenized_data['input_ids'])
                entity_property_data_dict['attention_mask'].append(tokenized_data['attention_mask'])
                entity_property_data_dict['label'].append(tf_name_to_id['True'])

                polarity_data_dict['input_ids'].append(tokenized_data['input_ids'])
                polarity_data_dict['attention_mask'].append(tokenized_data['attention_mask'])
                polarity_data_dict['label'].append(polarity_name_to_id[polarity])

                isPairInOpinion = True
                break

        if isPairInOpinion is False:
            entity_property_data_dict['input_ids'].append(tokenized_data['input_ids'])
            entity_property_data_dict['attention_mask'].append(tokenized_data['attention_mask'])
            entity_property_data_dict['label'].append(tf_name_to_id['False'])

    return entity_property_data_dict, polarity_data_dict

## (func) get_dataset

In [None]:
def get_dataset(raw_data, tokenizer, max_len):
    input_ids_list = []
    attention_mask_list = []
    token_labels_list = []

    polarity_input_ids_list = []
    polarity_attention_mask_list = []
    polarity_token_labels_list = []

    for utterance in raw_data:
        entity_property_data_dict, polarity_data_dict = tokenize_and_align_labels(tokenizer, utterance['sentence_form'], utterance['annotation'], max_len)
        
        input_ids_list.extend(entity_property_data_dict['input_ids'])
        attention_mask_list.extend(entity_property_data_dict['attention_mask'])
        token_labels_list.extend(entity_property_data_dict['label'])

        polarity_input_ids_list.extend(polarity_data_dict['input_ids'])
        polarity_attention_mask_list.extend(polarity_data_dict['attention_mask'])
        polarity_token_labels_list.extend(polarity_data_dict['label'])

    return TensorDataset(torch.tensor(input_ids_list), torch.tensor(attention_mask_list),
                         torch.tensor(token_labels_list)), TensorDataset(torch.tensor(polarity_input_ids_list), torch.tensor(polarity_attention_mask_list),
                         torch.tensor(polarity_token_labels_list))

## (class) SimpleClassifier

In [None]:
# baseline
class SimpleClassifier_Base(nn.Module):
    def __init__(self, num_label, classifier_hidden):
        super().__init__()

        if layer_use == True:
            self.dense1 = nn.Linear(classifier_hidden, classifier_hidden//2)
            self.dense2 = nn.Linear(classifier_hidden//2, classifier_hidden//4)
            self.dropout = nn.Dropout(dropout)
            self.output = nn.Linear(classifier_hidden//4, num_label)
        else:
            self.dense = nn.Linear(classifier_hidden, classifier_hidden)
            self.dropout = nn.Dropout(dropout)
            self.output = nn.Linear(classifier_hidden, num_label)

    def forward(self, features):
        if layer_use == True:
            x = features[:, 0, :]
            x = self.dropout(x) # layer 1
            x = self.dense1(x)
            x = self.dropout(x) # layer 2
            x = self.dense2(x)
            x = torch.tanh(x)
            x = self.dropout(x)
            x = self.output(x)
            return x

        else:
            x = features[:, 0, :]
            x = self.dropout(x)
            x = self.dense(x)
            x = torch.tanh(x)
            x = self.dropout(x)
            x = self.output(x)
            return x

## (class) BaseClassifier

* ELECTRA

In [None]:
class BaseClassifier_ELECTRA(nn.Module):
    def __init__(self, num_label, len_tokenizer, hidden_size):
        super(BaseClassifier_ELECTRA, self).__init__()

        self.num_label = num_label
        self.electra = AutoModel.from_pretrained( model_ELECTRA )
        self.electra.resize_token_embeddings(len_tokenizer)
        self.labels_classifier = SimpleClassifier_Base(self.num_label, hidden_size)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=None
        )

        sequence_output = outputs[0]
        logits = self.labels_classifier(sequence_output)

        loss = None

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_label),
                                                labels.view(-1))

        return loss, logits

* DeBERTa

In [None]:
class BaseClassifier_DeBERTa(nn.Module):
    def __init__(self, num_label, len_tokenizer, hidden_size):
        super(BaseClassifier_DeBERTa, self).__init__()

        self.num_label = num_label
        self.deberta = AutoModel.from_pretrained( model_DeBERTa )
        self.deberta.resize_token_embeddings(len_tokenizer)
        self.labels_classifier = SimpleClassifier_Base(self.num_label, hidden_size)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.deberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=None
        )

        sequence_output = outputs[0]
        logits = self.labels_classifier(sequence_output)

        loss = None

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_label),
                                                labels.view(-1))

        return loss, logits

* RoBERTa

In [None]:
class BaseClassifier_RoBERTa(nn.Module):
    def __init__(self, num_label, len_tokenizer, hidden_size):
        super(BaseClassifier_RoBERTa, self).__init__()

        self.num_label = num_label
        self.electra = AutoModel.from_pretrained( model_RoBERTa )
        self.electra.resize_token_embeddings(len_tokenizer)
        self.labels_classifier = SimpleClassifier_Base(self.num_label, hidden_size)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=None
        )

        sequence_output = outputs[0]
        logits = self.labels_classifier(sequence_output)

        loss = None

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_label),
                                                labels.view(-1))

        return loss, logits

* ELECTRA (polarity)

In [None]:
class BaseClassifier_Pola(nn.Module):
    def __init__(self, num_label, len_tokenizer, hidden_size):
        super(BaseClassifier_Pola, self).__init__()

        self.num_label = num_label
        self.electra = AutoModel.from_pretrained( model_ELECTRA )
        self.electra.resize_token_embeddings(len_tokenizer)
        self.labels_classifier = SimpleClassifier_Base(self.num_label, hidden_size)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.electra(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=None
        )

        sequence_output = outputs[0]
        logits = self.labels_classifier(sequence_output)

        loss = None

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_label),
                                                labels.view(-1))

        return loss, logits

## (func) predict_from_korean_form

In [None]:
def predict_from_korean_form(tokenizer_cate, tokenizer_pola, ce_model, pc_model, data):
    category_tokenizer = tokenizer_cate
    polarity_tokenizer = tokenizer_pola

    ce_model.to(device)
    ce_model.eval()

    for idx, sentence in enumerate(data):
        form = sentence['sentence_form']
        sentence['annotation'] = []
        if type(form) != str:
            print("form type is arong: ", form)
            continue
            
        tmp= []
        force_flag = False

        # categoty(속성 범주)와 polarity(감성 범주) 모델을 각각 처리
        for pair in entity_property_pair: 
            tokenized_data_cate = category_tokenizer(form, pair, padding='max_length', max_length = max_length, truncation=True)
            tokenized_data_pola = polarity_tokenizer(form, pair, padding='max_length', max_length = polarity_max_length, truncation=True)
            input_ids_cate = torch.tensor([tokenized_data_cate['input_ids']]).to(device)
            input_ids_pola = torch.tensor([tokenized_data_pola['input_ids']]).to(device)
            attention_mask_cate = torch.tensor([tokenized_data_cate['attention_mask']]).to(device)
            attention_mask_pola = torch.tensor([tokenized_data_pola['attention_mask']]).to(device)

            with torch.no_grad():
                _, ce_logits = ce_model(input_ids_cate, attention_mask_cate)
                tmp.append( ce_logits[0][0] )
            
            # threshold 조정 시 ( default = 0 )
            if ce_logits[0][0] > threshold: 
                ce_predictions = torch.argmax(ce_logits, dim = -1)
                ce_result = tf_id_to_name[ce_predictions[0]]

                if ce_result == 'True':
                    force_flag = True
                    with torch.no_grad():
                        _, pc_logits = pc_model(input_ids_pola, attention_mask_pola)

                    pc_predictions = torch.argmax(pc_logits, dim=-1)
                    pc_result = polarity_id_to_name[pc_predictions[0]]

                    sentence['annotation'].append([pair, pc_result])

        # force evaluation of argument 사용 시 (default = False)
        if force_use == True: 
            if force_flag == False:
                tmp = torch.tensor(tmp)
                pair = entity_property_pair[torch.argmax(tmp)]
                with torch.no_grad():
                    _, pc_logits = pc_model(input_ids_pola, attention_mask_pola)

                pc_predictions = torch.argmax(pc_logits, dim=-1)
                pc_result = polarity_id_to_name[pc_predictions[0]]
                sentence['annotation'].append([pair, pc_result])

    return data

## (func) test_tentiment_analysis

In [None]:
def test_sentiment_analysis():
    print("model pt path  =", model_pt_path)
    print("polarity model pt path =", polarity_model_pt_path)

    if model_kind == 'RoBERTa':
        tokenizer = tokenizer_roberta
        model = BaseClassifier_RoBERTa(len(tf_id_to_name), len(tokenizer), hidden_size)

    elif model_kind == 'DeBERTa':
        tokenizer = tokenizer_deberta
        model = BaseClassifier_DeBERTa(len(tf_id_to_name), len(tokenizer), hidden_size)

    else :
        tokenizer = tokenizer_electra
        model = BaseClassifier_ELECTRA(len(tf_id_to_name), len(tokenizer), hidden_size)
        
    model.load_state_dict(torch.load(model_pt_path , map_location=device))
    model.to(device)
    model.eval()

    polarity_model = BaseClassifier_Pola(len(polarity_id_to_name), len(tokenizer_electra), polarity_hidden_size)
    polarity_model.load_state_dict(torch.load(polarity_model_pt_path , map_location=device))
    polarity_model.to(device)
    polarity_model.eval()

    pred_data = predict_from_korean_form(tokenizer, tokenizer_electra, model, polarity_model, copy.deepcopy(test_data))

    df_pred = pd.DataFrame(pred_data)

    with open(save_path  + '.jsonl', 'w') as file:
        for i in range( len(df_pred) ):
            tmp = str(df_pred['annotation'][i]).replace("\'", "\"").replace('None', 'null')
            file.write(  '{'+'\"id\": \"nikluge-sa-2022-test-{0}\", \"sentence_form\": \"{1}\", \"annotation\": {2}'\
                .format( str(i+1).zfill(5)  ,   df_pred['sentence_form'][i], tmp ) +'}' ) 
            file.write("\n")

    return df_pred

# Starting Inference

parameter 설명
* model_kind (대소문자 구별)
    - RoBERTa : RoBERTa pretrained model load
    - DeBERTa : DeBERTa pretrained model load
    - ELECTRA : ELECTRA pretrained model load
* layer_use
    - True : add 1 layer
    - False : do not add layer (*default)
* hidden_size
    - defalut : 768 
    - up sampling : 1000
    - down sampling : 384
* force_use
    - True : force evalutation of argument
    - False : do not force evaluation (*default)
* threshold
    - default : 0
    - strict : 2 ~ 3
* dropout
    - default : 0.1

In [None]:
############### 입력하세요 ###################

model_kind = 'RoBERTa' 

layer_use = False   # default = False
hidden_size = 768   # default = 768
force_use = False   # default = False
threshold = 0       # default = 0
dropout = 0.1       # default = 0.1

model_pt_path = "./content/sample.pt"
polarity_model_pt_path = "./content/sample.pt"

save_path = './test01'

###########################################

if model_kind == 'RoBERTa':
    max_length = 514
else:
    max_length = 256
polarity_max_length = 256
polarity_hidden_size = 768

* 1회만 실행

In [None]:
tokenizer_roberta = AutoTokenizer.from_pretrained( model_RoBERTa )
tokenizer_electra = AutoTokenizer.from_pretrained( model_ELECTRA )
tokenizer_deberta = AutoTokenizer.from_pretrained( model_DeBERTa )

num_added_toks_roberta = tokenizer_roberta.add_special_tokens(special_tokens_dict)
num_added_toks_electra = tokenizer_electra.add_special_tokens(special_tokens_dict)
num_added_toks_deberta = tokenizer_deberta.add_special_tokens(special_tokens_dict)

test_data_path = './Korean_ABSA/data/test_data.jsonl'
test_data = jsonlload(test_data_path)

entity_property_test_data_roberta, polarity_test_data_roberta = get_dataset(test_data, tokenizer_roberta, max_length)  # max_length = 514
entity_property_test_data_electra, polarity_test_data_electra = get_dataset(test_data, tokenizer_electra, max_length)  # max_length = 256
entity_property_test_data_deberta, polarity_test_data_deberta = get_dataset(test_data, tokenizer_deberta, max_length)  # max_length = 256

entity_property_test_dataloader_roberta = DataLoader(entity_property_test_data_roberta, shuffle=True, batch_size=batch_size)
entity_property_test_dataloader_electra = DataLoader(entity_property_test_data_electra, shuffle=True, batch_size=batch_size)
entity_property_test_dataloader_deberta = DataLoader(entity_property_test_data_deberta, shuffle=True, batch_size=batch_size)

polarity_test_dataloader = DataLoader(polarity_test_data_electra, shuffle=True, batch_size=batch_size)

In [None]:
test_sentiment_analysis()