# Face recognition training pipeline

### Dataset downloading

In [None]:
import os

from roboflow import Roboflow

# Read the Roboflow API key from the environment so the secret is never stored
# in (or committed with) the notebook. Set it before starting Jupyter, e.g.
#   export ROBOFLOW_API_KEY=<your key>
roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY")
if not roboflow_api_key:
    raise RuntimeError("Set the ROBOFLOW_API_KEY environment variable before running this cell")

rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("melnikum").project("my-photo-search-2")
version = project.version(4)
# Export version 4 of the project in Roboflow's "folder" format and download it locally
dataset = version.download("folder")

loading Roboflow workspace...
loading Roboflow project...
Exporting format folder in progress : 85.0%
Version export complete for folder format


Downloading Dataset Version Zip in My-photo-search-2-4 to folder:: 100%|██████████| 38966/38966 [00:02<00:00, 17452.02it/s]





Extracting Dataset Version Zip to My-photo-search-2-4 in folder:: 100%|██████████| 2459/2459 [00:00<00:00, 10425.97it/s]


In [None]:
!rm -r My-photo-search-2-4/train/Cringe
!rm -r My-photo-search-2-2/valid/Cringe

In [1]:
from facenet_pytorch import MTCNN, InceptionResnetV1, fixed_image_standardization, training
import torch
from torch.utils.data import DataLoader, SubsetRandomSampler
from torch import optim
from torch.optim.lr_scheduler import MultiStepLR
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms
import numpy as np
import os
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import PIL

  from .autonotebook import tqdm as notebook_tqdm


### Determine if an NVIDIA GPU is available

In [2]:
# Use the first CUDA GPU when PyTorch can see one, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print('Running on device: {}'.format(device))

Running on device: cuda:0


### Triplet Loss Dataset

In [3]:
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import os
import random
from PIL import  Image
from collections import defaultdict

class TripletFaceDataset(Dataset):
    """Dataset of (anchor, positive, negative) image triplets for triplet-loss training.

    Images are discovered by walking ``root_dir``; the name of the directory that
    directly contains an image is used as its identity. For every unordered pair of
    images of one identity a triplet is built, with the negative drawn at random
    from the images of a different identity.

    Args:
        root_dir: Directory tree to scan for ``.png`` / ``.jpg`` / ``.jpeg`` files.
        transform: Callable applied to each loaded RGB PIL image. When omitted, a
            resize to 256x256 + ``ToTensor`` + ImageNet-style normalisation is used.
        max_triplets_per_identity: Optional cap on the number of triplets kept per
            anchor identity (a random subset is kept).
        random_state: Seed passed to ``random.seed``. ``None`` means 42.
            Note that this seeds Python's *global* ``random`` module.

    Raises:
        ValueError: If exactly one identity has two or more images, because no
            negative sample can be drawn for it.
    """

    # File suffixes (compared case-insensitively) that are treated as images
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self, root_dir, transform=None, max_triplets_per_identity=None, random_state=None):
        self.root_dir = root_dir
        self.transform = transform or transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

        # Compare against None (not truthiness) so an explicit random_state=0 is
        # honoured instead of being silently replaced by the default seed.
        random.seed(42 if random_state is None else random_state)

        self.triplets = self.create_triplets(max_triplets_per_identity)

    def __len__(self):
        """Return the number of triplets."""
        return len(self.triplets)

    def __getitem__(self, idx):
        """Load and return the ``(anchor, positive, negative)`` images of triplet ``idx``."""
        anchor_path, positive_path, negative_path = self.triplets[idx]

        anchor = self.load_image(anchor_path)
        positive = self.load_image(positive_path)
        negative = self.load_image(negative_path)

        return anchor, positive, negative

    def load_image(self, path):
        """Open ``path``, convert it to RGB and apply ``self.transform`` if set."""
        # The context manager closes the underlying file as soon as the pixel data
        # has been read by convert().
        with Image.open(path) as raw:
            img = raw.convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img

    def create_triplets(self, max_triplets_per_identity):
        """Build the list of ``(anchor_path, positive_path, negative_path)`` tuples.

        Args:
            max_triplets_per_identity: Optional cap per anchor identity; falsy
                values (``None`` or 0) disable the cap.

        Returns:
            A list of 3-tuples of file paths.
        """
        # Map identity (name of the containing directory) -> list of image paths
        identity_dict = defaultdict(list)

        # os.walk yields entries in filesystem order, which differs between
        # machines; sorting directories (in place, which steers the traversal)
        # and files makes the triplets reproducible for a given seed.
        for root, dirs, files in os.walk(self.root_dir):
            dirs.sort()
            identity = os.path.basename(root)
            for file in sorted(files):
                if file.lower().endswith(self.IMAGE_EXTENSIONS):
                    identity_dict[identity].append(os.path.join(root, file))

        # A positive pair needs at least 2 images of the same identity
        identity_dict = {k: v for k, v in identity_dict.items() if len(v) >= 2}
        identities = list(identity_dict.keys())

        if len(identities) == 1:
            raise ValueError(
                f"Only one identity with at least 2 images found under {self.root_dir!r}; "
                "at least two are needed to sample negatives"
            )

        triplets = []
        for identity in identities:
            identity_images = identity_dict[identity]
            # Candidate identities for the negative; computed once per identity
            other_identities = [x for x in identities if x != identity]

            # Every unordered pair (i < j) of this identity's images is one anchor/positive pair
            for i, anchor in enumerate(identity_images):
                for positive in identity_images[i + 1:]:
                    negative_identity = random.choice(other_identities)
                    negative = random.choice(identity_dict[negative_identity])
                    triplets.append((anchor, positive, negative))

        # Optional: balance the dataset by keeping at most N random triplets per anchor identity
        if max_triplets_per_identity:
            balanced_triplets = []
            counts = defaultdict(int)
            random.shuffle(triplets)
            for triplet in triplets:
                identity = os.path.basename(os.path.dirname(triplet[0]))
                if counts[identity] < max_triplets_per_identity:
                    balanced_triplets.append(triplet)
                    counts[identity] += 1
            triplets = balanced_triplets

        print(f"Created {len(triplets)} triplets")
        return triplets

In [10]:
# Triplet datasets for the two splits; each caps the number of triplets kept per identity
train_dataset = TripletFaceDataset(
    root_dir='/home/melnikum/Projects/Recognition/My-photo-search-2-4/train',
    max_triplets_per_identity=2000,
)
val_dataset = TripletFaceDataset(
    root_dir='/home/melnikum/Projects/Recognition/My-photo-search-2-4/valid',
    max_triplets_per_identity=500,
)

Created 2780 triplets
Created 536 triplets


In [11]:
from torch.utils.data import DataLoader

# Mini-batch size shared by the training and validation loaders
batch_size = 64

# Training batches are reshuffled by the loader
train_dataloader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=8,
)
# Validation keeps the dataset order
val_dataloader = DataLoader(
    val_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=8,
)

### Model Training

In [4]:
def evaluate(model, dataloader, threshold=1.0):
    """Compute threshold-based triplet accuracy of ``model`` over ``dataloader``.

    A triplet counts as correct when the squared Euclidean distance between the
    anchor and positive embeddings is below ``threshold`` AND the squared distance
    between the anchor and negative embeddings is at least ``threshold``.

    The model is switched to eval mode and is NOT switched back afterwards.

    Args:
        model: Module mapping a batch of images to a batch of embeddings.
        dataloader: Iterable yielding ``(anchors, positives, negatives)`` tensor batches.
        threshold: Decision threshold on the squared distance.

    Returns:
        Fraction of correct triplets as a float; ``0.0`` when ``dataloader`` yields
        no samples.
    """
    model.eval()

    # Run on the device the model actually lives on rather than depending on a
    # notebook-global `device` variable being defined and in sync with the model.
    try:
        model_device = next(model.parameters()).device
    except StopIteration:
        # Parameter-free model: nothing to infer from, stay on the CPU
        model_device = torch.device('cpu')

    correct = 0
    total = 0

    with torch.no_grad():
        for anchors, positives, negatives in dataloader:
            anchors = anchors.to(model_device)
            positives = positives.to(model_device)
            negatives = negatives.to(model_device)

            # Get embeddings
            anchor_emb = model(anchors)
            positive_emb = model(positives)
            negative_emb = model(negatives)

            # Squared Euclidean distances, one value per triplet in the batch
            pos_dist = (anchor_emb - positive_emb).pow(2).sum(1)
            neg_dist = (anchor_emb - negative_emb).pow(2).sum(1)

            # Count correct predictions
            correct += ((pos_dist < threshold) & (neg_dist >= threshold)).sum().item()
            total += anchors.size(0)

    # Guard against an empty dataloader instead of raising ZeroDivisionError
    if total == 0:
        return 0.0
    return correct / total

In [9]:
import torch
import numpy as np
import random

def set_seed(seed=42):
    """Seed Python's ``random``, NumPy and PyTorch with the same value.

    When CUDA is available the CUDA generators are seeded as well, and cuDNN is
    configured with ``deterministic = True`` and ``benchmark = False``.
    """
    # CPU-side generators: Python stdlib, NumPy, PyTorch
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    # Nothing GPU-specific to configure on a CPU-only machine
    if not torch.cuda.is_available():
        return

    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # covers every visible GPU
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


# Seed everything once with the default value used throughout this notebook
set_seed(42)

In [29]:
from torch.optim import AdamW
import time

# Parameters
learning_rate = 0.0001
weight_decay = 0.001
margin = 0.2
num_epochs = 50
log_every = 100  # number of batches between intermediate loss reports
checkpoint_path = '/home/melnikum/Projects/Recognition/checkpoints/facenet_checkpoint_last_layer_best.pth'

# Model: embedding network (classify=False) initialised from the 'vggface2' weights
model = InceptionResnetV1(
    classify=False,
    pretrained='vggface2'
).to(device)

# Freeze all layers except the last linear one
for param in model.parameters():
    param.requires_grad = False

for param in model.last_linear.parameters():
    param.requires_grad = True

# Loss and optimizer
criterion = torch.nn.TripletMarginLoss(margin=margin)
# Only hand the optimizer the parameters that are actually being trained
trainable_params = [param for param in model.parameters() if param.requires_grad]
optimizer = AdamW(trainable_params, lr=learning_rate, weight_decay=weight_decay)
scheduler = MultiStepLR(optimizer, [40, 50])

accuracy_list = []
max_val_accuracy = 0.

# Make sure the checkpoint directory exists before the first torch.save
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

# Training
for epoch in range(num_epochs):
    # NOTE(review): train() switches every sub-module to training mode, including
    # any BatchNorm/Dropout layers inside the frozen part of the network — confirm
    # that this is intended for a "last layer only" fine-tune.
    model.train()
    epoch_start_time = time.time()
    running_loss = 0.0
    batch_losses = []

    for i, (anchors, positives, negatives) in enumerate(train_dataloader):
        anchors = anchors.to(device)
        positives = positives.to(device)
        negatives = negatives.to(device)

        # Forward pass
        anchor_embeddings = model(anchors)
        positive_embeddings = model(positives)
        negative_embeddings = model(negatives)

        # Compute loss
        loss = criterion(anchor_embeddings, positive_embeddings, negative_embeddings)

        # Backward pass and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        batch_losses.append(loss.item())

        if i % log_every == log_every - 1:  # Print every `log_every` batches
            avg_loss = running_loss / log_every
            # Was len(dataloader): an undefined name that raised NameError the
            # first time this branch was reached.
            print(f'Epoch [{epoch+1}/{num_epochs}], Batch [{i+1}/{len(train_dataloader)}], '
                  f'Batch Loss: {avg_loss:.4f}')
            running_loss = 0.0

    # Calculate epoch statistics
    epoch_time = time.time() - epoch_start_time
    epoch_loss = sum(batch_losses) / len(batch_losses)
    min_loss = min(batch_losses)
    max_loss = max(batch_losses)

    # Print epoch summary
    print('\n' + '='*60)
    print(f'EPOCH {epoch+1} SUMMARY:')
    print(f'Time: {epoch_time:.2f}s | '
          f'Avg Loss: {epoch_loss:.4f} | '
          f'Min Loss: {min_loss:.4f} | '
          f'Max Loss: {max_loss:.4f}')
    print(f'Learning Rate: {optimizer.param_groups[0]["lr"]:.6f}')

    train_accuracy = evaluate(model, train_dataloader)
    print(f'Train Accuracy: {train_accuracy}')

    # Keep only the weights of the epoch with the best validation accuracy so far
    val_accuracy = evaluate(model, val_dataloader)
    if val_accuracy > max_val_accuracy:
        max_val_accuracy = val_accuracy
        torch.save(model.state_dict(), checkpoint_path)

    print(f'Validation Accuracy: {val_accuracy}')
    print('='*60 + '\n')

    scheduler.step()


EPOCH 1 SUMMARY:
Time: 14.25s | Avg Loss: 0.1018 | Min Loss: 0.0458 | Max Loss: 0.1680
Learning Rate: 0.000100
Train Accuracy: 0.09568345323741007
Validation Accuracy: 0.05783582089552239


EPOCH 2 SUMMARY:
Time: 14.31s | Avg Loss: 0.0753 | Min Loss: 0.0266 | Max Loss: 0.1387
Learning Rate: 0.000100
Train Accuracy: 0.21151079136690648
Validation Accuracy: 0.11380597014925373


EPOCH 3 SUMMARY:
Time: 14.26s | Avg Loss: 0.0621 | Min Loss: 0.0104 | Max Loss: 0.1243
Learning Rate: 0.000100
Train Accuracy: 0.30287769784172663
Validation Accuracy: 0.13992537313432835


EPOCH 4 SUMMARY:
Time: 14.02s | Avg Loss: 0.0531 | Min Loss: 0.0075 | Max Loss: 0.1088
Learning Rate: 0.000100
Train Accuracy: 0.38848920863309355
Validation Accuracy: 0.2294776119402985


EPOCH 5 SUMMARY:
Time: 13.79s | Avg Loss: 0.0490 | Min Loss: 0.0150 | Max Loss: 0.1239
Learning Rate: 0.000100
Train Accuracy: 0.4341726618705036
Validation Accuracy: 0.2667910447761194


EPOCH 6 SUMMARY:
Time: 13.95s | Avg Loss: 0.0467 | M

### Model Evaluation on the Test Dataset

In [None]:
def evaluate_image(model, face_image_path, dataloader, threshold=1.0):
    """Score how well one reference face retrieves the positive class of a labelled set.

    The reference image is embedded once; every image from ``dataloader`` is then
    embedded and predicted as a match when its Euclidean distance to the reference
    embedding is below ``threshold``. Predictions are compared with ``target == 1``
    and precision, recall and accuracy are printed.

    Args:
        model: Module mapping a batch of images to a batch of embeddings.
            # assumes output shape (batch, embedding_dim) — TODO confirm
        face_image_path: Path of the reference face image.
        dataloader: Iterable yielding ``(images, target)`` batches; label 1 is
            treated as "same person as the reference", label 0 as "other".
            # NOTE(review): which folder gets label 1 is decided by the dataset — verify
        threshold: Distance below which an image is counted as a match.

    Returns:
        None. Metrics are printed; a metric whose denominator is zero prints as ``nan``.
    """
    model.eval()
    TP, FP, FN, TN = 0, 0, 0, 0
    transf = transforms.Compose([
            transforms.Resize((256, 256)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    # Use the path given by the caller (it used to be ignored in favour of a
    # hard-coded file). convert('RGB') guarantees the 3 channels Normalize expects.
    with PIL.Image.open(face_image_path) as face_image:
        face = transf(face_image.convert('RGB')).to(device)

    with torch.no_grad():
        # Embed the reference inside no_grad too, so no autograd graph is kept alive
        feature_face = model(face[None])

        for images, target in dataloader:

            images, target = images.to(device), target.to(device)
            encoded_images = model(images)

            # Distance from the single reference embedding to each image in the
            # batch; squeeze the reference axis so it lines up with `target`
            distances = torch.cdist(feature_face, encoded_images).squeeze(0)

            is_match = distances < threshold
            is_positive = target == 1

            # .item() keeps the counters as plain Python ints
            TP += (is_match & is_positive).sum().item()
            FP += (is_match & ~is_positive & (target == 0)).sum().item()
            FN += (~is_match & is_positive).sum().item()
            TN += (~is_match & (target == 0)).sum().item()

    def _ratio(numerator, denominator):
        # nan instead of ZeroDivisionError when a metric is undefined
        return numerator / denominator if denominator else float('nan')

    print(f'Precision : {_ratio(TP, TP + FP):.3f}')
    print(f'Recal : {_ratio(TP, TP + FN):.3f}')
    print(f'Accuracy : {_ratio(TP + TN, TP + FN + FP + TN):.3f}')

In [None]:
# Rebuild the same architecture that was trained, then overwrite its weights
# with the best checkpoint saved by the training loop.
loaded_model = InceptionResnetV1(classify=False, pretrained='vggface2').to(device)
loaded_model.load_state_dict(
    torch.load(
        '/home/melnikum/Projects/Recognition/checkpoints/facenet_checkpoint_last_layer_best.pth',
        # Remap tensors to the current device so a checkpoint written on a GPU
        # machine also loads on a CPU-only one.
        map_location=device,
    )
)
# Trailing ';' keeps the notebook from printing the full module repr
loaded_model.eval();

In [33]:
# Root folder of the labelled test images read by ImageFolder
data_dir = '/home/melnikum/Projects/Recognition/testfolder'

# Same resize / tensor conversion / normalisation values as the triplet dataset's default transform
test_dataset = datasets.ImageFolder(
    data_dir,
    transform=transforms.Compose([
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ]),
)

# Fixed order (no shuffling) for evaluation
test_dataloader = DataLoader(
    test_dataset,
    batch_size=64,
    shuffle=False,
    num_workers=8,
)

In [37]:
# Evaluate the fine-tuned model against the test set using one reference face image
evaluate_image(
    model=loaded_model,
    face_image_path='/home/melnikum/Projects/Recognition/yura_face_image.jpg',
    dataloader=test_dataloader,
)

Precision : 0.920
Recal : 0.601
Accuracy : 0.987
