In [1]:
# Instalación de librerías necesarias para el sistema de detección y censura facial

!pip install -qq retina-face facenet-pytorch opencv-python scipy gradio insightface onnxruntime

In [None]:
# 1. Importación de librerías y configuración del entorno

# Librerías del sistema y manipulación de archivos
import os
import shutil
import subprocess

# Librerías para computación y procesamiento de imágenes
import torch        # Computación en GPU, usado para modelos de deep learning
import cv2          # OpenCV: procesamiento de imágenes y videos
import numpy as np  # Operaciones numéricas con arreglos
from PIL import Image  # Manejo de imágenes (carga, conversión, etc.)

# Librerías para reconocimiento facial
from facenet_pytorch import InceptionResnetV1  # Modelo de embeddings faciales
from numpy.linalg import norm                  # Cálculo de norma para comparar vectores

# Librerías para interfaz de usuario
import gradio as gr  # Crear interfaz web para cargar imágenes y mostrar resultados

# InsightFace para detección y análisis facial avanzado
import insightface
from insightface.app import FaceAnalysis  # Carga del sistema de análisis facial completo

# Selección del dispositivo: usa GPU si está disponible, si no CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Usando dispositivo: {device}")

Usando dispositivo: cuda


In [None]:
# 2. Inicialización de modelos

providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if device == 'cuda' else ['CPUExecutionProvider']

# Inicializa el detector facial de InsightFace (usa RetinaFace internamente)
app = FaceAnalysis(allowed_modules=['detection'], providers=providers)
app.prepare(ctx_id=0 if device == 'cuda' else -1, det_size=(640, 640))
print("RetinaFace (via InsightFace) inicializado para detección.")

# Carga del modelo InceptionResnetV1 con pesos preentrenados (vggface2) para generar embeddings faciales
resnet = InceptionResnetV1(pretrained='vggface2').eval().to(device)
print("InceptionResnetV1 (para embeddings de rostros) inicializado.")


download_path: /root/.insightface/models/buffalo_l
Downloading /root/.insightface/models/buffalo_l.zip from https://github.com/deepinsight/insightface/releases/download/v0.7/buffalo_l.zip...


100%|██████████| 281857/281857 [00:04<00:00, 65856.23KB/s]


Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/2d106det.onnx landmark_2d_106
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: /root/.insightface/models/buffalo_l/det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/genderage.onnx genderage
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
model ignore: /root/.insightface/models/buffalo_l/w600k_r50.onnx recognition
set det-size: (640, 640)
RetinaFace (via InsightFace) inicializado para detección.


  0%|          | 0.00/107M [00:00<?, ?B/s]

InceptionResnetV1 (para embeddings de rostros) inicializado.


In [None]:
# 3. Parámetros generales y estructuras para rostros conocidos

# Umbral de similitud para considerar que dos rostros son iguales
THRESHOLD = 0.8

# Crea carpeta para almacenar rostros conocidos (si no existe)
os.makedirs("known_faces", exist_ok=True)

# Lista para guardar los vectores (embeddings) de rostros conocidos y sus nombres asociados a cada uno
known_embeddings = []
known_names = []

USE_GRADIO = True

In [None]:
# 4. Funciones auxiliares para embeddings y comparación de rostros

# Convierte una imagen de rostro en un vector (embedding) usando InceptionResnetV1
def get_embedding(face_img_pil):
    if face_img_pil is None:
        return None

    # Redimensiona la imagen a 160x160 (tamaño requerido por el modelo)
    face_img_pil = face_img_pil.resize((160, 160), Image.BILINEAR)

    # Normaliza los valores de píxeles
    face_np = np.array(face_img_pil).astype(np.float32)
    face_np = (face_np - 127.5) / 128.0

    # Convierte la imagen a tensor para PyTorch
    face_tensor = torch.from_numpy(face_np).permute(2, 0, 1).unsqueeze(0)

    # Genera el embedding
    with torch.no_grad():
        emb = resnet(face_tensor.to(device))[0].cpu().numpy()
    return emb

# Compara un embedding con los embeddings conocidos y devuelve True si coincide
def is_known_face(embedding, known_embeddings, threshold=THRESHOLD):
    if not known_embeddings:
        return False
    for known_emb in known_embeddings:
        distance = norm(embedding - known_emb)
        if distance < threshold:
            return True
    return False

In [None]:
# 5. Función de censura con máscara (blur o pixelate)

def censor_face_with_mask(image, box, landmarks, method='blur', ksize=25):
    # Asegura que el tamaño del kernel sea impar (requisito de OpenCV)
    ksize_blur = ksize if ksize % 2 == 1 else ksize + 1

    # Define los límites del rostro y los ajusta al tamaño de la imagen
    x1, y1, x2, y2 = map(int, box)
    y1, y2 = max(0, y1), min(image.shape[0], y2)
    x1, x2 = max(0, x1), min(image.shape[1], x2)

    # Crea una máscara negra del mismo tamaño que la imagen
    mask = np.zeros(image.shape[:2], dtype=np.uint8)

    # Calcula centros de ojos y boca a partir de los landmarks
    eye_center_x = (landmarks[0][0] + landmarks[1][0]) / 2
    eye_center_y = (landmarks[0][1] + landmarks[1][1]) / 2
    mouth_center_x = (landmarks[3][0] + landmarks[4][0]) / 2
    mouth_center_y = (landmarks[3][1] + landmarks[4][1]) / 2

    # Calcula el centro de la elipse basada en ojos, nariz y boca
    ellipse_center_x = int((eye_center_x + landmarks[2][0] + mouth_center_x) / 3)
    ellipse_center_y = int((eye_center_y + landmarks[2][1] + mouth_center_y) / 3)

    # Calcula ancho y alto de la elipse basada en distancias faciales
    width = int(np.sqrt((landmarks[1][0] - landmarks[0][0])**2 + (landmarks[1][1] - landmarks[0][1])**2) * 2.5)
    width = max(width, x2 - x1)

    height = int(np.sqrt((mouth_center_x - eye_center_x)**2 + (mouth_center_y - eye_center_y)**2) * 2.8)
    height = max(height, y2 - y1)

    # Calcula el ángulo de inclinación de los ojos para rotar la elipse
    delta_x_eyes = landmarks[1][0] - landmarks[0][0]
    delta_y_eyes = landmarks[1][1] - landmarks[0][1]
    angle = np.degrees(np.arctan2(delta_y_eyes, delta_x_eyes))

    # Dibuja una elipse blanca en la máscara, sobre la zona del rostro
    cv2.ellipse(mask, (ellipse_center_x, ellipse_center_y), (width // 2, height // 2),
                angle, 0, 360, 255, -1)

    # Suaviza y expande ligeramente la máscara para mejorar el resultado
    mask = cv2.dilate(mask, None, iterations=2)
    mask = cv2.GaussianBlur(mask, (ksize_blur, ksize_blur), 0)

    # Normaliza la máscara a rango [0,1] y la expande a 3 canales
    mask_normalized = mask.astype(np.float32) / 255.0
    mask_normalized = np.stack([mask_normalized, mask_normalized, mask_normalized], axis=-1)

    # Crea una copia de la imagen para aplicar censura
    temp_image = image.copy()

    # Aplica el método de censura seleccionado
    if method == 'blur':
        censored_full_image = cv2.GaussianBlur(temp_image, (ksize_blur, ksize_blur), 0)
    elif method == 'pixelate':
        h_full, w_full = temp_image.shape[:2]
        ksize_pixelate = max(1, ksize_blur // 5)
        temp = cv2.resize(temp_image, (max(1, w_full // ksize_pixelate), max(1, h_full // ksize_pixelate)), interpolation=cv2.INTER_LINEAR)
        censored_full_image = cv2.resize(temp, (w_full, h_full), interpolation=cv2.INTER_NEAREST)
    else:
        censored_full_image = temp_image

    # Combina imagen original y censurada usando la máscara como alpha
    result_image = (censored_full_image * mask_normalized) + (image * (1 - mask_normalized))
    result_image = np.uint8(result_image)

    return result_image

In [None]:
# 6. Reconocimiento y censura de rostros en un frame

def recognize_and_censor(frame, known_embeddings, censor_method='blur', blur_ksize=35):
    # Convierte la imagen de BGR (OpenCV) a RGB (requerido por InsightFace)
    img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

    # Detecta rostros en la imagen usando InsightFace
    faces = app.get(img_rgb)

    # Si no se detectan rostros, retorna la imagen original
    if not faces:
        return frame

    # Procesa cada rostro detectado
    for face in faces:
        box = face.bbox          # Caja delimitadora del rostro
        landmarks = face.kps     # Puntos clave (ojos, nariz, boca)

        # Ajusta los límites de la caja al tamaño de la imagen
        x1, y1, x2, y2 = map(int, box)
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(frame.shape[1], x2), min(frame.shape[0], y2)

        if x2 <= x1 or y2 <= y1:
            continue

        # Extrae el rostro como un recorte del frame
        face_img_cropped_np = frame[y1:y2, x1:x2]
        if face_img_cropped_np.size == 0:
            continue

        # Convierte el recorte a formato PIL y RGB para el modelo de embeddings
        face_img_pil = Image.fromarray(cv2.cvtColor(face_img_cropped_np, cv2.COLOR_BGR2RGB))

        # Obtiene el embedding del rostro
        emb = get_embedding(face_img_pil)

        # Si no es un rostro conocido, lo censura
        if emb is not None and not is_known_face(emb, known_embeddings):
            frame = censor_face_with_mask(frame, box, landmarks, method=censor_method, ksize=blur_ksize)

    # Devuelve el frame con rostros censurados (si corresponde)
    return frame

In [None]:
# 7. Detección si un archivo es video y conversión de .webm a .mp4

def is_video(filename):
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.webm']
    return any(filename.lower().endswith(ext) for ext in video_extensions)

def convert_webm_to_mp4(input_path, output_path='temp_converted.mp4'):
    os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
    cmd = [
        'ffmpeg', '-y',
        '-i', input_path,
        '-vf', 'scale=trunc(iw/2)*2:trunc(ih/2)*2',
        '-c:v', 'libx264',
        '-pix_fmt', 'yuv420p',
        output_path
    ]
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"La conversión con ffmpeg falló: {result.stderr}")
    return output_path

In [None]:
# 8. Procesamiento general de un archivo (imagen o video)

def process_single_file(file_path, censor_method='blur', blur_ksize=35):
    global known_embeddings

    output_dir = "processed_media"
    os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.basename(file_path)
    name, ext = os.path.splitext(base_name)

    file_path_to_process = file_path
    temp_mp4_path = None

    if ext.lower() == '.webm':
        temp_mp4_path = os.path.join(output_dir, f"temp_{name}.mp4")
        file_path_to_process = convert_webm_to_mp4(file_path, temp_mp4_path)

    # Procesamiento si es video
    if is_video(file_path_to_process):
        cap = cv2.VideoCapture(file_path_to_process)
        if not cap.isOpened():
            if temp_mp4_path and os.path.exists(temp_mp4_path):
                os.remove(temp_mp4_path)
            raise ValueError(f"No se pudo abrir el video: {file_path_to_process}")

        # Configuración del archivo de salida
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        output_path = os.path.join(output_dir, f"censored_video_{name}.mp4")
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        if not out.isOpened():
            cap.release()
            if temp_mp4_path and os.path.exists(temp_mp4_path):
                os.remove(temp_mp4_path)
            raise ValueError(f"No se pudo crear el archivo de salida de video: {output_path}. Asegúrate de que los códecs estén disponibles.")

        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame = recognize_and_censor(frame, known_embeddings, censor_method, blur_ksize)
            out.write(frame)

        cap.release()
        out.release()

        # Elimina archivo temporal si fue creado
        if temp_mp4_path and os.path.exists(temp_mp4_path):
            os.remove(temp_mp4_path)

        return output_path

    # Procesamiento si es imagen
    else:
        image = cv2.imread(file_path_to_process)
        if image is None:
            raise ValueError(f"No se pudo abrir la imagen: {file_path_to_process}")

        # Aplica censura a la imagen
        image = recognize_and_censor(image, known_embeddings, censor_method, blur_ksize)
        output_path = os.path.join(output_dir, f"censored_image_{name}.jpg")
        cv2.imwrite(output_path, image)
        return output_path

In [None]:
# 9. Gradio

if USE_GRADIO:

    # Carga de rostros conocidos
    def load_known_faces_gradio(face_files):
        global known_embeddings, known_names
        known_embeddings = []
        known_names = []

        if face_files:
            for face_file in face_files:
                img_path = face_file.name
                img_pil = Image.open(img_path).convert("RGB")
                name = os.path.splitext(os.path.basename(img_path))[0]

                img_np_rgb = np.array(img_pil)
                faces_in_known_img = app.get(img_np_rgb)

                if len(faces_in_known_img) == 0:
                    print(f"Advertencia: No se detectó ningún rostro en '{name}'. Saltando este archivo.")
                    continue
                elif len(faces_in_known_img) > 1:
                    print(f"Advertencia: Se detectaron múltiples rostros en '{name}'. Se usará el primer rostro detectado para el embedding.")

                face = faces_in_known_img[0]
                box = face.bbox
                x1, y1, x2, y2 = map(int, box)
                x1, y1 = max(0, x1), max(0, y1)
                x2, y2 = min(img_np_rgb.shape[1], x2), min(img_np_rgb.shape[0], y2)

                cropped_face_np = img_np_rgb[y1:y2, x1:x2]
                if cropped_face_np.size == 0:
                    print(f"Advertencia: El recorte del rostro en '{name}' está vacío. Saltando.")
                    continue

                cropped_face_pil = Image.fromarray(cropped_face_np)

                emb = get_embedding(cropped_face_pil)

                if emb is not None:
                    known_embeddings.append(emb)
                    known_names.append(name)
                else:
                    print(f"Advertencia: No se pudo obtener la incrustación para el rostro en '{name}'. Saltando.")
            return f"{len(known_embeddings)} rostros conocidos cargados."
        return "No se cargaron rostros conocidos."

    # Función que procesa múltiples archivos, los guarda y genera un ZIP
    def process_and_zip_and_display(files_to_process, censor_method, blur_ksize):
        output_dir = "processed_media"
        if os.path.exists(output_dir):
            shutil.rmtree(output_dir)
        os.makedirs(output_dir, exist_ok=True)

        output_dir_for_zip = "processed_media_for_zip_temp"
        if os.path.exists(output_dir_for_zip):
            shutil.rmtree(output_dir_for_zip)
        os.makedirs(output_dir_for_zip, exist_ok=True)

        if not files_to_process:
            return (
                "Por favor, sube al menos una imagen o video para procesar.",
                None,
                gr.update(visible=False)
            )

        processed_images_paths = []
        status_messages = []

        for uploaded_file in files_to_process:
            try:
                output_path = process_single_file(uploaded_file.name, censor_method, blur_ksize)
                if not is_video(output_path):
                    processed_images_paths.append(output_path)
                status_messages.append(f"{os.path.basename(uploaded_file.name)} procesado correctamente. Salida: {output_path}")
            except Exception as e:
                status_messages.append(f"Error procesando {os.path.basename(uploaded_file.name)}: {str(e)}")

        # Genera ZIP con todos los archivos procesados si hay alguno
        zip_path = None
        if os.listdir(output_dir):
            zip_base_name = os.path.join(output_dir_for_zip, "processed_media_output")
            shutil.make_archive(zip_base_name, 'zip', output_dir)
            zip_path = zip_base_name + ".zip"

        return (
            "\n".join(status_messages),
            processed_images_paths if processed_images_paths else None,
            gr.File(label="Descargar todos los medios procesados (ZIP)", value=zip_path, visible=True if zip_path else False)
        )

    # Interfaz Gradio
    with gr.Blocks() as demo:
        gr.Markdown("# Censurador de Rostros con Detección Avanzada")

        gr.Markdown("---")
        gr.Markdown("### 1. Cargar Rostros Conocidos")
        gr.Markdown("Sube imágenes de los rostros que no quieres que sean censurados.")
        known_faces_input = gr.File(
            label="Archivos de Rostros Conocidos (JPEG/PNG)",
            file_count="multiple",
            type="filepath",
            file_types=["image"]
        )
        load_button = gr.Button("Cargar Rostros Conocidos")
        load_output = gr.Textbox(label="Estado de Carga de Rostros")
        load_button.click(load_known_faces_gradio, inputs=known_faces_input, outputs=load_output)

        gr.Markdown("---")
        gr.Markdown("### 2. Procesar Medios")
        gr.Markdown("Sube las imágenes o videos que deseas procesar para censurar rostros.")
        file_input = gr.File(
            label="Imágenes o Videos (JPEG/PNG/MP4/MOV/WEBM)",
            file_count="multiple",
            type="filepath"
        )
        censor_method_radio = gr.Radio(
            ["blur"],
            label="Método de Censura",
            value="blur"
        )
        blur_ksize_slider = gr.Slider(
            minimum=5,
            maximum=101,
            step=2,
            value=35,
            label="Tamaño de Censura (Cuanto mayor, más intenso el efecto)"
        )
        process_button = gr.Button("Procesar Medios")

        processing_status = gr.Textbox(label="Estado de Procesamiento", lines=5)
        output_image_gallery = gr.Gallery(
            label="Imágenes Procesadas (Solo imágenes mostradas aquí, videos en el ZIP)",
            columns=4,
            height="auto",
            object_fit="contain",
            visible=True
        )
        output_zip_file = gr.File(
            label="Descargar todos los medios procesados (ZIP)",
            visible=False
        )

        process_button.click(
            process_and_zip_and_display,
            inputs=[file_input, censor_method_radio, blur_ksize_slider],
            outputs=[processing_status, output_image_gallery, output_zip_file]
        )

    demo.launch(debug=True)

It looks like you are running Gradio on a hosted a Jupyter notebook. For the Gradio app to work, sharing must be enabled. Automatically setting `share=True` (you can turn this off by setting `share=False` in `launch()` explicitly).

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://1db54dc7b3a8aa9249.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


Advertencia: No se detectó ningún rostro en 'authorized'. Saltando este archivo.
Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://1db54dc7b3a8aa9249.gradio.live
