In [2]:
import cv2
import torch
import numpy as np
from PIL import Image
from facenet_pytorch import MTCNN, InceptionResnetV1
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
import pyttsx3
from collections import defaultdict

# Thread and task management
executor = ThreadPoolExecutor(max_workers=2)
task_lock = Lock()

# Device setup
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Initialize pyttsx3 engine
engine = pyttsx3.init()

# Load models
mtcnn = MTCNN(keep_all=True, device=device, min_face_size=25)
resnet = InceptionResnetV1(pretrained='vggface2').eval().to(device)

# Load embeddings and names
dataset_path = "/Users/rajataggarwal/Desktop/dataset"
data = np.load(f"{dataset_path}/embeddings.npz")
stored_embeddings = data['embeddings']
name_list = data['names']

# Webcam setup
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 480)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 360)

if not cap.isOpened():
    print("Error: Could not open camera.")
    exit()

# Parameters
DISTANCE_THRESHOLD = 0.75
face_appearance_times = defaultdict(list)
unknown_face_timestamps = []  # To track timestamps of unknown faces
processing = False
processing_time = 0.0
frame_counter = 0

# Frame processing function
def process_frame(input_frame, resized_frame):
    global processing, processing_time
    start_time = time.time()

    img_rgb = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
    boxes, _ = mtcnn.detect(img_rgb)
    faces = mtcnn(Image.fromarray(img_rgb))
  
    if faces is not None and boxes is not None:
        try:
            faces = torch.stack([f for f in faces]).to(device)
        except:
            processing = False
            return

        with torch.no_grad():
            embeddings = resnet(faces).cpu().numpy()

        for box, embedding in zip(boxes, embeddings):
            distances = np.sum((stored_embeddings - embedding) ** 2, axis=1)
            min_idx = np.argmin(distances)
            min_distance = distances[min_idx]
            name = name_list[min_idx] if min_distance < DISTANCE_THRESHOLD else "Unknown"

            current_time = time.time()
            face_appearance_times[name].append(current_time)

            if name == "Unknown":
                # Track the timestamp for unknown faces
                unknown_face_timestamps.append(current_time)

    processing_time = time.time() - start_time
    processing = False

# Text-to-speech function
def speak_name(names):
    if names:
        text = ", ".join(names)
        engine.say(text)
        engine.runAndWait()

# Main recognition loop
def start_recognition():
    global processing, frame_counter  # Ensure frame_counter is global
    print("Starting face recognition... Press 'q' to stop.")

    face_appearance_times.clear()
    spoken_names = set()
    spoken_unknown_count = False  # Flag to prevent speaking multiple times in the same run
    start_time = time.time()

    frame_counter = 0  # Initialize frame_counter here

    while time.time() - start_time < 10:
        ret, frame = cap.read()
        if not ret:
            break
        frame_counter += 1  # Now this works because frame_counter is initialized
        if frame_counter % 3 == 0 and not processing:
            with task_lock:
                if executor._work_queue.qsize() < 2:
                    processing = True
                    resized_frame = cv2.resize(frame, (320, 240))
                    executor.submit(process_frame, frame.copy(), resized_frame)

        # Real-time duration check and speak
        current_time = time.time()
        for name, timestamps in face_appearance_times.items():
            if name not in spoken_names and len(timestamps) > 0:
                duration = current_time - min(timestamps)
                if duration >= 3:
                    print(f"Recognized: {name}")
                    speak_name([name])
                    spoken_names.add(name)

        # Check for unknown faces (duration >= 5 seconds)
        if not spoken_unknown_count:
            # Count unknown faces that have been seen for at least 5 seconds
            unknown_faces = [timestamp for timestamp in unknown_face_timestamps if current_time - timestamp >= 4]

            if unknown_faces:
                print(f"Number of unknown faces: {len(unknown_faces)}")
                speak_name([f"{len(unknown_faces)} unknown faces"])
                spoken_unknown_count = True  # Prevent speaking multiple times in the same run

    print("10 seconds completed, closing...")
    cap.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    start_recognition()


Using device: cpu
Starting face recognition... Press 'q' to stop.
Recognized: rajat
Recognized: Unknown
Recognized: Shobit
Number of unknown faces: 2
10 seconds completed, closing...
