In [None]:
!pip install transformers==4.39.0 peft accelerate datasets==2.16.1 trl bitsandbytes google-cloud-secret-manager

In [None]:
import re
import os
from typing import Any, Optional

import torch
import wandb
from google.cloud import secretmanager
from datasets import Dataset, load_dataset
from peft import LoraConfig
from trl import SFTTrainer
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig, TrainingArguments, DataCollatorForLanguageModeling

In [None]:
# Read the token used for the Hugging Face downloads below from Google Cloud
# Secret Manager, so it is never written into the notebook itself.
secret_client = secretmanager.SecretManagerServiceClient()
# Plain string: there is nothing to interpolate, so no f-prefix is needed.
secret_name = "projects/fast-campus-machine-learning/secrets/vertex-ai-notebook/versions/1"
response = secret_client.access_secret_version(request={"name": secret_name})
# Strip surrounding whitespace so a trailing newline stored with the secret
# does not end up inside the token.
huggingface_token = response.payload.data.decode("UTF-8").strip()

In [None]:
# LoRA adapter settings handed to SFTTrainer via `peft_config`.
lora_config = LoraConfig(
    task_type="CAUSAL_LM",
    r=8,  # LoRA rank
    # Names of the modules that receive adapters.
    # NOTE(review): presumably the attention and MLP projection layers of the
    # model - confirm against model.named_modules().
    target_modules=[
        "q_proj",
        "o_proj",
        "k_proj",
        "v_proj",
        "gate_proj",
        "up_proj",
        "down_proj",
    ],
)

In [None]:
model_id = "google/gemma-2b-it"

# Load the weights in 4-bit NF4 form; bfloat16 is the compute dtype.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_quant_type="nf4",
)

# Both downloads authenticate with the token fetched from Secret Manager.
tokenizer = AutoTokenizer.from_pretrained(model_id, token=huggingface_token)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    token=huggingface_token,
    device_map="auto",
    quantization_config=bnb_config,
)

In [None]:
# Prompt shown to the model at inference time: one user turn followed by the
# opening of the model turn, which the model is expected to complete.
query_template = "<start_of_turn>user\n{instruction}<end_of_turn>\n<start_of_turn>model\n"

# Training sample: the same prompt plus the reference answer, closed with the
# same `<end_of_turn>` marker that closes the user turn. The closing marker is
# NOT the HTML-style `</end_of_turn>`. Building it from `query_template` keeps
# the two templates in sync.
fine_tuning_template = query_template + "{response}<end_of_turn>"

In [None]:
def get_response(instruction: str, max_new_tokens=20, device: str="cuda") -> str:
    """Generate the model's reply to a single user instruction.

    The instruction is inserted into ``query_template``, tokenized and passed
    to ``model.generate``. Only the newly generated tokens are decoded, and
    the reply is cut at the first ``<end_of_turn>`` token if one was produced.

    Args:
        instruction: User message placed into the prompt template.
        max_new_tokens: Upper bound on the number of generated tokens.
        device: Device the tokenized inputs are moved to before generation.

    Returns:
        The generated reply with surrounding whitespace removed, or an empty
        string when no visible text was generated.
    """
    prompt = query_template.format(instruction=instruction)
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    outputs = model.generate(**inputs, max_new_tokens=max_new_tokens)

    # The returned sequence starts with the prompt tokens; keep only the
    # continuation. Working on token ids avoids searching the decoded text for
    # turn markers, which are removed by `skip_special_tokens=True` whenever
    # the tokenizer treats them as special tokens.
    prompt_length = inputs["input_ids"].shape[-1]
    generated_ids = outputs[0][prompt_length:].tolist()

    # Cut the reply where the model closed its turn. Skip the cut when the
    # marker is unknown to the tokenizer (None or the unknown-token id).
    end_of_turn_id = tokenizer.convert_tokens_to_ids("<end_of_turn>")
    marker_is_known = end_of_turn_id is not None and end_of_turn_id != tokenizer.unk_token_id
    if marker_is_known and end_of_turn_id in generated_ids:
        generated_ids = generated_ids[: generated_ids.index(end_of_turn_id)]

    return tokenizer.decode(generated_ids, skip_special_tokens=True).strip()

In [None]:
# Korean prompt answered by the model before any fine-tuning has been run.
print(
    get_response(
        "Fast Campus에 대해 어떻게 생각해?",
        max_new_tokens=120,
    )
)

In [None]:
# Text between "<usr> " and " <bot>" (shortest match, single line).
usr_pattern = re.compile(r"<usr>\s(.+?)\s<bot>")
# Text following "<bot> " up to the end of that line ("." does not match newlines).
bot_pattern = re.compile(r"<bot>\s(.+)")


def _first_group(pattern: re.Pattern, text: str) -> Optional[str]:
    """Return capture group 1 of the first match of ``pattern`` in ``text``, else None."""
    match = pattern.search(text)
    return match.group(1) if match else None


def process(sample: dict[str, list[str]]) -> dict[str, list[Any]]:
    """Split a batch of raw texts into instruction and response columns.

    Args:
        sample: Batch mapping with a "text" key holding the raw strings.

    Returns:
        A mapping with "instruction" and "response" lists aligned with
        ``sample["text"]``. An entry is ``None`` when the corresponding
        pattern does not match that text.
    """
    texts = sample["text"]
    # Each pattern is searched once per text rather than once for the
    # existence check and again for the value.
    return {
        "instruction": [_first_group(usr_pattern, text) for text in texts],
        "response": [_first_group(bot_pattern, text) for text in texts],
    }


# No `split=` is passed, so the result is indexed by split name
# (the training cell reads data["train"]).
data = load_dataset("heegyu/open-korean-instructions")
data = data.map(process, batched=True)
# `process` leaves None where a pattern did not match. Drop those rows so the
# prompt template is never formatted with None, which would put the literal
# text "None" into training samples.
data = data.filter(
    lambda row: row["instruction"] is not None and row["response"] is not None
)

In [None]:
def formatting_func(example) -> list[str]:
    """Render a batch of instruction/response pairs into training texts.

    Args:
        example: Batch indexed by "instruction" and "response"; the entries at
            the same position are treated as one pair.

    Returns:
        One string per instruction, built with ``fine_tuning_template``.
    """
    rendered = []
    for position, instruction in enumerate(example["instruction"]):
        rendered.append(
            fine_tuning_template.format(
                instruction=instruction,
                response=example["response"][position],
            )
        )
    return rendered

# Supervised fine-tuning of the loaded model with LoRA adapters on the
# "train" split; each batch is turned into text by `formatting_func`.
trainer = SFTTrainer(
    model=model,
    peft_config=lora_config,
    train_dataset=data["train"],
    formatting_func=formatting_func,
    # mlm=False selects causal language modeling rather than masked LM.
    data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
    args=TrainingArguments(
        # Output and logging
        output_dir="outputs",
        report_to="none",
        logging_steps=100,
        # Run length
        # NOTE(review): a positive max_steps is documented to take precedence
        # over num_train_epochs - confirm the epoch setting is still wanted.
        max_steps=500,
        num_train_epochs=1,
        warmup_steps=2,
        # Optimization
        optim="adafactor",
        learning_rate=2e-4,
        fp16=True,
        # Batching
        per_device_train_batch_size=2,
        dataloader_drop_last=True,
    ),
)
trainer.train()

In [None]:
# Korean prompt again, in the cell that follows trainer.train().
# NOTE(review): presumably the global `model` now carries the trained LoRA
# adapters - confirm before comparing with the earlier output.
print(
    get_response(
        "Fast Campus에 대해 어떻게 생각해?",
        max_new_tokens=120,
    )
)