In [None]:
# Query the current working directory (before the change below).
%pwd
import os
# Step up one directory so that relative paths such as "data/..." resolve from the parent folder.
# NOTE(review): this is not idempotent; every re-run of the cell climbs one more level.
os.chdir("../")


In [None]:
# Display the working directory to confirm the result of the chdir in the previous cell.
%pwd

In [None]:
import zipfile
import os
from tqdm import tqdm

# Archive to unpack and the directory that receives its contents.
zip_path, extract_to = "data/Diddata.zip", "data"

os.makedirs(extract_to, exist_ok=True)

# Extract entry by entry (rather than extractall) so tqdm can report progress.
with zipfile.ZipFile(zip_path, mode="r") as archive:
    for entry in tqdm(archive.namelist(), desc="Extracting"):
        archive.extract(entry, path=extract_to)

print(f"Extracted to {extract_to}")

In [None]:
import os

base_dir = "data/Diddata"
# Sorted so the old-name -> number mapping is deterministic.
dirs = sorted(d for d in os.listdir(base_dir) if os.path.isdir(os.path.join(base_dir, d)))
targets = [str(idx) for idx in range(1, len(dirs) + 1)]

if set(dirs) == set(targets):
    # Already numbered 1..N (e.g. the cell was re-run): renaming again would
    # reshuffle folders, because "10" sorts before "2" lexicographically.
    print(f"Directories in {base_dir} are already numbered 1-{len(dirs)}; nothing to rename")
else:
    # Phase 1: park every directory under a temporary name so that no rename
    # can land on a directory that is still waiting for its own new name.
    staged = []
    for idx, dirname in enumerate(dirs, start=1):
        tmp = os.path.join(base_dir, f".renumber_tmp_{idx}")
        os.rename(os.path.join(base_dir, dirname), tmp)
        staged.append(tmp)

    # Phase 2: move the parked directories to their final numeric names.
    for tmp, target in zip(staged, targets):
        os.rename(tmp, os.path.join(base_dir, target))

    print(f"Renamed {len(dirs)} directories to 1-{len(dirs)}")

In [None]:
import os, torch, shutil, numpy as np
from glob import glob; from PIL import Image
from torch.utils.data import random_split, Dataset, DataLoader
from torchvision import transforms as T
torch.manual_seed(42)

class CustomDataset(Dataset):
    """Image classification dataset laid out as ``root/<class_name>/<image file>``.

    Attributes:
        im_paths: sorted list of image paths found one level below ``root``.
        cls_names: class name -> integer label, numbered in order of first appearance
            in ``im_paths``.
        cls_counts: class name -> number of images of that class.

    Args:
        root: directory whose immediate sub-directories are the classes.
        transformations: optional callable applied to each PIL image in ``__getitem__``.
        im_files: file extensions (with leading dot) to accept; compared case-insensitively.
    """

    def __init__(self, root, transformations = None, im_files = (".jpg", ".jpeg", ".png")):

        self.transformations = transformations
        # Filter on the real file extension. Interpolating the extension list into the
        # glob pattern would be parsed as one character class (any file ending in 'g',
        # 'n', 'p', ...), not as a set of alternative suffixes.
        extensions = tuple(ext.lower() for ext in im_files)
        self.im_paths = sorted(
            path for path in glob(f"{root}/*/*")
            if os.path.isfile(path) and path.lower().endswith(extensions)
        )

        self.cls_names, self.cls_counts = {}, {}
        for im_path in self.im_paths:
            class_name = self.get_class(im_path)
            # First sighting of a class assigns it the next free integer label.
            if class_name not in self.cls_names: self.cls_names[class_name] = len(self.cls_names)
            self.cls_counts[class_name] = self.cls_counts.get(class_name, 0) + 1

    def get_class(self, path):
        """Return the name of the directory that directly contains ``path``."""
        return os.path.basename(os.path.dirname(path))

    def __len__(self): return len(self.im_paths)

    def __getitem__(self, idx):
        """Load image ``idx`` as RGB and return ``(image, integer_label)``."""

        im_path = self.im_paths[idx]
        im = Image.open(im_path).convert("RGB")
        gt = self.cls_names[self.get_class(im_path)]

        if self.transformations is not None: im = self.transformations(im)

        return im, gt
    
def get_dls(root, transformations, bs, split = [0.9, 0.05, 0.05], ns = 4):
    """Build train / validation / test loaders over a ``CustomDataset``.

    Args:
        root: dataset root forwarded to ``CustomDataset``.
        transformations: per-image transform forwarded to ``CustomDataset``.
        bs: batch size of the train and validation loaders (the test loader uses 1).
        split: fractions for train and validation; the test subset receives
            whatever remains, so only the first two entries are read.
        ns: ``num_workers`` for every loader.

    Returns:
        ``(train_loader, val_loader, test_loader, class_name_to_index)``.
    """

    dataset = CustomDataset(root = root, transformations = transformations)

    n_total = len(dataset)
    n_train = int(n_total * split[0])
    n_val = int(n_total * split[1])
    n_test = n_total - (n_train + n_val)  # remainder absorbs the rounding loss

    subsets = random_split(dataset = dataset, lengths = [n_train, n_val, n_test])

    # (batch size, shuffle) per subset, in train / val / test order.
    loader_cfg = [(bs, True), (bs, False), (1, False)]
    train_dl, valid_dl, test_dl = [
        DataLoader(subset, batch_size = size, shuffle = shuffle, num_workers = ns)
        for subset, (size, shuffle) in zip(subsets, loader_cfg)
    ]

    return train_dl, valid_dl, test_dl, dataset.cls_names

# Dataset location, target image side length and per-channel normalisation statistics
# (presumably the usual ImageNet values; confirm they suit this data).
root = "data/Diddata"
im_size = 224
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

tfs = T.Compose([
    T.Resize((im_size, im_size)),
    T.ToTensor(),
    T.Normalize(mean = mean, std = std),
])
tr_dl, val_dl, ts_dl, classes = get_dls(root = root, transformations = tfs, bs = 32)

# Number of batches per loader, one per line, followed by the class-name -> index mapping.
print(len(tr_dl), len(val_dl), len(ts_dl), sep = "\n")
print(classes)

In [None]:
import random
from matplotlib import pyplot as plt

def tensor_2_im(t, t_type = "rgb"):
    """Undo the input normalisation and return a uint8 NumPy image suitable for plotting.

    Args:
        t: normalised, channel-first image tensor: ``(3, H, W)`` for RGB, or
            ``(1, H, W)`` / ``(H, W)`` for gray.
        t_type: ``"gray"`` de-normalises with mean/std 0.5; any other value uses
            the three-channel RGB statistics.

    Returns:
        ``np.uint8`` array of shape ``(H, W, 3)`` for RGB or ``(H, W)`` for gray.
    """

    is_gray = t_type == "gray"
    if is_gray: mean, std = [0.5], [0.5]
    else: mean, std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]

    t = t.detach().cpu().float()
    mean = torch.tensor(mean).view(-1, 1, 1)
    std = torch.tensor(std).view(-1, 1, 1)

    # Invert (x - mean) / std, rescale to [0, 255], and clamp so that values
    # slightly outside the range do not wrap around in the uint8 cast.
    im = ((t * std + mean) * 255).clamp(0, 255)

    # A gray image has no channel axis left after squeeze(), so it must not be permuted.
    if is_gray: return im.squeeze().numpy().astype(np.uint8)
    return im.permute(1, 2, 0).numpy().astype(np.uint8)

def visualize(data, n_ims, rows, cmap = None, cls_names = None):
    """Plot ``n_ims`` randomly chosen samples of ``data`` in a grid with ``rows`` rows.

    Args:
        data: indexable dataset returning ``(image_tensor, label)``.
        n_ims: number of samples to draw (with replacement).
        rows: number of grid rows; the column count is derived from ``n_ims``.
        cmap: image type, ``"rgb"`` or ``"gray"``; ``None`` is treated as ``"rgb"``.
        cls_names: optional sequence mapping an integer label to a display name.
    """

    if cmap is None: cmap = "rgb"
    assert cmap in ["rgb", "gray"]
    # Keep the image type (needed for de-normalisation) separate from the
    # Matplotlib colormap, which only matters for single-channel images.
    t_type = cmap
    plt_cmap = "gray" if t_type == "gray" else None

    # Ceiling division so the grid has room when n_ims is not a multiple of rows.
    cols = -(-n_ims // rows)

    plt.figure(figsize = (20, 10))
    sample_ids = [random.randint(0, len(data) - 1) for _ in range(n_ims)]
    for plot_idx, sample_idx in enumerate(sample_ids):

        im, gt = data[sample_idx]
        plt.subplot(rows, cols, plot_idx + 1)
        plt.imshow(tensor_2_im(im, t_type), cmap = plt_cmap)
        plt.axis('off')
        if cls_names is not None: plt.title(f"GT -> {cls_names[int(gt)]}")
        else: plt.title(f"GT -> {gt}")
            
# Preview 20 randomly picked training samples in a 4-row grid, titled with their class names.
visualize(tr_dl.dataset, 20, 4, "rgb", list(classes.keys()))

In [None]:
# Same preview for the validation subset.
visualize(val_dl.dataset, 20, 4, "rgb", list(classes.keys()))

In [None]:
# Same preview for the test subset.
visualize(ts_dl.dataset, 20, 4, "rgb", list(classes.keys()))

In [None]:
def data_analysis(root, transformations):
    """Draw a bar chart of the number of images per class found under ``root``.

    Args:
        root: dataset root forwarded to ``CustomDataset``.
        transformations: forwarded to ``CustomDataset``; only the per-class counts
            are read here, so no image is actually transformed.
    """

    ds = CustomDataset(root = root, transformations = transformations)
    cls_counts, width, text_width = ds.cls_counts, 0.7, 0.05
    text_height = 2
    cls_names = list(cls_counts.keys()); counts = list(cls_counts.values())

    fig, ax = plt.subplots(figsize = (20, 10))
    indices = np.arange(len(counts))

    ax.bar(indices, counts, width, color = "firebrick")
    ax.set_xlabel("Class Names", color = "red")
    # Fix the tick positions first, then label them: labelling before the
    # positions are pinned attaches the names to Matplotlib's automatic ticks.
    ax.set_xticks(indices)
    ax.set_xticklabels(cls_names, rotation = 60)
    ax.set_ylabel("Data Counts", color = "red")
    ax.set_title("Dataset Class Imbalance Analysis")

    # Write each count just above its bar.
    for i, v in enumerate(counts): ax.text(i - text_width, v + text_height, str(v), color = "royalblue")
    
# Plot the per-class image counts for the dataset root configured above.
data_analysis(root = root, transformations = tfs)

In [None]:
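# NOTE: commented-out earlier draft of the ID/selfie pairing pipeline, kept for reference only.
# It builds positive pairs only; the KYCVerificationDataset cells further down are the active version.
# import os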
# import torch
# from torch.utils.data import Dataset, DataLoader
# from torchvision import transforms, models
# from PIL import Image
# import torch.nn as nn
# import torch.optim as optim

# # 1. Dataset for pairs
# class IDSelfiePairDataset(Dataset):
#     def __init__(self, root, transform=None):
#         self.pairs = []
#         self.labels = []
#         self.transform = transform
#         # Assumes each dir: [id.jpg, selfie.jpg]
#         for doc_type in os.listdir(root):
#             doc_dir = os.path.join(root, doc_type)
#             if not os.path.isdir(doc_dir): continue
#             files = os.listdir(doc_dir)
#             id_img = [f for f in files if 'id' in f.lower()]
#             selfie_img = [f for f in files if 'selfie' in f.lower()]
#             if id_img and selfie_img:
#                 self.pairs.append((os.path.join(doc_dir, id_img[0]), os.path.join(doc_dir, selfie_img[0])))
#                 self.labels.append(1)  # positive pair
#             # Optionally add negative pairs (different people)
#             # ... (for a real dataset, add negative pairs here)

#     def __len__(self):
#         return len(self.pairs)

#     def __getitem__(self, idx):
#         img1 = Image.open(self.pairs[idx][0]).convert("RGB")
#         img2 = Image.open(self.pairs[idx][1]).convert("RGB")
#         if self.transform:
#             img1 = self.transform(img1)
#             img2 = self.transform(img2)
#         label = torch.tensor(self.labels[idx], dtype=torch.float32)
#         return img1, img2, label

# # 2. Simple Siamese Network
# class SiameseNetwork(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.cnn = models.resnet18(pretrained=True)
#         self.cnn.fc = nn.Identity()
#         self.fc = nn.Sequential(
#             nn.Linear(512*2, 256),
#             nn.ReLU(),
#             nn.Linear(256, 1)
#         )

#     def forward(self, x1, x2):
#         f1 = self.cnn(x1)
#         f2 = self.cnn(x2)
#         out = torch.cat([f1, f2], dim=1)
#         out = self.fc(out)
#         return torch.sigmoid(out).squeeze(1)

# # 3. Training setup
# transform = transforms.Compose([
#     transforms.Resize((224,224)),
#     transforms.ToTensor(),
#     transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
# ])
# dataset = IDSelfiePairDataset(root="data/Diddata", transform=transform)
# dataloader = DataLoader(dataset, batch_size=8, shuffle=True)

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model = SiameseNetwork().to(device)
# criterion = nn.BCELoss()
# optimizer = optim.Adam(model.parameters(), lr=1e-4)

# # 4. Training loop
# epochs = 5
# for epoch in range(epochs):
#     model.train()
#     running_loss = 0.0
#     for img1, img2, label in dataloader:
#         img1, img2, label = img1.to(device), img2.to(device), label.to(device)
#         optimizer.zero_grad()
#         output = model(img1, img2)
#         loss = criterion(output, label)
#         loss.backward()
#         optimizer.step()
#         running_loss += loss.item()
#     print(f"Epoch {epoch+1}, Loss: {running_loss/len(dataloader):.4f}")

In [None]:
import os
import random
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
import torch.nn as nn
import torch.optim as optim

In [None]:
# Cell 2: Custom Dataset for KYC Verification (with positive and negative pairs)
class KYCVerificationDataset(Dataset):
    """Pairs of (document image, selfie image) labelled 1 for same person, 0 otherwise.

    Every sub-directory of ``root`` is treated as one person. Inside it, files whose
    lower-cased name contains ``id``, ``passport`` or ``national`` count as documents
    and files containing ``selfie`` count as selfies.
    NOTE(review): this is plain substring matching, so unrelated names that merely
    contain "id" would also be picked up as documents; confirm the naming scheme.

    Args:
        root: directory with one sub-directory per person.
        transform: optional callable applied to both images of a pair.
        neg_ratio: number of negative pairs to generate per positive pair.
        seed: optional seed for the negative-pair sampling. When ``None`` the
            global ``random`` module state is used.

    Raises:
        ValueError: if negatives are requested but no document/selfie combination
            from two different people exists.
    """

    def __init__(self, root, transform=None, neg_ratio=1.0, seed=None):
        self.transform = transform
        self.pairs = []
        self.labels = []
        self.person_dirs = sorted(
            os.path.join(root, d) for d in os.listdir(root) if os.path.isdir(os.path.join(root, d))
        )
        # Collect all doc and selfie images for each person
        self.docs = []
        self.selfies = []
        doc_keywords = ('id', 'passport', 'national')
        for person_dir in self.person_dirs:
            # os.listdir order is arbitrary; sort so pair order (and therefore any
            # index-based split of this dataset) is stable between runs.
            files = sorted(os.listdir(person_dir))
            doc_imgs = [os.path.join(person_dir, f) for f in files if any(k in f.lower() for k in doc_keywords)]
            selfie_imgs = [os.path.join(person_dir, f) for f in files if 'selfie' in f.lower()]
            self.docs.append(doc_imgs)
            self.selfies.append(selfie_imgs)

        # Positive pairs (same person): every document against every selfie.
        for doc_list, selfie_list in zip(self.docs, self.selfies):
            for doc in doc_list:
                for selfie in selfie_list:
                    self.pairs.append((doc, selfie))
                    self.labels.append(1)

        # Negative pairs (different people)
        num_neg = int(len(self.pairs) * neg_ratio)
        if num_neg > 0:
            rng = random if seed is None else random.Random(seed)
            n_people = len(self.person_dirs)
            # Without at least one usable cross-person combination the retry loop
            # below could never finish, so fail early with a clear message.
            feasible = any(
                self.docs[i] and self.selfies[j]
                for i in range(n_people) for j in range(n_people) if i != j
            )
            if not feasible:
                raise ValueError(
                    "Cannot build negative pairs: need a document and a selfie belonging to two different people"
                )
            added = 0
            # Retry draws that hit a person without documents/selfies, so exactly
            # num_neg negatives are produced instead of silently fewer.
            while added < num_neg:
                i, j = rng.sample(range(n_people), 2)
                if self.docs[i] and self.selfies[j]:
                    doc = rng.choice(self.docs[i])
                    selfie = rng.choice(self.selfies[j])
                    self.pairs.append((doc, selfie))
                    self.labels.append(0)
                    added += 1

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(doc_image, selfie_image, label)`` with ``label`` as a float32 scalar tensor."""
        doc_path, selfie_path = self.pairs[idx]
        doc_img = Image.open(doc_path).convert("RGB")
        selfie_img = Image.open(selfie_path).convert("RGB")
        if self.transform:
            doc_img = self.transform(doc_img)
            selfie_img = self.transform(selfie_img)
        label = torch.tensor(self.labels[idx], dtype=torch.float32)
        return doc_img, selfie_img, label

In [None]:
# Cell 3: Siamese Network Definition
class SiameseNetwork(nn.Module):
    """Pair verifier: one shared ResNet-18 embeds both images, a small MLP scores the pair.

    ``forward`` returns one probability per pair (a sigmoid is applied inside),
    with the trailing singleton dimension removed.
    """

    def __init__(self):
        super().__init__()
        embed_dim = 512  # per-image feature size fed to the head (two of them are concatenated)

        # Shared backbone; replacing the classifier with Identity exposes the pooled features.
        backbone = models.resnet18(pretrained=True)
        backbone.fc = nn.Identity()
        self.cnn = backbone

        head_layers = [nn.Linear(2 * embed_dim, 256), nn.ReLU(), nn.Linear(256, 1)]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x1, x2):
        # The same weights embed both inputs, first x1 and then x2.
        embeddings = [self.cnn(x) for x in (x1, x2)]
        score = self.fc(torch.cat(embeddings, dim=1))
        return score.sigmoid().squeeze(1)

In [None]:
from sklearn.model_selection import train_test_split
# Cell 4: DataLoader and Training Setup
from torch.utils.data import Subset, DataLoader

# Shared preprocessing for both images of a pair: resize, tensor conversion, channel normalisation.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
dataset = KYCVerificationDataset(root="data/Diddata", transform=transform, neg_ratio=1.0)

# Two successive 90/10 splits over pair indices: (train+val)/test first, then train/val.
# NOTE(review): the split is over pairs, not people, so images of one person can land
# in several subsets; confirm this is acceptable for the reported metrics.
all_indices = list(range(len(dataset)))
trainval_indices, test_indices = train_test_split(
    all_indices, test_size=0.1, shuffle=True, random_state=42
)
train_indices, val_indices = train_test_split(
    trainval_indices, test_size=0.1, shuffle=True, random_state=42
)

train_dataset, val_dataset, test_dataset = (
    Subset(dataset, ids) for ids in (train_indices, val_indices, test_indices)
)

# Only the training loader shuffles; the other settings are common to all three.
loader_opts = dict(batch_size=16, num_workers=4)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_opts)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_opts)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_opts)

In [None]:

# Use the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

model = SiameseNetwork()
model = model.to(device)
# The network already ends in a sigmoid, so plain BCELoss (not the with-logits variant) is used.
criterion = nn.BCELoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)

In [None]:
# Cell 6: Training Loop with torchmetrics, tqdm, and Model Checkpointing

from torchmetrics.classification import BinaryAccuracy, BinaryPrecision, BinaryRecall, BinaryF1Score
from tqdm import tqdm
import matplotlib.pyplot as plt


# Per-epoch history, filled by the training loop and plotted afterwards.
train_losses, val_losses = [], []
train_accs, val_accs = [], []
train_f1s, val_f1s = [], []

epochs = 5
# Start below any reachable F1 so the first epoch always writes a checkpoint.
# Starting at 0.0 with a strict ">" comparison would never save a model whose
# validation F1 stays at 0, and the test cell would then load a missing or stale file.
best_val_f1 = float("-inf")
# One pass over the training set followed by one pass over the validation set per epoch.
for epoch in range(epochs):  # single epoch loop: training and validation both live inside it
    # --- Training ---
    model.train()
    running_loss = 0.0
    # Fresh metric objects every epoch so statistics do not carry over between epochs.
    acc_metric = BinaryAccuracy().to(device)
    prec_metric = BinaryPrecision().to(device)
    rec_metric = BinaryRecall().to(device)
    f1_metric = BinaryF1Score().to(device)

    for doc_img, selfie_img, label in tqdm(train_loader, desc=f"Train Epoch {epoch+1}"):
        doc_img, selfie_img, label = doc_img.to(device), selfie_img.to(device), label.to(device)
        optimizer.zero_grad()
        output = model(doc_img, selfie_img)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        # The model outputs probabilities; threshold at 0.5 to obtain hard 0/1 predictions.
        preds = (output > 0.5).float()
        # Training metrics use the outputs computed before this batch's optimizer step.
        acc_metric.update(preds, label)
        prec_metric.update(preds, label)
        rec_metric.update(preds, label)
        f1_metric.update(preds, label)

    train_acc = acc_metric.compute().item()
    train_prec = prec_metric.compute().item()
    train_rec = rec_metric.compute().item()
    train_f1 = f1_metric.compute().item()
    # Mean of the per-batch losses (each batch weighted equally, regardless of its size).
    avg_train_loss = running_loss / len(train_loader)
    print(f"Train Epoch {epoch+1}: Loss={avg_train_loss:.4f}, Acc={train_acc:.4f}, Prec={train_prec:.4f}, Rec={train_rec:.4f}, F1={train_f1:.4f}")

    # --- Validation ---
    model.eval()
    val_loss = 0.0
    val_acc_metric = BinaryAccuracy().to(device)
    val_prec_metric = BinaryPrecision().to(device)
    val_rec_metric = BinaryRecall().to(device)
    val_f1_metric = BinaryF1Score().to(device)
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for doc_img, selfie_img, label in val_loader:
            doc_img, selfie_img, label = doc_img.to(device), selfie_img.to(device), label.to(device)
            output = model(doc_img, selfie_img)
            loss = criterion(output, label)
            val_loss += loss.item()
            preds = (output > 0.5).float()
            val_acc_metric.update(preds, label)
            val_prec_metric.update(preds, label)
            val_rec_metric.update(preds, label)
            val_f1_metric.update(preds, label)

    val_acc = val_acc_metric.compute().item()
    val_prec = val_prec_metric.compute().item()
    val_rec = val_rec_metric.compute().item()
    val_f1 = val_f1_metric.compute().item()
    avg_val_loss = val_loss / len(val_loader)
    print(f"Val Epoch {epoch+1}: Loss={avg_val_loss:.4f}, Acc={val_acc:.4f}, Prec={val_prec:.4f}, Rec={val_rec:.4f}, F1={val_f1:.4f}")

    # Save best model by validation F1 (strict improvement only; ties keep the earlier checkpoint).
    if val_f1 > best_val_f1:
        best_val_f1 = val_f1
        torch.save(model.state_dict(), "best_kyc_siamese.pt")
        print(f"Best model saved at epoch {epoch+1} with Val F1={val_f1:.4f}")

    # Record this epoch's metrics for the plots drawn after training.
    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)
    train_accs.append(train_acc)
    val_accs.append(val_acc)
    train_f1s.append(train_f1)
    val_f1s.append(val_f1)

# After training loop, plot the metrics
epochs_range = range(1, epochs+1)

# One panel per metric: (axis label and title, legend suffix, train history, validation history).
panels = [
    ("Loss", "Loss", train_losses, val_losses),
    ("Accuracy", "Acc", train_accs, val_accs),
    ("F1 Score", "F1", train_f1s, val_f1s),
]

plt.figure(figsize=(16,5))
for position, (metric_title, metric_tag, train_curve, val_curve) in enumerate(panels, start=1):
    plt.subplot(1, 3, position)
    plt.plot(epochs_range, train_curve, label=f'Train {metric_tag}')
    plt.plot(epochs_range, val_curve, label=f'Val {metric_tag}')
    plt.xlabel('Epoch')
    plt.ylabel(metric_title)
    plt.legend()
    plt.title(metric_title)

plt.tight_layout()
plt.show()

In [None]:
import numpy as np
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report

# --- Test Set Evaluation with Confusion Matrix and Classification Report ---

# Load the best checkpoint. map_location remaps the stored tensors to the current
# device, so a checkpoint written on a GPU machine also loads on a CPU-only one.
model.load_state_dict(torch.load("best_kyc_siamese.pt", map_location=device))
model.eval()

# Collected as integer class ids (0/1) for the scikit-learn reports.
all_labels = []
all_preds = []
test_loss = 0.0

test_acc_metric = BinaryAccuracy().to(device)
test_prec_metric = BinaryPrecision().to(device)
test_rec_metric = BinaryRecall().to(device)
test_f1_metric = BinaryF1Score().to(device)

with torch.no_grad():
    for doc_img, selfie_img, label in tqdm(test_loader, desc="Testing"):
        doc_img, selfie_img, label = doc_img.to(device), selfie_img.to(device), label.to(device)
        output = model(doc_img, selfie_img)
        loss = criterion(output, label)
        test_loss += loss.item()
        # The model outputs probabilities; threshold at 0.5 for hard predictions.
        preds = (output > 0.5).float()
        all_labels.extend(label.cpu().numpy().astype(int))
        all_preds.extend(preds.cpu().numpy().astype(int))
        test_acc_metric.update(preds, label)
        test_prec_metric.update(preds, label)
        test_rec_metric.update(preds, label)
        test_f1_metric.update(preds, label)

# Mean of the per-batch losses.
avg_test_loss = test_loss / len(test_loader)
test_acc = test_acc_metric.compute().item()
test_prec = test_prec_metric.compute().item()
test_rec = test_rec_metric.compute().item()
test_f1 = test_f1_metric.compute().item()

print(f"Best Model Test Results: Loss={avg_test_loss:.4f}, Acc={test_acc:.4f}, Prec={test_prec:.4f}, Rec={test_rec:.4f}, F1={test_f1:.4f}")

# --- Confusion Matrix ---
# Pin both classes so the matrix stays 2x2 (matching the two display labels)
# even when one class is missing from the test split or from the predictions.
cm = confusion_matrix(all_labels, all_preds, labels=[0, 1])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["Negative", "Positive"])
os.makedirs("plot", exist_ok=True)
# Draw on an explicit axes: without `ax`, disp.plot() opens its own figure and
# the 5x5 figure created beforehand is left empty.
fig, ax = plt.subplots(figsize=(5,5))
disp.plot(ax=ax, cmap=plt.cm.Blues, values_format='d')
ax.set_title("Confusion Matrix")
fig.savefig("plot/confusion_matrix.png")
plt.show()

# --- Classification Report ---
# Same class pinning as above, so target_names always lines up with two classes.
report = classification_report(all_labels, all_preds, labels=[0, 1], target_names=["Negative", "Positive"])
print(report)
with open("plot/classification_report.txt", "w") as f:
    f.write(report)

# --- Save best stats to a file ---
with open("plot/best_model_stats.txt", "w") as f:
    f.write("Best Model Test Results:\n")
    f.write(f"Loss: {avg_test_loss:.4f}\n")
    f.write(f"Accuracy: {test_acc:.4f}\n")
    f.write(f"Precision: {test_prec:.4f}\n")
    f.write(f"Recall: {test_rec:.4f}\n")
    f.write(f"F1 Score: {test_f1:.4f}\n")