In [1]:
# Mount Google Drive at /content/drive so the paths used later in the notebook resolve.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import torch
import random
import numpy as np

from PIL import Image
from collections import Counter
import matplotlib.pyplot as plt

import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm
from torch.cuda.amp import autocast, GradScaler
from torchvision import transforms
from torchvision.models import resnet18
import torchvision.transforms.functional as TF
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split

In [3]:
# CPU count reported for this runtime (displayed as the cell output).
os.cpu_count()

2

In [4]:
def seed_everything(seed=42):
    """Seed the Python, NumPy and PyTorch RNGs and configure cuDNN flags.

    Args:
        seed: Integer seed handed to every generator; its string form is also
            stored in the ``PYTHONHASHSEED`` environment variable.
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Apply the same seed to each library-level generator.
    for apply_seed in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
        apply_seed(seed)

    # Turn cudnn.benchmark off and cudnn.deterministic on.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

def seed_worker(worker_id):
    """Re-seed ``random`` and NumPy from the current PyTorch initial seed.

    Intended as a ``DataLoader`` ``worker_init_fn``.

    Args:
        worker_id: Worker index supplied by the caller; not used here.
    """
    # Reduce the torch seed to 32 bits before handing it to the other generators.
    worker_seed = torch.initial_seed() % (1 << 32)
    random.seed(worker_seed)
    np.random.seed(worker_seed)

# Seed all global RNGs, then create a dedicated torch.Generator (seeded with 42)
# that is passed to the DataLoaders further down.
seed_everything(42)
g = torch.Generator()
g.manual_seed(42)

<torch._C.Generator at 0x78aa66a39410>

In [5]:
# Root of the Task_B data on the mounted Drive; the "train" and "val" splits are read from below it.
base_path = '/content/drive/MyDrive/DATASET/Comys_Hackathon5/Comys_Hackathon5/Task_B'

In [6]:
def show_items(path, depth=0,files=3):
    """Print an indented tree of the directory ``path``.

    Entries of each directory are sorted by name and only the first ``files``
    of them (files and sub-directories alike) are shown. Sub-directories are
    expanded recursively with the same limit.

    Args:
        path: Directory to display. Anything that is not a directory is ignored.
        depth: Current nesting level; each level adds ``"--"`` to the indent.
        files: Maximum number of entries listed per directory.

    Returns:
        None. The tree is written to stdout.
    """
    if not os.path.isdir(path):
        return

    indent = "--" * depth
    # normpath drops a trailing separator, so "some/dir/" still prints "dir/"
    # instead of an empty name (basename of a path ending in "/" is "").
    print(f"{indent} {os.path.basename(os.path.normpath(path))}/")

    try:
        items = sorted(os.listdir(path))[:files]
    except OSError as e:
        # Unreadable directory: report it and keep walking the rest of the tree.
        print(f"{indent}  [Error accessing {path}]: {e}")
        return

    for item in items:
        item_path = os.path.join(path, item)
        if os.path.isdir(item_path):
            show_items(item_path, depth + 1,files)
        else:
            print(f"{indent}   {item}")

In [7]:
# Preview the dataset folder layout, listing at most 15 entries per directory.
show_items(base_path, files=15)

 Task_B/
-- train/
---- 001_frontal/
----   001_frontal.jpg
------ distortion/
------   001_frontal_blurred.jpg
------   001_frontal_foggy.jpg
------   001_frontal_lowlight.jpg
------   001_frontal_noisy.jpg
------   001_frontal_rainy.jpg
------   001_frontal_resized.jpg
------   001_frontal_sunny.jpg
---- 002_frontal/
----   002_frontal.jpg
------ distortion/
------   002_frontal_blurred.jpg
------   002_frontal_foggy.jpg
------   002_frontal_lowlight.jpg
------   002_frontal_noisy.jpg
------   002_frontal_rainy.jpg
------   002_frontal_resized.jpg
------   002_frontal_sunny.jpg
---- 003_frontal/
----   003_frontal.jpg
------ distortion/
------   003_frontal_blurred.jpg
------   003_frontal_foggy.jpg
------   003_frontal_lowlight.jpg
------   003_frontal_noisy.jpg
------   003_frontal_rainy.jpg
------   003_frontal_resized.jpg
------   003_frontal_sunny.jpg
---- 004_frontal/
----   004_frontal.jpg
------ distortion/
------   004_frontal_blurred.jpg
------   004_frontal_foggy.jpg
-----

In [8]:
class TripletFaceDataset(Dataset):
    """Dataset of (anchor, positive, negative) face-image triplets built from a folder tree.

    Layout read by the constructor: ``root_dir/<person_id>/*.jpg`` are the
    original images and ``root_dir/<person_id>/distortion/*.jpg`` are distorted
    variants. Identities with fewer than two images in total are skipped.

    One triplet is created for every (original, distorted) pair of the same
    identity whose distorted *file name* contains the original's file stem.
    The negative is a single image of another identity, picked with the global
    ``random`` module while the dataset is constructed, so the triplets stay
    fixed afterwards. Directory listings are sorted so that, for a given
    ``random`` seed, the same triplets are produced on every run.

    Args:
        root_dir: Directory containing one sub-directory per identity.
        transform: Optional callable applied to each PIL image of a triplet.
        include_distortions: When False the ``distortion`` folders are not
            read; positives come only from those folders, so no triplets are
            produced in that case.

    Raises:
        IndexError: Propagated from ``random.choice`` when a triplet needs a
            negative but no other identity is available.
    """

    def __init__(self, root_dir, transform=None, include_distortions=True):
        self.transform = transform
        self.data = {}      # person_id -> every image path of that identity
        self.people = []    # identities kept, in sorted order
        self.samples = []   # (anchor_path, positive_path, negative_path)

        # Originals and distortions are tracked by where they were found instead
        # of by searching for "distortion" in the full path, which misfires as
        # soon as any parent folder name contains that word.
        originals = {}
        distorted = {}

        for person_id in sorted(os.listdir(root_dir)):
            person_path = os.path.join(root_dir, person_id)
            if not os.path.isdir(person_path):
                continue

            clean_images = self._list_jpgs(person_path)
            distorted_images = []
            if include_distortions:
                distortion_dir = os.path.join(person_path, "distortion")
                if os.path.isdir(distortion_dir):
                    distorted_images = self._list_jpgs(distortion_dir)

            if len(clean_images) + len(distorted_images) >= 2:
                self.data[person_id] = clean_images + distorted_images
                self.people.append(person_id)
                originals[person_id] = clean_images
                distorted[person_id] = distorted_images

        for person_id in self.people:
            # Candidate identities for negatives; built once per person rather
            # than once per sample.
            other_people = [p for p in self.people if p != person_id]

            for anchor in originals[person_id]:
                # splitext removes exactly the extension. str.strip(".jpg")
                # would strip any leading/trailing '.', 'j', 'p', 'g' characters.
                anchor_stem = os.path.splitext(os.path.basename(anchor))[0]
                for positive in distorted[person_id]:
                    # Compare against the file name only, not the directories.
                    if anchor_stem not in os.path.basename(positive):
                        continue
                    neg_person = random.choice(other_people)
                    negative = random.choice(self.data[neg_person])
                    self.samples.append((anchor, positive, negative))

    @staticmethod
    def _list_jpgs(directory):
        """Return the sorted paths of the ``.jpg`` entries directly inside ``directory``."""
        return sorted(
            os.path.join(directory, name)
            for name in os.listdir(directory)
            if name.endswith(".jpg")
        )

    @staticmethod
    def _load_rgb(path):
        """Open ``path``, return an RGB copy of the image and close the file handle."""
        with Image.open(path) as img:
            return img.convert("RGB")

    def __len__(self):
        """Number of triplets."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return ``(anchor, positive, negative, anchor_label)`` for triplet ``idx``.

        The three images are RGB PIL images, passed through ``transform`` when
        one was given. ``anchor_label`` is the name of the identity folder that
        contains the anchor image.
        """
        a_path, p_path, n_path = self.samples[idx]

        anchor = self._load_rgb(a_path)
        positive = self._load_rgb(p_path)
        negative = self._load_rgb(n_path)

        # Anchors are always originals, stored directly in the identity folder.
        anchor_label = os.path.basename(os.path.dirname(a_path))

        if self.transform:
            anchor = self.transform(anchor)
            positive = self.transform(positive)
            negative = self.transform(negative)

        return anchor, positive, negative, anchor_label

In [9]:
class EmbeddingNet(nn.Module):
    """ResNet-18 whose final fully connected layer is replaced by an embedding head.

    Args:
        embedding_size: Output width of the replacement ``fc`` layer.
        pretrained: Forwarded unchanged to ``resnet18(pretrained=...)``.
    """

    def __init__(self, embedding_size=128,pretrained=True):
        super().__init__()
        net = resnet18(pretrained=pretrained)
        # Replace the stock head with a linear layer of the requested width.
        head_inputs = net.fc.in_features
        net.fc = nn.Linear(head_inputs, embedding_size)
        self.backbone = net

    def forward(self, x):
        """Run ``x`` through the backbone and return its output."""
        embedding = self.backbone(x)
        return embedding

In [10]:
def triplet_loss(anchor, positive, negative, margin=1.0):
    """Mean hinge triplet loss based on ``F.pairwise_distance``.

    Computes ``relu(d(anchor, positive) - d(anchor, negative) + margin)`` for
    each row and averages the result.

    Args:
        anchor: Anchor embeddings.
        positive: Embeddings to be pulled towards the anchors.
        negative: Embeddings to be pushed away from the anchors.
        margin: Constant added to the distance difference before the ReLU.

    Returns:
        Scalar tensor holding the mean loss.
    """
    d_ap, d_an = (F.pairwise_distance(anchor, other) for other in (positive, negative))
    return F.relu(d_ap - d_an + margin).mean()

In [11]:
# Preprocessing shared by the train and val datasets: resize every image to
# 256x256 and convert it to a tensor. No normalisation step is included.
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor(),
])

In [12]:
# Build the triplet datasets for the "train" and "val" splits under base_path.
# Negatives are sampled inside the constructor with the global `random` module,
# so the triplets depend on the RNG state at this point.
train_dataset = TripletFaceDataset(
    root_dir=os.path.join(base_path, "train"),
    transform=transform
)

val_dataset = TripletFaceDataset(
    root_dir=os.path.join(base_path, "val"),
    transform=transform
)

In [13]:
def count_distortion_files(split_dir):
    """Count every file stored under any ``distortion`` directory below ``split_dir``.

    Args:
        split_dir: Root of a dataset split (e.g. the train or val folder).

    Returns:
        Total number of files found, at any depth, inside directories named
        ``distortion``.
    """
    total = 0
    for root, dirs, _ in os.walk(split_dir):
        if "distortion" in dirs:
            for _, _, distortion_files in os.walk(os.path.join(root, "distortion")):
                total += len(distortion_files)
            # The subtree was just counted in full; prune it so the outer walk
            # does not traverse it again (and cannot double count nested
            # "distortion" folders).
            dirs.remove("distortion")
    return total

# Sanity check: these totals are compared with the dataset lengths in the next cell.
total_files_trn = count_distortion_files(os.path.join(base_path, "train"))
total_files_val = count_distortion_files(os.path.join(base_path, "val"))
total_files_trn,total_files_val

(13482, 2954)

In [14]:
# Number of triplets per split; compare with the distortion-file counts from the previous cell.
len(train_dataset), len(val_dataset)

(13482, 2954)

In [15]:
# Visual sanity check: draw the first ten training triplets, one figure per triplet.
for i in range(10):
    anchor, positive, negative,label = train_dataset[i]
    plt.figure(figsize=(10, 3))

    # (image tensor, panel title) for the three side-by-side panels.
    panels = (
        (anchor, f"Anchor label: {label}"),
        (positive, "Positive (same ID)"),
        (negative, "Negative (diff ID)"),
    )
    for slot, (image_tensor, caption) in enumerate(panels, start=1):
        plt.subplot(1, 3, slot)
        plt.imshow(TF.to_pil_image(image_tensor))
        plt.title(caption)
        plt.axis("off")

    plt.tight_layout()
    plt.show()

Output hidden; open in https://colab.research.google.com to view.

In [16]:
# Settings common to both loaders; only the dataset and the shuffle flag differ.
loader_kwargs = dict(
    batch_size=128,
    num_workers=2,
    worker_init_fn=seed_worker,
    generator=g,
    pin_memory=True,
)

# Training batches are shuffled, validation batches keep dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_kwargs)

In [17]:
# Create the 128-dimensional embedding network on the GPU (.cuda() requires a CUDA
# runtime) and optimise all of its parameters with Adam at lr=2e-5.
model = EmbeddingNet(embedding_size=128).cuda()
optimizer = optim.Adam(model.parameters(), lr=2e-5)

Downloading: "https://download.pytorch.org/models/resnet18-f37072fd.pth" to /root/.cache/torch/hub/checkpoints/resnet18-f37072fd.pth
100%|██████████| 44.7M/44.7M [00:00<00:00, 189MB/s]


In [18]:
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict

In [None]:
def evaluate_model(custom_model, loader, device, threshold=0.5):
    """Score a model on triplets as a binary same/different verification task.

    For every triplet the anchor-positive pair is a ground-truth match (label 1)
    and the anchor-negative pair a ground-truth non-match (label 0). A pair is
    predicted as a match when the cosine similarity of its two embeddings is at
    least ``threshold``.

    The model is put into eval mode for the duration of the call and its
    previous train/eval mode is restored afterwards, so the result does not
    depend on the caller having called ``.eval()`` first.

    Args:
        custom_model: ``nn.Module`` mapping an image batch to embeddings.
        loader: Iterable with ``len()`` yielding ``(anchor, positive, negative, label)``.
        device: Device the batches are moved to before the forward pass.
        threshold: Cosine-similarity cut-off for predicting a match.

    Returns:
        dict with keys ``"accuracy"``, ``"precision"``, ``"recall"`` and ``"f1"``.
        The same values are also printed.
    """
    y_true = []
    y_pred = []

    was_training = custom_model.training
    custom_model.eval()
    try:
        with torch.no_grad():
            for count, (anchor, positive, negative, _) in enumerate(loader, start=1):
                print(f"\rProcessing Evaluation... {count*100/len(loader):.6f}%", end="")
                anchor = anchor.to(device)
                positive = positive.to(device)
                negative = negative.to(device)

                anchor_emb = custom_model(anchor)
                positive_emb = custom_model(positive)
                negative_emb = custom_model(negative)

                sim_ap = F.cosine_similarity(anchor_emb, positive_emb, dim=1)
                sim_an = F.cosine_similarity(anchor_emb, negative_emb, dim=1)

                pred_ap = (sim_ap >= threshold).int().tolist()
                pred_an = (sim_an >= threshold).int().tolist()

                y_true += [1] * len(pred_ap) + [0] * len(pred_an)
                y_pred += pred_ap + pred_an
    finally:
        # Hand the model back in whatever mode the caller had it.
        custom_model.train(was_training)

    acc = accuracy_score(y_true, y_pred)
    prec, rec, f1, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')

    print(f"\nEvaluation: Accuracy : {acc:.6f}\tPrecision: {prec:.6f}\tRecall: {rec:.6f}\tF1 Score : {f1:.6f}")
    return {"accuracy": acc, "precision": prec, "recall": rec, "f1": f1}

In [20]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [21]:
# Drive folder that receives the per-epoch checkpoints written by the training loop.
root_path = '/content/drive/MyDrive/DATASET/Comys_Hackathon5'

In [None]:
# Cosine-similarity cut-off for the match/non-match decision, used for both the
# running training metrics and the validation pass.
threshold=0.6

for epoch in range(20):
    model.train()
    total_loss = 0
    # Confusion counts are plain Python ints, so the epoch summary below also
    # works if the loader yields no batch and nothing accumulates on the GPU.
    tp = fp = fn = tn = 0

    for complete, (anchor, positive, negative, _) in enumerate(train_loader, start=1):
        print(f"\rProcessing... {complete*100/len(train_loader):.6f}%",end="")

        # Same device object that is passed to evaluate_model.
        anchor = anchor.to(device)
        positive = positive.to(device)
        negative = negative.to(device)

        anchor_out = model(anchor)
        positive_out = model(positive)
        negative_out = model(negative)

        loss = triplet_loss(anchor_out, positive_out, negative_out)
        total_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Monitoring only: computed from the pre-step embeddings, without
        # recording anything for autograd.
        with torch.no_grad():
            sim_ap = F.cosine_similarity(anchor_out, positive_out)
            sim_an = F.cosine_similarity(anchor_out, negative_out)
            accepted_pos = int((sim_ap >= threshold).sum())
            accepted_neg = int((sim_an >= threshold).sum())

        tp += accepted_pos
        fn += sim_ap.numel() - accepted_pos
        fp += accepted_neg
        tn += sim_an.numel() - accepted_neg

    correct = tp + tn
    total = tp + tn + fp + fn
    accuracy = correct / max(total, 1)
    # The small epsilon keeps the ratios defined when a denominator is zero.
    precision = tp / (tp + fp + 1e-8)
    recall = tp / (tp + fn + 1e-8)
    f1 = 2 * precision * recall / (precision + recall + 1e-8)

    print(f"\nTraining.....\nEpoch: {epoch+1}\tLoss: {total_loss/len(train_loader):.6f}, Acc: {accuracy:.6f}, Prec: {precision:.6f}, Rec: {recall:.6f}, F1: {f1:.6f}")

    # Keep one checkpoint per epoch so any epoch can be restored later.
    torch.save(model.state_dict(), os.path.join(root_path,f"resnet18_embed_face_recognition_epoch{epoch+1}-v05.pth"))

    model.eval()
    evaluate_model(model, val_loader, device, threshold)

Processing... 100.000000%
Training.....
Epoch: 1	Loss: 0.046505, Acc: 0.678757, Prec: 0.608902, Rec: 0.999481, F1: 0.756767
Processing Evaluation... 100.000000%
Evaluation: Accuracy : 0.761002	Precision: 0.677079	Recall: 0.997969	F1 Score : 0.806787
Processing... 100.000000%
Training.....
Epoch: 2	Loss: 0.001564, Acc: 0.744140, Prec: 0.661498, Rec: 1.000000, F1: 0.796267
Processing Evaluation... 100.000000%
Evaluation: Accuracy : 0.769634	Precision: 0.685103	Recall: 0.997969	F1 Score : 0.812457
Processing... 100.000000%
Training.....
Epoch: 3	Loss: 0.000305, Acc: 0.754525, Prec: 0.670713, Rec: 1.000000, F1: 0.802906
Processing Evaluation... 100.000000%
Evaluation: Accuracy : 0.777928	Precision: 0.692904	Recall: 0.998307	F1 Score : 0.818031
Processing... 100.000000%
Training.....
Epoch: 4	Loss: 0.000174, Acc: 0.759828, Prec: 0.675519, Rec: 1.000000, F1: 0.806340
Processing Evaluation... 100.000000%
Evaluation: Accuracy : 0.774204	Precision: 0.689252	Recall: 0.998646	F1 Score : 0.815593


In [None]:
# Save the final weights.
# NOTE(review): root_path is reassigned here to a different folder than the one
# used for the per-epoch checkpoints — confirm this is intended.
root_path = '/content/drive/MyDrive/DATASET/Comys_Hackathon5/Comys_Hackathon5'
torch.save(model.state_dict(), os.path.join(root_path,"resnet18_embed_face_recognition-v05.pth"))

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Rebuild the network without downloading pretrained weights and load a saved checkpoint.
# EmbeddingNet's keyword is `embedding_size`; the previous `embedding_dim=` raised
# TypeError (unexpected keyword argument).
# NOTE(review): the checkpoints written by this notebook come from a model built with
# embedding_size=128 — confirm "model_final.pth" really has a 256-wide head, otherwise
# load_state_dict will report a size mismatch on the fc layer.
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
model = EmbeddingNet(embedding_size=256,pretrained=False).to(device)
model.load_state_dict(torch.load("model_final.pth", map_location=device))

In [None]:
# Switch the model to evaluation mode; the call returns the module, so its structure is printed below.
model.eval()

EmbeddingNet(
  (backbone): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, tra