In [19]:
import random
import json
import numpy as np
import torch
import torch.nn.functional as F
import cv2
import numpy as np
import yaml
import sys
import os
from matplotlib import pyplot as plt

## Loading the data for model testing

In [5]:
with open('eval_1_analysis.json', 'r') as f:
        data = json.load(f)

In [13]:
data["78061"]

{'age': 29, 'gender': 'Male', 'ethnicity': 'white'}

In [14]:
def pairs(data):

    target_ethnicities = ['indian', 'white', 'black', 'middle eastern', 'asian']
    pairs_per_ethnicity = 150
    identities_root = '/mnt/object/dataset/datasets/post_training_opt'

    # Initialize separate lists for different categories
    pairs = []
    indian_pairs = []
    white_pairs = []
    black_pairs = []
    middle_eastern_pairs = []
    asian_pairs = []
    male_pairs = []
    female_pairs = []

    # Prepare: group eligible identities by ethnicity
    eligible = {eth: [] for eth in target_ethnicities}
    for identity, info in data.items():
        eth = info['ethnicity'].lower()
        if eth in eligible:    
            eligible[eth].append((identity, info))

    for eth in target_ethnicities:
        random.shuffle(eligible[eth])
        count = 0
        for identity, info in eligible[eth]:
            folder = os.path.join(identities_root, identity)
            images = os.listdir(folder)
            if len(images) < 3:  # Need at least 3 images (1 anchor, 2 positive)
                continue
            
            # Select anchor and two positive images
            anchor_img = random.choice(images)
            remaining_images = [img for img in images if img != anchor_img]
            positive_imgs = random.sample(remaining_images, 2)
            
            # Find two negatives
            candidates = [
                (neg_id, neg_info) for neg_id, neg_info in eligible[eth]
                if neg_id != identity and neg_info['gender'] == info['gender']
            ]
            if len(candidates) < 2:
                continue
                
            negative_identities = random.sample(candidates, 2)
            current_pairs = []
            
            for i in range(2):
                anchor_path = os.path.join(folder, anchor_img)
                positive_path = os.path.join(folder, positive_imgs[i])
                
                neg_identity, _ = negative_identities[i]
                neg_folder = os.path.join(identities_root, neg_identity)
                neg_images = os.listdir(neg_folder)
                if not neg_images:
                    continue
                negative_img = random.choice(neg_images)
                negative_path = os.path.join(neg_folder, negative_img)
                
                pair = (anchor_path, positive_path, negative_path, identity, neg_identity)
                current_pairs.append(pair)
            
            if len(current_pairs) == 2:
                pairs.extend(current_pairs)
                # Add to specific ethnicity lists
                if eth == 'indian':
                    indian_pairs.extend(current_pairs)
                elif eth == 'white':
                    white_pairs.extend(current_pairs)
                elif eth == 'black':
                    black_pairs.extend(current_pairs)
                elif eth == 'middle eastern':
                    middle_eastern_pairs.extend(current_pairs)
                elif eth == 'asian':
                    asian_pairs.extend(current_pairs)
                    
                # Add to gender-specific lists
                if info['gender'] == 'Male':
                    male_pairs.extend(current_pairs)
                else:
                    female_pairs.extend(current_pairs)
                    
                count += 1
                if count >= pairs_per_ethnicity:
                    break

    return {"all_pairs": pairs, "indian_pairs": indian_pairs, "white_pairs": white_pairs,
            "black_pairs": black_pairs, "middle_eastern_pairs": middle_eastern_pairs,
            "asian_pairs": asian_pairs, "male_pairs": male_pairs, "female_pairs": female_pairs}

In [15]:
all_data = pairs(data)

In [17]:
len(all_data["all_pairs"])

1314

In [20]:
first_100 = all_data["all_pairs"][:100]

### Seeing if model works

In [24]:
from backbones.iresnet import iresnet100  # or your custom path if you cloned insightface locally

# Load YAML
# with open('ms1mv3_arcface_r100_fp16/model.yaml', 'r') as f:
#     config = yaml.safe_load(f)

# Create model
# Try loading a state_dict first, otherwise assume the file *is* the model
MODEL_PATH = "backbone.pth"
checkpoint = torch.load(MODEL_PATH, map_location="cpu")
if isinstance(checkpoint, dict):
    model = iresnet100(pretrained=False)
    model.load_state_dict(checkpoint)
else:
    model = checkpoint

model.eval()

import cv2
import numpy as np
import torch

def preprocess(img_path):
    img = cv2.imread(img_path)
    if img is None:
        raise ValueError(f"Image not found at {img_path}")
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (112, 112))
    img = np.transpose(img, (2, 0, 1))  # Channels first: (3, 112, 112)
    img = img.astype(np.float32)
    img = (img / 255.0 - 0.5) / 0.5  # Normalize
    img = torch.from_numpy(img)      # <-- CONVERT numpy -> torch.Tensor
    img = img.unsqueeze(0)           # Add batch dimension: (1, 3, 112, 112)
    return img, img_rgb


# Step 4: Inference function
def get_embedding(img_path):
    img_tensor, rgb = preprocess(img_path)
    with torch.no_grad():
        emb = model(img_tensor)
        emb = F.normalize(emb, p=2, dim=1)
    return emb, rgb

# Step 5: Compare two embeddings
def compare_embeddings(emb1, emb2, threshold=0.5):
    similarity = F.cosine_similarity(emb1, emb2).item()
    print(f"Cosine Similarity: {similarity:.4f}")
    if similarity > threshold:
        print("Result: SAME PERSON ✅")
    else:
        print("Result: DIFFERENT PERSON ❌")
    return similarity


def plot_images(img1, img2, similarity):
    fig, axs = plt.subplots(1, 2, figsize=(8, 4))
    axs[0].imshow(img1)
    axs[0].axis('off')
    axs[0].set_title('Image 1')

    axs[1].imshow(img2)
    axs[1].axis('off')
    axs[1].set_title('Image 2')

    plt.suptitle(f'Cosine Similarity: {similarity:.4f}', fontsize=16)
    plt.tight_layout()
    plt.show()

In [None]:
five_pairs = random.sample(first_100, 1)

num_samples = 0
for pair in first_100:

    embedding_anchor, rgb1 = get_embedding(pair[0])
    embedding_pos, rgb2 = get_embedding(pair[1])
    embedding_neg, rgb3 = get_embedding(pair[2])

    print("Positive pair:")
    plot_images(rgb1, rgb2, compare_embeddings(embedding_anchor, embedding_pos))
    print("Negative pair:")
    plot_images(rgb1, rgb3, compare_embeddings(embedding_anchor, embedding_neg))

In [61]:
model_size = os.path.getsize('backbone.pth') 

In [90]:
first_100 = all_data["all_pairs"][:100]
first_100 = first_100[:50]

Each data entry has 2 samples one [pstive pair and one negative pair

In [28]:
import time

In [45]:
import cv2
import numpy as np
import torch
import torch.nn.functional as F

def preprocess_batch(img_paths, device="cpu"):
    """
    Load & normalize a list of image files into a tensor
      shape (N, 3, 112, 112) on `device`.
    """
    tensors = []
    for p in img_paths:
        img = cv2.imread(p)
        if img is None:
            raise ValueError(f"Image not found: {p}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (112, 112))
        img = img.astype(np.float32)
        img = (img / 255.0 - 0.5) / 0.5
        t = torch.from_numpy(img).permute(2, 0, 1)  # HWC → CHW
        tensors.append(t)
    batch = torch.stack(tensors, dim=0).to(device)  # (N,3,112,112)
    return batch

def embed_pair(anchor_path, other_path, model, device="cpu"):
    """
    Batch the anchor & other image (2 images), run through model,
    return (anchor_emb, other_emb) as two (D,) tensors.
    """
    batch = preprocess_batch([anchor_path, other_path], device)  # (2,3,112,112)
    model.to(device).eval()
    with torch.no_grad():
        embs = model(batch)                   # (2, D)
        embs = F.normalize(embs, p=2, dim=1)  # L2-norm
    return embs[0], embs[1]

def batch_process_triplets(triplets, model, batch_size=32, device="cpu"):
    """
    Given a list of (anchor, positive, negative) triplets,
    returns three tensors of shape (N, D): (emb_anchors, emb_positives, emb_negatives).
    Uses one forward per group (anchors, positives, negatives).
    """
    model.to(device).eval()

    # split file lists
    anchors  = [a for a, p, n in triplets]
    positives= [p for a, p, n in triplets]
    negatives= [n for a, p, n in triplets]

    def embed_list(paths):
        """Helper: embed in sub-batches and concat."""
        embs = []
        for i in range(0, len(paths), batch_size):
            batch = preprocess_batch(paths[i : i + batch_size], device)
            with torch.no_grad():
                e = model(batch)
                e = F.normalize(e, p=2, dim=1)
            embs.append(e)
        return torch.cat(embs, dim=0)

    emb_a = embed_list(anchors)
    emb_p = embed_list(positives)
    emb_n = embed_list(negatives)

    return emb_a, emb_p, emb_n


### Single sample performance

In [84]:
num_samples = 0
correct = 0
latencies = []
for pair in first_100:
    start = time.time()
    anchor_emb, pos_emb = embed_pair(pair[0], pair[1], model)
    
    sim_1 = F.cosine_similarity(anchor_emb.unsqueeze(0), pos_emb.unsqueeze(0), dim = 1).item()
    if sim_1 >= 0.5:
        correct += 1
    num_samples +=1
    end = time.time()
    latencies.append(end-start)

    start = time.time()
    _, neg_emb = embed_pair(pair[0], pair[2], model)
    
    sim_2 = F.cosine_similarity(anchor_emb.unsqueeze(0), neg_emb.unsqueeze(0), dim = 1).item()
    if sim_2 < 0.5:
        correct += 1
    num_samples +=1

    end = time.time()
    latencies.append(end-start)
acc = correct/num_samples

In [85]:
print(f"Model Size on Disk: {model_size/ (1e6) :.2f} MB")
print(f"Accuracy: {acc*100:.2f}% ({correct}/{num_samples} correct)")
print(f"Inference Latency (single sample, median): {np.percentile(latencies, 50) * 1000:.2f} ms")
print(f"Inference Latency (single sample, 95th percentile): {np.percentile(latencies, 95) * 1000:.2f} ms")
print(f"Inference Latency (single sample, 99th percentile): {np.percentile(latencies, 99) * 1000:.2f} ms")
print(f"Inference Throughput (single sample): {num_samples/np.sum(latencies):.2f} FPS")

Model Size on Disk: 261.22 MB
Accuracy: 94.50% (189/200 correct)
Inference Latency (single sample, median): 808.84 ms
Inference Latency (single sample, 95th percentile): 1148.49 ms
Inference Latency (single sample, 99th percentile): 1165.14 ms
Inference Throughput (single sample): 1.13 FPS


### Batch throughput and latency

In [86]:
#Creating batches

def chunk_triplets(triplets, batch_size=10):
    """
    Yield successive batches of `batch_size` triplets from the list.
    """
    for i in range(0, len(triplets), batch_size):
        yield triplets[i : i + batch_size]

# Example: turn first_100 into a list of 10‐triplet batches
batches = list(chunk_triplets(first_100, batch_size=10))
print(f"Got {len(batches)} batches; first batch has {len(batches[0])} triplets")

Got 10 batches; first batch has 10 triplets


In [87]:
import time
import torch.nn.functional as F

def evaluate_triplets_batch(triplets, model, threshold=0.5, batch_size=32, device="cpu"):
    """
    triplets: list of (anchor_path, positive_path, negative_path)
    model:    your loaded face‐embedding model
    threshold: similarity threshold
    batch_size: sub‐batch size for GPU/CPU
    device:   "cpu" or "cuda"
    """
    # 1) Warm up & start timer
    model.to(device).eval()
    start = time.time()

    # 2) Embed all anchors, positives, negatives in big batches
    emb_a, emb_p, emb_n = batch_process_triplets(
        triplets, model, batch_size=batch_size, device=device
    )  # each is (N, D)

    # 3) Cosine‐similarities
    sims_pos = F.cosine_similarity(emb_a, emb_p, dim=1)  # (N,)
    sims_neg = F.cosine_similarity(emb_a, emb_n, dim=1)  # (N,)

    # 4) Compute metrics
    N = len(triplets)
    num_samples = 2 * N
    correct = int((sims_pos >= threshold).sum().item() +
                  (sims_neg <  threshold).sum().item())

    end = time.time()
    time_taken = end - start
    throughput = num_samples / time_taken
    accuracy   = correct / num_samples
    latency    = time_taken / num_samples

    # 5) Print exactly like your original loop
    print("Time taken is ", time_taken)
    print("Accuracy is ", accuracy)
    print("Throughput is ", throughput)
    print("Single sample latency is on avg: ", latency)

    return accuracy, throughput, latency

# ───────────────────────────────────────────────────────────────────────────────
# Example usage with your first_100[:10]:
# acc, tput, lat = evaluate_triplets_batch(first_100[:10], model, threshold=0.5, batch_size=16, device="cpu")


In [88]:
accuracy = 0
latencies = []
num_batches = 0

for idx, batch in enumerate(batches, start=1):
    print(f"→ Evaluating batch #{idx}")
    triplets = [ t[:3] for t in batch ]
    acc, tput, lat = evaluate_triplets_batch(
        triplets,
        model,
        threshold=0.5,
        batch_size=10,   # you could pick a larger sub‐batch for GPU
        device="cpu"
    )
    accuracy += acc
    latencies.append(lat)
    throughput += tput
    num_batches += 1


    print(f"  Batch #{idx} → acc={acc:.3f}, throughput={tput:.1f} img/s, latency={lat*1000:.1f} ms/img")

→ Evaluating batch #1
Time taken is  11.812459230422974
Accuracy is  0.85
Throughput is  1.6931275367698222
Single sample latency is on avg:  0.5906229615211487
  Batch #1 → acc=0.850, throughput=1.7 img/s, latency=590.6 ms/img
→ Evaluating batch #2
Time taken is  11.662468671798706
Accuracy is  1.0
Throughput is  1.714902784550451
Single sample latency is on avg:  0.5831234335899353
  Batch #2 → acc=1.000, throughput=1.7 img/s, latency=583.1 ms/img
→ Evaluating batch #3
Time taken is  11.985369443893433
Accuracy is  1.0
Throughput is  1.6687011688396503
Single sample latency is on avg:  0.5992684721946716
  Batch #3 → acc=1.000, throughput=1.7 img/s, latency=599.3 ms/img
→ Evaluating batch #4
Time taken is  11.745134353637695
Accuracy is  0.95
Throughput is  1.7028327984860907
Single sample latency is on avg:  0.5872567176818848
  Batch #4 → acc=0.950, throughput=1.7 img/s, latency=587.3 ms/img
→ Evaluating batch #5
Time taken is  12.220762252807617
Accuracy is  0.95
Throughput is  1.

In [89]:
accuracy = accuracy/num_batches
throughput = throughput/num_batches


print(f"Model Size on Disk: {model_size/ (1e6) :.2f} MB")
print(f"Accuracy: {accuracy:.2f}%")
print(f"Inference Latency (single sample, median): {np.percentile(latencies, 50) * 1000:.2f} ms")
print(f"Inference Latency (single sample, 95th percentile): {np.percentile(latencies, 95) * 1000:.2f} ms")
print(f"Inference Latency (single sample, 99th percentile): {np.percentile(latencies, 99) * 1000:.2f} ms")
print(f"Batch Throughput: {throughput:.2f} FPS")

Model Size on Disk: 261.22 MB
Accuracy: 0.95%
Inference Latency (single sample, median): 588.94 ms
Inference Latency (single sample, 95th percentile): 605.74 ms
Inference Latency (single sample, 99th percentile): 609.98 ms
Batch Throughput: 1.87 FPS


## ONNX Impelmentation

In [None]:
import os
import time
import cv2
import numpy as np
import onnxruntime as ort

ONNX_PATH = "backbone.onnx"

MODEL_PATH = "backbone.pth"
checkpoint = torch.load(MODEL_PATH, map_location="cpu")
if isinstance(checkpoint, dict):
    model = iresnet100(pretrained=False)
    model.load_state_dict(checkpoint)
else:
    model = checkpoint

model.eval()

dummy = torch.randn(1, 3, 112, 112, dtype=torch.float32)

torch.onnx.export(
    model,
    dummy,
    ONNX_PATH,
    export_params=True,            # store the trained parameter weights inside the model file
    opset_version=17,              # ONNX opset version
    input_names=["input"],         # model's input names
    output_names=["output"],       # model's output names
    dynamic_axes={                 # allow variable batch size
        "input":  {0: "batch"},
        "output": {0: "batch"},
    }
)

print(f"✅ ONNX model saved to {ONNX_PATH}")

# ───────────────────────────────────────────────────────────────────────────────
# 1) Load your ONNX model
# ───────────────────────────────────────────────────────────────────────────────
ONNX_PATH = "model_serve/fastapi_pt/iresnet100.onnx"
sess = ort.InferenceSession(ONNX_PATH, providers=["CPUExecutionProvider"])

# ───────────────────────────────────────────────────────────────────────────────
# 2) Preprocessing → NumPy batches
# ───────────────────────────────────────────────────────────────────────────────
def preprocess_batch(img_paths):
    """
    Read & normalize a list of image files to a NumPy array
    of shape (N, 3, 112, 112), dtype float32.
    """
    batches = []
    for p in img_paths:
        img = cv2.imread(p)
        if img is None:
            raise ValueError(f"Image not found: {p}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (112, 112))
        img = img.astype(np.float32)
        img = (img / 255.0 - 0.5) / 0.5
        # HWC → CHW
        t = np.transpose(img, (2, 0, 1))
        batches.append(t)
    return np.stack(batches, axis=0)  # (N,3,112,112)

# ───────────────────────────────────────────────────────────────────────────────
# 3) Single‐pair embed via ONNX
# ───────────────────────────────────────────────────────────────────────────────
def embed_pair_onnx(anchor_path, other_path):
    """
    Batch the anchor & other image → call ONNX → return two (D,) arrays.
    """
    batch_np = preprocess_batch([anchor_path, other_path])  # (2,3,112,112)
    out = sess.run(None, {"input": batch_np})[0]            # (2, D)
    # L2‐normalize each row
    norms = np.linalg.norm(out, axis=1, keepdims=True)
    embs = out / (norms + 1e-8)
    return embs[0], embs[1]

# ───────────────────────────────────────────────────────────────────────────────
# 4) Batch‐triplets embed via ONNX
# ───────────────────────────────────────────────────────────────────────────────
def batch_process_triplets_onnx(triplets, batch_size=32):
    """
    Given list of (anchor, pos, neg) paths, returns three arrays
    of shape (N, D): (emb_a, emb_p, emb_n).
    """
    anchors   = [a for a, p, n in triplets]
    positives = [p for a, p, n in triplets]
    negatives = [n for a, p, n in triplets]

    def embed_list(paths):
        parts = []
        for i in range(0, len(paths), batch_size):
            sub = paths[i : i + batch_size]
            batch_np = preprocess_batch(sub)            # (B,3,112,112)
            out = sess.run(None, {"input": batch_np})[0]  # (B,D)
            norms = np.linalg.norm(out, axis=1, keepdims=True)
            parts.append(out / (norms + 1e-8))
        return np.vstack(parts)

    emb_a = embed_list(anchors)
    emb_p = embed_list(positives)
    emb_n = embed_list(negatives)
    return emb_a, emb_p, emb_n

# ───────────────────────────────────────────────────────────────────────────────
# 5) Chunk your triplets into batches of n
# ───────────────────────────────────────────────────────────────────────────────
def chunk_triplets(triplets, batch_size=10):
    for i in range(0, len(triplets), batch_size):
        yield triplets[i : i + batch_size]

# ───────────────────────────────────────────────────────────────────────────────
# 6) Evaluate one batch of triplets
# ───────────────────────────────────────────────────────────────────────────────
def evaluate_triplets_batch_onnx(triplets, threshold=0.5, batch_size=32):
    """
    triplets: list of (anchor, pos, neg)
    Returns (accuracy, throughput_FPS, avg_latency_s) for that batch.
    """
    N = len(triplets)
    if N == 0:
        return 0.0, 0.0, 0.0

    t0 = time.time()
    emb_a, emb_p, emb_n = batch_process_triplets_onnx(triplets, batch_size)
    sims_pos = np.sum(emb_a * emb_p, axis=1)  # cosine = dot since L2-normed
    sims_neg = np.sum(emb_a * emb_n, axis=1)

    correct = np.count_nonzero(sims_pos >= threshold) + \
              np.count_nonzero(sims_neg < threshold)
    total   = 2 * N

    t1 = time.time()
    elapsed = t1 - t0
    throughput = total / elapsed
    avg_latency = elapsed / total
    accuracy    = correct / total

    print(f"Batch of {N} triplets → time: {elapsed:.3f}s, "
          f"throughput: {throughput:.1f} img/s, "
          f"latency: {avg_latency*1000:.1f}ms/img, "
          f"acc: {accuracy*100:.2f}% ({correct}/{total})")
    return accuracy, throughput, avg_latency

# ───────────────────────────────────────────────────────────────────────────────
# 7) Main loop: chunk into 10s and evaluate
# ───────────────────────────────────────────────────────────────────────────────
# Suppose first_100 is already defined: list of (a,p,n, …) tuples
triplets_all = [t[:3] for t in first_100]  # discard extra fields

model_size = os.path.getsize(ONNX_PATH)
print(f"Model Size on Disk: {model_size/1e6:.2f} MB")

all_acc = []
all_throughput = []
all_latencies = []

for idx, batch in enumerate(chunk_triplets(triplets_all, batch_size=10), 1):
    print(f"\n→ Evaluating batch #{idx}")
    acc, tput, lat = evaluate_triplets_batch_onnx(batch, threshold=0.5, batch_size=10)
    all_acc.append(acc)
    all_throughput.append(tput)
    all_latencies.append(lat)

# Aggregate over all batches
mean_acc = np.mean(all_acc) * 100
median_lat = np.median(all_latencies) * 1000
p95_lat   = np.percentile(all_latencies, 95) * 1000
p99_lat   = np.percentile(all_latencies, 99) * 1000
mean_tput = np.mean(all_throughput)

print("\n=== Summary ===")
print(f"Model Size on Disk: {model_size/1e6:.2f} MB")
print(f"Accuracy: {mean_acc:.2f}%")
print(f"Inference Latency (single sample, median): {median_lat:.2f} ms")
print(f"Inference Latency (single sample, 95th percentile): {p95_lat:.2f} ms")
print(f"Inference Latency (single sample, 99th percentile): {p99_lat:.2f} ms")
print(f"Inference Throughput (single sample): {mean_tput:.2f} FPS")
