# Line Extraction モデルのテスト

このnotebookでは、学習済みのline extractionモデルの性能評価と結果の可視化を行います。

In [None]:
%cd ~/project/kuzushiji-vision-lightning

In [None]:
import ast
import os

# 警告を無視
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import yaml
from PIL import Image
from torchvision import transforms
from tqdm.notebook import tqdm

# Silence all library warnings so they do not clutter the notebook output.
warnings.filterwarnings("ignore")

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
# To force CPU execution regardless of the hardware, set: device = "cpu"
print(f"Using device: {device}")

## 1. 実験設定の読み込み

In [None]:
# Directory of the training run whose artifacts are evaluated in this notebook.
experiment_dir = "experiments/line_extraction/20250429_161507"
config_path = os.path.join(experiment_dir, "config.yaml")
model_path = os.path.join(experiment_dir, "weights/best.pt")


def load_yaml(path):
    """Parse the UTF-8 encoded YAML file at ``path`` and return its contents."""
    with open(path, encoding="utf-8") as stream:
        parsed = yaml.safe_load(stream)
    return parsed


# Read the run configuration and report which weight file will be loaded.
config = load_yaml(config_path)
print("Configuration loaded successfully")
print(f"Model path: {model_path}")

## 2. データの準備

In [None]:
# Image preprocessing helper
def preprocess_image(image_path, input_size=(640, 640)):
    """Load an image and turn it into a normalized, batched tensor.

    The image is decoded as RGB, resized to ``input_size``, converted to a
    tensor and normalized per channel with the fixed mean/std values below.

    Args:
        image_path (str): Path to the image file.
        input_size (tuple): Target size handed to ``transforms.Resize``.

    Returns:
        torch.Tensor: The preprocessed image with a leading batch dimension
            of size 1 (added by ``unsqueeze(0)``).
        tuple: Size of the original image as reported by ``PIL.Image.size``.
    """
    # Open through a context manager so the file handle is released
    # deterministically instead of relying on Pillow closing it implicitly.
    # ``convert`` returns a new in-memory image that outlives the handle.
    with Image.open(image_path) as source:
        image = source.convert("RGB")
    original_size = image.size

    transform = transforms.Compose(
        [
            transforms.Resize(input_size),
            transforms.ToTensor(),
            # Per-channel statistics; presumably computed on the training data (TODO confirm).
            transforms.Normalize(mean=[0.75696, 0.71561, 0.63938], std=[0.19681, 0.20038, 0.24713]),
        ]
    )

    return transform(image).unsqueeze(0), original_size


# Annotation loading and preprocessing
def load_and_preprocess_annotations(annotation_file):
    """Read the annotation CSV and prepare the columns used for evaluation.

    Args:
        annotation_file (str): Path to the annotation CSV file. It must
            contain the columns "box_in_original" and "original_image".

    Returns:
        pd.DataFrame: The annotations, with "box_in_original" parsed from its
            string form into Python objects and a new "image_name" column
            holding the base file name of "original_image".
    """
    table = pd.read_csv(annotation_file)

    # Boxes are stored as text in the CSV; evaluate them as Python literals.
    parsed_boxes = table["box_in_original"].apply(ast.literal_eval)

    # Strip the directory part so rows can be looked up by file name alone.
    file_names = table["original_image"].apply(os.path.basename)

    table["box_in_original"] = parsed_boxes
    table["image_name"] = file_names
    return table


# Locations of the evaluation images and their annotations.
# NOTE(review): the image directory points at the "train" split; change it to evaluate another split.
test_image_dir = "data/yolo_dataset_page_images_by_book/train/images"
test_annotation_file = "data/processed/column_info.csv"

# Load the annotations
if os.path.exists(test_annotation_file):
    annotations_df = load_and_preprocess_annotations(test_annotation_file)
    print(f"Loaded {len(annotations_df)} annotations")

    # Keep only annotated images whose file is present in the image directory.
    # File names are compared exactly through a set: a suffix test would also
    # accept "x_page.jpg" for a file called "page.jpg", and it rescans the
    # whole directory listing for every annotated image.
    image_paths = annotations_df["original_image"].unique()
    available_names = {f for f in os.listdir(test_image_dir) if f.lower().endswith((".jpg", ".png"))}
    existing_images = [image_path for image_path in image_paths if os.path.basename(image_path) in available_names]
    print(f"Found {len(existing_images)} test images")
else:
    print(f"Warning: Annotation file not found at {test_annotation_file}")

## 3. モデルのセットアップ

In [None]:
# Load the YOLO model
from ultralytics import YOLO

# Build the model from the checkpoint at model_path, then move it to the selected device.
model = YOLO(model_path)
model.to(device)
print("Model loaded successfully")

## 4. 推論と評価

In [None]:
def calculate_iou(box1, box2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Args:
        box1: First box as [x1, y1, x2, y2].
        box2: Second box as [x1, y1, x2, y2].

    Returns:
        float: Overlap area divided by the area of the union, or 0 when the
            union area is not positive.
    """
    # Corners of the overlap rectangle (inverted when the boxes are disjoint).
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # Clamp each side length at zero so disjoint boxes contribute no overlap.
    overlap_width = max(0, right - left)
    overlap_height = max(0, bottom - top)
    intersection = overlap_width * overlap_height

    # Area of each individual box.
    first_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    second_area = (box2[2] - box2[0]) * (box2[3] - box2[1])

    # The overlap is counted in both areas, so subtract it once.
    union = first_area + second_area - intersection
    if union > 0:
        return intersection / union
    return 0


def evaluate_predictions(predictions, ground_truth, iou_threshold=0.6):
    """Score predicted boxes against ground-truth boxes by greedy IoU matching.

    Predictions are processed in the order given. Each one is paired with the
    not-yet-matched ground-truth box of highest IoU; the pair counts as a true
    positive when that IoU reaches ``iou_threshold``, otherwise the prediction
    is a false positive. Ground-truth boxes left unmatched are false negatives.

    Args:
        predictions: Predicted boxes. The first four values of each entry are
            read as [x1, y1, x2, y2]; any further values (e.g. confidence)
            are ignored.
        ground_truth: Ground-truth boxes, each [x1, y1, x2, y2].
        iou_threshold: Minimum IoU for a prediction to count as a match.

    Returns:
        dict: "precision", "recall" and "f1" (each 0 when its denominator is
            0) together with the raw "true_positives", "false_positives" and
            "false_negatives" counts.
    """
    total_truth = len(ground_truth)
    claimed = set()  # indices of ground-truth boxes already matched
    hits = 0
    misses = 0

    for candidate in predictions:
        # Find the best still-unmatched ground-truth box for this prediction.
        top_score, top_index = 0, -1
        for index, truth in enumerate(ground_truth):
            if index not in claimed:
                score = calculate_iou(candidate[:4], truth)
                if score > top_score:
                    top_score, top_index = score, index

        if top_score >= iou_threshold:
            hits += 1
            claimed.add(top_index)
        else:
            misses += 1

    # Every hit consumed exactly one ground-truth box; the rest were never found.
    unmatched = total_truth - hits

    predicted_total = hits + misses
    truth_total = hits + unmatched
    precision = hits / predicted_total if predicted_total > 0 else 0
    recall = hits / truth_total if truth_total > 0 else 0
    score_sum = precision + recall
    f1 = 2 * precision * recall / score_sum if score_sum > 0 else 0

    return {
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "true_positives": hits,
        "false_positives": misses,
        "false_negatives": unmatched,
    }


# Run inference on every test image and score the detections against the annotations
results = []
metrics = []

for image_path in tqdm(existing_images, desc="Testing"):
    image_name = os.path.basename(image_path)

    # Inference without gradient tracking; the first element of the model output is kept
    with torch.no_grad():
        predictions = model(image_path)[0]

    # Detection rows as a NumPy array on the host
    pred_boxes = predictions.boxes.data.cpu().numpy()

    # Ground-truth boxes of the annotation rows recorded for this image name
    gt_boxes = np.array(
        annotations_df.loc[annotations_df["image_name"] == image_name, "box_in_original"].tolist()
    )

    # Per-image evaluation
    metric = evaluate_predictions(pred_boxes, gt_boxes)
    metrics.append(metric)

    # Keep everything needed for later visualization and error analysis
    results.append(
        {
            "image_name": image_name,
            "image_path": image_path,
            "predictions": pred_boxes,
            "ground_truth": gt_boxes,
            "metrics": metric,
        }
    )

# Overall scores are the unweighted mean of the per-image scores
overall_metrics = {name: np.mean([m[name] for m in metrics]) for name in ("precision", "recall", "f1")}

print("\nOverall Metrics:")
for k, v in overall_metrics.items():
    print(f"{k}: {v:.4f}")

TEST Overall Metrics:
precision: 0.8600
recall: 0.9050
f1: 0.8724

TRAIN Overall Metrics:
precision: 0.8723
recall: 0.8955
f1: 0.8754

## 5. 結果の可視化

In [None]:
def visualize_results(result):
    """Draw predicted and ground-truth boxes over the source image.

    Predictions are outlined in red and labelled with their confidence;
    ground-truth boxes are outlined in green. The figure title shows the
    image name and its F1 score.

    Args:
        result: Result dictionary with the keys "image_path", "image_name",
            "predictions", "ground_truth" and "metrics" (which must contain
            "f1"). Each prediction needs at least five values,
            [x1, y1, x2, y2, conf]; each ground-truth box is [x1, y1, x2, y2].
    """
    # The image is opened before the figure is created so that a missing file
    # does not leave an empty figure behind; the handle is closed as soon as
    # the pixels have been handed to matplotlib.
    with Image.open(result["image_path"]) as image:
        _, axes = plt.subplots(figsize=(12, 8))
        axes.imshow(image)

    # Predictions (red)
    for box in result["predictions"]:
        # Only the first five values are used, so rows carrying extra columns
        # (class id, track id, ...) or none at all are accepted alike.
        x1, y1, x2, y2, conf = box[:5]
        axes.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False, color="red", linewidth=2))
        axes.text(x1, y1 - 5, f"{conf:.2f}", color="red")

    # Ground truth (green)
    for box in result["ground_truth"]:
        x1, y1, x2, y2 = box
        axes.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1, fill=False, color="green", linewidth=2))

    axes.axis("off")
    axes.set_title(f"Image: {result['image_name']}\nF1: {result['metrics']['f1']:.4f}")
    plt.show()


# Visualize the best and the worst case
sorted_results = sorted(results, key=lambda x: x["metrics"]["f1"], reverse=True)

print("Best Case:")
# Results are sorted by descending F1, so the best case is the first element.
best_case = sorted_results[0]
visualize_results(best_case)

print("\nWorst Case:")
worst_case = sorted_results[-1]
visualize_results(worst_case)

## 6. エラー分析

In [None]:
# Plot the distribution of the per-image F1 scores
f1_scores = [r["metrics"]["f1"] for r in results]

plt.figure(figsize=(10, 6))
plt.hist(f1_scores, bins=20)
plt.title("Distribution of F1 Scores")
plt.xlabel("F1 Score")
plt.ylabel("Count")
plt.grid(True)
plt.show()

# Analyze the difficult cases: images whose F1 falls below the threshold.
# One value drives both the filter and the printed message so the two cannot disagree.
difficult_f1_threshold = 0.5
difficult_cases = [r for r in results if r["metrics"]["f1"] < difficult_f1_threshold]
print(f"\nFound {len(difficult_cases)} difficult cases (F1 < {difficult_f1_threshold})")

if difficult_cases:
    print("\nAnalyzing a sample of difficult cases:")
    # Show at most the first five to keep the notebook output manageable.
    for case in difficult_cases[:5]:
        print(f"\nImage: {case['image_name']}")
        print(f"Metrics: {case['metrics']}")
        visualize_results(case)