In [3]:
import cv2
import numpy as np
import os
import onnxruntime
from scipy.spatial.distance import cosine


# Initialize models and Haar Cascades
def initialize_models(onnx_path, cascade_path):
    """
    Load the Haar Cascade face detector and the ONNX inference session.

    Args:
        onnx_path: Path to the ONNX model file.
        cascade_path: Path to the Haar Cascade XML file.

    Returns:
        A tuple ``(face_cascade, ort_session)``.

    Raises:
        IOError: If the Haar Cascade could not be loaded from ``cascade_path``.
    """
    face_cascade = cv2.CascadeClassifier(cascade_path)
    # CascadeClassifier does not raise on a missing or invalid file; it just
    # produces an empty classifier that only fails later, inside
    # detectMultiScale, with a far less helpful error. Fail early instead.
    if face_cascade.empty():
        raise IOError(f"Could not load Haar Cascade from: {cascade_path}")
    ort_session = onnxruntime.InferenceSession(onnx_path)
    return face_cascade, ort_session


# Manual preprocessing function
def preprocess_image(img, size=(160, 160)):
    """
    Turn an image into a batched float32 array for the ONNX model.

    Steps, in order:
    - Resize to ``size`` with ``cv2.resize``.
    - Scale pixel values by 1/255.
    - Subtract the per-channel mean and divide by the per-channel std
      (ImageNet statistics).
    - Move the channel axis first (HWC -> CHW) and prepend a batch axis.

    Returns:
        Array of shape ``(1, C, H, W)`` with dtype ``float32``.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    # Resize, then bring the pixel values down by a factor of 255
    scaled = cv2.resize(img, size) / 255.0

    # Per-channel standardization (broadcasts over the last axis)
    standardized = (scaled - channel_mean) / channel_std

    # HWC -> CHW
    channels_first = standardized.transpose(2, 0, 1)

    # Prepend the batch axis and cast once at the very end
    return channels_first[np.newaxis, ...].astype(np.float32)


# Load and process face embeddings from the database
def load_database(database_path, face_cascade, ort_session):
    """
    Build a mapping from person name to a mean face embedding.

    Every sub-directory of ``database_path`` is treated as one person, named
    after the directory. Each readable image inside it is passed through
    ``process_image``; the embeddings that come back are averaged along
    axis 0. Non-directory entries, unreadable images, images for which no
    embedding is returned, and people with no usable image are skipped.

    Returns:
        Dict mapping person name to the averaged embedding.
    """
    database = {}
    for entry in os.listdir(database_path):
        folder = os.path.join(database_path, entry)
        if not os.path.isdir(folder):
            continue

        person_embeddings = []
        for file_name in os.listdir(folder):
            image = cv2.imread(os.path.join(folder, file_name))
            if image is None:
                # cv2.imread signals an unreadable file with None
                continue
            vector = process_image(image, face_cascade, ort_session)
            if vector is not None:
                person_embeddings.append(vector)

        if person_embeddings:
            database[entry] = np.mean(person_embeddings, axis=0)
    return database


# Process a single image to get the face embedding
def process_image(img, face_cascade, ort_session):
    """
    Compute the embedding of the first face detected in a BGR image.

    Returns:
        The first output of the ONNX session for the cropped face, or
        ``None`` when the cascade detects no face.
    """
    grayscale = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    detected = face_cascade.detectMultiScale(
        grayscale, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60)
    )
    if len(detected) == 0:
        return None

    # Only the first detection is used
    left, top, width, height = detected[0]
    crop_bgr = img[top:top + height, left:left + width]
    crop_rgb = cv2.cvtColor(crop_bgr, cv2.COLOR_BGR2RGB)

    # Manual preprocessing, then a single forward pass
    model_input = preprocess_image(crop_rgb)
    outputs = ort_session.run(None, {"input": model_input})
    return outputs[0]


# Face Recognition: Compare detected face embedding with database
def compare_faces(known_embeddings, face_embedding, threshold=0.6):
    """
    Find the closest known person to ``face_embedding`` by cosine distance.

    Args:
        known_embeddings: Dict mapping person name to a reference embedding.
        face_embedding: Embedding of the face to identify.
        threshold: A match must have a cosine distance strictly below this.

    Returns:
        The name with the smallest distance below ``threshold`` (the earliest
        entry wins ties), or ``None`` when no entry qualifies.
    """
    best_match = None
    # Starts at the threshold and shrinks with every accepted candidate, so a
    # single strict comparison enforces both "under threshold" and "closer
    # than the best so far".
    cutoff = threshold
    for candidate, reference in known_embeddings.items():
        distance = cosine(face_embedding.flatten(), reference.flatten())
        if distance < cutoff:
            cutoff = distance
            best_match = candidate
    return best_match


def detect_faces(frame, face_cascade, ort_session, min_face_size=160):
    """
    Run the Haar Cascade on a frame and embed every detected face.

    Crops whose height or width is below ``min_face_size`` are resized to
    ``(min_face_size, min_face_size)`` before preprocessing. Empty crops are
    skipped.

    Returns:
        List of ``((x, y, w, h), embedding)`` pairs, one per embedded face.
    """
    grayscale = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    found = face_cascade.detectMultiScale(
        grayscale, scaleFactor=1.1, minNeighbors=5, minSize=(60, 60)
    )

    results = []
    for (x, y, w, h) in found:
        crop = frame[y:y + h, x:x + w]
        if crop.size == 0:
            # Nothing to embed for an empty crop
            continue

        # Upscale crops that are too small in either dimension
        rows, cols = crop.shape[0], crop.shape[1]
        if min(rows, cols) < min_face_size:
            crop = cv2.resize(crop, (min_face_size, min_face_size))

        crop_rgb = cv2.cvtColor(crop, cv2.COLOR_BGR2RGB)
        model_input = preprocess_image(crop_rgb)  # Manual preprocessing
        vector = ort_session.run(None, {"input": model_input})[0]
        results.append(((x, y, w, h), vector))
    return results


def annotate_frame(frame, detections, face_database, threshold=0.6):
    """
    Annotates the frame with bounding boxes and labels for detected faces.
    Draws 'Unknown' for faces not in the database.

    Args:
        frame: Image to draw on; it is modified in place.
        detections: Iterable of ``((x, y, w, h), embedding)`` pairs.
        face_database: Dict mapping person name to a reference embedding.
        threshold: Distance threshold forwarded to ``compare_faces``.

    Returns:
        The same ``frame`` object, with the annotations drawn on it.
    """
    for (x, y, w, h), embedding in detections:
        # Compare the detected face with the database
        name = compare_faces(face_database, embedding, threshold=threshold)

        # Set the label for the bounding box
        if name is None:
            name = "Unknown"

        # Draw bounding box
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # The label baseline normally sits 10 px above the box. For a face
        # close to the top edge that would push the text off the frame, so in
        # that case draw it just inside the top of the box instead.
        (_, text_height), _ = cv2.getTextSize(name, cv2.FONT_HERSHEY_SIMPLEX, 1, 2)
        label_y = y - 10
        if label_y < text_height:
            label_y = y + text_height + 10

        # Add label
        cv2.putText(frame, name, (x, label_y), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

    return frame


# Main Loop: Capture video frames and apply pipeline
def main_loop(database_path, onnx_path, cascade_path):
    """
    Run the live face-recognition pipeline on the default webcam.

    Loads the models, builds the face database, then reads frames from
    camera 0, annotates every detected face and shows the result in a window
    until 'Esc' is pressed or a frame cannot be captured.

    Args:
        database_path: Directory with one sub-directory of images per person.
        onnx_path: Path to the ONNX model file.
        cascade_path: Path to the Haar Cascade XML file.
    """
    # Initialize models
    face_cascade, ort_session = initialize_models(onnx_path, cascade_path)

    # Load face database
    print("Loading database...")
    face_database = load_database(database_path, face_cascade, ort_session)
    print("Database loaded.")

    # Initialize webcam
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("Error: Camera not accessible!")
        cap.release()
        return

    print("Starting webcam feed...")
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("Error: Failed to capture frame.")
                break

            # Detect faces and get embeddings
            detections = detect_faces(frame, face_cascade, ort_session)

            # Annotate frame with bounding boxes and names
            frame = annotate_frame(frame, detections, face_database)

            # Display the frame
            cv2.imshow('Camera Feed', frame)

            # Break on 'Esc' key
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        # Release resources even if the loop is interrupted or raises, so the
        # camera is not left locked and no window is left open.
        cap.release()
        cv2.destroyAllWindows()


# Run the pipeline
if __name__ == "__main__":
    # Path to your exported ONNX model
    onnx_path = "/home/theosiam/Repos/Autotrust/Autotrust/Face_Verification/resnet.onnx"
    # Haar Cascade definition used for face detection
    cascade_path = '/home/theosiam/Repos/Autotrust/Autotrust/Face_Verification/haarcascade_frontalface_default.xml'
    # Replace with your database path
    database_path = '/home/theosiam/Repos/Autotrust/Autotrust/Face_Verification/Database'

    main_loop(
        database_path=database_path,
        onnx_path=onnx_path,
        cascade_path=cascade_path,
    )


Loading database...
Database loaded.
Starting webcam feed...
