In [None]:
# Python packages used below: insightface (FaceAnalysis / face_align) and
# opencv-python (cv2). onnxruntime is presumably the inference backend
# insightface loads its models with. matplotlib is not referenced by any code
# visible in this notebook.
# NOTE(review): the notebook asks for ctx_id=0 (GPU) but this installs the
# plain `onnxruntime` package, not `onnxruntime-gpu` - confirm that inference
# really runs on the GPU. Versions are unpinned, so results may drift.
!pip install -U insightface onnxruntime opencv-python matplotlib
# `unrar` is presumably the external tool patool relies on for .rar inputs.
!apt-get install -y unrar
# patool provides `patoolib.extract_archive`, used to unpack the input archive.
!pip install patool
# `rar` is the command-line program invoked through subprocess to build the
# output archive at the end of the notebook.
!apt-get install -y rar

In [None]:
# Clear previous working and output directories if they exist.
#
# This cell runs BEFORE the import/configuration cell, so on a fresh kernel
# neither the modules nor the path constants exist yet. It therefore imports
# what it needs itself and looks the paths up defensively: on a fresh kernel
# it is a harmless no-op, on a re-run it removes the artefacts of the previous
# run (using the names the configuration cell defines).
import os
import shutil


def clear_previous_outputs(folders=(), archive=None):
    """Reset each folder to an empty directory and delete a stale archive.

    Args:
        folders: iterable of directory paths; ``None``/empty entries are
            skipped. Existing directories are removed recursively and then
            recreated empty; missing ones are created.
        archive: optional path of an archive file to delete if it exists.
    """
    for folder in folders:
        if not folder:
            continue
        if os.path.exists(folder):
            shutil.rmtree(folder)
        os.makedirs(folder, exist_ok=True)

    # Also remove previous .rar output if it exists
    if archive and os.path.exists(archive):
        os.remove(archive)


_kernel_ns = globals()
clear_previous_outputs(
    folders=(
        _kernel_ns.get("RAW_EXTRACTION_FOLDER"),
        _kernel_ns.get("FACE_CROP_OUTPUT_FOLDER"),
    ),
    archive=_kernel_ns.get("ARCHIVE_OUTPUT_PATH"),
)

In [None]:
import os
import cv2
import math
import shutil
import concurrent.futures
import numpy as np
import patoolib
import subprocess

from pathlib import Path
from insightface.app import FaceAnalysis
from insightface.utils import face_align
from google.colab import drive

# Mount Google Drive at /content/drive so archive paths located on Drive are
# reachable from this runtime.
drive.mount('/content/drive', force_remount=False)

# EDIT THESE FOUR PATHS
# The two empty strings are placeholders and must be filled in before running:
# the input path is handed to patoolib.extract_archive and the output path to
# `rar a` at the end of the notebook.
# The two /content folders are scratch space: both are deleted and recreated
# on every run of this cell, and the crop folder is what gets archived.
ARCHIVE_INPUT_PATH    = "" # Input Path: archive file to extract
RAW_EXTRACTION_FOLDER = "/content/crop_test_raw"
FACE_CROP_OUTPUT_FOLDER = "/content/crop_test_face_cropped"
ARCHIVE_OUTPUT_PATH   = "" # Output Path: .rar archive to create

# Fresh start – clear & extract
# Fail fast on unset/missing paths: otherwise the input error surfaces as an
# opaque patool failure and the output error only after all images have been
# processed, when `rar` is finally invoked.
if not ARCHIVE_INPUT_PATH or not os.path.isfile(ARCHIVE_INPUT_PATH):
    raise FileNotFoundError(
        f"ARCHIVE_INPUT_PATH does not point to an existing file: {ARCHIVE_INPUT_PATH!r}"
    )
if not ARCHIVE_OUTPUT_PATH:
    raise ValueError("ARCHIVE_OUTPUT_PATH is empty; set it before running this cell.")

# Both scratch folders are wiped so results of earlier runs cannot leak in.
for folder in (RAW_EXTRACTION_FOLDER, FACE_CROP_OUTPUT_FOLDER):
    shutil.rmtree(folder, ignore_errors=True)
    os.makedirs(folder, exist_ok=True)

print("Extracting archive …")
patoolib.extract_archive(ARCHIVE_INPUT_PATH, outdir=RAW_EXTRACTION_FOLDER, verbosity=-1)

# If archive unpacks into a single top-level folder, dive in.
# RAW_EXTRACTION_FOLDER is rebound to that inner folder, so everything below
# treats it as the root of the extracted tree.
entries = os.listdir(RAW_EXTRACTION_FOLDER)
if len(entries) == 1:
    single = os.path.join(RAW_EXTRACTION_FOLDER, entries[0])
    if os.path.isdir(single):
        RAW_EXTRACTION_FOLDER = single

# Initialise InsightFace (GPU ctx_id=0)
# Only the 'detection' module of the 'buffalo_l' model pack is requested; the
# rest of the notebook uses just the bounding box and keypoints of each face.
# NOTE(review): the install cell pulls `onnxruntime`, not `onnxruntime-gpu` -
# confirm that ctx_id=0 actually results in GPU execution.
face_app = FaceAnalysis(name='buffalo_l', allowed_modules=['detection'])
face_app.prepare(ctx_id=0, det_size=(640, 640))

# Parameters
# NUM_IO_THREADS: worker count of the thread pool that reads images.
# VALID_IMAGE_EXTENSIONS: file-name suffixes (compared in lower case) that
# mark a file as an image to process.
NUM_IO_THREADS        = 4
VALID_IMAGE_EXTENSIONS= ('.jpg', '.jpeg', '.png')
MIN_FRONTALITY_RATIO  = 0.40   # min ratio of shorter/longer horizontal nose-to-eye distance
MAX_TILT_ANGLE_DEG    = 5      # eye-line tilt (degrees) above which norm_crop is used
OUTPUT_CROP_SIZE      = 112    # side length in pixels of the saved face crop

# Face‐crop + save helper
def save_face_crop(face, image, output_folder, filename):
    """Crop the detected face out of ``image`` and write it to disk.

    A face is rejected unless it looks frontal: horizontally, the nose
    keypoint must lie strictly between the two eye keypoints, and the shorter
    nose-to-eye distance must be at least ``MIN_FRONTALITY_RATIO`` of the
    longer one.

    Faces whose eye line is tilted by more than ``MAX_TILT_ANGLE_DEG`` are
    aligned with ``face_align.norm_crop``; all others are cut out as a square
    around the bounding box and resized to ``OUTPUT_CROP_SIZE``.

    Args:
        face: detection result exposing ``kps`` (the first three rows are read
            as left eye, right eye, nose) and ``bbox`` (x1, y1, x2, y2).
        image: image array the face was detected in, indexed ``[y, x]``.
        output_folder: directory the crop is written into.
        filename: file name of the crop inside ``output_folder``.

    Returns:
        1 if a crop was written, 0 if the face was rejected or the crop could
        not be produced/written.
    """
    # Frontalness test using keypoints: left eye, right eye, nose
    left_eye, right_eye, nose = face.kps[:3]
    offset_left = nose[0] - left_eye[0]
    offset_right = right_eye[0] - nose[0]

    # The signed offsets share a sign only when the nose is strictly between
    # the eyes (in either eye order). Comparing absolute distances alone would
    # let a profile face through whose nose sticks out beyond both eyes. A
    # zero product also covers the degenerate "all points coincide" case.
    if offset_left * offset_right <= 0:
        return 0

    near, far = sorted((abs(offset_left), abs(offset_right)))
    if near / far < MIN_FRONTALITY_RATIO:
        return 0

    # Check tilt angle of eyes
    dx = right_eye[0] - left_eye[0]
    dy = right_eye[1] - left_eye[1]
    tilt_angle = math.degrees(math.atan2(dy, dx))

    # Crop: if tilted, use norm_crop; else use bounding-box square crop
    if abs(tilt_angle) > MAX_TILT_ANGLE_DEG:
        face_crop = face_align.norm_crop(image, landmark=face.kps, image_size=OUTPUT_CROP_SIZE)
    else:
        x1, y1, x2, y2 = face.bbox.astype(int)
        img_h, img_w = image.shape[:2]

        # Square side taken from the longer bbox edge, capped so it fits
        # inside the image.
        side = min(2 * (max(x2 - x1, y2 - y1) // 2), img_h, img_w)
        if side <= 0:
            return 0

        # Slide the window back inside the image on every border instead of
        # letting the slice get clipped: a clipped (non-square) region would
        # be stretched by the resize below.
        x_start = min(max((x1 + x2) // 2 - side // 2, 0), img_w - side)
        y_start = min(max((y1 + y2) // 2 - side // 2, 0), img_h - side)
        square = image[y_start:y_start + side, x_start:x_start + side]
        if square.size == 0:
            return 0
        face_crop = cv2.resize(square, (OUTPUT_CROP_SIZE, OUTPUT_CROP_SIZE))

    out_path = os.path.join(output_folder, filename)
    # imwrite signals failure through its return value; do not count a crop
    # that never reached the disk.
    return 1 if cv2.imwrite(out_path, face_crop) else 0

# Worker pool on which image files are read, sized by NUM_IO_THREADS
io_pool = concurrent.futures.ThreadPoolExecutor(max_workers=NUM_IO_THREADS)


def read_image(path):
    """Read ``path`` with ``cv2.imread`` and return ``(path, image)``.

    ``image`` is passed through exactly as ``cv2.imread`` returned it; callers
    check it against ``None`` to detect files that could not be loaded.
    """
    loaded = cv2.imread(path)
    return path, loaded

def iterate_images(folder):
    """Yield ``(filename, image)`` for each loadable image directly in ``folder``.

    Files are selected by extension (``VALID_IMAGE_EXTENSIONS``, compared in
    lower case) and read through ``read_image`` on ``io_pool``. Up to
    ``NUM_IO_THREADS`` reads are kept in flight ahead of the consumer, so disk
    reads overlap with whatever the caller does with each image, while the
    number of images held in memory stays bounded. Results are yielded in
    ``os.listdir`` order; files for which no image was loaded are skipped.
    """
    names = [
        name for name in os.listdir(folder)
        if name.lower().endswith(VALID_IMAGE_EXTENSIONS)
    ]
    window = max(1, NUM_IO_THREADS)
    pending = []  # FIFO of (filename, future), at most `window` entries
    cursor = 0

    while pending or cursor < len(names):
        # Top the window up before blocking. Submitting one read and waiting
        # for it right away would make the pool strictly serial.
        while cursor < len(names) and len(pending) < window:
            name = names[cursor]
            cursor += 1
            future = io_pool.submit(read_image, os.path.join(folder, name))
            pending.append((name, future))

        fname, future = pending.pop(0)
        _, img = future.result()
        if img is not None:
            yield fname, img

# Main loop – walk only leaf dirs, detect & crop
total_saved = 0

for dirpath, subdirs, _ in os.walk(RAW_EXTRACTION_FOLDER):

    # Only leaf directories are processed; images stored next to
    # sub-folders are intentionally skipped.
    if subdirs:
        continue

    rel = os.path.relpath(dirpath, RAW_EXTRACTION_FOLDER)
    if rel == os.curdir:
        # The images sit directly in the extraction root. relpath then yields
        # ".", which would create a hidden "._crop" output folder, so use the
        # root folder's own name instead.
        rel = os.path.basename(os.path.normpath(dirpath))
    out_sub = os.path.join(FACE_CROP_OUTPUT_FOLDER, rel + "_crop")
    os.makedirs(out_sub, exist_ok=True)

    # Denominator for the report: only files that are candidates for
    # processing, not every entry in the directory.
    candidates = sum(
        1 for name in os.listdir(dirpath)
        if name.lower().endswith(VALID_IMAGE_EXTENSIONS)
    )

    saved_here = 0
    for filename, image in iterate_images(dirpath):
        faces = face_app.get(image)
        if not faces:
            continue
        # Only the first detected face of each image is cropped, so every
        # input file yields at most one output file of the same name.
        saved_here += save_face_crop(faces[0], image, out_sub, filename)

    print(f"{rel}: {saved_here} / {candidates} kept")
    total_saved += saved_here

print(f"\nTotal images saved: {total_saved}")

# Final compression to RAR
print("\nCompressing …")

# `rar a` adds to / updates an archive that already exists, so a file left by
# an earlier run would keep its stale entries. Start from a clean slate.
if os.path.isfile(ARCHIVE_OUTPUT_PATH):
    os.remove(ARCHIVE_OUTPUT_PATH)

# The target directory (e.g. a new folder on Drive) may not exist yet.
os.makedirs(os.path.dirname(os.path.abspath(ARCHIVE_OUTPUT_PATH)), exist_ok=True)

# NOTE(review): the crop folder is passed as an absolute path; confirm the
# entry paths stored in the archive are what consumers expect (see the rar
# -ep switches if the leading directories should be dropped).
subprocess.run(
    ["rar", "a", "-r", str(ARCHIVE_OUTPUT_PATH), str(FACE_CROP_OUTPUT_FOLDER)],
    check=True
)
print("Done! Archive saved at:", ARCHIVE_OUTPUT_PATH)
