# 기본적인 세팅

1. 허깅페이스에서 사용할 모델의 라이선스 동의를 미리 받아놓아야 함 (아래 코드는 `meta-llama/Llama-3.2-1B-Instruct`를 로드함; gemma3:1b 등 다른 gated 모델로 바꿀 경우 해당 모델도 동의 필요)

2. 아래 셀의 `huggingface-cli login` 실행 시 허깅페이스 액세스 토큰(api key)을 입력해야 함

3. `corpus.json`을 `/content/corpus.json` 경로에 업로드해 놓아야 함

4. 런타임은 A100 GPU를 사용해야 함

In [None]:
# Install the extra packages this notebook imports (trl, huggingface_hub, loguru); -q keeps pip output quiet.
!pip install trl huggingface_hub loguru -q
# Log in to the Hugging Face Hub; presumably required to download the gated base model loaded later — confirm.
!huggingface-cli login

In [None]:
import json
import torch
from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments
)
from trl import SFTTrainer, SFTConfig
from loguru import logger
import shutil
import sys
import os
import itertools
import platform

# Clear existing loguru handlers, then install a single stdout sink with a
# compact "time | level | message" layout.
_log_format = "<green>{time:HH:mm:ss}</green> | <level>{level: <5}</level> | {message}"
logger.remove()
logger.add(sys.stdout, format=_log_format, level="INFO", colorize=True)

# Version logging: look up the library versions once, then report them
# together with the interpreter and torch versions.
_lib_versions = {
    name: __import__(name).__version__
    for name in ("transformers", "datasets", "trl")
}
logger.info(f"python version       : {platform.python_version()}")
logger.info(f"torch version        : {torch.__version__}")
logger.info(f"transformers version : {_lib_versions['transformers']}")
logger.info(f"datasets version     : {_lib_versions['datasets']}")
logger.info(f"trl version          : {_lib_versions['trl']}")

# Data loading
def load_raw_data(path="/content/corpus.json"):
    """Read the UTF-8 encoded JSON file at ``path`` and return its parsed content."""
    with open(path, mode="r", encoding="utf-8") as corpus_file:
        raw_text = corpus_file.read()
    return json.loads(raw_text)

# Data preprocessing
def make_sft_data(raw_data):
    """Convert raw corpus items into prompt/target records.

    Each item is read through its ``instruction``, ``input`` and ``chosen``
    keys. The prompt is the instruction, a colon and a space, followed by the
    ``input`` entries joined with ", "; the target is the ``chosen`` value.

    Returns:
        A list of dicts with the keys ``input`` (prompt) and ``target``.
    """
    return [
        {
            'input': f"{entry['instruction']}: {', '.join(entry['input'])}",
            'target': entry['chosen'],
        }
        for entry in raw_data
    ]

# Tokenization for causal-LM supervised fine-tuning
def preprocess(example):
    """Tokenize one prompt/target record into an aligned causal-LM example.

    The prompt (``example["input"]``) and the completion
    (``example["target"]``) are tokenized separately, each truncated to 192
    tokens, and then concatenated into a single sequence. ``labels`` has the
    same length as ``input_ids``: prompt positions are set to -100 and the
    completion positions carry the completion token ids.

    The previous version returned the prompt ids as ``input_ids`` and the
    target ids as ``labels``. Those two sequences had different lengths and
    were not position-aligned, which a causal LM cannot train on.

    Uses the module-level ``tokenizer``.

    NOTE(review): some SFTTrainer collators rebuild labels from ``input_ids``;
    confirm whether the prompt mask takes effect with the installed trl
    version. The concatenated ``input_ids`` are correct in either case.

    Returns:
        Dict with ``input_ids``, ``attention_mask`` and ``labels`` lists of
        equal length.
    """
    prompt_ids = list(
        tokenizer(example["input"], truncation=True, max_length=192)["input_ids"]
    )
    # The completion continues the prompt, so do not let the tokenizer insert
    # its own special tokens (e.g. a BOS) in the middle of the sequence.
    target_ids = list(
        tokenizer(
            example["target"],
            truncation=True,
            max_length=192,
            add_special_tokens=False,
        )["input_ids"]
    )
    # Terminate the completion with EOS so the model can learn where to stop.
    if tokenizer.eos_token_id is not None:
        target_ids.append(tokenizer.eos_token_id)

    input_ids = prompt_ids + target_ids
    return {
        "input_ids": input_ids,
        "attention_mask": [1] * len(input_ids),
        # -100 marks positions excluded from the loss (the prompt tokens).
        "labels": [-100] * len(prompt_ids) + target_ids,
    }

def test(prompt, model, tokenizer, necessary_word):
    """Generate a sampled continuation for ``prompt``, log it, and return it.

    The model is moved to CUDA when a GPU is available and to CPU otherwise
    (the previous hard-coded ``"cuda"`` failed on hosts without a GPU).
    Generation runs in eval mode, and the model's previous train/eval mode is
    restored afterwards so the call does not affect a later training run.

    Args:
        prompt: Text passed to the tokenizer and used as the generation input.
        model: Model exposing ``to``, ``device``, ``training``, ``eval``,
            ``train`` and ``generate``.
        tokenizer: Tokenizer used for encoding the prompt and decoding the
            output; its ``eos_token_id`` is used as the pad token id.
        necessary_word: Tag (e.g. "[Before Train]") prefixed to the logged
            prompt and the logged generation.

    Returns:
        The decoded first generated sequence with special tokens skipped.
        NOTE(review): for decoder-only models this presumably still contains
        the prompt text — confirm before comparing outputs.
    """
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model = model.to(device)

    # Tokenize on the same device as the model.
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    # Generate in eval mode, then put the model back into its prior mode.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            output = model.generate(
                **inputs,
                max_new_tokens=256,
                do_sample=True,
                top_k=50,
                top_p=0.95,
                temperature=0.7,
                pad_token_id=tokenizer.eos_token_id
            )
    finally:
        model.train(was_training)

    # Decode and log prompt and generation under the same tag.
    generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
    logger.info("="*100)
    logger.info(necessary_word + "\n" + prompt)
    logger.info(necessary_word + "\n" + generated_text)
    logger.info("="*100)
    return generated_text

In [None]:
# Training parameters
batch_size = 8
num_epochs = 5
save_total_limit = 2

# Output locations: trainer checkpoints, trainer logs, and the final saved model
output_dir = "./outputs/sft"
logging_dir = "./outputs/logs"
sft_dir = "./outputs/best_sft"

# Model and tokenizer, both loaded from the same hub identifier
model_name = "meta-llama/Llama-3.2-1B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(model_name)

In [None]:
# Literary-style instruction templates
instructions = [
    "제공된 단어로 문학적인 어조로 짧은 장면을 창작해주세요.",
    "아래 단어들을 활용해 상징과 감정이 담긴 문학적 단편을 작성해주세요.",
    "다음 키워드를 사용해 감성적이고 은유적인 이야기를 작성해주세요.",
    "아래 키워드를 사용해 비유와 상징이 녹아든 문학적 장면을 묘사해주세요.",
]

# Keyword triples combined with every instruction
keywords_list = [
    ["밥", "숟가락", "그릇"],
    ["창문", "바람", "햇살"],
    ["우산", "비", "골목"],
    ["신발", "거리", "그림자"],
    ["책상", "연필", "종이"],
    ["시계", "벽", "침묵"],
    ["의자", "창가", "오후"],
    ["커피", "잔", "향기"],
    ["손", "온기", "기억"],
    ["길", "노을", "발자국"],
]

# Tag prefixed to every logged prompt/generation in this phase
necessary_word = "[Before Train]"

# Cartesian product of instructions and keyword sets, instruction-major order.
# NOTE(review): these prompts join instruction and keywords with a space,
# while make_sft_data uses ": " — confirm the difference is intended.
test_prompts = [
    f"{instruction} {', '.join(keywords)}"
    for instruction in instructions
    for keywords in keywords_list
]

# Baseline generations from the model before fine-tuning
before_response = [
    test(prompt, model, tokenizer, necessary_word)
    for prompt in test_prompts
]

In [None]:
# Make sure the tokenizer has a pad token, and log which case applied
if tokenizer.pad_token is not None:
    logger.info(f"[Tokenizer] pad_token already set: {tokenizer.pad_token}")
else:
    tokenizer.pad_token = tokenizer.eos_token
    model.resize_token_embeddings(len(tokenizer))
    logger.info(f"[Tokenizer] pad_token was None. Set to eos_token: {tokenizer.pad_token}")

# Data preparation: load the corpus, build prompt/target records, wrap them in a Dataset
raw_data = load_raw_data()
sft_records = make_sft_data(raw_data)
dataset = Dataset.from_list(sft_records)

# 80/20 train/eval split with a fixed seed; the text columns are dropped
# once each record has been tokenized
split_dataset = dataset.train_test_split(test_size=0.2, seed=42)
_text_columns = ["input", "target"]
train_dataset = split_dataset["train"].map(preprocess, remove_columns=_text_columns)
eval_dataset = split_dataset["test"].map(preprocess, remove_columns=_text_columns)

# Log one sample from each stage plus the resulting sizes
logger.info(f"[Raw Data] First sample:\n{json.dumps(raw_data[0], ensure_ascii=False, indent=2)}")
logger.info(f"[SFT Records] First record:\n{json.dumps(sft_records[0], ensure_ascii=False, indent=2)}")
logger.info(f"[Dataset] Total samples: {len(dataset)}")
logger.info(f"[Split] Train size: {len(split_dataset['train'])}, Eval size: {len(split_dataset['test'])}")
logger.info(f"[Train Preprocessed] Sample keys: {list(train_dataset[0].keys())}")
logger.info(f"[Train Preprocessed] input_ids length: {len(train_dataset[0]['input_ids'])}, labels length: {len(train_dataset[0]['labels'])}")


In [None]:
# Hyperparameter derivation

# Steps per epoch use ceiling division: the last, partially filled batch still
# counts as one step. Floor division undercounted the total and gave 0 steps
# when the dataset was smaller than one batch.
# NOTE(review): assumes a single device, no gradient accumulation and the
# default dataloader_drop_last=False — confirm for other setups.
steps_per_epoch = -(-len(train_dataset) // batch_size)
total_train_steps = steps_per_epoch * num_epochs

# Log about twice per epoch, and never less often than every step;
# checkpoints are saved on the same cadence.
logging_steps = max(1, total_train_steps // (num_epochs * 2))
save_steps = logging_steps

# Report the derived schedule
logger.info(f"Total samples (train): {len(train_dataset)}")
logger.info(f"Total samples (eval): {len(eval_dataset)}")
logger.info(f"Batch size: {batch_size}")
logger.info(f"Epochs: {num_epochs}")
logger.info(f"Total training steps: {total_train_steps}")
logger.info(f"Logging steps: {logging_steps}")
logger.info(f"Save steps: {save_steps}")

In [None]:
# Training configuration, grouped by concern. Logging, evaluation and
# checkpointing all use step-based strategies; evaluation reuses
# logging_steps as its interval.
training_args = TrainingArguments(
    # where to write checkpoints and logs
    output_dir=output_dir,
    logging_dir=logging_dir,
    report_to="none",
    # optimisation schedule
    num_train_epochs=num_epochs,
    per_device_train_batch_size=batch_size,
    fp16=True,
    # logging / evaluation / checkpoint cadence
    logging_strategy="steps",
    logging_steps=logging_steps,
    eval_strategy="steps",
    eval_steps=logging_steps,
    save_strategy="steps",
    save_steps=save_steps,
    save_total_limit=save_total_limit,
    # best checkpoint selection: lowest eval_loss
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    greater_is_better=False,
)

# Trainer over the pre-tokenized train/eval datasets
trainer = SFTTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
)

# Run training
trainer.train()

# Save the trained model and the tokenizer side by side, then zip the directory
trained_model = trainer.model
trained_model.save_pretrained(sft_dir)
tokenizer.save_pretrained(sft_dir)
shutil.make_archive("best_sft", 'zip', sft_dir)

In [None]:
# Sum the sizes of every file under the saved-model directory and log the
# total in MB.
_saved_files = (
    os.path.join(root, f)
    for root, _, files in os.walk(sft_dir)
    for f in files
)
total_size = sum(map(os.path.getsize, _saved_files))

logger.info(f"Total size of best_sft: {total_size / (1024**2):.2f} MB")

In [None]:
# Re-run the same prompts after training, tagged so the logs can be told
# apart from the earlier run.
necessary_word = "[After Train]"

after_response = [
    test(prompt, model, tokenizer, necessary_word)
    for prompt in test_prompts
]