In [None]:
# colab 환경 시 필수 설치
# 필요 시 설치(가상환경의 경우 터미널에 설치)
!pip install transformers==4.24.0

In [None]:
# colab 환경 시 필수 설치
# 필요 시 설치(가상환경의 경우 터미널에 설치)
!pip install datasets==2.6.1

# 모듈 import 및 전역 변수 설정

In [None]:
import json
import os

import torch
import torch.nn as nn
from tqdm import trange
from transformers import AutoModel, AutoTokenizer, XLMRobertaModel
from torch.utils.data import DataLoader, TensorDataset
from transformers import get_linear_schedule_with_warmup
from transformers import AdamW
from datasets import load_metric
from sklearn.metrics import f1_score
import pandas as pd
import copy

# 학습시 사용할 베이스 모델 설정
base_model = 'kykim/electra-kor-base'

# 개체#속성 쌍 설정(label25)
entity_property_pair = [
    '제품 전체#일반', '제품 전체#가격', '제품 전체#디자인', '제품 전체#품질', '제품 전체#편의성', '제품 전체#인지도', '제품 전체#다양성',
    '본품#일반', '본품#디자인', '본품#품질', '본품#편의성', '본품#다양성', '본품#가격', '본품#인지도',
    '패키지/구성품#일반', '패키지/구성품#디자인', '패키지/구성품#품질', '패키지/구성품#편의성', '패키지/구성품#가격', '패키지/구성품#다양성',
    '브랜드#일반', '브랜드#가격', '브랜드#품질', '브랜드#인지도', '브랜드#디자인',
                    ]

# 개체#속성 쌍 설정(label23)
# entity_property_pair = [
#     '제품 전체#일반', '제품 전체#가격', '제품 전체#디자인', '제품 전체#품질', '제품 전체#편의성', '제품 전체#인지도',
#     '본품#일반', '본품#디자인', '본품#품질', '본품#편의성', '본품#다양성', '본품#가격', '본품#인지도',
#     '패키지/구성품#일반', '패키지/구성품#디자인', '패키지/구성품#품질', '패키지/구성품#편의성', '패키지/구성품#가격', '패키지/구성품#다양성',
#     '브랜드#일반', '브랜드#가격', '브랜드#품질', '브랜드#인지도',
#                     ]

# 문장과 개체#속성 쌍의 관계를 True, False 로 표시
tf_id_to_name = ['True', 'False']
tf_name_to_id = {tf_id_to_name[i]: i for i in range(len(tf_id_to_name))}

# 문장과 개체#속성 쌍의 관계로 감성을 positive, negative, neutral 로 표시
polarity_id_to_name = ['positive', 'negative', 'neutral']
polarity_name_to_id = {polarity_id_to_name[i]: i for i in range(len(polarity_id_to_name))}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

special_tokens_dict = {
    'additional_special_tokens': ['&name&', '&affiliation&', '&social-security-num&', '&tel-num&', '&card-num&', '&bank-account&', '&num&', '&online-account&']
}

In [None]:
# json 파일 읽어서 list에 저장
def jsonload(fname, encoding="utf-8"):
    with open(fname, encoding=encoding) as f:
        j = json.load(f)
    return j

# json 개체를 파일이름으로 저장
def jsondump(j, fname):
    with open(fname, "w", encoding="UTF8") as f:
        json.dump(j, f, ensure_ascii=False)

# jsonl 파일 읽어서 list에 저장
def jsonlload(fname, encoding="utf-8"):
    json_list = []
    with open(fname, encoding=encoding) as f:
        for line in f.readlines():
            json_list.append(json.loads(line))
    return json_list

# 모델 정의
electra 모델을 기반으로 한 classification 모델 이용

In [None]:
# 아래 ELECTRABaseClassifier의 classifier로 사용될 class
class SimpleClassifier(nn.Module):

    def __init__(self, num_label):
        super().__init__()
        self.dense = nn.Linear(classifier_hidden_size, classifier_hidden_size)
        self.dropout = nn.Dropout(classifier_dropout_prob)
        self.output = nn.Linear(classifier_hidden_size, num_label)

    def forward(self, features):
        x = features[:, 0, :]
        x = self.dropout(x)
        x = self.dense(x)
        x = torch.tanh(x)
        x = self.dropout(x)
        x = self.output(x)
        return x

# 불러올 base model 기반 classification class 생성
class ELECTRABaseClassifier(nn.Module):
    def __init__(self, num_label, len_tokenizer):
        super(ELECTRABaseClassifier, self).__init__()

        self.num_label = num_label
        self.xlm_roberta = AutoModel.from_pretrained(base_model)
        self.xlm_roberta.resize_token_embeddings(len_tokenizer)

        self.labels_classifier = SimpleClassifier(self.num_label)

    def forward(self, input_ids, attention_mask, labels=None):
        outputs = self.xlm_roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=None
        )

        sequence_output = outputs[0]
        logits = self.labels_classifier(sequence_output)

        loss = None

        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits.view(-1, self.num_label),
                                                labels.view(-1))

        return loss, logits


In [None]:
# 인자로 받아온 모델들을 통해 predict 하는 문장 데이터에 label(entity_property, polarity) 해주는 함수
def predict_from_korean_form(tokenizer, ce_model, pc_model, data):

    ce_model.to(device)
    ce_model.eval()
    for sentence in data:
        form = sentence['sentence_form']
        sentence['annotation'] = []
        if type(form) != str:
            print("form type is arong: ", form)
            continue

        # 개체#속성 쌍을 순서대로 입력
        for pair in entity_property_pair:
            # 문장과 pair를 tokenizer화           
            tokenized_data = tokenizer(form, pair, padding='max_length', max_length=256, truncation=True)

            # tonkenizer 된 값으로 entity_property 모델을 통해 True, False 결과값을 출력
            input_ids = torch.tensor([tokenized_data['input_ids']]).to(device)
            attention_mask = torch.tensor([tokenized_data['attention_mask']]).to(device)
            with torch.no_grad():
                _, ce_logits = ce_model(input_ids, attention_mask)

            ce_predictions = torch.argmax(ce_logits, dim = -1)

            ce_result = tf_id_to_name[ce_predictions[0]]

            # True 일 경우에만 다음 코드 진행
            if ce_result == 'True':
                # tonkenizer 된 값으로 polarity 모델을 통해 Positive, Negative,Neutral 결과값을 출력                
                with torch.no_grad():
                    _, pc_logits = pc_model(input_ids, attention_mask)

                pc_predictions = torch.argmax(pc_logits, dim=-1)
                pc_result = polarity_id_to_name[pc_predictions[0]]

                # entity_property 모델과 polarity 모델을 통해서 출력 된 결과 값을 label 입력
                sentence['annotation'].append([pair, pc_result])


    return data


In [None]:
# 불러올 entity_property pt 파일 경로 설정
test_category_extraction_model_path = './'
# 불러올 polarity pt 파일 경로 설정
test_polarity_classification_model_path = './'
# 학습 후 저장된 pt 파일로 predict 할 json 데이터 설정
test_data_path = './'

In [None]:
# 변수로 지정된 경로에 따라 predict 파일에 labeling 후 저장하는 함수
def test_sentiment_analysis():

    # predict 시 사용할 tokenizer와 데이터를 불러오기
    tokenizer = AutoTokenizer.from_pretrained(base_model)
    num_added_toks = tokenizer.add_special_tokens(special_tokens_dict)
    test_data = jsonlload(test_data_path)
    
    # entity_property 모델 불러오기
    model = RoBertaBaseClassifier(len(tf_id_to_name), len(tokenizer))
    model.load_state_dict(torch.load(test_category_extraction_model_path, map_location=device))
    model.to(device)
    model.eval()

    # polarity 모델 불러오기        
    polarity_model = RoBertaBaseClassifier(len(polarity_id_to_name), len(tokenizer))
    polarity_model.load_state_dict(torch.load(test_polarity_classification_model_path, map_location=device))
    polarity_model.to(device)
    polarity_model.eval()

    # predict_from_korean_form 함수를 통해 predict 데이터 만들기
    pred_data = predict_from_korean_form(tokenizer, model, polarity_model, copy.deepcopy(test_data))

    # 만들어진 predict 데이터를 json 파일로 저장
    jsondump(pred_data, './.json')

In [None]:
# 변수로 지정된 경로에 따라 predict 파일을 생성 및 저장 함수
test_sentiment_analysis()

Some weights of the model checkpoint at kykim/electra-kor-base were not used when initializing ElectraModel: ['discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.bias', 'discriminator_predictions.dense.weight', 'discriminator_predictions.dense_prediction.weight']
- This IS expected if you are initializing ElectraModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ElectraModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of the model checkpoint at kykim/electra-kor-base were not used when initializing ElectraModel: ['discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.bias', 'discriminator_predictions.dense.weight', 'd