##Step 0. 환경준비/ 라이브러리 설치

In [None]:
# !pip install datasets
# !pip install loralib
# !pip install trl
# !pip install accelerate
# !pip install transformers
# !pip install evaluate
#!pip install sacrebleu rouge-score
# #실행 후 런타임 재시작

##Step 0. 환경 설정 / 라이브러리 임포트

In [None]:
!git clone https://github.com/airobotlab/KoChatGPT
!cp -r KoChatGPT/colossalai_ChatGPT_230319/chatgpt chatgpt

import os

modifications = [
    {
        "file": "chatgpt/trainer/callbacks/save_checkpoint.py",
        "changes": [
            {"line": 3, "old": "from chatgpt.trainer.strategies import ColossalAIStrategy, Strategy",
             "new": "from chatgpt.trainer.strategies import Strategy"},
            {"line": 71, "old": "only_rank0 = not isinstance(self.strategy, ColossalAIStrategy)",
             "new": "            only_rank0 = not isinstance(self.strategy)"},
        ],
    },
    {
        "file": "chatgpt/trainer/strategies/__init__.py",
        "changes": [
            {"line": 1, "old": "from .colossalai import ColossalAIStrategy", "new": ""},  # 삭제
            {"line": 5, "old": "__all__ = ['Strategy', 'NaiveStrategy', 'DDPStrategy', 'ColossalAIStrategy']",
             "new": "__all__ = ['Strategy', 'NaiveStrategy', 'DDPStrategy']"},
        ],
    },
    {
        "file": "chatgpt/dataset/reward_dataset.py",
        "changes": [
            {"line": 3, "old": "from tqdm import tqdm", "new": "from tqdm.notebook import tqdm"},
        ],
    },
    {
        "file": "chatgpt/trainer/base.py",
        "changes": [
            {"line": 8, "old": "from tqdm import tqdm", "new": "from tqdm.notebook import tqdm"},
        ]
    },
    {
        "file": "chatgpt/trainer/rm.py",
        "changes": [
            {"line": 8, "old": "from tqdm import tqdm", "new": "from tqdm.notebook import tqdm"},
        ]
    }
]

#파일에서 지정된 줄을 찾아 내용을 수정
def modify_file(file_path, changes):


    if not os.path.exists(file_path):
        print(f"⚠️ 파일이 존재하지 않습니다: {file_path}")
        return

    with open(file_path, "r", encoding="utf-8") as file:
        lines = file.readlines()

    modified = False

    for change in changes:
        line_index = change["line"]
        if 0 <= line_index < len(lines):
            if lines[line_index].strip() == change["old"]:
                lines[line_index] = change["new"] + "\n"
                modified = True
            else:
                print(f"⚠️ {file_path} 파일의 {change['line']}번째 줄이 예상과 다릅니다.")
                print(f"   예상: {change['old']}")
                print(f"   실제: {lines[line_index].strip()}")

    if modified:
        with open(file_path, "w", encoding="utf-8") as file:
            file.writelines(lines)
        print(f"✅ 수정 완료: {file_path}")
    else:
        print(f"⚠️ {file_path} 수정할 내용이 없습니다.")

for mod in modifications:
    modify_file(mod["file"], mod["changes"])

##라이브러리 버전 확인 및 dirve 연결
학습에 필요한 autotokenizer, autoModelForCauseLM등을 import

추후 원활한 수행을 위해 drive와 연결

In [None]:
import torch
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM
import pandas as pd
import numpy

print("Torch version:{}".format(torch.__version__)) # Torch version:1.12.1
print("Cuda version: {}".format(torch.version.cuda)) # Cuda version: 11.3
print("transformers version: {}".format(transformers.__version__)) # transformers 4.28.0
print("GPU 사용 가능여부: {}".format(torch.cuda.is_available()))

from google.colab import drive
drive.mount('/content/drive')

# 만일 아래 모듈이 불러와지지 않는다면 Clone 및 수정을 잘 진행했는지 확인해주세요.
from chatgpt.trainer.strategies import NaiveStrategy

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "skt/kogpt2-base-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

###Step 1. 데이터 및 디코딩 성능 확인

In [None]:
input_txt =  "바람도 없는 공중에 수직의 파문을 내이며 고요히 떨어지는 오동잎은 누구의 발자취 입니까."

tokens = tokenizer(input_txt).tokens()
input_ids = tokenizer(input_txt, return_tensors="pt")["input_ids"].numpy()

pd.options.display.max_columns = 40
pd.options.display.max_rows = 60
df = pd.DataFrame([tokens, input_ids[0]], index=["kogpt-2_tokens", "Input_IDs"])
df

In [None]:
# 디코딩 성능 확인
max_length=128
input_ids = tokenizer(input_txt, return_tensors="pt")["input_ids"].to(device)
output_greedy = model.generate(input_ids, max_length=max_length, do_sample=False)
print(tokenizer.decode(output_greedy[0]))

처음 생성한 답변을 반복적으로 수행하는 모습이 보인다.
앞서 생성한 토큰의 영향을 많이 받음

In [None]:
output_beam = model.generate(input_ids, max_length=max_length, num_beams=7, no_repeat_ngram_size=2,
                             do_sample=True, top_p=0.90)
print(tokenizer.decode(output_beam[0]))

단순히 디코딩에서 beam search를 적용해주는것만으로 어느정도의 품질은 기대할 수 있지만
모델 자체의 성능을 끌어올린다면 더 경제적으로 고품질의 출력을 만들 수 있을것이라 생각함.


###Step 2. Supervised Fine-Tuning

베이스 모델 : skt/kogpt2-base-v2

데이터 : {prompt, completion} 데이터쌍




In [None]:
from typing import Optional, Dict, Sequence
from torch.utils.data import Dataset
from dataclasses import dataclass
import logging
import copy
import json

In [None]:
# 모델, 토크나이저 호출

model = AutoModelForCausalLM.from_pretrained('skt/kogpt2-base-v2')
tokenizer = AutoTokenizer.from_pretrained(
    'skt/kogpt2-base-v2', bos_token='</s>', eos_token='</s>', unk_token='</s>', pad_token='</s>',
    padding_side="right",
    model_max_length=512,
)

print(tokenizer)

In [None]:
# 모델 인퍼런스 단계에서 사용할 prompt 딕셔너리 템플릿과 SFT 데이터셋 클래스를 정의

class SFT_dataset(Dataset):

    def __init__(self, data_path_1_SFT: str, tokenizer: transformers.PreTrainedTokenizer, verbose=False):
        super(SFT_dataset, self).__init__()
        logging.warning("Loading data...")

        pattern_instruction = 'prompt'  # instruction
        pattern_output = 'completion'  # response

        with open(data_path_1_SFT, "r", encoding='utf-8-sig') as json_file:
            list_data_dict = json.load(json_file)

        PROMPT_DICT = {
            "prompt_input": (
                "### Instruction(명령어):\n{prompt}\n\n### Response(응답):"
            )
        }

        prompt_input = PROMPT_DICT["prompt_input"]

        sources = []
        for example in list_data_dict:
            tmp = prompt_input.format_map(example)
            sources.append(tmp)

        targets = []
        for example in list_data_dict:
            targets.append(f"{example[pattern_output]}{tokenizer.eos_token}")
        examples = [s + t for s, t in zip(sources, targets)]

        sources_tokenized = self._tokenize_fn(sources, tokenizer)  # source
        examples_tokenized = self._tokenize_fn(examples, tokenizer)  # source + target

        input_ids = examples_tokenized["input_ids"]
        labels = copy.deepcopy(input_ids)
        for label, source_len in zip(labels, sources_tokenized["input_ids_lens"]):
            label[:source_len] = -100

        data_dict = dict(input_ids=input_ids, labels=labels)

        self.input_ids = data_dict["input_ids"]
        self.labels = data_dict["labels"]
        logging.warning("Loading data done!!: %d"%(len(self.labels)))


    def _tokenize_fn(self, strings: Sequence[str], tokenizer: transformers.PreTrainedTokenizer) -> Dict:
        tokenized_list = [
            tokenizer(
                text,
                return_tensors="pt",
                padding="longest",
                max_length=tokenizer.model_max_length,
                truncation=True,
            )
            for text in strings
        ]
        input_ids = labels = [tokenized.input_ids[0] for tokenized in tokenized_list]
        input_ids_lens = labels_lens = [
            tokenized.input_ids.ne(tokenizer.pad_token_id).sum().item() for tokenized in tokenized_list
        ]
        return dict(
            input_ids=input_ids,
            labels=labels,
            input_ids_lens=input_ids_lens,
            labels_lens=labels_lens,
        )


    def __len__(self):
        return len(self.input_ids)


    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        return dict(input_ids=self.input_ids[i], labels=self.labels[i])

In [None]:
@dataclass
class DataCollatorForSupervisedDataset(object):

    tokenizer: transformers.PreTrainedTokenizer

    def __call__(self, instances: Sequence[Dict]) -> Dict[str, torch.Tensor]:
        input_ids, labels = tuple([instance[key] for instance in instances] for key in ("input_ids", "labels"))
        input_ids = torch.nn.utils.rnn.pad_sequence(
            input_ids, batch_first=True, padding_value=self.tokenizer.pad_token_id
        )
        labels = torch.nn.utils.rnn.pad_sequence(labels, batch_first=True, padding_value= -100)
        return dict(
            input_ids=input_ids,
            labels=labels,
            attention_mask=input_ids.ne(self.tokenizer.pad_token_id),
        )

In [None]:
# SFT_dataset 클래스를 사용해 훈련셋을 만들고 data collator 인스턴스 생성

train_dataset = SFT_dataset(data_path_1_SFT='KoChatGPT/data_kochatgpt/kochatgpt_1_SFT.jsonl', tokenizer=tokenizer)
data_collator = DataCollatorForSupervisedDataset(tokenizer=tokenizer)

print('input : %s'%train_dataset.input_ids[0])
print('output: %s'%train_dataset.labels[0])

In [None]:
# 디코딩 함수 작성
def decode_tokens(tokenizer, input_ids, label_ids):
  """
  주어진 input_ids와 label_ids를 디코딩하여 원본 text를 출력하는 함수.
  label_ids에 포함된 -100 값은 디코딩에서 제외합니다.
  """
  # input_ids 디코딩
  # skip_special_tokens=True 옵션으로 </s>와 같은 특수 토큰을 제외하고 볼 수 있습니다.
  decoded_input = tokenizer.decode(input_ids, skip_special_tokens=True)

  # label_ids에서 -100을 제외한 토큰만 필터링
  filtered_label_ids = [token_id for token_id in label_ids if token_id != -100]

  # 필터링된 label_ids 디코딩
  decoded_label = tokenizer.decode(filtered_label_ids, skip_special_tokens=True)

  print("--- [디코딩 결과] ---")
  print(f"➡️ Input (전체 원본 문장):\n{decoded_input}\n")
  print(f"✅ Label (모델이 학습하는 정답 문장):\n{decoded_label}")

# 함수를 사용하여 첫 번째 데이터 확인
decode_tokens(
    tokenizer,
    train_dataset.input_ids[0],
    train_dataset.labels[0]
)

In [None]:
# Training arguments를 사용해 trainer 클래스를 정의

# training_args = transformers.TrainingArguments(
#     output_dir="test",
#     overwrite_output_dir=True,
#     num_train_epochs=1,
#     per_device_train_batch_size=8,
#     per_device_eval_batch_size=8,
#     warmup_steps=5,
#     prediction_loss_only=True,
#     fp16 = True
#     )
training_args = transformers.TrainingArguments(
        output_dir="models/improved_sft_v2",
        overwrite_output_dir=True,
        num_train_epochs=3,  # 변경: 1 -> 3
        per_device_train_batch_size=4,  # 변경: 8 -> 4 (메모리 절약)
        gradient_accumulation_steps=4,  # 추가: 가상 배치 16
        learning_rate=2e-5,  # 변경: 기본값 -> 낮춤
        warmup_steps=100,  # 추가: 워밍업
        prediction_loss_only=True,
        fp16=True
        )
trainer = transformers.Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset
)



In [None]:
# SFT 훈련을 진행

trainer.train()
model.save_pretrained('/content/drive/MyDrive/model_output/output_1_SFT')

###모델 성능 평가

In [None]:
# 문장 생성 능력을 확인하기 위해 빠르게 허깅페이스의 pipleline 클래스를 사용하여 generator 구현
generator = transformers.pipeline('text-generation', model='models/output_1_SFT', tokenizer=tokenizer)

generation_args = dict(
    num_beams=4,
    repetition_penalty=2.0,
    no_repeat_ngram_size=4,
    eos_token_id=375, # \n
    max_new_tokens=64,
    do_sample=True,
    top_k=50,
    early_stopping=True
)

PROMPT_DICT = {
    "prompt_input": (
        "### Instruction(명령어):\n{prompt}\n\n### Response(응답):"
    )
}

list_prompt = ['불고기용 고기 한우에요?',
               '리처드 닉슨이 43대 부통령직을 수행한 년도는?',
               '시카고 오헤어 국제공항은 어디에 있어?',
               '오늘 미세먼지 어때?']

list_prompt = [PROMPT_DICT['prompt_input'].format_map({'prompt' : tmp}) for tmp in list_prompt]

list_result = generator(list_prompt, **generation_args)
for prompt, result in zip(list_prompt, list_result):
    print()
    print((result[0]['generated_text']))

저는 인공지능 어시스턴트이기 때문에 "입력"에 대한 정보를 가지고 있지 않습니다

라는 프롬프트가 존재하는듯함

###Step 2. RM 학습
Ranknig셋으로 준비된 데이터를

pairwise형식으로 전환 후 학습시도



In [None]:
# RM 학습에 필요한 클래스 임포트
from chatgpt.dataset import RewardDataset
from chatgpt.models.base import RewardModel
from chatgpt.trainer.strategies import NaiveStrategy
from chatgpt.trainer.rm import RewardModelTrainer

from transformers.models.gpt2.configuration_gpt2 import GPT2Config
from transformers.models.gpt2.modeling_gpt2 import GPT2Model

import torch.nn as nn
import random

# SFT와 동일한 토크나이저 설정 사용
rm_tokenizer = AutoTokenizer.from_pretrained(
    'skt/kogpt2-base-v2', bos_token='</s>', eos_token='</s>', unk_token='</s>', pad_token='</s>',
    padding_side="right",
    model_max_length=512,
)

In [None]:
class GPTRM_custom(RewardModel):
    """
    GPT-2를 기반으로 하는 Custom Reward Model.
    입력된 텍스트의 좋고 나쁨을 판단하여 단일 점수(reward)를 출력합니다.
    """
    def __init__(self,
                 pretrained: Optional[str] = None,
                 config: Optional[GPT2Config] = None,
                 checkpoint: bool = False,
                 lora_rank: int = 0,
                 lora_train_bias: str = 'none',
                 tokenizer=None) -> None:

        if pretrained is not None:
            # 사전 학습된 GPT2 모델을 불러옵니다.
            model = GPT2Model.from_pretrained(pretrained)
            # Special token이 추가된 토크나이저에 맞게 임베딩 크기를 조정합니다.
            model.resize_token_embeddings(len(tokenizer))
        elif config is not None:
            model = GPT2Model(config)
        else:
            model = GPT2Model(GPT2Config())

        if checkpoint:
            model.gradient_checkpointing_enable()

        # 모델의 마지막 hidden state를 입력으로 받아 단일 점수를 출력하는 value_head를 정의합니다.
        value_head = nn.Linear(model.config.n_embd, 1)

        # 부모 클래스인 RewardModel을 초기화합니다.
        super().__init__(model, value_head, lora_rank, lora_train_bias)

        if pretrained is not None:
            self.model = model
            self.pretrained = pretrained

    def save_pretrained(self, dir):
        if self.pretrained is not None:
            self.model.save_pretrained(dir)

# NaiveStrategy 컨텍스트 내에서 RM 모델을 초기화합니다.
with NaiveStrategy().model_init_context():
    rm_model = GPTRM_custom(pretrained='skt/kogpt2-base-v2', lora_rank=0, tokenizer=rm_tokenizer).cuda()

In [None]:
# 준비된 RM 데이터셋 로드
with open('/content/KoChatGPT/data_kochatgpt/kochatgpt_2_RM.jsonl', "r", encoding='utf-8-sig') as json_file:
    list_data_dict = json.load(json_file)

# 랭킹 정보를 바탕으로 pairwise(chosen, rejected) 쌍 만들기
total_data_ranking2chosen = []
for tmp in list_data_dict:
    one_data_ranking2chosen = []

    # 3개의 답변 중 2개를 뽑는 모든 조합(3가지)에 대해 쌍을 생성
    # completion_0 vs completion_1
    data = {}
    data['prompt'] = tmp['prompt']
    if tmp['ranking'][0] < tmp['ranking'][1]:
        data['chosen'] = tmp['completion_0']
        data['rejected'] = tmp['completion_1']
    else:
        data['chosen'] = tmp['completion_1']
        data['rejected'] = tmp['completion_0']
    one_data_ranking2chosen.append(data)

    # completion_0 vs completion_2
    data = {}
    data['prompt'] = tmp['prompt']
    if tmp['ranking'][0] < tmp['ranking'][2]:
        data['chosen'] = tmp['completion_0']
        data['rejected'] = tmp['completion_2']
    else:
        data['chosen'] = tmp['completion_2']
        data['rejected'] = tmp['completion_0']
    one_data_ranking2chosen.append(data)

    # completion_1 vs completion_2
    data = {}
    data['prompt'] = tmp['prompt']
    if tmp['ranking'][1] < tmp['ranking'][2]:
        data['chosen'] = tmp['completion_1']
        data['rejected'] = tmp['completion_2']
    else:
        data['chosen'] = tmp['completion_2']
        data['rejected'] = tmp['completion_1']
    one_data_ranking2chosen.append(data)

    total_data_ranking2chosen.extend(one_data_ranking2chosen)

print('before data num: %d'%(len(list_data_dict)))
print('after  data num: %d'%(len(total_data_ranking2chosen)))
print('data example: \n%s'%total_data_ranking2chosen[45])

In [None]:
# 데이터 셔플 및 분할
random.seed(230319)
random.shuffle(total_data_ranking2chosen)

train_data = total_data_ranking2chosen[:1000]
eval_data = total_data_ranking2chosen[1000:1200]

print(f"Train data size: {len(train_data)}")
print(f"Eval data size: {len(eval_data)}")

# RewardDataset 객체 생성
train_dataset = RewardDataset(train_data, rm_tokenizer, 512)
eval_dataset = RewardDataset(eval_data, rm_tokenizer, 512)

# 데이터 샘플 확인
idx = 1
print('#'*70)
print('## prompt ##')
print(train_data[idx]['prompt'])
print('#'*70)
print('## chosen ##')
print(train_data[idx]['chosen'])
print('#'*70)
print('## rejected ##')
print(train_data[idx]['rejected'])

In [None]:
# RM 트레이너 설정
rm_trainer = RewardModelTrainer(model=rm_model,
                             strategy=NaiveStrategy(),
                             optim=torch.optim.Adam(rm_model.parameters(), lr=5e-5),
                             train_dataset=train_dataset,
                             eval_dataset=eval_dataset,
                             batch_size=4,
                             max_epochs=1)

# RM 학습 시작
rm_trainer.fit(use_lora=0)

# 학습된 RM 모델 저장
rm_model.save_pretrained('/content/drive/MyDrive/model_output/output_2_RM')

In [None]:
# RM 추론 함수 정의
def inference_RM(input_text):
    input_ids = rm_tokenizer.encode(input_text, return_tensors='pt').to(
        torch.cuda.current_device()
    )
    output = rm_model(input_ids)
    output_reward = output.cpu().detach().numpy()[0]

    print('input: %s\nreward score: %.1f' % (input_text, output_reward))
    return output_reward

# 테스트 1: 부정적인 문장
input_text = '인공지능은 똥멍청이 입니다'
output_reward = inference_RM(input_text=input_text)

# 테스트 2: 긍정적이고 짧은 문장
input_text = '인공지능(AI)은 매우 유용합니다.'
output_reward = inference_RM(input_text=input_text)

# 테스트 3: 긍정적이고 상세한 문장
input_text = "인공지능은 일반적으로 인간의 지능이 필요하거나 인간이 분석할 수 있는 것보다 규모가 큰 데이터를 포함하는 방식으로 추론, 학습 및 행동할 수 있는 컴퓨터 및 기계를 구축하는 것과 관련된 과학 분야입니다."
output_reward = inference_RM(input_text=input_text)

# GPU 메모리 정리
torch.cuda.empty_cache()

###Step 4. PPO 적용

SFT, RM모델을 이용해 PPO를 수행


In [None]:
from chatgpt.models.gpt import GPTActor, GPTCritic
from chatgpt.trainer import PPOTrainer

from copy import deepcopy

In [None]:
# NaiveStrategy 컨텍스트 내에서 PPO 학습에 필요한 모든 모델을 준비합니다.
with NaiveStrategy().model_init_context():
    # Actor: SFT 모델을 불러옵니다.
    actor = GPTActor(pretrained='/content/drive/MyDrive/model_output/output_1_SFT', lora_rank=0).to(torch.cuda.current_device())

    # Critic: RM 모델을 불러옵니다.
    critic = GPTCritic(pretrained='/content/drive/MyDrive/model_output/output_2_RM', lora_rank=0).to(torch.cuda.current_device())

    # Tokenizer: 이전과 동일한 설정을 사용합니다.
    tokenizer = AutoTokenizer.from_pretrained(
        'skt/kogpt2-base-v2', bos_token='</s>', eos_token='</s>', unk_token='</s>', pad_token='</s>',
        padding_side="right",
        model_max_length=512
    )

    # Initial Model: SFT 모델을 복사하여 KL 페널티 계산에 사용합니다.
    initial_model = deepcopy(actor)

    # Reward Model: Critic 모델을 기반으로 보상 계산에 사용합니다.
    reward_model = RewardModel(deepcopy(critic.model), deepcopy(critic.value_head)).to(torch.cuda.current_device())

# Actor와 Critic을 위한 옵티마이저를 설정합니다.
actor_optim = torch.optim.Adam(actor.parameters(), lr=5e-6)
critic_optim = torch.optim.Adam(critic.parameters(), lr=5e-6)

# Strategy를 통해 모델과 옵티마이저를 래핑합니다.
(actor, actor_optim), (critic, critic_optim), reward_model, initial_model = NaiveStrategy().prepare(
    (actor, actor_optim), (critic, critic_optim), reward_model, initial_model)

In [None]:
# PPO 학습을 위한 프롬프트 데이터 로드
with open('//content/KoChatGPT/data_kochatgpt/kochatgpt_3_PPO.jsonl', "r", encoding='utf-8-sig') as json_file:
    list_data_dict = json.load(json_file)
    list_prompt = [tmp['prompt'] for tmp in list_data_dict]

# PPO Trainer 내부에서 사용할 토크나이저 함수 정의
def tokenize_fn(texts):
    batch = tokenizer(texts, return_tensors='pt', max_length=96, padding=True, truncation=True)
    return {k: v.cuda() for k, v in batch.items()}

print(f"PPO 학습에 사용될 프롬프트 개수: {len(list_prompt)}")

In [None]:
# PPOTrainer 초기화
trainer = PPOTrainer(NaiveStrategy(),
                     actor,           #학습이 진행될 모델
                     critic,          #RM
                     reward_model,    #RM
                     initial_model,   #SFT적용모델
                     actor_optim,
                     critic_optim,
                     max_epochs=1,
                     train_batch_size=8,
                     tokenizer=tokenize_fn,
                     max_length=128,
                     do_sample=True,
                     temperature=1.0,
                     top_k=50,
                     pad_token_id=tokenizer.pad_token_id,
                     eos_token_id=tokenizer.eos_token_id)

# PPO 학습 시작
trainer.fit(list_prompt,
            num_episodes=10,
            max_timesteps=3,
            update_timesteps=3)

# 최종 PPO 모델 저장
actor.model.save_pretrained('/content/drive/MyDrive/model_output/output_3_PPO')

In [None]:
# 최종 PPO 모델로 답변 생성
def generation(input_text, model, tokenizer):
    input_ids = tokenizer.encode(input_text, return_tensors='pt').to(
        torch.cuda.current_device())

    # PPO 모델은 Actor 클래스로 래핑되어 있으므로, 내부 모델을 직접 사용
    outputs = model.model.generate(input_ids,
                             max_length=250,
                             do_sample=True,
                             top_k=50,
                             top_p=0.95,
                             num_return_sequences=1)

    output_text = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
    print(output_text)
    return output_text

# 테스트용 프롬프트 리스트
list_prompt = [
    '불고기용 고기 한우에요?',
    '리처드 닉슨이 43대 부통령직을 수행한 년도는?',
    '시카고 오헤어 국제공항은 어디에 있어',
    '오늘 미세먼지 어때?']

# SFT/PPO 모델에 맞는 프롬프트 형식으로 변환
PROMPT_DICT = {
    "prompt_input": (
        "### Instruction(명령어):\n{prompt}\n\n### Response(응답):"
    )
}
list_formatted_prompt = [PROMPT_DICT['prompt_input'].format_map({'prompt': tmp}) for tmp in list_prompt]

print("--- PPO Model Generation Results ---")
for input_text in list_formatted_prompt:
    generation(input_text, actor, tokenizer)
    print("-" * 30)

#실험 결과 확인
pipeline을 이용해 출력을 생성

In [None]:
from transformers import pipeline, AutoTokenizer
import torch, re

generators = {
    name: pipeline(
        "text-generation",
        model=path,
        tokenizer=tokenizer,
        device=device
    )
    for name, path in MODEL_PATHS.items()
}
print("ready:", list(generators.keys()))

#프롬프트 템플릿 + 종료 처리
PROMPT_DICT = {
    "prompt_input": "### Instruction(명령어):\n{prompt}\n\n### Response(응답):",
}
def build_prompt(p: str) -> str:
    return PROMPT_DICT["prompt_input"].format_map({"prompt": p})

# 종료 규칙: 응답 이후 불필요한 텍스트를 잘라주는 간단 후처리
def postprocess(prompt_text: str, generated_text: str) -> str:
    # 1) 프롬프트 에코 제거
    body = generated_text[len(prompt_text):] if generated_text.startswith(prompt_text) else generated_text

    # 2) "다음 섹션"을 암시하는 마커가 보이면 거기서 자르기 (옵션)
    cut_points = [
        "\n\n###",             # 다음 섹션 마커
        "\n\nInstruction",     # 잘못 에코 시
        "\n\n명령어",           # 한글 에코 시
    ]
    for cp in cut_points:
        if cp in body:
            body = body.split(cp)[0]
            break

    # 3) 과도한 공백 정리
    body = body.strip()
    return body

# 종료 토큰: 개행을 EOS로 쓰고 싶다면 토크나이저에서 id 찾기
EOS_NL = tokenizer.encode("\n", add_special_tokens=False)[0]



In [None]:
GEN_ARGS_GREEDY = dict(
    do_sample=False,
    num_beams=4,
    max_new_tokens=96,
    no_repeat_ngram_size=4,
    repetition_penalty=1.2,
    eos_token_id=EOS_NL,                  # 응답 끝나면 개행에서 끊기
    pad_token_id=tokenizer.eos_token_id,
    early_stopping=True,
)


In [None]:
list_prompt = [
    "불고기용 고기 한우에요?",
    "리처드 닉슨이 43대 부통령직을 수행한 년도는?",
    "시카고 오헤어 국제공항은 어디에 있어?",
    "오늘 미세먼지 어때?"
]
batched_inputs = [build_prompt(p) for p in list_prompt]

def run_and_show(generators, batched_inputs, gen_args):
    # 각 모델을 순회하며 배치 생성
    all_outputs = {name: gen(batched_inputs, **gen_args) for name, gen in generators.items()}

    # 프롬프트별로 묶어서 출력
    for i, raw_prompt in enumerate(list_prompt):
        full_prompt = batched_inputs[i]
        print("="*100)
        print(f"PROMPT {i+1}: {raw_prompt}")
        for name in ["base", "sft", "ppo"]:
            res_text = all_outputs[name][i][0]["generated_text"]
            ans = postprocess(full_prompt, res_text)
            print(f"\n[{name.upper()}]\n{ans}\n")

#  그리디 비교
run_and_show(generators, batched_inputs, GEN_ARGS_GREEDY)

베이스 모델에서 sft, PPO를 거칠수록 점점 답변이 깨끗해지는걸 볼 수 있다.


--------------------------------------
--------------------------------------

#Beam search 적용
--------------------------------------
--------------------------------------


In [None]:
# 개행 EOS는 이전에 만든 EOS_NL 사용(없으면 아래 줄로 구하세요)
# EOS_NL = tokenizer.encode("\n", add_special_tokens=False)[0]

GEN_ARGS_BEAM = dict(
    do_sample=True,          # 🔒 순수 빔서치 (랜덤성 X)
    num_beams=7,
    max_new_tokens=96,
    no_repeat_ngram_size=4,
    repetition_penalty=1.2,
    eos_token_id=EOS_NL,      # 응답 끝을 개행으로 유도
    pad_token_id=tokenizer.eos_token_id,
    early_stopping=True,
)

# 실행
run_and_show(generators, batched_inputs, GEN_ARGS_BEAM)

Beam Search를 적용한 모델의 답변의 어휘가 기존에 비해 편해졌다.
적용하지 않은 출력에서는 기존에 주어진 단어들을 우선적으로 선택해 어투가 딱딱한 느낌이 든 반면
Beam Search를 적용한 출력에서는 좀 더 다양한 어휘를 보이며 자연스러운 출력을 보였다.

###EX 01. 추가 데이터 정제

주말사이에 시도해보겠습니다.