## Model making

In [17]:
import torch
import os
import transformers
from ast import literal_eval
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
from trl import SFTTrainer, DataCollatorForCompletionOnlyLM, SFTConfig
from datasets import Dataset
import json
import pandas as pd
import random
import numpy as np
import evaluate
from sklearn.feature_extraction.text import TfidfVectorizer
from tqdm import tqdm
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from transformers import Trainer, TrainingArguments, DataCollatorForSeq2Seq, DataCollatorForLanguageModeling
from sklearn.utils.class_weight import compute_class_weight
from torch.nn import CrossEntropyLoss

# 1. 설정: pandas 출력 옵션 및 시드 고정
pd.set_option('display.max_columns', None)
os.environ["TOKENIZERS_PARALLELISM"] = "false"
def set_seed(random_seed):
    torch.manual_seed(random_seed)
    torch.cuda.manual_seed(random_seed)
    torch.cuda.manual_seed_all(random_seed)  
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    np.random.seed(random_seed)
    random.seed(random_seed)

set_seed(42)
# 2. 모델 및 토크나이저 로드 (8-bit 양자화)
model_name = "CarrotAI/Llama-3.2-Rabbit-Ko-3B-Instruct"

bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,  # QLoRA는 4bit 양자화를 사용
    # load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.float16,  # 계산 precision (float16 또는 bfloat16 사용 가능)
    bnb_4bit_use_double_quant=True,       # 이중 양자화 활성화
    bnb_4bit_quant_type="nf4"             # NF4 양자화 타입
)

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=bnb_config,  # BitsAndBytesConfig 추가
    device_map="auto",
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"  # 왼쪽 패딩 설정


# 3. PEFT 설정 (LoRA)
lora_config = LoraConfig(
    r=32,
    lora_alpha=64,
    lora_dropout=0.05,
    target_modules=['q_proj', 'v_proj', 'k_proj', 'o_proj'],
    bias="none",
    task_type="CAUSAL_LM",
)

# model = get_peft_model(model, lora_config)


Loading checkpoint shards: 100%|██████████| 2/2 [00:04<00:00,  2.12s/it]


## Data Processing

In [18]:
def make_dataset(prompt, prompt_plus, system_prompt):
    dataset = pd.read_csv('datas/train+klue.csv')
    records = []
    for _, row in dataset.iterrows():
        problems = literal_eval(row['problems'])
        record = {
            'id': row['id'],
            'paragraph': row['paragraph'],
            'question': problems['question'],
            'choices': problems['choices'],
            'answer': problems.get('klue', None),
            "question_plus": problems.get('question_plus', None),
            'klue' : row.get('klue', None)
        }
        # Include 'question_plus' if it exists
        if 'question_plus' in problems:
            record['question_plus'] = problems['question_plus']
        records.append(record)
            
    # Convert to DataFrame
    df = pd.DataFrame(records)
    dataset = Dataset.from_pandas(df)
    processed_dataset = []

    for i in range(len(dataset)):
        choices_string = "\n".join([f"{idx + 1} - {choice}" for idx, choice in enumerate(dataset[i]["choices"])])

        # <보기>가 있을 때
        if dataset[i]["question_plus"]:
            user_message = prompt_plus.format(
                paragraph=dataset[i]["paragraph"],
                question=dataset[i]["question"],
                question_plus=dataset[i]["question_plus"],
                choices=choices_string,
            )
        # <보기>가 없을 때
        else:
            user_message = prompt.format(
                paragraph=dataset[i]["paragraph"],
                question=dataset[i]["question"],
                choices=choices_string,
            )

        # chat message 형식으로 변환
        processed_dataset.append(
            {
                "id": dataset[i]["id"],
                "messages": [
                    {"role": "system", "content": system_prompt},
                    {"role": "user", "content": user_message},
                    {"role": "assistant", "content": f"{dataset[i]['klue']}"}
                ],
                "label": dataset[i]['klue'],
            }
        )
    processed_dataset = Dataset.from_pandas(pd.DataFrame(processed_dataset))

    def formatting_prompts_func(example):
        output_texts = []
        for i in range(len(example["messages"])):
            output_texts.append(
                tokenizer.apply_chat_template(
                    example["messages"][i],
                    tokenize=False,
                )
            )
        return output_texts

    def tokenize(element):
        outputs = tokenizer(
            formatting_prompts_func(element),
            truncation=False,
            padding=False,
            return_overflowing_tokens=False,
            return_length=False,
        )
        return {
            "input_ids": outputs["input_ids"],
            "attention_mask": outputs["attention_mask"],
        }

    # 데이터 토큰화
    tokenized_dataset = processed_dataset.map(
        tokenize,
        remove_columns=list(processed_dataset.features),
        batched=True,
        num_proc=1,
        load_from_cache_file=True,
        desc="Tokenizing",
    )

    # 데이터 분리
    tokenized_dataset = tokenized_dataset.filter(lambda x: len(x["input_ids"]) <= 2048)  
    tokenized_dataset = tokenized_dataset.train_test_split(test_size=0.10, seed=42)

    train_dataset = tokenized_dataset['train']
    eval_dataset = tokenized_dataset['test']


    train_dataset_token_lengths = [len(train_dataset[i]["input_ids"]) for i in range(len(train_dataset))]
    print(f"max token length: {max(train_dataset_token_lengths)}")
    print(f"min token length: {min(train_dataset_token_lengths)}")
    print(f"avg token length: {np.mean(train_dataset_token_lengths)}")

    # 데이터 확인
    return train_dataset, eval_dataset

In [19]:
system_prompt = """국어 시험 문제를 푸는 대한민국의 고3 수험생으로서 위의 요약을 바탕으로 다음 문제의 답을 구하세요."""
prompt = """
지문:
{paragraph}

질문:
{question}

선택지:
{choices}

출력 형식:
[근거: 답변을 도출한 텍스트] # [결과: 선택지 번호]

문제를 풀이할 때, 반드시 지문을 참고하세요.
문제는 무조건 1개의 정답만 있습니다.
문제를 풀이할 때 모든 선택지들을 검토하세요.

근거:
"""

prompt_plus = """
지문:
{paragraph}

질문:
{question}

<보기>:
{question_plus}

선택지:
{choices}

출력 형식:
[근거: 답변을 도출한 텍스트] # [결과: 선택지 번호]

문제를 풀이할 때, 반드시 지문을 참고하세요.
문제는 무조건 1개의 정답만 있습니다.
문제를 풀이할 때 모든 선택지들을 검토하세요.

근거:
"""

train_dataset, eval_dataset = make_dataset(prompt, prompt_plus, system_prompt)
# 여기서 프롬프트만 바꿔서 데이터셋을 만들기

Tokenizing: 100%|██████████| 2031/2031 [00:04<00:00, 499.22 examples/s]
Filter: 100%|██████████| 2031/2031 [00:01<00:00, 1508.12 examples/s]


max token length: 1683
min token length: 233
avg token length: 765.5440613026819


## Train phase

In [20]:
import numpy as np
import evaluate
acc_metric = evaluate.load("accuracy")
f1_metric = evaluate.load("f1")
def preprocess_logits_for_metrics(logits, labels):
    logits = logits if not isinstance(logits, tuple) else logits[0]
    logit_idx = [tokenizer.vocab["1"],
                    tokenizer.vocab["2"],
                    tokenizer.vocab["3"],
                    tokenizer.vocab["4"], 
                    tokenizer.vocab["5"]]
    logits = logits[:, -2, logit_idx] # -2: answer token, -1: eos token
    return logits


    # metric 계산 함수
def compute_metrics(evaluation_result):
    logits, labels = evaluation_result
    int_output_map = {"1": 0, "2": 1, "3": 2, "4": 3, "5": 4}


    # 토큰화된 레이블 디코딩
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
    labels = [label.split('#')[-1].strip() for label in labels]
    labels = [int_output_map.get(label, -1) for label in labels] 

    # 소프트맥스 함수를 사용하여 로그트 변환
    probs = torch.nn.functional.softmax(torch.tensor(logits, dtype=torch.float32), dim=-1)

    predictions = np.argmax(probs, axis=-1)

    # 정확도 계산
    acc = acc_metric.compute(predictions=predictions, references=labels)
    f1 = f1_metric.compute(predictions=predictions, references=labels, average="macro")

    return {"accuracy": acc["accuracy"], "f1": f1["f1"]}


In [None]:
response_template = "assistant<|end_header_id|>"
data_collator = DataCollatorForCompletionOnlyLM(
    response_template=response_template,
    tokenizer=tokenizer,
)

sft_config = SFTConfig(
    do_train=True,
    do_eval=True,
    lr_scheduler_type="cosine",
    max_seq_length = 2048,
    output_dir=f"./outputs + {model_name.split('/')[-1]}",
    per_device_train_batch_size=1,
    gradient_accumulation_steps=8,
    per_device_eval_batch_size=1,
    num_train_epochs=3,
    learning_rate=1e-5,
    weight_decay=0.01,
    logging_steps=10,
    save_strategy="epoch",
    eval_strategy="epoch",
    save_total_limit=2,
    save_only_model=True,
    report_to="none",
    fp16 = True,
    gradient_checkpointing = True, # 8B모델 돌릴때만 True로, 아니면 False로
    load_best_model_at_end = True
)
trainer = SFTTrainer(
    model=model,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=data_collator,
    tokenizer=tokenizer,
    compute_metrics=compute_metrics,
    preprocess_logits_for_metrics = preprocess_logits_for_metrics,
    args=sft_config,
    max_seq_length=2048,
    packing=False,
    peft_config = lora_config
)
torch.cuda.empty_cache()
trainer.train()


Deprecated positional argument(s) used in SFTTrainer, please use the SFTConfig to set these arguments instead.
Detected kernel version 5.4.0, which is below the recommended minimum of 5.5.0; this can cause the process to hang. It is recommended to upgrade the kernel to the minimum version or higher.


Epoch,Training Loss,Validation Loss


## Inference Phase

In [4]:

def make_test_dataset(test_df, prompt, prompt_plus, system_prompt):
    # Flatten the JSON dataset
    records = []
    for _, row in test_df.iterrows():
        problems = literal_eval(row['problems'])
        record = {
            'id': row['id'],
            'paragraph': row['paragraph'],
            'question': problems['question'],
            'choices': problems['choices'],
            'answer': problems.get('answer', None),
            "question_plus": problems.get('question_plus', None),
            'klue': row.get('hint', None)
        }
        # Include 'question_plus' if it exists
        if 'question_plus' in problems:
            record['question_plus'] = problems['question_plus']
        records.append(record)
            
    # Convert to DataFrame
    test_df = pd.DataFrame(records)

    from tqdm import tqdm
    test_dataset = []
    for i, row in test_df.iterrows():
        choices_string = "\n".join([f"{idx + 1} - {choice}" for idx, choice in enumerate(row["choices"])])
        len_choices = len(row["choices"])
        
        # <보기>가 있을 때
        if row["question_plus"]:
            user_message = prompt_plus.format(
                paragraph=row["paragraph"],
                question=row["question"],
                question_plus=row["question_plus"],
                choices=choices_string,
            )
        # <보기>가 없을 때
        else:
            user_message = prompt.format(
                paragraph=row["paragraph"],
                question=row["question"],
                choices=choices_string,
            )

        test_dataset.append(
            {
                "id": row["id"],
                "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_message},
                ],
                "label": row["answer"],
                "len_choices": len_choices,
                'klue' : row['klue']
            }
        )
    return test_dataset

In [6]:
from torch.utils.data import DataLoader


system_prompt = """국어 시험 문제를 푸는 대한민국의 고3 수험생으로서 위의 요약을 바탕으로 다음 문제의 답을 구하세요."""
# ------------------------------------------------------------------------------------------
prompt = """
지문:
{paragraph}

질문:
{question}

선택지:
{choices}

출력 형식:
[근거: 답변을 도출한 텍스트] # [결과: 선택지 번호]

문제를 풀이할 때, 반드시 지문을 참고하세요.
문제는 무조건 1개의 정답만 있습니다.
문제를 풀이할 때 모든 선택지들을 검토하세요.

근거:
"""
# ------------------------------------------------------------------------------------------
prompt_plus = """
지문:
{paragraph}

질문:
{question}

<보기>:
{question_plus}

선택지:
{choices}

출력 형식:
[근거: 답변을 도출한 텍스트] # [결과: 선택지 번호]

문제를 풀이할 때, 반드시 지문을 참고하세요.
문제는 무조건 1개의 정답만 있습니다.
문제를 풀이할 때 모든 선택지들을 검토하세요.

근거:
"""
# ------------------------------------------------------------------------------------------

test_df = pd.read_csv('datas/test.csv')
test_dataset = make_test_dataset(test_df, prompt, prompt_plus, system_prompt)
test_dataset = test_dataset[:8]
# 배치 데이터 로더를 위한 collate_fn
def collate_fn(batch):
    ids = [item["id"] for item in batch]
    messages = [item["messages"] for item in batch]
    labels = [item.get("label", None) for item in batch]  # 라벨이 존재할 경우만 가져옴
    return ids, messages, labels

# 데이터 로더 설정
batch_size = 8  # 배치 크기 설정
dataloader = DataLoader(test_dataset, batch_size=batch_size, collate_fn=collate_fn)

generated_infer_results = []

with torch.inference_mode():
    for batch in tqdm(dataloader):
        ids, messages, labels = batch

        # 텍스트 생성을 위한 입력 데이터 준비
        inputs = tokenizer.apply_chat_template(
            messages,
            tokenize=True,
            add_generation_prompt=True,
            return_tensors="pt",
            padding=True,  # 배치 크기에 맞게 패딩 추가
        )

        # GPU로 이동 (inputs가 Tensor일 경우 바로 이동)
        inputs = inputs.to(model.device)  # Tensor로 직접 처리

        # 모델 생성
        outputs = model.generate(
            inputs,
            max_new_tokens=150,  # 최대 생성 토큰 수
            pad_token_id=tokenizer.pad_token_id,
        )

        # 결과 디코딩
        generated_texts = tokenizer.batch_decode(
            outputs[:, inputs.shape[1]:], skip_special_tokens=True
        )

        # 결과 저장
        for _id, generated_text, label in zip(ids, generated_texts, labels):
            generated_infer_results.append({
                "id": _id,
                "answer": generated_text,
                "label": label  # 실제 라벨이 있다면 포함
            })

# 결과를 DataFrame으로 저장
generated_infer_results = pd.DataFrame(generated_infer_results)

100%|██████████| 1/1 [00:23<00:00, 23.84s/it]


In [None]:
import re
def extract_last_digit(s):
    match = re.search(r'\d$', s)  # 문자열 끝에서 숫자 하나만 매칭
    return match.group() if match else None  # 매칭된 숫자를 반환, 없으면 None 반환

result = generated_infer_results
# 데이터프레임에 적용
result['text'] = result['answer']
result['answer'] = result['answer'].apply(lambda x: x.split('#')[-1])
result['answer'] = result['answer'].apply(extract_last_digit)
result['answer'] = result['answer'].fillna(1)
result['answer'] = result['answer'].apply(lambda x: int(x))
submission = result[['id', 'text', 'answer']]
submission['answer'].value_counts()

'1. 서학은  조선 후기 유서( 類書)가  조선시기부터  편찬한 유서가  조선의  서학을  수용하였다.  서학은  조선 후기 유서의  편찬 방식에 따라  필요에  맞게 유서를 편찬하였다.  조선의 유서가  조선시기부터  편찬한 유서가  조선의  서학을  수용하였다.  조선의 유서가  조선시기부터  편찬한 유서가  조선의  서학을  수용하였다.  조선의 유서가  조선시기부터  편찬한 유서가 '