In [1]:
## Facefusion Installation

In [34]:
# Install required packages for improved clustering
import subprocess
import sys

# Install scikit-learn for DBSCAN clustering
subprocess.check_call([sys.executable, "-m", "pip", "install", "scikit-learn"])
subprocess.run(["python", "uninstall.py", "--onnxruntime", "default", "--skip-conda"])
subprocess.run(["python", "install.py", "--onnxruntime-gpu", "default", "--skip-conda"])

print("✅ scikit-learn installed successfully")

✅ scikit-learn installed successfully


In [36]:
# Check GPU availability
import onnxruntime as ort

print("ONNXRuntime version:", ort.__version__)
print("Available providers:", ort.get_available_providers())

# Check different GPU options
cuda_available = 'CUDAExecutionProvider' in ort.get_available_providers()
dml_available = 'DmlExecutionProvider' in ort.get_available_providers()

if cuda_available:
    print("✅ CUDA is available!")
    recommended_providers = ["cuda", "cpu"]
elif dml_available:
    print("✅ DirectML is available! (Windows GPU acceleration)")
    recommended_providers = ["dml", "cpu"]
else:
    print("❌ No GPU acceleration available, using CPU only")
    recommended_providers = ["cpu"]

print(f"\n🔧 Recommended execution providers: {recommended_providers}")

# Test GPU functionality if available
if cuda_available or dml_available:
    try:
        # Test creating a session
        import numpy as np
        providers = ['CUDAExecutionProvider', 'CPUExecutionProvider'] if cuda_available else ['DmlExecutionProvider', 'CPUExecutionProvider']
        print("✅ GPU execution provider can be initialized")
    except Exception as e:
        print(f"❌ GPU test failed: {e}")
        recommended_providers = ["cpu"]

print(f"\n💡 Use this in your facefusion config: execution_providers = {recommended_providers}")

ONNXRuntime version: 1.19.2
Available providers: ['TensorrtExecutionProvider', 'CUDAExecutionProvider', 'CPUExecutionProvider']
✅ CUDA is available!

🔧 Recommended execution providers: ['cuda', 'cpu']
✅ GPU execution provider can be initialized

💡 Use this in your facefusion config: execution_providers = ['cuda', 'cpu']


In [37]:
import os

# Set the root directory to your local Windows path
root_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest"
os.chdir(root_dir)
print(f"Current working directory: {os.getcwd()}")

Current working directory: C:\Users\nethe\OneDrive\Documents\GitHub\facetest


In [5]:
import os
import subprocess

# Set the version
FACEFUSION_VERSION = "3.3.2"

# Clone the repository with the correct branch
result = subprocess.run(
    ["git", "clone", "https://github.com/facefusion/facefusion.git", 
     "--branch", FACEFUSION_VERSION, "--single-branch"],
    capture_output=True,
    text=True
)

if result.returncode != 0:
    print(f"Error cloning repository: {result.stderr}")
else:
    print(f"Successfully cloned facefusion version {FACEFUSION_VERSION}")

# Change to the facefusion directory and install dependencies
os.chdir("facefusion")

# Install with pip (Windows compatible)
subprocess.run(["python", "install.py", "--onnxruntime-gpu", "default", "--skip-conda"])

# Run the initial downloads for models
subprocess.run(["python", "facefusion.py", "headless-run",
    "--processors", "face_swapper", "face_enhancer",
    "--face-enhancer-model", "gfpgan_1.4",
    "--face-detector-model", "retinaface",
    "--face-occluder-model", "xseg_3",
    "--face-swapper-model", "hyperswap_1c_256"])

subprocess.run(["python", "facefusion.py", "headless-run",
    "--processors", "face_swapper", "face_enhancer",
    "--face-enhancer-model", "gfpgan_1.4",
    "--face-detector-model", "retinaface",
    "--face-occluder-model", "xseg_3",
    "--face-swapper-model", "inswapper_128"])

# Return to parent directory
os.chdir("..")

Error cloning repository: fatal: destination path 'facefusion' already exists and is not an empty directory.



## Face detection

In [38]:
import mimetypes


def is_image_file(file_path: str) -> bool:
    """Check if the file is an image based on its MIME type."""

    mime_type, _ = mimetypes.guess_type(file_path)
    return mime_type is not None and mime_type.startswith("image/")

In [39]:
from dataclasses import dataclass
import time
from typing import Callable, Literal
import cv2
import math
import logging
import os
import sys
from typing import List, Union
from PIL import Image

sys.path.append(f"{root_dir}/facefusion")

from facefusion.processors.types import ProcessorStateKey  # noqa: E402
from facefusion.types import Face, StateKey, VisionFrame  # noqa: E402
from facefusion import state_manager  # noqa: E402
from facefusion.face_analyser import get_many_faces  # noqa: E402
from facefusion.face_selector import compare_faces, sort_and_filter_faces  # noqa: E402
from facefusion.vision import read_video_frame, normalize_frame_color, read_image  # noqa: E402
from facefusion.uis.components.face_selector import extract_gallery_frames  # noqa: E402

checks_per_second = float(os.getenv(key="CHECKS_PER_SECOND", default="2"))
face_distance = float(os.getenv(key="reference_face_distance", default="0.4"))


def init_state_with_getenv(key: Union[StateKey, ProcessorStateKey], default: str):
    state_manager.init_item(key=key, value=float(os.getenv(key, default)))


state_manager.init_item("download_providers", ["github"])
state_manager.init_item("execution_device_id", "0")
state_manager.init_item("execution_providers", [os.getenv("execution_provider", "cpu")])
state_manager.init_item("face_selector_mode", "reference")
state_manager.init_item("face_parser_model", "bisenet_resnet_34")  # cSpell:disable-line
state_manager.init_item("face_landmarker_model", "2dfan4")  # cSpell:disable-line
state_manager.init_item("face_detector_angles", [0])
state_manager.init_item("face_detector_model", "retinaface")
state_manager.init_item("face_detector_size", "640x640")
state_manager.init_item("face_selector_order", "left-right")
init_state_with_getenv("face_landmarker_score", "0.5")


def check_if_face_exist(detected_faces: List[Face], face: Face):
    for detected_face in detected_faces:
        if compare_faces(
            face=face, reference_face=detected_face, face_distance=face_distance
        ):
            return True

    return False


@dataclass
class SwapSuccess:
    outputs: list[str]
    success: Literal[True] = True


@dataclass
class SwapFailure:
    error: str
    success: Literal[False] = False


def extract_faces_from_image(
    vision_frame: VisionFrame,
    face: Face,
    output_dir: str,
    frame_number: int,
    face_idx: int,
) -> str:
    """Extract and save a face region from an image frame.

    Args:
        vision_frame (VisionFrame): The source image frame to extract from
        face (Face): Face object containing bounding box coordinates
        output_dir (str): Directory to save the extracted face image
        idx (int): Index used to generate unique filename

    Returns:
        str: Path to the saved face image file

    The function:
    1. Gets bounding box coordinates from the face object
    2. Adds 25% padding around the face region
    3. Crops and normalizes the face region
    4. Saves it as a PNG file named '{frame_number}-{face_idx}.png'
    """
    start_x, start_y, end_x, end_y = map(int, face.bounding_box)  # type: ignore
    padding_x = int((end_x - start_x) * 0.25)
    padding_y = int((end_y - start_y) * 0.25)
    start_x = max(0, start_x - padding_x)
    start_y = max(0, start_y - padding_y)
    end_x = max(0, end_x + padding_x)
    end_y = max(0, end_y + padding_y)
    crop_vision_frame = vision_frame[start_y:end_y, start_x:end_x]
    crop_vision_frame = normalize_frame_color(crop_vision_frame)

    face_filename = os.path.join(output_dir, f"{frame_number}-{face_idx}.png")
    img = Image.fromarray(crop_vision_frame)
    img.save(face_filename)

    return face_filename


def process_image(
    task_id: str,
    file_path: str,
    output_dir: str,
    handle_complete: Callable[[str], None],
    face_detector_score: float,
) -> SwapSuccess | SwapFailure:
    """Process a single image file to detect and extract faces."""

    def log(message: object):
        logging.info(f"[{task_id}] {message}")

    state_manager.init_item("target_path", file_path)
    state_manager.init_item("face_detector_score", face_detector_score)
    log(f"processing image: {file_path}")

    # Check for NSFW content
    # is_nsfw = detect_nsfw(
    #     file_path=file_path,
    #     start_frame=0,
    #     end_frame=1,
    # )
    is_nsfw = False

    if is_nsfw:
        log("image detected as nsfw, returning zero faces")
        return SwapFailure(error="nsfw")

    os.makedirs(output_dir, exist_ok=True)

    # Read the image
    temp_vision_frame = read_image(file_path)
    if temp_vision_frame is None:
        log("failed to read image")
        return SwapFailure(error="invalid_image_file")

    # Detect faces in the image
    faces = sort_and_filter_faces(get_many_faces([temp_vision_frame]))

    face_file_paths: list[str] = []

    for idx, face in enumerate(faces):
        face_filename = extract_faces_from_image(
            vision_frame=temp_vision_frame,
            face=face,
            output_dir=output_dir,
            frame_number=0,
            face_idx=idx,
        )
        face_file_paths.append(face_filename)
        handle_complete(face_filename)

    log(f"extracted {len(faces)} faces from image")
    return SwapSuccess(outputs=face_file_paths)

# def process_image(
#     task_id: str,
#     file_path: str,
#     output_dir: str,
#     handle_complete: Callable[[str], None],
#     face_detector_score: float,
# ) -> SwapSuccess | SwapFailure:
#     import logging, os
#     def log(msg): logging.info(f"[{task_id}] {msg}")

#     state_manager.init_item("target_path", file_path)
#     state_manager.init_item("face_detector_score", face_detector_score)
#     os.makedirs(output_dir, exist_ok=True)
#     all_dir   = os.path.join(output_dir, "all")
#     grouped   = os.path.join(output_dir, "grouped")
#     final_dir = os.path.join(output_dir, "final")
#     os.makedirs(all_dir, exist_ok=True)
#     os.makedirs(grouped, exist_ok=True)
#     os.makedirs(final_dir, exist_ok=True)

#     frame = read_image(file_path)
#     if frame is None:
#         return SwapFailure(error="invalid_image_file")

#     faces = sort_and_filter_faces(get_many_faces([frame]))

#     identities: List[Face] = []
#     best_area: Dict[int, int] = {}
#     final_paths: Dict[int, str] = {}

#     for idx, face in enumerate(faces):
#         # 1) Save ALL detections
#         sx, sy, ex, ey = map(int, face.bounding_box)  # type: ignore
#         px, py = int((ex - sx)*0.25), int((ey - sy)*0.25)
#         sx, sy = max(0, sx - px), max(0, sy - py)
#         ex, ey = max(0, ex + px), max(0, ey + py)
#         crop = normalize_frame_color(frame[sy:ey, sx:ex])
#         all_path = os.path.join(all_dir, f"0-{idx}.png")
#         Image.fromarray(crop).save(all_path)

#         # 2) Assign identity and save under grouped/ID_xx/
#         ident = assign_identity(face, identities, face_distance)
#         ident_dir = os.path.join(grouped, f"ID_{ident:02d}")
#         os.makedirs(ident_dir, exist_ok=True)
#         grp_path = os.path.join(ident_dir, f"0-{idx}.png")
#         Image.fromarray(crop).save(grp_path)

#         # 3) Maintain best (final) representative per identity
#         area = bbox_area(face)
#         if ident not in best_area or area > best_area[ident]:
#             best_area[ident] = area
#             final_path = extract_faces_from_image(
#                 vision_frame=frame,
#                 face=face,
#                 output_dir=final_dir,
#                 frame_number=0,
#                 face_idx=ident,   # one file per identity
#             )
#             final_paths[ident] = final_path
#             handle_complete(final_path)

#     outputs = [final_paths[i] for i in sorted(final_paths.keys())]
#     log(f"image: {len(faces)} detections, {len(outputs)} identities")
#     return SwapSuccess(outputs=outputs)


import math, json, os
import numpy as np, cv2
from typing import List, Dict, Tuple

def extract_padded_crop(frame, face, pad_ratio: float = 0.25):
    sx, sy, ex, ey = map(int, face.bounding_box)  # type: ignore
    px, py = int((ex - sx) * pad_ratio), int((ey - sy) * pad_ratio)
    sx, sy = max(0, sx - px), max(0, sy - py)
    ex, ey = max(0, ex + px), max(0, ey + py)
    return normalize_frame_color(frame[sy:ey, sx:ex])


def _variance_of_laplacian(gray: np.ndarray) -> float:
    return float(cv2.Laplacian(gray, cv2.CV_64F).var())

def _frontalness_from_landmarks(landmarks: np.ndarray) -> float:
    # Heuristics using eye line roll and nose-centered yaw; robust to different landmark counts.
    try:
        n = landmarks.shape[0]
        if n >= 48:
            L = landmarks[[36,37,38,39,40,41]].mean(axis=0)
            R = landmarks[[42,43,44,45,46,47]].mean(axis=0)
            nose = landmarks[30] if n > 30 else landmarks[n//2]
        else:
            L = landmarks[:n//2].mean(axis=0); R = landmarks[n//2:].mean(axis=0); nose = landmarks[n//2]
        dx, dy = (R - L)
        roll_deg = abs(math.degrees(math.atan2(dy, dx)))
        roll_score = max(0.0, 1.0 - (roll_deg/30.0))  # 1 at 0°, 0 at 30°
        mid_x = 0.5*(L[0]+R[0]); half_eye = 0.5*abs(R[0]-L[0]) + 1e-6
        yaw_norm = abs(nose[0] - mid_x) / half_eye
        yaw_score = max(0.0, 1.0 - min(1.0, yaw_norm))
        return 0.5*roll_score + 0.5*yaw_score
    except Exception:
        return 0.5

# helper: always return just the crop, no matter what padded_crop returns
def _get_crop_only(frame, face, pad_ratio: float = 0.25):
    res = padded_crop(frame, face, pad_ratio=pad_ratio)
    return res[0] if isinstance(res, tuple) else res

# replace your existing face_quality_score with this
import numpy as np, cv2, math

def face_quality_score(frame: np.ndarray, face) -> tuple[float, dict]:
    """
    Composite score of size, sharpness, frontalness.
    Immune to padded_crop() returning 1, 2, or many values.
    """
    # compute a padded bbox for the area metric directly from the face bbox
    sx, sy, ex, ey = map(int, face.bounding_box)  # type: ignore
    pad_ratio = 0.25
    px, py = int((ex - sx) * pad_ratio), int((ey - sy) * pad_ratio)
    sx, sy = max(0, sx - px), max(0, sy - py)
    ex, ey = max(0, ex + px), max(0, ey + py)

    H, W = frame.shape[:2]
    area_norm = ((ex - sx) * (ey - sy)) / max(1.0, float(W * H))  # larger face => higher

    # robustly get the cropped patch
    crop = _get_crop_only(frame, face, pad_ratio=pad_ratio)

    # sharpness via Laplacian variance
    gray = cv2.cvtColor(crop, cv2.COLOR_BGR2GRAY) if crop.ndim == 3 else crop
    lap = float(cv2.Laplacian(gray, cv2.CV_64F).var())
    sharp_norm = min(1.0, lap / 300.0)  # tune 300 for your footage

    # frontalness from landmarks if present
    landmarks = getattr(face, "landmarks", None) or getattr(face, "landmark", None)
    if landmarks is not None:
        lm = np.array(landmarks, dtype=np.float32).reshape(-1, 2)
        try:
            if lm.shape[0] >= 48:
                L = lm[[36,37,38,39,40,41]].mean(axis=0)
                R = lm[[42,43,44,45,46,47]].mean(axis=0)
                nose = lm[30] if lm.shape[0] > 30 else lm[lm.shape[0]//2]
            else:
                L = lm[:lm.shape[0]//2].mean(axis=0)
                R = lm[lm.shape[0]//2:].mean(axis=0)
                nose = lm[lm.shape[0]//2]
            dx, dy = (R - L)
            roll_deg = abs(math.degrees(math.atan2(dy, dx)))
            roll_score = max(0.0, 1.0 - (roll_deg / 30.0))
            mid_x = 0.5 * (L[0] + R[0]); half_eye = 0.5 * abs(R[0] - L[0]) + 1e-6
            yaw_norm = abs(nose[0] - mid_x) / half_eye
            yaw_score = max(0.0, 1.0 - min(1.0, yaw_norm))
            frontal = 0.5 * roll_score + 0.5 * yaw_score
        except Exception:
            frontal = 0.5
    else:
        frontal = 0.5

    score = 0.4 * area_norm + 0.4 * sharp_norm + 0.2 * frontal
    return score, {"area": area_norm, "sharp": sharp_norm, "frontal": frontal}



# def process_video(
#     task_id: str,
#     file_path: str,
#     output_dir: str,
#     handle_complete: Callable[[str], None],
#     face_detector_score: float,
# ) -> SwapSuccess | SwapFailure:
#     """Process a video file to detect and extract faces."""

#     def log(message: object):
#         logging.info(f"[{task_id}] {message}")

#     def debug(message: object):
#         if os.getenv("DEBUG", "false").lower() == "true":
#             logging.info(f"[{task_id}] {message}")

#     state_manager.init_item("target_path", file_path)
#     state_manager.init_item("face_detector_score", face_detector_score)
#     video = cv2.VideoCapture(file_path)
#     if not video.isOpened():
#         raise ValueError("Error: Could not open video.")

#     # Get total frame count
#     total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
#     if total_frames <= 0:  # Some codecs like HEVC may not report frame count correctly
#         video.set(cv2.CAP_PROP_POS_AVI_RATIO, 1)  # Seek to end
#         total_frames = int(video.get(cv2.CAP_PROP_POS_FRAMES))
#         video.set(cv2.CAP_PROP_POS_AVI_RATIO, 0)  # Seek back to start

#     log(f"video has {total_frames} frames")

#     # is_nsfw = detect_nsfw(
#     #     file_path=file_path,
#     #     start_frame=0,
#     #     end_frame=total_frames,
#     # )
#     is_nsfw = False
#     if is_nsfw:
#         log("video detected as nsfw, returning zero faces")
#         return SwapFailure(error="nsfw")
#     else:
#         log("video is not nsfw, continuing")

#     fps = float(video.get(cv2.CAP_PROP_FPS))
#     frames_to_skip = math.ceil(fps / checks_per_second)

#     log(
#         f"video has {fps} fps and {total_frames} frames, detecting face every {frames_to_skip} frames"
#     )
#     os.makedirs(output_dir, exist_ok=True)

#     detected_faces: List[Face] = []

#     face_count = 0

#     face_file_paths: list[str] = []

#     checked_duration: float = 0
#     checked_count: int = 0

#     current_frame = 0
#     while current_frame < total_frames:
#         start = time.time()

#         debug(f"reading frame {current_frame}")
#         temp_vision_frame = read_video_frame(
#             video_path=file_path, frame_number=current_frame
#         )
#         if temp_vision_frame is None:
#             current_frame += frames_to_skip
#             continue

#         faces = extract_gallery_frames(temp_vision_frame=temp_vision_frame)
#         debug(f"extracted {len(faces)} gallery frames")

#         faces = get_many_faces([temp_vision_frame])
#         debug(f"extracted {len(faces)} faces")

#         for idx, face in enumerate(faces):
#             if check_if_face_exist(detected_faces=detected_faces, face=face):
#                 debug(f"face {idx} already exists")
#                 continue

#             detected_faces.append(face)
#             face_count += 1

#             debug(f"extracting face {idx}")
#             face_filename = extract_faces_from_image(
#                 vision_frame=temp_vision_frame,
#                 face=face,
#                 output_dir=output_dir,
#                 frame_number=current_frame,
#                 face_idx=idx,
#             )
#             debug(f"extracted face {idx} to {face_filename}")
#             face_file_paths.append(face_filename)
#             handle_complete(face_filename)

#         current_frame += frames_to_skip
#         checked_duration += time.time() - start
#         checked_count += 1
#     video.release()
#     cv2.destroyAllWindows()

#     log(
#         f"checked {checked_count} frames and found {face_count} faces. Took {round(checked_duration * 1000)}ms at {round(checked_duration / checked_count * 1000)}ms/frame"
#     )

#     return SwapSuccess(outputs=face_file_paths)

def process_video(
    task_id: str,
    file_path: str,
    output_dir: str,
    handle_complete: Callable[[str], None],
    face_detector_score: float,
) -> SwapSuccess | SwapFailure:
    """Original behavior preserved; additionally saves ALL detections and FINAL deduped."""
    def log(message: object):
        logging.info(f"[{task_id}] {message}")

    def debug(message: object):
        if os.getenv("DEBUG", "false").lower() == "true":
            logging.info(f"[{task_id}] {message}")

    # NEW — ensure output subfolders
    all_dir   = os.path.join(output_dir, "all")
    final_dir = os.path.join(output_dir, "final")
    os.makedirs(all_dir, exist_ok=True)
    os.makedirs(final_dir, exist_ok=True)

    state_manager.init_item("target_path", file_path)
    state_manager.init_item("face_detector_score", face_detector_score)
    video = cv2.VideoCapture(file_path)
    if not video.isOpened():
        raise ValueError("Error: Could not open video.")

    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    if total_frames <= 0:
        video.set(cv2.CAP_PROP_POS_AVI_RATIO, 1)
        total_frames = int(video.get(cv2.CAP_PROP_POS_FRAMES))
        video.set(cv2.CAP_PROP_POS_AVI_RATIO, 0)

    log(f"video has {total_frames} frames")

    is_nsfw = False
    if is_nsfw:
        log("video detected as nsfw, returning zero faces")
        return SwapFailure(error="nsfw")
    else:
        log("video is not nsfw, continuing")

    fps = float(video.get(cv2.CAP_PROP_FPS))
    frames_to_skip = math.ceil(fps / checks_per_second)
    log(f"video has {fps} fps and {total_frames} frames, detecting face every {frames_to_skip} frames")

    detected_faces: List[Face] = []  # original dedup set
    face_count = 0
    final_paths: list[str] = []      # NEW — paths of deduped outputs we return

    checked_duration: float = 0
    checked_count: int = 0

    current_frame = 0
    while current_frame < total_frames:
        start = time.time()

        debug(f"reading frame {current_frame}")
        temp_vision_frame = read_video_frame(video_path=file_path, frame_number=current_frame)
        if temp_vision_frame is None:
            current_frame += frames_to_skip
            continue

        # (You had an extra extract_gallery_frames call; keep if you want, but get_many_faces is what matters)
        faces = get_many_faces([temp_vision_frame])
        debug(f"extracted {len(faces)} faces")

        for idx, face in enumerate(faces):
            # --- NEW: save ALL detections (padded crop), regardless of dedup ---
            sx, sy, ex, ey = map(int, face.bounding_box)  # type: ignore
            px, py = int((ex - sx) * 0.25), int((ey - sy) * 0.25)
            sx, sy = max(0, sx - px), max(0, sy - py)
            ex, ey = max(0, ex + px), max(0, ey + py)
            crop = normalize_frame_color(temp_vision_frame[sy:ey, sx:ex])
            all_path = os.path.join(all_dir, f"{current_frame}-{idx}.png")
            Image.fromarray(crop).save(all_path)

            # --- ORIGINAL: dedup check using your existing threshold/compare_faces ---
            if check_if_face_exist(detected_faces=detected_faces, face=face):
                debug(f"frame {current_frame} face {idx} duplicate -> skip final")
                continue

            # it's a NEW identity in the original logic — save into final/ and record
            detected_faces.append(face)
            face_count += 1

            final_path = extract_faces_from_image(
                vision_frame=temp_vision_frame,
                face=face,
                output_dir=final_dir,               # NEW — write deduped to final/
                frame_number=current_frame,
                face_idx=idx,
            )
            final_paths.append(final_path)
            handle_complete(final_path)

        current_frame += frames_to_skip
        checked_duration += time.time() - start
        checked_count += 1

    video.release()
    cv2.destroyAllWindows()

    log(
        f"checked {checked_count} frames and found {face_count} unique faces. "
        f"Took {round(checked_duration * 1000)}ms at {round(checked_duration / max(1,checked_count) * 1000)}ms/frame"
    )

    # Return the deduped set paths (as before), now under final/
    return SwapSuccess(outputs=final_paths)

# def process_video(
#     task_id: str,
#     file_path: str,
#     output_dir: str,
#     handle_complete: Callable[[str], None],
#     face_detector_score: float,
# ) -> SwapSuccess | SwapFailure:
#     """Detect faces, save ALL crops, group by identity, and select best exemplar per identity."""
#     import logging, time
#     def log(msg): logging.info(f"[{task_id}] {msg}")

#     # Set FaceFusion state
#     state_manager.init_item("target_path", file_path)
#     state_manager.init_item("face_detector_score", face_detector_score)

#     # Prepare output dirs
#     all_dir   = os.path.join(output_dir, "all")
#     byid_dir  = os.path.join(output_dir, "by_identity")
#     final_dir = os.path.join(output_dir, "final")
#     os.makedirs(all_dir, exist_ok=True)
#     os.makedirs(byid_dir, exist_ok=True)
#     os.makedirs(final_dir, exist_ok=True)

#     # Open video
#     cap = cv2.VideoCapture(file_path)
#     if not cap.isOpened():
#         raise ValueError("Error: Could not open video.")

#     total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
#     if total <= 0:
#         cap.set(cv2.CAP_PROP_POS_AVI_RATIO, 1)
#         total = int(cap.get(cv2.CAP_PROP_POS_FRAMES))
#         cap.set(cv2.CAP_PROP_POS_AVI_RATIO, 0)
#     fps = float(cap.get(cv2.CAP_PROP_FPS)) or 24.0
#     frames_to_skip = math.ceil(fps / checks_per_second)
#     log(f"frames={total}, fps={fps}, sampling every {frames_to_skip}")

#     # Identity tracking
#     representatives: List[Face] = []                  # canonical face per identity
#     best_score: Dict[int, float] = {}                 # ID -> score
#     best_pick:  Dict[int, Tuple[int, Face]] = {}      # ID -> (frame_idx, face)
#     byid_index: Dict[int, List[str]] = {}             # ID -> list of paths

#     current = 0
#     t0 = time.time()
#     while current < total:
#         frame = read_video_frame(video_path=file_path, frame_number=current)
#         if frame is None:
#             current += frames_to_skip
#             continue

#         faces = get_many_faces([frame])  # FaceFusion detection + embeddings
#         for idx, f in enumerate(faces):
#             # 1) save ALL
#             crop = extract_padded_crop(frame, f, pad_ratio=0.25)
#             all_path = os.path.join(all_dir, f"{current}-{idx}.png")
#             Image.fromarray(crop).save(all_path)

#             # 2) assign identity via embedding distance (no cap)
#             ident = None
#             for i, ref in enumerate(representatives):
#                 if compare_faces(face=f, reference_face=ref, face_distance=face_distance):
#                     ident = i; break
#             if ident is None:
#                 representatives.append(f)
#                 ident = len(representatives) - 1

#             # 3) grouped by identity
#             ident_dir = os.path.join(byid_dir, f"ID_{ident:02d}")
#             os.makedirs(ident_dir, exist_ok=True)
#             byid_path = os.path.join(ident_dir, f"{current}-{idx}.png")
#             if byid_path != all_path:
#                 Image.fromarray(crop).save(byid_path)
#             byid_index.setdefault(ident, []).append(byid_path)

#             # 4) keep best exemplar by quality score
#             q, _ = face_quality_score(frame, f)
#             if ident not in best_score or q > best_score[ident]:
#                 best_score[ident] = q
#                 best_pick[ident]  = (current, f)

#         current += frames_to_skip

#     cap.release()
#     cv2.destroyAllWindows()

#     # Emit finals (best per identity)
#     final_paths: List[str] = []
#     for ident in sorted(best_pick.keys()):
#         frame_idx, face_obj = best_pick[ident]
#         frame2 = read_video_frame(video_path=file_path, frame_number=frame_idx)
#         if frame2 is None:
#             continue
#         out_path = extract_faces_from_image(
#             vision_frame=frame2,
#             face=face_obj,
#             output_dir=final_dir,
#             frame_number=frame_idx,
#             face_idx=ident,      # one file per identity
#         )
#         final_paths.append(out_path)
#         handle_complete(out_path)

#     # Optional manifest
#     with open(os.path.join(byid_dir, "index.json"), "w") as f:
#         json.dump({f"ID_{k:02d}": v for k, v in sorted(byid_index.items())}, f, indent=2)

#     log(f"identities={len(final_paths)}; time={time.time()-t0:.2f}s")
#     return SwapSuccess(outputs=final_paths)


def process(
    task_id: str,
    file_path: str,
    output_dir: str,
    handle_complete: Callable[[str], None],
    face_detector_score: float,
) -> SwapSuccess | SwapFailure:
    """Process either an image or video file to detect and extract faces."""
    if is_image_file(file_path):
        return process_image(
            task_id=task_id,
            file_path=file_path,
            output_dir=output_dir,
            handle_complete=handle_complete,
            face_detector_score=face_detector_score,
        )
    else:
        return process_video(
            task_id=task_id,
            file_path=file_path,
            output_dir=output_dir,
            handle_complete=handle_complete,
            face_detector_score=face_detector_score,
        )

# --- identity helpers ---
from typing import List, Dict

def bbox_area(face) -> int:
    x1, y1, x2, y2 = map(int, face.bounding_box)  # type: ignore
    return max(0, x2 - x1) * max(0, y2 - y1)

def assign_identity(face, identities: List[Face], distance: float) -> int:
    # returns existing identity index or creates a new one
    for i, ref in enumerate(identities):
        if compare_faces(face=face, reference_face=ref, face_distance=distance):
            return i
    identities.append(face)
    return len(identities) - 1

def padded_crop(frame, face, pad_ratio: float = 0.25):
    sx, sy, ex, ey = map(int, face.bounding_box)  # type: ignore
    px, py = int((ex - sx) * pad_ratio), int((ey - sy) * pad_ratio)
    sx, sy = max(0, sx - px), max(0, sy - py)
    ex, ey = max(0, ex + px), max(0, ey + py)
    return normalize_frame_color(frame[sy:ey, sx:ex])



In [40]:
# Import the improved clustering module
import sys
import os
import importlib

root_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest"
sys.path.insert(0, root_dir)  # Add to beginning of path

# Force reload if already imported
if 'improved_identity_clustering' in sys.modules:
    import improved_identity_clustering
    importlib.reload(improved_identity_clustering)
    print("✅ Reloaded improved clustering module")

# Import the improved clustering functions
from improved_identity_clustering import (
    ImprovedIdentityTracker,
    process_video_with_improved_clustering,
    enhanced_face_quality_score,
    SwapSuccess
)

print("✅ Improved clustering functions loaded")

✅ Reloaded improved clustering module
✅ Improved clustering functions loaded


# Images by Identity

In [41]:
import os, shutil
import glob, json
from typing import List, Dict
from PIL import Image

# Set local Windows paths
root_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest"

# FaceFusion imports
from facefusion import state_manager
from facefusion.face_analyser import get_many_faces
from facefusion.face_selector import compare_faces
from facefusion.vision import read_static_image

# Set source and destination directories for Windows
SRC_DIR = os.path.join(root_dir, "detected_faces", "all")
DST_DIR = os.path.join(root_dir, "detected_faces", "by_identity")

reference_faces_dir = os.path.join(root_dir, "reference_faces")
os.makedirs(reference_faces_dir, exist_ok=True)

print(f"Looking for images in: {SRC_DIR}")
print(f"Directory exists: {os.path.exists(SRC_DIR)}")

if os.path.exists(SRC_DIR):
    files = os.listdir(SRC_DIR)
    print(f"Files in directory: {len(files)}")
    if len(files) < 10:  # Show first few files if not too many
        print(f"Sample files: {files[:10]}")
else:
    print("❌ Source directory doesn't exist!")
    print("Make sure you've run the face detection cell first to create the 'all' folder")
    exit()

os.makedirs(DST_DIR, exist_ok=True)

# Use the same threshold you've been using elsewhere
DIST = float(os.getenv("reference_face_distance", "0.4"))
state_manager.init_item("face_detector_score", float(os.getenv("face_detector_score", "0.6")))
state_manager.init_item("face_selector_mode", "reference")

# Representatives and index
representatives: List = []
index: Dict[str, List[str]] = {}

# Get all image files
png_files = glob.glob(os.path.join(SRC_DIR, "*.png"))
jpg_files = glob.glob(os.path.join(SRC_DIR, "*.jpg"))
all_files = sorted(png_files + jpg_files)

print(f"Found {len(all_files)} image files to process")

# Process each image file
processed = 0
errors = 0

for path in all_files:
    try:
        print(f"Processing: {os.path.basename(path)}")

        # Check if file exists and has content
        if not os.path.exists(path):
            print(f"  ❌ File doesn't exist: {path}")
            errors += 1
            continue

        file_size = os.path.getsize(path)
        if file_size == 0:
            print(f"  ❌ File is empty: {path}")
            errors += 1
            continue

        # Try to read with OpenCV
        frame = read_static_image(path)
        if frame is None:
            print(f"  ❌ Could not read image: {path}")
            errors += 1
            continue

        # Try face detection
        faces = get_many_faces([frame])
        if not faces:
            print(f"  ⚠️ No faces found in: {path}")
            continue

        print(f"  ✅ Found {len(faces)} faces")

        for f in faces:
            # Find existing identity by embedding distance
            assigned = None
            for i, ref in enumerate(representatives):
                if compare_faces(face=f, reference_face=ref, face_distance=DIST):
                    assigned = i
                    break

            # Create a new identity if none matched
            if assigned is None:
                representatives.append(f)
                assigned = len(representatives) - 1

            ident_name = f"ID_{assigned:02d}"
            ident_dir = os.path.join(DST_DIR, ident_name)
            os.makedirs(ident_dir, exist_ok=True)

            # Copy original crop into its identity bucket
            out_path = os.path.join(ident_dir, os.path.basename(path))
            if os.path.abspath(path) != os.path.abspath(out_path):
                shutil.copy2(path, out_path)

            index.setdefault(ident_name, []).append(out_path)

        processed += 1

    except Exception as e:
        print(f"  ❌ Error processing {path}: {str(e)}")
        errors += 1

print(f"\n📊 Summary:")
print(f"Files processed: {processed}")
print(f"Errors: {errors}")
print(f"Identities found: {len(representatives)}")

if representatives:
    # Save a simple manifest
    with open(os.path.join(DST_DIR, "index.json"), "w") as f:
        json.dump(index, f, indent=2)

    for k in sorted(index.keys()):
        print(f"{k} -> {len(index[k])} images")
else:
    print("No faces found to group by identity")

Looking for images in: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces\all
Directory exists: True
Files in directory: 528
Found 528 image files to process
Processing: 1014-0.png
  ✅ Found 1 faces
Processing: 1014-1.png
  ✅ Found 1 faces
Processing: 1014-2.png
  ✅ Found 1 faces
Processing: 1027-0.png
  ✅ Found 1 faces
Processing: 1027-1.png
  ✅ Found 1 faces
Processing: 1027-2.png
  ✅ Found 1 faces
Processing: 1027-3.png
  ✅ Found 1 faces
Processing: 104-0.png
  ⚠️ No faces found in: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces\all\104-0.png
Processing: 1040-0.png
  ✅ Found 1 faces
Processing: 1040-1.png
  ✅ Found 1 faces
Processing: 1040-2.png
  ✅ Found 1 faces
Processing: 1040-3.png
  ✅ Found 1 faces
Processing: 1053-0.png
  ✅ Found 1 faces
Processing: 1053-1.png
  ✅ Found 1 faces
Processing: 1053-2.png
  ✅ Found 1 faces
Processing: 1053-3.png
  ✅ Found 1 faces
Processing: 1066-0.png
  ✅ Found 1 faces
Processing: 1066-1.png
  ✅ Found 1 faces
Proces

KeyboardInterrupt: 

In [42]:
import sys
sys.path.insert(0,
r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest")

from select_best_faces import (
    select_best_face_per_identity,
    copy_best_faces_to_output,
    get_best_reference_paths
)
from facefusion.vision import read_static_image
from facefusion.face_analyser import get_many_faces

# Path to your by_identity folder
by_identity_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces\by_identity"
output_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces"

# Select best faces from each identity cluster
best_faces = select_best_face_per_identity(
    by_identity_dir=by_identity_dir,
    read_image_fn=read_static_image,
    get_faces_fn=get_many_faces,
    verbose=True
)

# Copy best faces to output folder
best_refs_dir = copy_best_faces_to_output(
    best_faces=best_faces,
    output_dir=output_dir,
    folder_name="best_references",
    verbose=True
)

# Get list of best reference paths (for feeding to FaceFusion)
reference_paths = get_best_reference_paths(best_refs_dir,
num_references=3)
print(f"\nTop 3 reference faces:")
for path in reference_paths:
    print(f"  - {path}")

Found 12 identity clusters

Processing ID_00...
  Evaluating 290 faces...
  ✓ Best face: 3796-2.png
    Score: 0.954 (area=1.135, sharp=1.000, frontal=0.500)

Processing ID_01...
  Evaluating 19 faces...
  ✓ Best face: 78-2.png
    Score: 0.713 (area=1.037, sharp=0.495, frontal=0.500)

Processing ID_02...
  Evaluating 39 faces...
  ✓ Best face: 4524-2.png
    Score: 0.882 (area=0.955, sharp=1.000, frontal=0.500)

Processing ID_03...
  Evaluating 59 faces...
  ✓ Best face: 4121-4.png
    Score: 0.880 (area=0.980, sharp=0.969, frontal=0.500)

Processing ID_04...
  Evaluating 5 faces...
  ✓ Best face: 4381-0.png
    Score: 0.714 (area=1.075, sharp=0.458, frontal=0.500)

Processing ID_05...
  Evaluating 6 faces...
  ✓ Best face: 4342-15.png
    Score: 0.696 (area=0.984, sharp=0.505, frontal=0.500)

Processing ID_06...
  Evaluating 8 faces...
  ✓ Best face: 78-1.png
    Score: 0.738 (area=0.939, sharp=0.655, frontal=0.500)

Processing ID_07...
  Evaluating 4 faces...
  ✓ Best face: 4342-11.

# Detect Final Images

In [43]:
import shutil, os

# Set local Windows path for detected faces
root_dir = r"C:/Users/nethe/OneDrive/Documents/GitHub/facetest"
faces_root = os.path.join(root_dir, "detected_faces")

# Clean and recreate the directory
shutil.rmtree(faces_root, ignore_errors=True)
os.makedirs(faces_root, exist_ok=True)

# Example video paths - update these to your actual video files
# video_path = os.path.join(root_dir, "input_videos", "your_video.mp4")
# Or use a specific test video if you have one:
video_path = os.path.join(root_dir, "test_video3.mp4")

# Check if video exists before processing
if not os.path.exists(video_path):
    print(f"Video file not found: {video_path}")
    print("Please update the video_path variable to point to your video file")
else:
    r = process(
        task_id="test",
        file_path=video_path,
        output_dir=faces_root,          # will create {faces_root}/all and {faces_root}/final
        handle_complete=lambda p: print("final:", p),
        face_detector_score=0.6,
    )
    
    print("returned (deduped):", r.outputs if isinstance(r, SwapSuccess) else r)

final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\26-0.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\65-0.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\65-1.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\130-0.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\650-0.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\1690-1.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\2249-4.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\2379-1.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\3744-17.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\3757-15.png
final: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\final\3809-1.png
final: C:/Users/nethe/OneDr

In [44]:
import shutil, os

# Set local Windows path for detected faces
root_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest"
faces_root = os.path.join(root_dir, "detected_faces_improved")

# Clean and recreate the directory
shutil.rmtree(faces_root, ignore_errors=True)
os.makedirs(faces_root, exist_ok=True)

# Example video paths - update these to your actual video files
video_path = os.path.join(root_dir, "test_video3.mp4")

# Check if video exists before processing
if not os.path.exists(video_path):
    print(f"Video file not found: {video_path}")
    print("Please update the video_path variable to point to your video file")
else:
    print("🔍 Using improved clustering with DBSCAN...")
    r = process_video_with_improved_clustering(
        task_id="improved_test",
        file_path=video_path,
        output_dir=faces_root,
        handle_complete=lambda p: print("✅ Saved face:", p),
        face_detector_score=0.6,
        use_dbscan=True  # Enable DBSCAN clustering
    )
    
    if isinstance(r, SwapSuccess):
        print(f"\n📊 Results:")
        print(f"Total unique identities found: {len(r.outputs)}")
        
        # Read and display clustering metadata
        import json
        metadata_path = os.path.join(faces_root, "clustering_metadata.json")
        if os.path.exists(metadata_path):
            with open(metadata_path, 'r') as f:
                metadata = json.load(f)
                print(f"Total detections: {metadata['total_detections']}")
                print(f"Clustering method: {metadata['clustering_method']}")
                print("\nFaces per identity:")
                for id_name, count in metadata['identity_sizes'].items():
                    print(f"  {id_name}: {count} faces")
    else:
        print(f"Error: {r}")

🔍 Using improved clustering with DBSCAN...
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\24-0.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\276-1.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\60-2.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\60-3.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\60-4.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\60-5.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\60-6.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\4080-7.png
✅ Saved face: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces_improved\final\144-8.png
✅ Saved face: C:\Users\net

# Download Detected Faces

In [33]:
import shutil
import os

# Set local Windows paths
root_dir = r"C:/Users/nethe/OneDrive/Documents/GitHub/facetest"

# Create zip archives locally (no Google Colab download)
final_dir = os.path.join(root_dir, "detected_faces", "final")
byid_dir = os.path.join(root_dir, "detected_faces", "by_identity")

# Create zip of final faces
if os.path.exists(final_dir):
    zip_path_final = os.path.join(root_dir, "detected_faces_final")
    shutil.make_archive(zip_path_final, "zip", final_dir)
    print(f"Created zip archive: {zip_path_final}.zip")
else:
    print(f"Directory not found: {final_dir}")

# Create zip of faces grouped by identity
if os.path.exists(byid_dir):
    zip_path_byid = os.path.join(root_dir, "detected_faces_by_identity")
    shutil.make_archive(zip_path_byid, "zip", byid_dir)
    print(f"Created zip archive: {zip_path_byid}.zip")
else:
    print(f"Directory not found: {byid_dir}")

print("\nZip files have been created in your project directory.")
print("You can find them at:")
print(f"  - {root_dir}\\detected_faces_final.zip")
print(f"  - {root_dir}\\detected_faces_by_identity.zip")

Created zip archive: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces_final.zip
Directory not found: C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces\by_identity

Zip files have been created in your project directory.
You can find them at:
  - C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces_final.zip
  - C:/Users/nethe/OneDrive/Documents/GitHub/facetest\detected_faces_by_identity.zip


## helpers

In [46]:
from enum import Enum
from dataclasses import dataclass

class JobError(Enum):
    fraud = "fraud"
    invalid_audio_path = "invalid_audio_path"
    invalid_image_path = "invalid_image_path"
    invalid_video_path = "invalid_video_path"

    invalid_audio_file = "invalid_audio_file"
    invalid_image_file = "invalid_image_file"
    invalid_video_file = "invalid_video_file"
    invalid_youtube_url = "invalid_youtube_url"
    no_source_face = "no_source_face"
    nsfw = "nsfw"
    sensitive_image = "sensitive_image"
    sensitive_prompt = "sensitive_prompt"
    too_many_faces = "too_many_faces"

    # used for virtual try on
    no_model_detected = "no_model_detected"
    no_outfit_detected = "no_outfit_detected"

@dataclass
class SwapSuccess:
    output_path: str
    is_nsfw: bool
    success: Literal[True] = True


@dataclass
class SwapFailure:
    error: JobError
    is_nsfw: bool
    success: Literal[False] = False

import subprocess


def convert_image_to_format(input_path: str, output_path: str):
    try:
        res = subprocess.run(
            args=["convert", input_path, output_path],
            capture_output=True,
            text=True,
            errors="replace",
        )
        if res.returncode != 0:
            raise Exception(res.stderr)
    except Exception as e:
        print(f"Error converting image to format: {str(e)}")
        raise Exception(e)


def convert_image_to_format(input_path: str, output_path: str):
    try:
        img = Image.open(input_path)
        img.convert("RGB").save(output_path)
        print(f"✅ Converted {input_path} -> {output_path}")
    except Exception as e:
        print(f"Error converting {input_path}: {e}")
        raise


from PIL import Image, ImageOps


def add_border_to_image(
    image_path: str, output_path: str, zoom_factor: float = 0.8, border_size: int = 30
):
    """
    Adds a border around an image after resizing it by a specified zoom factor.

    Parameters:
    - image_path (str): Path to the input image file.
    - output_path (str): Path to save the output image with the border.
    - zoom_factor (float, optional): Scaling factor for resizing the image. Defaults to 0.8 (80% of original size).
    - border_size (int, optional): Thickness of the border in pixels. Defaults to 30.
    """
    image = Image.open(image_path)

    new_width = int(image.width * zoom_factor)
    new_height = int(image.height * zoom_factor)

    zoomed_image = image.resize((new_width, new_height))

    bordered_image = ImageOps.expand(
        image=zoomed_image, border=border_size, fill="black"
    )

    bordered_image.save(output_path)


# Intermediate Output

In [47]:
# # --- intermediate MP4 helper ---
# import os, shutil
# from facefusion.temp_helper import resolve_temp_frame_paths
# from facefusion import state_manager
# from facefusion.ffmpeg import merge_video
# from facefusion.vision import restrict_video_fps

# ROOT = os.getcwd()


# def save_intermediate(pass_idx: int, temp_video_fps: float):
#     # Locate current temp frames
#     frame_paths = resolve_temp_frame_paths(state_manager.get_item("target_path"))
#     if not frame_paths:
#         print(f"[intermediate] no temp frames for pass {pass_idx:02d}")
#         return
#     temp_dir = os.path.dirname(frame_paths[0])

#     # Ensure out dir and file path
#     snap_dir = os.path.join(ROOT, "facefusion", "intermediate", f"pass_{pass_idx:02d}")
#     os.makedirs(snap_dir, exist_ok=True)
#     out_mp4 = os.path.join(snap_dir, f"output_pass_{pass_idx:02d}.mp4")

#     # Temporarily set output_path to the intermediate MP4
#     prev_out = state_manager.get_item("output_path")
#     state_manager.set_item("output_path", out_mp4)

#     # Ensure res/fps exist
#     if not state_manager.get_item("output_video_resolution"):
#         state_manager.set_item("output_video_resolution", "1280x720")
#     if not state_manager.get_item("output_video_fps"):
#         state_manager.set_item("output_video_fps", 30)

#     ok = merge_video(
#         target_path=state_manager.get_item("target_path"),
#         temp_video_fps=restrict_video_fps(
#             state_manager.get_item("target_path"),
#             state_manager.get_item("output_video_fps"),
#         ),
#         output_video_resolution=state_manager.get_item("output_video_resolution"),
#         output_video_fps=state_manager.get_item("output_video_fps"),
#         trim_frame_start=state_manager.get_item("trim_frame_start"),
#         trim_frame_end=state_manager.get_item("trim_frame_end"),
#     )
#     if ok:
#         print(f"[intermediate] video -> {out_mp4}")
#     else:
#         print(f"[intermediate] merge_video() FAILED on pass {pass_idx:02d}")

#     # Restore original output_path
#     state_manager.set_item("output_path", prev_out)

# --- intermediate MP4 helper (absolute paths) ---
# --- intermediate MP4 helper (absolute paths) ---
import os
from facefusion.temp_helper import resolve_temp_frame_paths
from facefusion import state_manager
from facefusion.ffmpeg import merge_video
from facefusion.vision import restrict_video_fps

BASE_INTER_DIR = os.path.join(root_dir, "facefusion", "intermediate")

def save_intermediate(pass_idx: int):
    # imports kept local so you only edit this function
    import glob, shutil
    from facefusion.filesystem import filter_audio_paths
    from facefusion.common_helper import get_first
    from facefusion.temp_helper import move_temp_file
    from facefusion.ffmpeg import replace_audio, restore_audio

    # 1) Locate current temp frames
    frame_paths = resolve_temp_frame_paths(state_manager.get_item("target_path"))
    if not frame_paths:
        print(f"[intermediate] no temp frames for pass {pass_idx:02d}")
        return
    temp_dir = os.path.dirname(frame_paths[0])

    # 2) Prepare per-pass dir and snapshot frames
    inter_dir = os.path.join(BASE_INTER_DIR, f"pass_{pass_idx:02d}")
    frames_dir = os.path.join(inter_dir, "frames")
    os.makedirs(inter_dir, exist_ok=True)
    shutil.rmtree(frames_dir, ignore_errors=True)
    shutil.copytree(temp_dir, frames_dir)
    fmt = state_manager.get_item("temp_frame_format") or "png"
    count = len(glob.glob(os.path.join(frames_dir, f"*.{fmt}")))
    print(f"[intermediate] copied {count} frames -> {frames_dir}")

    # 3) Write MP4 like the final export (with robust fallback)
    out_mp4 = os.path.join(inter_dir, f"output_pass_{pass_idx:02d}.mp4")
    prev_out = state_manager.get_item("output_path")
    state_manager.set_item("output_path", out_mp4)

    # ensure res/fps
    if not state_manager.get_item("output_video_resolution"):
        state_manager.set_item("output_video_resolution", "1280x720")
    if not state_manager.get_item("output_video_fps"):
        state_manager.set_item("output_video_fps", 30)

    ok = merge_video(
        target_path=state_manager.get_item("target_path"),
        temp_video_fps=restrict_video_fps(
            state_manager.get_item("target_path"),
            state_manager.get_item("output_video_fps"),
        ),
        output_video_resolution=state_manager.get_item("output_video_resolution"),
        output_video_fps=state_manager.get_item("output_video_fps"),
        trim_frame_start=state_manager.get_item("trim_frame_start"),
        trim_frame_end=state_manager.get_item("trim_frame_end"),
    )

    if ok:
        # Try to add audio; if not, mimic final fallback so file exists
        src_audio = get_first(filter_audio_paths(state_manager.get_item("source_paths") or []))
        added_audio = replace_audio(state_manager.get_item("target_path"), src_audio, out_mp4) if src_audio else False
        if not added_audio:
            restored = restore_audio(
                state_manager.get_item("target_path"), out_mp4,
                state_manager.get_item("trim_frame_start"),
                state_manager.get_item("trim_frame_end"),
            )
            if not restored:
                move_temp_file(state_manager.get_item("target_path"), out_mp4)
    else:
        print(f"[intermediate] merge_video() returned False for pass {pass_idx:02d} -> {out_mp4}")

    # 4) Verify
    if os.path.exists(out_mp4) and os.path.getsize(out_mp4) > 0:
        print(f"[intermediate] wrote {out_mp4} ({os.path.getsize(out_mp4)} bytes)")
    else:
        print(f"[intermediate] missing {out_mp4}")

    # Restore original output_path
    state_manager.set_item("output_path", prev_out)



## Multi-Face Swap


In [48]:
from dataclasses import dataclass
import json
from pathlib import Path
import os
import sys
from time import time
from typing import Any

# from ai_helpers.asset_helpers import fetch_file_from_source
# from ai_helpers.face_swap import SwapFailure, SwapSuccess
# from ai_helpers.images import add_border_to_image, convert_image_to_format
# from setup import bucket_name, get_base_args, root_dir

sys.path.append(f"{root_dir}/facefusion")

from facefusion import (
    content_analyser,
    process_manager,
    state_manager,
    face_analyser,
)
from facefusion.args import apply_args
from facefusion.common_helper import get_first
from facefusion.core import (
    conditional_append_reference_faces,
)
from facefusion.face_store import clear_reference_faces
from facefusion.ffmpeg import (
    extract_frames,
    merge_video,
    replace_audio,
    restore_audio,
)
from facefusion.filesystem import filter_audio_paths, is_video
from facefusion.processors.core import get_processors_modules
from facefusion.temp_helper import (
    clear_temp_directory,
    create_temp_directory,
    move_temp_file,
    resolve_temp_frame_paths,
)
from facefusion.vision import (
    pack_resolution,
    read_static_image,
    restrict_trim_frame,
    restrict_video_fps,
    restrict_video_resolution,
    unpack_resolution,
)

def fetch_file_from_source(bucket_name: str, download_dir: str, file_path: str) -> str:
    return file_path


@dataclass
class FaceInput:
    new_file_path: str
    frame: int
    index: int


output_path = f"{root_dir}/output.mp4"


def swap_individual_faces(
    source_local_path: str,
    face_image_paths: str,
    base_args: dict[str, Any],
) -> SwapSuccess | SwapFailure:
    start_time = time()
    faces_dir = f"{root_dir}/face_images"
    os.makedirs(faces_dir, exist_ok=True)

    # initial setup steps for facefusion
    os.chdir("facefusion")
    process_manager.start()

    args = {  # type: ignore
        **base_args,
        "face_selector_mode": "reference",
        "face_selector_order": "left-right",
        "reference_face_distance": 0.3,
        "target_path": source_local_path,
        "output_path": output_path,
    }
    apply_args(
        args=args,  # type: ignore
        apply_state_item=state_manager.init_item,
    )

    state = state_manager.get_state()
    print(json.dumps(state), flush=True)

    try:
        user_image_paths = json.loads(face_image_paths)
        faces: list[FaceInput] = []
        for img in user_image_paths:
            print(f"downloading {img} to local directory")
            temp_new_face_path = fetch_file_from_source(
                bucket_name='ignore',
                download_dir=faces_dir,
                file_path=img["new"],
            )
            new_face_path = f"{faces_dir}/{Path(img['new']).stem}.png"
            print(f"converting {temp_new_face_path} to {new_face_path}")
            convert_image_to_format(temp_new_face_path, new_face_path)

            max_zoom_attempts = int(os.getenv("FACE_IMAGE_MAX_ZOOM_ATTEMPTS", 5))

            for zoom_attempt in range(max_zoom_attempts + 1):
                try:
                    frame = read_static_image(new_face_path)
                    if frame is None:
                        print(
                            f"attempt {zoom_attempt}: error reading image {new_face_path}"
                        )
                        return SwapFailure(
                            error=JobError.invalid_image_file, is_nsfw=False
                        )

                    frame_faces = face_analyser.get_many_faces([frame])
                    frame_face_count = len(frame_faces)

                    if frame_face_count >= 1:
                        print(
                            f"attempt {zoom_attempt}: detected one face in {new_face_path}"
                        )
                        break
                    else:
                        print(
                            f"attempt {zoom_attempt}: found no faces in {new_face_path}"
                        )
                except Exception as e:
                    print(f"attempt {zoom_attempt}: error detecting faces {str(e)}")

                if zoom_attempt >= max_zoom_attempts:
                    print(
                        f"max zoom attempts reached without detecting faces in {new_face_path}. Error with no source face"
                    )
                    return SwapFailure(error=JobError.no_source_face, is_nsfw=False)

                new_face_path_expanded = (
                    f"{faces_dir}/{Path(img['new']).stem}-{zoom_attempt}.png"
                )
                add_border_to_image(new_face_path, new_face_path_expanded)
                new_face_path = new_face_path_expanded

            original_face_path = Path(img["original"])

            frame_idx = original_face_path.stem
            frame, index = map(int, frame_idx.split("-"))

            faces.append(
                FaceInput(new_file_path=new_face_path, frame=frame, index=index)
            )
    except Exception as e:
        print(e)
        return SwapFailure(error=JobError.invalid_image_path, is_nsfw=False)

    trim_frame_start, trim_frame_end = restrict_trim_frame(
        video_path=state_manager.get_item("target_path"),
        trim_frame_start=state_manager.get_item("trim_frame_start"),
        trim_frame_end=state_manager.get_item("trim_frame_end"),
    )
    if content_analyser.analyse_video(
        state_manager.get_item("target_path"), trim_frame_start, trim_frame_end
    ):
        print("video detected as nsfw, returning error")
        return SwapFailure(error=JobError.nsfw, is_nsfw=True)

    clear_temp_directory(state_manager.get_item("target_path"))
    create_temp_directory(state_manager.get_item("target_path"))

    # # ADDED by Runbo
    # import cv2

    # # If not provided, infer from source
    # ovr = state_manager.get_item("output_video_resolution")
    # ovf = state_manager.get_item("output_video_fps")
    # if not ovr or not ovf:
    #     cap = cv2.VideoCapture(state_manager.get_item("target_path"))
    #     if not cap.isOpened():
    #         return SwapFailure(error=JobError.invalid_video_file, is_nsfw=False)
    #     w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) or 1280
    #     h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) or 720
    #     fps = cap.get(cv2.CAP_PROP_FPS) or 30
    #     cap.release()
    #     if not ovr:
    #         state_manager.set_item("output_video_resolution", f"{w}x{h}")
    #     if not ovf:
    #         state_manager.set_item("output_video_fps", int(round(fps)))
    #   #-----------------------------------------------


    temp_video_resolution = pack_resolution(
        restrict_video_resolution(
            state_manager.get_item("target_path"),
            unpack_resolution(state_manager.get_item("output_video_resolution")),
        )
    )
    temp_video_fps = restrict_video_fps(
        state_manager.get_item("target_path"),
        state_manager.get_item("output_video_fps"),
    )
    print(
        f"extracting frames for video with {temp_video_fps} fps, with {temp_video_resolution} resolution"
    )
    if extract_frames(
        state_manager.get_item("target_path"),
        temp_video_resolution,
        temp_video_fps,
        trim_frame_start,
        trim_frame_end,
    ):
        print("extracting_frames_succeed")
    else:
        print("extracting_frames_failed")
        return SwapFailure(error=JobError.invalid_video_file, is_nsfw=False)

    temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item("target_path"))

    if not temp_frame_paths:
        print("temp_frames_not_found")
        raise Exception("temp_frame_paths is not set")

    # for face in faces:

    #     clear_reference_faces()
    #     state_manager.set_item("source_paths", [face.new_file_path])
    #     state_manager.set_item("reference_frame_number", face.frame)
    #     state_manager.set_item("reference_face_position", face.index)
    #     conditional_append_reference_faces()
    #     for processor_module in get_processors_modules(["face_swapper"]):
    #         print(f"processing with {processor_module}")
    #         processor_module.process_video(
    #             state_manager.get_item("source_paths"), temp_frame_paths
    #         )
    #     # save intermediates for this pass # ADDED
    #     save_intermediate(pass_idx=faces.index(face))

    # if "face_enhancer" in base_args["processors"]:
    #     print("enhancing images")
    #     for processor_module in get_processors_modules(["face_enhancer"]):
    #         processor_module.process_video(
    #             state_manager.get_item("source_paths"), temp_frame_paths
    #         )
    #         processor_module.post_process()

    # save_intermediate(pass_idx=i, temp_video_fps=temp_video_fps)
    # temp_video_fps already computed above
    for i, face in enumerate(faces):
        clear_reference_faces()
        state_manager.set_item("source_paths", [face.new_file_path])
        state_manager.set_item("reference_frame_number", face.frame)
        state_manager.set_item("reference_face_position", face.index)
        conditional_append_reference_faces()

        # swap for this identity
        for processor_module in get_processors_modules(["face_swapper"]):
            print(f"processing with {processor_module}")
            processor_module.process_video(
                state_manager.get_item("source_paths"),
                temp_frame_paths
            )
            processor_module.post_process()   # finalize writes

        # write intermediate MP4 for this pass
        # save_intermediate(pass_idx=i, temp_video_fps=30)
        save_intermediate(pass_idx=i)

        # #----------------------------
        # # SAVE Intermediate Output
        # #----------------------------
        # # import os, shutil, glob

        # # locate FaceFusion’s temp frame directory from any frame path
        # temp_frame_paths = resolve_temp_frame_paths(state_manager.get_item("target_path"))
        # temp_dir = os.path.dirname(temp_frame_paths[0])

        # # make a per-pass snapshot dir
        # snap_dir = os.path.join(root_dir, "intermediate", f"pass_{len(os.listdir(os.path.join(root_dir, 'intermediate'))):02d}") \
        #           if os.path.exists(os.path.join(root_dir, "intermediate")) else os.path.join(root_dir, "intermediate", "pass_00")
        # os.makedirs(os.path.dirname(snap_dir), exist_ok=True)

        # # copy all current frames
        # shutil.copytree(temp_dir, snap_dir, dirs_exist_ok=True)
        # print(f"[snapshot] saved frames to {snap_dir}")


    print(
        f"merging video with {state_manager.get_item('output_video_fps')} fps and {state_manager.get_item('output_video_resolution')} resolution"
    )

    if merge_video(
        target_path=state_manager.get_item("target_path"),
        temp_video_fps=temp_video_fps,
        output_video_resolution=state_manager.get_item("output_video_resolution"),
        output_video_fps=state_manager.get_item("output_video_fps"),
        trim_frame_start=trim_frame_start,
        trim_frame_end=trim_frame_end,
    ):
        print("successfully merged video")
    else:
        print("failed merged video")
        return SwapFailure(error=JobError.invalid_video_file, is_nsfw=False)

    source_audio_path = get_first(
        filter_audio_paths(state_manager.get_item("source_paths"))
    )
    if source_audio_path:
        if replace_audio(
            state_manager.get_item("target_path"),
            source_audio_path,
            state_manager.get_item("output_path"),
        ):
            print("replacing_audio_succeed")
        else:
            print("replacing_audio_skipped")
            move_temp_file(
                state_manager.get_item("target_path"),
                state_manager.get_item("output_path"),
            )
    else:
        if restore_audio(
            state_manager.get_item("target_path"),
            state_manager.get_item("output_path"),
            trim_frame_start,
            trim_frame_end,
        ):
            print("restoring_audio_succeed")
        else:
            print("restoring_audio_skipped")
            move_temp_file(
                state_manager.get_item("target_path"),
                state_manager.get_item("output_path"),
            )
    # clear temp
    print("clearing_temp")
    clear_temp_directory(state_manager.get_item("target_path"))
    # validate video
    if is_video(state_manager.get_item("output_path")):
        seconds = "{:.2f}".format((time() - start_time))
        print(f"processing_video_succeed in {seconds} seconds")
    else:
        print("processing_video_failed")
        return SwapFailure(error=JobError.invalid_video_file, is_nsfw=False)

    process_manager.end()
    os.chdir(root_dir)

    return SwapSuccess(output_path=output_path, is_nsfw=False)


# Generate Video

In [52]:
import os
import glob
import cv2
import numpy as np
from facefusion.face_analyser import get_many_faces
from facefusion.vision import read_static_image

def calculate_face_quality(face, frame):
    """
    Calculate face quality score based on multiple factors.
    Higher score = better quality.
    """
    # Face size (larger is better)
    bbox = face.bounding_box
    face_area = (bbox[2] - bbox[0]) * (bbox[3] - bbox[1])
    
    # Face crop for sharpness calculation
    y1, y2 = int(bbox[1]), int(bbox[3])
    x1, x2 = int(bbox[0]), int(bbox[2])
    face_crop = frame[y1:y2, x1:x2]
    
    # Calculate sharpness (Laplacian variance)
    if face_crop.size > 0:
        gray = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
        sharpness = cv2.Laplacian(gray, cv2.CV_64F).var()
    else:
        sharpness = 0
    
    # Frontalness score (based on landmarks)
    # Embedding confidence (if available)
    embedding_quality = np.linalg.norm(face.embedding) if hasattr(face, 'embedding') else 1.0
    
    # Combined score (weighted)
    quality_score = (face_area * 0.4) + (sharpness * 0.5) + (embedding_quality * 0.1)
    
    return quality_score

def keep_best_face_per_identity(by_identity_dir, verbose=True):
    """
    For each identity folder, keep only the best quality face and delete the rest.
    
    Args:
        by_identity_dir: Directory containing ID_xx folders
        verbose: Whether to print progress
    
    Returns:
        Dictionary with statistics about deleted faces
    """
    stats = {
        "total_identities": 0,
        "total_faces_before": 0,
        "total_faces_after": 0,
        "total_deleted": 0
    }
    
    # Get all identity folders
    identity_folders = sorted(glob.glob(os.path.join(by_identity_dir, "ID_*")))
    stats["total_identities"] = len(identity_folders)
    
    if verbose:
        print(f"\n{'='*60}")
        print(f"Processing {len(identity_folders)} identity folders")
        print(f"{'='*60}\n")
    
    for identity_folder in identity_folders:
        identity_name = os.path.basename(identity_folder)
        
        # Get all face images in this identity
        face_paths = sorted(
            glob.glob(os.path.join(identity_folder, "*.png")) +
            glob.glob(os.path.join(identity_folder, "*.jpg"))
        )
        
        if not face_paths:
            if verbose:
                print(f"{identity_name}: No faces found, skipping")
            continue
        
        stats["total_faces_before"] += len(face_paths)
        
        if len(face_paths) == 1:
            if verbose:
                print(f"{identity_name}: Only 1 face, keeping it")
            stats["total_faces_after"] += 1
            continue
        
        # Calculate quality score for each face
        face_qualities = []
        
        for face_path in face_paths:
            frame = read_static_image(face_path)
            if frame is None:
                continue
            
            faces = get_many_faces([frame])
            if not faces:
                continue
            
            face = faces[0]
            quality = calculate_face_quality(face, frame)
            face_qualities.append((face_path, quality))
        
        if not face_qualities:
            if verbose:
                print(f"{identity_name}: No valid faces found")
            continue
        
        # Sort by quality (highest first)
        face_qualities.sort(key=lambda x: x[1], reverse=True)
        
        # Keep the best face
        best_face_path = face_qualities[0][0]
        best_quality = face_qualities[0][1]
        
        # Delete all other faces
        deleted_count = 0
        for face_path, quality in face_qualities[1:]:
            try:
                os.remove(face_path)
                deleted_count += 1
            except Exception as e:
                if verbose:
                    print(f"  ⚠️ Error deleting {face_path}: {e}")
        
        stats["total_faces_after"] += 1
        stats["total_deleted"] += deleted_count
        
        if verbose:
            print(f"{identity_name}: Kept best face (quality={best_quality:.2f}), deleted {deleted_count} faces")
            print(f"  Best: {os.path.basename(best_face_path)}")
    
    if verbose:
        print(f"\n{'='*60}")
        print(f"Summary:")
        print(f"  Total identities: {stats['total_identities']}")
        print(f"  Faces before: {stats['total_faces_before']}")
        print(f"  Faces after: {stats['total_faces_after']}")
        print(f"  Faces deleted: {stats['total_deleted']}")
        print(f"{'='*60}\n")
    
    return stats

# Run the cleanup
detected_faces_improved_dir = os.path.join(root_dir, "detected_faces_improved")
by_identity_dir = os.path.join(detected_faces_improved_dir, "by_identity")

# WARNING: This will permanently delete files!
# Make sure you have a backup if needed
print("⚠️  WARNING: This will permanently delete face images!")
print("Press Ctrl+C within 5 seconds to cancel...")
import time
time.sleep(5)

stats = keep_best_face_per_identity(by_identity_dir, verbose=True)

Press Ctrl+C within 5 seconds to cancel...

Processing 61 identity folders

ID_00: Kept best face (quality=19687.69), deleted 6 faces
  Best: 36-0.png
ID_01: Kept best face (quality=39257.55), deleted 41 faces
  Best: 4116-3.png
ID_02: No valid faces found
ID_03: Kept best face (quality=687.82), deleted 2 faces
  Best: 84-3.png
ID_04: Kept best face (quality=647.90), deleted 2 faces
  Best: 60-2.png
ID_05: Kept best face (quality=537.63), deleted 0 faces
  Best: 60-3.png
ID_06: Kept best face (quality=638.83), deleted 1 faces
  Best: 84-0.png
ID_07: Kept best face (quality=23192.65), deleted 7 faces
  Best: 4128-1.png
ID_08: No valid faces found
ID_09: Kept best face (quality=4302.03), deleted 0 faces
  Best: 228-0.png
ID_10: Kept best face (quality=8488.58), deleted 24 faces
  Best: 4092-7.png
ID_11: Kept best face (quality=6132.53), deleted 3 faces
  Best: 4032-2.png
ID_12: Kept best face (quality=1496.51), deleted 2 faces
  Best: 276-1.png
ID_13: Kept best face (quality=4726.10), de

In [49]:
from facefusion.vision import read_static_image
from facefusion.face_analyser import get_many_faces
import os

print("Testing image reading...")
for char, path in source_reference_faces.items():
    print(f"\n{char}: {path}")
    print(f"  File exists: {os.path.exists(path)}")
    print(f"  File size: {os.path.getsize(path) if os.path.exists(path) else 'N/A'} bytes")
    
    # Try to read the image
    frame = read_static_image(path)
    print(f"  Frame loaded: {frame is not None}")
    
    if frame is not None:
        print(f"  Frame shape: {frame.shape}")
        
        # Try to detect faces
        faces = get_many_faces([frame])
        print(f"  Faces detected: {len(faces)}")
        if faces:
            print(f"  ✅ Successfully loaded and detected face!")
        else:
            print(f"  ❌ No faces detected in image")
    else:
        print(f"  ❌ Failed to load image")

Testing image reading...

character1: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\reference_faces\Mira.png
  File exists: True
  File size: 201168 bytes
  Frame loaded: True
  Frame shape: (602, 396, 3)
  Faces detected: 1
  ✅ Successfully loaded and detected face!

character2: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\reference_faces\Rumi.png
  File exists: True
  File size: 131862 bytes
  Frame loaded: True
  Frame shape: (439, 326, 3)
  Faces detected: 1
  ✅ Successfully loaded and detected face!

character3: C:\Users\nethe\OneDrive\Documents\GitHub\facetest\reference_faces\Zoey.jpg
  File exists: True
  File size: 8312 bytes
  Frame loaded: True
  Frame shape: (118, 159, 3)
  Faces detected: 1
  ✅ Successfully loaded and detected face!


In [50]:
import sys
sys.path.append(root_dir)
from auto_face_matcher import auto_map_faces_to_identities, get_identity_face_paths
from facefusion.vision import read_static_image
from facefusion.face_analyser import get_many_faces
from facefusion.face_selector import compare_faces

# Set your local video and face image paths
source_video_path = os.path.join(root_dir, "inputs", "test_video3.mp4")  # Update this path
face_images_dir = os.path.join(root_dir, "face_images")
detected_faces_dir = os.path.join(root_dir, "detected_faces", "final")
detected_faces_improved_dir = os.path.join(root_dir, "detected_faces_improved")

# Step 1: Define SOURCE reference faces (characters FROM the video to identify)
source_reference_faces = {
    "character1": os.path.join(root_dir, "reference_faces", "Mira.png"),
    "character2": os.path.join(root_dir, "reference_faces", "Rumi.png"),
    "character3": os.path.join(root_dir, "reference_faces", "Zoey.jpg")
}

# Step 2: Define TARGET faces (new faces you want to swap TO)
target_faces = {
    "character1": os.path.join(face_images_dir, "boy1.jpg"),
    "character2": os.path.join(face_images_dir, "girl1.jpg"),
    "character3": os.path.join(face_images_dir, "boy2.jpg")
}

# Auto-match source references to identity clusters
print("Matching source reference faces to identity clusters...")
identity_map = auto_map_faces_to_identities(
    face_mappings=source_reference_faces,
    by_identity_dir=os.path.join(detected_faces_improved_dir, "by_identity"),
    read_image_fn=read_static_image,
    get_faces_fn=get_many_faces,
    compare_faces_fn=compare_faces,
    face_distance_threshold=0.4,
    verbose=True
)

# Get all face paths for matched identities
identity_face_paths = get_identity_face_paths(
    identity_map=identity_map,
    by_identity_dir=os.path.join(detected_faces_improved_dir, "by_identity")
)

# Create face swap configuration using TARGET faces
face_image_paths = []
for label, identity_name in identity_map.items():
    target_face = target_faces[label]  # Use target face, not source reference
    original_faces = identity_face_paths[label]
    
    for original_face in original_faces:
        face_image_paths.append({
            "new": target_face,
            "original": original_face
        })
    
    print(f"{label}: {identity_name} → swap with {target_face} ({len(original_faces)} faces)")

print(f"\nTotal swaps configured: {len(face_image_paths)}")

Matching source reference faces to identity clusters...

Auto-mapping 3 reference faces to identities

Finding identity match for: Mira.png
  ✓ ID_00: Match found (distance=27.500)
  ✓ ID_01: Match found (distance=31.971)
  ✗ ID_02: No match
  ✓ ID_03: Match found (distance=34.785)
  ✗ ID_04: No match
  ✗ ID_05: No match
  ✗ ID_06: No match
  ✓ ID_07: Match found (distance=32.125)
  ✗ ID_08: No match
  ✗ ID_09: No match
  ✗ ID_10: No match
  ✗ ID_11: No match
  ✗ ID_12: No match
  ✓ ID_13: Match found (distance=32.992)
  ✓ ID_14: Match found (distance=30.895)
  ✓ ID_15: Match found (distance=31.848)
  ✓ ID_16: Match found (distance=32.046)
  ✗ ID_17: No match
  ✓ ID_18: Match found (distance=28.537)
  ✗ ID_19: No match
  ✓ ID_20: Match found (distance=20.689)
  ✓ ID_21: Match found (distance=26.469)
  ✓ ID_22: Match found (distance=35.587)
  ✓ ID_23: Match found (distance=26.660)
  ✗ ID_24: No match
  ✓ ID_25: Match found (distance=31.790)
  ✓ ID_26: Match found (distance=36.128)
  ✓ I

In [51]:
import os
import json
from typing import Any, Literal
import glob
import sys

# Set root directory for Windows
root_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest"
os.chdir(root_dir)

# Import auto face matcher
sys.path.append(root_dir)
from auto_face_matcher import auto_map_faces_to_identities, get_identity_face_paths
from facefusion.vision import read_static_image
from facefusion.face_analyser import get_many_faces
from facefusion.face_selector import compare_faces

facefusion_default_args: dict[str, Any] = {
    "log_level": os.getenv("log_level", "info"),
    "download_providers": ["github", "huggingface"],
    "execution_providers": ["cuda", "cpu"],  # Try CUDA first, fallback to CPU
    "execution_device_id": 0,
    "processors": os.getenv("processors", "face_swapper,face_enhancer").split(","),
    "face_detector_angles": [0],
    "face_detector_model": os.getenv("face_detector_model", "retinaface"),
    "face_detector_size": "640x640",
    "face_detector_score": float(os.getenv("face_detector_score", "0.5")),
    "face_landmarker_score": float(os.getenv("face_landmarker_score", "0.5")),
    "face_mask_blur": 0.3,
    "face_mask_padding": [0, 0, 0, 0],
    "face_mask_types": os.getenv("face_mask_types", "box,occlusion").split(","),
    "face_selector_mode": os.getenv("face_selector_mode", "one"),
    "face_selector_order": os.getenv("face_selector_order", "left-right"),
    "face_occluder_model": "xseg_1",
    "face_swapper_pixel_boost": "128x128",
    "face_enhancer_model": os.getenv("face_enhancer_model", "gfpgan_1.4"),
    "face_enhancer_blend": 80,
    "face_enhancer_weight": 1.0,
    "reference_face_distance": float(os.getenv("reference_face_distance", "0.3")),
    "temp_path": os.path.join(root_dir, "temp"),  # Use local temp directory
    "temp_frame_format": "png",
    "execution_thread_count": int(os.getenv("execution_thread_count", "8")),
    "execution_queue_count": int(os.getenv("execution_queue_count", "2")),
    "output_video_encoder": "libx264",
    "output_video_quality": int(os.getenv("output_video_quality", "80")),
    "output_video_preset": "veryfast",
    "output_audio_encoder": "aac",
    "output_image_quality": int(os.getenv("output_image_quality", "100")),
}

default_args = {
    **facefusion_default_args,
    "execution_device_id": "0",
    "execution_thread_count": 16,  # Back to higher for GPU
    "execution_queue_count": 2,    # Back to higher for GPU
    "face_landmarker_model": "2dfan4",
    "face_parser_model": "bisenet_resnet_34",
    "face_occluder_model": "xseg_3",
    "face_enhancer_blend": 70,
    "output_audio_volume": 100,
    "keep_temp": True,
    "temp_path": os.path.join(root_dir, "temp"),
    "output_video_resolution": "720x1280",
}

def get_base_args(
    version: Literal["v1", "v2"],
    face_mask_mode: str,
    high_bitrate: bool,
    should_enhance: bool,
) -> dict[str, Any]:
    if version == "v1":
        face_swapper_model = "inswapper_128"
        face_swapper_pixel_boost = "128x128"
        face_mask_blur = 0.3
    else:
        face_swapper_model = "hyperswap_1c_256"
        face_swapper_pixel_boost = "256x256"
        face_mask_blur = 0.7

    if face_mask_mode == "stable":
        face_mask_types = ["box"]
        if version == "v1":
            face_mask_padding = [0, 16, 0, 16]
        else:
            face_mask_padding = [20, 16, 0, 16]
    else:
        face_mask_types = ["box", "occlusion"]
        face_mask_padding = [0, 0, 0, 0]

    if should_enhance:
        processors = ["face_swapper", "face_enhancer"]
    else:
        processors = ["face_swapper"]

    return {
        **default_args,
        "face_swapper_model": face_swapper_model,
        "face_swapper_pixel_boost": face_swapper_pixel_boost,
        "face_mask_blur": face_mask_blur,
        "output_video_preset": "ultrafast" if high_bitrate else "veryfast",
        "output_video_quality": 99 if high_bitrate else 80,
        "processors": processors,
        "face_mask_types": face_mask_types,
        "face_mask_padding": face_mask_padding,
    }

# Configuration
version = "v2"  # @param ["v1", "v2"]
should_enhance = False
face_mask_mode = "stable"
high_bitrate = False

base_args = get_base_args(version, face_mask_mode, high_bitrate, should_enhance)

# Set paths
source_video_path = os.path.join(root_dir, "inputs", "test_video3.mp4")
face_images_dir = os.path.join(root_dir, "face_images")
detected_faces_improved_dir = os.path.join(root_dir, "detected_faces_improved")

# Step 1: Define SOURCE reference faces (characters FROM the video to identify)
source_reference_faces = {
    "character1": os.path.join(root_dir, "reference_faces", "Mira.png"),
    "character2": os.path.join(root_dir, "reference_faces", "Rumi.png"),
    "character3": os.path.join(root_dir, "reference_faces", "Zoey.jpg")
}

# Step 2: Define TARGET faces (new faces you want to swap TO)
target_faces = {
    "character1": os.path.join(face_images_dir, "boy1.jpg"),
    "character2": os.path.join(face_images_dir, "girl1.jpg"),
    "character3": os.path.join(face_images_dir, "boy2.jpg")
}

# Auto-match source references to identity clusters
print("Matching source reference faces to identity clusters...")
identity_map = auto_map_faces_to_identities(
    face_mappings=source_reference_faces,
    by_identity_dir=os.path.join(detected_faces_improved_dir, "by_identity"),
    read_image_fn=read_static_image,
    get_faces_fn=get_many_faces,
    compare_faces_fn=compare_faces,
    face_distance_threshold=0.4,
    verbose=True
)

# Get all face paths for matched identities
identity_face_paths = get_identity_face_paths(
    identity_map=identity_map,
    by_identity_dir=os.path.join(detected_faces_improved_dir, "by_identity")
)

# Create face swap configuration using TARGET faces
face_image_paths = []
for label, identity_name in identity_map.items():
    target_face = target_faces[label]  # Use target face, not source reference
    original_faces = identity_face_paths[label]
    
    for original_face in original_faces:
        face_image_paths.append({
            "new": target_face,
            "original": original_face
        })
    
    print(f"{label}: {identity_name} → swap with {target_face} ({len(original_faces)} faces)")

print(f"\nTotal swaps configured: {len(face_image_paths)}")

# Check if paths exist before running
if not os.path.exists(source_video_path):
    print(f"Source video not found: {source_video_path}")
    print("Please update source_video_path to point to your video file")
elif not face_image_paths:
    print("No face swaps configured. Auto matcher found no matches.")
    print("Check that reference images exist and contain detectable faces.")
elif not all(os.path.exists(f["new"]) for f in face_image_paths):
    print("Some target face images not found. Please check face_images directory")
else:
    result = swap_individual_faces(
        source_local_path=source_video_path,
        face_image_paths=json.dumps(face_image_paths),
        base_args=base_args,
    )
    print(f"Result: {result}")

Matching source reference faces to identity clusters...

Auto-mapping 3 reference faces to identities

Finding identity match for: Mira.png
  ✓ ID_00: Match found (distance=27.500)
  ✓ ID_01: Match found (distance=31.971)
  ✗ ID_02: No match
  ✓ ID_03: Match found (distance=34.785)
  ✗ ID_04: No match
  ✗ ID_05: No match
  ✗ ID_06: No match
  ✓ ID_07: Match found (distance=32.125)
  ✗ ID_08: No match
  ✗ ID_09: No match
  ✗ ID_10: No match
  ✗ ID_11: No match
  ✗ ID_12: No match
  ✓ ID_13: Match found (distance=32.992)
  ✓ ID_14: Match found (distance=30.895)
  ✓ ID_15: Match found (distance=31.848)
  ✓ ID_16: Match found (distance=32.046)
  ✗ ID_17: No match
  ✓ ID_18: Match found (distance=28.537)
  ✗ ID_19: No match
  ✓ ID_20: Match found (distance=20.689)
  ✓ ID_21: Match found (distance=26.469)
  ✓ ID_22: Match found (distance=35.587)
  ✓ ID_23: Match found (distance=26.660)
  ✗ ID_24: No match
  ✓ ID_25: Match found (distance=31.790)
  ✓ ID_26: Match found (distance=36.128)
  ✓ I



extracting frames for video with 25.0 fps, with 1908x1080 resolution




extracting_frames_succeed
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_00\frames




[intermediate] wrote C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_00\output_pass_00.mp4 (373267055 bytes)
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_01\frames




[intermediate] wrote C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_01\output_pass_01.mp4 (373378023 bytes)
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_02\frames




[intermediate] wrote C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_02\output_pass_02.mp4 (373571115 bytes)
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_03\frames




[intermediate] wrote C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_03\output_pass_03.mp4 (373766155 bytes)
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_04\frames




[intermediate] wrote C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_04\output_pass_04.mp4 (373807941 bytes)
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_05\frames




[intermediate] wrote C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_05\output_pass_05.mp4 (373962038 bytes)
processing with <module 'facefusion.processors.modules.face_swapper' from 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest/facefusion\\facefusion\\processors\\modules\\face_swapper.py'>




[intermediate] copied 4806 frames -> C:\Users\nethe\OneDrive\Documents\GitHub\facetest\facefusion\intermediate\pass_06\frames




KeyboardInterrupt: 

In [11]:
import os
import glob

root_dir = r"C:\Users\nethe\OneDrive\Documents\GitHub\facetest"
all_dir = os.path.join(root_dir, "detected_faces", "all")

# Find and delete all empty files
empty_files = []
for file_path in glob.glob(os.path.join(all_dir, "*")):
    try:
        if os.path.getsize(file_path) == 0:
            print(f"Deleting empty file: {os.path.basename(file_path)}")
            os.remove(file_path)
            empty_files.append(file_path)
    except Exception as e:
        print(f"Could not delete {file_path}: {e}")

print(f"Deleted {len(empty_files)} empty files")

Deleting empty file: 165-0.png
Could not delete C:\Users\nethe\OneDrive\Documents\GitHub\facetest\detected_faces\all\165-0.png: [WinError 32] The process cannot access the file because it is being used by another process: 'C:\\Users\\nethe\\OneDrive\\Documents\\GitHub\\facetest\\detected_faces\\all\\165-0.png'
Deleted 0 empty files
