In [1]:
#om gan ganapathaye namah om namah shivaya

In [3]:
import os
import numpy as np
import pandas as pd
import cv2
from tqdm import tqdm # Library for progress bars (must be installed: pip install tqdm)

# ========= CONFIG (50k Limits Confirmed) =========
INDEX_CSV    = r"D:\5th sem\mini_project\dataset\socofing_index_features.csv"
PAIRS_VAL    = r"D:\5th sem\mini_project\dataset\pairs\pairs_val.csv"
PAIRS_TEST   = r"D:\5th sem\mini_project\dataset\pairs\pairs_test.csv"

# From previous SNN evaluation step
SNN_NPZ      = r"D:\5th sem\mini_project\models\snn_eval_distances.npz"

# VITAL: Limits set to 50k for fast CPU run
MAX_VAL_PAIRS  = 50000 
MAX_TEST_PAIRS = 50000 

RATIO_THRESH = 0.75
RANDOM_SEED  = 42


# ========= STEP 1: BUILD agg_path -> des_path MAP =========
def build_agg_to_des_map(index_csv):
    df_idx = pd.read_csv(index_csv)
    assert "agg_path" in df_idx.columns and "des_path" in df_idx.columns, \
        "Index CSV must have agg_path and des_path columns"
    mapping = dict(zip(df_idx["agg_path"], df_idx["des_path"]))
    print(f"Index map built for {len(mapping)} aggregated feature files.")
    return mapping


# ========= STEP 2: ELASTIC SCORE (SIFT + BF + RATIO TEST) =========
def compute_elastic_score(des1, des2, ratio_thresh=0.75):
    """
    des1, des2: numpy arrays [N1, 128], [N2, 128] (float32)
    Returns normalized elastic score in [0,1].
    """
    if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
        return 0.0

    des1 = des1.astype(np.float32)
    des2 = des2.astype(np.float32)

    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < ratio_thresh * n.distance:
            good.append(m)

    good_count = len(good)
    denom = max(len(des1), len(des2))
    if denom == 0:
        return 0.0

    return good_count / float(denom)


# ========= STEP 3: COMPUTE ELASTIC SCORES FOR A PAIR CSV (FIXED) =========
def compute_elastic_for_pairs(pairs_csv, agg_to_des_map, max_pairs=None):
    df_pairs = pd.read_csv(pairs_csv)
    n_total = len(df_pairs)
    csv_name = os.path.basename(pairs_csv)

    if max_pairs is not None and max_pairs < n_total:
        # FIX: Sample the DataFrame, but DO NOT call .reset_index(drop=True).
        # This ensures the sampled DataFrame retains the ORIGINAL row indices.
        df_pairs = df_pairs.sample(n=max_pairs, random_state=RANDOM_SEED)
        print(f"{csv_name}: sampled {len(df_pairs)}/{n_total} pairs for elastic evaluation.")
    else:
        print(f"{csv_name}: using all {n_total} pairs for elastic evaluation.")

    # idxs_out now correctly holds the ORIGINAL row indices from the full CSV, 
    # which is necessary for indexing into the SNN NPZ file.
    idxs_out = df_pairs.index.to_numpy(dtype=np.int64) 
    
    labels = df_pairs["label"].to_numpy(dtype=np.float32)

    des_cache = {}
    scores = []
    
    # Initialize the progress bar using tqdm
    tqdm_iter = tqdm(df_pairs.iterrows(), 
                     total=len(df_pairs), 
                     desc=f"Processing {csv_name}",
                     unit=" pairs")

    for idx, row in tqdm_iter:
        agg1 = row["agg_path_1"]
        agg2 = row["agg_path_2"]

        des_path_1 = agg_to_des_map.get(agg1, None)
        des_path_2 = agg_to_des_map.get(agg2, None)

        if des_path_1 is None or des_path_2 is None:
            scores.append(0.0)
            continue

        # Load descriptors with caching
        if des_path_1 not in des_cache:
            if os.path.exists(des_path_1):
                des_cache[des_path_1] = np.load(des_path_1)
            else:
                des_cache[des_path_1] = None

        if des_path_2 not in des_cache:
            if os.path.exists(des_path_2):
                des_cache[des_path_2] = np.load(des_path_2)
            else:
                des_cache[des_path_2] = None

        des1 = des_cache[des_path_1]
        des2 = des_cache[des_path_2]

        score = compute_elastic_score(des1, des2, ratio_thresh=RATIO_THRESH)
        scores.append(score)

    scores = np.array(scores, dtype=np.float32)
    print(f"\nFinished {csv_name} elastic scoring. Pairs: {len(scores)}; labels: {len(labels)}")
    
    # Return the correct original indices for fusion
    return scores, labels, idxs_out


# ========= STEP 4: CALIBRATE THRESHOLD (ELASTIC ONLY) (Unchanged) =========
def calibrate_threshold(scores, labels, mode="elastic"):
    s_min, s_max = float(scores.min()), float(scores.max())
    print(f"{mode} score range: [{s_min:.4f}, {s_max:.4f}]")

    best_acc = 0.0
    best_thr = None

    thresholds = np.linspace(s_min, s_max, num=200)

    for thr in thresholds:
        preds = (scores >= thr).astype(np.float32)
        correct = (preds == labels).sum()
        acc = correct / len(labels)

        if acc > best_acc:
            best_acc = acc
            best_thr = thr

    pos_mask = (labels == 1)
    neg_mask = (labels == 0)

    tpr = ((scores[pos_mask] >= best_thr).sum() / pos_mask.sum())
    fpr = ((scores[neg_mask] >= best_thr).sum() / neg_mask.sum())

    print(f"{mode.upper()} best_thr={best_thr:.4f}, acc={best_acc:.4f}, "
          f"TPR={tpr:.4f}, FPR={fpr:.4f}")
    return best_thr, best_acc, tpr, fpr


# ========= STEP 5: MAIN PIPELINE (Unchanged except for the necessary fixes) =========
def main():
    # --- mapping agg_path -> des_path ---
    agg_to_des = build_agg_to_des_map(INDEX_CSV)

    # --- elastic scores on VAL and TEST ---
    print("\n=== Elastic matching on VAL set ===")
    elastic_val, labels_val, idxs_val = compute_elastic_for_pairs(
        PAIRS_VAL, agg_to_des, max_pairs=MAX_VAL_PAIRS
    )

    print("\n=== Elastic matching on TEST set ===")
    elastic_test, labels_test, idxs_test = compute_elastic_for_pairs(
        PAIRS_TEST, agg_to_des, max_pairs=MAX_TEST_PAIRS
    )

    # --- calibrate elastic-only threshold on VAL ---
    print("\n=== Calibrating ELASTIC-ONLY threshold on VAL ===")
    best_thr_el, best_acc_el, tpr_el, fpr_el = calibrate_threshold(elastic_val, labels_val, mode="elastic_val")

    print("\n=== Evaluating ELASTIC-ONLY on TEST at VAL threshold ===")
    preds_test_el = (elastic_test >= best_thr_el).astype(np.float32)
    correct_test_el = (preds_test_el == labels_test).sum()
    acc_test_el = correct_test_el / len(labels_test)

    pos_mask_t = (labels_test == 1)
    neg_mask_t = (labels_test == 0)

    tpr_t_el = ((elastic_test[pos_mask_t] >= best_thr_el).sum() / pos_mask_t.sum())
    fpr_t_el = ((elastic_test[neg_mask_t] >= best_thr_el).sum() / neg_mask_t.sum())

    print(f"ELASTIC TEST @thr={best_thr_el:.4f} -> "
          f"acc={acc_test_el:.4f}, TPR={tpr_t_el:.4f}, FPR={fpr_t_el:.4f}")

    # --- Load SNN distances for fusion ---
    print("\n=== Loading SNN distances for fusion ===")
    snn_data = np.load(SNN_NPZ)
    val_dist_all   = snn_data["val_dist"]
    val_labels_all = snn_data["val_labels"]
    test_dist_all  = snn_data["test_dist"]
    test_labels_all= snn_data["test_labels"]

    # SANITY CHECK: This assertion should now PASS because idxs_val/idxs_test 
    # contain the correct original row indices.
    assert np.allclose(labels_val,    val_labels_all[idxs_val]),  "VAL labels mismatch between CSV and SNN npz"
    assert np.allclose(labels_test,  test_labels_all[idxs_test]),"TEST labels mismatch between CSV and SNN npz"

    # convert SNN distances to similarity in [0,1]
    snn_sim_val  = 1.0 / (1.0 + val_dist_all[idxs_val])
    snn_sim_test = 1.0 / (1.0 + test_dist_all[idxs_test])

    # simple fusion: average of SNN similarity and elastic score
    fusion_val  = 0.5 * snn_sim_val  + 0.5 * elastic_val
    fusion_test = 0.5 * snn_sim_test + 0.5 * elastic_test

    print("\n=== Calibrating FUSION threshold on VAL ===")
    best_thr_fus, best_acc_fus, tpr_fus, fpr_fus = calibrate_threshold(fusion_val, labels_val, mode="fusion_val")

    print("\n=== Evaluating FUSION on TEST at VAL threshold ===")
    preds_test_fus = (fusion_test >= best_thr_fus).astype(np.float32)
    correct_test_fus = (preds_test_fus == labels_test).sum()
    acc_test_fus = correct_test_fus / len(labels_test)

    pos_mask_t = (labels_test == 1)
    neg_mask_t = (labels_test == 0)

    tpr_t_fus = ((fusion_test[pos_mask_t] >= best_thr_fus).sum() / pos_mask_t.sum())
    fpr_t_fus = ((fusion_test[neg_mask_t] >= best_thr_fus).sum() / neg_mask_t.sum())

    print(f"FUSION TEST @thr={best_thr_fus:.4f} -> "
          f"acc={acc_test_fus:.4f}, TPR={tpr_t_fus:.4f}, FPR={fpr_t_fus:.4f}")

    # --- save scores for later analysis ---
    OUT_NPZ = r"D:\5th sem\mini_project\models\elastic_fusion_scores.npz"
    np.savez(
        OUT_NPZ,
        elastic_val=elastic_val,
        labels_val=labels_val,
        elastic_test=elastic_test,
        labels_test=labels_test,
        fusion_val=fusion_val,
        fusion_test=fusion_test,
        idxs_val=idxs_val,
        idxs_test=idxs_test,
        best_thr_el=best_thr_el,
        best_thr_fus=best_thr_fus,
    )
    print(f"\nSaved elastic and fusion scores to {OUT_NPZ}")


if __name__ == "__main__":
    main()

Index map built for 55270 aggregated feature files.

=== Elastic matching on VAL set ===
pairs_val.csv: sampled 50000/641121 pairs for elastic evaluation.


Processing pairs_val.csv: 100%|██████████████████████████████████████████████| 50000/50000 [46:55<00:00, 17.76 pairs/s]



Finished pairs_val.csv elastic scoring. Pairs: 50000; labels: 50000

=== Elastic matching on TEST set ===
pairs_test.csv: sampled 50000/665673 pairs for elastic evaluation.


Processing pairs_test.csv: 100%|█████████████████████████████████████████████| 50000/50000 [54:54<00:00, 15.18 pairs/s]



Finished pairs_test.csv elastic scoring. Pairs: 50000; labels: 50000

=== Calibrating ELASTIC-ONLY threshold on VAL ===
elastic_val score range: [0.0013, 1.0000]
ELASTIC_VAL best_thr=0.0214, acc=0.7812, TPR=0.5655, FPR=0.0368

=== Evaluating ELASTIC-ONLY on TEST at VAL threshold ===
ELASTIC TEST @thr=0.0214 -> acc=0.7803, TPR=0.5556, FPR=0.0316

=== Loading SNN distances for fusion ===

=== Calibrating FUSION threshold on VAL ===
fusion_val score range: [0.1760, 1.0000]
FUSION_VAL best_thr=0.3334, acc=0.8397, TPR=0.8113, FPR=0.1364

=== Evaluating FUSION on TEST at VAL threshold ===
FUSION TEST @thr=0.3334 -> acc=0.8379, TPR=0.8026, FPR=0.1325

Saved elastic and fusion scores to D:\5th sem\mini_project\models\elastic_fusion_scores.npz


In [1]:
import os
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import cv2

# ========== CONFIG ==========
INDEX_CSV = r"D:\5th sem\mini_project\dataset\socofing_index_features.csv"
MODEL_PATH = r"D:\5th sem\mini_project\models\snn_sift_agg_best.pt"

# Where full descriptors are already saved (used via des_path in CSV)
# e.g., D:\5th sem\mini_project\dataset\NPYSF_full_des\...

# 1:N evaluation settings
# 1:N evaluation settings
ALTER_LEVEL_PROBE = "hard"     # keep as hard for forensic difficulty
MAX_PROBES = 200               # start with 200 probes to run fast
TOP_K1 = 100                   # refine top 100 candidates per probe
TOP_RANKS = [1, 5, 10, 20]     # metrics you care about
RATIO_THRESH = 0.75            # keep this
SNN_WEIGHT  = 0.5
EL_WEIGHT   = 0.5

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


# ========== MODEL (same as training) ==========
class SiameseBranch(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, embedding_dim),
        )

    def forward(self, x):
        return self.net(x)


class SiameseNetworkEmbedding(nn.Module):
    """
    Single-branch version: take one 128-D vector and output embedding.
    We reuse the same branch as in the Siamese model.
    """
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.branch = SiameseBranch(input_dim, embedding_dim)

    def forward(self, x):
        return self.branch(x)


# ========== ELASTIC MATCHING (SIFT descriptor-based) ==========
def compute_elastic_score(des1, des2, ratio_thresh=0.75):
    """
    des1, des2: numpy arrays [N1, 128], [N2, 128] (float32)
    Returns normalized elastic score in [0,1].
    """
    if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
        return 0.0

    des1 = des1.astype(np.float32)
    des2 = des2.astype(np.float32)

    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < ratio_thresh * n.distance:
            good.append(m)

    good_count = len(good)
    denom = max(len(des1), len(des2))
    if denom == 0:
        return 0.0

    return good_count / float(denom)


# ========== LOAD DATA (GALLERY & PROBES) ==========
def load_index_and_split(index_csv, alter_level_probe="hard", max_probes=None):
    df = pd.read_csv(index_csv)

    # Gallery: all real (unaltered) fingerprints
    gallery_df = df[df["real_or_altered"].str.lower() == "real"].copy()

    # Probes: altered with requested level
    if alter_level_probe == "all":
        probes_df = df[df["real_or_altered"].str.lower() == "altered"].copy()
    else:
        probes_df = df[
            (df["real_or_altered"].str.lower() == "altered") &
            (df["alter_level"].str.lower() == alter_level_probe.lower())
        ].copy()

    if max_probes is not None and max_probes < len(probes_df):
        probes_df = probes_df.sample(n=max_probes, random_state=42).reset_index(drop=True)

    print(f"Gallery size: {len(gallery_df)} (real fingerprints)")
    print(f"Probes size:  {len(probes_df)} (altered={alter_level_probe})")

    return gallery_df.reset_index(drop=True), probes_df.reset_index(drop=True)


# ========== PRECOMPUTE SNN EMBEDDINGS FOR GALLERY ==========
def compute_gallery_embeddings(model, gallery_df):
    """
    Loads all agg vectors for gallery, computes embeddings.
    Returns:
      gallery_feats: [N_gallery, 128] tensor
      gallery_embs:  [N_gallery, emb_dim] tensor
    """
    agg_paths = gallery_df["agg_path"].tolist()
    feats = []
    for p in agg_paths:
        v = np.load(p).astype(np.float32)  # [128]
        feats.append(v)

    feats = np.stack(feats, axis=0)  # [N_gallery, 128]
    feats_t = torch.from_numpy(feats).to(device)

    with torch.no_grad():
        embs = model(feats_t)  # [N_gallery, emb_dim]

    print(f"Gallery embeddings shape: {embs.shape}")
    return feats_t.cpu(), embs.cpu()


# ========== LOAD DESCRIPTORS INTO CACHE FOR GALLERY ==========
def build_gallery_descriptor_cache(gallery_df):
    """
    Prepares a dict index -> descriptors for gallery.
    Each entry is loaded on first use (lazy caching).
    """
    des_paths = gallery_df["des_path"].tolist()
    cache = {}
    return des_paths, cache


# ========== 1:N MATCHING FOR A SINGLE PROBE ==========
def identify_probe(
    probe_row,
    gallery_df,
    gallery_embs,
    snn_model,
    des_paths_gallery,
    des_cache,
    top_k1=200,
    ratio_thresh=0.75,
    snn_weight=0.5,
    el_weight=0.5
):
    """
    For a single probe fingerprint:
      - compute its embedding
      - compute SNN distances to all gallery
      - select TOP_K1 by SNN similarity
      - for those, compute elastic_score
      - fuse SNN similarity and elastic_score
      - return sorted candidate indices and fused scores
    """
    # --- SNN embedding of probe ---
    v_probe = np.load(probe_row["agg_path"]).astype(np.float32)  # [128]
    v_probe_t = torch.from_numpy(v_probe).unsqueeze(0).to(device)  # [1,128]

    with torch.no_grad():
        z_probe = snn_model(v_probe_t)  # [1,emb_dim]
    z_probe = z_probe.cpu()  # [1,emb_dim]

    # --- SNN distances to gallery (vectorized) ---
    # gallery_embs: [N_gallery, emb_dim]
    diff = gallery_embs - z_probe  # [N_gallery, emb_dim]
    dists = torch.norm(diff, dim=1).numpy()  # [N_gallery]

    # convert to similarity in [0,1]: s = 1/(1+d)
    snn_sim = 1.0 / (1.0 + dists)  # [N_gallery]

    # --- top-K1 candidates by SNN similarity ---
    N_gallery = len(gallery_df)
    k1 = min(top_k1, N_gallery)
    # argsort descending by similarity
    idx_sorted_snn = np.argsort(-snn_sim)  # high to low
    cand_idx = idx_sorted_snn[:k1]

    # --- elastic matching for those K1 candidates ---
    elastic_scores = np.zeros(k1, dtype=np.float32)

    # Load probe descriptors
    des_path_probe = probe_row["des_path"]
    if os.path.exists(des_path_probe):
        des_probe = np.load(des_path_probe).astype(np.float32)
    else:
        des_probe = None

    for i, g_idx in enumerate(cand_idx):
        des_path_g = des_paths_gallery[g_idx]

        if des_path_g not in des_cache:
            if os.path.exists(des_path_g):
                des_cache[des_path_g] = np.load(des_path_g).astype(np.float32)
            else:
                des_cache[des_path_g] = None

        des_g = des_cache[des_path_g]

        elastic_scores[i] = compute_elastic_score(des_probe, des_g, ratio_thresh=ratio_thresh)

    # --- fuse scores ---
    snn_sim_top = snn_sim[cand_idx]  # [K1]

    # optional: normalize elastic scores to [0,1] (already approximately [0,1])
    # fusion: weighted average
    fused_scores = snn_weight * snn_sim_top + el_weight * elastic_scores

    # sort candidates by fused_scores descending
    order = np.argsort(-fused_scores)
    final_idx = cand_idx[order]
    final_scores = fused_scores[order]

    return final_idx, final_scores


# ========== 1:N EVALUATION ==========
def evaluate_identification(
    gallery_df,
    probes_df,
    gallery_embs,
    snn_model,
    des_paths_gallery,
    des_cache,
    top_k1=200,
    top_ranks=(1, 5, 10, 20),
    ratio_thresh=0.75,
    snn_weight=0.5,
    el_weight=0.5
):
    """
    For each probe, perform 1:N identification and measure
    rank-1, rank-5, rank-10, etc. identification rates.
    """
    total = len(probes_df)
    rank_hits = {k: 0 for k in top_ranks}

    for idx_probe, probe_row in probes_df.iterrows():
        true_subj = probe_row["subject_id"]

        # get ranking of gallery indices by fused score
        cand_indices, fused_scores = identify_probe(
            probe_row,
            gallery_df,
            gallery_embs,
            snn_model,
            des_paths_gallery,
            des_cache,
            top_k1=top_k1,
            ratio_thresh=ratio_thresh,
            snn_weight=snn_weight,
            el_weight=el_weight
        )

        # convert gallery indices to subject IDs
        subj_candidates = gallery_df.loc[cand_indices, "subject_id"].to_numpy()

        # find the first occurrence (rank) of true_subj in subj_candidates
        ranks = np.where(subj_candidates == true_subj)[0]
        if len(ranks) > 0:
            r = int(ranks[0]) + 1  # rank position (1-based)
        else:
            r = None  # not found in top-K1

        # update hit counts for each top_k
        for k in top_ranks:
            if r is not None and r <= k:
                rank_hits[k] += 1

        if (idx_probe + 1) % 50 == 0 or idx_probe == total - 1:
            print(f"Processed {idx_probe+1}/{total} probes...")

    # compute identification rates
    for k in top_ranks:
        rate = rank_hits[k] / total
        print(f"Rank-{k} identification rate: {rate:.4f}  ({rank_hits[k]}/{total})")

    return rank_hits


# ========== MAIN ==========
def main():
    # 1) Load index and split gallery / probes
    gallery_df, probes_df = load_index_and_split(INDEX_CSV, alter_level_probe=ALTER_LEVEL_PROBE, max_probes=MAX_PROBES)

    # 2) Load SNN embedding model
    snn_model = SiameseNetworkEmbedding(input_dim=128, embedding_dim=128).to(device)
    state = torch.load(MODEL_PATH, map_location=device)
    # state is from SiameseNetwork with .branch, so extract matching keys
    # If saved with SiameseNetwork, keys are like "branch.net.0.weight", etc.
    # This will still load fine into SiameseNetworkEmbedding.branch
    snn_model.load_state_dict(state, strict=False)
    snn_model.eval()
    print(f"Loaded SNN embedding model from {MODEL_PATH}")

    # 3) Precompute gallery embeddings
    gallery_feats, gallery_embs = compute_gallery_embeddings(snn_model, gallery_df)
    gallery_embs = gallery_embs.to(device)

    # 4) Prepare gallery descriptor cache
    des_paths_gallery, des_cache = build_gallery_descriptor_cache(gallery_df)

    # 5) Run 1:N evaluation
    print("\n=== 1:N Identification Evaluation ===")
    evaluate_identification(
        gallery_df,
        probes_df,
        gallery_embs,
        snn_model,
        des_paths_gallery,
        des_cache,
        top_k1=TOP_K1,
        top_ranks=TOP_RANKS,
        ratio_thresh=RATIO_THRESH,
        snn_weight=SNN_WEIGHT,
        el_weight=EL_WEIGHT
    )


if __name__ == "__main__":
    main()


Using device: cpu
Gallery size: 12000 (real fingerprints)
Probes size:  200 (altered=hard)
Loaded SNN embedding model from D:\5th sem\mini_project\models\snn_sift_agg_best.pt
Gallery embeddings shape: torch.Size([12000, 128])

=== 1:N Identification Evaluation ===
Processed 50/200 probes...
Processed 100/200 probes...
Processed 150/200 probes...
Processed 200/200 probes...
Rank-1 identification rate: 0.6600  (132/200)
Rank-5 identification rate: 0.6650  (133/200)
Rank-10 identification rate: 0.6650  (133/200)
Rank-20 identification rate: 0.6700  (134/200)


In [11]:
import os
import numpy as np
import pandas as pd
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F

# ======================================================
#                 EDIT THESE PATHS
# ======================================================
INDEX_CSV = r"D:\5th sem\mini_project\dataset\socofing_index_features.csv"
MODEL_PATH = r"D:\5th sem\mini_project\models\snn_sift_agg_best.pt"

# Query fingerprint (crime-scene sample)
QUERY_IMAGE = r"D:\5th sem\mini_project\dataset\SOCOFing\Altered\Altered-Medium\1__M_Left_index_finger_Obl.BMP"  # <-- change to your test image path

# Size and preprocessing must match your SIFT pipeline
IMG_SIZE = (512, 512)

TOP_K1 = 200   # SNN preselection size for elastic refinement
TOP_K  = 3    # final number of suspects shown

RATIO_THRESH = 0.75  # Lowe ratio for elastic matching
SNN_WEIGHT   = 0.5   # fusion weights
EL_WEIGHT    = 0.5

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


# ======================================================
#         MODEL (same architecture as training)
# ======================================================
class SiameseBranch(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, embedding_dim),
        )

    def forward(self, x):
        return self.net(x)


class SiameseNetworkEmbedding(nn.Module):
    """Single-branch encoder to get embedding from 128-D feature."""
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.branch = SiameseBranch(input_dim, embedding_dim)

    def forward(self, x):
        return self.branch(x)


# ======================================================
#            SIFT PREPROCESSING FOR QUERY
# ======================================================
def preprocess_image(img_path):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise RuntimeError(f"Cannot read image: {img_path}")
    img = cv2.resize(img, IMG_SIZE)
    img = cv2.equalizeHist(img)
    return img


def compute_query_features(img_path):
    """
    Compute:
      - agg_vector: 128-D mean SIFT descriptor (for SNN)
      - des_full: full [N,128] SIFT descriptors (for elastic)
    """
    img = preprocess_image(img_path)
    sift = cv2.SIFT_create()
    kps, des = sift.detectAndCompute(img, None)

    if des is None or len(des) == 0:
        # no descriptors: return zero agg and None full
        agg_vec = np.zeros(128, dtype=np.float32)
        return agg_vec, None

    # mean descriptor, center and L2 normalize (same as your pipeline)
    v = des.astype(np.float32).mean(axis=0)
    v = v - v.mean()
    n = np.linalg.norm(v)
    if n < 1e-12:
        agg_vec = np.zeros(128, dtype=np.float32)
    else:
        agg_vec = (v / n).astype(np.float32)

    return agg_vec, des.astype(np.float32)


# ======================================================
#          ELASTIC SIFT MATCHING SCORE
# ======================================================
def compute_elastic_score(des1, des2, ratio_thresh=0.75):
    """Lowe's ratio test, normalized good matches count ∈ [0,1]."""
    if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
        return 0.0

    des1 = des1.astype(np.float32)
    des2 = des2.astype(np.float32)

    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < ratio_thresh * n.distance:
            good.append(m)

    good_count = len(good)
    denom = max(len(des1), len(des2))
    if denom == 0:
        return 0.0
    return good_count / float(denom)


# ======================================================
#         LOAD GALLERY (POPULATION) FROM CSV
# ======================================================
def load_gallery(index_csv):
    df = pd.read_csv(index_csv)

    # Example: use all REAL fingerprints as gallery (population)
    gallery_df = df[df["real_or_altered"].str.lower() == "real"].copy()
    gallery_df.reset_index(drop=True, inplace=True)

    print("Gallery population size (real fingerprints):", len(gallery_df))
    return gallery_df


def load_gallery_agg_features(gallery_df):
    agg_paths = gallery_df["agg_path"].tolist()
    feats = []
    for p in agg_paths:
        v = np.load(p).astype(np.float32)  # [128]
        feats.append(v)
    feats = np.stack(feats, axis=0)  # [N_gallery, 128]
    return torch.from_numpy(feats).to(device)


# ======================================================
#      MAIN SEARCH: SINGLE QUERY vs FULL POPULATION
# ======================================================
def main():
    # 1) Load gallery metadata
    gallery_df = load_gallery(INDEX_CSV)

    # 2) Load SNN embedding model
    snn_model = SiameseNetworkEmbedding(input_dim=128, embedding_dim=128).to(device)
    state = torch.load(MODEL_PATH, map_location=device)
    snn_model.load_state_dict(state, strict=False)
    snn_model.eval()
    print("Loaded SNN embedding model from:", MODEL_PATH)

    # 3) Precompute gallery embeddings
    gallery_feats = load_gallery_agg_features(gallery_df)  # [N_gallery,128]
    with torch.no_grad():
        gallery_embs = snn_model(gallery_feats).cpu().numpy()  # [N_gallery,emb_dim]
    gallery_feats = gallery_feats.cpu().numpy()

    # 4) Compute query features (agg + full SIFT descriptors)
    print("\nComputing features for query image:", QUERY_IMAGE)
    q_agg, q_des = compute_query_features(QUERY_IMAGE)
    q_agg_t = torch.from_numpy(q_agg).unsqueeze(0).to(device)  # [1,128]
    with torch.no_grad():
        q_emb = snn_model(q_agg_t).cpu().numpy()[0]  # [emb_dim]

    # 5) SNN similarity to all gallery
    dists = np.linalg.norm(gallery_embs - q_emb, axis=1)      # [N_gallery]
    snn_sim = 1.0 / (1.0 + dists)                             # similarity ∈ (0,1]

    # 6) Take top-K1 by SNN similarity
    N_gallery = len(gallery_df)
    k1 = min(TOP_K1, N_gallery)
    idx_sorted = np.argsort(-snn_sim)     # descending similarity
    cand_idx = idx_sorted[:k1]           # candidate indices in gallery

    # 7) Elastic matching on top-K1
    des_cache = {}
    elastic_scores = np.zeros(k1, dtype=np.float32)

    if q_des is None:
        print("Warning: query has no SIFT descriptors; elastic scores will be zero.")

    for i, g_idx in enumerate(cand_idx):
        des_path_g = gallery_df.loc[g_idx, "des_path"]
        if des_path_g not in des_cache:
            if os.path.exists(des_path_g):
                des_cache[des_path_g] = np.load(des_path_g).astype(np.float32)
            else:
                des_cache[des_path_g] = None

        des_g = des_cache[des_path_g]
        elastic_scores[i] = compute_elastic_score(q_des, des_g, ratio_thresh=RATIO_THRESH)

    # 8) Fuse SNN similarity and elastic scores
    snn_sim_top = snn_sim[cand_idx]
    fused = SNN_WEIGHT * snn_sim_top + EL_WEIGHT * elastic_scores

    # 9) Sort final candidates by fused score
    order = np.argsort(-fused)
    final_idx = cand_idx[order]
    final_scores = fused[order]

    # 10) Print top-K results with subject IDs and info
    K = min(TOP_K, len(final_idx))
    print("\n================= TOP MATCHES (FUSED) =================")
    for rank in range(K):
        g_idx = final_idx[rank]
        row = gallery_df.loc[g_idx]

        subj = row["subject_id"]
        finger = row["finger"]
        img_path = row["image_path"]
        s_snn = snn_sim[g_idx]
        s_el  = elastic_scores[order[rank]]

        print(f"{rank+1}. subject={subj}, finger={finger}, "
              f"fused_score={final_scores[rank]:.4f}, "
              f"SNN_sim={s_snn:.4f}, elastic={s_el:.4f}")
        print(f"     image_path: {img_path}")

    print("\nDone. This list is your 'prospective criminals' list for this query.")


if __name__ == "__main__":
    main()


Using device: cpu
Gallery population size (real fingerprints): 12000
Loaded SNN embedding model from: D:\5th sem\mini_project\models\snn_sift_agg_best.pt

Computing features for query image: D:\5th sem\mini_project\dataset\SOCOFing\Altered\Altered-Medium\1__M_Left_index_finger_Obl.BMP

1. subject=1, finger=index_finger, fused_score=0.7715, SNN_sim=0.8765, elastic=0.6664
     image_path: D:\5th sem\mini_project\dataset\SOCOFing\Real\1__M_Left_index_finger.BMP
2. subject=1, finger=index_finger, fused_score=0.7715, SNN_sim=0.8765, elastic=0.6664
     image_path: D:\5th sem\mini_project\dataset\SOCOFing\Real\1__M_Left_index_finger.BMP
3. subject=359, finger=middle_finger, fused_score=0.4509, SNN_sim=0.8886, elastic=0.0133
     image_path: D:\5th sem\mini_project\dataset\SOCOFing\Real\359__M_Right_middle_finger.BMP

Done. This list is your 'prospective criminals' list for this query.


In [13]:
import os
import numpy as np
import pandas as pd
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F

# ======================================================
#                 EDIT THESE PATHS
# ======================================================
INDEX_CSV = r"D:\5th sem\mini_project\dataset\socofing_index_features.csv"
MODEL_PATH = r"D:\5th sem\mini_project\models\snn_sift_agg_best.pt"

# Query fingerprint (crime-scene sample)
QUERY_IMAGE = r"D:\Desktop\new\images\starbucks-logo-png-transparent-0.png"  # <-- change to your test image path

# Size and preprocessing must match your SIFT pipeline
IMG_SIZE = (512, 512)

TOP_K1 = 200   # SNN preselection size for elastic refinement
TOP_K  = 3    # final number of suspects shown

RATIO_THRESH = 0.75  # Lowe ratio for elastic matching
SNN_WEIGHT   = 0.5   # fusion weights
EL_WEIGHT    = 0.5

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


# ======================================================
#         MODEL (same architecture as training)
# ======================================================
class SiameseBranch(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, embedding_dim),
        )

    def forward(self, x):
        return self.net(x)


class SiameseNetworkEmbedding(nn.Module):
    """Single-branch encoder to get embedding from 128-D feature."""
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.branch = SiameseBranch(input_dim, embedding_dim)

    def forward(self, x):
        return self.branch(x)


# ======================================================
#            SIFT PREPROCESSING FOR QUERY
# ======================================================
def preprocess_image(img_path):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise RuntimeError(f"Cannot read image: {img_path}")
    img = cv2.resize(img, IMG_SIZE)
    img = cv2.equalizeHist(img)
    return img


def compute_query_features(img_path):
    """
    Compute:
      - agg_vector: 128-D mean SIFT descriptor (for SNN)
      - des_full: full [N,128] SIFT descriptors (for elastic)
    """
    img = preprocess_image(img_path)
    sift = cv2.SIFT_create()
    kps, des = sift.detectAndCompute(img, None)

    if des is None or len(des) == 0:
        # no descriptors: return zero agg and None full
        agg_vec = np.zeros(128, dtype=np.float32)
        return agg_vec, None

    # mean descriptor, center and L2 normalize (same as your pipeline)
    v = des.astype(np.float32).mean(axis=0)
    v = v - v.mean()
    n = np.linalg.norm(v)
    if n < 1e-12:
        agg_vec = np.zeros(128, dtype=np.float32)
    else:
        agg_vec = (v / n).astype(np.float32)

    return agg_vec, des.astype(np.float32)


# ======================================================
#          ELASTIC SIFT MATCHING SCORE
# ======================================================
def compute_elastic_score(des1, des2, ratio_thresh=0.75):
    """Lowe's ratio test, normalized good matches count ∈ [0,1]."""
    if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
        return 0.0

    des1 = des1.astype(np.float32)
    des2 = des2.astype(np.float32)

    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < ratio_thresh * n.distance:
            good.append(m)

    good_count = len(good)
    denom = max(len(des1), len(des2))
    if denom == 0:
        return 0.0
    return good_count / float(denom)


# ======================================================
#         LOAD GALLERY (POPULATION) FROM CSV
# ======================================================
def load_gallery(index_csv):
    df = pd.read_csv(index_csv)

    # Example: use all REAL fingerprints as gallery (population)
    gallery_df = df[df["real_or_altered"].str.lower() == "real"].copy()
    gallery_df.reset_index(drop=True, inplace=True)

    print("Gallery population size (real fingerprints):", len(gallery_df))
    return gallery_df


def load_gallery_agg_features(gallery_df):
    agg_paths = gallery_df["agg_path"].tolist()
    feats = []
    for p in agg_paths:
        v = np.load(p).astype(np.float32)  # [128]
        feats.append(v)
    feats = np.stack(feats, axis=0)  # [N_gallery, 128]
    return torch.from_numpy(feats).to(device)


# ======================================================
#      MAIN SEARCH: SINGLE QUERY vs FULL POPULATION
# ======================================================
def main():
    # 1) Load gallery metadata
    gallery_df = load_gallery(INDEX_CSV)

    # 2) Load SNN embedding model
    snn_model = SiameseNetworkEmbedding(input_dim=128, embedding_dim=128).to(device)
    state = torch.load(MODEL_PATH, map_location=device)
    snn_model.load_state_dict(state, strict=False)
    snn_model.eval()
    print("Loaded SNN embedding model from:", MODEL_PATH)

    # 3) Precompute gallery embeddings
    gallery_feats = load_gallery_agg_features(gallery_df)  # [N_gallery,128]
    with torch.no_grad():
        gallery_embs = snn_model(gallery_feats).cpu().numpy()  # [N_gallery,emb_dim]
    gallery_feats = gallery_feats.cpu().numpy()

    # 4) Compute query features (agg + full SIFT descriptors)
    print("\nComputing features for query image:", QUERY_IMAGE)
    q_agg, q_des = compute_query_features(QUERY_IMAGE)
    q_agg_t = torch.from_numpy(q_agg).unsqueeze(0).to(device)  # [1,128]
    with torch.no_grad():
        q_emb = snn_model(q_agg_t).cpu().numpy()[0]  # [emb_dim]

    # 5) SNN similarity to all gallery
    dists = np.linalg.norm(gallery_embs - q_emb, axis=1)      # [N_gallery]
    snn_sim = 1.0 / (1.0 + dists)                             # similarity ∈ (0,1]

    # 6) Take top-K1 by SNN similarity
    N_gallery = len(gallery_df)
    k1 = min(TOP_K1, N_gallery)
    idx_sorted = np.argsort(-snn_sim)     # descending similarity
    cand_idx = idx_sorted[:k1]           # candidate indices in gallery

    # 7) Elastic matching on top-K1
    des_cache = {}
    elastic_scores = np.zeros(k1, dtype=np.float32)

    if q_des is None:
        print("Warning: query has no SIFT descriptors; elastic scores will be zero.")

    for i, g_idx in enumerate(cand_idx):
        des_path_g = gallery_df.loc[g_idx, "des_path"]
        if des_path_g not in des_cache:
            if os.path.exists(des_path_g):
                des_cache[des_path_g] = np.load(des_path_g).astype(np.float32)
            else:
                des_cache[des_path_g] = None

        des_g = des_cache[des_path_g]
        elastic_scores[i] = compute_elastic_score(q_des, des_g, ratio_thresh=RATIO_THRESH)

    # 8) Fuse SNN similarity and elastic scores
    snn_sim_top = snn_sim[cand_idx]
    fused = SNN_WEIGHT * snn_sim_top + EL_WEIGHT * elastic_scores

    # 9) Sort final candidates by fused score
    order = np.argsort(-fused)
    final_idx = cand_idx[order]
    final_scores = fused[order]

    # 10) Print top-K results with subject IDs and info
    K = min(TOP_K, len(final_idx))
    print("\n================= TOP MATCHES (FUSED) =================")
    for rank in range(K):
        g_idx = final_idx[rank]
        row = gallery_df.loc[g_idx]

        subj = row["subject_id"]
        finger = row["finger"]
        img_path = row["image_path"]
        s_snn = snn_sim[g_idx]
        s_el  = elastic_scores[order[rank]]

        print(f"{rank+1}. subject={subj}, finger={finger}, "
              f"fused_score={final_scores[rank]:.4f}, "
              f"SNN_sim={s_snn:.4f}, elastic={s_el:.4f}")
        print(f"     image_path: {img_path}")

    print("\nDone. This list is your 'prospective criminals' list for this query.")


if __name__ == "__main__":
    main()


Using device: cpu
Gallery population size (real fingerprints): 12000
Loaded SNN embedding model from: D:\5th sem\mini_project\models\snn_sift_agg_best.pt

Computing features for query image: D:\Desktop\new\images\starbucks-logo-png-transparent-0.png

1. subject=217, finger=middle_finger, fused_score=0.3297, SNN_sim=0.6315, elastic=0.0280
     image_path: D:\5th sem\mini_project\dataset\SOCOFing\Real\217__M_Right_middle_finger.BMP
2. subject=217, finger=middle_finger, fused_score=0.3297, SNN_sim=0.6315, elastic=0.0280
     image_path: D:\5th sem\mini_project\dataset\SOCOFing\Real\217__M_Right_middle_finger.BMP
3. subject=217, finger=thumb_finger, fused_score=0.3238, SNN_sim=0.6386, elastic=0.0091
     image_path: D:\5th sem\mini_project\dataset\SOCOFing\Real\217__M_Right_thumb_finger.BMP

Done. This list is your 'prospective criminals' list for this query.


In [15]:
import os
import numpy as np
import pandas as pd
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F

# ======================================================
#                 EDIT THESE PATHS
# ======================================================
INDEX_CSV = r"D:\5th sem\mini_project\dataset\socofing_index_features.csv"
MODEL_PATH = r"D:\5th sem\mini_project\models\snn_sift_agg_best.pt"

# Crime-scene query fingerprint
QUERY_IMAGE = r"D:\Desktop\new\images\starbucks-logo-png-transparent-0.png"  # change to your test file

IMG_SIZE = (512, 512)

TOP_K1 = 200   # SNN preselection for elastic refinement
TOP_K  = 3     # final number of suspects to display

RATIO_THRESH = 0.75
SNN_WEIGHT   = 0.5
EL_WEIGHT    = 0.5

# === IMPORTANT: decision threshold on fused score ===
# Set this based on your fusion calibration script (best_thr_fus or tighter).
FUSION_DECISION_THR = 0.6   # example; adjust after looking at fusion_val distribution

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


# ======================================================
#         MODEL (same as training)
# ======================================================
class SiameseBranch(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, embedding_dim),
        )

    def forward(self, x):
        return self.net(x)


class SiameseNetworkEmbedding(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.branch = SiameseBranch(input_dim, embedding_dim)

    def forward(self, x):
        return self.branch(x)


# ======================================================
#            SIFT PREPROCESSING FOR QUERY
# ======================================================
def preprocess_image(img_path):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise RuntimeError(f"Cannot read image: {img_path}")
    img = cv2.resize(img, IMG_SIZE)
    img = cv2.equalizeHist(img)
    return img


def compute_query_features(img_path):
    img = preprocess_image(img_path)
    sift = cv2.SIFT_create()
    kps, des = sift.detectAndCompute(img, None)

    if des is None or len(des) == 0:
        agg_vec = np.zeros(128, dtype=np.float32)
        return agg_vec, None

    v = des.astype(np.float32).mean(axis=0)
    v = v - v.mean()
    n = np.linalg.norm(v)
    if n < 1e-12:
        agg_vec = np.zeros(128, dtype=np.float32)
    else:
        agg_vec = (v / n).astype(np.float32)

    return agg_vec, des.astype(np.float32)


# ======================================================
#          ELASTIC SIFT MATCHING SCORE
# ======================================================
def compute_elastic_score(des1, des2, ratio_thresh=0.75):
    if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
        return 0.0

    des1 = des1.astype(np.float32)
    des2 = des2.astype(np.float32)

    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < ratio_thresh * n.distance:
            good.append(m)

    good_count = len(good)
    denom = max(len(des1), len(des2))
    if denom == 0:
        return 0.0
    return good_count / float(denom)


# ======================================================
#         LOAD GALLERY (POPULATION)
# ======================================================
def load_gallery(index_csv):
    df = pd.read_csv(index_csv)
    gallery_df = df[df["real_or_altered"].str.lower() == "real"].copy()
    gallery_df.reset_index(drop=True, inplace=True)
    print("Gallery population size (real fingerprints):", len(gallery_df))
    return gallery_df


def load_gallery_agg_features(gallery_df):
    agg_paths = gallery_df["agg_path"].tolist()
    feats = []
    for p in agg_paths:
        v = np.load(p).astype(np.float32)
        feats.append(v)
    feats = np.stack(feats, axis=0)
    return torch.from_numpy(feats).to(device)


# ======================================================
#      MAIN SEARCH + DECISION LAYER
# ======================================================
def main():
    # 1) Load gallery
    gallery_df = load_gallery(INDEX_CSV)

    # 2) Load SNN embedding model
    snn_model = SiameseNetworkEmbedding(input_dim=128, embedding_dim=128).to(device)
    state = torch.load(MODEL_PATH, map_location=device)
    snn_model.load_state_dict(state, strict=False)
    snn_model.eval()
    print("Loaded SNN embedding model from:", MODEL_PATH)

    # 3) Precompute gallery embeddings
    gallery_feats = load_gallery_agg_features(gallery_df)   # [N,128]
    with torch.no_grad():
        gallery_embs = snn_model(gallery_feats).cpu().numpy()
    gallery_feats = gallery_feats.cpu().numpy()

    # 4) Query features
    print("\nComputing features for query image:", QUERY_IMAGE)
    q_agg, q_des = compute_query_features(QUERY_IMAGE)

    if q_des is None or len(q_des) == 0:
        print("Query fingerprint has no SIFT descriptors; cannot match reliably.")
        print("Decision: NO MATCH FOUND.")
        return

    q_agg_t = torch.from_numpy(q_agg).unsqueeze(0).to(device)
    with torch.no_grad():
        q_emb = snn_model(q_agg_t).cpu().numpy()[0]

    # 5) SNN similarity to all gallery
    dists = np.linalg.norm(gallery_embs - q_emb, axis=1)
    snn_sim = 1.0 / (1.0 + dists)  # [0,1]

    # 6) Take top-K1 by SNN similarity
    N_gallery = len(gallery_df)
    k1 = min(TOP_K1, N_gallery)
    idx_sorted = np.argsort(-snn_sim)
    cand_idx = idx_sorted[:k1]

    # 7) Elastic scores for top-K1
    des_cache = {}
    elastic_scores = np.zeros(k1, dtype=np.float32)

    for i, g_idx in enumerate(cand_idx):
        des_path_g = gallery_df.loc[g_idx, "des_path"]
        if des_path_g not in des_cache:
            if os.path.exists(des_path_g):
                des_cache[des_path_g] = np.load(des_path_g).astype(np.float32)
            else:
                des_cache[des_path_g] = None
        des_g = des_cache[des_path_g]
        elastic_scores[i] = compute_elastic_score(q_des, des_g, ratio_thresh=RATIO_THRESH)

    # 8) Fused scores
    snn_sim_top = snn_sim[cand_idx]
    fused = SNN_WEIGHT * snn_sim_top + EL_WEIGHT * elastic_scores

    # 9) Decision: is there a reliable match?
    best_idx_local = int(np.argmax(fused))
    best_fused_score = float(fused[best_idx_local])
    best_gallery_idx = int(cand_idx[best_idx_local])

    print(f"\nBest fused score across population: {best_fused_score:.4f}")
    if best_fused_score < FUSION_DECISION_THR:
        print("Decision: NO RELIABLE MATCH FOUND (fused score below threshold).")
        return

    print("Decision: MATCH FOUND. Showing top suspects...\n")

    # 10) Rank and print top-K suspects
    order = np.argsort(-fused)
    final_idx = cand_idx[order]
    final_scores = fused[order]

    K = min(TOP_K, len(final_idx))
    print("=============== TOP SUSPECTS ===============")
    for rank in range(K):
        g_idx = final_idx[rank]
        row = gallery_df.loc[g_idx]

        subj = row["subject_id"]
        finger = row["finger"]
        img_path = row["image_path"]

        s_snn = snn_sim[g_idx]
        s_el  = elastic_scores[order[rank]]

        print(f"{rank+1}. subject={subj}, finger={finger}, "
              f"fused_score={final_scores[rank]:.4f}, "
              f"SNN_sim={s_snn:.4f}, elastic={s_el:.4f}")
        print(f"     enrolled_image: {img_path}")

    print("\nDone.")


if __name__ == "__main__":
    main()


Using device: cpu
Gallery population size (real fingerprints): 12000
Loaded SNN embedding model from: D:\5th sem\mini_project\models\snn_sift_agg_best.pt

Computing features for query image: D:\Desktop\new\images\starbucks-logo-png-transparent-0.png

Best fused score across population: 0.3297
Decision: NO RELIABLE MATCH FOUND (fused score below threshold).


In [27]:
import os
import numpy as np
import pandas as pd
import cv2
import torch
import torch.nn as nn
import torch.nn.functional as F

# ======================================================
#                 EDIT THESE PATHS
# ======================================================
INDEX_CSV = r"D:\5th sem\mini_project\dataset\socofing_index_features.csv"
MODEL_PATH = r"D:\5th sem\mini_project\models\snn_sift_agg_best.pt"

# Crime-scene query fingerprint
QUERY_IMAGE = r"D:\5th sem\mini_project\dataset\SOCOFing\Altered\Altered-Medium\1__M_Right_little_finger_CR.BMP"  # change to your test file

IMG_SIZE = (512, 512)

TOP_K1 = 200   # SNN preselection for elastic refinement
TOP_K  = 3     # final number of suspects to display

RATIO_THRESH = 0.75
SNN_WEIGHT   = 0.5
EL_WEIGHT    = 0.5

# === IMPORTANT: decision threshold on fused score ===
# Set this based on your fusion calibration script (best_thr_fus or tighter).
FUSION_DECISION_THR = 0.6   # example; adjust after looking at fusion_val distribution

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)


# ======================================================
#         MODEL (same as training)
# ======================================================
class SiameseBranch(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, embedding_dim),
        )

    def forward(self, x):
        return self.net(x)


class SiameseNetworkEmbedding(nn.Module):
    def __init__(self, input_dim=128, embedding_dim=128):
        super().__init__()
        self.branch = SiameseBranch(input_dim, embedding_dim)

    def forward(self, x):
        return self.branch(x)


# ======================================================
#            SIFT PREPROCESSING FOR QUERY
# ======================================================
def preprocess_image(img_path):
    img = cv2.imread(img_path, cv2.IMREAD_GRAYSCALE)
    if img is None:
        raise RuntimeError(f"Cannot read image: {img_path}")
    img = cv2.resize(img, IMG_SIZE)
    img = cv2.equalizeHist(img)
    return img


def compute_query_features(img_path):
    img = preprocess_image(img_path)
    sift = cv2.SIFT_create()
    kps, des = sift.detectAndCompute(img, None)

    if des is None or len(des) == 0:
        agg_vec = np.zeros(128, dtype=np.float32)
        return agg_vec, None

    v = des.astype(np.float32).mean(axis=0)
    v = v - v.mean()
    n = np.linalg.norm(v)
    if n < 1e-12:
        agg_vec = np.zeros(128, dtype=np.float32)
    else:
        agg_vec = (v / n).astype(np.float32)

    return agg_vec, des.astype(np.float32)


# ======================================================
#          ELASTIC SIFT MATCHING SCORE
# ======================================================
def compute_elastic_score(des1, des2, ratio_thresh=0.75):
    if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
        return 0.0

    des1 = des1.astype(np.float32)
    des2 = des2.astype(np.float32)

    bf = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
    matches = bf.knnMatch(des1, des2, k=2)

    good = []
    for m, n in matches:
        if m.distance < ratio_thresh * n.distance:
            good.append(m)

    good_count = len(good)
    denom = max(len(des1), len(des2))
    if denom == 0:
        return 0.0
    return good_count / float(denom)


# ======================================================
#         LOAD GALLERY (POPULATION)
# ======================================================
def load_gallery(index_csv):
    df = pd.read_csv(index_csv)
    gallery_df = df[df["real_or_altered"].str.lower() == "real"].copy()
    gallery_df.reset_index(drop=True, inplace=True)
    print("Gallery population size (real fingerprints):", len(gallery_df))
    return gallery_df


def load_gallery_agg_features(gallery_df):
    agg_paths = gallery_df["agg_path"].tolist()
    feats = []
    for p in agg_paths:
        v = np.load(p).astype(np.float32)
        feats.append(v)
    feats = np.stack(feats, axis=0)
    return torch.from_numpy(feats).to(device)


# ======================================================
#      MAIN SEARCH + DECISION LAYER
# ======================================================
def main():
    # 1) Load gallery
    gallery_df = load_gallery(INDEX_CSV)

    # 2) Load SNN embedding model
    snn_model = SiameseNetworkEmbedding(input_dim=128, embedding_dim=128).to(device)
    state = torch.load(MODEL_PATH, map_location=device)
    snn_model.load_state_dict(state, strict=False)
    snn_model.eval()
    print("Loaded SNN embedding model from:", MODEL_PATH)

    # 3) Precompute gallery embeddings
    gallery_feats = load_gallery_agg_features(gallery_df)   # [N,128]
    with torch.no_grad():
        gallery_embs = snn_model(gallery_feats).cpu().numpy()
    gallery_feats = gallery_feats.cpu().numpy()

    # 4) Query features
    print("\nComputing features for query image:", QUERY_IMAGE)
    q_agg, q_des = compute_query_features(QUERY_IMAGE)

    if q_des is None or len(q_des) == 0:
        print("Query fingerprint has no SIFT descriptors; cannot match reliably.")
        print("Decision: NO MATCH FOUND.")
        return

    q_agg_t = torch.from_numpy(q_agg).unsqueeze(0).to(device)
    with torch.no_grad():
        q_emb = snn_model(q_agg_t).cpu().numpy()[0]

    # 5) SNN similarity to all gallery
    dists = np.linalg.norm(gallery_embs - q_emb, axis=1)
    snn_sim = 1.0 / (1.0 + dists)  # [0,1]

    # 6) Take top-K1 by SNN similarity
    N_gallery = len(gallery_df)
    k1 = min(TOP_K1, N_gallery)
    idx_sorted = np.argsort(-snn_sim)
    cand_idx = idx_sorted[:k1]

    # 7) Elastic scores for top-K1
    des_cache = {}
    elastic_scores = np.zeros(k1, dtype=np.float32)

    for i, g_idx in enumerate(cand_idx):
        des_path_g = gallery_df.loc[g_idx, "des_path"]
        if des_path_g not in des_cache:
            if os.path.exists(des_path_g):
                des_cache[des_path_g] = np.load(des_path_g).astype(np.float32)
            else:
                des_cache[des_path_g] = None
        des_g = des_cache[des_path_g]
        elastic_scores[i] = compute_elastic_score(q_des, des_g, ratio_thresh=RATIO_THRESH)

    # 8) Fused scores
    snn_sim_top = snn_sim[cand_idx]
    fused = SNN_WEIGHT * snn_sim_top + EL_WEIGHT * elastic_scores

    # 9) Decision: is there a reliable match?
    best_idx_local = int(np.argmax(fused))
    best_fused_score = float(fused[best_idx_local])
    best_gallery_idx = int(cand_idx[best_idx_local])

    print(f"\nBest fused score across population: {best_fused_score:.4f}")
    if best_fused_score < FUSION_DECISION_THR:
        print("Decision: NO RELIABLE MATCH FOUND (fused score below threshold).")
        return

    print("Decision: MATCH FOUND. Showing top suspects...\n")

    # 10) Rank and print top-K suspects
    order = np.argsort(-fused)
    final_idx = cand_idx[order]
    final_scores = fused[order]

    K = min(TOP_K, len(final_idx))
    print("=============== TOP SUSPECTS ===============")
    for rank in range(K):
        g_idx = final_idx[rank]
        row = gallery_df.loc[g_idx]

        subj = row["subject_id"]
        finger = row["finger"]
        img_path = row["image_path"]

        s_snn = snn_sim[g_idx]
        s_el  = elastic_scores[order[rank]]

        print(f"{rank+1}. subject={subj}, finger={finger}, "
              f"fused_score={final_scores[rank]:.4f}, "
              f"SNN_sim={s_snn:.4f}, elastic={s_el:.4f}")
        print(f"     enrolled_image: {img_path}")

    print("\nDone.")


if __name__ == "__main__":
    main()


Using device: cpu
Gallery population size (real fingerprints): 12000
Loaded SNN embedding model from: D:\5th sem\mini_project\models\snn_sift_agg_best.pt

Computing features for query image: D:\5th sem\mini_project\dataset\SOCOFing\Altered\Altered-Medium\1__M_Right_little_finger_CR.BMP

Best fused score across population: 0.7320
Decision: MATCH FOUND. Showing top suspects...

1. subject=1, finger=little_finger, fused_score=0.7320, SNN_sim=0.8765, elastic=0.5876
     enrolled_image: D:\5th sem\mini_project\dataset\SOCOFing\Real\1__M_Right_little_finger.BMP
2. subject=1, finger=little_finger, fused_score=0.7320, SNN_sim=0.8765, elastic=0.5876
     enrolled_image: D:\5th sem\mini_project\dataset\SOCOFing\Real\1__M_Right_little_finger.BMP
3. subject=484, finger=middle_finger, fused_score=0.4658, SNN_sim=0.9136, elastic=0.0179
     enrolled_image: D:\5th sem\mini_project\dataset\SOCOFing\Real\484__M_Left_middle_finger.BMP

Done.


In [9]:
import os
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# ========= CONFIG (edit paths if needed) =========
SNN_NPZ     = r"D:\5th sem\mini_project\models\snn_eval_distances.npz"
EL_FUS_NPZ  = r"D:\5th sem\mini_project\models\elastic_fusion_scores.npz"

# Put your actual SNN best threshold here (distance threshold)
BEST_THR_SNN = 0.5308  # example from your earlier eval, match if dist <= thr


def compute_binary_metrics(labels, preds, name=""):
    acc = accuracy_score(labels, preds)
    prec = precision_score(labels, preds, zero_division=0)
    rec = recall_score(labels, preds, zero_division=0)
    f1 = f1_score(labels, preds, zero_division=0)

    labels = np.array(labels, dtype=np.int32)
    preds  = np.array(preds, dtype=np.int32)

    pos_mask = (labels == 1)
    neg_mask = (labels == 0)

    if pos_mask.sum() > 0:
        FRR = ((preds[pos_mask] == 0).sum() / pos_mask.sum())
    else:
        FRR = 0.0

    if neg_mask.sum() > 0:
        FAR = ((preds[neg_mask] == 1).sum() / neg_mask.sum())
    else:
        FAR = 0.0

    print(f"\n=== {name} ===")
    print(f"Accuracy: {acc:.4f}")
    print(f"Precision: {prec:.4f}")
    print(f"Recall (TPR): {rec:.4f}")
    print(f"F1-score: {f1:.4f}")
    print(f"FAR (False Accept Rate): {FAR:.4f}")
    print(f"FRR (False Reject Rate): {FRR:.4f}")


def main():
    # ----- Load SNN distances -----
    snn_data = np.load(SNN_NPZ)
    val_dist   = snn_data["val_dist"]
    val_labels = snn_data["val_labels"]
    test_dist   = snn_data["test_dist"]
    test_labels = snn_data["test_labels"]
    print("Loaded SNN distances.")
    print(f"SNN val pairs: {len(val_labels)}, test pairs: {len(test_labels)}")

    # ----- SNN METRICS -----
    val_pred_snn  = (val_dist  <= BEST_THR_SNN).astype(np.int32)
    test_pred_snn = (test_dist <= BEST_THR_SNN).astype(np.int32)

    compute_binary_metrics(val_labels,  val_pred_snn,  name="SNN (VAL)")
    compute_binary_metrics(test_labels, test_pred_snn, name="SNN (TEST)")

    # ----- Load Elastic/Fusion scores -----
    el_data = np.load(EL_FUS_NPZ)
    elastic_val   = el_data["elastic_val"]
    labels_val_el = el_data["labels_val"]
    elastic_test  = el_data["elastic_test"]
    labels_test_el= el_data["labels_test"]
    fusion_val    = el_data["fusion_val"]
    fusion_test   = el_data["fusion_test"]
    best_thr_el   = float(el_data["best_thr_el"])
    best_thr_fus  = float(el_data["best_thr_fus"])

    print("\nLoaded elastic/fusion scores.")
    print(f"Elastic/Fusion val pairs: {len(labels_val_el)}, test pairs: {len(labels_test_el)}")

    # Note: labels_val_el / labels_test_el may be subsets (e.g., 50k) of full val/test

    # ----- ELASTIC METRICS (on its subset) -----
    val_pred_el  = (elastic_val  >= best_thr_el).astype(np.int32)
    test_pred_el = (elastic_test >= best_thr_el).astype(np.int32)

    compute_binary_metrics(labels_val_el,  val_pred_el,  name="ELASTIC (VAL)")
    compute_binary_metrics(labels_test_el, test_pred_el, name="ELASTIC (TEST)")

    # ----- FUSION METRICS (on same subset) -----
    val_pred_fus  = (fusion_val  >= best_thr_fus).astype(np.int32)
    test_pred_fus = (fusion_test >= best_thr_fus).astype(np.int32)

    compute_binary_metrics(labels_val_el,  val_pred_fus,  name="FUSION (VAL)")
    compute_binary_metrics(labels_test_el, test_pred_fus, name="FUSION (TEST)")


if __name__ == "__main__":
    main()


Loaded SNN distances.
SNN val pairs: 641121, test pairs: 665673

=== SNN (VAL) ===
Accuracy: 0.8350
Precision: 0.8294
Recall (TPR): 0.8021
F1-score: 0.8155
FAR (False Accept Rate): 0.1375
FRR (False Reject Rate): 0.1979

=== SNN (TEST) ===
Accuracy: 0.8347
Precision: 0.8327
Recall (TPR): 0.7964
F1-score: 0.8142
FAR (False Accept Rate): 0.1333
FRR (False Reject Rate): 0.2036

Loaded elastic/fusion scores.
Elastic/Fusion val pairs: 50000, test pairs: 50000

=== ELASTIC (VAL) ===
Accuracy: 0.7812
Precision: 0.9284
Recall (TPR): 0.5655
F1-score: 0.7029
FAR (False Accept Rate): 0.0368
FRR (False Reject Rate): 0.4345

=== ELASTIC (TEST) ===
Accuracy: 0.7803
Precision: 0.9364
Recall (TPR): 0.5556
F1-score: 0.6974
FAR (False Accept Rate): 0.0316
FRR (False Reject Rate): 0.4444

=== FUSION (VAL) ===
Accuracy: 0.8397
Precision: 0.8338
Recall (TPR): 0.8113
F1-score: 0.8224
FAR (False Accept Rate): 0.1364
FRR (False Reject Rate): 0.1887

=== FUSION (TEST) ===
Accuracy: 0.8379
Precision: 0.8353
Rec