# Imports

In [None]:
!pip install ultralytics torch torchvision torchaudio facenet-pytorch
import numpy as np
import pandas as pd
import pickle
import cv2
import os
import ast
from tqdm import tqdm
from ultralytics import YOLO
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.model_selection import train_test_split
from facenet_pytorch import InceptionResnetV1

# Configurations

In [None]:
# Root folder of the face-identification data. The trailing slash matters:
# every path below is built by plain string concatenation.
DATASET_PATH = "/kaggle/input/surveillance-for-retail-stores/face_identification/face_identification/"
TRAINSET_CSV = f"{DATASET_PATH}trainset.csv"  # read for its 'image_path' and 'gt' columns
TEST_PATH = f"{DATASET_PATH}test/"  # prefix for the image paths listed in the eval CSV
EVAL_SET_CSV = f"{DATASET_PATH}eval_set.csv"
YOLO_MODEL_PATH = "/kaggle/input/tracking/tensorflow2/default/1/best.pt"  # weights passed to YOLO()

# Model parameters
# NOTE(review): EMBEDDING_SIZE is not referenced by the visible code; the
# classifier head hard-codes 512 instead.
EMBEDDING_SIZE = 512
FACE_SIZE = 160  # Facenet expects 160x160

# Earlier training parameters, kept for reference; superseded by the active
# values below.
# EXTRACTOR_EPOCHS = 50
# BATCH_SIZE = 16
# LEARNING_RATE = 0.001

# Training parameters (active)
EXTRACTOR_EPOCHS = 10  # number of passes over the training split
BATCH_SIZE = 8  # batch size for both the training and validation DataLoaders
LEARNING_RATE = 0.01  # learning rate handed to the Adam optimizer

## Load pre-trained YOLO model

In [None]:
# Load the detector weights once. `detector` is a module-level global that is
# called by FaceDataset.__getitem__ and by the embedding and evaluation cells.
print("Loading pre-trained YOLO model...")
detector = YOLO(YOLO_MODEL_PATH)

## Prepare face dataset
- Reads images
- Uses the YOLO model to detect & crop the face inside each image
- Applies image transformations (resizing to 160x160 pixels, normalizing colors) to prepare for training.

In [None]:
class FaceDataset(Dataset):
    """Dataset yielding ``(face_tensor, label)`` pairs for the rows of a dataframe.

    Each row must provide an ``image_path`` (relative to ``DATASET_PATH``) and
    a ``gt`` label. The image is read from disk, cropped to the first box
    returned by the global ``detector`` (the full image is used when nothing
    is detected), resized to ``FACE_SIZE`` x ``FACE_SIZE`` and normalised.
    """

    def __init__(self, dataframe):
        """Store the dataframe and build the preprocessing pipeline."""
        self.dataframe = dataframe
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((FACE_SIZE, FACE_SIZE)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])

    def __len__(self):
        """Return the number of rows (images) in the dataframe."""
        return len(self.dataframe)

    @staticmethod
    def _crop_to_box(image, box):
        """Return ``image`` cropped to ``box``, or ``image`` itself if the crop is empty.

        Args:
            image: Array indexed as ``[row, column, ...]``.
            box: Four numbers ``(x1, y1, x2, y2)``; each is truncated to int.

        The coordinates are clamped to the image bounds so that a negative
        value cannot wrap around through Python slicing, and a zero-area box
        falls back to the uncropped image instead of producing an empty array
        that the resize step cannot handle.
        """
        height, width = image.shape[:2]
        x1, y1, x2, y2 = (int(value) for value in box)
        x1, x2 = max(0, x1), min(width, x2)
        y1, y2 = max(0, y1), min(height, y2)
        if x2 <= x1 or y2 <= y1:
            return image
        return image[y1:y2, x1:x2]

    def __getitem__(self, idx):
        """Load, crop and transform the image at positional index ``idx``.

        Returns:
            tuple: ``(image, label)`` where ``label`` is the row's ``gt`` value.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        row = self.dataframe.iloc[idx]
        img_path = f"{DATASET_PATH}{row['image_path']}"
        image_bgr = cv2.imread(img_path)
        if image_bgr is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise FileNotFoundError(f"Could not read image: {img_path}")

        # Use YOLO to detect the face. The detector is given the BGR array as
        # loaded by OpenCV (ultralytics documents numpy input as BGR); the
        # conversion to RGB is only for the downstream FaceNet pipeline.
        results = detector(image_bgr, verbose=False)
        image = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        if len(results[0].boxes) > 0:
            image = self._crop_to_box(image, results[0].boxes.xyxy[0].tolist())

        if self.transform:
            image = self.transform(image)

        return image, row['gt']

## Create a Feature Extractor (Fine-Tuned Face Recog Model)
- Starts from a pre-trained FaceNet model (trained on VGGFace2 dataset)
- Adds a new classification layer (name tags)
- Fine-tunes it to specifically recognize faces in training set

### Training process
- Adjusting the model with:
    - **Loss function:** to measure mistakes (CrossEntropyLoss). Cross-entropy loss is standard for classification tasks, as it measures the difference between predicted probabilities and true labels.
    - **Optimizer:** Adam, using the learning rate set by `LEARNING_RATE` in the configuration.
- Validates with 20% unseen images to check progress

In [None]:
class FineTunedFaceNet(nn.Module):
    """FaceNet backbone with an optional linear classification head.

    When ``num_classes`` is truthy, a ``Linear(512, num_classes)`` layer is
    attached and ``forward`` returns its output; otherwise ``forward`` returns
    the backbone output unchanged.
    """

    def __init__(self, num_classes=None):
        super().__init__()
        # base facenet model trained on VGGFace2
        self.base = InceptionResnetV1(pretrained='vggface2')
        self.classifier = nn.Linear(512, num_classes) if num_classes else None

    def forward(self, x):
        """Run the backbone on ``x`` and, if a head is attached, the head on top."""
        embedding = self.base(x)
        head = self.classifier
        if not head:
            return embedding
        return head(embedding)

def train_feature_extractor():
    """Fine-tune ``FineTunedFaceNet`` on the training CSV and return the result.

    Reads ``TRAINSET_CSV``, holds out a stratified 20% of the rows for
    validation, trains for ``EXTRACTOR_EPOCHS`` epochs with Adam and
    cross-entropy loss, and prints the validation accuracy after every epoch.

    Returns:
        tuple: ``(model, label_to_idx)`` where ``model`` is the trained network
        (left in eval mode by the last validation pass) and ``label_to_idx``
        maps each distinct ``gt`` value to its class index.
    """
    print("=== Training Feature Extractor ===")

    # Prepare dataset: give every distinct 'gt' value a running class index.
    trainset = pd.read_csv(TRAINSET_CSV)
    label_to_idx = {label: idx for idx, label in enumerate(trainset['gt'].unique())}
    # 'label_idx' is only used as the stratification key below; the dataset
    # itself yields the raw 'gt' value.
    trainset['label_idx'] = trainset['gt'].map(label_to_idx)

    # Split into training & validation sets.
    train_df, val_df = train_test_split(trainset, test_size=0.2, stratify=trainset['label_idx'])

    train_dataset = FaceDataset(train_df)
    val_dataset = FaceDataset(val_df)
    # BatchNorm layers in training mode cannot compute batch statistics from a
    # single sample, so drop the trailing batch only when it would hold exactly
    # one image.
    drop_single_tail = BATCH_SIZE > 1 and len(train_dataset) % BATCH_SIZE == 1
    train_loader = DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=drop_single_tail
    )
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

    # Initialize model, loss function, optimizer and compute device.
    model = FineTunedFaceNet(len(label_to_idx))  # classifier head for N persons
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

    def encode_labels(labels):
        """Map a collated batch of raw 'gt' labels to a class-index tensor."""
        # The DataLoader collates string labels into a sequence of str but
        # numeric labels into a tensor. Tensor elements hash by identity, so
        # they must be converted to plain Python values before the lookup.
        keys = labels.tolist() if isinstance(labels, torch.Tensor) else labels
        return torch.tensor(
            [label_to_idx[key] for key in keys], dtype=torch.long, device=device
        )

    for epoch in range(EXTRACTOR_EPOCHS):
        model.train()  # enable dropout / batch-norm statistic updates

        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}"):
            targets = encode_labels(labels)
            images = images.to(device)

            optimizer.zero_grad()  # clear gradients left over from the previous batch
            outputs = model(images)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()

        # Validation: same forward pass, no gradient tracking and no updates.
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for images, labels in val_loader:
                targets = encode_labels(labels)
                images = images.to(device)
                predicted = model(images).argmax(dim=1)  # highest-scoring class
                total += targets.size(0)
                correct += (predicted == targets).sum().item()

        if total:
            print(f"Validation Accuracy: {100 * correct / total:.2f}%")
        else:
            # Avoid ZeroDivisionError when the validation split is empty.
            print("Validation Accuracy: n/a (empty validation set)")

    return model, label_to_idx

In [None]:
# Train the classifier; `extractor` and `label_to_idx` become module-level
# globals used by the embedding and evaluation cells.
extractor, label_to_idx = train_feature_extractor()
# Inverse mapping: class index -> original 'gt' label.
idx_to_label = {v: k for k, v in label_to_idx.items()} # convert from ids to labels

## Create embeddings Database
1. Detect the face in each training image
2. Crop & resize it
3. Run it through the trained model to get a 512-dimensional vector (an embedding) that uniquely represents the face
4. Save these embeddings in a dict

In [None]:
# Build the embeddings database: for every training image, detect the face,
# crop it, and store the backbone embedding under the image's 'gt' label.
trainset = pd.read_csv(TRAINSET_CSV)
embeddings_dict = {}

extractor.eval()
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((FACE_SIZE, FACE_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])
# The model's device does not change during the loop, so look it up once.
embed_device = next(extractor.parameters()).device

with torch.no_grad():
    for _, row in tqdm(trainset.iterrows(), total=len(trainset), desc="Creating embeddings"):
        img_path = f"{DATASET_PATH}{row['image_path']}"
        person = row["gt"]

        image_bgr = cv2.imread(img_path)
        if image_bgr is None:
            # cv2.imread returns None on failure; skip instead of crashing in cvtColor.
            print(f"Skipping unreadable image: {img_path}")
            continue

        # Detect face. The detector gets the BGR array as loaded by OpenCV
        # (ultralytics documents numpy input as BGR).
        results = detector(image_bgr, verbose=False)
        if len(results[0].boxes) == 0:
            continue

        # Crop face from the RGB image, clamping the box to the image bounds so
        # negative coordinates cannot wrap around and empty crops are skipped.
        image = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
        height, width = image.shape[:2]
        x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].tolist())
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(width, x2), min(height, y2)
        if x2 <= x1 or y2 <= y1:
            continue
        face_img = image[y1:y2, x1:x2]

        # Get embedding. transform() yields [C, H, W]; unsqueeze(0) adds the
        # batch dimension the model expects. Only the backbone (`base`) is
        # used, so the classifier head is bypassed.
        face_tensor = transform(face_img).unsqueeze(0).to(embed_device)
        embedding = extractor.base(face_tensor).cpu().numpy()[0]

        embeddings_dict.setdefault(person, []).append(embedding)

# Save embeddings together with the label mapping and the match threshold.
with open("embeddings.pkl", "wb") as f:
    pickle.dump({
        "embeddings_dict": embeddings_dict,
        "label_mapping": label_to_idx,
        "threshold": 0.5
    }, f)

## Evaluate eval_set.csv
1. For each test image detect & crop the face
2. Extract the embedding for this face
3. Compare it with the embeddings of known people:
    - Use **cosine similarity** to measure how similar two faces (vectors) are
    - If the similarity is high enough (above threshold 0.5), assign the face to that person.
    - Otherwise, "doesn't_exist"

In [None]:
# Label every image of the evaluation set with the closest known person, or
# with "doesn't_exist" when no face is found or no person is similar enough.
eval_set = pd.read_csv(EVAL_SET_CSV)

SIMILARITY_THRESHOLD = 0.5  # minimum cosine similarity to accept a match
UNKNOWN_LABEL = "doesn't_exist"

# The per-person mean embeddings do not depend on the query image, so compute
# them (and their norms) once instead of once per evaluated image.
known_people = list(embeddings_dict)
if known_people:
    mean_embeddings = np.stack(
        [np.mean(embeddings_dict[person], axis=0) for person in known_people]
    )
    mean_norms = np.linalg.norm(mean_embeddings, axis=1)
eval_device = next(extractor.parameters()).device


def _identify_face(img_path):
    """Return the predicted label for the image at ``img_path``."""
    image_bgr = cv2.imread(img_path)
    if image_bgr is None:
        # cv2.imread returns None on failure; treat the image as unknown.
        print(f"Could not read image, marking as unknown: {img_path}")
        return UNKNOWN_LABEL

    # Detect face on the BGR array (ultralytics documents numpy input as BGR).
    results = detector(image_bgr, verbose=False)
    if len(results[0].boxes) == 0 or not known_people:
        return UNKNOWN_LABEL

    # Crop face from the RGB image, clamped to the image bounds.
    image = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2RGB)
    height, width = image.shape[:2]
    x1, y1, x2, y2 = map(int, results[0].boxes.xyxy[0].tolist())
    x1, y1 = max(0, x1), max(0, y1)
    x2, y2 = min(width, x2), min(height, y2)
    if x2 <= x1 or y2 <= y1:
        return UNKNOWN_LABEL
    face_img = image[y1:y2, x1:x2]

    face_tensor = transform(face_img).unsqueeze(0).to(eval_device)
    with torch.no_grad():
        query_embedding = extractor.base(face_tensor).cpu().numpy()[0]

    # Cosine similarity between the query and every person's mean embedding.
    with np.errstate(divide="ignore", invalid="ignore"):
        sims = (mean_embeddings @ query_embedding) / (
            mean_norms * np.linalg.norm(query_embedding)
        )
    # A zero-norm vector gives NaN/inf; such entries can never be a match.
    sims = np.nan_to_num(sims, nan=-1.0, posinf=-1.0, neginf=-1.0)
    best = int(np.argmax(sims))  # first maximum wins, as in a strict '>' scan
    return known_people[best] if sims[best] >= SIMILARITY_THRESHOLD else UNKNOWN_LABEL


predictions = [
    _identify_face(f"{TEST_PATH}{row['image_path']}")
    for _, row in tqdm(eval_set.iterrows(), total=len(eval_set), desc="Evaluating")
]
# Assign the whole column at once so it gets a consistent dtype, rather than
# writing strings cell by cell into a column that may have been read as float.
eval_set["gt"] = predictions

eval_set.to_csv("eval_set.csv", index=False)

## Prepare submission.csv file

In [None]:
# Copy the predicted labels from eval_set.csv into the 'objects' column of the
# submission template and write the result to submission.csv.
submission = pd.read_csv("/kaggle/input/surveillance-for-retail-stores/submission_file.csv")
filled_eval_set = pd.read_csv("eval_set.csv")

# filename -> predicted label. Keeping the first occurrence of a duplicated
# filename matches a top-to-bottom scan that stops at the first hit, while
# turning the per-row search into a constant-time lookup.
gt_by_filename = (
    filled_eval_set.drop_duplicates(subset="image_path", keep="first")
    .set_index("image_path")["gt"]
    .to_dict()
)

for index, row in submission.iterrows():
    # Rows with ID below 429 are left untouched.
    # NOTE(review): presumably those rows belong to a different task in the
    # shared submission file - confirm against the competition description.
    if row["ID"] < 429:
        continue

    # 'objects' holds a Python-literal dict serialised as a string.
    image_dict = ast.literal_eval(row["objects"])
    filename = os.path.basename(image_dict["image"])

    if filename in gt_by_filename:
        image_dict["gt"] = gt_by_filename[filename]
        # Store the serialised form so the column keeps holding strings; the
        # CSV writer would stringify a dict the same way.
        submission.at[index, "objects"] = str(image_dict)

submission.to_csv("submission.csv", index=False)