In [None]:
# Upgrade the packaging toolchain first so the install below runs with a current pip/setuptools/wheel.
!pip install --upgrade pip setuptools wheel
# Install the libraries imported by the later cells, plus the Kaggle CLI used to download the dataset.
!pip install torch torchvision pillow matplotlib scikit-learn tqdm kaggle

In [None]:
import os
import json

# -------------------------------
# Enter Kaggle credentials
# -------------------------------
kaggle_username = "sohniarunimamaroju"
kaggle_key = "KGAT_dbd83512dacb55b3812965e2531602bf"

# Create .kaggle folder if not exists
os.makedirs("/root/.kaggle", exist_ok=True)

# Write kaggle.json
with open("/root/.kaggle/kaggle.json", "w") as f:
    json.dump({"username": kaggle_username, "key": kaggle_key}, f)

# Set permissions
!chmod 600 /root/.kaggle/kaggle.json

# Download dataset from Kaggle
!mkdir -p data/cedar/signatures
!kaggle datasets download -d matteocarnebella/cedar-signatures -q
!unzip -oq cedar-signatures.zip -d data/cedar/signatures

# List files to confirm
!ls data/cedar/signatures


In [None]:
import os, glob, re, random
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as T
from sklearn.model_selection import train_test_split
from collections import Counter

# Folder whose sub-folders are scanned for signature images; abort early if it is absent.
base_dir = "/content/data/cedar/signatures/signatures"
dataset_found = os.path.exists(base_dir)
if not dataset_found:
    raise FileNotFoundError(f"Folder not found: {base_dir}")

# Helper functions
def get_writer_id(fname):
    """Return the first run of digits found in ``fname`` (as a string).

    Falls back to the literal ``"unknown"`` when the name contains no digit.
    """
    first_number = re.search(r'\d+', fname)
    if first_number is None:
        return "unknown"
    return first_number.group(0)

def is_forged(fname):
    """Tell whether a file name marks a forgery.

    The check is case-insensitive and looks for the substrings ``"forg"`` or
    ``"fgr"`` anywhere in ``fname``.
    """
    lowered = fname.lower()
    return any(marker in lowered for marker in ("forg", "fgr"))

# Collect all images
images = []
for signer_folder in sorted(os.listdir(base_dir)):
    signer_path = os.path.join(base_dir, signer_folder)
    if os.path.isdir(signer_path):
        # Folder-level facts are the same for every file inside, so derive them once.
        folder_numbers = re.findall(r'\d+', signer_folder)
        folder_says_forged = "forged" in signer_folder.lower()
        for img_path in sorted(glob.glob(os.path.join(signer_path, "*.*"))):
            if not img_path.lower().endswith(('.png', '.jpg', '.jpeg')):
                continue
            fname = os.path.basename(img_path)
            # Writer id: first number in the folder name, else first number in the file name.
            writer = folder_numbers[0] if folder_numbers else get_writer_id(fname)
            images.append(dict(
                img_path=img_path,
                writer=writer,
                fname=fname,
                forged=is_forged(fname) or folder_says_forged,
            ))

# Summary: overall size plus the eight writers with the most images
writer_counts = Counter(rec['writer'] for rec in images)
print(f"✅ Total images: {len(images)}, Unique writers: {len(writer_counts)}")
print("Sample counts per writer:", writer_counts.most_common(8))


In [None]:
# ------------------------------
# Transforms (3 channels for MobileNet)
# ------------------------------
# Geometric augmentations expose pixels outside the original image. The default
# fill is 0 (black), which paints dark wedges/borders that never occur at
# validation or inference time. Filling with 255 (white) instead keeps the
# augmented images close to un-augmented ones.
# NOTE(review): assumes the scans have a light/white background — confirm on a few samples.
train_transform = T.Compose([
    T.Resize((128,128)),
    T.RandomRotation(10, fill=255),
    T.RandomAffine(degrees=0, translate=(0.05,0.05), scale=(0.95,1.05), shear=5, fill=255),
    T.Grayscale(num_output_channels=3),  # convert 1 channel -> 3
    T.ToTensor(),
    T.Normalize([0.5,0.5,0.5],[0.5,0.5,0.5])
])

# Validation uses the same resize/normalisation but no random augmentation
val_transform = T.Compose([
    T.Resize((128,128)),
    T.Grayscale(num_output_channels=3),  # convert 1 channel -> 3
    T.ToTensor(),
    T.Normalize([0.5,0.5,0.5],[0.5,0.5,0.5])
])

print("✅ Transforms ready.")


In [None]:
class SiameseDataset(Dataset):
    """Dataset of signature pairs for contrastive training.

    Item ``idx`` is ``(img1, img2, label)``: ``img1`` is the image at ``idx``
    (the anchor) and ``img2`` is a randomly drawn partner.

    * ``label == 1.0`` - two different *genuine* signatures of the same writer.
    * ``label == 0.0`` - a genuine/forged pair of the same writer, or
      signatures taken from two different writers.

    Previously every same-writer pair was labelled 1.0, including
    genuine/forged pairs, which teaches the model that forgeries match the
    signatures they imitate.

    Args:
        images_list: sequence of dicts with at least ``"img_path"`` and
            ``"writer"``; an optional boolean ``"forged"`` marks forgeries
            (records without the key are treated as genuine).
        transform: optional callable applied to each PIL image independently.
        same_prob: probability of drawing a positive pair when the anchor is
            genuine and another genuine signature of its writer exists.
        forged_is_negative: when True (default) the ``"forged"`` flag is
            honoured as described above. Pass False to get the former
            writer-only labelling (every same-writer pair is positive).
    """

    # Chance of using a same-writer genuine/forged pair as the negative when
    # a different-writer negative is also available.
    SKILLED_FORGERY_PROB = 0.5

    def __init__(self, images_list, transform=None, same_prob=0.5, forged_is_negative=True):
        self.images = images_list
        self.transform = transform
        self.same_prob = same_prob
        self.forged_is_negative = forged_is_negative
        # writer id -> indices (into self.images) of that writer's images
        self.by_writer = {}
        for idx, rec in enumerate(self.images):
            self.by_writer.setdefault(rec["writer"], []).append(idx)
        self.writers = list(self.by_writer.keys())

    def __len__(self):
        return len(self.images)

    def _is_forged(self, idx):
        """True when image ``idx`` must be treated as a forgery for labelling."""
        return self.forged_is_negative and bool(self.images[idx].get("forged", False))

    def _sample_partner(self, a_idx):
        """Draw a partner for anchor ``a_idx``; returns ``(partner_index, label)``.

        Raises:
            ValueError: if a negative pair is required but the dataset offers
                neither another writer nor a same-writer genuine/forged partner.
        """
        writer = self.images[a_idx]["writer"]
        a_forged = self._is_forged(a_idx)
        peers = [i for i in self.by_writer[writer] if i != a_idx]

        # Positive pairs need a genuine anchor and a genuine same-writer partner.
        if not a_forged:
            matches = [i for i in peers if not self._is_forged(i)]
            if matches and random.random() < self.same_prob:
                return random.choice(matches), 1.0

        # Negative pair: same-writer partner of the opposite kind (genuine vs
        # forged), or any image of another writer.
        skilled = [i for i in peers if self._is_forged(i) != a_forged]
        other_writers = [w for w in self.writers if w != writer]
        if skilled and (not other_writers or random.random() < self.SKILLED_FORGERY_PROB):
            return random.choice(skilled), 0.0
        if not other_writers:
            raise ValueError(
                "Cannot build a negative pair: the dataset needs at least two writers "
                "or a genuine/forged pair for the anchor's writer."
            )
        other_writer = random.choice(other_writers)
        return random.choice(self.by_writer[other_writer]), 0.0

    @staticmethod
    def _load_gray(path):
        """Read ``path`` as a single-channel PIL image, closing the file handle."""
        with Image.open(path) as im:
            return im.convert("L")

    def __getitem__(self, idx):
        b_idx, label = self._sample_partner(idx)

        img1 = self._load_gray(self.images[idx]["img_path"])
        img2 = self._load_gray(self.images[b_idx]["img_path"])
        if self.transform:
            img1 = self.transform(img1)
            img2 = self.transform(img2)
        return img1, img2, torch.tensor(label, dtype=torch.float32)


In [None]:
# ------------------------------
# Split by writers
# ------------------------------
writer_list = sorted({rec['writer'] for rec in images})
train_w, val_w = train_test_split(writer_list, test_size=0.2, random_state=42)

# Writers are disjoint between the two subsets; sets make the membership tests cheap
train_writers, val_writers = set(train_w), set(val_w)
train_imgs = [rec for rec in images if rec['writer'] in train_writers]
val_imgs   = [rec for rec in images if rec['writer'] in val_writers]

train_ds = SiameseDataset(train_imgs, transform=train_transform)
val_ds   = SiameseDataset(val_imgs, transform=val_transform)

# Dataloaders: training is shuffled, validation keeps dataset order
train_loader = DataLoader(train_ds, batch_size=32, shuffle=True, num_workers=0, pin_memory=True)
val_loader   = DataLoader(val_ds, batch_size=64, shuffle=False, num_workers=0, pin_memory=True)

print(f"✅ Done. Writers: {len(writer_list)} | Train pairs: {len(train_ds)} | Val pairs: {len(val_ds)}")


In [None]:
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models

# ------------------------------
# Embedding Network
# ------------------------------
class EmbeddingNet(nn.Module):
    """Map an image batch to L2-normalised embeddings of size ``emb_dim``.

    The backbone is an ImageNet-pretrained MobileNetV2 whose classifier is
    replaced by an identity; its 1280 features feed a small projection head
    (Linear -> ReLU -> BatchNorm1d -> Linear).
    """

    def __init__(self, emb_dim=128):
        super().__init__()
        mobilenet = models.mobilenet_v2(weights=models.MobileNet_V2_Weights.IMAGENET1K_V1)
        mobilenet.classifier = nn.Identity()
        self.backbone = mobilenet
        # Layer order is part of the checkpoint format (state_dict keys head.0 ... head.3)
        projection_layers = [
            nn.Linear(1280, 512),
            nn.ReLU(),
            nn.BatchNorm1d(512),
            nn.Linear(512, emb_dim),
        ]
        self.head = nn.Sequential(*projection_layers)

    def forward(self, x):
        embedding = self.head(self.backbone(x))
        # unit length along the feature dimension
        return F.normalize(embedding, p=2, dim=1)

# ------------------------------
# Siamese Network
# ------------------------------
class SiameseNet(nn.Module):
    """Twin network: both inputs go through the same ``EmbeddingNet`` instance."""

    def __init__(self):
        super().__init__()
        self.embedding = EmbeddingNet()

    def forward(self, x1, x2):
        """Return the pair of embeddings ``(embedding(x1), embedding(x2))``."""
        first = self.embedding(x1)
        second = self.embedding(x2)
        return first, second

# ------------------------------
# Contrastive Loss
# ------------------------------
class ContrastiveLoss(nn.Module):
    """Contrastive loss on the pairwise distance between two embedding batches.

    Pairs with ``label == 1`` are penalised by their squared distance; pairs
    with ``label == 0`` are penalised by the squared shortfall of their
    distance below ``margin``. The result is the batch mean.
    """

    def __init__(self, margin=1.0):
        super().__init__()
        self.margin = margin

    def forward(self, e1, e2, label):
        dist = F.pairwise_distance(e1, e2)
        pull_term = label * dist.pow(2)
        shortfall = F.relu(self.margin - dist)
        push_term = (1 - label) * shortfall.pow(2)
        return (pull_term + push_term).mean()

print("✅ Siamese Network and Contrastive Loss ready!")


In [None]:
from tqdm.auto import tqdm
from sklearn.metrics import roc_auc_score
import numpy as np
import torch

# ------------------------------
# Training function
# ------------------------------
def train(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader``.

    Each batch is an ``(x1, x2, y)`` triple; the model is called as
    ``model(x1, x2)`` and its two outputs are passed to ``criterion`` with ``y``.

    Returns:
        Sum of per-batch losses weighted by batch size, divided by
        ``len(loader.dataset)``.
    """
    model.train()
    loss_sum = 0
    for batch in tqdm(loader, desc="Training", leave=False):
        x1, x2, y = (tensor.to(device) for tensor in batch)
        optimizer.zero_grad()
        loss = criterion(*model(x1, x2), y)
        loss.backward()
        optimizer.step()
        loss_sum += loss.item() * x1.size(0)
    return loss_sum / len(loader.dataset)

# ------------------------------
# Validation function
# ------------------------------
def validate(model, loader, criterion, device):
    """Evaluate ``model`` on ``loader`` without gradient tracking.

    Returns:
        ``(mean_loss, auc)`` where ``mean_loss`` is the batch-size weighted loss
        divided by ``len(loader.dataset)`` and ``auc`` is the ROC-AUC of the
        negated pairwise distances against the labels.
    """
    model.eval()
    loss_sum = 0
    all_labels, all_dists = [], []
    with torch.no_grad():
        for x1, x2, y in tqdm(loader, desc="Validating", leave=False):
            x1 = x1.to(device)
            x2 = x2.to(device)
            y = y.to(device)
            e1, e2 = model(x1, x2)
            loss_sum += criterion(e1, e2, y).item() * x1.size(0)
            all_labels.extend(y.cpu().numpy())
            all_dists.extend(F.pairwise_distance(e1, e2).cpu().numpy())
    # distances are negated so that a smaller distance ranks as "more positive"
    auc = roc_auc_score(all_labels, -np.array(all_dists))
    return loss_sum / len(loader.dataset), auc

print("✅ Training & validation functions ready!")


In [None]:
import torch.optim as optim

# Use the GPU when one is visible, otherwise the CPU
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Device:", device)

# Fresh network plus an initial optimiser and loss
model = SiameseNet()
model = model.to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = ContrastiveLoss(margin=1.0)


In [None]:
# ------------------------------
# ADVANCED TRAINING LOOP
# ------------------------------
import time
import copy
import numpy as np
from tqdm.auto import tqdm
from sklearn.metrics import roc_auc_score
import torch
import torch.nn.functional as F

# ---------- CONFIG ----------
epochs = 40
initial_lr = 1e-3
patience = 7
min_delta = 1e-4
model_path_best = "siamese_best_auc.pth"
val_pair_seed = 1234  # fixes which validation pairs are drawn each epoch (see validation pass)
use_amp = torch.cuda.is_available()
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Training device:", device, "| AMP:", use_amp)

# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=initial_lr)

# scheduler: reduce LR when val AUC plateaus
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=3,
    min_lr=1e-6
)

# criterion
criterion = ContrastiveLoss(margin=1.2)

# AMP scaler
scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

best_val_auc = -np.inf
best_state = None
no_improve = 0

train_losses, val_losses, val_aucs = [], [], []
start_time = time.time()

for epoch in range(epochs):
    epoch_start = time.time()
    # ---- train ----
    model.train()
    running_loss = 0.0
    total = 0
    train_pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} - Train", leave=False)
    for x1, x2, y in train_pbar:
        x1, x2, y = x1.to(device), x2.to(device), y.to(device)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast(enabled=use_amp):
            e1, e2 = model(x1, x2)
            loss = criterion(e1, e2, y)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        b = x1.size(0)
        running_loss += loss.item() * b
        total += b
        train_pbar.set_postfix({"loss": f"{running_loss/total:.4f}"})
    train_loss = running_loss / total if total > 0 else 0.0

    # ---- validate ----
    model.eval()
    running_loss = 0.0
    total = 0
    labels = []
    dists = []
    # The dataset draws pair partners with the `random` module and val_loader runs
    # in this process (num_workers=0). Seeding here makes every epoch score the SAME
    # validation pairs, so AUC changes reflect the model rather than sampling noise;
    # the scheduler, early stopping and checkpoint selection all rely on that.
    # The previous RNG state is restored afterwards so training pairs keep varying.
    rng_state = random.getstate()
    random.seed(val_pair_seed)
    try:
        with torch.no_grad():
            val_pbar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{epochs} - Val", leave=False)
            for x1, x2, y in val_pbar:
                x1, x2, y = x1.to(device), x2.to(device), y.to(device)
                with torch.cuda.amp.autocast(enabled=use_amp):
                    e1, e2 = model(x1, x2)
                    loss = criterion(e1, e2, y)
                b = x1.size(0)
                running_loss += loss.item() * b
                total += b
                labels.extend(y.cpu().numpy())
                dists.extend(F.pairwise_distance(e1, e2).cpu().numpy())
                val_pbar.set_postfix({"loss": f"{running_loss/total:.4f}"})
    finally:
        random.setstate(rng_state)
    val_loss = running_loss / total if total > 0 else 0.0

    # compute AUC safely: roc_auc_score raises ValueError when it cannot score the
    # inputs (e.g. only one class present); any other error is a real bug and
    # is allowed to surface instead of being turned into NaN.
    try:
        val_auc = roc_auc_score(labels, -np.array(dists))
    except ValueError:
        val_auc = float("nan")

    train_losses.append(train_loss)
    val_losses.append(val_loss)
    val_aucs.append(val_auc)

    # scheduler step uses val_auc (if valid)
    if not np.isnan(val_auc):
        scheduler.step(val_auc)

    # checkpoint best
    improved = False
    if not np.isnan(val_auc) and val_auc > best_val_auc + min_delta:
        best_val_auc = val_auc
        # deepcopy: state_dict() returns references to the live parameters
        best_state = copy.deepcopy(model.state_dict())
        torch.save(best_state, model_path_best)
        improved = True
        no_improve = 0
    else:
        no_improve += 1

    epoch_time = time.time() - epoch_start
    print(f"Epoch {epoch+1}/{epochs} — train_loss: {train_loss:.4f} | val_loss: {val_loss:.4f} | val_auc: {val_auc:.4f} | time: {epoch_time:.1f}s {'(improved)' if improved else ''}")

    # early stopping
    if no_improve >= patience:
        print(f"Early stopping: no improvement for {patience} epochs. Best val AUC: {best_val_auc:.4f}")
        break

total_time = time.time() - start_time
print(f"\nTraining finished in {total_time/60:.2f} minutes. Best val AUC: {best_val_auc:.4f}")

# load best checkpoint if exists
if best_state is not None:
    model.load_state_dict(best_state)
    print("Loaded best model from checkpoint:", model_path_best)
else:
    print("No checkpoint saved (no valid val_auc improvement).")


In [None]:
# =========================
# THRESHOLD TUNING & EVAL
# =========================
import numpy as np
from sklearn.metrics import roc_curve, roc_auc_score, confusion_matrix, classification_report, accuracy_score
import torch
import torch.nn.functional as F

# choose model file
if os.path.exists("siamese_best_auc.pth"):
    ckpt = "siamese_best_auc.pth"
elif os.path.exists("siamese_model.pth"):
    ckpt = "siamese_model.pth"
else:
    raise FileNotFoundError("No model checkpoint found. Train or place siamese_best_auc.pth / siamese_model.pth in the working dir.")

print("Loading model from:", ckpt)
model.load_state_dict(torch.load(ckpt, map_location=device))
model.to(device).eval()

# collect val distances and labels
all_labels, all_dists = [], []
with torch.no_grad():
    for x1, x2, y in val_loader:
        x1, x2 = x1.to(device), x2.to(device)
        e1, e2 = model(x1, x2)
        dist = F.pairwise_distance(e1, e2)
        all_labels.extend(y.cpu().numpy())
        all_dists.extend(dist.cpu().numpy())

all_labels = np.array(all_labels)
all_dists = np.array(all_dists)
print(f"Collected {len(all_dists)} validation distances. Labels diff: {np.unique(all_labels, return_counts=True)}")

# compute ROC-AUC (scores are negated distances: smaller distance = more likely a match)
try:
    auc = roc_auc_score(all_labels, -all_dists)
except ValueError:
    # raised when the score cannot be computed, e.g. only one class was sampled
    auc = float("nan")
print(f"Val ROC-AUC: {auc:.4f}")

# find best threshold by maximizing accuracy: a pair is predicted "same" when dist < t
ths = np.linspace(all_dists.min(), all_dists.max(), 200)
best_acc, best_t = -1, None
for t in ths:
    preds = (all_dists < t).astype(int)
    acc = accuracy_score(all_labels, preds)
    if acc > best_acc:
        best_acc = acc
        best_t = t

# compute youden index threshold
# roc_curve was fed the NEGATED distances, so its thresholds live in
# negated-distance space; flip the sign to report a threshold that is
# comparable with best_t and with the distances used at inference.
fpr, tpr, roc_th = roc_curve(all_labels, -all_dists)
youden_idx = np.argmax(tpr - fpr)
youden_t = -roc_th[youden_idx]

# compute EER: operating point where false-negative and false-positive rates are closest
fnr = 1 - tpr
eer_idx = np.nanargmin(np.abs(fnr - fpr))
eer = (fpr[eer_idx] + fnr[eer_idx]) / 2

print(f"Best-acc threshold: {best_t:.4f} | Accuracy at best: {best_acc:.4f}")
print(f"Youden threshold (from ROC): {youden_t:.4f}")
print(f"EER ≈ {eer:.4f}")

#confusion matrix at best_t
preds_best = (all_dists < best_t).astype(int)
cm = confusion_matrix(all_labels, preds_best)
print("Confusion matrix (rows=true, cols=pred):\n", cm)
print("\nClassification report:\n", classification_report(all_labels, preds_best, digits=4))

# Save threshold for inference
with open("best_threshold.txt", "w") as f:
    f.write(str(best_t))
print("Saved best threshold to best_threshold.txt")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# ------------------------------
# Plot Loss and ROC-AUC curves
# ------------------------------
epochs_range = range(1, len(train_losses)+1)

# One figure, two panels: losses on the left, validation ROC-AUC on the right
fig, (ax_loss, ax_auc) = plt.subplots(1, 2, figsize=(12,5))

# Loss curves
ax_loss.plot(epochs_range, train_losses, label='Train Loss', marker='o')
ax_loss.plot(epochs_range, val_losses, label='Val Loss', marker='o')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Training & Validation Loss')
ax_loss.legend()
ax_loss.grid(True)

# ROC-AUC curve
ax_auc.plot(epochs_range, val_aucs, label='Val ROC-AUC', marker='o', color='green')
ax_auc.set_xlabel('Epoch')
ax_auc.set_ylabel('ROC-AUC')
ax_auc.set_title('Validation ROC-AUC')
ax_auc.set_ylim(0,1)
ax_auc.legend()
ax_auc.grid(True)

fig.tight_layout()
plt.show()


In [None]:
# ------------------------------
# Save the trained model
# ------------------------------
# Persist the current weights of `model` under a fixed file name
model_path = "siamese_model.pth"
final_weights = model.state_dict()
torch.save(final_weights, model_path)
print(f"✅ Model saved to {model_path}")


In [None]:
# Rebuild the network and restore the weights saved in `model_path`.
# map_location keeps the load working when the checkpoint was written on a GPU
# but the current runtime only has a CPU (and matches the other load sites).
model = SiameseNet().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()
print("✅ Model loaded and ready for inference")


In [None]:
import torch.nn.functional as F
import random

model.eval()  # inference mode for the demo below

# Draw a handful of random validation pairs and compare prediction with truth
num_demo_pairs = 5
for _ in range(num_demo_pairs):
    img_a, img_b, label = random.choice(val_ds)
    # add the batch dimension the network expects
    img_a = img_a.unsqueeze(0).to(device)
    img_b = img_b.unsqueeze(0).to(device)

    with torch.no_grad():
        emb_a, emb_b = model(img_a, img_b)
    distance = F.pairwise_distance(emb_a, emb_b).item()
    pred = int(distance < 1.0)  # threshold = 1.0

    print(f"True Label: {int(label.item())} | Predicted: {pred} | Distance: {distance:.4f}")


In [None]:
import matplotlib.pyplot as plt
import numpy as np

model.eval()
threshold = 1.0  # distance threshold for predicting same/different

num_samples = 6  # number of pairs to visualize

# Only the first batch is needed; taking it from an iterator avoids loading
# and transforming the entire validation set just to discard the rest.
x1_batch, x2_batch, y_batch = next(iter(val_loader))
x1_batch, x2_batch = x1_batch.to(device), x2_batch.to(device)

with torch.no_grad():
    e1, e2 = model(x1_batch, x2_batch)
    dists = F.pairwise_distance(e1, e2).cpu().numpy()
    labels = y_batch.numpy()
    preds = (dists < threshold).astype(int)

# Never index past the end of a small batch
num_shown = min(num_samples, len(dists))

# Plot images with predictions: one row per pair. Each row carries its own
# caption; a figure-level suptitle can only hold one string, so setting it in
# the loop would leave just the last pair's caption visible.
fig, axes = plt.subplots(num_shown, 2, figsize=(12, num_shown*2), squeeze=False)
for i in range(num_shown):
    img1 = x1_batch[i].cpu().permute(1,2,0).numpy() * 0.5 + 0.5  # unnormalize
    img2 = x2_batch[i].cpu().permute(1,2,0).numpy() * 0.5 + 0.5

    left, right = axes[i, 0], axes[i, 1]
    left.imshow(img1.squeeze(), cmap='gray')
    left.axis('off')
    left.set_title(f"Image 1 | True: {labels[i]} | Pred: {preds[i]} | Dist: {dists[i]:.4f}")

    right.imshow(img2.squeeze(), cmap='gray')
    right.axis('off')
    right.set_title("Image 2")

fig.tight_layout()
plt.show()


In [None]:
# =========================
# FINAL INFERENCE CELL (upload + test)
# =========================
from google.colab import files
from PIL import Image
import torch.nn.functional as F
import torchvision.transforms as T

# determine threshold
if os.path.exists("best_threshold.txt"):
    with open("best_threshold.txt","r") as f:
        best_thresh = float(f.read().strip())
else:
    best_thresh = 1.0
print("Using threshold:", best_thresh)

# test transform
# NOTE(review): the input size is inferred from the text form of train_transform;
# with the 128x128 training transform defined in this notebook this yields 128.
IMG_SIZE = 160 if (os.path.exists("siamese_best_auc.pth") and '160' in str(train_transform)) else 128


test_transform = T.Compose([
    T.Resize((IMG_SIZE,IMG_SIZE)),
    T.Grayscale(num_output_channels=3),
    T.ToTensor(),
    T.Normalize([0.5,0.5,0.5],[0.5,0.5,0.5])
])

# reload best model
ckpt = "siamese_best_auc.pth" if os.path.exists("siamese_best_auc.pth") else "siamese_model.pth"
model.load_state_dict(torch.load(ckpt, map_location=device))
model.to(device).eval()
print("Loaded model:", ckpt)

# helper
def emb_from_path(p):
    """Return the embedding of the image stored at path ``p`` (batch of one)."""
    # the context manager releases the file handle as soon as the pixels are read
    with Image.open(p) as im:
        img = im.convert("L")
    t = test_transform(img).unsqueeze(0).to(device)
    with torch.no_grad():
        # the network takes a pair; feed the same image twice and keep the first output
        e, _ = model(t,t)
    return e

# upload two images
print("Upload your reference (genuine) images — at least 1 (press choose files):")
uploaded_refs = files.upload()
ref_paths = list(uploaded_refs.keys())
print("Uploaded references:", ref_paths)
if not ref_paths:
    # without this guard the average below fails with ZeroDivisionError
    raise ValueError("No reference image was uploaded; at least one is required.")

print("\nUpload the test signature to verify:")
uploaded_test = files.upload()
if not uploaded_test:
    # without this guard the [0] lookup below fails with IndexError
    raise ValueError("No test image was uploaded.")
test_path = list(uploaded_test.keys())[0]
print("Test image:", test_path)

# compute distances to all references
ref_embs = [emb_from_path(p) for p in ref_paths]
test_emb = emb_from_path(test_path)

dists = [float(F.pairwise_distance(r, test_emb).item()) for r in ref_embs]
print("Distances to references:", dists)
avg_dist = sum(dists)/len(dists)
print("Average distance:", avg_dist)

if avg_dist < best_thresh:
    print("\n=> Prediction: ✅ GENUINE signature (avg_dist < threshold)")
else:
    print("\n=> Prediction: ❌ FORGED signature (avg_dist >= threshold)")


In [None]:
# ==========================================
#  IMPROVED: SIGNATURE VERIFICATION
# ==========================================
import torch
import torchvision.transforms as T
from PIL import Image
import torch.nn.functional as F
import os

# -------- PARAMETERS --------

IMG_SIZE = 128
DEFAULT_THRESHOLD = 1.0   # used whenever best_threshold.txt is absent or unreadable

# -------- Test transform  ----------
# resize -> 3-channel grayscale -> tensor -> normalise each channel with mean 0.5 / std 0.5
test_transform = T.Compose([
    T.Resize((IMG_SIZE, IMG_SIZE)),
    T.Grayscale(num_output_channels=3),
    T.ToTensor(),
    T.Normalize([0.5,0.5,0.5], [0.5,0.5,0.5])
])

# -------- Load threshold (auto) ----------
best_threshold = DEFAULT_THRESHOLD
if not os.path.exists("best_threshold.txt"):
    print("best_threshold.txt not found — using default threshold:", best_threshold)
else:
    try:
        with open("best_threshold.txt", "r") as f:
            best_threshold = float(f.read().strip())
        print(f"Loaded best_threshold.txt -> {best_threshold:.4f}")
    except Exception as e:
        print("Could not read best_threshold.txt, using default threshold:", e)
        best_threshold = DEFAULT_THRESHOLD

# -------- Load & check model ----------
# This cell relies on a `model` object created by an earlier cell.
if 'model' not in globals():
    raise RuntimeError("Model object not found. Make sure you have loaded your model (siamese/model) before running this cell.")
model.to(device).eval()
print("Model is loaded and in eval mode. Device:", device)

# -------- Helper: compute embedding----------
@torch.no_grad()
def emb_from_path(path):
    """Compute the embedding of the image file at ``path``.

    The image is read as grayscale, passed through ``test_transform`` and fed
    to the global ``model`` as a batch of one.

    Raises:
        FileNotFoundError: if ``path`` does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Image not found: {path}")
    gray = Image.open(path).convert("L")
    batch = test_transform(gray).unsqueeze(0).to(device)
    # the model takes a pair and returns (e1, e2); the same image is supplied
    # twice and the first output is kept. Gradients are already disabled by the decorator.
    first_emb, _ = model(batch, batch)
    return first_emb

# -------- Verify Signature ----------
def verify_signature_with_refs(model, reference_paths, test_path, device, threshold=None):
    """Compare a test signature with reference signatures and print a verdict.

    The test image's embedding is compared with each reference embedding; the
    signature is reported as genuine when the average distance is below
    ``threshold``.

    Args:
        model: accepted for API compatibility. Embeddings are produced by
            ``emb_from_path``, which uses the module-level ``model``.
        reference_paths: iterable of paths to reference signatures
            (list, tuple, ...); must contain at least one path.
        test_path: path of the signature to verify.
        device: accepted for API compatibility (see ``model``).
        threshold: distance threshold; defaults to the module-level
            ``best_threshold`` when ``None``.

    Returns:
        ``True`` when the average distance is below the threshold, else ``False``.

    Raises:
        ValueError: if ``reference_paths`` is empty.
        FileNotFoundError: if any reference or the test file does not exist.
        RuntimeError: if the embedding of a reference cannot be computed.
    """
    if threshold is None:
        threshold = best_threshold

    # Materialise once: allows tuples/generators and makes the emptiness check reliable
    reference_paths = list(reference_paths)
    if not reference_paths:
        # otherwise the average below would fail with ZeroDivisionError
        raise ValueError("At least one reference signature path is required.")

    # check files
    missing = [p for p in reference_paths + [test_path] if not os.path.exists(p)]
    if missing:
        raise FileNotFoundError("These files are missing: " + ", ".join(missing))

    # cache reference embeddings
    ref_embs = []
    for p in reference_paths:
        try:
            ref_embs.append(emb_from_path(p))
        except Exception as e:
            # keep the original failure attached as the explicit cause
            raise RuntimeError(f"Error computing embedding for reference {p}: {e}") from e

    # test embedding
    test_emb = emb_from_path(test_path)

    # compute distances
    distances = [float(F.pairwise_distance(r, test_emb).item()) for r in ref_embs]
    avg_dist = sum(distances) / len(distances)

    # print results
    print("\n--- Signature Verification Result ---")
    print("Reference paths:")
    for i, p in enumerate(reference_paths):
        print(f"  Ref {i+1}: {p}")
    print("\nDistances to references:")
    for i, d in enumerate(distances):
        print(f"  Reference {i+1}: {d:.4f}")
    print(f"\nAverage Distance: {avg_dist:.4f}")
    print(f"Threshold used: {threshold:.4f}")

    if avg_dist < threshold:
        print("\n=> Prediction: ✅ GENUINE signature")
        return True
    else:
        print("\n=> Prediction: ❌ FORGED signature")
        return False

# -------- Example usage ----------
# NOTE(review): the same reference file is listed three times, so all three
# distances will be identical — presumably placeholders for distinct samples.
reference_signs = ["/content/my1.jpeg"] * 3
test_sign = "/content/myf.jpeg"

# Run verification with the default (auto-loaded) threshold; report failures instead of raising
try:
    ok = verify_signature_with_refs(model, reference_signs, test_sign, device)
except Exception as e:
    print("Error during verification:", e)
