<a href="https://colab.research.google.com/github/SY-256/llms-from-scratch/blob/main/notebooks/ch06.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 分類のためのファインチューニング
- LLMのさまざまなファインチューニングアプローチ
- スパムメールを識別するために事前学習済みLLMをファインチューニングする

## 6.1 ファインチューニングのさまざまなカテゴリ
- インストラクションチューニング: 特定の指示を使用した一連のタスクを言語モデルに訓練することで、自然言語のプロンプトで表示されたタスクを理解して実行する能力を向上させる
- 分類チューニング: 特定のクラスラベルを認識する能力を向上させる

インストラクションチューニングを行ったモデルは、幅広いタスクに対応できる

分類チューニングを行ったモデルは、訓練中に遭遇したクラスの予測に限定される（専門性が高い）

## 6.2 データセットを準備する

In [None]:
# データセットのダウンロードと解凍
import urllib.request
import zipfile
import os
from pathlib import Path

url = "https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip"
zip_path = "sms_spam_collection.zip"
extracted_path = "sms_spam_collection"
data_file_path = Path(extracted_path) / "SMSSpamCollection.tsv"

def download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path):
    if data_file_path.exists():
        print(f"{data_file_path} already exists. Skipping download and extraction")
        return

    with urllib.request.urlopen(url) as response:
        # ファイルダウンロード
        with open(zip_path, "wb") as out_file:
            out_file.write(response.read())

    with zipfile.ZipFile(zip_path, "r") as zip_ref:
        # ファイル解凍
        zip_ref.extractall(extracted_path)

    original_file_path = Path(extracted_path) / "SMSSpamCollection"
    os.rename(original_file_path, data_file_path) # ファイル拡張子.tsvを追加
    print(f"File downloaded and saved as {data_file_path}")

download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path)

In [None]:
# データの読み込み
import pandas as pd

df = pd.read_csv(
    data_file_path, sep="\t", header=None, names=["Label", "Text"]
)

df

In [None]:
# クラスラベルの分布
print(df["Label"].value_counts())

In [None]:
# アンダーサンプリングして均衡なデータセットを作成
def create_balanced_dataset(df):
    # スパムの数に合わせてデータセットをアンダーサンプリング
    num_spam = df[df["Label"] == "spam"].shape[0]
    ham_subset = df[df["Label"] == "ham"].sample(
        num_spam, random_state=123
    )
    balanced_df = pd.concat(
        [ham_subset, df[df["Label"] == "spam"]]
    )

    return balanced_df

balanced_df = create_balanced_dataset(df)
print(balanced_df["Label"].value_counts())

In [None]:
# ラベルのマッピング
balanced_df["Label"] = balanced_df["Label"].map({"ham": 0, "spam": 1})

In [None]:
# データセットを訓練／検証／評価用に分割する
def random_split(df, train_frac, validation_frac):

    df = df.sample(
        frac=1, random_state=123
    ).reset_index(drop=True)
    train_end = int(len(df) * train_frac) # 分割インデックスを計算
    validation_end = train_end + int(len(df) * validation_frac)

    train_df = df[:train_end] # DataFrameを分割
    validation_df = df[train_end:validation_end]
    test_df = df[validation_end:]

    return train_df, validation_df, test_df

train_df, validation_df, test_df = random_split(balanced_df, 0.7, 0.1)

In [None]:
# CSVファイルで保存
train_df.to_csv("train.csv", index=None)
validation_df.to_csv("validation.csv", index=None)
test_df.to_csv("test.csv", index=None)

## 6.3 データローダーを作成する

テキストの長さがまちまちのテキストチャンク化戦略
- すべてのメッセージをデータセットまたはバッチ内で最も短いメッセージと同じ長さに切りそろえる

->計算量は少なくなるが、大量の情報が失われる場合がある

- すべてのメッセージをデータセットまたはバッチ内で最も長いメッセージと同じ長さにパディングする

->パディングトークンとして`"<|endoftext|>"`を使用する

In [None]:
# テキストのパディング
import tiktoken

tokenizer = tiktoken.get_encoding("gpt2")
print(tokenizer.encode("<|endoftext|>", allowed_special={"<|endoftext|>"}))

In [None]:
# PyTorch Datasetクラスをセットアップする
import torch
from torch.utils.data import Dataset

class SpamDataset(Dataset):
    def __init__(self, csv_file, tokenizer, max_length=None, pad_token_id=50256): # token_id=50256='<|endoftext|>'のパディングトークン
        self.data = pd.read_csv(csv_file)
        self.encoded_texts = [
            tokenizer.encode(text) for text in self.data["Text"]
        ]

        if max_length is None:
            self.max_length = self._longest_encoded_length()
        else:
            self.max_length = max_length

            self.encoded_texts = [
                encoded_text[:self.max_length] for encoded_text in self.encoded_texts
            ]

        self.encoded_texts = [
            encoded_text + [pad_token_id] *
            (self.max_length - len(encoded_text))
            for encoded_text in self.encoded_texts
        ]

    def __getitem__(self, index):
        encoded = self.encoded_texts[index]
        label = self.data.iloc[index]["Label"]
        return (torch.tensor(encoded, dtype=torch.long),
                torch.tensor(label, dtype=torch.long)
        )

    def __len__(self):
        return len(self.data)

    def _longest_encoded_length(self):
        max_length = 0
        for encoded_text in self.encoded_texts:
            encoded_length = len(encoded_text)
            if encoded_length > max_length:
                max_length = encoded_length

        return max_length


In [None]:
# パディングを適用する
train_dataset = SpamDataset(
    csv_file="train.csv",
    max_length=None,
    tokenizer=tokenizer
)

In [None]:
# 最も長いシーケンスのトークン数
print(train_dataset.max_length)

最大で1,024トークン（コンテキストの長さの上限値）のシーケンスで対処できる

1,024よりも長いテキストがデータセットに含まれている場合、max_length=1024を指定すると1,024を超えないようにできる

In [None]:
# 検証データセット／テストデータセットの作成
# max_lengthをtrain_dataset.max_lengthに合わせる
# 検証データセットとテストデータセットに1,024トークンを超えるシーケンスが存在しない場合は、両方のデータセットでmax_length=Noneを設定できる

val_dataset = SpamDataset(
    csv_file="validation.csv",
    max_length=train_dataset.max_length,
    tokenizer=tokenizer
)

test_dataset = SpamDataset(
    csv_file="test.csv",
    max_length=train_dataset.max_length,
    tokenizer=tokenizer
)

In [None]:
# 練習問題 6-1 コンテキストの長さの上限を引き上げる（120 -> 1,024）
train_dataset_1024 = SpamDataset(
    csv_file="train.csv",
    max_length=1024,
    tokenizer=tokenizer
)

print(train_dataset_1024.max_length)

In [None]:
val_dataset_1024 = SpamDataset(
    csv_file="validation.csv",
    max_length=train_dataset.max_length,
    tokenizer=tokenizer
)

test_dataset_1024 = SpamDataset(
    csv_file="test.csv",
    max_length=train_dataset.max_length,
    tokenizer=tokenizer
)

In [None]:
# PyTorchデータローダーを作成する
from torch.utils.data import DataLoader

num_workers = 0 # ほとんどのコンピュータとの互換性を確保する設定
batch_size = 8
torch.manual_seed(123)

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers,
    drop_last=True
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    drop_last=False

)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    drop_last=False
)

In [None]:
# 最後のバッチのテンソル次元を出力して、期待されるバッチサイズを実際に返すことを確認
for input_batch, target_batch in train_loader:
    pass

print(f"Input batch dimensions: {input_batch.shape}")
print(f"Label batch dimensions: {target_batch.shape}")

In [None]:
# 各データセットのバッチの総数を出力
print(f"{len(train_loader)} training batches")
print(f"{len(val_loader)} validation batches")
print(f"{len(test_loader)} test batches")


## 6.4 事前学習済みモデルの重みでモデルを初期化する

In [None]:
# モデル設定
CHOOSE_MODEL = "gpt2-small (124M)"
INPUT_PROMPT = "Every effort moves"

BASE_CONFIG = {
    "vocab_size": 50257, # 語彙のサイズ
    "context_length": 1024, # コンテキストの長さ
    "drop_rate": 0.0, # ドロップアウト率
    "qkv_bias": True, # クエリ、キーバリューの計算にバイアスを使用するか
}

model_config = {
    "gpt2-small (124M)": {"emb_dim": 768, "n_layers": 12, "n_heads": 12},
    "gpt2-medium (355M)": {"emb_dim": 1024, "n_layers": 24, "n_heads": 16},
    "gpt2-large (774M)": {"ebm_dim": 1280, "n_layers": 36, "n_heads": 20},
    "gpt2-xl (1558M)": {"emb_dim": 1600, "n_layers": 48, "n_heads": 25}
}

BASE_CONFIG.update(model_config[CHOOSE_MODEL])

assert train_dataset.max_length <= BASE_CONFIG["context_length"], (
    f"Dataset length {train_dataset.max_length} exceeds model's context"
    f"length {BASE_CONFIG['context_length']}. Reinitialize data sets with"
    f"`max_length={BASE_CONFIG['context_length']}`"
)

In [None]:
! git clone https://github.com/rasbt/LLMs-from-scratch.git

In [None]:
import sys
sys.path.append('/content/LLMs-from-scratch/ch06/01_main-chapter-code')

In [None]:
# 事前学習済みGPTモデルを読み込む
from gpt_download import download_and_load_gpt2
from previous_chapters import GPTModel, load_weights_into_gpt

model_size = CHOOSE_MODEL.split(" ")[-1].lstrip("(").rstrip(")")
settings, params = download_and_load_gpt2(
    model_size=model_size, models_dir="gpt2"
)

model = GPTModel(BASE_CONFIG)
load_weights_into_gpt(model, params) # 各種重みパラメータ設定
model.eval();

In [None]:
# テキスト生成ユーティリティ関数を再利用
from previous_chapters import (
    generate_text_simple,
    text_to_token_ids,
    token_ids_to_text
)

text_1 = "Every effort moves you"

token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(text_1, tokenizer),
    max_new_tokens=15,
    context_size=BASE_CONFIG["context_length"]
)
print(token_ids_to_text(token_ids, tokenizer))

In [None]:
# ファインチューイング前にモデルに指示を与えてスパムを分類できるか
text_2 = (
    "Is the following text 'spam'? Answer with 'yes' or 'no':"
    " 'You are a winner you have been specially"
    " selected to receive $1000 cach or a $2000 award.'"
)

token_ids = generate_text_simple(
    model=model,
    idx=text_to_token_ids(text_2, tokenizer),
    max_new_tokens=23,
    context_size=BASE_CONFIG["context_length"]
)
print(token_ids_to_text(token_ids, tokenizer))

出力の結果から、モデルが指示に従うのに苦戦していることがわかる

モデルは事前学習は行っているが、スパム分類を行う用にインストラクションチューニングは行っていないため、この結果になっている

## 6.5 分類ヘッドを追加する
- 隠れ層の表現を50,257語の語彙にマッピングする元の出力層を、2つのクラス（`spam`:1 or `not spam`:0）にマッピングする小さな出力層に変える
- 出力層を書き換えること以外は、以前のモデルと同じ

## 選択した層のファインチューニング vs. すべての層のファインチューニング
事前学習済みモデルを使用する場合、一般的にモデルのすべての層を新たにファインチューニングする必要はない。ニューラルネットワークの言語モデルでは、低い方（入力に近い方）の層は一般的に幅広いタスクやデータセットに適用できる基本的な言語構造やセマンティックを捉える。
したがって、多くの場合では、言語上の微妙なパターンやタスク固有の特徴量だけに特化した最後の層（出力に近い層）だけをファインチューニングすれば十分である

小数の層だけをファインチューニングすることで計算効率も良くなる


In [None]:
# モデルを分類チューニングするために、モデルを凍結(freeze)する
# モデルを凍結させるとすべての層が訓練不能になる
for param in model.parameters():
    param.requires_grad = False

In [None]:
# 出力層(model.out_head)を置き換える
# 分類層を追加する
torch.manual_seed(123)

num_classes = 2
model.out_head = torch.nn.Linear(
    in_features=BASE_CONFIG["emb_dim"],
    out_features=num_classes # 今回の分類タスクのクラス数で設定
)

出力層model.out_headのrequires_grad属性はデフォルトでTrueに設定

この層はモデルにおいて訓練中に更新される唯一の層になる

追加の層もファインチューニングするとモデルの予測精度が向上することがわかっている
->最後の正規化層(LayerNorm)、最後のTransformerブロックを訓練したほうが、出力層だけを訓練するより性能が向上する

In [None]:
# 最後のLayerNormと最後のTransformerブロックを訓練可能に設定する -> requires_grad=Trueに設定
for param in model.trf_blocks[-1].parameters():
    param.requires_grad = True # 最後のTransformerブロックを訓練可能に

for param in model.final_norm.parameters():
    param.requires_grad = True # 最後のLayerNormを訓練可能に

In [None]:
# 特定層の訓練可否設定の変更後もこれまでと同じように使用できる
inputs = tokenizer.encode("Do you have time")
inputs = torch.tensor(inputs).unsqueeze(0)
print(f"Inputs: {inputs}")
print(f"Inputs dimensions: {inputs.shape}")

In [None]:
# エンコードされたトークンIDも同じように渡せる
with torch.no_grad():
    outputs = model(inputs)

print(f"Outputs:\n {outputs}")
print(f"Outputs dimensions: {outputs.shape}")

モデルの出力層を置き換えているので出力テンソル形状が50257 -> 2になっている

In [None]:
# 2値分類の場合、すべての出力行をファインチューニングする必要はない
# たった1つ（最後の出力トークン）に焦点を合わせることができる
# 最後のトークンは、他のすべてのトークンに対するAttentionスコアを持つ唯一のトークンであるため
# (バッチサイズ, 入力数, ラベル数)
print(f"Last output token: {outputs[:, -1, :]}")

## 6.6 分類の損失と正解率を算出する
- ファインチューニングで使用する評価関数の実装

In [None]:
# 最後の出力トークンを確率に変換する方法
print(f"Last output token: {outputs[:, -1, :]}")

# クラスラベルの取得（softmax関数でlogit -> argmaxでラベル取得）
probas = torch.softmax(outputs[:, -1, :], dim=-1)
label = torch.argmax(probas)
print(f"Class label: {label.item()}")

In [None]:
# 値が大きくなればsoftmax関数の出力も大きくなるので、必ずしもsoftmax関数使わなくても良い
logits = outputs[:, -1, :]
label = torch.argmax(logits)
print(f"Class label: {label.item()}")

In [None]:
# 分類正解率を計算する
def calc_accuracy_loader(data_loader, model, device, num_batches=None):
    model.eval()
    correct_predictions, num_examples = 0.0, 0

    if num_batches is None:
        num_bacthes = len(data_loader)
    else:
        num_batches = min(num_batches, len(data_loader))
    for i, (input_batch, target_batch) in enumerate(data_loader):
        if i < num_batches:
            input_batch = input_batch.to(device)
            target_batch = target_batch.to(device)

            with torch.no_grad():
                logits = model(input_batch)[:, -1, :] # 最後の出力トークンのロジットを計算

            predicted_labels = torch.argmax(logits, dim=-1)

            num_examples += predicted_labels.shape[0]
            correct_predictions += (
                (predicted_labels == target_batch).sum().item()
            )
        else:
            break
        return correct_predictions / num_examples

In [None]:
# 分類正解率を計算する関数を使ってみる
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

torch.manual_seed(123)
train_accuracy = calc_accuracy_loader(
    train_loader, model, device, num_batches=10
)
val_accuracy = calc_accuracy_loader(
    val_loader, model, device, num_batches=10
)
test_accuracy = calc_accuracy_loader(
    test_loader, model, device, num_batches=10
)

print(f"Training accuracy: {train_accuracy*100:.2f}%")
print(f"Validation accuracy: {val_accuracy*100:.2f}%")
print(f"Test accuracy: {test_accuracy*100:.2f}%")

In [None]:
# 損失関数の定義
# 出力層の結果に対する交差エントロピー誤差を使用
def calc_loss_batch(input_batch, target_batch, model, device):
    input_batch = input_batch.to(device)
    target_batch = target_batch.to(device)
    logits = model(input_batch)[:, -1, :] # 最後の出力トークンのロジット
    loss = torch.nn.functional.cross_entropy(logits, target_batch)
    return loss

In [None]:
# データローダから得られるすべてのについて損失を計算する関数
# 分類損失を計算する
def calc_loss_loader(data_loader, model, device, num_batches=None):
    total_loss = 0.
    if len(data_loader) == 0:
        return float("nan")
    elif num_batches is None:
        num_batches = len(data_loader)
    else:
        # バッチ数がデータローダーのバッチ数を超えないように調節
        num_batches = min(num_batches, len(data_loader))
    for i, (input_batch, target_batch) in enumerate(data_loader):
        if i < num_batches:
            loss = calc_loss_batch(
                input_batch, target_batch, model, device
            )
            total_loss += loss.item()
        else:
            break

    return total_loss / num_batches

In [None]:
# 損失計算
with torch.no_grad():
    train_loss = calc_loss_loader(
        train_loader, model, device, num_batches=5
    )
    val_loss = calc_loss_loader(
        val_loader, model, device, num_batches=5
    )
    test_loss = calc_loss_loader(
        test_loader, model, device, num_batches=5
    )

print(f"Training loss: {train_loss:.3f}")
print(f"Validation loss: {val_loss:.3f}")
print(f"Test loss: {test_loss:.3f}")

## 6.7 教師ありデータでのモデルファインチューニング
- 訓練関数を定義して損失が小さくなるように訓練させる
- モデル評価時はテキスト生成ではなく、分類正解率を計算する

In [None]:
# スパムを分類するためのモデルのファインチューニング
def train_classifier_simple(model, train_loader, val_loader, optimizer, device, num_epochs, eval_freq, eval_iter):
    # 損失と既視のサンプルを追跡するためにリストを初期化
    train_losses, val_losses, train_accs, val_accs = [], [], [], []
    example_seen, global_step = 0, -1

    # メインの訓練ループ
    for epoch in range(num_epochs):
        model.train()

        for input_batch, target_batch in train_loader:
            optimizer.zero_grad() # 勾配リセット
            loss = calc_loss_batch(
                input_batch, target_batch, model, device
            )
            loss.backward() # 誤差逆伝播（損失の勾配計算）
            optimizer.step() # 損失の勾配を使ってモデルの重みを更新
            example_seen += input_batch.shape[0] # トークンではなくサンプルを追跡
            global_step += 1

            if global_step % eval_freq == 0:
                # オプションの評価ステップ
                train_loss, val_loss = evaluate_model(
                    model, train_loader, val_loader, device, eval_iter
                )
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                print(f"Ep {epoch+1} (Step {global_step:06d}): "
                f"Train loss {train_loss:.3f}, "
                f"Val loss {val_loss:.3f}")

        train_accuracy = calc_accuracy_loader(
            # 各エポックの後に正解率を計算
            train_loader, model, device, num_batches=eval_iter
        )
        val_accuracy = calc_accuracy_loader(
            val_loader, model, device, num_batches=eval_iter
        )
        print(f"Training accuracy: {train_accuracy*100:.2f}% | ", end="")
        print(f"Validation accuracy: {val_accuracy*100:.2f}%")
        train_accs.append(train_accuracy)
        val_accs.append(val_accuracy)

    return train_losses, val_losses, train_accs, val_accs, example_seen

def evaluate_model(model, train_loader, val_loader, device, eval_iter):
    model.eval()
    with torch.no_grad():
        train_loss = calc_loss_loader(
            train_loader, model, device, num_batches=eval_iter
        )
        val_loss = calc_loss_loader(
            val_loader, model, device, num_batches=eval_iter
        )

    model.train()
    return train_loss, val_loss

In [None]:
# 訓練を開始
import time

start_time = time.time()
torch.manual_seed(123)
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.1)
num_epochs = 5

train_losses, val_losses, train_accs, val_accs, example_seen = \
    train_classifier_simple(
        model, train_loader, val_loader, optimizer, device, num_epochs=num_epochs, eval_freq=50, eval_iter=5
    )

end_time = time.time()
execution_time_minutes = (end_time - start_time) / 60
print(f"Training completed in {execution_time_minutes:.2f} minutes.")

In [None]:
# 分類損失をプロット
import matplotlib.pyplot as plt

def plot_values(epochs_seen, example_seen, train_values, val_values, label="loss"):
    fig, ax1 = plt.subplots(nrows=1, ncols=1, figsize=(5, 3))

    # 各エポックに対する訓練と検証の損失をプロット
    ax1.plot(epochs_seen, train_values, label=f"Training {label}")
    ax1.plot(epochs_seen, val_values, linestyle="-.", label=f"Validation {label}")
    ax1.set_xlabel("Epochs")
    ax1.set_ylabel(label.capitalize())
    ax1.legend()

    ax2 = ax1.twiny()
    ax2.plot(example_seen, train_values, alpha=0)
    ax2.set_xlabel("Example seen")

    fig.tight_layout()
    plt.savefig(f"{label}-plot.pdf")
    plt.show()

epochs_tensor = torch.linspace(0, num_epochs, len(train_losses))
example_seen_tensor = torch.linspace(0, example_seen, len(train_losses))

plot_values(epochs_tensor, example_seen_tensor, train_losses, val_losses)

損失曲線のプロットから訓練のlossと検証のlossが問題なく推移しているため、過剰適合の兆候もほとんどなく、上手く学習が進んでいることがわかる

## エポック数の選択
- エポック数はデータセットとタスクの難易度によって決まる
- 通常は5くらいから始めると良い
- 最初の数エポックでモデルが過剰適合に陥っていることを損失プロットが示唆している場合は、エポック数を減らすことが考えられる
- さらに訓練すると検証データセットの損失が改善する場合はエポック数を増やす
- 検証データセットでの損失が0に近いことがベストなエポック数の選択基準になる

In [None]:
# 分類正解率のプロット
epochs_tensor = torch.linspace(0, num_epochs, len(train_accs))
example_seen_tensor = torch.linspace(0, example_seen, len(train_accs))

plot_values(
    epochs_tensor, example_seen_tensor, train_accs, val_accs, label="accuracy"
)

エポックを通じて実線（Train）と破線（Validation）が近接していることは、このモデルが訓練データセットに過剰適合していないことを示唆している

In [None]:
# データセット全体での性能指標を計算
