In [None]:
# Google Colab setup: this flag gates the Colab-only imports and Drive mount below.
google_colab = True

if google_colab:
    # Colab-only modules; `userdata` is used later to read the W&B API key.
    from google.colab import drive
    from google.colab import userdata

    # Mount Google Drive at /content/drive.
    drive.mount("/content/drive")

    # Change the working directory to this experiment's folder (IPython magic).
    %cd /content/drive/MyDrive/Python/kaggle_map/src/ettin-encoder-1b_exp004

In [None]:
import os
import gc
import time
import random

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import classification_report, confusion_matrix

import torch
import wandb
from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer

In [None]:
class CFG:
    """Experiment configuration: a plain namespace of class-level constants."""

    # ============== Experiment info =============
    comp_name = "kaggle_map"
    exp_name = "ettin-encoder-1b_exp004"
    model_name = "jhu-clsp/ettin-encoder-1b"

    # ============== File paths =============
    # Paths are relative to the current working directory.
    comp_dir_path = "../../kaggle/input/"
    comp_dataset_path = f"{comp_dir_path}/map-charting-student-math-misunderstandings/"
    output_dir_path = "output/"
    log_dir_path = "logs/"

    # ============== Model / training settings =============
    max_len = 256  # maximum token length per sample

    num_train_epochs = 10
    per_device_train_batch_size = 32
    gradient_accumulation_steps = 1
    per_device_eval_batch_size = 64
    optim_type = "adamw_torch"
    learning_rate = 5e-5
    lr_scheduler_type = "cosine"
    warmup_steps = 50
    weight_decay = 0.01

    # ============== Misc =============
    seed = 42
    # Fall back to CPU when no CUDA device is available so that code using
    # CFG.device does not fail on CPU-only machines.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Fix the random seeds
def set_seed(seed=None, cudnn_deterministic=True):
    """Seed the Python, NumPy and PyTorch RNGs and set the cuDNN flags.

    Args:
        seed: Seed value; ``None`` is treated as 42.
        cudnn_deterministic: Value assigned to
            ``torch.backends.cudnn.deterministic``.
    """
    seed = 42 if seed is None else seed

    # cuDNN flags: benchmarking is always switched off.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = cudnn_deterministic

    os.environ["PYTHONHASHSEED"] = str(seed)

    # Apply the same seed to every RNG used in this notebook.
    for seeder in (np.random.seed, random.seed, torch.manual_seed, torch.cuda.manual_seed):
        seeder(seed)

def make_dirs(cfg):
    """Create the output and log directories named on ``cfg`` if they are missing.

    Args:
        cfg: Object exposing ``output_dir_path`` and ``log_dir_path``.
    """
    targets = (cfg.output_dir_path, cfg.log_dir_path)
    for path in targets:
        os.makedirs(path, exist_ok=True)

def cfg_init(cfg):
    """Run the one-time experiment setup: seed the RNGs, then create directories.

    Args:
        cfg: Config object providing ``seed``, ``output_dir_path`` and
            ``log_dir_path``.
    """
    seed_value = cfg.seed
    set_seed(seed_value)
    make_dirs(cfg)

# LLMの学習

## データの読み込み

In [None]:
def add_folds_by_qid_cat_misc(df, n_splits=5, random_state=42, fallback="pair"):
    """Assign stratified CV fold ids from QuestionId / Category / Misconception.

    Rows are stratified on the "QuestionId|Category|Misconception" key. Rows
    whose key occurs fewer than ``n_splits`` times are stratified on a coarser
    key chosen by ``fallback``.

    Args:
        df: DataFrame with "QuestionId", "Category" and "Misconception" columns.
        n_splits: Number of folds.
        random_state: Seed passed to ``StratifiedKFold``.
        fallback: Key used for rare rows: "pair" (Category|Misconception),
            "category" (Category only) or "none" (keep the full key).

    Returns:
        A copy of ``df`` with an integer "fold" column (0 .. n_splits - 1).

    Raises:
        ValueError: If ``fallback`` is not one of the supported values. This is
            checked up front so a typo is reported even when no rare key exists.
    """
    if fallback not in ("pair", "category", "none"):
        raise ValueError("fallback は 'pair' / 'category' / 'none' のいずれかにしてください。")

    # NOTE(review): astype(str) runs before fillna, so missing values have
    # probably already become a string by the time fillna is applied. The order
    # is kept as-is so that existing fold assignments do not change.
    s_qid  = df["QuestionId"].astype(str).fillna("NA")
    s_cat  = df["Category"].astype(str).fillna("NA")
    s_misc = df["Misconception"].astype(str).fillna("NA")

    y_triple = s_qid + "|" + s_cat + "|" + s_misc
    y_pair   = s_cat + "|" + s_misc

    # Mark the rows whose full key is too rare to appear in every fold.
    cnt = y_triple.value_counts()
    rare = y_triple.map(cnt) < n_splits

    if fallback == "none" or not rare.any():
        y = y_triple
    elif fallback == "pair":
        y = np.where(rare, y_pair, y_triple)
    else:  # fallback == "category"
        y = np.where(rare, s_cat, y_triple)

    skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)

    # Only the validation indices of each split are needed to label the rows.
    folds = np.full(len(df), -1, dtype=int)
    for fold, (_, val_idx) in enumerate(skf.split(np.zeros(len(df)), y)):
        folds[val_idx] = fold

    out = df.copy()
    out["fold"] = folds
    return out

In [None]:
# Initialise the experiment before any work is done: cfg_init seeds the RNGs
# and creates the output/log directories. It was defined but never called.
cfg_init(CFG)

# Load the training data and assign CV folds.
train = pd.read_csv(f"{CFG.comp_dataset_path}/train.csv")
train = add_folds_by_qid_cat_misc(train, n_splits=5, random_state=CFG.seed, fallback="pair")

# Build the combined "Category:Misconception" target; missing misconceptions become "NA".
train.Misconception = train.Misconception.fillna("NA")
train["target"] = train.Category + ":" + train.Misconception

# Label-encode the target
le = LabelEncoder()
train["label"] = le.fit_transform(train["target"])

n_classes = len(le.classes_)
print(f"訓練データの形状: {train.shape} - {n_classes}個のターゲットクラス")
train.head(3)

## 特徴量エンジニアリング

In [None]:
# Build the per-question "correct answer" table.
# Keep rows whose Category prefix (the part before the first "_") is "True",
# then for each question keep the MC_Answer that occurs most often among them.
is_true_row = train["Category"].str.split("_").str[0] == "True"
correct = train.loc[is_true_row].copy()
correct["c"] = correct.groupby(["QuestionId", "MC_Answer"]).MC_Answer.transform("count")
correct = correct.sort_values("c", ascending=False)
correct = correct.drop_duplicates(["QuestionId"])
correct = correct[["QuestionId", "MC_Answer"]].assign(is_correct=1)

# Merge the flag into the training data. Any existing is_correct column is
# dropped first so that re-running this cell does not create _x/_y duplicates.
train = train.drop(columns=["is_correct"], errors="ignore").merge(
    correct, on=["QuestionId", "MC_Answer"], how="left"
)
train["is_correct"] = train["is_correct"].fillna(0)

# # Extract the choices per question and add them as a comma-separated string
# question_choices = train.groupby("QuestionId")["Misconception"].apply(lambda x: sorted(x.unique())).reset_index()
# question_choices.columns = ["QuestionId", "choices_list"]
# question_choices["choices"] = question_choices["choices_list"].apply(lambda x: ", ".join(x))
# train = train.merge(question_choices[["QuestionId", "choices"]], on="QuestionId", how="left")

## データの前処理

In [None]:
# Load the tokenizer that matches the pretrained model named in CFG.model_name.
tokenizer = AutoTokenizer.from_pretrained(CFG.model_name)

In [None]:
# Build the prompt text that is later passed to the tokenizer
def format_input(row):
    """Render one data row as the multi-line prompt given to the model.

    Args:
        row: Mapping-like record exposing ``QuestionText``, ``MC_Answer``,
            ``is_correct`` and ``StudentExplanation``.

    Returns:
        str: Prompt with Question / Answer / Correct / Student Explanation
        lines; "Correct" is "Yes" when ``is_correct`` is truthy, else "No".
    """
    verdict = "Yes" if row["is_correct"] else "No"
    lines = [
        f"Question: {row['QuestionText']}",
        # f"Choices: {row['choices']}",
        f"Answer: {row['MC_Answer']}",
        f"Correct: {verdict}",
        f"Student Explanation: {row['StudentExplanation']}",
    ]
    return "\n".join(lines)

# Render the prompt for every training row into a new "text" column.
train["text"] = train.apply(format_input, axis=1)
print("LLMへのプロンプト例:")
print()
# Print the first prompt as a sanity check.
print(train.text.values[0])

In [None]:
# Token count of every prompt, measured without truncation
lengths = []
for prompt in train["text"]:
    token_ids = tokenizer.encode(prompt, truncation=False)
    lengths.append(len(token_ids))

# Visualise the token-length distribution
plt.hist(lengths, bins=50)
plt.xlabel("token_length")
plt.ylabel("count")
plt.grid(True)
plt.show()

# Count the samples that exceed max_len
n_too_long = np.count_nonzero(np.array(lengths) > CFG.max_len)
print(f"{CFG.max_len}トークンを超える訓練サンプルが{n_too_long}個あります")
np.sort(lengths)

In [None]:
# Hold out fold 0 for validation and train on the remaining folds
is_val_row = train["fold"] == 0
train_df = train[~is_val_row]
val_df = train[is_val_row]

# Convert both splits to Hugging Face datasets, keeping only the model inputs
COLS = ["text", "label"]
train_ds, val_ds = (Dataset.from_pandas(frame[COLS]) for frame in (train_df, val_df))

In [None]:
# Tokenization function
def tokenize(batch):
    """Tokenize a batch of prompts, padding and truncating to ``CFG.max_len``.

    Args:
        batch: Mapping with a "text" entry holding the prompts to encode.

    Returns:
        The tokenizer output for the batch.
    """
    # Use CFG.max_len rather than a literal 256 so the padding length cannot
    # drift from the value used in the token-length check.
    return tokenizer(batch["text"], padding="max_length", truncation=True, max_length=CFG.max_len)

# Tokenize both splits in batches.
train_ds = train_ds.map(tokenize, batched=True)
val_ds = val_ds.map(tokenize, batched=True)

# Expose only the model inputs and the label, as PyTorch tensors.
columns = ["input_ids", "attention_mask", "label"]
train_ds.set_format(type="torch", columns=columns)
val_ds.set_format(type="torch", columns=columns)

## 学習設定

In [None]:
# Log in to Weights & Biases.
# `userdata` is only imported when running on Colab, so outside Colab the key
# is read from the WANDB_API_KEY environment variable instead. If it is unset,
# None is passed and wandb falls back to its own credential lookup.
if google_colab:
    wandb_api_key = userdata.get("WANDB_API_KEY")
else:
    wandb_api_key = os.environ.get("WANDB_API_KEY")

wandb.login(key=wandb_api_key)
wandb.init(project=CFG.comp_name, name=CFG.exp_name)

In [None]:
# Load the pretrained model with a sequence-classification head that has one
# output per target class (n_classes comes from the label encoder).
model = AutoModelForSequenceClassification.from_pretrained(
    CFG.model_name,
    num_labels=n_classes,
    # NOTE(review): presumably disables the model's torch.compile reference path — confirm.
    reference_compile=False,
)
# Disabled experiment: replace the classification head with a small MLP.
# NOTE(review): the last Linear outputs 2 logits, which does not match n_classes.
# hdim = model.config.hidden_size
# model.score = torch.nn.Sequential(
#     torch.nn.Dropout(0.1),
#     torch.nn.Linear(hdim, hdim // 2),
#     torch.nn.Dropout(0.1),
#     torch.nn.GELU(),
#     torch.nn.Linear(hdim // 2, 2),
# ).to(CFG.device)

In [None]:
# Training configuration: hyper-parameters come from CFG; evaluation, logging
# and checkpoint intervals are set in steps below.
training_args = TrainingArguments(
    output_dir=CFG.output_dir_path,
    do_train=True,
    do_eval=True,
    eval_strategy="steps",
    save_strategy="steps",
    num_train_epochs=CFG.num_train_epochs,
    per_device_train_batch_size=CFG.per_device_train_batch_size,
    gradient_accumulation_steps=CFG.gradient_accumulation_steps,
    per_device_eval_batch_size=CFG.per_device_eval_batch_size,
    learning_rate=CFG.learning_rate,
    optim=CFG.optim_type,
    lr_scheduler_type=CFG.lr_scheduler_type,
    warmup_steps=CFG.warmup_steps,
    weight_decay=CFG.weight_decay,
    logging_dir=CFG.log_dir_path,
    logging_steps=50,
    # save_steps is a multiple of eval_steps, so every checkpoint coincides with an evaluation.
    save_steps=600,
    eval_steps=300,
    save_total_limit=1,
    # Checkpoints are compared on the "map@3" value returned by compute_map3.
    metric_for_best_model="map@3",
    greater_is_better=True,
    load_best_model_at_end=True,
    report_to="wandb",
    bf16=True,
    fp16=False,  # Kaggle runs on a T4, so inference there uses FP16
)

In [None]:
# Custom metric
def compute_map3(eval_pred):
    """Compute MAP@3 (mean reciprocal rank of the true label within the top 3).

    Args:
        eval_pred: ``(logits, labels)`` pair. ``logits`` is indexed as
            [sample, class]; ``labels`` holds one class id per sample.

    Returns:
        dict: ``{"map@3": score}``. A sample scores 1, 1/2 or 1/3 when its true
        label is ranked 1st, 2nd or 3rd, and 0 otherwise. An empty evaluation
        set scores 0.0 instead of raising ZeroDivisionError.
    """
    logits, labels = eval_pred
    labels = np.asarray(labels)
    n_samples = len(labels)
    if n_samples == 0:
        return {"map@3": 0.0}

    probs = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()

    # Class ids of the (up to) three most probable classes per sample.
    top3 = np.argsort(-probs, axis=1)[:, :3]
    hits = top3 == labels[:, None]

    # Each row has at most one hit because the ranked class ids are distinct,
    # so weighting the hit mask by 1/rank gives the per-sample score directly.
    rank_weights = 1.0 / np.arange(1, hits.shape[1] + 1)
    total = float((hits * rank_weights).sum())
    return {"map@3": total / n_samples}

## モデルの学習

In [None]:
# Set up the Trainer with the model, datasets, tokenizer and the MAP@3 metric.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_ds,
    eval_dataset=val_ds,
    processing_class=tokenizer,
    compute_metrics=compute_map3,
)
# Disabled experiment: replace the classification head with a small MLP.
# NOTE(review): the last Linear outputs 2 logits, which does not match n_classes.
# hdim = model.config.hidden_size
# model.score = torch.nn.Sequential(
#     torch.nn.Dropout(0.1),
#     torch.nn.Linear(hdim, hdim // 2),
#     torch.nn.Dropout(0.1),
#     torch.nn.GELU(),
#     torch.nn.Linear(hdim // 2, 2),
# ).bfloat16().to(CFG.device)

# Train the model
trainer.train()

## モデルの保存

In [None]:
# Save the trained model and the tokenizer to separate sub-directories of the
# output directory; the analysis section reloads them from these paths.
trainer.save_model(f"{CFG.output_dir_path}/model")
tokenizer.save_pretrained(f"{CFG.output_dir_path}/tokenizer")

In [None]:
# End the W&B session.
# Finish the active run directly instead of calling wandb.init() again and
# closing it through a context manager.
wandb.finish()

In [None]:
# Free memory: drop the training objects, run the garbage collector and release
# cached CUDA memory before the saved model is reloaded for analysis.
del model, tokenizer, trainer
gc.collect()
torch.cuda.empty_cache()

# 結果分析

In [None]:
# Load the trained model and tokenizer saved earlier
model_path = f"{CFG.output_dir_path}/model"
tokenizer_path = f"{CFG.output_dir_path}/tokenizer"

inference_model = AutoModelForSequenceClassification.from_pretrained(
    model_path,
    num_labels=n_classes,
    reference_compile=False,
)
# Disabled experiment: replace the classification head with a small MLP.
# hdim = inference_model.config.hidden_size
# inference_model.score = torch.nn.Sequential(
#     torch.nn.Dropout(0.1),
#     torch.nn.Linear(hdim, hdim // 2),
#     torch.nn.Dropout(0.1),
#     torch.nn.GELU(),
#     torch.nn.Linear(hdim // 2, 2),
# ).bfloat16().to(CFG.device)
inference_tokenizer = AutoTokenizer.from_pretrained(tokenizer_path)

# Build a Trainer used for inference only.
# Explicit arguments are passed so that prediction uses the configured eval
# batch size and does not attach experiment reporters (the W&B run has already
# been finished at this point).
inference_args = TrainingArguments(
    output_dir=CFG.output_dir_path,
    per_device_eval_batch_size=CFG.per_device_eval_batch_size,
    report_to="none",
)
inference_trainer = Trainer(
    model=inference_model,
    args=inference_args,
    processing_class=inference_tokenizer,
)

# Run inference on the validation data
inference_model.eval()
start_time = time.time()

val_predictions = inference_trainer.predict(val_ds)
logits = val_predictions.predictions
val_labels = val_predictions.label_ids

# Report the inference time
inference_time = time.time() - start_time
print(f"推論時間: {inference_time:.2f}秒")
print(f"サンプル数: {len(val_labels)}個")
print(f"1サンプルあたりの推論時間: {inference_time/len(val_labels)*1000:.2f}ms")

# Convert logits to probabilities
val_probs = torch.nn.functional.softmax(torch.tensor(logits), dim=-1).numpy()

# Top-1 accuracy
val_pred_labels = np.argmax(logits, axis=1)
accuracy = (val_pred_labels == val_labels).mean()
print(f"\nTop-1精度: {accuracy:.4f}")

# MAP@3 score
map3_score = compute_map3((logits, val_labels))["map@3"]
print(f"MAP@3スコア: {map3_score:.4f}")

## 項目別の結果分析

In [None]:
# Top-3 predictions: class ids of the three most probable classes per sample
# (most probable first) and the probabilities at those positions.
top3_indices = np.argsort(-val_probs, axis=1)[:, :3]
top3_probs = np.take_along_axis(val_probs, top3_indices, axis=1)

# Top-1 confidence: the highest class probability of each sample.
max_probs = val_probs.max(axis=1)

In [None]:
# Attach the prediction results to a copy of the validation frame.
# The prediction arrays are assumed to be in the same row order as val_df,
# because val_ds was built from val_df.
val_df_analysis = val_df.copy()
val_df_analysis["pred_label"] = val_pred_labels
val_df_analysis["actual_label"] = val_labels
val_df_analysis["confidence"] = max_probs
# NOTE: this overwrites the answer-correctness feature column of the same name;
# from here on "is_correct" means "the top-1 prediction matches the label".
val_df_analysis["is_correct"] = (val_pred_labels == val_labels)

# Add the top-3 predictions. Each rank column is decoded in a single
# inverse_transform call instead of one call per element.
for i in range(3):
    rank_labels = top3_indices[:, i]
    val_df_analysis[f"top{i+1}_label"] = rank_labels
    val_df_analysis[f"top{i+1}_prob"] = top3_probs[:, i]
    val_df_analysis[f"top{i+1}_name"] = le.inverse_transform(rank_labels)

# Whether the true label is among the top-3 predictions
val_df_analysis["is_in_top3"] = (top3_indices == np.asarray(val_labels)[:, None]).any(axis=1)

# Per-sample MAP@3 score
def calculate_map3_per_sample(row):
    """Return 1, 1/2 or 1/3 if the true label is ranked 1st, 2nd or 3rd, else 0."""
    if row["actual_label"] == row["top1_label"]:
        return 1.0
    elif row["actual_label"] == row["top2_label"]:
        return 1.0 / 2
    elif row["actual_label"] == row["top3_label"]:
        return 1.0 / 3
    else:
        return 0.0

val_df_analysis["map3_score"] = val_df_analysis.apply(calculate_map3_per_sample, axis=1)

# Split "Category:Misconception" back into its parts. Only the first ":" is
# used as the separator so a ":" inside a misconception name is preserved.
actual_parts = val_df_analysis["target"].str.split(":", n=1)
pred_parts = val_df_analysis["top1_name"].str.split(":", n=1)
val_df_analysis["actual_category"] = actual_parts.str[0]
val_df_analysis["actual_misconception"] = actual_parts.str[1]
val_df_analysis["pred_category"] = pred_parts.str[0]
val_df_analysis["pred_misconception"] = pred_parts.str[1]

### Questionごとの予測確率分布

In [None]:
# Per-question metrics: aggregate, rename to display names, then rank by MAP@3
question_agg_spec = {
    "map3_score": "mean",
    "is_correct": "mean",
    "is_in_top3": "mean",
    "confidence": "mean",
    "QuestionText": "first",
    "label": "count",
}
question_display_names = {
    "map3_score": "MAP@3",
    "is_correct": "Top1_Accuracy",
    "is_in_top3": "Top3_Accuracy",
    "confidence": "Avg_Confidence",
    "label": "Sample_Count",
}
question_stats = val_df_analysis.groupby("QuestionId").agg(question_agg_spec)
question_stats = question_stats.rename(columns=question_display_names)
question_stats = question_stats.sort_values("MAP@3", ascending=False)

# Show the result
question_stats = question_stats.reset_index()
question_stats

### Categoryごとの予測確率分布

In [None]:
# Per-category metrics: aggregate, rename to display names, then rank by MAP@3
category_agg_spec = {
    "map3_score": "mean",
    "is_correct": "mean",
    "is_in_top3": "mean",
    "confidence": "mean",
    "label": "count",
}
category_display_names = {
    "map3_score": "MAP@3",
    "is_correct": "Top1_Accuracy",
    "is_in_top3": "Top3_Accuracy",
    "confidence": "Avg_Confidence",
    "label": "Sample_Count",
}
category_stats = val_df_analysis.groupby("actual_category").agg(category_agg_spec)
category_stats = category_stats.rename(columns=category_display_names)
category_stats = category_stats.sort_values("MAP@3", ascending=False)

# Show the result
category_stats = category_stats.reset_index()
category_stats

### Misconceptionごとの予測確率分布

In [None]:
# Per-misconception metrics: aggregate, rename to display names, then rank by MAP@3
misconception_agg_spec = {
    "map3_score": "mean",
    "is_correct": "mean",
    "is_in_top3": "mean",
    "confidence": "mean",
    "label": "count",
}
misconception_display_names = {
    "map3_score": "MAP@3",
    "is_correct": "Top1_Accuracy",
    "is_in_top3": "Top3_Accuracy",
    "confidence": "Avg_Confidence",
    "label": "Sample_Count",
}
misconception_stats = val_df_analysis.groupby("actual_misconception").agg(misconception_agg_spec)
misconception_stats = misconception_stats.rename(columns=misconception_display_names)
misconception_stats = misconception_stats.sort_values("MAP@3", ascending=False)

# Show the best and worst groups
misconception_stats = misconception_stats.reset_index()
display(misconception_stats.head())
display(misconception_stats.tail())

### ラベル（Category:Misconception）ごとの予測確率分布

In [None]:
# Per-label (Category:Misconception) metrics: aggregate, rename, then rank by MAP@3
label_agg_spec = {
    "map3_score": "mean",
    "is_correct": "mean",
    "is_in_top3": "mean",
    "confidence": "mean",
    "top1_prob": "mean",
    "top2_prob": "mean",
    "top3_prob": "mean",
    "label": "count",
}
label_display_names = {
    "map3_score": "MAP@3",
    "is_correct": "Top1_Accuracy",
    "is_in_top3": "Top3_Accuracy",
    "confidence": "Avg_Confidence",
    "top1_prob": "Avg_Top1_Prob",
    "top2_prob": "Avg_Top2_Prob",
    "top3_prob": "Avg_Top3_Prob",
    "label": "Sample_Count",
}
label_level_stats = val_df_analysis.groupby("target").agg(label_agg_spec)
label_level_stats = label_level_stats.rename(columns=label_display_names)
label_level_stats = label_level_stats.sort_values("MAP@3", ascending=False)

# Show the best and worst labels
label_level_stats = label_level_stats.reset_index()
display(label_level_stats.head())
display(label_level_stats.tail())

In [None]:
# Save every statistics table as CSV in the output directory
stats_tables = {
    "question_statistics": question_stats,
    "category_statistics": category_stats,
    "misconception_statistics": misconception_stats,
    "label_level_statistics": label_level_stats,
}
for file_stem, table in stats_tables.items():
    table.to_csv(f"{CFG.output_dir_path}/{file_stem}.csv")

In [None]:
# Save every validation record together with its prediction results
prediction_columns = [
    # original record
    "row_id", "QuestionId", "QuestionText", "MC_Answer", "StudentExplanation",
    "Category", "Misconception", "target", "label", "actual_label",
    # top-1 summary
    "pred_label", "confidence", "is_correct", "is_in_top3", "map3_score",
    # ranked predictions
    "top1_label", "top1_prob", "top1_name",
    "top2_label", "top2_prob", "top2_name",
    "top3_label", "top3_prob", "top3_name",
    # target split into its parts
    "actual_category", "actual_misconception",
    "pred_category", "pred_misconception",
]
val_df_with_predictions = val_df_analysis[prediction_columns]

predictions_csv_path = f"{CFG.output_dir_path}/val_predictions_full.csv"
val_df_with_predictions.to_csv(predictions_csv_path, index=False)

## 誤分類サンプルの分析

### Top-1で誤分類したサンプル

In [None]:
# Identify the samples whose top-1 prediction differs from the true label
misclassified_mask = (val_pred_labels != val_labels)
misclassified_indices = np.flatnonzero(misclassified_mask)

n_wrong, n_total = len(misclassified_indices), len(val_labels)
print(f"誤分類サンプル数: {n_wrong} / {n_total} ({n_wrong/n_total*100:.1f}%)")

# Show the 10 samples with the lowest confidence
low_confidence_indices = np.argsort(max_probs)[:10]
print("\n=== 確信度の低いサンプル（上位10件）===")

for rank, idx in enumerate(low_confidence_indices, start=1):
    actual_label, pred_label = val_labels[idx], val_pred_labels[idx]
    confidence = max_probs[idx]
    is_correct = "誤り" if actual_label != pred_label else "正解"

    actual_name, pred_name = le.inverse_transform([actual_label, pred_label])
    top3_names = [le.inverse_transform([x])[0] for x in top3_indices[idx]]

    print(f"#{rank:2d} 確信度={confidence:.3f} ({is_correct})")
    print(f"    実際: {actual_name}")
    print(f"    予測: {pred_name}")
    print(f"    Top-3予測: {top3_names}")
    print(f"    Top-3確率: {top3_probs[idx]}")
    print()

# Among the misclassified samples, show the most confident ones
# (cases where the model was confidently wrong)
if misclassified_indices.size > 0:
    misclassified_confidences = max_probs[misclassified_indices]
    most_confident_first = np.argsort(-misclassified_confidences)[:10]
    high_confidence_wrong_indices = misclassified_indices[most_confident_first]

    print("=== 高確信度で誤分類したサンプル（上位10件）===")
    for rank, idx in enumerate(high_confidence_wrong_indices, start=1):
        actual_label, pred_label = val_labels[idx], val_pred_labels[idx]
        confidence = max_probs[idx]

        actual_name, pred_name = le.inverse_transform([actual_label, pred_label])
        top3_names = [le.inverse_transform([x])[0] for x in top3_indices[idx]]

        print(f"#{rank} 確信度={confidence:.3f}")
        print(f"    実際: {actual_name}")
        print(f"    予測: {pred_name}")
        print(f"    Top-3予測: {top3_names}")
        print(f"    Top-3確率: {top3_probs[idx]}")
        print()

### Top-3で誤分類したサンプル

In [None]:
# Identify the samples whose true label is missing from the top-3 predictions
misclassified_mask = [label not in ranked for label, ranked in zip(val_labels, top3_indices)]
misclassified_indices = np.flatnonzero(misclassified_mask)

n_wrong, n_total = len(misclassified_indices), len(val_labels)
print(f"誤分類サンプル数: {n_wrong} / {n_total} "
      f"({n_wrong/n_total*100:.1f}%)")

# Show the 10 samples with the lowest confidence
low_confidence_indices = np.argsort(max_probs)[:10]
print("\n=== 確信度の低いサンプル（上位10件）===")

for rank, idx in enumerate(low_confidence_indices, start=1):
    actual_label, pred_label = val_labels[idx], val_pred_labels[idx]
    confidence = max_probs[idx]
    # Here "correct" means the true label appears anywhere in the top 3.
    is_correct = "正解" if actual_label in top3_indices[idx] else "誤り"

    actual_name, pred_name = le.inverse_transform([actual_label, pred_label])
    top3_names = [le.inverse_transform([x])[0] for x in top3_indices[idx]]

    print(f"#{rank:2d} 確信度={confidence:.3f} ({is_correct})")
    print(f"    実際: {actual_name}")
    print(f"    予測: {pred_name}")
    print(f"    Top-3予測: {top3_names}")
    print(f"    Top-3確率: {top3_probs[idx]}")
    print()

# Among the misclassified samples, show the most confident ones
# (cases where the model was confidently wrong)
if misclassified_indices.size > 0:
    misclassified_confidences = max_probs[misclassified_indices]
    most_confident_first = np.argsort(-misclassified_confidences)[:10]
    high_confidence_wrong_indices = misclassified_indices[most_confident_first]

    print("=== 高確信度で誤分類したサンプル（上位10件）===")
    for rank, idx in enumerate(high_confidence_wrong_indices, start=1):
        actual_label, pred_label = val_labels[idx], val_pred_labels[idx]
        confidence = max_probs[idx]

        actual_name, pred_name = le.inverse_transform([actual_label, pred_label])
        top3_names = [le.inverse_transform([x])[0] for x in top3_indices[idx]]

        print(f"#{rank} 確信度={confidence:.3f}")
        print(f"    実際: {actual_name}")
        print(f"    予測: {pred_name}")
        print(f"    Top-3予測: {top3_names}")
        print(f"    Top-3確率: {top3_probs[idx]}")
        print()