## 한국어 학습 모델별 한국어 쓰기 답안지 점수 구간 예측 성능 비교
### 세 가지 한국어 딥러닝 언어모델의 성능을 비교: KoBERT, KR-BERT, KcBERT

1. 먼저 아래의 URL로부터 유학생 한국어 쓰기 답안지 실험 데이터를 다운로드 한 후, jupyter notebook에 업로드 함.

http://aihumanities.org/ko/archive/data/?vid=1

(웹사이트의 게시판 페이지의 첨부파일 'korean_essay_score_range_prediction_dataset.zip'을 클릭하여 다운로드.)

2. 아래의 셀을 실행하여 다운로드한 데이터의 압축을 풀고, <font color='red'>**폴더명을 'data'로 변경**</font>함.

In [None]:
#! unzip ./korean_essay_score_range_prediction_dataset.zip -d ./

In [None]:
import torch

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
#device
#print('Current cuda device ', torch.cuda.current_device())

3. 테스트 데이터를 정의한 폴더(folder)와 k-fold cross validation을 위한 k (여기서는 k=i로 정의)를 설정함.

In [None]:
import pandas as pd

# 테스트 데이터를 설정
folder = 'happiness'   # all, job, happiness 중 하나를 설정
i =  0    # k-fold cross validation을 위한 숫자 설정 [0-6] 
add_train = ''

data_train = pd.read_csv("data/{}/train_{}.txt".format(folder, i), sep='\t')
data_valid = pd.read_csv("data/{}/val_{}.txt".format(folder, i), sep='\t')
data_test = pd.read_csv("data/{}/test_{}.txt".format(folder, i), sep='\t')
data_train.columns

4. 만약 훈련 데이터에 다른 주제의 데이터를 추가하는 경우, 아래의 add_train에서 하나를 선택함.

In [None]:
# 다양한 실험을 위해 훈련 데이터를 추가
# 단, folder='all'을 선택한 경우는 데이터를 추가하지 않음 

#add_train = 'economic'   # 경제 데이터를 훈련 데이터에 추가
#add_train = 'success'   # 성공 데이터를 훈련 데이터에 추가
#add_train = 'happiness'  # 행복 데이터를 훈련 데이터에 추가
#add_train = 'job'   # 직업 데이터를 훈련 데이터에 추가
#add_train = ['success', 'economic', 'happiness']   # 성공, 경제, 행복 데이터를 훈련 데이터에 추가
#add_train = ['success', 'economic', 'job']   # 성공, 경제, 직업 데이터를 훈련 데이터에 추가

if isinstance(add_train, list):
    for fn in add_train:
        data_train_add = pd.read_csv("data/{}.txt".format(fn), sep='\t')
        data_train = pd.concat([data_train, data_train_add], ignore_index=True)
    add_train = '_' + '_'.join(add_train)

elif add_train in ['success', 'economic', 'job', 'happiness']:
    data_train_add = pd.read_csv("data/{}.txt".format(add_train), sep='\t')
    data_train = pd.concat([data_train, data_train_add], ignore_index=True)
    add_train = '_' + add_train

#print(add_train)    
data_train.shape

5. 한국어 딥러닝 언어모델을 아래의 **model_name**에서 선택함.

In [None]:
from transformers import AutoTokenizer
from transformers import set_seed

set_seed(42)   # 재현성을 위해 seed를 설정

# KoBERT, KR-BERT, KcBERT 중 모델 하나를 선택

#model_name = "monologg/kobert"
#model_name = "snunlp/KR-BERT-char16424"
model_name = "beomi/kcbert-base"

if model_name == "monologg/kobert":
    output_model_folder = 'result_kobert'    
elif model_name == "snunlp/KR-BERT-char16424":
    output_model_folder = 'result_krbert'
elif model_name == "beomi/kcbert-base":
    output_model_folder = 'result_kcbert'    

tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_name)

In [None]:
if model_name == "beomi/kcbert-base":
    max_len = 300   # KcBERT
else:    
    max_len = 512   # KoBERT, KR-BERT 

In [None]:
def tokenize_function(example):
    return tokenizer(example, padding='max_length', truncation=True, max_length=max_len)

encodings_train = data_train["document"].map(tokenize_function)
encodings_valid = data_valid["document"].map(tokenize_function)
encodings_test = data_test["document"].map(tokenize_function)
#print(encodings_valid[0])

In [None]:
from sklearn import preprocessing

le = preprocessing.LabelEncoder()
le.fit(data_train["label"])
le.classes_

In [None]:
import numpy as np

y_train = le.transform(data_train["label"])
y_valid = le.transform(data_valid["label"])
y_test = le.transform(data_test["label"])

In [None]:
from torch.utils.data import Dataset, DataLoader, TensorDataset

In [None]:
class CustomTextDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels
        
    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        data = self.encodings[idx]
        item = {}
        item['input_ids'] = torch.LongTensor(data['input_ids'])
        item['attention_mask'] = torch.LongTensor(data['attention_mask'])
        item['token_type_ids'] = torch.LongTensor(data['token_type_ids'])
        item['labels'] = torch.LongTensor([self.labels[idx]])
        return item

In [None]:
dataset_train = CustomTextDataset(encodings_train, y_train)
dataset_valid = CustomTextDataset(encodings_valid, y_valid)
dataset_test = CustomTextDataset(encodings_test, y_test)
#dataset_train.encodings[0]["input_ids"]

In [None]:
from torch.utils.data import DataLoader

batch_size = 24
train_dataloader = DataLoader(dataset_train, shuffle=True, batch_size=batch_size)
eval_dataloader = DataLoader(dataset_valid, batch_size=batch_size)
test_dataloader = DataLoader(dataset_test, batch_size=batch_size)

In [None]:
from transformers import AutoModelForSequenceClassification

model = AutoModelForSequenceClassification.from_pretrained(pretrained_model_name_or_path=model_name, num_labels=4)
model.dropout = torch.nn.Dropout(0.5)

In [None]:
model.to(device)

In [None]:
from transformers import AdamW
from transformers import get_scheduler

learning_rate = 1e-5
warmup_ratio = 0.1
max_grad_norm = 1
num_epochs = 50
num_training_steps = num_epochs * len(train_dataloader)
warmup_step = int(num_training_steps * warmup_ratio)

optimizer = AdamW(model.parameters(), lr=1e-5)
lr_scheduler = get_scheduler(
    "cosine",
    optimizer=optimizer,
    num_warmup_steps=warmup_step,
    num_training_steps=num_training_steps
)

In [None]:
#%%capture cap --no-stderr

from tqdm.auto import tqdm
from datasets import load_metric

progress_bar = tqdm(range(num_training_steps))
metric = load_metric("accuracy")

print("------{}{}_{}------".format(folder, add_train, i))
for epoch in range(num_epochs):
    model.train()
    for batch in train_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        loss = outputs.loss
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad_norm)
        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)
    model.eval()
    for batch in eval_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1)
        metric.add_batch(predictions=predictions, references=batch["labels"].flatten())
    acc_eval = metric.compute()
    print("epoch {} val acc {}".format(epoch+1, acc_eval["accuracy"]))
    for batch in test_dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}
        with torch.no_grad():
            outputs = model(**batch)
        logits = outputs.logits
        predictions = torch.argmax(logits, dim=-1)
        metric.add_batch(predictions=predictions, references=batch["labels"].flatten())
    acc_test = metric.compute()
    print("epoch {} test acc {}".format(epoch+1, acc_test["accuracy"]))

In [None]:
# 바로 위의 셀의 맨 위에 있는 '%%capture cap --no-stderr'의 코멘트를 해제한 뒤,
# 아래의 파일 저장 코드를 활성화 하면,
# epoch 별 validation & test accuracy 가 기록된 파일이 언어모델 폴더에 저장됨 

#with open('{}/{}{}_{}a.txt'.format(output_model_folder, folder, add_train, i), 'w') as f:
#     f.write(cap.stdout)

In [None]:
# 상단의 Kernel 메뉴에서 Restart & Run All 을 실행하는 경우, 위의 셀까지 실행이 다 끝나면 알람이 울림

import IPython.display as ipd

sound = np.sin(2*np.pi*400*np.arange(10000*2)/10000)
ipd.Audio(sound, rate=10000, autoplay=True)

In [None]:
from sklearn.metrics import confusion_matrix, accuracy_score

y_pred = []
for batch in test_dataloader:
    batch = {k: v.to(device) for k, v in batch.items()}
    with torch.no_grad():
        outputs = model(**batch)
    logits = outputs.logits
    y_pred += list(torch.argmax(logits, dim=-1).to('cpu').numpy())

def num_to_grade(y_label_num):
    y_label_letter = []
    for val in y_label_num:
        if val == 0:
            y_label_letter.append('D')
        elif val == 1:
            y_label_letter.append('C')
        elif val == 2:
            y_label_letter.append('B')
        elif val == 3:
            y_label_letter.append('A')
    return y_label_letter            

y_true_letter = num_to_grade(y_test)
y_pred_letter = num_to_grade(y_pred)

# 50 epoch 때의 모델의 테스트 데이터 분류 성능을 표시함
print('accuracy:', accuracy_score(y_true_letter, y_pred_letter))

# confusion matrix 생성
cm = confusion_matrix(y_true_letter, y_pred_letter)

In [None]:
# confusion matrix 그림을 출력
# https://stackoverflow.com/questions/60480777/plotting-already-calculated-confusion-matrix-using-python

from mlxtend.plotting import plot_confusion_matrix
import matplotlib.pyplot as plt

# Classes
classes = ['A', 'B', 'C', 'D']

figure, ax = plot_confusion_matrix(conf_mat = cm,
                                   class_names = classes,
                                   #show_absolute = False,
                                   #show_normed = False,
                                   colorbar = False)
ax.set_title("KcBERT Happiness (Acc: 78.57%)")
plt.show()