# 05. LoRAによるファインチューニング (学習と永続化)

このノートブックでは、LoRA（QLoRA）による軽量ファインチューニングを行い、その成果物を **Google Driveに実験単位で保存** します。保存したアダプタは **06. 統合演習** で再利用します。

## 事前準備
Google Colabのメニュー「ランタイム」→「ランタイムのタイプを変更」で **T4 GPU** を選択してください。


In [None]:
# 編集禁止セル
import os
import sys
import json
from datetime import datetime

import torch
from google.colab import drive

# 1. Google Driveのマウント
if not os.path.isdir('/content/drive'):
    drive.mount('/content/drive')

repo_path = '/content/llm_lab'
if os.path.exists(repo_path):
    !rm -rf {repo_path}
!git clone -b stable-base https://github.com/akio-kobayashi/llm_lab.git {repo_path}
os.chdir(repo_path)

!pip install -q -U transformers accelerate bitsandbytes sentence-transformers faiss-cpu peft datasets gradio
if 'src' not in sys.path:
    sys.path.append(os.path.abspath('src'))

from src.common import load_llm, generate_text
from src.lora import create_lora_model, train_lora
from peft import PeftModel

# 05/06で共有する保存先
DRIVE_BASE_DIR = '/content/drive/MyDrive/llm_lab_outputs'
DRIVE_RUNS_DIR = os.path.join(DRIVE_BASE_DIR, 'lora_runs')
SELECTED_ADAPTER_RECORD_PATH = os.path.join(DRIVE_BASE_DIR, 'selected_adapter_path.txt')
os.makedirs(DRIVE_RUNS_DIR, exist_ok=True)

print('セットアップが完了しました。')


## 1. ベースモデルのロードと学習前の確認

単一の質問だけでなく、複数の入力パターンで学習前の挙動を確認します。


In [None]:
# 編集禁止セル
base_model, tokenizer = load_llm(use_4bit=True)


def safe_generate_local(model, tokenizer, prompt, max_new_tokens=128, temperature=0.7, top_p=0.9, repetition_penalty=1.05, do_sample=True):
    """05ノート専用: pipelineを使わずに安全側でgenerateする。"""
    try:
        inputs = tokenizer(prompt, return_tensors='pt')
        model_device = next(model.parameters()).device
        inputs = {k: v.to(model_device) for k, v in inputs.items()}

        pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id

        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                do_sample=do_sample,
                pad_token_id=pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False,
            )
        return tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    except Exception as e:
        return f"Error: {e}"


def build_prompt(question: str, instruction: str = ''):
    body = question if not instruction else f"{question}\n{instruction}"
    return (
        '以下は、タスクを説明する指示です。要求を適切に満たす応答を書きなさい。\n\n'
        f'### 指示:\n{body}\n\n### 応答:\n'
    )


DEMO_CASES = [
    {
        'name': 'JSON形式: 主人公情報',
        'question': '『星屑のメモリー』の主人公について教えて。',
        'instruction': '回答は必ず以下のJSON形式で出力してください。\n{ "answer": "...", "confidence": "high|medium|low" }',
    },
    {
        'name': 'JSON形式: あらすじ要約',
        'question': '『古都の探偵録』のあらすじを2文で教えて。',
        'instruction': '回答は必ず以下のJSON形式で出力してください。\n{ "answer": "...", "confidence": "high|medium|low" }',
    },
    {
        'name': '要約制約: 2文以内',
        'question': '『最後の航海』の事件の概要を教えて。',
        'instruction': '回答は2文以内で、固有名詞を1つ以上含めてください。',
    },
    {
        'name': '通常QA: 作品ジャンル',
        'question': '『シャドウ・ハンター』はどのようなジャンルの作品ですか？',
        'instruction': '',
    },
    {
        'name': '知識外ケース: 安全応答',
        'question': '『銀河鉄道999』の2026年版アニメ映画の公式公開日は？',
        'instruction': '根拠がない場合は推測せず、「分からない」と明記してください。',
    },
]

# 学習の安定性を優先し、学習前生成はデフォルトでOFF
RUN_PRECHECK_BEFORE_TRAIN = False

if RUN_PRECHECK_BEFORE_TRAIN:
    print('--- 学習前の回答（複数ケース）---')
    for i, case in enumerate(DEMO_CASES, start=1):
        prompt = build_prompt(case['question'], case['instruction'])
        res = safe_generate_local(base_model, tokenizer, prompt, max_new_tokens=128)
        answer = res.split('### 応答:')[-1].strip()
        print(f"[{i}] {case['name']}")
        print(f"Q: {case['question']}")
        print(f"A: {answer}")
        print('-' * 40)
else:
    print('学習前生成はスキップします（RUN_PRECHECK_BEFORE_TRAIN=False）。')
    print('このまま学習セルへ進んでください。')


## 2. LoRA学習の実行 (成果をGoogle Driveへ保存)

`PROFILE_NAME` を切り替えて、計算負荷と学習効果のバランスを比較できます。

- `quick`: 最短で完走確認（推奨）
- `standard`: 演習向け標準設定
- `extended`: 時間に余裕がある場合

学習成果物は `llm_lab_outputs/lora_runs/<run_name>/final_adapter` に保存され、最新実験のパスは `llm_lab_outputs/selected_adapter_path.txt` に記録されます。


In [None]:
# 編集禁止セル
# 実験プロファイル（必要に応じて PROFILE_NAME を変更）
PROFILE_NAME = 'quick'  # quick | standard | extended

PROFILES = {
    'quick': {
        'max_steps': 10,
        'per_device_train_batch_size': 1,
        'gradient_accumulation_steps': 4,
        'max_seq_length': 256,
        'learning_rate': 5e-5,
    },
    'standard': {
        'max_steps': 30,
        'per_device_train_batch_size': 1,
        'gradient_accumulation_steps': 8,
        'max_seq_length': 512,
        'learning_rate': 5e-5,
    },
    'extended': {
        'max_steps': 60,
        'per_device_train_batch_size': 1,
        'gradient_accumulation_steps': 8,
        'max_seq_length': 512,
        'learning_rate': 5e-5,
    },
}

if PROFILE_NAME not in PROFILES:
    raise ValueError(f'PROFILE_NAME must be one of: {list(PROFILES.keys())}')

profile = PROFILES[PROFILE_NAME]
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
RUN_NAME = f"{PROFILE_NAME}_s{profile['max_steps']}_lr{profile['learning_rate']}_{timestamp}"
DRIVE_OUTPUT_DIR = os.path.join(DRIVE_RUNS_DIR, RUN_NAME)
os.makedirs(DRIVE_OUTPUT_DIR, exist_ok=True)

print(f'Run name: {RUN_NAME}')
print(f'Output dir: {DRIVE_OUTPUT_DIR}')

print('1. LoRAアダプタをモデルに追加中...')
lora_model = create_lora_model(base_model)

print('2. 学習を開始します...')
train_lora(
    model=lora_model,
    tokenizer=tokenizer,
    train_dataset_path='data/lora/lora_train_sample.jsonl',
    output_dir=DRIVE_OUTPUT_DIR,
    max_steps=profile['max_steps'],
    learning_rate=profile['learning_rate'],
    per_device_train_batch_size=profile['per_device_train_batch_size'],
    gradient_accumulation_steps=profile['gradient_accumulation_steps'],
    max_seq_length=profile['max_seq_length'],
)

FINAL_ADAPTER_PATH = os.path.join(DRIVE_OUTPUT_DIR, 'final_adapter')
RUN_CONFIG_PATH = os.path.join(DRIVE_OUTPUT_DIR, 'run_config.json')

run_config = {
    'run_name': RUN_NAME,
    'profile_name': PROFILE_NAME,
    'train_dataset_path': 'data/lora/lora_train_sample.jsonl',
    'final_adapter_path': FINAL_ADAPTER_PATH,
    'params': profile,
}

with open(RUN_CONFIG_PATH, 'w', encoding='utf-8') as f:
    json.dump(run_config, f, ensure_ascii=False, indent=2)

with open(SELECTED_ADAPTER_RECORD_PATH, 'w', encoding='utf-8') as f:
    f.write(FINAL_ADAPTER_PATH + '\n')

print(f'学習完了。アダプタ: {FINAL_ADAPTER_PATH}')
print(f'設定保存: {RUN_CONFIG_PATH}')
print(f'06向けアダプタ記録: {SELECTED_ADAPTER_RECORD_PATH}')


## 3. 保存されたアダプタのロードテストと簡易評価

Google Drive からアダプタをロードし、動作確認を行います。あわせて少数の評価質問で「JSON形式で返せているか」を簡易チェックします。


In [None]:
# 編集禁止セル
# アダプタの保存場所（通常は直前セルで生成）
DRIVE_ADAPTER_PATH = globals().get('FINAL_ADAPTER_PATH', None)

# 念のため selected_adapter_path.txt からも復元可能にする
if not DRIVE_ADAPTER_PATH and os.path.exists(SELECTED_ADAPTER_RECORD_PATH):
    with open(SELECTED_ADAPTER_RECORD_PATH, 'r', encoding='utf-8') as f:
        DRIVE_ADAPTER_PATH = f.read().strip()


def safe_generate_local(model, tokenizer, prompt, max_new_tokens=128, temperature=0.7, top_p=0.9, repetition_penalty=1.05, do_sample=True):
    """05最終セル専用: pipelineを使わずに安全側でgenerateする。"""
    try:
        inputs = tokenizer(prompt, return_tensors='pt')
        model_device = next(model.parameters()).device
        inputs = {k: v.to(model_device) for k, v in inputs.items()}

        pad_token_id = tokenizer.pad_token_id if tokenizer.pad_token_id is not None else tokenizer.eos_token_id

        with torch.no_grad():
            generated_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                temperature=temperature,
                top_p=top_p,
                repetition_penalty=repetition_penalty,
                do_sample=do_sample,
                pad_token_id=pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
                use_cache=False,
            )
        return tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    except Exception as e:
        return f"Error: {e}"


if DRIVE_ADAPTER_PATH and os.path.exists(DRIVE_ADAPTER_PATH):
    print(f'Google Driveからアダプタをロードしています: {DRIVE_ADAPTER_PATH}')

    # 学習安定性を優先: 既存の学習済みlora_modelをそのまま使う（再ロードは重い）
    USE_IN_MEMORY_LORA = True

    if USE_IN_MEMORY_LORA and 'lora_model' in globals() and lora_model is not None:
        test_model = lora_model
        eval_tokenizer = tokenizer
    else:
        # 永続化確認をしたい場合のみ再ロード
        eval_base_model, eval_tokenizer = load_llm(use_4bit=True)
        test_model = PeftModel.from_pretrained(eval_base_model, DRIVE_ADAPTER_PATH)

    test_model.eval()

    print('--- 学習後の回答（複数ケース）---')
    for i, case in enumerate(DEMO_CASES, start=1):
        prompt = build_prompt(case['question'], case['instruction'])
        res = safe_generate_local(test_model, eval_tokenizer, prompt, max_new_tokens=128)
        answer = res.split('### 応答:')[-1].strip()
        print(f"[{i}] {case['name']}")
        print(f"Q: {case['question']}")
        print(f"A: {answer}")
        print('-' * 40)

    # 簡易評価: JSON形式の出力率を少数サンプルで確認
    eval_path = 'data/lora/lora_eval_questions.json'
    if os.path.exists(eval_path):
        with open(eval_path, 'r', encoding='utf-8') as f:
            eval_data = json.load(f)

        max_eval_questions = 3
        questions = eval_data.get('questions', [])[:max_eval_questions]
        json_ok = 0

        print('\n--- 簡易評価 (JSON形式チェック) ---')
        for i, q in enumerate(questions, start=1):
            question = q.get('question', '')
            eval_instruction = '回答は必ず以下のJSON形式で出力してください。\\n{ "answer": "...", "confidence": "high|medium|low" }'
            eval_prompt = build_prompt(question, eval_instruction)

            out = safe_generate_local(test_model, eval_tokenizer, eval_prompt, max_new_tokens=128)
            answer_text = out.split('### 応答:')[-1].strip()

            is_json = False
            try:
                left = answer_text.find('{')
                right = answer_text.rfind('}')
                if left != -1 and right != -1 and right > left:
                    json.loads(answer_text[left:right + 1])
                    is_json = True
            except Exception:
                is_json = False

            json_ok += int(is_json)
            print(f'[{i}] JSON形式: {is_json}')
            print(f'質問: {question}')
            print(f'出力: {answer_text[:180]}')
            print('-' * 40)

        print(f'JSON形式遵守率: {json_ok}/{len(questions)}')
else:
    print('エラー: Google Drive にアダプタが見つかりません。学習が正常に完了したか確認してください。')


## まとめ

- LoRAアダプタは `llm_lab_outputs/lora_runs/<run_name>/final_adapter` に保存されます。
- 直近で使うアダプタは `llm_lab_outputs/selected_adapter_path.txt` に記録されます。
- 次の **06. 統合演習** では、この記録ファイルを使ってアダプタを再利用します。
