In [None]:
import pathlib
import os

path = str(pathlib.Path(os.path.abspath(".")).parent.parent)
print(path)

In [None]:
import pandas as pd

df = pd.read_csv(f"{path}/data/processed/model/finetuning_preprocessed.txt", sep="\t", names=['document', 'label'])
# df
print(df.columns)
print(len(df))

In [None]:
df['label'].value_counts()

In [None]:
# 데이터셋 분리

from sklearn.model_selection import train_test_split

train, valid = train_test_split(df, test_size=0.2, shuffle=True, random_state=1)

In [None]:
# tokenizer 초기화

from transformers import AutoTokenizer

# tokenizer에 저장되어 있는 vocab.txt 파일을 로드
tokenizer = AutoTokenizer.from_pretrained("beomi/KcELECTRA-small-v2022")

In [None]:
# tokenizing 및 tensordataset

from datasets import Dataset

def tokenize_function(df):
    # tokenizing
    # 문장을 모델이 처리하는 의미를 갖는 최소 단위로 토큰화 하고, 토큰화 된 단어들을 정수 인코딩

    # 하나의 sequence가 가질 수 있는 최대 토큰의 갯수
    # ex) 나는 밥을 먹었다. -> 4개의 토큰 + [CLS]문장의 시작, [SEP]문장의 끝 => 총 6개의 토큰
    #     -> max_length=128이므로, 나머지 122개는 [PAD]패딩 토큰으로 채워짐
    max_length = 128

    # 1) 입력 받은 문장(df['document'])을 서브워드 단위로 분리(토큰화)
    # 2) 분리된 서브워드를 로드된 단어 사전에서 찾아 해당 정수 인덱스로 변환(정수 인코딩)
    # 3) 규칙에 따라 길이를 맞추고, input_ids와 attention_mask 생성
    #       - input_ids : 텍스트를 모델이 이해할 수 있는 정수 인덱스로 변환한 값
    #       - attention_mask : 실제 의미있는 토큰(1)/패딩 토큰(0)을 이진 분류하여 모델이 불필요한 계산을 하지 않도록 마스킹
    encodings = tokenizer(
        list(df['document']),
        padding="max_length",
        truncation=True,
        max_length = max_length
    )

    # dataset
    # 모델에 입력된 모든 데이터를 tensor 객체로 저장하고 관리하는 저장소
    #       - 데이터 통합 및 관리
    #               여러 종류의 데이터를 동일한 인덱스로 묶어 데이터를 인덱스로 접근할 때 한 번에 출력할 수 있어 데이터 관리 용이

    dataset = {
        # x : 학습 데이터
        'input_ids':encodings['input_ids'],
        'attention_mask':encodings['attention_mask'],
        # y : 레이블
        'labels':df['label'].to_list()
    }

    return dataset

train_dataset = Dataset.from_dict(tokenize_function(train))
valid_dataset = Dataset.from_dict(tokenize_function(valid))

In [None]:
# TrainingArguments

import torch
from transformers import TrainingArguments

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")

if device=="cpu":
    num_train_epochs=1
    per_device_train_batch_size=2
    gradient_accumulation_steps=8
    dataloader_num_workers=0
    dataloader_pin_memory=False

elif device=="cuda":
    num_train_epochs=3
    per_device_train_batch_size=16
    gradient_accumulation_steps=2
    dataloader_num_workers=2
    dataloader_pin_memory=True

training_args = TrainingArguments(
    output_dir=f"{path}/model/KcELECTRA/finetuned/checkpoints",
    overwrite_output_dir=True,
    learning_rate=5e-5,

    num_train_epochs=num_train_epochs,
    per_device_train_batch_size=per_device_train_batch_size,
    gradient_accumulation_steps=gradient_accumulation_steps,

    logging_steps=50,

    save_strategy="steps",
    save_steps=1000,

    dataloader_num_workers=dataloader_num_workers,
    dataloader_pin_memory=dataloader_pin_memory,

    weight_decay=0.01,
    load_best_model_at_end=False,

    report_to="none",
    push_to_hub=False,
    hub_model_id=None,
    hub_token=None,
)

In [None]:
# 모델 초기화

from transformers import ElectraForSequenceClassification

# KcELECTRA-small 모델로 finetuning
model = ElectraForSequenceClassification.from_pretrained("beomi/KcELECTRA-small-v2022", num_labels=2)

In [None]:
# Trainer 설정

from transformers import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    processing_class=tokenizer,
)

In [None]:
# 모델 학습

trainer.train()

In [None]:
# 모델 저장

save_dir = f"{path}/model/KcELECTRA/finetuned"
trainer.save_model(save_dir)
tokenizer.save_pretrained(save_dir)

In [None]:
# 모델 평가

from torch.utils.data import DataLoader
from sklearn.metrics import accuracy_score, f1_score

# DataLoader 준비
valid_dataset.set_format(type='torch', columns=['input_ids', 'attention_mask', 'labels'])
eval_dataloader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

model.eval()
model.to(device)
all_preds, all_labels = [], []

for batch in eval_dataloader:
    inputs = {k: v.to(device) for k, v in batch.items() if k in ['input_ids', 'attention_mask']}
    labels = batch['labels'].to(device)

    with torch.no_grad():
        outputs = model(**inputs)
        preds = torch.argmax(outputs.logits, dim=-1)

    all_preds.extend(preds.cpu().numpy())
    all_labels.extend(labels.cpu().numpy())

# Accuracy
acc = accuracy_score(all_labels, all_preds)

# F1 score
f1 = f1_score(all_labels, all_preds)

print(f"Accuracy: {acc*100:.2f}%")
print(f"F1 Score: {f1:.4f}")

In [None]:
# 예측 및 평가

device = "cuda" if torch.cuda.is_available() else "cpu"

model.eval()
text = ["이 영화 정말 재미있다", "별로였다"]
tokenizer.model_max_length = 128
encoding = tokenizer(text, padding=True, truncation=True, return_tensors="pt").to(device)
with torch.no_grad():
    logits = model(**encoding).logits
    preds = logits.argmax(dim=-1)
print(preds.cpu().numpy())  # [1, 0]