# DS Evaluation Pipeline: Matching & Metrics

**Purpose**: Load pre-computed `.npz` files, run the full matching pipeline (geometric + descriptor + fusion), and evaluate with standard biometric metrics.

**Prerequisites**:
- Google Drive with `face-auth-data/mast3r_outputs/` (enrollment `.npz`) and `face-auth-data/auth_probes/` (probe `.npz`)
- These files are generated by CS-1's `scripts/prepare_public_dataset.py`

**No MASt3R model loading needed** — this notebook works entirely with pre-computed data.

**GPU**: If a CUDA GPU is available (e.g., a Colab T4), descriptor matching is automatically accelerated.

---

In [None]:
# ============================================================
# Cell 1: Environment Setup
# ============================================================
# Install dependencies and mount Google Drive.
# ============================================================

!pip install -q open3d plotly scikit-learn tqdm

import torch
# Report whether a CUDA device is visible; nothing is moved to the GPU in this cell.
if torch.cuda.is_available():
    gpu_name = torch.cuda.get_device_name(0)
    # Dividing by 1e9 reports the device memory in decimal gigabytes.
    vram_gb = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"GPU: {gpu_name} ({vram_gb:.1f} GB) — descriptor matching will use GPU")
else:
    print("No GPU — descriptor matching will use CPU (still works, just slower)")

# Mount Google Drive so the shared data folder is reachable under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

import os
# Locations of the pre-computed enrollment and probe .npz files on the mounted Drive.
SHARED_DIR = "/content/drive/MyDrive/face-auth-data"
ENROLLMENT_DIR = f"{SHARED_DIR}/mast3r_outputs"
PROBE_DIR = f"{SHARED_DIR}/auth_probes"

# Echo the paths and whether each directory exists, so a missing or mis-mounted
# Drive folder is visible before any later cell tries to read from it.
print(f"\nEnrollment dir: {ENROLLMENT_DIR}")
print(f"Probe dir:      {PROBE_DIR}")
print(f"Enrollment dir exists: {os.path.isdir(ENROLLMENT_DIR)}")
print(f"Probe dir exists:      {os.path.isdir(PROBE_DIR)}")

In [None]:
# ============================================================
# Cell 2: Clone Repo & Configure Python Path
# ============================================================

import sys

REPO_URL = "https://github.com/gaelgm03/ai-visual-computing-pbl.git"
BRANCH = "main"  # Change to your feature branch if needed
REPO_DIR = "/content/ai-visual-computing-pbl"

if not os.path.exists(REPO_DIR):
    !git clone --depth 1 {REPO_URL} {REPO_DIR}

%cd {REPO_DIR}
!git fetch origin && git checkout {BRANCH} && git pull origin {BRANCH}

sys.path.insert(0, REPO_DIR)
print(f"\nRepo ready at {REPO_DIR}, branch: {BRANCH}")

In [None]:
# ============================================================
# Cell 3: Discover Available .npz Files
# ============================================================

import numpy as np
from pathlib import Path

def _index_npz_files(directory, suffix):
    """Map subject name -> file path for each entry of *directory* ending in *suffix*.

    Returns an empty dict when *directory* is not an existing directory.
    """
    if not os.path.isdir(directory):
        return {}
    return {
        # Strip the suffix from the end only: str.replace would also delete any
        # earlier occurrence of the suffix text inside the subject name.
        fname[:-len(suffix)]: os.path.join(directory, fname)
        for fname in os.listdir(directory)
        if fname.endswith(suffix)
    }


def discover_data(enrollment_dir, probe_dir):
    """Find all enrollment/probe pairs by matching person names.

    Args:
        enrollment_dir: Directory scanned for ``<name>_enrollment.npz`` files.
        probe_dir: Directory scanned for ``<name>_probe.npz`` files.

    Returns:
        A tuple ``(subjects, enrollments, probes)``:
        ``subjects`` is the sorted list of names present in BOTH directories;
        ``enrollments`` and ``probes`` map every discovered name (paired or
        not) to its file path. A missing directory contributes an empty dict.
    """
    enrollments = _index_npz_files(enrollment_dir, "_enrollment.npz")
    probes = _index_npz_files(probe_dir, "_probe.npz")

    # Only subjects that have both an enrollment and a probe can be evaluated.
    subjects = sorted(enrollments.keys() & probes.keys())
    return subjects, enrollments, probes


# Index the Drive folders once; later cells reuse these three globals.
subjects, enrollments, probes = discover_data(ENROLLMENT_DIR, PROBE_DIR)

print(f"Found {len(subjects)} subjects with both enrollment and probe data:")
for s in subjects:
    # np.load on a .npz returns an NpzFile that keeps the archive open; the
    # context manager closes it so the loop does not leak file descriptors.
    with np.load(enrollments[s]) as e, np.load(probes[s]) as p:
        print(f"  {s}: enrollment={e['point_cloud'].shape[0]:,} pts, "
              f"probe={p['point_cloud'].shape[0]:,} pts")

# Point the user at the expected file naming when nothing could be paired.
if not subjects:
    print("\nNo matching enrollment/probe pairs found!")
    print(f"  Check that {ENROLLMENT_DIR} has *_enrollment.npz files")
    print(f"  and {PROBE_DIR} has *_probe.npz files")

In [None]:
# ============================================================
# Cell 4: Inspect Data Format
# ============================================================

# Open the first subject's enrollment archive and list what it contains.
# NOTE(review): subjects[0] raises IndexError when no enrollment/probe pair was found.
sample = np.load(enrollments[subjects[0]], allow_pickle=True)
print("Keys:", list(sample.keys()))
# Print shape/dtype for every stored entry; anything without a .shape is reported by type.
for key in sample.keys():
    arr = sample[key]
    if hasattr(arr, 'shape'):
        print(f"  {key}: shape={arr.shape}, dtype={arr.dtype}")
    else:
        print(f"  {key}: {type(arr)}")

# Check descriptor normalization
# Norms are taken along axis=1 — assumes descriptors is 2-D (one row per point); TODO confirm.
desc = sample["descriptors"]
norms = np.linalg.norm(desc, axis=1)
print(f"\nDescriptor norms: min={norms.min():.3f}, max={norms.max():.3f}, mean={norms.mean():.3f}")
# NOTE(review): only the mean norm is tested, so a wide min/max spread centred on 1.0
# is still reported as unit-normalized; read the verdict together with the min/max above.
if abs(norms.mean() - 1.0) > 0.1:
    print("Descriptors are NOT unit-normalized — the matcher will normalize internally.")
else:
    print("Descriptors appear unit-normalized.")

In [None]:
# ============================================================
# Cell 5: Visualize Point Cloud (CS2DS-share.md §4.2)
# ============================================================

import plotly.graph_objects as go
from plotly.subplots import make_subplots

def visualize_point_cloud(points, colors=None, title="Point Cloud", max_points=10000):
    """Build an interactive plotly 3D scatter figure for a point cloud.

    Args:
        points: Array indexed as ``points[:, 0..2]`` for the x, y, z coordinates.
        colors: Optional per-point colour triples, formatted as ``rgb(r,g,b)``
            strings. When omitted, markers are coloured by their z coordinate.
        title: Figure title.
        max_points: If there are more points than this, a random subset of
            exactly this many (drawn without replacement) is plotted instead.

    Returns:
        The ``go.Figure``; the caller decides when to ``.show()`` it.
    """
    n_points = len(points)
    if n_points > max_points:
        # Use one index set for both arrays so colours stay aligned with points.
        keep = np.random.choice(n_points, max_points, replace=False)
        points = points[keep]
        colors = None if colors is None else colors[keep]

    if colors is None:
        marker_color = points[:, 2]
    else:
        marker_color = [f'rgb({red},{green},{blue})' for red, green, blue in colors]

    scatter = go.Scatter3d(
        x=points[:, 0],
        y=points[:, 1],
        z=points[:, 2],
        mode='markers',
        marker={'size': 1.5, 'color': marker_color, 'opacity': 0.8},
    )
    figure = go.Figure(data=[scatter])
    figure.update_layout(
        title=title,
        scene={'aspectmode': 'data'},
        width=800,
        height=600,
    )
    return figure


# Show the enrollment and probe clouds for the first subject as two consecutive figures.
# NOTE(review): subjects[0] raises IndexError when no subject pair was discovered.
person = subjects[0]
e_data = np.load(enrollments[person])
p_data = np.load(probes[person])

# Each archive is expected to provide "point_cloud" and "colors" entries;
# the point count in the title is taken from the first axis of "point_cloud".
fig = visualize_point_cloud(
    e_data["point_cloud"], e_data["colors"],
    title=f"{person} — Enrollment ({e_data['point_cloud'].shape[0]:,} pts)"
)
fig.show()

fig = visualize_point_cloud(
    p_data["point_cloud"], p_data["colors"],
    title=f"{person} — Probe ({p_data['point_cloud'].shape[0]:,} pts)"
)
fig.show()

In [None]:
# ============================================================
# Cell 6: Initialize Matchers
# ============================================================

from core.matching.geometric_matcher import ICPGeometricMatcher
from core.matching.descriptor_matcher import NNDescriptorMatcher
from core.matching.score_fusion import WeightedFusion

# One shared settings dict is handed to all three components below.
# NOTE(review): which keys each class actually reads is defined in core.matching,
# not in this notebook — the groupings in the comments here are presumed from
# the key names; confirm against the matcher implementations.
config = {
    # Presumably ICP settings for the geometric matcher.
    "icp": {
        "max_iterations": 50,
        "convergence_threshold": 1e-6,
        "max_correspondence_distance": 0.05,
    },
    "chamfer_alpha": 30.0,
    "geometric_subsample": 10000,
    "descriptor_subsample": 15000,
    "match_ratio_weight": 0.4,
    "avg_similarity_weight": 0.6,
    # Fusion weights and decision threshold; accept_threshold is also reused
    # later in the notebook for the evaluator and the histogram threshold line.
    "geometric_weight": 0.4,
    "descriptor_weight": 0.6,
    "accept_threshold": 0.65,
}

geo_matcher = ICPGeometricMatcher(config)
desc_matcher = NNDescriptorMatcher(config)
fusion = WeightedFusion(config)

# Echo the settings in use so the run is self-describing in the notebook output.
print("Matchers initialized:")
print(f"  Geometric: ICP + Chamfer (subsample={config['geometric_subsample']})")
print(f"  Descriptor: Reciprocal NN (subsample={config['descriptor_subsample']})")
print(f"  Fusion: geo={config['geometric_weight']}, desc={config['descriptor_weight']}, threshold={config['accept_threshold']}")

In [None]:
# ============================================================
# Cell 7: Single Genuine Pair Test
# ============================================================

import time

# Smoke test: compare the first subject's probe against that same subject's enrollment.
person = subjects[0]
enrollment = np.load(enrollments[person], allow_pickle=True)
probe = np.load(probes[person], allow_pickle=True)

print(f"=== Genuine pair: {person} vs {person} ===")

# Timestamps t0..t3 bracket the three stages so each stage's wall time can be reported.
t0 = time.time()
geo_result = geo_matcher.compare(probe["point_cloud"], enrollment["point_cloud"])
t1 = time.time()
# The descriptor matcher receives descriptors first, then the matching point clouds.
desc_result = desc_matcher.compare(
    probe["descriptors"], enrollment["descriptors"],
    probe["point_cloud"], enrollment["point_cloud"],
)
t2 = time.time()
final_result = fusion.fuse(geo_result, desc_result)
t3 = time.time()

# .details is read with .get() and a fallback, so a missing key prints the
# fallback ('N/A', 0 or 'unknown') instead of raising.
print(f"  Geometric:  {geo_result.score:.3f}  (Chamfer: {geo_result.details.get('chamfer_distance', 'N/A')})")
print(f"  Descriptor: {desc_result.score:.3f}  (Match ratio: {desc_result.details.get('match_ratio', 0):.3f}, "
      f"Avg sim: {desc_result.details.get('avg_cosine_similarity', 0):.3f})")
print(f"  Fused:      {final_result.score:.3f}  is_match={final_result.is_match}")
print(f"  Time: geo={t1-t0:.2f}s, desc={t2-t1:.2f}s, fusion={t3-t2:.4f}s")
print(f"  Backend: desc={desc_result.details.get('backend', 'unknown')}")

In [None]:
# ============================================================
# Cell 8: Single Impostor Pair Test
# ============================================================

# Smoke test for the impostor case: the second subject's probe is compared
# against the first subject's enrollment. Skipped when fewer than two subjects exist.
if len(subjects) >= 2:
    person_a = subjects[0]
    person_b = subjects[1]
    enrollment_a = np.load(enrollments[person_a], allow_pickle=True)
    probe_b = np.load(probes[person_b], allow_pickle=True)

    # Same call pattern as the genuine-pair cell: probe first, enrollment second.
    geo_result = geo_matcher.compare(probe_b["point_cloud"], enrollment_a["point_cloud"])
    desc_result = desc_matcher.compare(
        probe_b["descriptors"], enrollment_a["descriptors"],
        probe_b["point_cloud"], enrollment_a["point_cloud"],
    )
    final_result = fusion.fuse(geo_result, desc_result)

    print(f"=== Impostor pair: {person_b} vs {person_a} ===")
    print(f"  Geometric:  {geo_result.score:.3f}")
    print(f"  Descriptor: {desc_result.score:.3f}")
    print(f"  Fused:      {final_result.score:.3f}  is_match={final_result.is_match}")
else:
    print("Need at least 2 subjects for impostor test")

In [None]:
# ============================================================
# Cell 9: All-vs-All Matching
# ============================================================

from tqdm import tqdm

# Results of the all-vs-all run; later cells read `similarities`, `labels`
# and `pair_info`. Rows are appended probe-major (outer loop = probe,
# inner loop = enrollment), which the score-matrix reshape relies on.
similarities = []
labels = []  # 1 = genuine, 0 = impostor
pair_info = []

# Load the arrays the matchers need into memory ONCE per subject.
# An NpzFile is lazy: indexing it reads the array from the archive on access,
# so caching the NpzFile itself would re-read from Drive on every one of the
# N^2 comparisons. Materializing into plain dicts also lets the archive be
# closed right away instead of leaking a file handle per subject.
# NOTE(review): allow_pickle=True is kept from the original; it can execute
# arbitrary code when loading untrusted files — only use with trusted data.
_NEEDED_KEYS = ("point_cloud", "descriptors")
enrollment_cache = {}
probe_cache = {}
for s in subjects:
    with np.load(enrollments[s], allow_pickle=True) as npz:
        enrollment_cache[s] = {key: npz[key] for key in _NEEDED_KEYS}
    with np.load(probes[s], allow_pickle=True) as npz:
        probe_cache[s] = {key: npz[key] for key in _NEEDED_KEYS}

total_pairs = len(subjects) ** 2
print(f"Running {total_pairs} pairs ({len(subjects)} subjects x {len(subjects)} enrollments)...")

for probe_person in tqdm(subjects, desc="Matching"):
    p_data = probe_cache[probe_person]

    for enroll_person in subjects:
        e_data = enrollment_cache[enroll_person]
        # A pair is genuine when probe and enrollment belong to the same subject.
        is_genuine = int(probe_person == enroll_person)

        geo_result = geo_matcher.compare(
            p_data["point_cloud"], e_data["point_cloud"]
        )
        desc_result = desc_matcher.compare(
            p_data["descriptors"], e_data["descriptors"],
            p_data["point_cloud"], e_data["point_cloud"],
        )
        final_result = fusion.fuse(geo_result, desc_result)

        similarities.append(final_result.score)
        labels.append(is_genuine)
        # Keep the per-path scores so fusion weights can be re-tuned later
        # without re-running the matchers.
        pair_info.append({
            "probe": probe_person,
            "enrollment": enroll_person,
            "genuine": is_genuine,
            "geo_score": geo_result.score,
            "desc_score": desc_result.score,
            "fused_score": final_result.score,
        })

similarities = np.array(similarities)
# Explicit int dtype so an empty run yields an int array (not float64).
labels = np.array(labels, dtype=int)


def _format_score_range(scores):
    """Return "[min, max]" for *scores*, or a placeholder when it is empty."""
    if scores.size == 0:
        # .min()/.max() raise ValueError on an empty array, e.g. the impostor
        # class when only one subject is available.
        return "n/a (no pairs)"
    return f"[{scores.min():.3f}, {scores.max():.3f}]"


if similarities.size == 0:
    print("\nNo pairs were evaluated — no subjects with both enrollment and probe data.")
else:
    print(f"\nTotal pairs: {len(similarities)}")
    print(f"  Genuine:  {labels.sum()} ({labels.mean()*100:.1f}%)")
    print(f"  Impostor: {(1-labels).sum().astype(int)} ({(1-labels).mean()*100:.1f}%)")
    print(f"\nGenuine score range:  {_format_score_range(similarities[labels==1])}")
    print(f"Impostor score range: {_format_score_range(similarities[labels==0])}")

In [None]:
# ============================================================
# Cell 10: Evaluate with FaceRecognitionEvaluator
# ============================================================

from core.evaluation import FaceRecognitionEvaluator

# Score the all-vs-all fused similarities at the configured accept threshold.
# The numpy arrays are converted to plain lists before being handed to the evaluator.
evaluator = FaceRecognitionEvaluator(threshold=config["accept_threshold"])
eval_result = evaluator.evaluate(
    similarities=similarities.tolist(),
    labels=labels.tolist(),
    plot=True,
)

# Print the metrics exposed as attributes on the returned result object.
print(f"\n{'='*50}")
print(f"EVALUATION RESULTS (threshold={eval_result.threshold:.2f})")
print(f"{'='*50}")
print(f"  Accuracy:  {eval_result.accuracy:.3f}")
print(f"  Precision: {eval_result.precision:.3f}")
print(f"  Recall:    {eval_result.recall:.3f}")
print(f"  F1 Score:  {eval_result.f1_score:.3f}")
print(f"  FAR:       {eval_result.far:.3f}")
print(f"  TAR:       {eval_result.tar:.3f}")
print(f"  EER:       {eval_result.eer:.3f} (at threshold={eval_result.eer_threshold:.3f})")
print(f"  AUC:       {eval_result.auc_score:.3f}")

In [None]:
# ============================================================
# Cell 11: Per-Path Score Distributions
# ============================================================

import matplotlib.pyplot as plt

# Pull the three per-pair score columns out of pair_info (same order as `labels`).
# These arrays are reused by the heatmap and grid-search cells.
geo_scores = np.array([p['geo_score'] for p in pair_info])
desc_scores = np.array([p['desc_score'] for p in pair_info])
fused_scores = np.array([p['fused_score'] for p in pair_info])

# One histogram panel per score type, genuine and impostor overlaid.
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

for ax, scores, title in zip(
    axes,
    [geo_scores, desc_scores, fused_scores],
    ['Geometric (ICP + Chamfer)', 'Descriptor (Reciprocal NN)', 'Fused (Weighted)'],
):
    ax.hist(scores[labels == 1], bins=20, alpha=0.7, label='Genuine', color='green')
    ax.hist(scores[labels == 0], bins=20, alpha=0.7, label='Impostor', color='red')
    # NOTE(review): the same accept_threshold line is drawn on all three panels,
    # although elsewhere in this notebook it is only applied to the fused score;
    # on the first two panels it is presumably a visual reference only.
    ax.axvline(x=config['accept_threshold'], color='black', linestyle='--', label='Threshold')
    ax.set_title(title)
    ax.set_xlabel('Score')
    ax.set_ylabel('Count')
    ax.legend()

plt.tight_layout()
plt.show()

In [None]:
# ============================================================
# Cell 12: Score Matrix Heatmap
# ============================================================

# Reshape the flat fused scores into an n x n matrix. This relies on the
# all-vs-all loop having appended pairs probe-major (outer loop = probe,
# inner loop = enrollment), so row = probe and column = enrollment.
n = len(subjects)
score_matrix = fused_scores.reshape(n, n)

# Fixed 0..1 colour range; each cell is annotated with its score rounded to 2 decimals.
fig = go.Figure(data=go.Heatmap(
    z=score_matrix,
    x=subjects,
    y=subjects,
    colorscale='RdYlGn',
    zmin=0, zmax=1,
    text=np.round(score_matrix, 2),
    texttemplate="%{text}",
))
# Figure size grows with the number of subjects, with a minimum of 500 x 400.
fig.update_layout(
    title="Matching Score Matrix (Probe vs Enrollment)",
    xaxis_title="Enrollment",
    yaxis_title="Probe",
    width=max(500, 80 * n),
    height=max(400, 70 * n),
)
fig.show()

In [None]:
# ============================================================
# Cell 13: Weight Optimization (Grid Search)
# ============================================================

from sklearn.metrics import f1_score as sk_f1_score

# Grid over the geometric weight alpha in steps of 0.05 starting at 0.0; the
# stop value 1.05 is chosen so that 1.0 is included. The descriptor weight is
# always the complement (beta = 1 - alpha).
# NOTE(review): weights and threshold are tuned on the same pairs that are
# later used to report metrics, so the "optimized" numbers are optimistic.
alphas = np.arange(0.0, 1.05, 0.05)
results_grid = []

for alpha in alphas:
    beta = 1.0 - alpha
    # Re-fuse from the cached per-path scores; the matchers are not re-run.
    fused = alpha * geo_scores + beta * desc_scores

    # Sweep decision thresholds from 0.10 in steps of 0.01 (stop value 0.95)
    # and keep the best F1. The strict `>` keeps the LOWEST threshold among ties.
    best_f1, best_thresh = 0.0, 0.5
    for thresh in np.arange(0.1, 0.95, 0.01):
        preds = (fused >= thresh).astype(int)
        f1_val = sk_f1_score(labels, preds, zero_division=0)
        if f1_val > best_f1:
            best_f1 = f1_val
            best_thresh = thresh

    # Values are rounded to 2 decimals for reporting; later cells reuse these
    # rounded values when re-evaluating.
    results_grid.append({
        'alpha': round(alpha, 2),
        'beta': round(beta, 2),
        'best_f1': best_f1,
        'best_threshold': round(best_thresh, 2),
    })

# max() returns the first maximal entry, i.e. the smallest alpha among F1 ties.
best = max(results_grid, key=lambda x: x['best_f1'])

print(f"{'Alpha':>6} {'Beta':>6} {'Best F1':>8} {'Threshold':>10}")
print("-" * 35)
for r in results_grid:
    marker = " <-- BEST" if r == best else ""
    print(f"{r['alpha']:>6.2f} {r['beta']:>6.2f} {r['best_f1']:>8.3f} {r['best_threshold']:>10.2f}{marker}")

print(f"\nOptimal: geometric_weight={best['alpha']:.2f}, "
      f"descriptor_weight={best['beta']:.2f}, "
      f"threshold={best['best_threshold']:.2f}, "
      f"F1={best['best_f1']:.3f}")

In [None]:
# ============================================================
# Cell 14: Re-evaluate with Optimal Parameters
# ============================================================

# Re-fuse the per-path scores with the best grid-search weights and evaluate
# at the best threshold. The values in `best` were rounded to 2 decimals by
# the grid-search cell, so metrics here can differ marginally from its F1.
optimal_similarities = best['alpha'] * geo_scores + best['beta'] * desc_scores
optimal_evaluator = FaceRecognitionEvaluator(threshold=best['best_threshold'])

optimal_result = optimal_evaluator.evaluate(
    optimal_similarities.tolist(),
    labels.tolist(),
    plot=True,
)

# Report the chosen parameters next to the resulting metrics.
print(f"\n{'='*50}")
print(f"OPTIMIZED RESULTS")
print(f"{'='*50}")
print(f"  Weights:   geo={best['alpha']:.2f}, desc={best['beta']:.2f}")
print(f"  Threshold: {best['best_threshold']:.2f}")
print(f"  Accuracy:  {optimal_result.accuracy:.3f}")
print(f"  F1 Score:  {optimal_result.f1_score:.3f}")
print(f"  FAR:       {optimal_result.far:.3f}")
print(f"  TAR:       {optimal_result.tar:.3f}")
print(f"  EER:       {optimal_result.eer:.3f}")
print(f"  AUC:       {optimal_result.auc_score:.3f}")

In [None]:
# ============================================================
# Cell 15: Recommended config.yaml Values
# ============================================================

# Print a YAML snippet with the tuned weights/threshold, ready to paste into config.yaml.
print("=" * 60)
print("RECOMMENDED CONFIG.YAML VALUES")
print("=" * 60)
print(f"""
matching:
  geometric_weight: {best['alpha']:.2f}     # Optimized alpha
  descriptor_weight: {best['beta']:.2f}    # Optimized beta
  accept_threshold: {best['best_threshold']:.2f}    # Optimized threshold
""")
# Context for the recommendation: dataset size and the headline metrics.
print(f"Based on {len(subjects)} subjects, {len(similarities)} total pairs")
print(f"F1={optimal_result.f1_score:.3f}, EER={optimal_result.eer:.3f}, AUC={optimal_result.auc_score:.3f}")

# Save figures to Drive
# The evaluation is run a second time with save_dir set, this time without inline plots.
# NOTE(review): whether evaluate() creates save_dir, and whether it still writes
# figures when plot=False, is not visible in this notebook — confirm in
# core.evaluation; the "Figures saved" message below is printed unconditionally.
figures_dir = f"{SHARED_DIR}/evaluation_results/figures"
optimal_evaluator.evaluate(
    optimal_similarities.tolist(),
    labels.tolist(),
    plot=False,
    save_dir=figures_dir,
)
print(f"\nFigures saved to {figures_dir}")

---

## Before Closing: Push to GitHub!

Colab sessions are temporary. Push any changes before closing.

In [None]:
# Uncomment and run before closing:

# %cd /content/ai-visual-computing-pbl
# !git add -A
# !git status
# !git commit -m "feat(ds1): add evaluation results"
# !git push origin {BRANCH}