## **캡스톤치맥회동 AI 스터디 python**

### 스터디 개요

1. TAPT(Task-Adaptive-Pretraining)을 통한 성능 고도화

2. Optuna를 활용한 최적의 하이퍼파라미터 탐색



### 스터디로 얻어갈 수 있는 능력

* HuggingFace Hub에서 제공하는 토크나이저 및 모델을 사용할 수 있습니다.

* MLM(Masked Language Model) 기반의 TAPT를 적용하여, 파인튜닝 전 도메인 적합성을 높일 수 있습니다.

* Optuna 프레임워크를 통해 최적의 하이퍼파라미터를 자동으로 찾을 수 있습니다.



> [TODO]에 코드를 채워넣으면 됩니다!

## **0. 난수 시드 고정**

In [None]:
import random
import numpy as np
import torch

def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)          # Pytorch CPU 연산 난수 시드를 고정합니다.
    torch.cuda.manual_seed_all(seed) # GPU 연산 난수 시드를 고정합니다.

    # 매 실행마다 동일한 결과를 재현합니다.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(42)

## **1. 데이터셋 불러오기**

In [2]:
from datasets import load_dataset

train_dataset = load_dataset("imdb", split="train")
test_dataset = load_dataset("imdb", split="test")

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# 데이터셋 길이 확인
print("train dataset 개수:", len(train_dataset))
print("test dataset 개수:", len(test_dataset))

# 데이터셋 모양 확인
print("\ntrain dataset 모양")
print(train_dataset)

print("\ntest dataset 모양")
print(test_dataset)

train dataset 개수: 25000
test dataset 개수: 25000

train dataset 모양
Dataset({
    features: ['text', 'label'],
    num_rows: 25000
})

test dataset 모양
Dataset({
    features: ['text', 'label'],
    num_rows: 25000
})


In [4]:
import pandas as pd

# Dataset 객체를 전처리 가능한 구조로 변환합니다.
train_df = pd.DataFrame(train_dataset)
test_df = pd.DataFrame(test_dataset)

# 2번에서 train : validation : test 데이터셋을 80 : 10 : 10으로 분할하기 위해, 전체 데이터셋으로 합칩니다.
df = pd.concat((train_df, test_df)).reset_index(drop=True)

# 5개의 텍스트를 출력합니다.
df.head()

Unnamed: 0,text,label
0,I rented I AM CURIOUS-YELLOW from my video sto...,0
1,"""I Am Curious: Yellow"" is a risible and preten...",0
2,If only to avoid making this type of film in t...,0
3,This film was probably inspired by Godard's Ma...,0
4,"Oh, brother...after hearing about this ridicul...",0


## **2. 데이터셋 분할**

In [5]:
from sklearn.model_selection import train_test_split

X = df.text
y = df.label

X_train, X_temp, y_train, y_temp = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    shuffle=True,
    stratify=y
)

X_val, X_test, y_val, y_test = train_test_split(
    X_temp,
    y_temp,
    test_size=0.2,
    random_state=42,
    shuffle=True,
    stratify=y_temp
)

## **3. 데이터셋 전처리**

In [6]:
import re

def clean_text(text):
    # 텍스트를 소문자로 변환합니다.
    text = text.lower()

    # 소문자와 공백을 제외한 문자를 제거합니다.
    text = re.sub(r"^[a-z\s]", "", text)

X_train_clean = X_train.apply(clean_text)
X_val_clean = X_val.apply(clean_text)
X_test_clean = X_test.apply(clean_text)

## **4. 토크나이저 불러오기**

In [12]:
from transformers import AutoTokenizer

# 모델은 아래 모델을 사용하시면 됩니다!
# google/bert_uncased_L-4_H-256_A-4
# albert-base-v2

model_name = "albert-base-v2"

tokenizer = AutoTokenizer.from_pretrained(model_name)

# 토큰화 함수를 만듭니다.
def tokenize_function(sentences):
    return tokenizer(
        sentences["text"],
        truncation=True,       # 문장이 max_length보다 길면 자릅니다.
        padding="max_length",  # 문장이 짧으면 0(pad)으로 채워 길이를 맞춥니다.
        max_length=512
    )

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


## **5. 학습**

### **5-1. TAPT**
* MLM(Masked Language Model) 기반 TAPT(Tasked Adaptive Pretraining)를 적용하는 단계입니다.
* 이 단계에서는 IMDB 데이터셋의 도메인을 미리 학습시켜, 본 학습(Fine-tuning)의 효율을 높입니다.

In [None]:
# Masked LM 모델을 불러옵니다.
from transformers import AutoModelForMaskedLM

# GPU를 사용할 수 있는 환경이면 GPU를 사용해서 모델을 학습시킵니다.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 사전 학습된 모델을 가져옵니다.
model = AutoModelForMaskedLM.from_pretrained(model_name).to(device)

In [None]:
from datasets import Dataset

# [TODO 1-1] X_train_clean과 X_val_clean을 concat해서 TAPT 학습용 데이터셋을 만듭니다.
tapt_df = None

# [TODO 1-2] Hugging Face Dataset 포맷으로 변환합니다.
tapt_dataset = None

# [TODO 1-3] tapt dataset에 대하여 토큰화를 수행합니다.
tokenized_tapt_dataset = None


In [None]:
from transformers import DataCollatorForLanguageModeling

# 데이터셋의 15%를 마스킹해줍니다.
data_collator = DataCollatorForLanguageModeling(
    tokenizer=tokenizer,
    mlm=True,
    mlm_probability=0.15
)

In [None]:
from transformers import TrainingArguments, Trainer

# TAPT 학습을 위한 기본 설정입니다.
training_args = TrainingArguments(
    output_dir="./tapt_output",      # 결과물이 지정될 경로를 지정합니다.
    overwrite_output_dir=True,       # 덮어쓰기를 허용합니다.
    num_train_epochs=3,              # TAPT는 오래 학습하지 않아도 됩니다.
    per_device_train_batch_size=64,  # OOM(Out-Of-Memory)가 발생하면, 배치 사이즈를 줄여주세요!
    learning_rate=2e-5,              # # 사전 학습된 모델을 망가뜨리지 않도록 작은 학습률을 사용합니다.
    logging_steps=100,               # 중간중간 학습 로그를 보여줍니다.
    save_strategy="epoch",           # 매 에폭마다 저장합니다.
    save_total_limit=1,              # 가장 최신 모델 1개만 남기고 이전 체크포인트를 삭제합니다.
    fp16=True,                       # GPU 메모리를 절약합니다.
)

# Trainer를 정의합니다.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=tokenized_tapt_dataset
)

# TAPT 학습을 시작합니다.
trainer.train()

# 학습된 모델을 저장합니다.
trainer.save_model("./tapt_output")

# 토크나이저도 같은 폴더에 저장합니다.
tokenizer.save_pretrained("./tapt_output")

### **5-2. Optuna로 최적의 하이퍼파리미터 탐색**

In [None]:
# [TODO 2-1] X_train_clean, X_val_clean, y_train, y_val을 사용하여 데이터프레임을 만듭니다.
train_df = None
val_df = None

# [TODO 2-2] Hugging Face Dataset 포맷으로 변환합니다.
train_dataset = None
val_dataset = None

# [TODO 2-3] 각 dataset에 대하여 토큰화를 수행합니다.
tokenized_train_dataset = None
tokenized_val_dataset = None

In [9]:
from transformers import AutoModelForSequenceClassification

TAPT_MODEL_PATH = "./tapt_output"

# 모델 초기화 합수를 정의합니다.
def model_init():
    return AutoModelForSequenceClassification.from_pretrained(
        TAPT_MODEL_PATH,
        num_labels=2
    ).to(device)

In [None]:
from sklearn.metrics import accuracy_score

# [TODO 3] 평가 지표를 정의합니다.
def compute_metrics(p):
    # p는 (모델이 예측한 점수들, 실제 정답)이 묶여 있는 튜플입니다.
    """
    predictions = [
      [2.5, -1.2],  # 첫 번째 문장: 0번(부정)일 점수가 높음
      [-0.5, 3.1],  # 두 번째 문장: 1번(긍정)일 점수가 높음
      [0.1, 0.2]    # 세 번째 문장: 1번(긍정)일 점수가 아주 조금 더 높음
    ]

    labels = [0, 1, 0]  # 실제 정답: 부정, 긍정, 부정
    """
    predictions, labels = p

    # [TODO 3-1] 예측한 값 중에 가장 큰 값을 선택합니다.
    # 힌트: np.argmax()를 사용하세요.
    preds = None

    # [TODO 3-2] labels, preds를 사용하여 정확도를 계산합니다.
    # 힌트: accuracy_score()를 사용하세요.
    accuracy = None

    return {f"accuracy: {accuracy: .4f}"}

In [None]:
# 파인튜닝을 위한 기본 설정입니다.
finetune_args = TrainingArguments(
    output_dir="./finetune_output",  # 결과물이 지정될 경로를 지정합니다.
    overwrite_output_dir=True,       # 덮어쓰기를 허용합니다.
    num_train_epochs=3,              # 파인튜닝은 이미 학습되어 있는 모델을 다듬는 과정이라 3~5 에폭이면 충분합니다.
    per_device_train_batch_size=64,  # OOM(Out-Of-Memory)가 발생하면, 배치 사이즈를 줄여주세요!
    learning_rate=2e-5,              # 사전 학습된 모델을 망가뜨리지 않도록 작은 학습률을 사용합니다.
    logging_steps=100,               # 중간중간 학습 로그를 보여줍니다.
    save_strategy="epoch",           # 매 에폭마다 저장합니다.
    save_total_limit=1,              # 가장 최신 모델 1개만 남기고 이전 체크포인트를 삭제합니다.
    fp16=True,                       # GPU 메모리를 절약합니다.
    evaluation_strategy="epoch",     # 에폭 단위로 평가를 진행합니다.
    load_best_model_at_end=True,     # 끝나면 가장 좋았던 모델을 선택합니다.
    metric_for_best_model="accuracy" # 정확도를 평가 지표로 사용합니다.
)

# Trainer를 정의합니다.
trainer = Trainer(
    model=None,
    model_init=model_init,
    args=finetune_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_val_dataset,
    compute_metrics=compute_metrics
)

SyntaxError: invalid syntax. Perhaps you forgot a comma? (507906089.py, line 22)

In [None]:
# Optuna가 탐색할 범위를 지정합니다.
def hp_space(trial):
    return {
        "learning_rate": trial.suggest_float("learning_rate", 1e-5, 5e-5, log=True),  # learning_rate 탐색 범위를 바꿔보세요!
        "num_train_epochs": trial.suggest_int("num_train_epochs", 2, 4)               # epochs 탐색 범위도 바꿔가며 실험해보세요!
    }

In [None]:
best_run = trainer.hyperparameter_search(
    direction="maximize",  # accuracy가 최대화되도록 튜닝합니다.
    hp_space=hp_space,     # 위에서 정해둔 하이퍼파라미터 탐색 함수입니다.
    backend="optuna",
    n_trials=10            # 실험 횟수입니다.
)

print(best_run)

### **5-3. 최적의 하이퍼파라미터로 최종 학습**

In [None]:
# 찾은 최적의 파라미터를 Trainer에 적용합니다.
for name, value in best_run.hyperparameters.items():
    """
    name: 하이퍼파라미터 종류
    value: 하이퍼파라미터 값
    e.g., name = "learning_rate
    e.g., value = "3e-5
    """
    setattr(trainer.args, name, value)

# 최종 학습을 시작합니다.
trainer.train()

# 검증 지표를 확인합니다.
metrics = trainer.evaluate()
accuracy = metrics["eval_accuracy"]
print(f"검증 정확도: {accuracy: .4f}")

# 최종 학습된 모델을 저장합니다.
trainer.save_model("./final_best_model")

# 토크나이저도 같은 폴더에 저장합니다.
tokenizer.save_pretrained("./final_best_model")

## **6. 테스트**

In [None]:
# 테스트 데이터셋을 만들어줍니다,
test_df = pd.DataFrame({"text": X_test_clean, "label": y_test})
test_dataset = Dataset.from_pandas(test_df)
tokenized_test_dataset = test_dataset.map(tokenize_function, batched=True)

In [None]:
metrics = trainer.evaluate(tokenized_test_dataset)
accuracy = metrics["test_accuracy"]
print(f"최종 테스트 정확도: {accuracy: .4f}")