# 사용자 선호에 맞는 시 창작 모델

### 환경 설정

In [1]:
!python -m pip install --upgrade pip



In [2]:
!pip install typing_extensions pydantic openai



In [3]:
!pip install datasets transformers peft trl bitsandbytes



In [15]:
import os
import torch
import json
import time
import random
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments, DataCollatorForSeq2Seq, BitsAndBytesConfig, GenerationConfig, AutoModelForSequenceClassification
from datasets import Dataset
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from trl import ORPOTrainer, ORPOConfig, AutoModelForCausalLMWithValueHead
from trl.trainer.utils import DPODataCollatorWithPadding
from tqdm import tqdm

In [5]:
os.environ["WANDB_DISABLED"] = "true"           # disable wandb logging
os.environ["TOKENIZERS_PARALLELISM"] = "false"  # avoid the parallel-tokenizer warning

# PyTorch's device string for NVIDIA GPUs is "cuda"; "gpu" is rejected by
# torch.device() / .to(), so the previous value was unusable on GPU machines.
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cpu'

---

## Q-LoRA 파인튜닝

In [6]:
# Load the raw poem JSON and convert it into a Dataset
dataset_path = "./korean_poetry_dataset.json"

with open(dataset_path, "r", encoding="utf-8") as f:
    poem_data = json.load(f)

# Flatten each record's nested "text" object into a topic/poem pair
preprocessed_data = [
    dict(topic=entry["text"]["topic"], poem=entry["text"]["poem"])
    for entry in poem_data
]

train_dataset = Dataset.from_list(preprocessed_data)

In [7]:
# Load the tokenizer of the base model
model_name = "Bllossom/llama-3.2-Korean-Bllossom-3B"

tokenizer = AutoTokenizer.from_pretrained(model_name)

# Padding is requested during preprocessing, so fall back to the EOS token
# when the tokenizer does not define a dedicated pad token
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [8]:
# Preprocessing function (tokenization + labels)
def preprocess_text(sample):
    """Tokenize a batch of topic/poem pairs and build causal-LM labels.

    Args:
        sample: Batched mapping with parallel "topic" and "poem" lists, as
            supplied by ``Dataset.map(..., batched=True)``.

    Returns:
        The tokenizer output (padded/truncated to 512 tokens) with an added
        "labels" entry: a copy of "input_ids" in which every padded position
        is replaced by -100.
    """
    input_texts = [f"주제: {t}\n시: {p}" for t, p in zip(sample["topic"], sample["poem"])]

    model_inputs = tokenizer(
        input_texts,
        padding="max_length",
        max_length=512,
        truncation=True,
    )

    # Mask padding through the attention mask instead of comparing token ids:
    # the pad token may be the EOS token (pad-token fallback), in which case
    # an id comparison would also hide genuine EOS tokens from the loss.
    model_inputs["labels"] = [
        [token_id if keep else -100 for token_id, keep in zip(ids, mask)]
        for ids, mask in zip(model_inputs["input_ids"], model_inputs["attention_mask"])
    ]

    return model_inputs

In [9]:
# Tokenize the dataset in batches; the raw text columns are dropped so only
# the fields returned by preprocess_text remain
train_dataset = train_dataset.map(
    preprocess_text,
    batched=True,
    remove_columns=["topic", "poem"]
)

Map: 100%|██████████| 2600/2600 [00:00<00:00, 2745.05 examples/s]


In [10]:
# Data collator handed to the Trainer below (built from the tokenizer only, no model)
data_collator = DataCollatorForSeq2Seq(tokenizer, model=None)

In [11]:
# 4-bit quantization settings to reduce VRAM usage:
# NF4 quantization with double quantization, computing in float16
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4"
)

In [None]:
# Load the base model with the 4-bit quantization config,
# letting device_map="auto" decide device placement
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
)

In [14]:
# LoRA configuration for causal-LM fine-tuning:
# rank 16, alpha 32, 5% dropout, bias terms left untouched
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

In [None]:
# Prepare the quantized model for training
model = prepare_model_for_kbit_training(model)

In [None]:
# Wrap the model with the LoRA adapters and report how many parameters are trainable
model = get_peft_model(model, lora_config)
model.print_trainable_parameters()

model.train()   # put the model in training mode

In [None]:
# Training configuration for the Q-LoRA run: no evaluation, a checkpoint per
# epoch (at most 2 kept), and 2 samples x 16 accumulation steps per device
# for each optimizer update
training_args = TrainingArguments(
    output_dir="./q_lora_poem",
    eval_strategy="no",
    save_strategy="epoch",
    per_device_train_batch_size=2,
    gradient_accumulation_steps=16,
    learning_rate=2e-4,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    save_total_limit=2,
    optim="adamw_bnb_8bit",
    report_to="none"
)

# Trainer over the tokenized poem dataset, using the collator defined earlier
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,
    data_collator=data_collator
)

In [None]:
# Run the Q-LoRA fine-tuning
trainer.train()

---

### 시 생성

In [None]:
# Build a text-generation pipeline from the fine-tuned checkpoint
qlora_checkpoint = "./q_lora_poem/checkpoint-243"

# device_map="auto" mirrors the training-time load so generation uses the GPU
# when one is available instead of leaving the model on the CPU.
model = AutoModelForCausalLM.from_pretrained(qlora_checkpoint, device_map="auto")
tokenizer = AutoTokenizer.from_pretrained(model_name)

# The tokenizer is reloaded here, so the pad-token fallback must be applied
# again; the pipeline is configured for batching, which needs a pad token.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

generate_pipeline = pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    pad_token_id=tokenizer.eos_token_id,
    batch_size=2
)

In [None]:
# Pool of topics from which generation prompts are drawn
topics = ["바람", "비", "노을", "달빛", "안개", "사랑", "이별", "운명", "기다림", "후회", "추억", "시간", "청춘", "변화", "마지막 순간", "군중", "밤거리", "버스", "인생", "빌딩", "사람들", "거짓말", "욕망", "돈", "권력", "비밀", "죽음", "희망", "동물", "자연", "도시", "바다", "산", "하늘", "별", "꽃", "나무", "강", "바위", "흙", "눈", "빗방울", "눈물", "웃음"]

# JSON file that accumulates the generated poems
eval_file = 'rlhf_evaluation_data.json'

# Reuse poems saved by an earlier run if the file exists; otherwise start empty
try:
    with open(eval_file, "r", encoding="utf-8") as f:
        eval_dataset = json.load(f)
except FileNotFoundError:
    eval_dataset = []

In [None]:
# Settings for poem generation
num_batches = 5       # number of batches to generate
batch_size = 20       # poems per batch
total_samples = num_batches * batch_size
generated_samples = len(eval_dataset)  # progress counter, seeded with the poems already loaded

In [None]:
# Poem generation function
def generate_poem_batch():
    """Generate one batch of poems on randomly chosen topics.

    Reads the module-level ``topics``, ``batch_size``, ``total_samples`` and
    ``generate_pipeline``, and increments the module-level
    ``generated_samples`` counter once per generated poem. Progress and a
    rough remaining-time estimate are printed after every poem.

    Returns:
        A list of ``batch_size`` dicts with the keys "topic", "poem" and
        "selected". "selected" is always None here; it is a placeholder for
        the preference feedback added afterwards.
    """
    # Declared up front rather than in the middle of the loop body.
    global generated_samples

    batch_data = []

    with tqdm(total=batch_size, desc="<시 생성 중>", leave=False) as t:
        for _ in range(batch_size):
            topic = random.choice(topics)
            input_text = f"주제: {topic}\n시:"

            start_time = time.time()
            poem = generate_pipeline(
                input_text,
                max_new_tokens=100,
                do_sample=True,          # temperature/top_p only take effect when sampling
                temperature=0.8,
                top_p=0.9,
                return_full_text=False,  # keep only the continuation, not the prompt
            )[0]['generated_text']
            gen_time = time.time() - start_time

            batch_data.append({
                "topic": topic,
                "poem": poem,
                "selected": None
            })

            # tqdm
            t.update(1)

            generated_samples += 1
            complete_rate = (generated_samples / total_samples) * 100
            # Estimate from the latest generation time; clamp at zero so a
            # counter that already exceeds the target never yields a negative time.
            remaining_time = (max(0, total_samples - generated_samples) * gen_time) / 60

            print(f'\n{generated_samples}/{total_samples}개 완료 ({complete_rate:.2f}%)')
            print(f'- 예상 남은 시간 : {remaining_time:.1f}분')
            print('-' * 50)

    return batch_data

In [None]:
# Generate the poems and checkpoint them to JSON after every batch.
# Only the batches still missing are generated: eval_dataset may already hold
# poems loaded from a previous run, and regenerating all num_batches would
# overshoot total_samples.
remaining_samples = max(0, total_samples - len(eval_dataset))
remaining_batches = -(-remaining_samples // batch_size) if batch_size > 0 else 0  # ceiling division

for _ in tqdm(range(remaining_batches), desc="<전체 진행 상황>", position=0):
    eval_dataset.extend(generate_poem_batch())

    # Rewrite the whole file each time so an interrupted run keeps finished batches
    with open(eval_file, 'w', encoding='utf-8') as f:
        json.dump(eval_dataset, f, ensure_ascii=False, indent=4)

---

### Reward model
- 앞서 생성한 시 중 선호하는 시의 selected 값을 true로 수정해 피드백을 반영

In [None]:
# Load the generated poems together with their preference flags
with open(eval_file, "r", encoding="utf-8") as f:
    evaluation_data = json.load(f)

# Keep only entries whose "selected" flag is truthy, as (topic prompt, poem) pairs
reward_data = [
    dict(text_a=f'주제: {entry["topic"]}', text_b=entry['poem'])
    for entry in evaluation_data
    if entry['selected']
]

reward_dataset = Dataset.from_list(reward_data)

In [None]:
# Preprocessing function (handles batched data)
def preprocess_reward_data(sample):
    """Tokenize a batch of (topic prompt, poem) pairs and build labels.

    Args:
        sample: Batched mapping with parallel "text_a" and "text_b" lists, as
            supplied by ``Dataset.map(..., batched=True)``.

    Returns:
        The tokenizer output (padded/truncated to 512 tokens) with an added
        "labels" entry: a copy of "input_ids" in which every padded position
        is replaced by -100.
    """
    model_inputs = tokenizer(
        sample["text_a"],
        text_pair=sample["text_b"],
        padding="max_length",
        max_length=512,
        truncation=True,
    )

    # Mask padding through the attention mask instead of comparing token ids:
    # the pad token may be the EOS token, in which case an id comparison
    # would also hide genuine EOS tokens from the loss.
    model_inputs["labels"] = [
        [token_id if keep else -100 for token_id, keep in zip(ids, mask)]
        for ids, mask in zip(model_inputs["input_ids"], model_inputs["attention_mask"])
    ]

    return model_inputs

In [None]:
# Tokenize the reward dataset in batches and drop the raw text columns
reward_dataset = reward_dataset.map(
    preprocess_reward_data,
    batched=True,
    remove_columns=["text_a", "text_b"]
)

In [None]:
# 4-bit quantization settings to reduce VRAM usage (NF4, float16 compute).
# device_map is not a quantization option - it is an argument of
# from_pretrained() - so it is no longer passed here, where it had no effect.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_quant_type="nf4"
)

In [None]:
# Create the reward model from the quantized base model.
# device_map="auto" is passed to from_pretrained (where it is honored),
# matching how the base model was loaded for fine-tuning.
reward_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,
    device_map="auto"
)

# Prepare the quantized model for training
reward_model = prepare_model_for_kbit_training(reward_model)

In [None]:
# LoRA configuration for the reward model:
# rank 16, alpha 32, 5% dropout, bias terms left untouched
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM"
)

In [None]:
# Wrap the reward model with the LoRA adapters
reward_model = get_peft_model(reward_model, lora_config)

In [None]:
# Trainer setup for the reward model: a checkpoint per epoch (at most 2 kept),
# 2 samples x 16 accumulation steps per device for each optimizer update,
# fp16 enabled, and dataset columns passed through without pruning
reward_training_args = TrainingArguments(
    output_dir="./reward_model",
    save_strategy="epoch",
    per_device_train_batch_size=2,
    gradient_accumulation_steps=16,
    learning_rate=2e-4,
    num_train_epochs=3,
    logging_dir="./logs",
    logging_steps=100,
    save_total_limit=2,
    remove_unused_columns=False,
    fp16=True
)

# No data_collator is given here, so the Trainer falls back to its default
reward_trainer = Trainer(
    model=reward_model,
    args=reward_training_args,
    train_dataset=reward_dataset,
    tokenizer=tokenizer
)

In [None]:
# Train the reward model
reward_trainer.train()

---

### RLHF (ORPO)