In [2]:
from PIL import Image
import os
import hashlib
import shutil
import sys
import numpy as np
from insightface.app import FaceAnalysis
from typing import Dict, List, Optional, Generator

import matplotlib.pyplot as plt

# Optional dependency: pillow-heif adds HEIC/HEIF decoding to Pillow.
# The import is guarded so the rest of the script still runs without it.
try:
    import pillow_heif
    # Hook the HEIF opener into Pillow so Image.open() can read these files.
    pillow_heif.register_heif_opener()
    print("HEIC/HEIF support enabled.")
except ImportError:
    # NOTE(review): nothing here filters .heic/.heif paths out; "skipped"
    # presumably means Pillow will fail to open them later - confirm.
    print("Warning: 'pillow-heif' not installed. HEIC/HEIF files will be skipped.")
    print("Install with: pip install pillow-heif")

class ImageDataloader:
    """
    Scans a directory tree for unique images and serves them in batches.

    Uniqueness is decided by file content (SHA256), not by file name.
    Every unique image path is stored in ``self.labels`` with the initial
    label ``"unknown"``. Callers may overwrite labels in that dict; batch
    values are read from ``self.labels`` at the moment each batch is built.

    The class is a plain re-iterable object whose ``__iter__`` is a
    generator function. It does not inherit from
    torch.utils.data.DataLoader.
    """
    IMAGE_EXTENSIONS: tuple = ('.jpg', '.jpeg', '.png', '.heic', '.heif')

    def __init__(self, root_dir: str, batch_size: int = 32):
        """
        Validate the arguments and immediately scan ``root_dir``.

        Args:
            root_dir: Existing directory to walk recursively.
            batch_size: Maximum number of images per yielded batch (> 0).

        Raises:
            ValueError: If ``root_dir`` is not a directory or
                ``batch_size`` is not positive.
        """
        if not os.path.isdir(root_dir):
            raise ValueError(f"Root directory not found: {root_dir}")
        if batch_size <= 0:
            raise ValueError("Batch size must be greater than 0")

        self.root_dir = root_dir
        self.batch_size = batch_size

        # self.labels holds the "working state" of all images:
        # {image_path: label}, where every label starts as "unknown".
        self.labels: Dict[str, str] = {}
        self._scan_and_deduplicate()

    def _calculate_hash(self, filepath: str, block_size: int = 65536) -> str:
        """
        Return the SHA256 hex digest of a file's content.

        The file is read in ``block_size`` chunks so large images are never
        held in memory at once. Returns an empty string (after printing a
        warning) when the file cannot be read.
        """
        sha256 = hashlib.sha256()
        try:
            with open(filepath, 'rb') as f:
                while chunk := f.read(block_size):
                    sha256.update(chunk)
        except OSError as e:  # IOError is an alias of OSError
            print(f"Warning: Could not read file for hashing: {filepath}. Skipping. Error: {e}")
            return ""
        return sha256.hexdigest()

    def _scan_and_deduplicate(self):
        """
        Walk the root directory, find all unique images, and populate
        self.labels with the default 'unknown' label.

        Directories and files are visited in sorted order, so the copy of a
        duplicated image that is kept (and the resulting batch order) does
        not depend on the filesystem's enumeration order.
        """
        print(f"Scanning directory: {self.root_dir}...")
        image_hashes: set[str] = set()
        total_files = 0
        duplicates_skipped = 0
        unreadable_skipped = 0

        for root, dirs, files in os.walk(self.root_dir):
            # Sorting in place makes os.walk descend in a stable order.
            dirs.sort()
            for file in sorted(files):
                if not file.lower().endswith(self.IMAGE_EXTENSIONS):
                    continue

                total_files += 1
                full_path = os.path.join(root, file)
                file_hash = self._calculate_hash(full_path)

                if not file_hash:
                    # Hashing failed; the warning was already printed.
                    unreadable_skipped += 1
                    continue

                if file_hash in image_hashes:
                    duplicates_skipped += 1
                    continue

                image_hashes.add(file_hash)
                # All images start as 'unknown'
                self.labels[full_path] = "unknown"

        print("--- Scan Complete ---")
        print(f"Total image files found: {total_files}")
        print(f"Duplicate images skipped: {duplicates_skipped}")
        if unreadable_skipped:
            # Only shown when relevant, so the usual summary is unchanged.
            print(f"Unreadable images skipped: {unreadable_skipped}")
        print(f"Total unique images to process: {len(self.labels)}")
        if not self.labels:
            print("Warning: No valid, unique images were found.")

    def __len__(self) -> int:
        """
        Returns the total number of unique images to be processed.
        """
        return len(self.labels)

    def __iter__(self) -> Generator[Dict[str, str], None, None]:
        """
        Yields batches of images as dictionaries {image_path: label}.

        The set of paths is frozen when iteration starts; the label of each
        path is looked up when its batch is built.
        """
        # Snapshot the keys so relabeling during iteration is safe.
        all_paths = list(self.labels)

        for start in range(0, len(all_paths), self.batch_size):
            batch_paths = all_paths[start:start + self.batch_size]
            yield {path: self.labels[path] for path in batch_paths}


# -------------------------------------------------------
# Step 1. Load RetinaFace model
# -------------------------------------------------------
def load_retinaface_model(model_name: str = "buffalo_l",
                          ctx_id: int = 0,
                          det_size: tuple = (640, 640)):
    """
    Create and prepare an insightface ``FaceAnalysis`` application.

    Args:
        model_name: insightface model pack to load (default "buffalo_l").
        ctx_id: Device id passed to ``FaceAnalysis.prepare``. 0 requests
            the first GPU.
            NOTE(review): insightface treats a negative id as CPU, and a
            GPU is only used when a GPU-enabled onnxruntime is installed
            (the notebook output shows CPUExecutionProvider) - confirm.
        det_size: (width, height) the detector resizes its input to.

    Returns:
        The prepared ``FaceAnalysis`` instance; call ``.get(bgr_image)``
        on it to obtain the detected faces.
    """
    app = FaceAnalysis(name=model_name)
    app.prepare(ctx_id=ctx_id, det_size=det_size)
    return app

# -------------------------------------------------------
# Step 2. Detect faces using Pillow images
# -------------------------------------------------------
def detect_with_RetinaFace(batch_data, model):
    """
    Label the 'unknown' images of a batch by running face detection.

    Args:
        batch_data: dict {image_path: label}. It is not modified.
        model: object exposing ``get(bgr_image)`` that returns a sized
            collection of detected faces (insightface FaceAnalysis).

    Returns:
        A new dict with the same keys as ``batch_data``. Entries whose
        label was not "unknown" are copied unchanged. An "unknown" entry
        becomes "invalid" when the image cannot be loaded, "people" when at
        least one face is detected, and stays "unknown" otherwise.

    Images are decoded and processed one at a time, so peak memory is a
    single decoded image rather than the whole batch.
    """
    updated_data = dict(batch_data)

    for img_path, label in batch_data.items():
        if label != "unknown":
            continue

        try:
            # The context manager closes the file handle deterministically.
            with Image.open(img_path) as img:
                rgb = np.asarray(img.convert("RGB"))
            # RGB -> BGR. The reversed slice is a negative-stride view, so
            # copy it into a contiguous buffer before handing it on.
            img_bgr = np.ascontiguousarray(rgb[:, :, ::-1])
        except Exception as e:
            # Best-effort boundary: a broken file must not abort the batch.
            print(f"⚠️ Error reading {img_path}: {e}")
            updated_data[img_path] = "invalid"
            continue

        # NOTE(review): EXIF orientation is not applied here; rotated phone
        # photos may therefore yield no detections - confirm if it matters.
        faces = model.get(img_bgr)  # insightface processes one image per call
        if len(faces) > 0:
            updated_data[img_path] = "people"

    return updated_data

# Demo driver: scan a folder, run face detection on the first batch only,
# and print each path with its label before and after detection.
test_dir = r"D:\images\HockingHills"
output = r"D:\images\output"

# 1. Instantiate the Dataloader
dataloader = ImageDataloader(root_dir=test_dir, batch_size=32)

retinaface_model = load_retinaface_model()

# 2. Take only the first batch. next() with a default avoids the NameError
# that reading `batch` / `result` would raise when no images were found.
batch = next(iter(dataloader), None)

if batch is None:
    print("No images to process.")
else:
    result = detect_with_RetinaFace(batch, retinaface_model)
    print(result)

    for key in batch:
        print(key, batch[key], result[key])

HEIC/HEIF support enabled.
Scanning directory: D:\images\HockingHills...
--- Scan Complete ---
Total image files found: 91
Duplicate images skipped: 0
Total unique images to process: 91
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\xiaom/.insightface\models\buffalo_l\1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\xiaom/.insightface\models\buffalo_l\2d106det.onnx landmark_2d_106 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\xiaom/.insightface\models\buffalo_l\det_10g.onnx detection [1, 3, '?', '?'] 127.5 128.0
Applied providers: ['CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}}
find model: C:\Users\xiaom/.insightface\models\buffalo_l\genderage.onnx genderage ['None', 3, 96, 96] 0.0 1.0
Applied prov