In [2]:
from google.colab import drive

# Attach Google Drive at /content/drive; later cells read files from paths under this mount point.
drive.mount(mountpoint='/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# or clone this repo, removing the '-' to allow python imports:
!git clone https://github.com/timesler/facenet-pytorch.git facenet_pytorch
!pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
!pip install mxnet
!pip install torchmetrics

fatal: destination path 'facenet_pytorch' already exists and is not an empty directory.
Looking in indexes: https://download.pytorch.org/whl/cu118


In [4]:
import numpy as np
import os
np.bool = bool
import mxnet as mx
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, Subset, random_split
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
import math
import torch_xla
import torch_xla.core.xla_model as xm
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_multiprocessing as xmp
from facenet_pytorch import MTCNN
import torch.optim as optim
from torch.optim.lr_scheduler import MultiStepLR
from torchmetrics import Accuracy
import torch_xla.utils.serialization as xser
from Utils import preprocess_image, CANONICAL_LANDMARKS
from CasiaWebFace import CASIAWebFaceDataset
from Intermediate_Strategy import MobileFaceNetIntermediate
from ArcFace import ArcFaceLoss
from MobileFaceNet import MobileFaceNet
from train_functions import train, train_tpu
from DS import DS
import random

  state_dict = torch.load(state_dict_path)
  state_dict = torch.load(state_dict_path)
  state_dict = torch.load(state_dict_path)


In [5]:
# Per-channel mean and std used for normalization
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Build the tensor-conversion and normalization transforms once, instead of
# re-creating both objects for every single sample that passes through the pipeline.
_to_tensor = transforms.ToTensor()
_normalize = transforms.Normalize(mean=mean, std=std)


# Named function used in place of a lambda inside the transform pipeline.
def preprocess_and_transform(img):
    """Preprocess an image and turn it into a normalized tensor.

    Args:
        img: Image accepted by `preprocess_image` (imported from `Utils`).

    Returns:
        The output of `preprocess_image`, converted with `ToTensor` and
        normalized with the module-level `mean` and `std`.
    """
    img = preprocess_image(img)
    return _normalize(_to_tensor(img))

# Training-time pipeline: random augmentations first, then the shared
# preprocessing / tensor conversion / normalization step.
train_transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(p=0.5),
        # Rotate by an angle drawn from [-10, +10] degrees.
        transforms.RandomRotation(degrees=10),
        transforms.ColorJitter(
            brightness=0.2,
            contrast=0.2,
            saturation=0.2,
            hue=0.1,
        ),
        # Random crop covering 80-100% of the image area, resized to 112x112.
        transforms.RandomResizedCrop(size=112, scale=(0.8, 1.0)),
        transforms.Lambda(preprocess_and_transform),
    ]
)

In [6]:
# Record / index file pair of the CASIA-WebFace training set on the mounted Drive.
rec_file = '/content/drive/MyDrive/casia-webface/train.rec'
idx_file = '/content/drive/MyDrive/casia-webface/train.idx'

# Build the dataset without a transform, then attach the training pipeline to it.
full_dataset = CASIAWebFaceDataset(rec_file, idx_file, transform=None)
full_dataset.transform = train_transform

BATCH_SIZE = 128

# Shuffled loader over the whole dataset, served by 8 worker processes.
train_loader = DataLoader(
    dataset=full_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=8,
)

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

model = MobileFaceNetIntermediate(embedding_size=128)
model = model.to(device)

In [7]:
def random_search(num_results, steps, margin_range=(0, 0.5), scale_range=(0, 100)):
    """Randomly sample ArcFace (margin, scale) pairs and keep the best-scoring ones.

    For every step a margin and a scale are drawn uniformly from their ranges,
    a fresh `ArcFaceLoss` and SGD optimizer are built, the module-level `model`
    is trained on the module-level `train_loader`, and the resulting accuracy
    is recorded.

    NOTE(review): `model` is the shared module-level instance and is never
    re-initialised between steps, so (assuming `train` updates it in place)
    later samples start from weights already trained by earlier samples and
    their accuracies are not directly comparable — confirm this is intended.

    Args:
        num_results: Capacity passed to the `DS` results container.
        steps: Number of (margin, scale) samples to evaluate. Each sample
            costs one full `train(...)` call.
        margin_range: `(low, high)` interval the margin is sampled from.
        scale_range: `(low, high)` interval the scale is sampled from.

    Returns:
        The `DS` container holding dicts of the form
        `{'margin': ..., 'scale': ..., 'f': accuracy}`.
    """
    results = DS(num_results)

    # The number of classes does not change between samples; query it once.
    num_classes = full_dataset.get_num_unique_labels()

    for _ in range(steps):
        # Draw the margin first, then the scale (same RNG order as before).
        margin = random.uniform(*margin_range)
        scale = random.uniform(*scale_range)

        arc_face = ArcFaceLoss(
            num_classes=num_classes,
            embedding_size=128,
            margin=margin,
            scale=scale,
            device=device,
        ).to(device)

        # The embedding layer and the ArcFace head get a stronger weight decay
        # than the rest of the network (which uses the optimizer-wide default).
        optimizer = torch.optim.SGD(
            [
                {'params': model.conv1.parameters()},
                {'params': model.dwconv2.parameters()},
                {'params': model.blocks.parameters()},
                {'params': model.gdconv.parameters()},
                {'params': model.fc.parameters(), 'weight_decay': 0.01},
                {'params': arc_face.parameters(), 'weight_decay': 0.01},
            ],
            lr=0.0001,
            momentum=0.9,
            weight_decay=0.00001,
            nesterov=True,
        )

        # Fresh metric object per sample.
        train_accuracy = Accuracy(task='multiclass', num_classes=num_classes).to(device)

        _, accuracy = train(model, train_loader, optimizer, arc_face, train_accuracy, device)

        # "max" tells the container that larger 'f' values are the ones to keep.
        results.add({'margin': margin, 'scale': scale, 'f': accuracy}, "max")

    return results


In [8]:
def clip(point, bounds):
    """Clamp a (margin, scale) point into the given bounds.

    Args:
        point: Indexable whose first two items are the margin and the scale.
        bounds: Indexable of two `(low, high)` pairs: `bounds[0]` limits the
            margin, `bounds[1]` limits the scale.

    Returns:
        A `(margin, scale)` tuple with each coordinate limited to its bounds.
    """
    def _clamp(value, limits):
        lower, upper = limits[0], limits[1]
        return max(lower, min(value, upper))

    return _clamp(point[0], bounds[0]), _clamp(point[1], bounds[1])


In [9]:
def _train_with_arcface(model, train_loader, device, margin, scale):
    """Train `model` once with a fresh ArcFace loss for (margin, scale); return the accuracy."""
    num_classes = full_dataset.get_num_unique_labels()

    arc_face = ArcFaceLoss(
        num_classes=num_classes,
        embedding_size=128,
        margin=margin,
        scale=scale,
        device=device,
    ).to(device)

    # The optimizer must be rebuilt together with the loss: it holds references
    # to the parameters of the ArcFaceLoss it was created with, so reusing an
    # old optimizer would leave the new loss's parameters untouched.
    optimizer = torch.optim.SGD(
        [
            {'params': model.conv1.parameters()},
            {'params': model.dwconv2.parameters()},
            {'params': model.blocks.parameters()},
            {'params': model.gdconv.parameters()},
            {'params': model.fc.parameters(), 'weight_decay': 0.01},
            {'params': arc_face.parameters(), 'weight_decay': 0.01},
        ],
        lr=0.0001,
        momentum=0.9,
        weight_decay=0.00001,
        nesterov=True,
    )

    # Fresh metric object per evaluation, mirroring `random_search`.
    train_accuracy = Accuracy(task='multiclass', num_classes=num_classes).to(device)

    _, accuracy = train(model, train_loader, optimizer, arc_face, train_accuracy, device)
    return accuracy


def hill_climb(num_results, start_margin, start_scale, bounds, model, train_loader, device,
               step_size=0.01, iterations=1000, scale_step_size=None):
    """Hill-climb over ArcFace (margin, scale) to maximise training accuracy.

    Starting from `(start_margin, start_scale)`, each iteration evaluates the
    up-to-eight neighbours obtained by moving the margin and/or the scale by
    one step (clipped into `bounds`), and moves to the neighbour with the
    highest accuracy. The search stops when no neighbour beats the current
    point or after `iterations` iterations.

    NOTE(review): the same `model` instance is trained by every evaluation and
    is never reset, so (assuming `train` updates it in place) later evaluations
    benefit from earlier training — confirm this is intended.

    Args:
        num_results: Unused; kept so existing callers keep working.
        start_margin: Initial margin.
        start_scale: Initial scale.
        bounds: Two `(low, high)` pairs, for the margin and the scale, as
            consumed by `clip`.
        model: Network to train; must expose `conv1`, `dwconv2`, `blocks`,
            `gdconv` and `fc` sub-modules.
        train_loader: Loader passed through to `train`.
        device: Device for the loss and the metric.
        step_size: Step applied to the margin (and to the scale when
            `scale_step_size` is not given).
        iterations: Maximum number of hill-climbing moves.
        scale_step_size: Optional separate step for the scale; defaults to
            `step_size`, which reproduces the previous behaviour.

    Returns:
        `(accuracy, (margin, scale))` for the best point reached.
    """
    if scale_step_size is None:
        scale_step_size = step_size

    current_margin, current_scale = start_margin, start_scale

    # Score the starting point.
    current_accuracy = _train_with_arcface(model, train_loader, device, current_margin, current_scale)

    for _ in range(iterations):
        current_point = (current_margin, current_scale)

        # Collect the clipped neighbours of the current point.
        candidates = []
        for d_margin in (-step_size, 0, step_size):
            for d_scale in (-scale_step_size, 0, scale_step_size):
                if d_margin == 0 and d_scale == 0:
                    continue  # that is the current point itself
                candidate = clip((current_margin + d_margin, current_scale + d_scale), bounds)
                # At a boundary, clipping can map a neighbour back onto the
                # current point or onto another neighbour; training for those
                # again would only waste a full training run.
                if candidate == current_point or candidate in candidates:
                    continue
                candidates.append(candidate)

        best_margin, best_scale, best_accuracy = current_margin, current_scale, current_accuracy
        improved = False

        for margin, scale in candidates:
            accuracy = _train_with_arcface(model, train_loader, device, margin, scale)
            if accuracy > best_accuracy:
                best_margin, best_scale, best_accuracy = margin, scale, accuracy
                improved = True

        # Plateau: no neighbour is strictly better than the current point.
        if not improved:
            break

        current_margin, current_scale, current_accuracy = best_margin, best_scale, best_accuracy

    # Best accuracy reached and the (margin, scale) that produced it.
    return current_accuracy, (current_margin, current_scale)


In [10]:
# Best accuracy found by any hill climb so far, and the (margin, scale) that produced it.
best_value_hill_climb, best_point = 0, ()

bounds = [(0, 2), (0, 100)]  # (min_margin, max_margin), (min_scale, max_scale)

# Random-search budget.
# NOTE(review): every random-search step performs a full train(...) call, so
# 10,000,000 steps is very unlikely to finish (the saved run of this cell ended
# in a KeyboardInterrupt) — TODO pick a realistic budget.
RANDOM_SEARCH_TOP_K = 1000
RANDOM_SEARCH_STEPS = 10000000

results = random_search(RANDOM_SEARCH_TOP_K, RANDOM_SEARCH_STEPS)
random_search_res = results.get_data()

# Refine every random-search candidate with hill climbing and keep the best
# outcome over all starting points (previously only the last one survived).
for result in random_search_res:
    start_margin, start_scale = result['margin'], result['scale']
    value, point = hill_climb(
        num_results=10,
        start_margin=start_margin,
        start_scale=start_scale,
        bounds=bounds,
        model=model,
        train_loader=train_loader,
        device=device,
    )
    # `not best_point` accepts the first result even if its accuracy is 0.
    if not best_point or value > best_value_hill_climb:
        best_value_hill_climb, best_point = value, point

# Same (accuracy, (margin, scale)) shape that hill_climb returns.
best_results = (best_value_hill_climb, best_point)
print(best_results)


KeyboardInterrupt: 