In [1]:
import numpy as np
import torch
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import onnxruntime as ort
import mxnet as mx
from mxnet import recordio
from umap.umap_ import UMAP
from sklearn.cluster import KMeans
import matplotlib.pyplot as plt
import seaborn as sns
import mlflow
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
import gc
from collections import defaultdict
import faiss
import h5py
from sklearn.metrics import roc_curve, roc_auc_score

In [2]:
class MXFaceDataset(Dataset):
    def __init__(self, rec_path, idx_path, transform=None):
        self.rec_path = rec_path
        self.idx_path = idx_path
        self.transform = transform
        self.record = recordio.MXIndexedRecordIO(self.idx_path, self.rec_path, 'r')
        self.keys = list(self.record.keys)

    def __len__(self):
        return len(self.keys)
    
    def __getitem__(self, idx):
        try:
            key = self.keys[idx]
            header, img = recordio.unpack(self.record.read_idx(key))
            label = int(header.label)
            img = mx.image.imdecode(img)
            img = Image.fromarray(img.asnumpy())
            if self.transform:
                img = self.transform(img)
            return img, label
        except Exception as e:
            print(f"Error loading image at index {idx}: {e}")
            blank_img = Image.fromarray(np.zeros((112, 112, 3), dtype=np.uint8))
            if self.transform:
                blank_img = self.transform(blank_img)
            return blank_img, -1

In [3]:
test_transform = transforms.Compose([
    transforms.Resize((112, 112)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

In [4]:
def load_onnx_model(model_path):
    so = ort.SessionOptions()
    so.log_severity_level = 3
    ort_session = ort.InferenceSession(
        model_path,
        providers=['CUDAExecutionProvider', 'CPUExecutionProvider'],
        sess_options=so
    )
    return ort_session

In [5]:
def l2_normalize(x):
    norms = np.linalg.norm(x, axis=1, keepdims=True)
    return x / np.clip(norms, 1e-10, None)

In [6]:
def get_embeddings(ort_session, dataloader, device='cuda', h5_path="embeddings.h5", max_classes=None, sample_per_class=None):
    if max_classes or sample_per_class:
        print("[INFO] Applying class/sample subsampling before embedding generation...")
        label_to_indices = defaultdict(list)
        for idx in range(len(dataloader.dataset)):
            try:
                _, lbl = dataloader.dataset[idx]
                if lbl != -1:
                    label_to_indices[lbl].append(idx)
            except Exception:
                continue

        all_labels = list(label_to_indices.keys())
        if max_classes and len(all_labels) > max_classes:
            sampled_labels = np.random.choice(all_labels, max_classes, replace=False)
        else:
            sampled_labels = all_labels

        subset_indices = []
        for lbl in sampled_labels:
            idxs = label_to_indices[lbl]
            if sample_per_class and len(idxs) > sample_per_class:
                idxs = np.random.choice(idxs, sample_per_class, replace=False).tolist()
            subset_indices.extend(idxs)

        # buat dataloader baru dengan subset
        subset = torch.utils.data.Subset(dataloader.dataset, subset_indices)
        dataloader = DataLoader(
            subset,
            batch_size=dataloader.batch_size,
            shuffle=False,
            num_workers=dataloader.num_workers,
            pin_memory=True
        )
        print(f"[INFO] Subsampled dataset: {len(sampled_labels)} classes, {len(subset_indices)} samples")

    labels = []
    total_samples = len(dataloader.dataset)

    first_batch = True
    offset = 0

    with h5py.File(h5_path, "w") as h5f:
        dset = None

        pbar = tqdm(dataloader, desc="Generating embeddings", unit="batch")
        with torch.inference_mode():
            for batch_idx, (images, batch_labels) in enumerate(pbar):
                try:
                    images = images.to(device, non_blocking=True)
                    resized = F.interpolate(images, size=(112, 112),
                                            mode='bilinear', align_corners=False)

                    ort_inputs = {ort_session.get_inputs()[0].name: resized.cpu().numpy()}
                    ort_outs = ort_session.run(None, ort_inputs)
                    emb = ort_outs[0].astype(np.float32)  # (B, D)

                    # normalisasi embedding langsung
                    emb = l2_normalize(emb)

                    if first_batch:
                        emb_dim = emb.shape[1]
                        dset = h5f.create_dataset(
                            "embeddings", (total_samples, emb_dim), dtype="float32"
                        )
                        first_batch = False

                    batch_size = emb.shape[0]
                    dset[offset:offset+batch_size] = emb
                    offset += batch_size

                    labels.append(batch_labels.numpy())

                    if batch_idx % 50 == 0:
                        torch.cuda.empty_cache()
                        gc.collect()

                except Exception as e:
                    print(f"Error processing batch {batch_idx}: {e}")
                    continue

    labels = np.concatenate(labels)
    valid_mask = labels != -1
    return h5_path, labels[valid_mask]

In [7]:
def compute_intra_inter(h5_path, labels):
    intra, inter = [], []

    with h5py.File(h5_path, "r") as h5f:
        embeddings = h5f["embeddings"]
        label_to_idx = defaultdict(list)
        for idx, lbl in enumerate(labels):
            label_to_idx[lbl].append(idx)

        all_labels = list(label_to_idx.keys())

        for lbl in all_labels:
            idxs = label_to_idx[lbl]
            if len(idxs) < 2:
                continue
            embs = np.array([embeddings[i] for i in idxs], dtype=np.float32)

            index = faiss.IndexFlatIP(embs.shape[1])
            index.add(embs)
            D, _ = index.search(embs, embs.shape[0])
            for i in range(embs.shape[0]):
                intra.extend(D[i, i+1:])

        all_lbls = list(label_to_idx.keys())
        for i in range(len(all_lbls)):
            lbl_i = all_lbls[i]
            embs_i = np.array([embeddings[k] for k in label_to_idx[lbl_i]], dtype=np.float32)
            for j in range(i+1, len(all_lbls)):
                lbl_j = all_lbls[j]
                embs_j = np.array([embeddings[k] for k in label_to_idx[lbl_j]], dtype=np.float32)

                index_j = faiss.IndexFlatIP(embs_j.shape[1])
                index_j.add(embs_j)
                D, _ = index_j.search(embs_i, embs_j.shape[0])
                inter.extend(D.flatten())

    return np.array(intra, dtype=np.float32), np.array(inter, dtype=np.float32)

In [8]:
def evaluate_roc_auc(intra, inter):
    y_true = np.concatenate([np.ones_like(intra), np.zeros_like(inter)])
    y_scores = np.concatenate([intra, inter])
    fpr, tpr, thresholds = roc_curve(y_true, y_scores)
    auc = roc_auc_score(y_true, y_scores)

    # EER
    fnr = 1 - tpr
    eer_threshold = thresholds[np.nanargmin(np.absolute(fnr - fpr))]
    eer = fpr[np.nanargmin(np.absolute(fnr - fpr))]

    # Auto threshold (Youden’s J statistic)
    j_scores = tpr - fpr
    best_idx = np.argmax(j_scores)
    best_threshold = thresholds[best_idx]

    return fpr, tpr, auc, eer, eer_threshold, best_threshold

In [9]:
def evaluate_identification(h5_path, labels, k=5, max_samples=10000):
    with h5py.File(h5_path, "r") as h5f:
        embeddings_ds = h5f["embeddings"]
        N = len(labels)
        
        if N > max_samples:
            idx_sample = np.random.choice(N, max_samples, replace=False)
            embs = np.array([embeddings_ds[i] for i in idx_sample], dtype=np.float32)
            lbls = np.array(labels)[idx_sample]
        else:
            embs = np.array(embeddings_ds, dtype=np.float32)
            lbls = np.array(labels)
        
        dim = embs.shape[1]
        index = faiss.IndexFlatIP(dim)
        index.add(embs)

        D, I = index.search(embs, k+1)
        top1, topk = 0, 0
        for i in range(embs.shape[0]):
            neighbors = I[i][1:]
            neighbor_labels = lbls[neighbors]
            if neighbor_labels[0] == lbls[i]:
                top1 += 1
            if lbls[i] in neighbor_labels:
                topk += 1

        return top1 / embs.shape[0], topk / embs.shape[0]

In [10]:
def evaluate_verification(intra, inter, thresholds=[0.3,0.4,0.5,0.6,0.7]):
    results = []
    for threshold in thresholds:
        tp = sum(1 for sim in intra if sim >= threshold)
        tn = sum(1 for sim in inter if sim < threshold)
        fp = sum(1 for sim in inter if sim >= threshold)
        fn = sum(1 for sim in intra if sim < threshold)
        
        accuracy = (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        results.append({
            'threshold': threshold,
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1
        })
    return results

In [11]:
def plot_similarity_distribution(intra, inter, title="Similarity Distribution"):
    plt.figure(figsize=(8,6))
    sns.kdeplot(intra, label="Intra-class", fill=True, alpha=0.5)
    sns.kdeplot(inter, label="Inter-class", fill=True, alpha=0.5)
    plt.xlabel("Cosine Similarity")
    plt.ylabel("Density")
    plt.title(title)
    plt.legend()
    return plt.gcf()

In [12]:
def plot_roc(fpr, tpr, auc, title="ROC Curve"):
    plt.figure(figsize=(8,6))
    plt.plot(fpr, tpr, label=f"AUC = {auc:.4f}")
    plt.plot([0,1], [0,1], 'k--')
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(title)
    plt.legend(loc="lower right")
    return plt.gcf()

In [13]:
def plot_embedding_cluster(h5_path, max_samples=5000, n_neighbors=15, min_dist=0.1, n_components=2, n_clusters=10):
    with h5py.File(h5_path, "r") as h5f:
        embeddings_ds = h5f["embeddings"]
        N = len(embeddings_ds)
        if N > max_samples:
            idx_sample = np.random.choice(N, max_samples, replace=False)
            embs = np.array([embeddings_ds[i] for i in idx_sample], dtype=np.float32)
        else:
            embs = np.array(embeddings_ds, dtype=np.float32)

        # UMAP untuk reduksi dimensi
        reducer = UMAP(n_neighbors=n_neighbors, min_dist=min_dist, 
                       n_components=n_components, random_state=42)
        embedding_2d = reducer.fit_transform(embs)

        # Clustering KMeans
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        cluster_ids = kmeans.fit_predict(embs)

        # Plot hasil cluster
        plt.figure(figsize=(10,8))
        scatter = plt.scatter(embedding_2d[:,0], embedding_2d[:,1], 
                              c=cluster_ids, cmap="tab20", s=5, alpha=0.7)
        plt.colorbar(scatter, label="Cluster IDs")
        plt.title(f"UMAP Projection with {n_clusters} Clusters (KMeans)")
        return plt.gcf()

In [14]:
model_path = './student_resnet18.onnx'
rec_path = './ms1m-retinaface-t1-compact/train.rec'
idx_path = './ms1m-retinaface-t1-compact/train.idx'

ort_session = load_onnx_model(model_path)
dataset = MXFaceDataset(rec_path=rec_path, idx_path=idx_path, transform=test_transform)
loader = DataLoader(dataset, batch_size=64, shuffle=False, num_workers=4, pin_memory=True)

In [15]:
print("Active provider:", ort_session.get_providers())
print(ort.get_device()) 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

Active provider: ['CUDAExecutionProvider', 'CPUExecutionProvider']
GPU
cuda


In [None]:
all_labels = [dataset[i][1] for i in range(len(dataset))]
num_classes = len(set(all_labels))
print(f"Total classes: {num_classes}")

In [None]:
mlflow.set_experiment("Face Recognition Embedding Analysis")

with mlflow.start_run(run_name="Student - ResNet18 Backbone") as run:
    print(f"MLflow Run ID: {run.info.run_id}")

    embeddings_path, labels = get_embeddings(ort_session, loader, max_classes=500, sample_per_class=30)
    print(f"Embeddings saved to HDF5")

    mlflow.log_metric("total_samples", len(labels))
    mlflow.log_metric("embedding_dimension", 512)

    intra, inter = compute_intra_inter(embeddings_path, labels)
    mlflow.log_metric("intra_mean", float(np.mean(intra)))
    mlflow.log_metric("inter_mean", float(np.mean(inter)))
    mlflow.log_metric("separation_margin", float(np.mean(intra) - np.mean(inter)))

    top1, top5 = evaluate_identification(embeddings_path, labels, k=5)
    mlflow.log_metric("top1_accuracy", top1)
    mlflow.log_metric("top5_accuracy", top5)

    verification_results = evaluate_verification(intra, inter)
    for r in verification_results:
        mlflow.log_metric(f"acc_t{r['threshold']}", r['accuracy'])
        mlflow.log_metric(f"prec_t{r['threshold']}", r['precision'])
        mlflow.log_metric(f"rec_t{r['threshold']}", r['recall'])
        mlflow.log_metric(f"f1_t{r['threshold']}", r['f1'])

    fpr, tpr, auc, eer, eer_thr, best_thr = evaluate_roc_auc(intra, inter)
    mlflow.log_metric("roc_auc", auc)
    mlflow.log_metric("eer", eer)
    mlflow.log_metric("eer_threshold", eer_thr)
    mlflow.log_metric("best_threshold", best_thr)

    fig_roc = plot_roc(fpr, tpr, auc)
    fig_roc.savefig("roc_curve.png", dpi=300, bbox_inches='tight')
    mlflow.log_artifact("roc_curve.png")
    plt.close(fig_roc)

    fig_sim = plot_similarity_distribution(intra, inter)
    fig_sim.savefig("similarity_distribution.png", dpi=300, bbox_inches='tight')
    mlflow.log_artifact("similarity_distribution.png")
    plt.close(fig_sim)

    fig_umap = plot_embedding_cluster(embeddings_path, labels)
    fig_umap.savefig("umap_projection.png", dpi=300, bbox_inches='tight')
    mlflow.log_artifact("umap_projection.png")
    plt.close(fig_umap)

    mlflow.log_artifact(model_path, "model")
    print(f"✅ Full evaluation completed. AUC={auc:.4f}, EER={eer:.4f}, results saved to MLflow!")

MLflow Run ID: 67c48595c4ea44d4aca1709f6066999c
[INFO] Applying class/sample subsampling before embedding generation...
[INFO] Subsampled dataset: 500 classes, 13754 samples


Generating embeddings: 100%|██████████| 215/215 [00:07<00:00, 28.31batch/s]


Embeddings saved to HDF5
