<a href="https://colab.research.google.com/github/haru1489248/nlp-100-nock/blob/main/ch10/section_98.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 98. ファインチューニング
問題96のプロンプトに対して、正解の感情ラベルをテキストの応答として返すように事前学習済みモデルをファインチューニングせよ。
chat template を適用すると入力が長くなりすぎ、実行環境のスペックが足りないため、今回は chat template を使用しなかった。

In [1]:
!pip install -U transformers evaluate

Collecting transformers
  Downloading transformers-5.1.0-py3-none-any.whl.metadata (31 kB)
Collecting evaluate
  Downloading evaluate-0.4.6-py3-none-any.whl.metadata (9.5 kB)
Downloading transformers-5.1.0-py3-none-any.whl (10.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.3/10.3 MB[0m [31m62.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading evaluate-0.4.6-py3-none-any.whl (84 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m84.1/84.1 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: transformers, evaluate
  Attempting uninstall: transformers
    Found existing installation: transformers 5.0.0
    Uninstalling transformers-5.0.0:
      Successfully uninstalled transformers-5.0.0
Successfully installed evaluate-0.4.6 transformers-5.1.0


In [2]:
import torch
import evaluate
import numpy as np
from typing import Any, Tuple, Union, Optional
from datasets import Dataset
# parameter efficient fine-tuning module import
from peft import LoraConfig, TaskType, get_peft_model
from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    TrainingArguments,
    Trainer,
)
from google.colab import drive
drive.mount("/content/drive")

Mounted at /content/drive


In [3]:
class CausalLMDataCollator:
  """Collate tokenized causal-LM examples into one padded batch.

  Each feature is expected to carry ``input_ids``, ``attention_mask`` and a
  ``labels`` list of the same length as ``input_ids``. The tokenizer pads
  everything except ``labels``; the labels are padded here with
  ``label_pad_token_id`` on the SAME side the tokenizer pads on, so that every
  label stays aligned with the token it belongs to.

  Args:
    tokenizer: Object exposing ``pad(features, padding=..., return_tensors=...)``
      and, optionally, a ``padding_side`` attribute ("left" or "right").
    label_pad_token_id: Value used to fill label positions created by padding.
  """

  def __init__(self, tokenizer: Any, label_pad_token_id: int):
    self.tokenizer = tokenizer
    self.label_pad_token_id = label_pad_token_id

  def __call__(self, features: list[dict[str, Any]]) -> dict[str, torch.Tensor]:
    """Pad ``features`` to a common length and return the batch with ``labels``.

    The input dicts are not modified.
    """
    labels = [list(f["labels"]) for f in features]
    # Build label-free copies instead of popping from the caller's dicts, so the
    # same feature list can be collated more than once.
    model_inputs = [
        {key: value for key, value in f.items() if key != "labels"}
        for f in features
    ]

    batch = self.tokenizer.pad(
        model_inputs,
        padding=True,
        return_tensors="pt",
    )

    max_len = batch["input_ids"].shape[1]
    # Labels must be padded on the same side as input_ids; otherwise a
    # left-padding tokenizer shifts the tokens but not their labels.
    pad_left = getattr(self.tokenizer, "padding_side", "right") == "left"

    padded_labels = []
    for seq in labels:
      seq = seq[:max_len]
      filler = [self.label_pad_token_id] * (max_len - len(seq))
      padded_labels.append(filler + seq if pad_left else seq + filler)

    batch["labels"] = torch.tensor(padded_labels, dtype=torch.long)
    return batch

In [None]:
# Checkpoint to fine-tune and the SST-2 TSV files on the mounted Drive.
model_id = "llm-jp/llm-jp-3-150m-instruct3"
train_src = "/content/drive/MyDrive/SST-2/train.tsv"
dev_src = "/content/drive/MyDrive/SST-2/dev.tsv"

# Use the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

tokenizer = AutoTokenizer.from_pretrained(model_id)
tokenizer.padding_side = "left"
# Reuse the EOS id as the pad id when the checkpoint defines no pad token.
if tokenizer.pad_token_id is None:
  tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
metric = evaluate.load("accuracy")

# Tokenize each label word on its own (no special tokens added).
POS_ID, NEG_ID = (
    tokenizer(word, add_special_tokens=False)["input_ids"]
    for word in ("positive", "negative")
)

# `assert condition, payload`: raises AssertionError carrying the payload when
# the condition is false. Both label words must map to exactly one token.
assert len(POS_ID) == len(NEG_ID) == 1, (POS_ID, NEG_ID)

# Unwrap the single-element id lists into plain token ids.
POS_ID, NEG_ID = POS_ID[0], NEG_ID[0]

In [6]:
def preprocess_logits_for_metrics(logits: Union[torch.Tensor, tuple], labels: Optional[torch.Tensor]) -> torch.Tensor:
  """Keep only the logits of the two label tokens.

  Args:
    logits: Model logits indexed as (batch, position, vocab), or a tuple whose
      first element is that tensor.
    labels: Unused; accepted because the caller passes it.

  Returns:
    Tensor with the vocab axis replaced by two channels:
    channel 0 is the POS_ID logit, channel 1 is the NEG_ID logit.
  """
  token_logits = logits[0] if isinstance(logits, tuple) else logits
  # One (batch, position) slice per label token, stacked on a new last axis.
  per_label = [token_logits[:, :, token_id] for token_id in (POS_ID, NEG_ID)]
  return torch.stack(per_label, dim=-1)

In [7]:
def compute_accuracy(eval_pred: Tuple[np.ndarray, np.ndarray]) -> dict[str, float]:
  """Compute accuracy of the positive/negative decision for each example.

  Args:
    eval_pred: ``(logits, labels)``. ``logits`` is indexed as
      (example, position, 2) where channel 0 is the "positive" score and
      channel 1 the "negative" score. ``labels`` is indexed as
      (example, position) with -100 on every position that is not part of the
      answer.

  Returns:
    The dict produced by ``metric.compute`` for the collected predictions.

  Notes:
    A causal LM's logits at position ``j`` are the prediction for the token at
    position ``j + 1``, and the logits/labels received here are not shifted.
    The score for the first answer token at index ``t`` is therefore read from
    position ``t - 1``.
  """
  logits, labels = eval_pred
  preds, refs = [], []

  for i in range(labels.shape[0]):
    idxs = np.where(labels[i] != -100)[0]
    if len(idxs) == 0:
      # No answer token in this row (fully masked); nothing to score.
      continue
    t = idxs[0]
    if t == 0:
      # The answer starts at the very first position, so there is no earlier
      # position whose logits predict it.
      continue

    # Logits that predict the token at index t live at index t - 1.
    pos_score = logits[i, t - 1, 0] # channel 0 is "positive"
    neg_score = logits[i, t - 1, 1] # channel 1 is "negative"
    pred_label = 1 if pos_score > neg_score else 0

    gold_token = labels[i, t]
    gold_label = 1 if gold_token == POS_ID else 0

    preds.append(pred_label)
    refs.append(gold_label)

  return metric.compute(predictions=preds, references=refs)

In [8]:
def main() -> None:
  """Fine-tune the causal LM with LoRA on prompt/answer pairs and print dev metrics.

  Steps, in order: read the train/dev TSV files, load the pretrained model,
  wrap it with a LoRA adapter, tokenize every example into
  prompt + answer with the prompt masked out of the loss, train with
  ``Trainer``, then evaluate on the dev set and print the result.
  Relies on the module-level ``model_id``, ``train_src``, ``dev_src``,
  ``tokenizer``, ``compute_accuracy`` and ``preprocess_logits_for_metrics``.
  """
  train_dataset = Dataset.from_csv(train_src, sep="\t")
  dev_dataset = Dataset.from_csv(dev_src, sep="\t")

  model = AutoModelForCausalLM.from_pretrained(
      model_id,
      device_map="auto" if torch.cuda.is_available() else None,
  )

  peft_config = LoraConfig(
      task_type=TaskType.CAUSAL_LM,
      inference_mode=False, # training mode; apparently True is for using an already-distributed adapter
      r=8,
      lora_alpha=16,
      lora_dropout=0.1, # dropout with probability 0.1, applied only to the LoRA part
      target_modules=[
          "q_proj", "k_proj", "v_proj", "o_proj", # attention query / key / value / output
          "gate_proj", "up_proj", "down_proj" # layer names; presumably the MLP projections — TODO confirm against the model's modules
      ],
  )

  model = get_peft_model(model, peft_config=peft_config)
  model.config.pad_token_id = tokenizer.pad_token_id

  def tokenize_function(examples):
    """Turn a batch of (sentence, label) rows into input_ids / attention_mask / labels.

    The prompt and the answer are tokenized separately and concatenated;
    label positions covering the prompt are set to -100 so only the answer
    tokens contribute to the loss.
    """
    prompts = []
    answers = []
    for sentence, label in zip(examples["sentence"], examples["label"]):
         prompt = f"次の文をpositiveかnegativeで答えて。\n文: {sentence}\nラベル:"

         # Append EOS to the answer; a clear end marker reportedly helps accuracy (unverified).
         ans = ("positive" if int(label) == 1 else "negative") + tokenizer.eos_token

         prompts.append(prompt)
         answers.append(ans)

    # NOTE(review): truncation is applied to the whole prompt. If the tokenizer
    # truncates on the right (not set in this file — confirm), prompts longer
    # than 64 tokens would lose the trailing "ラベル:" cue.
    prompt_token = tokenizer(
        prompts,
        padding=False, # no padding here; the data collator pads per batch later
        max_length=64,
        truncation=True,
        add_special_tokens=False
    )

    answer_token = tokenizer(
        answers,
        add_special_tokens=False,
        padding=False,
    )

    input_ids, attention_mask, labels = [], [], []

    for prompt_ids, answer_ids in zip(prompt_token["input_ids"], answer_token["input_ids"]):
      ids = prompt_ids + answer_ids
      input_ids.append(ids)
      attention_mask.append([1] * len(ids))

      # ignore loss on prompt tokens
      labels.append([-100] * len(prompt_ids) + answer_ids)

    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}

  # batched=True hands the data to the function one batch at a time
  train_data = train_dataset.map(tokenize_function, batched=True, remove_columns=train_dataset.column_names) # drop the original columns, which are no longer needed
  dev_data = dev_dataset.map(tokenize_function, batched=True, remove_columns=dev_dataset.column_names)

  datacollator = CausalLMDataCollator(
      tokenizer=tokenizer,
      label_pad_token_id=-100,
      )

  training_args = TrainingArguments(
        output_dir="./results_98ioynb",
        num_train_epochs=1, # number of passes over the training data
        per_device_train_batch_size=16,
        per_device_eval_batch_size=2,
        learning_rate=2e-4, # 2 * 10^{-4}: 0.0002
        lr_scheduler_type="linear",
        warmup_ratio=0.1,
        eval_strategy="epoch", # when evaluation is run
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="accuracy",
        fp16=torch.cuda.is_available(),
        save_only_model=True,
        eval_accumulation_steps=2,  # move accumulated eval results to the CPU periodically; NOTE(review): the original comment said "every 10 steps" but the value is 2
  )

  trainer = Trainer(
      model=model,
      args=training_args,
      train_dataset=train_data, # NOTE(review): the original comment said the data was reduced to save time, but the full train_data is passed here
      eval_dataset=dev_data,
      data_collator=datacollator,
      compute_metrics=compute_accuracy,
      preprocess_logits_for_metrics=preprocess_logits_for_metrics
  )

  trainer.train()

  # Final evaluation on the dev set; prints the whole metrics dict.
  eval_results = trainer.evaluate()
  print(f"Accuracy (dev dataset): {eval_results}")

In [None]:
# Entry point: run the fine-tuning pipeline when this code is executed as the main module.
if __name__ == "__main__":
  main()