### Google Drive mount

In [None]:
# Mount Google Drive at /content/drive so later cells can read and write
# files through that path.
from google.colab import drive

drive.mount('/content/drive')

### 필요한 library install

In [None]:
!pip install transformers
!pip install peft
!pip install dataclasses
!pip install wandb

### WandB 연결

In [None]:
import wandb
# Log this runtime in to Weights & Biases before any run is created.
wandb.login()

### 라이브러리 load 및 전역변수 설정

In [None]:
import transformers
from transformers import TrainingArguments, Trainer, AutoTokenizer, AutoModelForCausalLM
import torch
from torch.utils.data import Dataset
from peft import LoraConfig, get_peft_model
from dataclasses import dataclass
import json, os, random, logging, math, copy
import numpy as np

IGNORE_INDEX = -100 # label value that is ignored when the training loss is computed
os.environ['WANDB_PROJECT'] = 'TEST' # name of the wandb project

### 초기 random 함수 seed 설정

In [None]:
def set_seed(seed):
    """Seed the PyTorch, NumPy and Python random number generators.

    Also switches cuDNN to deterministic mode and disables its benchmark
    mode.

    Args:
        seed: Seed value handed to every generator.
    """
    # PyTorch: default generator, current CUDA device, then all CUDA devices
    # (the last one matters when several GPUs are used).
    for seeder in (torch.manual_seed, torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seeder(seed)

    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

    # NumPy and the standard-library generator.
    np.random.seed(seed)
    random.seed(seed)

### 데이터 load 및 입력 형태에 맞게 전처리

In [None]:
def load_dataset(directory_path):
    """Load every JSON file located directly inside a directory.

    Files are read in sorted filename order so that repeated runs see the
    documents in the same order (``os.listdir`` gives no ordering guarantee).
    Entries that are not regular files, such as sub-directories, are skipped
    instead of crashing ``open``.

    Args:
        directory_path: Directory holding one JSON document per file.

    Returns:
        A list with the parsed content of each file, in filename order.
    """
    datas = []
    for filename in sorted(os.listdir(directory_path)):
        file_path = os.path.join(directory_path, filename)
        if not os.path.isfile(file_path):
            continue

        with open(file_path, 'r', encoding='utf8') as f:
            datas.append(json.load(f))

    print(f"loading finished : {len(datas)} datas")
    return datas


def data_transform(datas):
    """Convert interview sessions into prompt/target training pairs.

    Within a session, each turn after the first one that carries a non-None
    answer yields one sample: the prompt is filled with the previous turn's
    answer and the current turn's ``rule_based_intent``, and the target is
    the current turn's question.

    Args:
        datas: Iterable of sessions; each session is an iterable of dicts
            with the keys ``answer``, ``question`` and ``rule_based_intent``.

    Returns:
        A list of ``{"source": ..., "target": ...}`` dicts.
    """
    template = (
        "당신은 면접관입니다. 다음 명령에 따라 적절한 질문을 수행하세요.\n"
        "화자의 응답 기록을 참고하여 주제에 관련된 적절한 질문을 생성하세요.\n"
        "### 주제:\n{intent}\n\n### 화자의 응답 기록:\n{answer}\n\n### 질문 :\n"
    )

    pairs = []
    for session in datas:
        previous_answer = None

        for turn in session:
            # No usable previous answer yet: this turn only supplies context.
            if previous_answer is not None:
                filled_prompt = template.format(
                    answer=previous_answer,
                    intent=turn['rule_based_intent'],
                )
                pairs.append({"source": filled_prompt, "target": turn['question']})

            previous_answer = turn['answer']

    print(f"total data samples : {len(pairs)}")

    return pairs

In [None]:
def preprocess(sources, targets, tokenizer):
    """Tokenize prompt/target pairs and build labels that mask the prompt.

    Each source is concatenated with its target and tokenized (truncated to
    ``tokenizer.model_max_length``). The labels are a copy of the token ids
    in which the leading positions belonging to the prompt are replaced by
    ``IGNORE_INDEX`` so that they do not contribute to the loss.

    Args:
        sources: Prompt strings.
        targets: Target strings, aligned with ``sources``.
        tokenizer: Callable tokenizer returning ``input_ids`` and, when
            ``return_length=True``, ``length``.

    Returns:
        A dict with ``input_ids`` and ``labels``, both lists of token-id
        lists of equal length per example.
    """
    # Join prompt and target into one training example per pair.
    examples = [s + t for s, t in zip(sources, targets)]

    input_ids = tokenizer(text=examples, padding=False, return_attention_mask=False, return_length=False,
                          max_length=tokenizer.model_max_length, truncation=True, verbose=False)["input_ids"]

    # Sanity check on the produced ids.
    for pieces in input_ids:
        assert not any(math.isnan(piece) or math.isinf(piece) for piece in pieces)

    # Token count of each prompt on its own, used as the masked prefix length.
    source_lens = tokenizer(text=sources, padding=False, return_attention_mask=False, return_length=True,
                            max_length=tokenizer.model_max_length, truncation=True, verbose=False)["length"]

    labels = []
    for ids, source_len in zip(input_ids, source_lens):
        # Clamp to the example length: the prompt tokenized alone may be
        # longer than the tokenized example, which would otherwise index
        # past the end of the label list.
        masked = min(source_len, len(ids))
        labels.append([IGNORE_INDEX] * masked + list(ids[masked:]))

    return dict(input_ids=input_ids, labels=labels)


class CustomDataset(Dataset):
    """Dataset that serves pre-tokenized examples one at a time.

    At construction every example's target gets the tokenizer's EOS token
    appended, then all sources and targets are passed to ``preprocess``; the
    resulting ``input_ids`` and ``labels`` are kept in memory.
    """

    def __init__(self, examples, tokenizer):
        """Tokenize ``examples`` (dicts with ``source`` and ``target``)."""
        self.tokenizer = tokenizer

        prompt_texts = [item['source'] for item in examples]
        # The EOS token marks where the generated question should stop.
        target_texts = [f"{item['target']}{tokenizer.eos_token}" for item in examples]

        logging.warning(msg="tokenizing...")
        tokenized = preprocess(sources=prompt_texts, targets=target_texts, tokenizer=tokenizer)
        logging.warning(msg="tokenizing finished")

        self.input_ids = tokenized["input_ids"]
        self.labels = tokenized["labels"]

    def __len__(self):
        """Return the number of stored examples."""
        return len(self.input_ids)

    def naive__getitem__(self, i):
        """Return example ``i`` as a dict of ``input_ids`` and ``labels``."""
        return {"input_ids": self.input_ids[i], "labels": self.labels[i]}

    def __getitem__(self, idx):
        """Return example ``idx`` as a dict of ``input_ids`` and ``labels``."""
        return {"input_ids": self.input_ids[idx], "labels": self.labels[idx]}

@dataclass
class CustomCollator(object):
    """Collate variable-length examples into one left-padded batch.

    ``input_ids`` are padded with the tokenizer's pad token id and ``labels``
    with ``IGNORE_INDEX``. The attention mask is derived from the original
    sequence lengths rather than from the pad token id, so a real token that
    shares its id with the pad token (for example when pad is set to EOS)
    stays attended.
    """
    # Tokenizer that supplies the pad token id.
    tokenizer: transformers.PreTrainedTokenizer

    @staticmethod
    def _left_pad(sequences, padding_value):
        """Stack 1-D tensors into a (batch, max_len) tensor, padding on the left."""
        # pad_sequence pads on the right, so reverse each sequence, pad,
        # and reverse the padded batch back.
        reversed_sequences = [seq.flip(dims=[-1]) for seq in sequences]
        padded = torch.nn.utils.rnn.pad_sequence(reversed_sequences, batch_first=True,
                                                 padding_value=padding_value)
        return padded.flip(dims=[1])

    def __call__(self, instances):
        """Build the batch dict from a list of dataset items.

        Args:
            instances: Items with ``input_ids`` and ``labels`` sequences.

        Returns:
            A dict with ``input_ids``, ``labels`` and a boolean
            ``attention_mask``, each of shape (batch, max_len).
        """
        # as_tensor accepts both plain lists and tensors without a copy warning.
        input_ids = [torch.as_tensor(instance["input_ids"]) for instance in instances]
        labels = [torch.as_tensor(instance["labels"]) for instance in instances]
        lengths = [ids.size(-1) for ids in input_ids]

        input_ids = self._left_pad(input_ids, self.tokenizer.pad_token_id)
        labels = self._left_pad(labels, IGNORE_INDEX)

        # With left padding, the last `length` positions of each row are real.
        max_len = input_ids.size(1)
        positions = torch.arange(max_len, device=input_ids.device).unsqueeze(0)
        first_real = (max_len - torch.tensor(lengths, device=input_ids.device)).unsqueeze(1)
        attention_mask = positions >= first_real

        return dict(input_ids=input_ids, labels=labels, attention_mask=attention_mask)


### 모델 설정 및 학습 설정 argument 객체 반환 함수

In [None]:
def get_training_args(args):
    """Build the ``TrainingArguments`` object used by the trainer.

    Args:
        args: Dict with the keys ``output_dir``, ``learning_rate``,
            ``weight_decay``, ``num_epochs``, ``batch_size``,
            ``logging_steps``, ``accumulation_steps``, ``save_steps``,
            ``report_to`` and ``run_name``.

    Returns:
        The configured ``TrainingArguments`` instance.
    """
    # Evaluation stays disabled through the library default ("no"). The
    # keyword is left out on purpose because its name differs between
    # transformers releases (evaluation_strategy vs. eval_strategy).
    training_args = TrainingArguments(
        output_dir=args['output_dir'],
        learning_rate=args["learning_rate"],
        weight_decay=args["weight_decay"],
        push_to_hub=False,
        do_train=True,
        num_train_epochs=args['num_epochs'],
        per_device_train_batch_size=args["batch_size"],
        logging_steps=args["logging_steps"],
        gradient_accumulation_steps=args["accumulation_steps"],
        save_strategy="steps",
        save_steps=args["save_steps"],
        warmup_ratio=0.03,
        lr_scheduler_type='cosine',
        max_grad_norm=1.0,
        fp16=False,
        report_to=args["report_to"],
        run_name=args["run_name"],
    )

    return training_args

def get_lora_args(args):
    """Build the LoRA configuration for a causal language model.

    ``lora_alpha`` is always set to twice the rank.

    Args:
        args: Dict with the keys ``lora_r``, ``lora_dropout`` and ``bias``.

    Returns:
        The configured ``LoraConfig`` instance.
    """
    rank = args['lora_r']
    return LoraConfig(
        r=rank,
        lora_alpha=rank * 2,
        lora_dropout=args['lora_dropout'],
        bias=args['bias'],
        task_type="CAUSAL_LM",
    )

### 학습

In [None]:
def training(config):
    """Run the full LoRA fine-tuning pipeline described by ``config``.

    Loads the pretrained model and tokenizer, wraps the model with LoRA,
    builds the training dataset from the JSON files in
    ``config['train_data_path']``, trains with the Hugging Face ``Trainer``
    and finally saves the trainer state and the model under
    ``<config['output_dir']>/final``.

    Args:
        config: Dict with the keys ``pretrained_model_name_or_path``,
            ``trust_remote_code``, ``cache_dir``, ``local_files_only``,
            ``padding_side``, ``max_token_length``, ``lora_args``,
            ``train_data_path``, ``training_args`` and ``output_dir``.
    """
    # Loading options shared by the model and the tokenizer.
    load_kwargs = dict(
        pretrained_model_name_or_path=config['pretrained_model_name_or_path'],
        trust_remote_code=config['trust_remote_code'],
        cache_dir=config['cache_dir'],
        local_files_only=config['local_files_only'],
    )
    model = AutoModelForCausalLM.from_pretrained(**load_kwargs)
    tokenizer = AutoTokenizer.from_pretrained(padding_side=config['padding_side'], **load_kwargs)

    # Reuse EOS as the padding token and cap the sequence length.
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.pad_token_id = tokenizer.eos_token_id
    tokenizer.model_max_length = config['max_token_length']

    # Wrap the base model with LoRA adapters.
    model = get_peft_model(model, get_lora_args(config['lora_args']))

    # Raw sessions -> prompt/target pairs -> tokenized dataset.
    raw_sessions = load_dataset(config['train_data_path'])
    examples = data_transform(raw_sessions)
    train_dataset = CustomDataset(examples=examples, tokenizer=tokenizer)
    data_collator = CustomCollator(tokenizer=tokenizer)

    trainer = Trainer(
        model=model,
        tokenizer=tokenizer,
        args=get_training_args(config['training_args']),
        train_dataset=train_dataset,
        data_collator=data_collator,
    )

    trainer.train()

    # After training: persist the trainer state and the final model.
    trainer.save_state()
    final_dir = os.path.join(config['output_dir'], "final")
    trainer.save_model(output_dir=final_dir)


### main 문

In [None]:
# Project root on Google Drive. Drive is mounted at /content/drive, so an
# absolute path is used; a path relative to the working directory would
# resolve to a different location depending on where the notebook runs.
root_dir = "/content/drive/MyDrive/LoRA_tuning"
input_dir = os.path.join(root_dir, "inputs")

# Adjust the model name and the paths below to match your own setup.
model_name = "EleutherAI/polyglot-ko-1.3b" # beomi/llama-2-ko-7b , EleutherAI/polyglot-ko-1.3b
output_dir = os.path.join(root_dir, "outputs", "interview", model_name.split("/")[1], "test")
os.makedirs(output_dir, exist_ok=True)

cache_dir = os.path.join(root_dir, 'cache')

set_seed(seed=42)

config = {
    # Values forwarded to get_training_args.
    "training_args":{
        "output_dir": output_dir,
        "learning_rate": 2e-5,
        "weight_decay": 0.001,
        "batch_size": 8,
        "accumulation_steps": 32,
        "logging_steps": 1,
        "save_steps": 100,
        "num_epochs": 20,
        "report_to": "wandb",
        "run_name": "session_10000"
    },
    # Values forwarded to get_lora_args.
    "lora_args": {
        "lora_r": 128,
        "lora_dropout": 0.05,
        "bias": "none"
    },

    # Model / tokenizer loading options.
    "pretrained_model_name_or_path": model_name,
    "trust_remote_code": True,
    "cache_dir": cache_dir,
    "local_files_only": False,
    "padding_side": "left",
    "max_token_length": 1024,

    # Data location and where the final model is written.
    "train_data_path": input_dir,
    "output_dir": output_dir

}

training(config)