### 참고 링크
- https://wikidocs.net/157896
-

In [1]:
# 패키지 설치
!pip install transformers torch sentencepiece



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
import torch
import transformers
from transformers import AutoModelWithLMHead, PreTrainedTokenizerFast # KoGPT2에 맞춰 사전 훈련된 토크나이저
from fastai.text.all import *
import fastai
import re

print(torch.__version__)
print(transformers.__version__)
print( fastai.__version__)

2.5.1+cu121
4.47.1
2.7.18


### **1. 데이터 전처리**

- 필수 및 선택 필드 정의
- 선택 필드의 결측치는 ```<empty>```로 채움

In [4]:
# 데이터 로드 및 전처리

import pandas as pd
import random

# 필수 및 선택 필드 정의
required_fields = ['caption', 'name', 'i_action', 'classification']
optional_fields = [
    'character', 'setting', 'action', 'feeling',
    'causalRelationship', 'outcomeResolution', 'prediction'
]

def preprocess_data(file_path):
    # 데이터 로드
    df = pd.read_csv(file_path, encoding='cp949')

    # 결측치 처리 (선택 필드는 <empty>)
    df.fillna('<empty>', inplace=True)

    # 입력 텍스트 생성 함수
    def generate_input_text(row):
        # 필수 필드 검증 (결측값 확인)
        for field in required_fields:
            assert pd.notna(row[field]) and row[field].strip(), f"Error: 필수 필드 {field}가 비어 있습니다."

        # 필수 필드 설정
        input_tokens = [
            f"[caption] {row['caption']}",
            f"[name] {row['name']}",
            f"[i_action] {row['i_action']}",
            f"[classification] {row['classification']}"
        ]

        # 선택 필드 처리 (값이 없으면 <empty>)
        for field in optional_fields:
            token_value = row[field] if pd.notna(row[field]) and row[field].strip() else "<empty>"
            input_tokens.append(f"[{field}] {token_value}")

        # 행별 랜덤 시드 설정 (일관성 유지)
        unique_id = row["id"] if "id" in row else row.name  # id가 없으면 인덱스 사용
        random.seed(unique_id)

        # 선택 필드 랜덤 섞기
        random.shuffle(optional_fields)



        # 최종 텍스트 구성
        return " ".join(input_tokens + optional_fields)

    # input_text 및 target_text 컬럼 생성
    df['input_text'] = df.apply(generate_input_text, axis=1)
    df['target_text'] = df['srcText']

    return df

# 파일 경로 설정
train_file_path = '/content/drive/MyDrive/7th-project/data/train_sample.csv'
val_file_path = '/content/drive/MyDrive/7th-project/data/val_sample.csv'

# 전처리 적용
df_train = preprocess_data(train_file_path)
df_val = preprocess_data(val_file_path)

# 데이터 확인
print(df_train[['input_text', 'target_text']].head())
print(df_val[['input_text', 'target_text']].head())

                                                                                                                                                                                                                                                                                                                          input_text  \
0             [caption] 숲속의 나비 [name] 나비 [i_action] 날다 [classification] 자연탐구 [character] 아름다운 나비 [setting] 숲속 [action] 춤추다 [feeling] 기쁨 [causalRelationship] 숲속의 바람이 나비를 춤추게 했다. [outcomeResolution] 숲속의 나비는 행복했다. [prediction] 나비는 또 다른 꽃을 찾을 것이다. feeling prediction outcomeResolution action character causalRelationship setting   
1                            [caption] 바다의 고래 [name] 고래 [i_action] 유영하다 [classification] 자연탐구 [feeling] 평화 [prediction] <empty> [outcomeResolution] 고래는 바다를 사랑하게 되었다. [action] <empty> [character] 거대한 고래 [causalRelationship] <empty> [setting] 바다 outcomeResolution action prediction character causalRelationship feeling setting   
2               

### **2. KoGPT2 모델 준비**
- **토크나이저 준비 및 특수 토큰 추가**: 모델이 데이터에서 사용되는 태그([caption], [name] 등)를 인식하도록 특수 토큰을 추가

- KoGPT2 모델을 로드하고, 새롭게 추가된 토큰을 반영하도록 임베딩 크기를 조정

In [5]:
from transformers import GPT2LMHeadModel, PreTrainedTokenizerFast

# KoGPT2 토크나이저 불러오기
tokenizer = PreTrainedTokenizerFast.from_pretrained(
    "skt/kogpt2-base-v2",
    bos_token='</s>', eos_token='</s>', unk_token='<unk>',
    pad_token='<pad>', mask_token='<mask>'
)

# 사용자 정의 special tokens
special_tokens_dict = {
    'additional_special_tokens': [
        "[caption]", "[name]", "[i_action]", "[classification]",
        "[character]", "[setting]", "[action]", "[feeling]",
        "[causalRelationship]", "[outcomeResolution]", "[prediction]",
        "<empty>"
    ]
}

# 토크나이저에 특수 토큰 추가
tokenizer.add_special_tokens(special_tokens_dict)

# 확인: 특수 토큰이 정상적으로 추가되었는지 확인
print("Special tokens added:", tokenizer.special_tokens_map)
print("Vocabulary size:", len(tokenizer))

# 모델 로드 (사전 학습된 모델)
model = GPT2LMHeadModel.from_pretrained("skt/kogpt2-base-v2")

# 모델의 임베딩 크기를 조정 (새로운 토큰 개수 추가)
model.resize_token_embeddings(len(tokenizer))

# GPU 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

print("Model and tokenizer loaded successfully.")

# 토크나이저 저장
#tokenizer.save_pretrained("./fine_tuned_kogpt2_tokenizer")


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer.json:   0%|          | 0.00/2.83M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.00k [00:00<?, ?B/s]

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'PreTrainedTokenizerFast'.


Special tokens added: {'bos_token': '</s>', 'eos_token': '</s>', 'unk_token': '<unk>', 'pad_token': '<pad>', 'mask_token': '<mask>', 'additional_special_tokens': ['[caption]', '[name]', '[i_action]', '[classification]', '[character]', '[setting]', '[action]', '[feeling]', '[causalRelationship]', '[outcomeResolution]', '[prediction]', '<empty>']}
Vocabulary size: 51212


pytorch_model.bin:   0%|          | 0.00/513M [00:00<?, ?B/s]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


Model and tokenizer loaded successfully.


### **3. 데이터셋 및 데이터로더 구성**

- PyTorch ```Dataset```을 활용하여 input_text와 target_text를 각각 토큰화하고, 데이터로더(DataLoader)를 생성

- 데이터의 패딩, 길이 조정 및 PyTorch 텐서로 변환

In [6]:
# 데이터셋 클래스 정의
from torch.utils.data import DataLoader, Dataset

class StoryDataset(Dataset):
    def __init__(self, dataframe, tokenizer, max_length=512):
        self.data = dataframe
        self.tokenizer = tokenizer
        self.max_length = max_length

        if 'input_text' not in self.data.columns or 'target_text' not in self.data.columns:
            raise KeyError("데이터셋에 'input_text' 또는 'target_text' 컬럼이 없습니다.")

    def __len__(self):
        return len(self.data)  # 데이터 크기 반환

    def __getitem__(self, index):
        row = self.data.iloc[index]

        input_text = row['input_text']
        target_text = row['target_text']

        # 입력 텍스트 및 타겟 텍스트를 별도로 토큰화
        encoding = self.tokenizer(
            input_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        target_encoding = self.tokenizer(
            target_text,
            max_length=self.max_length,
            padding="max_length",
            truncation=True,
            return_tensors="pt"
        )

        input_ids = encoding["input_ids"].squeeze(0)
        attention_mask = encoding["attention_mask"].squeeze(0)
        labels = target_encoding["input_ids"].squeeze(0)

        return {
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "labels": labels
        }


In [7]:
# 데이터셋 및 데이터로더 생성
train_dataset = StoryDataset(df_train, tokenizer, max_length=256)
val_dataset = StoryDataset(df_val, tokenizer, max_length=256)

# DataLoader 설정
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=8)

# 데이터 확인
for batch in train_dataloader:
    print(batch["input_ids"].shape)
    break


torch.Size([8, 256])


### **4. 모델 학습**

- AdamW 옵티마이저와 크로스 엔트로피 손실 함수를 사용하여 모델 학습

In [8]:
from transformers import AdamW

# GPU 설정 (GPU 없으 CPU 사용)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# 옵티마이저 설정 (AdamW 사용)
optimizer = AdamW(model.parameters(), lr=5e-5)

# 손실 함수
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)

# 학습 관련 파라미터 설정
epochs = 20
train_losses = []
val_losses = []

# Early Stopping 설정
early_stopping_patience = 3  # 검증 손실 개선 없을 시 조기 종료 기준
best_val_loss = float('inf')
patience_counter = 0


for epoch in range(epochs):
    model.train()  # 모델을 학습 모드로 설정
    total_loss = 0

    for batch in train_dataloader:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()  # 기존의 gradient 초기화

        outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        loss.backward()
        optimizer.step()  # 가중치 업데이트

        total_loss += loss.item()

    avg_train_loss = total_loss / len(train_dataloader)
    train_losses.append(avg_train_loss)
    print(f"Epoch {epoch + 1}, Training Loss: {avg_train_loss:.4f}")

    # 검증 단계
    model.eval()
    val_loss = 0

    with torch.no_grad():  # 검증 단계에서는 gradient 계산을 하지 않음
        for batch in val_dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            val_loss += loss.item()

    avg_val_loss = val_loss / len(val_dataloader)
    val_losses.append(avg_val_loss)
    print(f"Epoch {epoch + 1}, Validation Loss: {avg_val_loss:.4f}")

    # Early Stopping 체크
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        patience_counter = 0  # 개선 시 patience 초기화
        # 최적 모델 저장
        model.save_pretrained("./fine_tuned_kogpt2_best")
        tokenizer.save_pretrained("./fine_tuned_kogpt2_best")
    else:
        patience_counter += 1

    if patience_counter >= early_stopping_patience:
        print("Early stopping triggered")
        break




Epoch 1, Training Loss: 14.3567
Epoch 1, Validation Loss: 6.1436
Epoch 2, Training Loss: 3.6549
Epoch 2, Validation Loss: 0.4046
Epoch 3, Training Loss: 0.4233
Epoch 3, Validation Loss: 0.4814
Epoch 4, Training Loss: 0.5073
Epoch 4, Validation Loss: 0.4994
Epoch 5, Training Loss: 0.4720
Epoch 5, Validation Loss: 0.4626
Early stopping triggered


In [9]:
for batch in train_dataloader:
    print(tokenizer.decode(batch["input_ids"][0], skip_special_tokens=False))
    print(tokenizer.decode(batch["labels"][0], skip_special_tokens=False))
    break

[caption] 밤하늘의 별 [name] 별 [i_action] 반짝이다 [classification] 예술경험 [outcomeResolution] 별은 밤하늘에서 더 반짝이게 되었다. [prediction] <empty> [character] 반짝이는 별 [setting] <empty> [feeling] 감탄 [causalRelationship] <empty> [action] 빛나다 causalRelationship feeling action setting outcomeResolution character prediction<pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><pad><p

In [None]:
'''
# 모델 저장
output_dir = "./fine_tuned_kogpt2_best"
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)
print(f"Model saved to {output_dir}")
'''

In [10]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)  # 모델을 디바이스로 이동

GPT2LMHeadModel(
  (transformer): GPT2Model(
    (wte): Embedding(51212, 768)
    (wpe): Embedding(1024, 768)
    (drop): Dropout(p=0.1, inplace=False)
    (h): ModuleList(
      (0-11): 12 x GPT2Block(
        (ln_1): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (attn): GPT2SdpaAttention(
          (c_attn): Conv1D(nf=2304, nx=768)
          (c_proj): Conv1D(nf=768, nx=768)
          (attn_dropout): Dropout(p=0.1, inplace=False)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
        (ln_2): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
        (mlp): GPT2MLP(
          (c_fc): Conv1D(nf=3072, nx=768)
          (c_proj): Conv1D(nf=768, nx=3072)
          (act): NewGELUActivation()
          (dropout): Dropout(p=0.1, inplace=False)
        )
      )
    )
    (ln_f): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
  )
  (lm_head): Linear(in_features=768, out_features=51212, bias=False)
)

In [11]:
# GPU가 있으면 GPU로 이동, 없으면 CPU 사용
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = GPT2LMHeadModel.from_pretrained("./fine_tuned_kogpt2_best").to(device)

def generate_story(input_text, model, tokenizer, max_length=100):
    model.eval()
    input_ids = tokenizer.encode(input_text, return_tensors="pt").to(device)

    with torch.no_grad():
        output = model.generate(
        input_ids.to(device),  # 모델과 동일한 디바이스로 이동
        max_length=max_length,  # 최대 길이 설정
        pad_token_id=tokenizer.pad_token_id,  # 패딩 토큰 설정
        eos_token_id=tokenizer.eos_token_id,  # 종료 토큰 설정
        bos_token_id=tokenizer.bos_token_id,  # 시작 토큰 설정
        repetition_penalty=1.2,  # 반복 패널티 적용 (1.0보다 큰 값으로 설정)
        temperature=0.7,  # 생성의 무작위성 조절 (0.7~1.0 추천)
        top_k=50,  # 확률이 높은 K개의 단어 중 샘플링
        top_p=0.9,  # 누적 확률이 높은 단어만 선택
        do_sample=True  # 샘플링 활성화
)

    generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
    return generated_text

test_text = "[caption] 숲속의 나비 [name] 나비 [i_action] 날다 [classification] 자연탐구"
generated_story = generate_story(test_text, model, tokenizer)
print("Generated Story:", generated_story)


Generated Story: 숲속의 나비  나비  날다  자연탐구


In [14]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# BLEU 점수 계산 함수
def calculate_bleu_score(reference_texts, generated_texts):
    bleu_scores = []
    smoothie = SmoothingFunction().method4  # 부드러운 점수 계산 적용

    for ref, gen in zip(reference_texts, generated_texts):
        ref_tokens = ref.split()
        gen_tokens = gen.split()
        score = sentence_bleu([ref_tokens], gen_tokens, smoothing_function=smoothie)
        bleu_scores.append(score)

    return sum(bleu_scores) / len(bleu_scores)

# 평가 샘플 테스트
reference_texts = df_val['target_text'].tolist()
generated_texts = [generate_story(text, model, tokenizer) for text in df_val['input_text'].tolist()]

bleu_score = calculate_bleu_score(reference_texts, generated_texts)
print(f"BLEU Score: {bleu_score:.4f}")


BLEU Score: 0.0216


In [15]:
!pip install rouge-score

Collecting rouge-score
  Downloading rouge_score-0.1.2.tar.gz (17 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: rouge-score
  Building wheel for rouge-score (setup.py) ... [?25l[?25hdone
  Created wheel for rouge-score: filename=rouge_score-0.1.2-py3-none-any.whl size=24935 sha256=5425c7dd1e066fd004694f0ea536caef42b16c443b9b3df83dc7c9d8b09fd968
  Stored in directory: /root/.cache/pip/wheels/1e/19/43/8a442dc83660ca25e163e1bd1f89919284ab0d0c1475475148
Successfully built rouge-score
Installing collected packages: rouge-score
Successfully installed rouge-score-0.1.2


In [16]:
from rouge_score import rouge_scorer

# ROUGE 점수 계산 함수
def calculate_rouge_scores(reference_texts, generated_texts):
    # 올바른 ROUGE 타입 설정 (하이픈 제거)
    scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
    rouge_1, rouge_2, rouge_l = 0, 0, 0

    for ref, gen in zip(reference_texts, generated_texts):
        scores = scorer.score(ref, gen)
        rouge_1 += scores['rouge1'].fmeasure
        rouge_2 += scores['rouge2'].fmeasure
        rouge_l += scores['rougeL'].fmeasure

    n = len(reference_texts)
    return {
        "ROUGE-1": rouge_1 / n,
        "ROUGE-2": rouge_2 / n,
        "ROUGE-L": rouge_l / n
    }

# ROUGE 평가 실행
reference_texts = df_val['target_text'].tolist()
generated_texts = [generate_story(text, model, tokenizer) for text in df_val['input_text'].tolist()]

rouge_scores = calculate_rouge_scores(reference_texts, generated_texts)
print(f"ROUGE Scores: {rouge_scores}")


ROUGE Scores: {'ROUGE-1': 0.0, 'ROUGE-2': 0.0, 'ROUGE-L': 0.0}


In [17]:
from transformers import GPT2LMHeadModel, PreTrainedTokenizerFast

# 모델 로드
model_path = "./fine_tuned_kogpt2_best"
loaded_model = GPT2LMHeadModel.from_pretrained(model_path)
loaded_tokenizer = PreTrainedTokenizerFast.from_pretrained(model_path)

# 모델의 임베딩 크기 확인
print("Vocab Size (Model):", loaded_model.config.vocab_size)
print("Vocab Size (Tokenizer):", len(loaded_tokenizer))

# 출력된 모델과 토크나이저의 vocab size 일치 확인

Vocab Size (Model): 51212
Vocab Size (Tokenizer): 51212


In [13]:
# 토크나이저 테스트
encoded = tokenizer.encode(test_text, return_tensors="pt")
print("Encoded input:", encoded)
decoded = tokenizer.decode(encoded[0])
print("Decoded input:", decoded)

# 토크나이저가 데이터를 정확히 인코딩 및 디코딩하는 것 확인

Encoded input: tensor([[51200, 47498, 19842,   739, 51201, 19842,   739, 51202,  9673,  7182,
           739, 51203,  9632, 32601]])
Decoded input: [caption] 숲속의 나비 [name] 나비 [i_action] 날다 [classification] 자연탐구


### **5. 문제점**
- 입력 그대로 출력됨...
  - 너무 적은 데이터로 학습시켜서?
  - 하이퍼파라미터 조정?
  - 특수 토큰을 단순한 텍스트로 학습할 가능성

In [None]:
# 이전 결과 삭제
'''
import os
import shutil

shutil.rmtree("./results", ignore_errors=True)
'''

In [None]:
'''
import shutil
import os

# 기존 모델 폴더 삭제
model_dir = "./fine_tuned_kogpt2_best"
if os.path.exists(model_dir):
    shutil.rmtree(model_dir)
    print(f"Deleted existing directory: {model_dir}")
else:
    print(f"No directory found at: {model_dir}")
'''
