In [None]:
!pip install git+https://github.com/serengil/deepface.git
!pip install ultralytics --no-deps

In [1]:
from deepface import DeepFace
import os
import numpy as np
from sklearn.metrics.pairwise import cosine_distances
from sklearn.metrics import accuracy_score
import csv
import time
from PIL import Image




In [None]:
# Source dataset root; it is traversed as <root>/<subject>/<image file>.
INPUT_PATH = "/kaggle/input/dataset/merged_lfw_cplfw_50"
# Directory under which the resized copies of the dataset are written.
OUTPUT_BASE = "/kaggle/working/"

# Edge lengths (in pixels) of the square images generated for each experiment.
SIZES = [250, 100, 50, 25]
# Number of subjects placed in each evaluation block.
SPLIT_SIZE = 10

In [None]:
def resize_and_save_all(input_path, output_base, sizes):
    """Write a square, RGB, resized copy of the dataset for every requested size.

    For each ``size`` in ``sizes`` a directory named
    ``merged_lfw_cplfw_50_<size>`` is created under ``output_base`` and every
    sub-directory of ``input_path`` is mirrored into it. Entries of
    ``input_path`` that are not directories are ignored.

    Every file found inside a subject directory is opened with PIL, converted
    to RGB, resized to ``size`` x ``size`` with Lanczos resampling (the aspect
    ratio is not preserved) and saved under its original file name.

    Args:
        input_path: Dataset root laid out as ``<root>/<subject>/<file>``.
        output_base: Directory that receives the per-size dataset copies.
        sizes: Iterable of target edge lengths in pixels.

    Returns:
        None. Files that cannot be processed are reported with ``print`` and
        skipped; they do not abort the run.
    """
    for edge in sizes:
        target_root = os.path.join(output_base, f"merged_lfw_cplfw_50_{edge}")
        if not os.path.exists(target_root):
            os.makedirs(target_root)

        for person in os.listdir(input_path):
            src_dir = os.path.join(input_path, person)
            if not os.path.isdir(src_dir):
                # Stray files next to the subject folders are not part of the dataset.
                continue

            dst_dir = os.path.join(target_root, person)
            os.makedirs(dst_dir, exist_ok=True)

            for file_name in os.listdir(src_dir):
                src_file = os.path.join(src_dir, file_name)
                dst_file = os.path.join(dst_dir, file_name)
                try:
                    with Image.open(src_file) as picture:
                        resized = picture.convert("RGB").resize((edge, edge), Image.LANCZOS)
                        resized.save(dst_file)
                except Exception as e:
                    # Best effort: one unreadable file must not stop the whole resize pass.
                    print(f"Error con {src_file}: {e}")

In [None]:
resize_and_save_all(INPUT_PATH, OUTPUT_BASE, SIZES)
print("Redimensionado terminado.")

In [None]:
def get_all_images(dataset_path):
    """Collect every image of the dataset as ``(subject, image_path)`` pairs.

    The dataset is expected to be laid out as ``<dataset_path>/<subject>/<file>``.
    Only regular files whose name ends in ``.jpg``, ``.jpeg`` or ``.png``
    (case-insensitive) are returned; entries of ``dataset_path`` that are not
    directories are ignored.

    Args:
        dataset_path: Root directory of the dataset.

    Returns:
        A list of ``(subject, image_path)`` tuples, ordered by subject name and
        then by file name so that repeated runs process images (and emit CSV
        rows) in the same order regardless of the file system's listing order.
    """
    image_suffixes = ('.jpg', '.jpeg', '.png')
    images = []
    # os.listdir returns entries in arbitrary order; sort for reproducibility.
    for subj in sorted(os.listdir(dataset_path)):
        subj_path = os.path.join(dataset_path, subj)
        if not os.path.isdir(subj_path):
            continue
        for img in sorted(os.listdir(subj_path)):
            img_path = os.path.join(subj_path, img)
            # The isfile check keeps a directory that happens to be named
            # like an image from being handed to the embedding step.
            if img.lower().endswith(image_suffixes) and os.path.isfile(img_path):
                images.append((subj, img_path))
    return images

def extract_embeddings(images, model, detector, emb_path, csv_writer):
    """Compute, cache and time one embedding per image for a model/detector pair.

    Embeddings are stored as ``<emb_path>/<model>/<detector>/<subject>/<image file name>.npy``.
    An image whose ``.npy`` file already exists is skipped entirely: it is
    neither recomputed, nor timed, nor written to the CSV.

    Args:
        images: Iterable of ``(subject, image_path)`` pairs.
        model: Value passed to ``DeepFace.represent`` as ``model_name``.
        detector: Value passed to ``DeepFace.represent`` as ``detector_backend``.
        emb_path: Root directory of the embedding cache.
        csv_writer: Object with a ``writerow`` method; receives one
            ``[model, detector, subject, image_path, elapsed_seconds]`` row for
            every embedding that was computed and saved.

    Returns:
        None. A per-image line and a final summary are printed.

    Notes:
        * Any exception raised while processing an image is printed and the
          image is skipped. With ``enforce_detection=True`` this presumably
          includes images in which no face is detected (confirm against the
          DeepFace documentation).
        * Only the first entry returned by ``DeepFace.represent`` is kept.
        * The measured time covers the whole ``DeepFace.represent`` call;
          NOTE(review): the first call for a model/detector pair probably also
          includes model loading, which would inflate that sample.
    """
    save_dir = os.path.join(emb_path, model, detector)
    os.makedirs(save_dir, exist_ok=True)
    total_time = 0.0
    count = 0
    for subj, img_path in images:
        subj_dir = os.path.join(save_dir, subj)
        os.makedirs(subj_dir, exist_ok=True)
        emb_file = os.path.join(subj_dir, os.path.basename(img_path) + ".npy")
        if os.path.exists(emb_file):
            # Already cached by a previous run.
            continue
        try:
            # perf_counter is monotonic and high resolution, unlike the wall
            # clock returned by time.time(), so it is the right tool for
            # measuring short durations.
            start_time = time.perf_counter()
            emb = DeepFace.represent(
                img_path=img_path,
                model_name=model,
                detector_backend=detector,
                enforce_detection=True,
                align=True
            )[0]["embedding"]
            elapsed = time.perf_counter() - start_time
            np.save(emb_file, emb)
            # Record the timing in the CSV.
            csv_writer.writerow([model, detector, subj, img_path, elapsed])
        except Exception as e:
            print(f"Error: {img_path} | {e}")
            continue
        # Update the totals only after the embedding was saved and logged, so
        # the printed summary always agrees with the CSV contents.
        total_time += elapsed
        count += 1
        print(f"{model} | {detector} | {img_path} | Tiempo: {elapsed:.3f} seg")
    if count > 0:
        avg_time = total_time / count
        print(f"\nModelo: {model} | Detector: {detector} | Tiempo total: {total_time:.2f} seg | Imágenes procesadas: {count} | Tiempo promedio: {avg_time:.3f} seg")
    else:
        print(f"\nModelo: {model} | Detector: {detector} | No se procesaron imágenes.")

In [None]:
def load_embeddings(model, detector, emb_path=None, n_lfw=10):
    """Load the cached embeddings of one model/detector pair, grouped by subject.

    Reads ``<emb_path>/<model>/<detector>/<subject>/*.npy``. Files whose name
    starts with ``lfw_`` are collected as references and files starting with
    ``cplfw_`` as probes; any other file is ignored.

    Args:
        model: Model name used as the first directory level.
        detector: Detector name used as the second directory level.
        emb_path: Root directory of the embedding cache. When omitted, the
            module-level ``EMB_PATH`` global is used, which keeps the original
            two-argument call working.
        n_lfw: Exact number of ``lfw_`` embeddings a subject must have to be
            kept (defaults to the previously hard-coded 10).

    Returns:
        ``{subject: {"lfw": [(file_name, embedding), ...],
        "cplfw": [(file_name, embedding), ...]}}`` containing only subjects
        with exactly ``n_lfw`` reference embeddings and at least one probe.
        Subjects and files are visited in sorted order.

    Raises:
        OSError: If the model/detector directory does not exist.
    """
    if emb_path is None:
        # Backward-compatible fallback for callers that rely on the global.
        emb_path = EMB_PATH
    emb_dir = os.path.join(emb_path, model, detector)
    subjects = {}
    for subj in sorted(os.listdir(emb_dir)):
        subj_path = os.path.join(emb_dir, subj)
        if not os.path.isdir(subj_path):
            continue
        lfw_embs = []
        cplfw_embs = []
        for emb_name in sorted(os.listdir(subj_path)):
            if not emb_name.endswith(".npy"):
                # np.load would raise on anything that is not a saved array.
                continue
            if emb_name.startswith("lfw_"):
                bucket = lfw_embs
            elif emb_name.startswith("cplfw_"):
                bucket = cplfw_embs
            else:
                continue
            bucket.append((emb_name, np.load(os.path.join(subj_path, emb_name))))
        # A subject missing even one reference (for example because extraction
        # failed on one image) is dropped so every kept subject has the same
        # number of references.
        if len(lfw_embs) == n_lfw and len(cplfw_embs) > 0:
            subjects[subj] = {"lfw": lfw_embs, "cplfw": cplfw_embs}
    return subjects

def split_subjects(subjects, split_size):
    """Partition the subject ids, taken in sorted order, into consecutive blocks.

    Args:
        subjects: Mapping whose keys are the subject ids.
        split_size: Maximum number of subjects per block.

    Returns:
        A list of lists of subject ids; every block holds ``split_size`` ids
        except possibly the last one, which holds the remainder.
    """
    ordered = list(subjects.keys())
    ordered.sort()
    blocks = []
    for start in range(0, len(ordered), split_size):
        blocks.append(ordered[start:start + split_size])
    return blocks

def benchmark_block(subjects, block_subjects):
    """Measure closed-set identification accuracy within one block of subjects.

    The ``lfw`` embeddings of the block's subjects form the gallery and all of
    their ``cplfw`` embeddings are the probes. A probe is assigned to the
    subject owning the gallery embedding with the smallest cosine distance;
    when two subjects tie, the one that comes first in ``block_subjects`` wins.

    Args:
        subjects: Mapping ``subject -> {"lfw": [(name, emb), ...],
            "cplfw": [(name, emb), ...]}``.
        block_subjects: Subject ids that make up the block.

    Returns:
        ``(accuracy, n_probes)`` where ``accuracy`` is computed by
        ``accuracy_score`` over all probes of the block.
    """
    gallery = {
        subj: [vec for _, vec in subjects[subj]["lfw"]]
        for subj in block_subjects
    }
    # Build the probe list (every cplfw embedding) paired with its true identity.
    queries = [
        (vec, subj)
        for subj in block_subjects
        for _, vec in subjects[subj]["cplfw"]
    ]
    expected = [subj for _, subj in queries]

    # Embedding-only comparison (cosine distance).
    predicted = []
    for query_vec, _ in queries:
        winner = None
        winner_dist = float("inf")
        for candidate, candidate_vecs in gallery.items():
            nearest = np.min(cosine_distances([query_vec], candidate_vecs)[0])
            # Strict comparison: on a tie the earlier subject is kept.
            if nearest < winner_dist:
                winner_dist = nearest
                winner = candidate
        predicted.append(winner)

    return accuracy_score(expected, predicted), len(queries)

In [None]:
# Experiment configuration.
BASE_INPUT = "/kaggle/working"   # where the resized dataset copies live
BASE_EMB = "/kaggle/working"     # where embeddings and result CSVs are written
MODELS = ["Facenet", "VGG-Face", "ArcFace", "GhostFaceNet", "OpenFace"]
DETECTORS = ["retinaface", "centerface", "yunet", "yolov8","yolov11s", "yolov11n"]
SPLIT_SIZE = 10  # subjects per evaluation block

# For every image size: (1) extract and time embeddings for each
# model/detector pair, then (2) benchmark identification accuracy per block.
for size in SIZES:
    print(f"-----------------INICIANDO SIZE {size}-----------------")
    DATASET_PATH = f"{BASE_INPUT}/merged_lfw_cplfw_50_{size}"
    # EMB_PATH is deliberately a module-level name: load_embeddings reads it as
    # a global, so it must point at the current size before benchmarking.
    EMB_PATH = f"{BASE_EMB}/embeddings_{size}"
    os.makedirs(EMB_PATH, exist_ok=True)
    CSV_BENCHMARK = os.path.join(EMB_PATH, f"benchmark_results_{size}.csv")
    csv_file = os.path.join(EMB_PATH, f"embedding_times_{size}.csv")

    images = get_all_images(DATASET_PATH)
    header = ["model", "detector", "subject", "image_path", "embedding_time_sec"]
    # Append instead of truncating: extract_embeddings skips images whose
    # embedding is already cached, so re-running this cell with mode "w" would
    # wipe every previously recorded timing and leave only the header behind.
    needs_header = not os.path.exists(csv_file) or os.path.getsize(csv_file) == 0
    with open(csv_file, mode="a", newline='') as f:
        writer = csv.writer(f)
        if needs_header:
            writer.writerow(header)
        for model in MODELS:
            for detector in DETECTORS:
                print(f"Extrayendo embeddings para modelo: {model} | detector: {detector} | size: {size}")
                extract_embeddings(images, model, detector, EMB_PATH, writer)
    print(f"¡Embeddings extraídos y tiempos guardados en {csv_file}!")

    # The benchmark is recomputed from the cached embeddings on every run, so
    # its CSV is intentionally rewritten from scratch.
    with open(CSV_BENCHMARK, "w", newline='') as f:
        writer = csv.writer(f)
        writer.writerow(["model", "detector", "split", "accuracy", "n_samples"])
        for model in MODELS:
            for detector in DETECTORS:
                print(f"Benchmarking modelo: {model} | detector: {detector} | size: {size}")
                subjects = load_embeddings(model, detector)
                splits = split_subjects(subjects, SPLIT_SIZE)
                for i, block_subjects in enumerate(splits):
                    acc, n = benchmark_block(subjects, block_subjects)
                    print(f"Split {i+1}: Acc: {acc:.3f} (n={n})")
                    writer.writerow([model, detector, i+1, acc, n])
    print(f"Resultados del benchmark guardados en: {CSV_BENCHMARK}")