In [1]:
# NOTE(review): looks like a leftover kernel smoke test; presumably safe to delete — confirm nothing relies on it.
print("hello")

hello


In [None]:
# =================================================================
# LLM Curiosity Benchmark: 4つの報酬モデルの比較検証
# Objective: RepE, RND, Contrastive, Mahalanobis を一挙実装・比較する
# =================================================================

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer
from sklearn.covariance import EmpiricalCovariance
from collections import Counter

# ==========================================
# 0. 設定 & モデルロード
# ==========================================
MODEL_NAME = "Qwen/Qwen2.5-7B-Instruct"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LAYER_ID = 24  # index into `hidden_states`; a middle-to-late layer, chosen for its rich semantic representation

# Half precision is only a safe default on GPU: on CPU it is slow and some ops are
# not implemented for float16 in older PyTorch builds, so fall back to float32 there.
MODEL_DTYPE = torch.float16 if DEVICE == "cuda" else torch.float32

print(f"Loading Model: {MODEL_NAME} on {DEVICE}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
# NOTE(review): trust_remote_code=True executes Python shipped with the checkpoint.
# Presumably unnecessary for this architecture on recent transformers releases —
# confirm before removing.
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    torch_dtype=MODEL_DTYPE,
    device_map="auto",
    trust_remote_code=True
)
# Inference only: switch off dropout and other train-time behaviour.
model.eval()

# --- 共通ユーティリティ: 隠れ層の取得 ---
def get_hidden_state(text, layer_idx=LAYER_ID):
    """Return the last-token hidden state of `text` at one layer of the model.

    Args:
        text: String to encode with the module-level `tokenizer`.
        layer_idx: Index into the model's `hidden_states` tuple (defaults to LAYER_ID).

    Returns:
        A float32 tensor of shape [1, Dim]: one row because a single text is encoded.

    Raises:
        ValueError: If `text` tokenizes to zero tokens, in which case there is no
            last token to read.
    """
    inputs = tokenizer(text, return_tensors="pt")
    if inputs["input_ids"].shape[-1] == 0:
        raise ValueError("get_hidden_state() received text that tokenizes to zero tokens")

    # The model is placed by device_map="auto", so follow the device the model
    # reports instead of assuming it matches the DEVICE constant.
    inputs = inputs.to(model.device)
    with torch.no_grad():
        outputs = model(**inputs, output_hidden_states=True)

    # Last token of the requested layer, upcast so downstream maths runs in float32.
    return outputs.hidden_states[layer_idx][:, -1, :].float()

In [None]:
# # ==========================================
# # 1. キャリブレーションデータの準備
# # ==========================================
# # 「退屈な/普通の」テキストの分布を学習するために使用
# calibration_texts = [
#     "The quick brown fox jumps over the lazy dog.",
#     "Artificial intelligence is transforming the world.",
#     "Python is a programming language.",
#     "To be or not to be, that is the question.",
#     "This is a pen. That is a book.",
#     "The weather today is sunny with a chance of rain.",
#     "Deep learning requires a lot of data.",
#     "The history of science is fascinating.",
#     "Please maximize the objective function.",
#     "The output of the model should be diverse.",
#     "1, 2, 3, 4, 5, 6, 7, 8, 9, 10."
# ] * 5 # データ数を増やす

# print("Collecting Calibration Hidden States...")
# calib_hiddens = []
# for txt in tqdm(calibration_texts):
#     calib_hiddens.append(get_hidden_state(txt))
# calib_matrix = torch.cat(calib_hiddens, dim=0) # [N, Dim]
# print(f"Calibration Data Shape: {calib_matrix.shape}")

In [None]:
# ==========================================
# 1. キャリブレーションデータの準備 (Enhanced Version)
# ==========================================
import random

# --- 強化版データ生成関数 ---
def generate_robust_calibration_data(n_samples=500, seed=None):
    """Generate synthetic "ordinary" texts spanning several genres.

    Each sample is drawn from one of five categories (wiki, code, news, chat,
    fiction) and is built from several templates or snippets so that it has a
    reasonable length.

    Args:
        n_samples: Number of texts to generate. Values <= 0 yield an empty list.
        seed: Optional seed. When given, a private `random.Random(seed)` is used,
            so the result is reproducible and the global `random` state is left
            untouched. When None, the global `random` module is used.

    Returns:
        A list of `n_samples` strings.
    """
    # `random` (the module) and `random.Random` expose the same sampling methods.
    rng = random if seed is None else random.Random(seed)

    # 1. Wikipedia / Academic Style
    wiki_templates = [
        "The history of {topic} can be traced back to the early {century}th century, when scholars first began to analyze the phenomenon of {concept}. ",
        "In the field of {field}, {topic} plays a critical role in understanding the underlying mechanisms of {concept}. ",
        "Recent studies have shown that {topic} is significantly correlated with {concept}, although the causal relationship remains a subject of debate among experts in {field}. ",
        "{topic} is defined as a system of {concept} that interacts with various environmental factors to produce predictable outcomes. ",
        "During the {period}, the development of {topic} accelerated rapidly, leading to major breakthroughs in {field} and related disciplines. "
    ]
    topics = ["quantum mechanics", "ancient civilization", "photosynthesis", "macroeconomics", "machine learning", "renaissance art", "molecular biology", "urban planning"]
    fields = ["physics", "history", "biology", "economics", "computer science", "art history", "chemistry", "sociology"]
    concepts = ["energy distribution", "cultural exchange", "cellular respiration", "market equilibrium", "neural networks", "aesthetic theory", "atomic bonding", "social stratification"]

    # 2. Source Code / Technical
    code_snippets = [
        "def process_data(data):\n    # This function processes input data\n    if not data:\n        return None\n    results = []\n    for item in data:\n        results.append(transform(item))\n    return results\n",
        "import numpy as np\nimport pandas as pd\n\n# Initialize dataset\ndf = pd.read_csv('data.csv')\nprint(df.head())\n",
        "Error: Connection timeout. Please check your network settings and try again. Code: 503 Service Unavailable.\n",
        "class ModelConfig:\n    def __init__(self, hidden_size=768, num_layers=12):\n        self.hidden_size = hidden_size\n        self.num_layers = num_layers\n",
        "\n<head>\n  <title>Welcome to the Website</title>\n  <link rel=\"stylesheet\" href=\"style.css\">\n</head>\n"
    ]

    # 3. News / Journalism
    news_templates = [
        "BREAKING: Local authorities in {city} have announced a new initiative to combat {issue}, aiming to reduce incidents by 50% over the next five years. ",
        "The stock market saw a significant {movement} today as investors reacted to the latest report on {issue}. Analysts predict continued volatility. ",
        "In a press conference held today, the CEO of {company} unveiled their latest product, promising to revolutionize the way we think about {issue}. ",
        "Residents of {city} gathered in the town square to protest against the proposed changes to {issue}, citing concerns over long-term environmental impact. "
    ]
    cities = ["New York", "London", "Tokyo", "Berlin", "San Francisco", "Sydney"]
    issues = ["climate change", "inflation", "traffic congestion", "housing affordability", "digital privacy", "public health"]
    movements = ["surge", "decline", "fluctuation", "rally", "drop"]

    # 4. Common Chat / Assistant
    chat_phrases = [
        "I'm sorry, but I cannot fulfill that request. As an AI language model, I prioritize safety and helpfulness. ",
        "Here is a summary of the text you provided: It discusses the importance of renewable energy. ",
        "Sure! Here's a recipe for chocolate chip cookies. First, preheat your oven to 350 degrees Fahrenheit. ",
        "To solve this equation, we first need to isolate the variable x by subtracting 5 from both sides. ",
        "That's an interesting question. There are several factors to consider when choosing a laptop for programming. "
    ]

    # 5. Fiction / Narrative
    fiction_templates = [
        "The sun dipped below the horizon, casting long shadows across the {place}. {name} sighed and looked at the old {object} in his hand. ",
        "It was a dark and stormy night. The wind howled outside the {place}, rattling the windows of the small cottage where {name} sat alone. ",
        "\"I can't believe you did that,\" {name} whispered, staring at the broken {object} on the floor. The room fell silent. ",
        "As the spaceship approached the {place}, the crew prepared for landing. {name} checked the sensors one last time. "
    ]
    places = ["abandoned warehouse", "ancient forest", "bustling marketplace", "quiet library", "distant planet"]
    names = ["John", "Elara", "Detective Smith", "Captain Miller", "The old wizard"]
    objects = ["pocket watch", "amulet", "laser pistol", "faded photograph", "mysterious key"]

    def _fill(templates, low, high, make_fields):
        """Concatenate between `low` and `high` random templates, each filled with freshly drawn fields."""
        parts = []
        for _ in range(rng.randint(low, high)):
            tmpl = rng.choice(templates)
            # str.format ignores keyword arguments a template does not use.
            parts.append(tmpl.format(**make_fields()))
        return "".join(parts)

    def _wiki():
        """Build an academic-style paragraph."""
        return _fill(wiki_templates, 3, 5, lambda: {
            "topic": rng.choice(topics),
            "century": rng.randint(15, 20),
            "concept": rng.choice(concepts),
            "field": rng.choice(fields),
            "period": "Industrial Revolution",
        })

    def _code():
        """Join two or three distinct code/technical snippets."""
        return "\n".join(rng.sample(code_snippets, k=rng.randint(2, 3)))

    def _news():
        """Build a news-style paragraph."""
        return _fill(news_templates, 3, 4, lambda: {
            "city": rng.choice(cities),
            "issue": rng.choice(issues),
            "movement": rng.choice(movements),
            "company": "TechCorp",
        })

    def _chat():
        """Join three to five distinct assistant-style phrases."""
        return " ".join(rng.sample(chat_phrases, k=rng.randint(3, 5)))

    def _fiction():
        """Build a narrative paragraph."""
        return _fill(fiction_templates, 3, 5, lambda: {
            "place": rng.choice(places),
            "name": rng.choice(names),
            "object": rng.choice(objects),
        })

    builders = {
        "wiki": _wiki,
        "code": _code,
        "news": _news,
        "chat": _chat,
        "fiction": _fiction,
    }
    categories = list(builders)

    # --- Generation loop: pick a category uniformly at random for every sample ---
    return [builders[rng.choice(categories)]() for _ in range(n_samples)]

# --- Generate the calibration corpus and build the hidden-state matrix ---
# Seed the global RNG first: the corpus is sampled at random, and every reward
# model fitted on it would otherwise change on each Restart & Run All.
CALIBRATION_SEED = 42
random.seed(CALIBRATION_SEED)

# 1. Generate the calibration corpus
calibration_texts = generate_robust_calibration_data(n_samples=500)
print(f"Generated {len(calibration_texts)} robust calibration texts.")
print(f"Sample: {calibration_texts[0][:100]}...")

# 2. Collect one hidden state per text with get_hidden_state (defined earlier)
print("Collecting Calibration Hidden States...")
calib_hiddens = [get_hidden_state(txt) for txt in tqdm(calibration_texts)]

# 3. Concatenate the per-text rows into a single matrix [N, Dim]
calib_matrix = torch.cat(calib_hiddens, dim=0)
print(f"Calibration Data Shape: {calib_matrix.shape}")

In [None]:
# ==========================================
# Method 1: Representation Engineering (RepE)
# 「退屈な概念ベクトル」を引き算する
# ==========================================
class RepEReward:
    """Reward based on a "boring" direction in hidden-state space.

    The direction is the normalised mean of (boring - creative) hidden-state
    differences over a few hand-written example pairs. A hidden state pointing
    along that direction is penalised; one pointing the opposite way is rewarded.
    """

    def __init__(self):
        # Paired examples: the i-th boring sentence is contrasted with the i-th creative one.
        boring_examples = ["The dog walked down the street.", "I like apples.", "1 1 1 1 1"]
        creative_examples = ["The neon cyberpunk dragon flew over Tokyo.", "Eternity is a mere moment in the eyes of a black hole.", "Chaos theory explains the beauty of fractals."]

        state_pairs = [
            (get_hidden_state(b), get_hidden_state(c))
            for b, c in zip(boring_examples, creative_examples)
        ]
        self.boring_direction = self._direction_from_pairs(state_pairs)

    @staticmethod
    def _direction_from_pairs(state_pairs):
        """Return the unit-length mean of (boring - creative) over `state_pairs`.

        Args:
            state_pairs: Iterable of (boring_state, creative_state) tensors of equal shape.

        Returns:
            Tensor with the same shape as one input state, L2-normalised along the last axis.
        """
        diffs = [boring - creative for boring, creative in state_pairs]
        mean_diff = torch.mean(torch.stack(diffs), dim=0)
        # torch.Tensor has no .normalize() method; the functional form is the supported API.
        return F.normalize(mean_diff, dim=-1)

    def get_score(self, h):
        """Return the negated cosine similarity between `h` and the boring direction.

        Args:
            h: Hidden state holding a single vector (the result is read with `.item()`).

        Returns:
            Python float in [-1, 1]; higher means less aligned with "boring".
        """
        sim = F.cosine_similarity(h, self.boring_direction)
        return -sim.item()

In [None]:
# ==========================================
# Method 2: Random Network Distillation (RND)
# 「予測誤差（驚き）」を報酬にする
# ==========================================
class RNDReward(nn.Module):
    """Random Network Distillation reward: prediction error as a novelty signal.

    A frozen, randomly initialised `target` network maps hidden states to an
    embedding. A `predictor` of the same shape is trained to imitate it on
    calibration data, so inputs unlike the calibration data give a larger
    prediction error and therefore a larger reward.
    """

    def __init__(self, input_dim, hidden_dim=128, out_dim=64, device=None):
        """Build both networks.

        Args:
            input_dim: Dimensionality of the incoming hidden states.
            hidden_dim: Width of the single hidden layer in each network.
            out_dim: Dimensionality of the embedding both networks produce.
            device: Device for the networks; defaults to the module-level DEVICE.
        """
        super().__init__()
        self.device = DEVICE if device is None else device

        # Target: fixed random mapping that is never trained.
        self.target = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim)
        ).to(self.device)
        for param in self.target.parameters():
            param.requires_grad = False

        # Predictor: learns to reproduce the target on familiar inputs.
        self.predictor = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, out_dim)
        ).to(self.device)

        self.optimizer = torch.optim.Adam(self.predictor.parameters(), lr=1e-3)

    def train_on_calibration(self, calibration_data, epochs=100):
        """Fit the predictor to the target on `calibration_data` with full-batch Adam.

        Args:
            calibration_data: Tensor of shape [N, input_dim].
            epochs: Number of full-batch optimisation steps.
        """
        print("Training RND Predictor on Calibration Data...")
        X = calibration_data.to(self.device)
        # The target is frozen, so its outputs are computed once and reused.
        target_y = self.target(X).detach()

        for _ in range(epochs):
            pred_y = self.predictor(X)
            loss = F.mse_loss(pred_y, target_y)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

    def get_score(self, h):
        """Return the predictor's mean squared error on `h`, multiplied by 100.

        Args:
            h: Tensor whose last dimension is `input_dim`.

        Returns:
            Non-negative Python float; larger means the input is less familiar.
        """
        with torch.no_grad():
            # Mirror train_on_calibration: the caller's tensor may live on another device.
            h = h.to(self.device)
            t_out = self.target(h)
            p_out = self.predictor(h)
            error = F.mse_loss(p_out, t_out).item()
        return error * 100  # rescale the small MSE values for readability

In [None]:
# ==========================================
# Method 3: N-gram Contrastive (Simplified CD)
# 「ありきたりな単語並び」なら罰則
# ==========================================
class NgramContrastiveReward:
    """Reward texts whose token n-grams are rare in a calibration corpus.

    N-gram counts are collected from the calibration texts. A text's score is
    the mean negative log of the smoothed n-gram probabilities, so familiar
    token sequences score low and unfamiliar ones score high.
    """

    def __init__(self, calibration_texts, n=3, encode=None, smoothing=1e5):
        """Count the n-grams of the calibration corpus.

        Args:
            calibration_texts: Iterable of strings treated as "ordinary" text.
            n: N-gram order; must be at least 1.
            encode: Callable mapping a string to a token sequence. Defaults to
                the module-level `tokenizer.encode`.
            smoothing: Constant added to the total count in the probability
                denominator (each n-gram count is additionally incremented by 1).

        Raises:
            ValueError: If `n` is smaller than 1.
        """
        if n < 1:
            raise ValueError("n must be >= 1")
        self.n = n
        self.smoothing = smoothing
        self._encode = tokenizer.encode if encode is None else encode
        self.ngrams = Counter()
        self.total_count = 0

        for text in calibration_texts:
            grams = self._ngrams(self._encode(text))
            self.ngrams.update(grams)
            self.total_count += len(grams)

    def _ngrams(self, tokens):
        """Return every contiguous n-gram of `tokens` as a tuple (empty if too short)."""
        return [tuple(tokens[i:i + self.n]) for i in range(len(tokens) - self.n + 1)]

    def get_score(self, text):
        """Return the mean negative log-probability of the n-grams in `text`.

        Args:
            text: String to score.

        Returns:
            Python float; 0.0 when the text has fewer than `n` tokens.
        """
        grams = self._ngrams(self._encode(text))
        if not grams:
            return 0.0

        denominator = self.total_count + self.smoothing
        log_prob_sum = sum(
            np.log((self.ngrams.get(gram, 0) + 1) / denominator) for gram in grams
        )
        # Average over the n-grams actually scored. Dividing by the token count
        # would shrink the score of short texts, which have proportionally fewer
        # n-grams per token.
        return float(-log_prob_sum / len(grams))

In [None]:
# ==========================================
# Method 4: Mahalanobis Distance
# 「普通の分布」からの統計的距離
# ==========================================
class MahalanobisReward:
    """Reward equal to the Mahalanobis distance from the calibration distribution."""

    def __init__(self, calibration_matrix):
        """Fit the mean and covariance of the calibration hidden states.

        Args:
            calibration_matrix: Tensor of calibration hidden states, one row per sample.
        """
        # NOTE(review): with fewer rows than columns the empirical covariance is
        # rank-deficient — confirm the resulting distances are meaningful.
        samples = calibration_matrix.cpu().numpy().astype(np.float32)
        estimator = EmpiricalCovariance(assume_centered=False)
        self.cov_model = estimator.fit(samples)

    def get_score(self, h):
        """Return the Mahalanobis distance of the first row of `h`.

        Args:
            h: Tensor of hidden states; only the first row is scored.

        Returns:
            The distance as a NumPy scalar.
        """
        query = h.cpu().numpy().astype(np.float32)
        # The estimator reports squared distances, so take the square root.
        squared_distances = self.cov_model.mahalanobis(query)
        return np.sqrt(squared_distances[0])

In [None]:
# ==========================================
# 実行と初期化
# ==========================================
# 1. RepE
repe_model = RepEReward()

# 2. RND
# Both RND networks are randomly initialised, so seed PyTorch first; otherwise
# the RND scores differ on every Restart & Run All.
RND_SEED = 0
torch.manual_seed(RND_SEED)
rnd_model = RNDReward(input_dim=model.config.hidden_size)
rnd_model.train_on_calibration(calib_matrix)

# 3. N-gram
ngram_model = NgramContrastiveReward(calibration_texts)

# 4. Mahalanobis
mahal_model = MahalanobisReward(calib_matrix)

In [None]:
# ==========================================
# 5. 評価テスト
# ==========================================
test_cases = [
    ("Repetitive", "the the the the the the the the the the"),
    ("Simple", "This is a pen. The weather is nice."),
    ("Wikipedia", "The Roman Empire was one of the largest in history."),
    ("Creative", "The nebula whispered secrets of ancient stars to the void."),
    ("Gibberish", "dsjfkl jklj fs djsklf jklsdj fkldsj kljf"),  # noise
]

results = []

print("\n--- Running Evaluation ---")
for label, text in test_cases:
    h = get_hidden_state(text)

    # Score the same text with all four reward models.
    results.append({
        "Label": label,
        "RepE (Direction)": repe_model.get_score(h),
        "RND (Prediction Error)": rnd_model.get_score(h),
        "N-gram (Rareness)": ngram_model.get_score(text),
        "Mahalanobis (Distance)": mahal_model.get_score(h)
    })

df = pd.DataFrame(results)

# ==========================================
# 6. Visualisation (plot after normalising)
# ==========================================
# Min-max normalise every method so they share one axis.
numeric_cols = df.columns[1:]
df_norm = df.copy()
for col in numeric_cols:
    col_min = df[col].min()
    col_range = df[col].max() - col_min
    if col_range > 0:
        df_norm[col] = (df[col] - col_min) / col_range
    else:
        # A constant (or all-NaN) column would divide by zero; plot it as flat zeros.
        df_norm[col] = 0.0

# Reshape to long format for seaborn.
df_melt = df_norm.melt(id_vars="Label", var_name="Method", value_name="Normalized Score")

plt.figure(figsize=(12, 6))
sns.barplot(data=df_melt, x="Label", y="Normalized Score", hue="Method")
plt.title("Comparison of Curiosity Reward Models (Normalized)")
plt.ylabel("Normalized Reward (Higher is Better)")
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.show()

# Raw score table
print("\n=== Raw Scores ===")
print(df.round(4))

# Quick verdict per method
print("\n=== Best Method Recommendation ===")
scores_by_label = df.set_index("Label")
best_methods = []
for col in numeric_cols:
    score_creative = scores_by_label.loc["Creative", col]
    score_simple = scores_by_label.loc["Simple", col]
    score_repetitive = scores_by_label.loc["Repetitive", col]

    # Ideal ordering: Creative > Simple > Repetitive
    if score_creative > score_simple and score_simple > score_repetitive:
        print(f"✅ {col}: Perfect Order!")
        best_methods.append(col)
    elif score_creative > score_repetitive:
        print(f"⚠️ {col}: Good (Creative > Repetitive) but check Simple.")
    else:
        print(f"❌ {col}: Failed (Repetitive might be high).")

print(f"\nRecommended for PPO: {best_methods if best_methods else 'RND or Mahalanobis (Requires tuning)'}")