# 설치 필요 패키지

In [None]:
# pip install transformers datasets peft bitsandbytes accelerate scikit-learn

import os
import torch
import pandas as pd
from datetime import datetime
import logging
import warnings
from datasets import Dataset
from sklearn.model_selection import train_test_split

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    TrainingArguments,
    DataCollatorForLanguageModeling,
    Trainer
)
from peft import (
    prepare_model_for_kbit_training,
    get_peft_model,
    LoraConfig,
    TaskType
)

# 기본 설정

In [None]:
# Silence every Python warning for the rest of the session.
warnings.filterwarnings("ignore")
# Route log records at INFO level and above through the root logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Pass the `expandable_segments:True` option to PyTorch's CUDA allocator config.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# === Configuration values ===
# Model identifier handed to `from_pretrained` for both tokenizer and model.
MODEL_ID = "MLP-KTLim/llama3-Bllossom"
# System message embedded in both the training prompts and the inference prompt.
SYSTEM_PROMPT = "당신은 유용한 AI 어시스턴트입니다. 사용자의 요구사항에 맞게 문장 변환을 해야합니다."
# Trainer output directory; the final adapter and tokenizer are saved beneath it.
SAVE_DIR = "./checkpoints"
DATA_PATH = "./data/sentence_pairs.csv"  # Point this at the location of your CSV file

# 테스트용 문장

In [None]:
# Sample news sentences that the script rewrites with translate_to_kids_news
# once training has finished.
TEST_SENTENCES = [
    "윤석열 대통령이 김영선 전 의원 공천을 직접 지시했다는 녹음이 공개됐습니다.",
    "KT&G 주가는 하반기 들어 24.7% 상승하며 52주 신고가를 기록했습니다."
]

# 모델 및 토크나이저 설정

In [None]:
def setup_model_and_tokenizer():
    """Load the 8-bit quantized base model and wrap it with LoRA adapters.

    The tokenizer and model are both loaded from ``MODEL_ID``. The model is
    prepared for k-bit training and then wrapped by ``get_peft_model`` with a
    rank-16 LoRA configuration on the attention and MLP projections.

    Returns:
        tuple: ``(model, tokenizer)`` — the PEFT-wrapped causal LM and its
        tokenizer (guaranteed to have a pad token).
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_ID)
    # The tokenization step pads every example with padding="max_length",
    # which fails when the tokenizer has no pad token. Fall back to EOS only
    # when none is configured, so a checkpoint-provided pad token is kept.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # NOTE(review): llm_int8_enable_fp32_cpu_offload together with
    # device_map="auto" may place some modules on the CPU — confirm this is
    # intended for a training run.
    quant_config = BitsAndBytesConfig(
        load_in_8bit=True,
        llm_int8_enable_fp32_cpu_offload=True
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_ID,
        torch_dtype=torch.float16,
        device_map="auto",
        quantization_config=quant_config,
    )
    model = prepare_model_for_kbit_training(model)
    lora_config = LoraConfig(
        task_type=TaskType.CAUSAL_LM,
        r=16,
        lora_alpha=64,
        lora_dropout=0.1,
        target_modules=["q_proj", "v_proj", "k_proj", "o_proj", "gate_proj", "up_proj", "down_proj"]
    )
    model = get_peft_model(model, lora_config)
    return model, tokenizer

# 학습 데이터셋

In [None]:
def create_weighted_training_pairs(df):
    """Turn each sentence-pair row into a weighted prompt/response example.

    Args:
        df: DataFrame with ``adult_sentence``, ``kids_sentence`` and
            ``similarity_score`` columns.

    Returns:
        list[dict]: one ``{"text": ..., "weight": ...}`` entry per row, where
        ``text`` is the full system/user/assistant transcript and ``weight``
        is the row's similarity score as a float.
    """

    def _to_pair(row):
        # Build the user request first, then embed it in the tagged transcript.
        instruction = f"'{row['adult_sentence']}'라는 문장을 어린이가 읽기 쉽도록 구어체로 바꿔줘. 어려운 단어는 쉽게 풀어서 설명해줘."
        transcript = (
            f"<system>{SYSTEM_PROMPT}</system>\n"
            f"<user>{instruction}</user>\n"
            f"<assistant>{row['kids_sentence']}</assistant>"
        )
        return {"text": transcript, "weight": float(row['similarity_score'])}

    return [_to_pair(row) for _, row in df.iterrows()]

def tokenize_and_add_weight(examples, tokenizer):
    """Tokenize a batch of texts and carry the per-example weights along.

    Args:
        examples: mapping with a ``"text"`` entry (the prompts) and a
            ``"weight"`` entry (their weights).
        tokenizer: callable applied to the texts; asked to truncate and pad
            every example to 256 tokens and return PyTorch tensors.

    Returns:
        dict: the tokenizer's fields plus a ``"weight"`` entry.
    """
    encoded = dict(
        tokenizer(
            examples["text"],
            max_length=256,
            padding="max_length",
            truncation=True,
            return_tensors="pt",
        )
    )
    encoded["weight"] = examples["weight"]
    return encoded

class WeightedDataCollator(DataCollatorForLanguageModeling):
    """Language-modeling collator that also batches a per-example ``weight``.

    The ``weight`` field is separated from each example before the parent
    collator runs and re-attached to the batch as a float tensor.
    """

    def __call__(self, examples):
        """Collate ``examples`` and add a ``weight`` tensor to the batch.

        Args:
            examples: list of dict-like features; each may carry a ``weight``
                entry (1.0 is used when it is absent).

        Returns:
            The parent collator's batch with an extra ``'weight'`` float
            tensor holding one value per example.
        """
        weights = torch.tensor(
            [ex.get("weight", 1.0) for ex in examples], dtype=torch.float
        )
        # Build weight-free copies instead of popping in place: mutating the
        # caller's dicts would silently drop the weight (falling back to 1.0)
        # if the same example objects are ever collated again.
        features = [
            {key: value for key, value in ex.items() if key != "weight"}
            for ex in examples
        ]
        batch = super().__call__(features)
        batch['weight'] = weights
        return batch

class CustomTrainer(Trainer):
    """Trainer whose loss scales each example's LM loss by its ``weight``."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the per-example weighted causal-LM loss.

        Args:
            model: the model being trained or evaluated.
            inputs: batch mapping; may contain ``"weight"`` (one value per
                example) and ``"labels"`` alongside the model inputs.
            return_outputs: when true, return ``(loss, outputs)``.
            num_items_in_batch: accepted so the override stays callable by
                Trainer versions that pass this keyword; unused here.
                NOTE(review): confirm how the installed transformers version
                scales the loss under gradient accumulation for custom
                ``compute_loss`` overrides.

        Returns:
            The scalar loss, or ``(loss, outputs)`` if ``return_outputs``.
        """
        weights = inputs.get("weight")
        labels = inputs.get("labels")

        if weights is None or labels is None:
            # Nothing to weight (or nothing to score against): use the
            # model's own loss, making sure "weight" never reaches forward().
            plain_inputs = {k: v for k, v in inputs.items() if k != "weight"}
            outputs = model(**plain_inputs)
            loss = outputs.loss
            return (loss, outputs) if return_outputs else loss

        # Run the model without labels so it does not also compute its own
        # batch-averaged loss. That scalar cannot be re-weighted per example:
        # multiplying it by the weights only scales it by mean(weights).
        model_inputs = {
            k: v for k, v in inputs.items() if k not in ("weight", "labels")
        }
        outputs = model(**model_inputs)

        # Next-token objective: position t predicts token t + 1.
        # Assumes logits are (batch, seq, vocab) and labels are (batch, seq).
        shift_logits = outputs.logits[:, :-1, :].float()
        shift_labels = labels[:, 1:].to(shift_logits.device)

        token_loss = torch.nn.functional.cross_entropy(
            shift_logits.reshape(-1, shift_logits.size(-1)),
            shift_labels.reshape(-1),
            ignore_index=-100,
            reduction="none",
        ).view(shift_labels.shape)

        # Average over the scored (non -100) tokens of each example; the clamp
        # guards against an example with no scored tokens.
        scored_tokens = (shift_labels != -100).sum(dim=1).clamp(min=1)
        per_example = token_loss.sum(dim=1) / scored_tokens

        weights = weights.to(device=per_example.device, dtype=per_example.dtype).view(-1)
        loss = (per_example * weights).mean()
        return (loss, outputs) if return_outputs else loss

def train(training_pairs, model, tokenizer):
    """Fine-tune ``model`` on the weighted pairs and save the result.

    The pairs are split 80/20 into train and validation sets (fixed seed),
    tokenized, and passed to ``CustomTrainer``. After training, the model and
    tokenizer are written to ``{SAVE_DIR}/llama3_finetuned``.

    Args:
        training_pairs: list of ``{"text": ..., "weight": ...}`` dicts.
        model: model to train.
        tokenizer: tokenizer used for tokenization, collation and saving.
    """

    def _tokenized_split(rows):
        # Column-wise Dataset for one split, replaced by its tokenized form.
        raw = Dataset.from_dict({
            "text": [row["text"] for row in rows],
            "weight": [row["weight"] for row in rows],
        })
        return raw.map(
            lambda batch: tokenize_and_add_weight(batch, tokenizer),
            batched=True,
            remove_columns=raw.column_names,
        )

    train_rows, val_rows = train_test_split(
        training_pairs, test_size=0.2, random_state=42
    )
    train_tokenized = _tokenized_split(train_rows)
    val_tokenized = _tokenized_split(val_rows)

    # NOTE(review): newer transformers releases rename `evaluation_strategy`
    # to `eval_strategy` — confirm against the installed version.
    training_args = TrainingArguments(
        output_dir=SAVE_DIR,
        # Optimisation schedule
        num_train_epochs=5,
        learning_rate=2e-4,
        weight_decay=0.01,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=16,
        gradient_accumulation_steps=4,
        fp16=True,
        gradient_checkpointing=True,
        # Checkpointing, evaluation and logging all run every 100 steps
        save_strategy="steps",
        save_steps=100,
        save_total_limit=2,
        evaluation_strategy="steps",
        eval_steps=100,
        logging_steps=100,
        load_best_model_at_end=True,
        metric_for_best_model="eval_loss",
        # Keep the "weight" column so the collator and trainer can see it
        remove_unused_columns=False,
        report_to="none",
    )

    collator = WeightedDataCollator(tokenizer=tokenizer, mlm=False)
    trainer = CustomTrainer(
        model=model,
        tokenizer=tokenizer,
        args=training_args,
        train_dataset=train_tokenized,
        eval_dataset=val_tokenized,
        data_collator=collator,
    )
    trainer.train()

    final_dir = f"{SAVE_DIR}/llama3_finetuned"
    model.save_pretrained(final_dir)
    tokenizer.save_pretrained(final_dir)

# 추론용 함수

In [None]:
def translate_to_kids_news(text, model, tokenizer):
    """Generate a child-friendly, colloquial rewrite of ``text``.

    Args:
        text: the source sentence to rewrite.
        model: causal LM exposing ``generate``, ``device`` and train/eval mode.
        tokenizer: tokenizer matching ``model``.

    Returns:
        str: the generated completion up to the first ``</assistant>`` tag,
        stripped of surrounding whitespace.
    """
    # NOTE(review): this instruction wording differs from the one used to
    # build the training prompts in create_weighted_training_pairs — confirm
    # whether the mismatch is intentional.
    instruction = f"'{text}'라는 문장을 어린이가 이해하기 쉽게 구어체로 바꿔줘."
    prompt = f"""<system>{SYSTEM_PROMPT}</system>\n<user>{instruction}</user>\n<assistant>"""
    inputs = tokenizer(prompt, return_tensors="pt", truncation=True, max_length=512).to(model.device)

    # Generate in eval mode so dropout is off, then restore the caller's mode.
    was_training = model.training
    model.eval()
    try:
        outputs = model.generate(**inputs, max_new_tokens=200, do_sample=True, top_p=0.9, temperature=0.6)
    finally:
        if was_training:
            model.train()

    # Drop the prompt by token count rather than by string replacement: the
    # decoded text is not guaranteed to reproduce the prompt byte-for-byte,
    # in which case str.replace would leave the prompt in the result.
    prompt_length = inputs["input_ids"].shape[1]
    completion = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
    return completion.split("</assistant>")[0].strip()

# 실행

In [None]:
if __name__ == "__main__":
    # Load the sentence-pair CSV and build the weighted training examples
    sentence_df = pd.read_csv(DATA_PATH)
    training_pairs = create_weighted_training_pairs(sentence_df)

    # Prepare the model and tokenizer
    model, tokenizer = setup_model_and_tokenizer()

    # Run training
    train(training_pairs, model, tokenizer)

    # Example inference: print each test sentence next to its rewrite
    for sent in TEST_SENTENCES:
        print("\n[원문]", sent)
        print("[어린이용]", translate_to_kids_news(sent, model, tokenizer))