## Load Libraries

In [1]:
# Fetch the course repository into the current working directory.
# The path-setup cell below expects the checkout at /content/NLU_BERT.
!git clone https://github.com/hanakim120/NLU_BERT.git

Cloning into 'NLU_BERT'...
remote: Enumerating objects: 59, done.[K
remote: Counting objects: 100% (59/59), done.[K
remote: Compressing objects: 100% (55/55), done.[K
remote: Total 59 (delta 15), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (59/59), 10.50 MiB | 3.68 MiB/s, done.


In [1]:
!pip install numpy==1.23



In [2]:
# install
!pip install datasets transformers sentencepiece accelerate -U evaluate



In [19]:
import transformers

# Only let ERROR-level (and above) messages through from the transformers logger.
transformers.logging.set_verbosity(transformers.logging.ERROR)

In [20]:
import warnings
warnings.filterwarnings(action='ignore')

In [21]:
import os

# Set the WANDB_DISABLED environment variable for this process.
os.environ.update({"WANDB_DISABLED": "true"})

In [24]:
# Path configuration
import os

# Project root inside the cloned repository; it becomes the working directory
# via os.chdir below.
BASE_DIR = '/content/NLU_BERT/text-cls-ynat'
os.chdir(BASE_DIR)

# Join plain directory names (no leading './') so the resulting paths contain
# no redundant '/./' segment.
DATA_DIR = os.path.join(BASE_DIR, 'data')
OUTPUT_DIR = os.path.join(BASE_DIR, 'checkpoint')

# Create the checkpoint directory up front; exist_ok makes the cell re-runnable.
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [4]:
import random
import numpy as np
import pandas as pd

import torch
from torch.utils.data import Dataset, DataLoader

import evaluate
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification
from transformers import DataCollatorWithPadding
from transformers import TrainingArguments, Trainer

from sklearn.model_selection import train_test_split

from tokenization_kobert import KoBertTokenizer

## Set Hyperparameters

In [5]:
SEED = 456  # fixed random seed, for reproducibility

# Seed each RNG in turn: Python's `random`, NumPy, then the torch seeders
# (manual_seed, cuda.manual_seed, cuda.manual_seed_all) — same order as before.
for _seed_fn in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_fn(SEED)

In [6]:
# Device selection: use CUDA when torch reports it as available, otherwise the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
DEVICE

device(type='cuda')

## Load Tokenizer and Model

아래 조건에 맞춰 스스로 코드를 짜봅시다!
- 사용할 모델 : https://huggingface.co/monologg/kobert
- tokenizer는 BASE_DIR(`NLU_BERT/text-cls-ynat`) 안의 `tokenization_kobert.py` 에 정의된 `KoBertTokenizer` 를 사용
- 모델 로드에는 AutoModelForSequenceClassification 클래스를 사용

In [7]:
model_name = 'monologg/kobert'

# Tokenizer comes from the project-local KoBertTokenizer class.
tokenizer = KoBertTokenizer.from_pretrained(model_name)

# AutoModelForSequenceClassification recognises the architecture of the checkpoint
# and builds the matching classification model; num_labels=7 sizes its output layer.
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=7)
model = model.to(DEVICE)  # move the weights onto the selected device

Downloading (…)zer_78b3253a26.model:   0%|          | 0.00/371k [00:00<?, ?B/s]

Downloading (…)solve/main/vocab.txt:   0%|          | 0.00/77.8k [00:00<?, ?B/s]

Downloading (…)okenizer_config.json:   0%|          | 0.00/51.0 [00:00<?, ?B/s]

Downloading (…)lve/main/config.json:   0%|          | 0.00/426 [00:00<?, ?B/s]

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'BertTokenizer'. 
The class this function is called from is 'KoBertTokenizer'.


Downloading model.safetensors:   0%|          | 0.00/369M [00:00<?, ?B/s]

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at monologg/kobert and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


## Define Dataset

- pd.read_csv 를 사용하여 데이터 불러오기
- 불러온 데이터를 train/valid 셋으로 분리 (train set : valid set = 7:3)

In [11]:
# Load the training CSV, then split it into train / valid parts (30% held out for validation).
train_csv_path = os.path.join(DATA_DIR, 'train_original.csv')
data = pd.read_csv(train_csv_path)
dataset_train, dataset_valid = train_test_split(data, random_state=SEED, test_size=0.3)

- BERTDataset 클래스 완성하기

In [12]:
class BERTDataset(Dataset):
    """Text-classification dataset that tokenizes every row once, at construction.

    Sequences are truncated but NOT padded here. Padding every sample with
    ``padding='max_length'`` stretches each one to the tokenizer's maximum
    length, which wastes memory and compute and leaves nothing for a padding
    collator to do. Samples are therefore stored at their natural length and
    are expected to be padded per batch (e.g. by ``DataCollatorWithPadding``).

    Args:
        data: Mapping/DataFrame with a ``'text'`` column (inputs) and a
            ``'target'`` column (labels).
        tokenizer: Callable invoked as
            ``tokenizer(text, truncation=True, max_length=..., return_tensors='pt')``
            whose result provides ``'input_ids'`` and ``'attention_mask'``
            tensors with a leading batch dimension of 1.
        max_length: Optional truncation length forwarded to the tokenizer.
            ``None`` (default) leaves the limit up to the tokenizer.
    """

    def __init__(self, data, tokenizer, max_length=None):
        input_texts = data['text']  # inputs
        targets = data['target']  # labels

        self.inputs = []
        self.labels = []
        for text, label in zip(input_texts, targets):
            # Truncate only; padding is deferred to batch-collation time.
            tokenized_input = tokenizer(
                text,
                truncation=True,
                max_length=max_length,
                return_tensors='pt',
            )
            self.inputs.append(tokenized_input)
            self.labels.append(torch.tensor(label))

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors with the batch dimension removed."""
        encoded = self.inputs[idx]
        return {
            'input_ids': encoded['input_ids'].squeeze(0),  # [1, seq_len] -> [seq_len]
            'attention_mask': encoded['attention_mask'].squeeze(0),
            'labels': self.labels[idx].squeeze(0),  # no-op for scalar labels
        }

    def __len__(self):
        """Number of samples held by the dataset."""
        return len(self.labels)

In [13]:
# Build the train and valid datasets (train first, then valid); each BERTDataset
# tokenizes its split when it is constructed.
data_train, data_valid = (
    BERTDataset(split, tokenizer) for split in (dataset_train, dataset_valid)
)

In [14]:
# Collator that pads the samples of a batch, using the tokenizer's padding settings.
data_collator = DataCollatorWithPadding(tokenizer)

## Define Metric

In [15]:
f1 = evaluate.load('f1')  # metric object used to compute the F1 score


def compute_metrics(eval_pred):
    """Compute the macro-averaged F1 score for one evaluation pass.

    Args:
        eval_pred: Pair ``(predictions, labels)`` — the model outputs and the
            ground-truth labels. The outputs are reduced with argmax over axis 1.

    Returns:
        Whatever ``f1.compute`` returns for ``average='macro'``
        (presumably a dict keyed by ``'f1'`` — TODO confirm).
    """
    scores, references = eval_pred
    # The index of the highest score in each row is the predicted class.
    predicted_ids = np.argmax(scores, axis=1)
    # 'macro' = unweighted mean of the per-label F1 scores.
    return f1.compute(
        references=references,
        predictions=predicted_ids,
        average='macro',
    )


Downloading builder script:   0%|          | 0.00/6.77k [00:00<?, ?B/s]

## Train Model

In [16]:
training_args = TrainingArguments(
    # --- output location ---
    output_dir=OUTPUT_DIR,
    overwrite_output_dir=True,
    # --- which phases are enabled ---
    do_train=True,
    do_eval=True,
    do_predict=True,
    # --- logging / evaluation / checkpoint cadence: all step-based, every 100 steps ---
    logging_strategy='steps',
    logging_steps=100,
    evaluation_strategy='steps',
    eval_steps=100,
    save_strategy='steps',
    save_steps=100,
    save_total_limit=2,
    # --- best-model selection: higher eval_f1 is better ---
    load_best_model_at_end=True,
    metric_for_best_model='eval_f1',
    greater_is_better=True,
    # --- optimizer and learning-rate schedule ---
    learning_rate=2e-5,
    adam_beta1=0.9,
    adam_beta2=0.999,
    adam_epsilon=1e-8,
    weight_decay=0.01,
    lr_scheduler_type='linear',
    # --- batch sizes and training length ---
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    auto_find_batch_size=True,
    num_train_epochs=3,
    # --- reproducibility ---
    seed=SEED,
)

- Trainer의 인자들을 넣어봅시다!

In [17]:
# Wire the model, its training configuration, both datasets, the padding
# collator and the metric function into a single Trainer.
trainer = Trainer(
    args=training_args,
    model=model,
    data_collator=data_collator,
    train_dataset=data_train,
    eval_dataset=data_valid,
    compute_metrics=compute_metrics,
)

- 훈련을 시켜봅시다!
- (훈련 시간이 너무 오래걸려서 로그가 찍히는 것만 확인하고 중지 시키시면 됩니다!)

In [None]:
# Start fine-tuning with the Trainer built above. Per the note in this section,
# training takes a long time; it can be interrupted once the logs start appearing.
trainer.train()

## Evaluate Model

- test set 에 대해 예측을 해봅시다!

In [23]:
# Read the test data
test_csv_path = os.path.join(DATA_DIR, 'test_original.csv')
dataset_test = pd.read_csv(test_csv_path)

- 직접 파인튜닝 시킨 checkpoint의 경로를 checkpoint_path 에 넣어줍니다.

In [25]:
model_name = 'monologg/kobert'
# To run inference with the fine-tuned model, put the path of a saved checkpoint here.
# NOTE: while this still points at the base 'monologg/kobert' weights, the
# classification head is newly initialised (see the load-time message earlier in
# this notebook), so the predictions are not meaningful.
checkpoint_path = 'monologg/kobert'
tokenizer = KoBertTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(checkpoint_path, num_labels=7).to(DEVICE)
model.eval()  # switch the model to evaluation mode

preds = []
with torch.no_grad():  # gradients are not needed for inference
    for text in dataset_test['text']:
        # Truncate exactly as during training so an over-long text cannot exceed
        # the tokenizer's maximum input length.
        inputs = tokenizer(text, truncation=True, return_tensors="pt").to(DEVICE)
        logits = model(**inputs).logits  # model output
        # Softmax is monotonic, so argmax over the raw logits picks the same class.
        pred = torch.argmax(logits, dim=1).cpu().numpy()
        preds.extend(pred)

- 결과 csv 파일로 저장

In [26]:
# Store the predictions in a 'target' column and write the frame to CSV
# under BASE_DIR, without the index column.
output_csv_path = os.path.join(BASE_DIR, 'test_output.csv')
dataset_test['target'] = preds
dataset_test.to_csv(output_csv_path, index=False)