<a href="https://colab.research.google.com/github/dakyommii/MinhwaProject/blob/main/minhwa-explanation/train/kogpt2-train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# KoGPT2

In [None]:
from google.colab import drive
drive.mount('/content/drive')

## Library

In [None]:
!pip install transformers torch

In [None]:
!pip install pandas

## dataset

In [None]:
import pandas as pd
import csv

# CSV 파일 로드
csv_file_path = "/content/drive/MyDrive/seol/datasets/kor_data-fin.csv"
df = pd.read_csv(csv_file_path)

# 텍스트 데이터 추출
keyword_col = df['keyword']
exp_col = df['explanation']
# 다른 열도 필요한 만큼 추가

# 추출된 텍스트 데이터 리스트를 리스트로 변환
kwd_lst = keyword_col.tolist()
exp_lst=exp_col.tolist()

# 파인튜닝에 사용할 데이터 생성
train_texts = []
for i in range(len(kwd_lst)):
    combined_text = f"{kwd_lst[i]} | {exp_lst[i]}"  # 필요에 따라 조합
    train_texts.append(combined_text)

## fine-tuning

In [None]:
from typing import Optional
import torch
import transformers
from transformers import AutoModelWithLMHead, PreTrainedTokenizerFast, GPT2LMHeadModel, GPT2Tokenizer, AdamW, get_linear_schedule_with_warmup
from fastai.text.all import *
import fastai
import re
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict

### Fine-tuned 모델 로드

In [None]:
# from transformers import GPT2LMHeadModel, PreTrainedTokenizerFast

# KoGPT2 모델 불러오기
model = GPT2LMHeadModel.from_pretrained("skt/kogpt2-base-v2")

# KoGPT2 토크나이저 불러오기
tokenizer = PreTrainedTokenizerFast.from_pretrained("skt/kogpt2-base-v2",
                                                    bos_token='</s>',
                                                    eos_token='</s>',
                                                    unk_token='<unk>',
                                                    pad_token='<pad>',
                                                    mask_token='<mask>')


### 데이터셋 로드 & Fine-tuning

In [None]:
# from transformers import GPT2LMHeadModel, GPT2Tokenizer, AdamW, get_linear_schedule_with_warmup
# from torch.utils.data import Dataset, DataLoader

# 데이터셋 클래스 정의
class MyDataset(Dataset):
    def __init__(self, tokenizer, text_list, max_length=512):
        self.tokenizer = tokenizer
        self.text_list = text_list
        self.max_length = max_length

    def __len__(self):
        return len(self.text_list)

    def __getitem__(self, idx):
        text = self.text_list[idx]
        encoding = self.tokenizer(
            text,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )
        inputs = {
            'input_ids': encoding['input_ids'],
            'attention_mask': encoding['attention_mask']  # attention_mask 추가
        }
        return inputs


# GPU 사용 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 모델과 토크나이저 로드
model.to(device)

# 파인튜닝을 위한 데이터 준비
train_dataset = MyDataset(tokenizer, train_texts)
train_dataloader = DataLoader(train_dataset, batch_size=4, shuffle=True)

# 옵티마이저 및 스케줄러 설정
num_epochs = 300
optimizer = AdamW(model.parameters(), lr=1e-4)
scheduler = get_linear_schedule_with_warmup(
    optimizer, num_warmup_steps=100, num_training_steps=len(train_dataloader) * num_epochs
)

# 초기 손실을 큰 값으로 설정
best_loss = float("inf")

# 파인튜닝 하기
for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    for batch in train_dataloader:
      inputs = {
          'input_ids': batch['input_ids'].to(device),
          'attention_mask': batch['attention_mask'].to(device)
      }
      outputs = model(**inputs, labels=inputs['input_ids'])  # labels도 input_ids와 동일하게 설정
      loss = outputs.loss
      loss.backward()
      optimizer.step()
      scheduler.step()
      optimizer.zero_grad()
      total_loss += loss.item()

    # 평균 손실 계산
    average_loss = total_loss / len(train_dataloader)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {average_loss}")

    # 현재 모델의 손실이 이전 모델보다 낮으면 모델 저장
    if average_loss < best_loss:
        best_loss = average_loss
        model.save_pretrained("/content/drive/MyDrive/seol/ckpt/kogpt2-test")
        tokenizer.save_pretrained("/content/drive/MyDrive/seol/ckpt/kogpt2-test")


### 텍스트 생성 테스트

In [None]:
tokenizer = PreTrainedTokenizerFast.from_pretrained("/content/drive/MyDrive/seol/ckpt/kogpt2-fin",
  bos_token='</s>', eos_token='</s>', unk_token='<unk>',
  pad_token='<pad>', mask_token='<mask>') # 토크나이저

model = AutoModelWithLMHead.from_pretrained("/content/drive/MyDrive/seol/ckpt/kogpt2-fin")  # 모델 불러오기

prompt = "호랑이, 까치" ## 입력 텍스트
input_ids = tokenizer.encode(prompt, return_tensors="pt")

# 텍스트 생성
output = model.generate(input_ids, max_length=100, num_return_sequences=1, no_repeat_ngram_size=2, top_k=50, top_p=0.95)

# 생성된 텍스트 출력
generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
print(generated_text)


### 후처리

In [None]:
def remove_after_char(s, char):
    # 문자열을 특정 문자 기준으로 분할
    parts = s.split(char)
    # 첫 번째 부분만 반환
    return parts[0]

In [None]:
val=generated_text
val_new = remove_after_char(val, '”')
generated_text=val_new

In [None]:
kwd=prompt.split(',')

In [None]:
# defaultdict를 int로 초기화합니다. 새 키에 대한 기본값은 0이 됩니다.
key_map = defaultdict(int)

# 각 요소에 대해 카운트를 증가시킵니다.
for i in range(0,len(kwd)):

  if i>0: k=kwd[i][1:]
  else: k=kwd[i]

  kr=k.split()[-1]
  key_map[kr] += 1

In [None]:
# 민화 장르 분류
gen=''

# 모란 개수
mc = key_map['모란']

if(key_map['호랑이']>0): gen='호작도'
elif(key_map['학']>0): gen='송학도'
elif(key_map['봉황']>0): gen='봉황도'
elif(key_map['연꽃']>0):
  # 모란 없음
  if(mc==0 or mc<key_map['연꽃']): gen='연화도'
  # 모란 있음 -> 개수 비교
  else: gen='모란도'
elif(mc>0): gen='모란도'
else: gen='화조도'

In [None]:
rep=gen+"입니다"
val=generated_text
val_new = val.replace('모란도입니다', rep) # 바꿀 텍스트와 대체할 텍스트
generated_text=val_new

val=generated_text
val_new = val.replace('호작도입니다', rep) # 바꿀 텍스트와 대체할 텍스트
generated_text=val_new

val=generated_text
val_new = val.replace('송학도입니다', rep) # 바꿀 텍스트와 대체할 텍스트
generated_text=val_new

val=generated_text
val_new = val.replace('봉황도입니다', rep) # 바꿀 텍스트와 대체할 텍스트
generated_text=val_new

val=generated_text
val_new = val.replace('연화도입니다', rep) # 바꿀 텍스트와 대체할 텍스트
generated_text=val_new

val=generated_text
val_new = val.replace('화조도입니다', rep) # 바꿀 텍스트와 대체할 텍스트
generated_text=val_new

In [None]:
generated_text