In [None]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead, create_reference_model
from datasets import Dataset # Hugging Face Datasets
import math
import re # 正規表現によるパース用

# --- 基本設定 ---
# Generatorモデル (LoRAでファインチューニングする対象)
generator_model_name = "cyberagent/open-calm-7b" # 例: 日本語モデル
# Predictorモデル (IFDスコア算出用、固定)
predictor_model_name = "stabilityai/japanese-stablelm-instruct-gamma-7b" # 例: 一問一答に強いとされるモデル

# PEFT (LoRA) 設定
lora_config = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type="CAUSAL_LM",
)

# QLoRAを使う場合 (4bit量子化)
use_qlora = True # Trueにすると4bit量子化を使用
if use_qlora:
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16, # または torch.float16
        bnb_4bit_use_double_quant=True,
    )
else:
    bnb_config = None

# PPO設定
ppo_config = PPOConfig(
    model_name=generator_model_name, # TRL内部での参照用
    learning_rate=1.41e-5,
    batch_size=16,          # 1ステップで処理する文脈の数
    mini_batch_size=4,      # PPOのミニバッチサイズ
    ppo_epochs=4,           # 1回のデータ収集でPPOの更新を行うエポック数
    log_with="wandb",       # wandbなどのロギングツールを指定可能 (任意)
    # gradient_accumulation_steps=1,
    # early_stopping=False,
    # target_kl=0.1,        # KLダイバージェンスの目標値
    # kl_penalty="kl",
    # seed=42,
    # init_kl_coef=0.2,
    # adap_kl_ctrl=True,
)

# その他設定
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
max_context_length = 512   # Generatorに入力する文脈の最大長
max_generation_length = 128 # Generatorが生成する「質問＋回答」の最大長
num_epochs = 10            # 強化学習の総エポック数
dataset_path = "path/to/your/1000_documents.jsonl" # データセットのパス (仮)

In [None]:
# --- Generatorモデルのロード ---
print(f"Loading generator tokenizer: {generator_model_name}")
generator_tokenizer = AutoTokenizer.from_pretrained(generator_model_name)
if generator_tokenizer.pad_token is None:
    generator_tokenizer.pad_token = generator_tokenizer.eos_token # PADトークンがない場合はEOSトークンで代用

print(f"Loading generator model: {generator_model_name}")
if use_qlora:
    generator_model = AutoModelForCausalLM.from_pretrained(
        generator_model_name,
        quantization_config=bnb_config,
        device_map={"": 0} # GPU 0 にロード (環境に合わせて調整)
    )
    generator_model = prepare_model_for_kbit_training(generator_model)
else:
    generator_model = AutoModelForCausalLM.from_pretrained(
        generator_model_name,
        torch_dtype=torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16, # 量子化しない場合の型
        device_map="auto"
    )

generator_model = get_peft_model(generator_model, lora_config)
# generator_model.print_trainable_parameters() # 学習可能なパラメータ数を表示

# TRLのPPOではAutoModelForCausalLMWithValueHeadが必要
# 既存のPEFTモデルからValueHead付きモデルを初期化
generator_model_with_value_head = AutoModelForCausalLMWithValueHead.from_pretrained(generator_model)

# 参照モデルの作成 (KLダイバージェンス計算用)
# LoRAを適用する前の重みを持つモデルが理想だが、ここでは簡単のため同じモデルを複製
# 実際には、学習開始時のgenerator_modelのコピーや、LoRA適用前のベースモデルを使う
# ref_model = create_reference_model(generator_model_with_value_head)
# または、学習初期の重みを保存しておき、それをロードする
# ここでは、簡単のため、初期化されたもう一つのモデルをrefとして使う（TRLが内部で処理してくれる場合もある）
# もしgenerator_model_with_value_headがPEFTモデルなら、それに対応した参照モデル作成が必要
# TRLのドキュメントに従い、ref_modelはNoneにしてPPOTrainerに渡すと内部で作成される場合がある
ref_model = None


# --- Predictorモデルのロード ---
print(f"Loading predictor tokenizer: {predictor_model_name}")
predictor_tokenizer = AutoTokenizer.from_pretrained(predictor_model_name)
if predictor_tokenizer.pad_token is None:
    predictor_tokenizer.pad_token = predictor_tokenizer.eos_token

print(f"Loading predictor model: {predictor_model_name}")
# Predictorは量子化なしでロードするか、別途設定
predictor_model = AutoModelForCausalLM.from_pretrained(
    predictor_model_name,
    torch_dtype=torch.bfloat16 if torch.cuda.is_bf16_supported() else torch.float16,
    device_map="auto" # 環境に合わせて調整
)
predictor_model.eval() # 評価モード

print(f"Generator model on: {generator_model_with_value_head.device}, Predictor model on: {predictor_model.device}")

In [None]:
def calculate_log_probability(model, tokenizer, text_sequence, context_sequence=None, device="cpu"):
    model.to(device)
    if context_sequence:
        # 文脈+質問 と 回答 の形式を想定
        # トークナイザーによってはeos_tokenの扱いに注意
        full_text = context_sequence + tokenizer.eos_token + text_sequence
        input_ids = tokenizer.encode(full_text, return_tensors="pt", truncation=True, max_length=tokenizer.model_max_length).to(device)
        
        context_plus_instruction_text = context_sequence # 論文ではQ (Instruction, [Input])
        context_plus_instruction_ids = tokenizer.encode(context_plus_instruction_text + tokenizer.eos_token, return_tensors="pt", truncation=True, max_length=tokenizer.model_max_length).to(device)
        context_plus_instruction_length = context_plus_instruction_ids.shape[1]
        
        labels = input_ids.clone()
        # 文脈+質問部分の損失は計算しない
        if input_ids.shape[1] > context_plus_instruction_length:
             labels[:, :context_plus_instruction_length] = -100
        else: # 回答が空か非常に短い場合など
             # この場合、有効なラベルがないので損失は0またはエラー。IFD計算には不適切。
             print(f"Warning: Answer part is empty or too short after tokenization for log_prob. Full text len: {input_ids.shape[1]}, Context+Instruction len: {context_plus_instruction_length}")
             return 1e9 # 非常に大きな損失（IFDスコアが高くなるように）
    else:
        # 回答Aのみ
        input_ids = tokenizer.encode(text_sequence, return_tensors="pt", truncation=True, max_length=tokenizer.model_max_length).to(device)
        labels = input_ids.clone()

    if input_ids.shape[1] == 0 or (context_sequence and input_ids.shape[1] <= context_plus_instruction_length): # 入力が空、またはラベルが全て-100になるケース
        print(f"Warning: No valid tokens to calculate loss. Input shape: {input_ids.shape}")
        return 1e9 # 非常に大きな損失

    with torch.no_grad():
        outputs = model(input_ids, labels=labels)
        log_prob_score = outputs.loss.item()

    if math.isnan(log_prob_score) or math.isinf(log_prob_score):
        print(f"Warning: Log probability score is NaN or Inf. Returning a large loss value.")
        return 1e9
    return log_prob_score

def calculate_normalized_ifd_reward(predictor_model, predictor_tokenizer, context_plus_question, answer, device="cpu"):
    """ 単一の (文脈+質問, 回答) ペアに対する正規化IFD報酬を計算 """
    if not context_plus_question or not answer:
        return torch.tensor(0.0, device=device)

    s_A_given_Q = calculate_log_probability(predictor_model, predictor_tokenizer, answer, context_sequence=context_plus_question, device=device)
    s_A = calculate_log_probability(predictor_model, predictor_tokenizer, answer, context_sequence=None, device=device)

    if s_A == 0 or abs(s_A) < 1e-9: # ゼロ除算または非常に小さい値による不安定化を避ける
        if s_A_given_Q > 1e-9 : # s_Aがほぼ0でs_A_given_Qが意味のある値ならIFDは発散
            ifd_score = float('inf')
        else: # 両方ほぼ0なら、IFD=1 (中立) または別の扱いに
            ifd_score = 1.0
    else:
        ifd_score = s_A_given_Q / s_A
    
    if math.isinf(ifd_score) or ifd_score > 1e9: # 大きすぎる値をクリップ
        normalized_score = 0.0
    elif ifd_score < 0: # 損失が負になる異常ケース
        normalized_score = 1.0 # IFDが負なら報酬最大 (要検討)
    else:
        normalized_score = 1.0 / (1.0 + ifd_score)
    
    return torch.tensor(normalized_score, device=device)

In [None]:
# --- データセットの準備 ---
# ここでは、テキストファイルの各行が1つのドキュメント(文脈)であると仮定
# 実際には、Hugging Face DatasetsライブラリでJSONLなどを読み込むのが良い
try:
    with open(dataset_path, "r", encoding="utf-8") as f:
        contexts = [line.strip() for line in f if line.strip()][:100] # テスト用に最初の100件
    # TRLのPPOTrainerはHugging Face Datasets形式のデータセットを期待することがある
    # context_dataset_dict = {"query": [generator_tokenizer.encode(f"以下の文脈から一問一答を作成してください。\n文脈: {ctx[:max_context_length]}\n質問:", return_tensors="pt").to(device).squeeze(0) for ctx in contexts]}
    # context_hf_dataset = Dataset.from_dict(context_dataset_dict)
    # query_tensors = [generator_tokenizer(f"以下の文脈から一問一答を作成してください。\n文脈: {ctx[:max_context_length]}\n質問:", return_tensors="pt", truncation=True, max_length=max_context_length).input_ids.squeeze(0).to(device) for ctx in contexts]

    # PPOTrainerの初期化時にデータセットは渡さない（動的に生成するため）
    # トークナイザーはgeneratorのものを使用
    ppo_trainer = PPOTrainer(
        config=ppo_config,
        model=generator_model_with_value_head,
        ref_model=ref_model, # create_reference_model(generator_model_with_value_head) または None
        tokenizer=generator_tokenizer,
        # dataset=context_hf_dataset, # データセットは後で与える
        # data_collator=None
    )
    print("PPOTrainer initialized.")
except Exception as e:
    print(f"Error preparing dataset or PPO trainer: {e}")
    # 適切なエラー処理

In [None]:
# --- 強化学習ループ ---
generation_kwargs = {
    "min_length": -1, # 生成を停止しない
    "top_k": 0.0,
    "top_p": 1.0,
    "do_sample": True,
    "pad_token_id": generator_tokenizer.eos_token_id, # pad_token_id を eos_token_id に設定
    "max_new_tokens": max_generation_length,
    # "eos_token_id": -1 # 明示的にEOSで止めない場合 (PPOの挙動に影響する可能性)
}

# データローダーの代わり (簡単のため)
def get_context_batch(contexts, batch_size, tokenizer, max_len, device):
    for i in range(0, len(contexts), batch_size):
        batch = contexts[i:i+batch_size]
        # Generatorへの入力プロンプトを作成
        # このプロンプトエンジニアリングが重要
        prompts = [f"以下の文脈から主要な情報に関する質問とその端的な回答を一つ作成してください。\n\n文脈:\n{ctx}\n\n質問:" for ctx in batch]
        
        # query_tensors: generatorへの入力 (プロンプト)
        # PPOTrainerの generate メソッドは tokenizedされたリストのリストを期待する
        query_tensors = [tokenizer.encode(prompt, return_tensors="pt", truncation=True, max_length=max_len).to(device).squeeze(0) for prompt in prompts]
        # query_texts = prompts # ログ用
        yield query_tensors, prompts # トークン化されたプロンプトと元のプロンプトテキストを返す

print("Starting PPO training...")
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    for step, (query_tensors_batch, prompt_texts_batch) in enumerate(get_context_batch(contexts, ppo_config.batch_size, generator_tokenizer, max_context_length, device)):
        if not query_tensors_batch:
            continue

        print(f"  Step {step+1}, Batch size: {len(query_tensors_batch)}")

        # 1. Generatorによる一問一答データの生成 (response_tensors には生成された「質問+回答」部分のみが入る)
        # response_tensors = ppo_trainer.generate(query_tensors_batch, **generation_kwargs)
        # generateはリストのリストを返す (各要素がTensor)
        response_tensors_list = []
        for query_tensor in query_tensors_batch:
            # 各クエリテンソルに対して個別に生成
            # TRLの generate メソッドは query (単一のTensor) を期待
            response_tensor = ppo_trainer.generate(query_tensor, **generation_kwargs)
            response_tensors_list.append(response_tensor.squeeze()) # バッチ次元を削除

        # 生成されたテキストをデコード
        # query_texts_batch: generatorへの入力プロンプトのリスト (デコード済み)
        # response_texts_batch: generatorが生成した「質問+回答」のテキストのリスト
        # input_texts_for_decode = [query_tensors_batch[i] for i in range(len(response_tensors_list))] # これだとプロンプト
        
        # response_tensors_list は生成された部分のみ
        response_texts_batch = [generator_tokenizer.decode(r_tensor, skip_special_tokens=True) for r_tensor in response_tensors_list]

        # 2. 生成テキストから「文脈+質問」と「回答」をパース
        #    報酬計算のために、generatorへの入力(文脈)と、生成された質問、生成された回答が必要
        #    predictorへの入力は「文脈＋生成された質問」
        parsed_for_predictor = []
        for i in range(len(prompt_texts_batch)):
            original_context_prompt = prompt_texts_batch[i] # "以下の文脈...質問:"
            generated_qa_text = response_texts_batch[i]     # Generatorが「質問:」の後に生成したテキスト

            # "質問:" の後の実際の文脈部分を抽出 (プロンプトの構造に依存)
            # ここはプロンプト設計と密接に関連
            context_match = re.search(r"文脈:\n(.*?)\n\n質問:", original_context_prompt, re.DOTALL)
            if not context_match:
                print(f"    Warning: Could not parse context from prompt: {original_context_prompt}")
                parsed_for_predictor.append(("", "", "")) # (文脈+生成質問, 生成回答, 元の文脈プロンプト)
                continue
            
            actual_context = context_match.group(1).strip()

            # generated_qa_text から「生成された質問」と「生成された回答」を分離する
            # 例: 生成テキストが "これは質問ですか？\n回答: はい、そうです。" のようになっていると仮定
            # このパース処理は非常に重要であり、実際の生成形式に合わせて頑健にする必要がある
            # ここでは単純な改行と "回答:" で分割を試みる
            parts = generated_qa_text.split("回答:", 1)
            if len(parts) == 2:
                generated_question = parts[0].strip()
                generated_answer = parts[1].strip()
            else: # パース失敗
                print(f"    Warning: Could not parse Q/A from: {generated_qa_text}")
                generated_question = generated_qa_text # 全体を質問とみなすか、空にするか
                generated_answer = ""

            if not generated_question or not generated_answer:
                print(f"    Warning: Parsed question or answer is empty. Q: '{generated_question}', A: '{generated_answer}'")
                # 報酬0にするために空でないようにダミーを入れるか、後でフィルタリング
                
            context_plus_generated_question = actual_context + "\n質問: " + generated_question # predictorへの入力
            parsed_for_predictor.append((context_plus_generated_question, generated_answer, original_context_prompt))

        # 3. IFDスコアを正規化した報酬の計算
        rewards_list = []
        valid_query_tensors = []
        valid_response_tensors = []

        for i, (ctx_q_for_pred, ans_for_pred, _) in enumerate(parsed_for_predictor):
            if ans_for_pred: # 回答がパースできた場合のみ
                reward = calculate_normalized_ifd_reward(predictor_model, predictor_tokenizer, ctx_q_for_pred, ans_for_pred, device)
                rewards_list.append(reward)
                valid_query_tensors.append(query_tensors_batch[i]) # 元のgeneratorへの入力プロンプト
                valid_response_tensors.append(response_tensors_list[i]) # generatorが生成したQ+A部分
            else:
                # 質の低い生成やパース失敗の場合は低い報酬を与えるか、このサンプルを学習から除外
                # ここでは学習から除外するアプローチ（validリストに追加しない）
                print(f"    Skipping sample due to empty parsed answer. Original generated text: {response_texts_batch[i]}")
        
        if not rewards_list: # 有効な報酬が得られなかった場合はスキップ
            print("    No valid rewards generated for this batch, skipping PPO step.")
            continue
            
        rewards_tensor = torch.stack(rewards_list)

        # 4. PPOトレーナーで学習ステップを実行
        # query_tensors は generator への入力プロンプト
        # response_tensors は generator が生成した部分 (質問+回答)
        # rewards は各 (query, response) ペアに対する報酬
        # TRLのPPOTrainerはリストのリストではなく、単一のリストを期待することが多い
        # また、各要素は1D Tensor
        
        # query_tensors_for_step = [qt.squeeze(0) for qt in valid_query_tensors] # (seq_len)
        # response_tensors_for_step = [rt.squeeze(0) for rt in valid_response_tensors] # (gen_len)

        # stats = ppo_trainer.step(query_tensors_for_step, response_tensors_for_step, rewards_tensor)
        stats = ppo_trainer.step(valid_query_tensors, valid_response_tensors, rewards_tensor)


        # ログ出力 (任意)
        log_output = {
            "epoch": epoch + 1,
            "step": step + 1,
            "mean_reward": rewards_tensor.mean().item() if rewards_tensor.numel() > 0 else 0,
            "ppo/loss/policy": stats.get("ppo/loss/policy"),
            "ppo/loss/value": stats.get("ppo/loss/value"),
        }
        print(f"    Log: {log_output}")
        if ppo_config.log_with == "wandb":
            ppo_trainer.log_stats(stats, {"query": prompt_texts_batch[:len(valid_query_tensors)]}, rewards_tensor, {"response": [generator_tokenizer.decode(r, skip_special_tokens=True) for r in valid_response_tensors]})


# 学習後のモデルの保存 (LoRAアダプタのみ保存)
# generator_model_with_value_head.save_pretrained("path/to/your/final_generator_ppo_model")
# generator_tokenizer.save_pretrained("path/to/your/final_generator_ppo_model")
# PEFTモデルの保存方法を確認 (通常はアダプタのみ)
ppo_trainer.model.save_pretrained("path/to/your/final_generator_ppo_lora_adapters")
generator_tokenizer.save_pretrained("path/to/your/final_generator_ppo_lora_adapters")


print("PPO training attempt finished.")