# 🧑‍💻 Face Recognition Pipeline with InsightFace, Scikit-Learn, and Optuna

This notebook implements a face recognition pipeline using InsightFace for feature extraction, and scikit-learn for classification. We also include robust data handling, feature engineering, and hyperparameter tuning using Optuna.

⚡ Key Features:

- ✔ Face Detection & Embedding Extraction (InsightFace)
- ✔ Enhanced Embeddings with Landmark & Pose Information
- ✔ Classification using Ridge Classifier & Linear SVC
- ✔ Cross-Validation with Macro F1 & Macro Accuracy
- ✔ Hyperparameter Tuning with Optuna
- ✔ Robust Test Set Prediction Pipeline

In [1]:
!pip install insightface optuna scikit-learn numpy pandas tqdm



# 1. Import Dependencies

In [42]:
import os
import cv2
import optuna
import insightface
import numpy as np
import pandas as pd
from tqdm import tqdm
from insightface.app import FaceAnalysis
from sklearn.linear_model import LogisticRegression, RidgeClassifier
from sklearn.svm import LinearSVC
from sklearn.ensemble import VotingClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import accuracy_score, f1_score

# 2. Configuration
Define the global configuration dictionary to control detection thresholds, model selection, and feature weighting; set up dataset paths:

In [3]:
CONFIG = {
    "APP_MODEL": "buffalo_l",  # Pretrained InsightFace model
    "APP_CONFIG": {
        "ctx_id": 0,  # GPU ID (-1 for CPU)
        "det_size": (320, 320),  # Detection resolution
        "det_thresh": 0.4,  # Detection threshold
    },
    "RANDOM_STATE": 42,
    "FEATURE_WEIGHTS": {  # Weights for feature combination
        "embeddings": 1.0,
        "norm_bbox": 0.5,
    },
    "ENHANCE_EMB": True  # Whether to use enhanced embeddings
}

In [4]:
main_dir = "dataset"

train_dir = os.path.join(main_dir, "train")
train_labels_dir = os.path.join(train_dir, "labels.csv")
test_dir = os.path.join(main_dir, "test")
unseen_test_dir = os.path.join(main_dir, "unseen_test")
ref_dir = os.path.join(main_dir, "reference_faces")

In [5]:
train_labels = pd.read_csv(train_labels_dir)
train_labels.head()

Unnamed: 0,filename,emp_id
0,face_0568.jpg,emp016
1,face_0433.jpg,emp014
2,face_1751.jpg,emp004
3,face_0675.jpg,emp028
4,face_0112.jpg,emp001


# 3. Reference & Train Data Creation

- Initialize Face Detector (`buffalo_l` from `insightface`)

- Setup Robust Face Detection
  - Includes multiple strategies for detecting difficult images

- Enhance Embeddings with Pose and Landmark features

- Create reference and train data from enhanced embeddings

In [6]:
app = FaceAnalysis(name=CONFIG["APP_MODEL"])
app.prepare(**CONFIG["APP_CONFIG"])

def robust_face_detection(img, app, attempts=3):
    if img is None:
        return None
        
    strategies = [
        lambda x: x,  # Original
        lambda x: cv2.convertScaleAbs(x, alpha=1.5, beta=40),  # Brighter and higher constrast
        lambda x: cv2.equalizeHist(cv2.cvtColor(x, cv2.COLOR_BGR2GRAY))[:,:,np.newaxis].repeat(3,2),  # Normalize pixel intensity
        lambda x: cv2.GaussianBlur(x, (5,5), 0),  # De-noise
        lambda x: cv2.medianBlur(x, 3),  # Alternative de-noise
    ]
    
    for i in range(attempts):
        try:
            modified = strategies[i](img) if i < len(strategies) else img
            faces = app.get(modified)
            if len(faces) > 0:
                return faces
        except Exception as e:
            print(f"Detection attempt {i+1} failed: {str(e)}")
            continue
    return None



Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /home/seang/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /home/seang/.insightface/models/buffalo_l/2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /home/seang/.insightface/models/buffalo_l/det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /home/seang/.insightface/models/buffalo_l/genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /home/seang/.insightface/models/buffalo_l/w600k_r50.onnx recognition ['None', 3, 112, 112] 127.5 127.5
set det

In [7]:
def enhance_embedding(
    embedding, 
    face, 
    landmark_weight=0.12,
    pose_weight=0.08,
    kps_weight=0.05,
    det_score_weight=0.04,
):
    required_attrs = ['bbox', 'kps', 'det_score', 'landmark_2d_106', 'pose']
    missing = [attr for attr in required_attrs if not hasattr(face, attr)]
    if missing:
        raise ValueError(f"Missing required face attributes: {missing}")
    
    base_embed = embedding / (np.linalg.norm(embedding))

    bbox_w = face.bbox[2] - face.bbox[0]
    bbox_h = face.bbox[3] - face.bbox[1]
    if bbox_w <= 0 or bbox_h <= 0:
        raise ValueError("Invalid bounding box dimensions.")
    
    # Predefined key points
    landmarks_rel = ((face.landmark_2d_106 - face.bbox[:2]) / [bbox_w, bbox_h]).flatten()
    kps_rel = ((face.kps - face.bbox[:2]) / [bbox_w, bbox_h]).flatten()

    # Face orientation relative to camera
    yaw, pitch, roll = face.pose
    yaw_rad = np.deg2rad(yaw)
    pitch_rad = np.deg2rad(pitch)
    roll_rad = np.deg2rad(roll)
    
    # Normalize to -1 and 1
    norm_pose = np.array([
        np.clip(yaw_rad / np.pi, -1, 1),
        np.clip(pitch_rad / (np.pi / 2), -1, 1),
        np.clip(roll_rad / np.pi, -1, 1)
    ])
    
    enhanced = np.concatenate([
        base_embed,
        landmark_weight * landmarks_rel,
        kps_weight * kps_rel,
        pose_weight * norm_pose,
        det_score_weight * np.array([face.det_score]),
    ])
    
    return enhanced / (np.linalg.norm(enhanced))


## Reference Data

In [8]:
def create_reference_data(ref_dir, ref_labels, app):
    ref_data = {
        "embedding": [],
        "enhanced_embedding": [],
        "emp_id": [],
        "norm_bbox": [],
        "is_ref": []
    }
    
    for emp_id in tqdm(ref_labels, desc="Processing Reference Identities"):
        img_path = os.path.join(ref_dir, emp_id)
        if os.path.isdir(img_path) is False:
            print(f"Error processing {emp_id} folder: Is not directory")
            continue
        for filename in tqdm(os.listdir(img_path), desc=f"Processing {emp_id} folder"):
            filepath = os.path.join(img_path, filename)
            if filepath.endswith(".mp4"):
                continue
            img = cv2.imread(filepath)
            faces = robust_face_detection(img, app, attempts=4)
            if faces:
                face = faces[0]
                img_h, img_w = img.shape[:2]
                bbox = face.bbox
                norm_bbox = [
                    bbox[0]/img_w,
                    bbox[1]/img_h,
                    bbox[2]/img_w,
                    bbox[3]/img_h
                ]
                ref_data['embedding'].append(
                    (face.embedding / np.linalg.norm(face.embedding))
                )
                ref_data['enhanced_embedding'].append(
                    enhance_embedding(face.embedding, face)
                )
                ref_data['emp_id'].append(emp_id)
                ref_data['norm_bbox'].append(norm_bbox)
            else:
                print(f"Error: Could not detect any faces in {filepath}")
            ref_data["is_ref"].append(1)
    
    for key in ref_data:
        if key == 'emp_id':
            continue
        ref_data[key] = np.array(ref_data[key])

    return ref_data

ref_data = create_reference_data(ref_dir, sorted(os.listdir(ref_dir)), app)
ref_data["embedding"].shape, ref_data["enhanced_embedding"].shape

Processing Reference Identities:   0%|          | 0/34 [00:00<?, ?it/s]


Processing emp001 folder: 100%|██████████| 13/13 [00:04<00:00,  3.16it/s]
Processing emp002 folder: 100%|██████████| 13/13 [00:04<00:00,  2.85it/s]1s/it]
Processing emp003 folder: 100%|██████████| 11/11 [00:03<00:00,  2.92it/s]8s/it]
Processing emp004 folder: 100%|██████████| 15/15 [00:05<00:00,  2.78it/s]0s/it]
Processing emp005 folder: 100%|██████████| 14/14 [00:04<00:00,  2.93it/s]1s/it]
Processing emp006 folder: 100%|██████████| 8/8 [00:02<00:00,  3.06it/s].67s/it]
Processing emp007 folder: 100%|██████████| 14/14 [00:04<00:00,  2.96it/s]7s/it]
Processing Reference Identities:  21%|██        | 7/34 [00:29<01:54,  4.22s/it]

Error: Could not detect any faces in dataset/reference_faces/emp008/emp008_013.jpg


Processing emp008 folder: 100%|██████████| 15/15 [00:05<00:00,  2.82it/s]
Processing emp009 folder: 100%|██████████| 17/17 [00:05<00:00,  2.87it/s]8s/it]
Processing emp010 folder: 100%|██████████| 15/15 [00:05<00:00,  2.90it/s]0s/it]
Processing emp011 folder: 100%|██████████| 11/11 [00:03<00:00,  2.97it/s]05s/it]
Processing emp012 folder: 100%|██████████| 14/14 [00:04<00:00,  2.91it/s]64s/it]
Processing emp013 folder: 100%|██████████| 16/16 [00:05<00:00,  2.82it/s]69s/it]
Processing Reference Identities:  38%|███▊      | 13/34 [01:00<01:44,  4.99s/it]

Error: Could not detect any faces in dataset/reference_faces/emp013/emp013_014.jpg


Processing emp014 folder: 100%|██████████| 15/15 [00:05<00:00,  2.82it/s]
Processing emp015 folder: 100%|██████████| 15/15 [00:05<00:00,  2.89it/s]09s/it]
Processing emp016 folder: 100%|██████████| 15/15 [00:05<00:00,  2.88it/s]12s/it]
Processing emp017 folder: 100%|██████████| 14/14 [00:04<00:00,  2.81it/s]15s/it]
Processing emp018 folder: 100%|██████████| 20/20 [00:07<00:00,  2.76it/s]10s/it]
Processing emp019 folder: 100%|██████████| 15/15 [00:05<00:00,  2.94it/s]74s/it]
Processing emp020 folder: 100%|██████████| 15/15 [00:05<00:00,  2.89it/s]55s/it]
Processing emp021 folder: 100%|██████████| 15/15 [00:05<00:00,  2.75it/s]44s/it]
Processing emp022 folder: 100%|██████████| 15/15 [00:05<00:00,  2.85it/s]45s/it]
Processing emp023 folder: 100%|██████████| 15/15 [00:05<00:00,  2.90it/s]39s/it]
Processing emp024 folder: 100%|██████████| 11/11 [00:03<00:00,  2.98it/s]33s/it]
Processing emp025 folder: 100%|██████████| 14/14 [00:04<00:00,  2.85it/s]84s/it]
Processing emp026 folder: 100%|████

Error: Could not detect any faces in dataset/reference_faces/emp030/emp030_011.jpg




Error: Could not detect any faces in dataset/reference_faces/emp030/emp030_003.jpg


Processing emp030 folder: 100%|██████████| 15/15 [00:04<00:00,  3.15it/s]
Processing emp031 folder: 100%|██████████| 15/15 [00:05<00:00,  2.87it/s]92s/it]
Processing emp032 folder: 100%|██████████| 15/15 [00:05<00:00,  2.91it/s]01s/it]
Processing emp034 folder: 100%|██████████| 11/11 [00:03<00:00,  2.94it/s]05s/it]
Processing emp035 folder: 100%|██████████| 15/15 [00:05<00:00,  2.84it/s]66s/it]
Processing Reference Identities: 100%|██████████| 34/34 [02:48<00:00,  4.95s/it]


((448, 512), (448, 738))

## Train Data

In [9]:
def create_train_data(train_dir, labels_df, app, normalize=True):
    train_data = {
        "embedding": [],
        "enhanced_embedding": [],
        "emp_id": [],
        "norm_bbox": [],
        "is_ref": [],
    }
    failed_samples = []

    img_path = os.path.join(train_dir, "images")
    for idx, row in tqdm(labels_df.iterrows(), desc="Processing Train Identities", total=len(labels_df)):
        filename, emp_id = row
        filepath = os.path.join(img_path, filename)
        
        img = cv2.imread(filepath)
        faces = robust_face_detection(img, app, attempts=4)
        
        if faces:
            face = faces[0]
            img_h, img_w = img.shape[:2]
            bbox = face.bbox
            norm_bbox = [
                bbox[0]/img_w,
                bbox[1]/img_h,
                bbox[2]/img_w,
                bbox[3]/img_h
            ]
            train_data['embedding'].append(
                (face.embedding / np.linalg.norm(face.embedding))
            )
            train_data['enhanced_embedding'].append(
                enhance_embedding(face.embedding, face)
            )
            train_data['emp_id'].append(emp_id)
            train_data['norm_bbox'].append(norm_bbox)
        else:
            # train_data['embedding'].append(np.random.normal(0, 0.01, 512))
            # train_data['enhanced_embedding'].append(np.random.normal(0, 0.01,738))
            # train_data['emp_id'].append("UNKNOWN")
            # train_data['norm_bbox'].append([0.0, 0.0, 0.0, 0.0])
            failed_samples.append((emp_id, filepath))
        
        train_data["is_ref"].append(0)
    
    for key in train_data:
        if key == 'emp_id':
            continue
        train_data[key] = np.array(train_data[key])
    
    return train_data, failed_samples

train_data, train_failed = create_train_data(train_dir, train_labels, app)
train_data["embedding"].shape, train_data["enhanced_embedding"].shape

Processing Train Identities:   0%|          | 0/1179 [00:00<?, ?it/s]

Processing Train Identities: 100%|██████████| 1179/1179 [06:49<00:00,  2.88it/s]


((1066, 512), (1066, 738))

In [10]:
print(f"Training images used (%): {((len(train_labels) - len(train_failed)) / len(train_labels)) * 100:.2f}")

Training images used (%): 90.42


# 4. Feature Engineering

- Generate feature vectors combining embeddings and bounding box coordinates:

In [11]:
scaler = StandardScaler()

def create_feature_vectors(ref_data, scaler, return_target=True, weights=None, enhanced_embedding=False):

    if enhanced_embedding:
        embeddings = np.array(ref_data["enhanced_embedding"])
    else:
        embeddings = np.array(ref_data["embedding"])
    norm_bbox = np.array(ref_data["norm_bbox"])

    features_to_normalize = np.hstack([
        norm_bbox
    ])

    normalized_features = scaler.fit_transform(features_to_normalize)
    
    norm_bbox = normalized_features[:, :4]

    features = np.hstack([
        embeddings * weights["embeddings"],        
        norm_bbox * weights["norm_bbox"],
    ])
    
    metadata = {
        "emp_ids": np.array(ref_data["emp_id"]) if return_target else None,
        "feature_names": [
            *[f"embedding_{i}" for i in range(512)],
            "bbox_x1", "bbox_y1", "bbox_x2", "bbox_y2",
        ],
        "scaler": scaler
    }
    if return_target:
        target = ref_data['emp_id']
    else:
        target = None

    return features, target, metadata

In [12]:
ref_feats, ref_target, ref_metadata = create_feature_vectors(
    ref_data, scaler, weights=CONFIG["FEATURE_WEIGHTS"], enhanced_embedding=CONFIG["ENHANCE_EMB"]
)

train_feats, train_target, train_metadata = create_feature_vectors(
    train_data, scaler, weights=CONFIG["FEATURE_WEIGHTS"], enhanced_embedding=CONFIG["ENHANCE_EMB"]
)

ref_feats.shape, train_feats.shape

((448, 742), (1066, 742))

# 5. Prepare Features and Labels

In [13]:
classes = np.unique(train_target + ref_target)
le = LabelEncoder()
le.fit(classes)

X = np.vstack([train_feats, ref_feats])
y = np.hstack([le.transform(train_target), le.transform(ref_target)])

# 6. Cross-validation & Evaluation

- We employed a 10-fold stratified cross-validation strategy on base models that worked particularly well with high-dimensional embedding. Specifically `RidgeClassifier` and `LinearSVC`. The custom class allows us to cross-validate multiple models at the same time with different metrics.

In [14]:
def view_cv_summary(results):
    for model_name, metrics in results.items():
        print(f"\n{model_name}:")
        for metric, stats in metrics['mean'].items():
            print(f"  {metric}: {stats:.4f} ± {metrics['std'][metric]:.4f}")

class CrossValidator:
    def __init__(self, models, metric_fns, cv_method, name=None, verbose=True):
        self.models = models
        self.metric_fns = metric_fns
        self.cv_method = cv_method
        self.name = name
        self.verbose = verbose
        self.results = {}

    def _calculate_metrics(self, y_true, y_pred):
        results = {}
        for name, fn in self.metric_fns:
            try:
                if name == "macro_f1":
                    results[name] = fn(y_true, y_pred, average="macro")
                else:
                    results[name] = fn(y_true, y_pred)
            except Exception as e:
                print(f"Metric {name} failed: {str(e)}")
                results[name] = np.nan
        return results

    def fit(self, X, y):
        self.results = {model[0]: [] for model in self.models}
        print(f"Name: {self.name}\n")
        for fold, (train_idx, test_idx) in enumerate(self.cv_method.split(X, y)):
            if self.verbose:
                print(f"\nFold {fold + 1}/{self.cv_method.n_splits}")
                print("-"*40)
            
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]
            
            for name, model in self.models:
                model.fit(X_train, y_train)
                y_pred = model.predict(X_test)
                
                metrics = self._calculate_metrics(y_test, y_pred)
                self.results[name].append(metrics)
                
                if self.verbose:
                    print(f"- {name}:")
                    for m, v in metrics.items():
                        print(f"  {m}: {v:.4f}")
                    print("\n")

    def summarize(self):
        summary = {}
        for name in self.results.keys():
            fold_results = pd.DataFrame(self.results[name])
            summary[name] = {
                'mean': fold_results.mean(),
                'std': fold_results.std()
            }
        return summary

In [41]:
def macro_accuracy(y_true, y_pred):
    unique_classes = np.unique(y_true)
    acc_per_class = []
    
    for c in unique_classes:
        class_mask = y_true == c
        correct = np.sum((y_pred == y_true) & class_mask)
        acc = correct / np.sum(class_mask)
        acc_per_class.append(acc)
        
    return np.mean(acc_per_class) if acc_per_class else 0

models = [
    ("RidgeClassifier", RidgeClassifier(alpha=1.0, class_weight="balanced", random_state=CONFIG["RANDOM_STATE"])),
    ("LinearSVC", LinearSVC(class_weight="balanced", random_state=CONFIG["RANDOM_STATE"])),
]

metric_fns = [
    ('macro_f1', f1_score),
    ('macro_accuracy', macro_accuracy),
]

cv = CrossValidator(
    models=models, 
    metric_fns=metric_fns,
    cv_method=StratifiedKFold(
        n_splits=10, shuffle=True, random_state=CONFIG["RANDOM_STATE"]), 
    name="Optimized Crossvalidation"
)

cv.fit(X, y)
view_cv_summary(cv.summarize())

Name: Optimized Crossvalidation


Fold 1/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9011
  macro_accuracy: 0.9149


- LinearSVC:
  macro_f1: 0.9098
  macro_accuracy: 0.9221



Fold 2/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.8892
  macro_accuracy: 0.9086


- LinearSVC:
  macro_f1: 0.9022
  macro_accuracy: 0.9088



Fold 3/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9164
  macro_accuracy: 0.9344


- LinearSVC:
  macro_f1: 0.9241
  macro_accuracy: 0.9380



Fold 4/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9124
  macro_accuracy: 0.9188


- LinearSVC:
  macro_f1: 0.9054
  macro_accuracy: 0.9200



Fold 5/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9383
  macro_accuracy: 0.9417


- LinearSVC:
  macro_f1: 0.9302
  macro_accuracy: 0.9360



Fold 6/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.86

# 7. Hyperparameter Tuning

- We used `Optuna` to find the best parameters for Ridge and SVC (provided as commented code).

Example of setting best parameters manually:

In [16]:
# def ridge_objective(trial, X, y, cv_method):
#     params = {
#         'alpha': trial.suggest_float('alpha', 1e-3, 5, log=True),
#         'solver': trial.suggest_categorical('solver', ['svd', 'cholesky', 'sparse_cg']),
#         'class_weight': trial.suggest_categorical('class_weight', ['balanced']),
#         'fit_intercept': trial.suggest_categorical('fit_intercept', [True, False]),
#         'random_state': trial.suggest_categorical('random_state', [CONFIG['RANDOM_STATE']])
#     }
    
#     model = RidgeClassifier(**params)

#     fold_scores = []
#     for train_idx, valid_idx in cv_method.split(X, y):
#         X_train, y_train = X[train_idx], y[train_idx]
#         X_valid, y_valid = X[valid_idx], y[valid_idx]

#         model.fit(X_train, y_train)
#         y_pred = model.predict(X_valid)
#         fold_scores.append(f1_score(y_valid, y_pred, average="macro"))
    
#     return np.mean(fold_scores)

# def optimize_ridge(X, y, n_trials=50):
#     study = optuna.create_study(
#         direction='maximize'
#     )
    
#     cv_method = StratifiedKFold(
#         n_splits=10,
#         shuffle=True,
#         random_state=CONFIG["RANDOM_STATE"]
#     )
    
#     study.optimize(
#         lambda trial: ridge_objective(trial, X, y, cv_method),
#         n_trials=n_trials,
#         show_progress_bar=True
#     )
    
#     print("Best trial:")
#     trial = study.best_trial
#     print(f"  Macro F1: {trial.value:.4f}")
    
#     return study.best_params

# best_ridge_params = optimize_ridge(X, y, n_trials=100)

In [17]:
# def linear_svc_objective(trial, X, y, cv_method):
#     params = {
#         'C': trial.suggest_float('C', 1e-3, 10, log=True),
#         'class_weight': trial.suggest_categorical('class_weight', ['balanced']),
#         'fit_intercept': trial.suggest_categorical('fit_intercept', [True, False]),
#         'max_iter': trial.suggest_int('max_iter', 500, 5000),
#         'dual': trial.suggest_categorical('dual', [True, False]),
#         'random_state': trial.suggest_categorical('random_state', [CONFIG['RANDOM_STATE']])
#     }
    
#     model = LinearSVC(**params)

#     fold_scores = []
#     for train_idx, valid_idx in cv_method.split(X, y):
#         X_train, y_train = X[train_idx], y[train_idx]
#         X_valid, y_valid = X[valid_idx], y[valid_idx]

#         model.fit(X_train, y_train)
#         y_pred = model.predict(X_valid)
#         fold_scores.append(f1_score(y_valid, y_pred, average="macro"))
    
#     return np.mean(fold_scores)

# def optimize_linear_svc(X, y, n_trials=50):
#     study = optuna.create_study(direction='maximize')
    
#     cv_method = StratifiedKFold(
#         n_splits=10,
#         shuffle=True,
#         random_state=CONFIG["RANDOM_STATE"]
#     )
    
#     study.optimize(
#         lambda trial: linear_svc_objective(trial, X, y, cv_method),
#         n_trials=n_trials,
#         show_progress_bar=True
#     )
    
#     print("Best trial:")
#     trial = study.best_trial
#     print(f"  Macro F1: {trial.value:.4f}")
#     print("  Params:")
#     for k, v in trial.params.items():
#         print(f"    {k}: {v}")
    
#     return trial.params

# best_linear_svc_params = optimize_linear_svc(X, y, n_trials=100)

In [18]:
best_ridge_params = {
    'alpha': 0.2538247299320639, 
    'solver': 'sparse_cg', 
    'class_weight': 'balanced', 
    'fit_intercept': True, 
    'random_state': 42
}

best_linear_svc_params = {
    'C': 0.513724142614831, 
    'class_weight': 'balanced', 
    'fit_intercept': True, 
    'max_iter': 3530, 
    'dual': True
}

## Cross-validation with Optimized Models

In [20]:
op_models = [
    ("RidgeClassifier", RidgeClassifier(**best_ridge_params)),
    ("LinearSVC", LinearSVC(**best_linear_svc_params))
]

op_cv = CrossValidator(
    models=op_models,
    metric_fns=metric_fns,
    cv_method=StratifiedKFold(
        n_splits=10, shuffle=True, random_state=CONFIG["RANDOM_STATE"]), 
    name="FaceRecognition CV"
)

op_cv.fit(X, y)
view_cv_summary(op_cv.summarize())

Name: FaceRecognition CV


Fold 1/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9091
  macro_accuracy: 0.9221


- LinearSVC:
  macro_f1: 0.9039
  macro_accuracy: 0.9181



Fold 2/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.8890
  macro_accuracy: 0.9086


- LinearSVC:
  macro_f1: 0.9226
  macro_accuracy: 0.9231



Fold 3/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9184
  macro_accuracy: 0.9321


- LinearSVC:
  macro_f1: 0.9216
  macro_accuracy: 0.9380



Fold 4/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9203
  macro_accuracy: 0.9224


- LinearSVC:
  macro_f1: 0.9053
  macro_accuracy: 0.9200



Fold 5/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.9383
  macro_accuracy: 0.9417


- LinearSVC:
  macro_f1: 0.9389
  macro_accuracy: 0.9417



Fold 6/10
----------------------------------------
- RidgeClassifier:
  macro_f1: 0.8705
  ma

# 8. Predicting on Test set

- Create features for both seen and unseen competition test data

In [21]:
def create_test_data(test_dir, app, enhanced_embedding=False):
    test_data = {
        "embedding": [],
        "enhanced_embedding": [],
        "norm_bbox": [],
        "filenames": []
    }
    
    failed_samples = []
    img_path = os.path.join(test_dir, "images")
    for filename in tqdm(sorted(os.listdir(img_path)), desc="Processing Test Identites"):
        filepath = os.path.join(img_path, filename)
        
        img = cv2.imread(filepath)
        faces = robust_face_detection(img, app, attempts=4)

        if faces:
            face = faces[0]
            img_h, img_w = img.shape[:2]
            bbox = face.bbox
            norm_bbox = [
                bbox[0]/img_w,
                bbox[1]/img_h,
                bbox[2]/img_w,
                bbox[2]/img_h
            ]
            test_data['embedding'].append(
                (face.embedding / np.linalg.norm(face.embedding))
            )
            test_data['enhanced_embedding'].append(
                enhance_embedding(face.embedding, face)
            )
            test_data['norm_bbox'].append(norm_bbox)
        else:
            test_data['embedding'].append(np.zeros(512))
            test_data['enhanced_embedding'].append(np.zeros(738))
            test_data['norm_bbox'].append([-1.0, -1.0, -1.0, -1.0])
            failed_samples.append((filepath))
        test_data['filenames'].append(filename)

    for key in test_data:
        if key == 'filenames':
            continue
        test_data[key] = np.array(test_data[key])
    
    return test_data, failed_samples

test_data, test_failed = create_test_data(test_dir, app, enhanced_embedding=CONFIG["ENHANCE_EMB"])

Processing Test Identites: 100%|██████████| 636/636 [03:52<00:00,  2.73it/s]


In [22]:
unseen_test_data, unseen_test_failed = create_test_data(unseen_test_dir, app, enhanced_embedding=CONFIG["ENHANCE_EMB"])

Processing Test Identites: 100%|██████████| 1884/1884 [12:10<00:00,  2.58it/s]


In [23]:
test_data['embedding'].shape, test_data['enhanced_embedding'].shape

((636, 512), (636, 738))

In [24]:
unseen_test_data['embedding'].shape, unseen_test_data['enhanced_embedding'].shape

((1884, 512), (1884, 738))

In [25]:
test_feats, _, test_metadata = create_feature_vectors(
    test_data, scaler, weights=CONFIG["FEATURE_WEIGHTS"], enhanced_embedding=CONFIG["ENHANCE_EMB"], return_target=False
)
test_feats.shape

(636, 742)

In [26]:
unseen_test_feats, _, unseen_test_metadata = create_feature_vectors(
    unseen_test_data, scaler, weights=CONFIG["FEATURE_WEIGHTS"], enhanced_embedding=CONFIG["ENHANCE_EMB"], return_target=False
)
test_feats.shape

(636, 742)

In [27]:
ridge = RidgeClassifier(**best_ridge_params)
ridge.fit(X, y)

ridge_preds = ridge.predict(unseen_test_feats)
ridge_preds[:20]

array([14,  4,  4,  4,  4,  0,  4,  4,  4, 22, 22,  4, 11, 14,  4,  4, 14,
       12,  2, 21])

In [28]:
svc = LinearSVC(**best_linear_svc_params)
svc.fit(X, y)

svc_preds = svc.predict(unseen_test_feats)
svc_preds[:20]

array([14,  4,  4,  4,  4,  0,  4,  4,  4, 22, 22,  4, 11, 14,  4,  4, 14,
       12,  2, 21])

# 9. Submission

In [29]:
submission = pd.DataFrame(columns=["filename", "employee_id"])
submission["filename"] = unseen_test_data['filenames']
submission["employee_id"] = le.classes_[ridge_preds]

# Lowercase any "UNKNOWN" labels
submission["employee_id"] = submission["employee_id"].apply(
    lambda x: x.lower() if x == "UNKNOWN" else x
)

submission.head(10)

Unnamed: 0,filename,employee_id
0,face_10000.jpg,emp014
1,face_10001.jpg,emp004
2,face_10002.jpg,emp004
3,face_10003.jpg,emp004
4,face_10004.jpg,emp004
5,face_10005.jpg,unknown
6,face_10006.jpg,emp004
7,face_10007.jpg,emp004
8,face_10008.jpg,emp004
9,face_10009.jpg,emp022


In [30]:
submission.to_csv("submission.csv", index=False)