In [1]:
import insightface
import cv2
import numpy as np
import faiss
import os
import json
import threading
import queue

# Load face analysis model
# Builds an InsightFace `FaceAnalysis` pipeline from the 'buffalo_l' model pack,
# requesting the CUDA execution provider, then prepares it with ctx_id=0 and a
# 640x640 detection input size.
# These are top-level statements, so the model is loaded as soon as this file
# is executed or imported.
# NOTE(review): ctx_id=0 presumably selects the first GPU — confirm against the
# InsightFace documentation.
model = insightface.app.FaceAnalysis(name='buffalo_l', providers=['CUDAExecutionProvider'])
model.prepare(ctx_id=0, det_size=(640, 640))
print("\n✅ Model loaded successfully!")

# Database & FAISS index
# face_db: person name -> list of embeddings (each a plain list of floats so
# the whole mapping can be dumped to JSON).
face_db = {}
# id_to_name: position i holds the name belonging to row i of the FAISS index;
# it is extended in lockstep with every `index.add(...)` call.
id_to_name = []
# Length of each embedding vector stored in the index.
# NOTE(review): 512 is assumed to match the embedding size of the loaded model — confirm.
embedding_dim = 512  
# Flat inner-product index; embeddings are L2-normalized before being added or
# searched, so the inner product serves as cosine similarity.
index = faiss.IndexFlatIP(embedding_dim)  # FAISS Index for fast search
# On-disk locations written by save_db() and read by load_db().
db_file = "face_db.json"
faiss_index_file = "faiss_index.bin"

# RTSP Camera Setup
# The stream URL embeds the camera's username and password. Allow it to be
# supplied through the RTSP_URL environment variable so credentials do not have
# to live in the source; the original address is kept as the fallback so
# existing setups keep working unchanged.
# NOTE(review): remove the fallback credentials once every deployment sets RTSP_URL.
RTSP_URL = os.environ.get("RTSP_URL", "rtsp://admin:bitsathY@192.168.1.11")

def normalize(emb):
    """Scale embeddings to unit L2 length so inner product equals cosine similarity.

    Args:
        emb: Array-like of embeddings, either one vector of shape ``(dim,)``
            or a batch of shape ``(n, dim)``.

    Returns:
        An array of the same shape in which every vector has unit L2 norm.
        All-zero vectors are returned as zeros instead of NaN.
    """
    emb = np.asarray(emb)
    # Normalize along the last axis so both a single vector and a batch work.
    norms = np.linalg.norm(emb, axis=-1, keepdims=True)
    # A zero vector has no direction: divide it by 1 so it stays zero rather
    # than becoming NaN and poisoning every similarity score it touches.
    safe_norms = np.where(norms == 0, 1, norms)
    return emb / safe_norms

def detect_and_embed(img):
    """Run the face model on `img` and return the faces with normalized embeddings.

    Returns a ``(faces, embeddings)`` pair. Faces are ordered from the largest
    bounding-box area to the smallest, and row ``i`` of ``embeddings`` belongs
    to ``faces[i]``. When the model reports no faces the pair is ``([], None)``.
    """
    detected = model.get(img)
    if not detected:
        return [], None  # nothing found in this image

    def box_area(face):
        # bbox is indexed as (x1, y1, x2, y2): width times height.
        box = face.bbox
        return (box[2] - box[0]) * (box[3] - box[1])

    largest_first = sorted(detected, key=box_area, reverse=True)
    raw_embeddings = np.array(
        [face.embedding for face in largest_first],
        dtype=np.float32,
    )
    return largest_first, normalize(raw_embeddings)

def add_face(name, img_path=None):
    """Enroll face embeddings for `name` in the database and the FAISS index.

    Args:
        name: Label stored for every embedding added by this call.
        img_path: Optional path to a single image. When omitted, every
            ``.jpg`` / ``.jpeg`` / ``.png`` file (any letter case) found in
            ``Data/Images/<name>`` is used.

    Only the largest face of each image is enrolled. Images that cannot be
    read or that contain no face are skipped with a message. When at least one
    embedding was added the database is persisted through ``save_db()``.
    """
    person_dir = f"Data/Images/{name}"

    if img_path:
        img_paths = [img_path]
    else:
        # isdir (not exists) so a stray file with that name is not passed to listdir.
        if not os.path.isdir(person_dir):
            print(f"❌ No images found for {name}!")
            return
        # Sorted for a reproducible enrollment order; the extension check is
        # case-insensitive so files such as "front.JPG" are not skipped.
        img_paths = [
            os.path.join(person_dir, fname)
            for fname in sorted(os.listdir(person_dir))
            if fname.lower().endswith(('.jpg', '.png', '.jpeg'))
        ]

    new_embeddings = []

    for path in img_paths:
        img = cv2.imread(path)
        if img is None:
            print(f"❌ Error reading {path}")
            continue

        _, embeddings = detect_and_embed(img)
        if embeddings is None or len(embeddings) == 0:
            print(f"⚠️ No face detected in {path}")
            continue

        new_embeddings.append(embeddings[0])  # Take the largest face

    if not new_embeddings:
        print(f"❌ No valid faces added for {name}")
        return

    stacked = np.vstack(new_embeddings)

    # Append to any embeddings already stored for this name. Overwriting them
    # would leave the older vectors in the FAISS index and in id_to_name while
    # dropping them from the JSON database, so the two would disagree.
    face_db.setdefault(name, []).extend(stacked.tolist())
    index.add(stacked)
    id_to_name.extend([name] * stacked.shape[0])

    save_db()
    print(f"✅ {name} added with {len(stacked)} images!")

def capture_and_save(name):
    """Capture one image per pose from the RTSP camera and enroll them.

    Prompts for each pose ("front", "left", "right", "up"), grabs a single
    frame from ``RTSP_URL`` and writes it to ``Data/Images/<name>/<pose>.jpg``.
    After all poses are saved, ``add_face(name)`` enrolls the directory.

    The function stops early, without enrolling, if the camera cannot be
    opened, a frame cannot be read, or an image cannot be written.

    Args:
        name: Person label; also the name of the image directory.
    """
    angles = ["front", "left", "right", "up"]
    out_dir = f"Data/Images/{name}"
    os.makedirs(out_dir, exist_ok=True)

    for angle in angles:
        input(f"📸 Look {angle} and press ENTER to capture...")

        # NOTE(review): the stream is re-opened for every shot, presumably so the
        # frame is current rather than one buffered while waiting — confirm.
        cam = cv2.VideoCapture(RTSP_URL)
        try:
            if not cam.isOpened():
                print("❌ Error: Could not open camera.")
                return
            ret, frame = cam.read()
        finally:
            # Release on every path, including the failed-open one.
            cam.release()

        if not ret:
            print(f"❌ Error: Could not capture {angle} image.")
            return

        img_path = f"{out_dir}/{angle}.jpg"
        # imwrite reports failure through its return value; do not claim
        # success when nothing was written.
        if not cv2.imwrite(img_path, frame):
            print(f"❌ Error: Could not save {angle} image to {img_path}.")
            return
        print(f"✅ {angle} image saved.")

    add_face(name)

def search_face(img, threshold=0.5):
    """Identify every face in `img` by nearest-neighbour search in the FAISS index.

    Args:
        img: Image array as returned by ``cv2.imread`` / ``VideoCapture.read``.
            ``None`` (e.g. a failed read) yields an empty result.
        threshold: Similarity score a match must exceed to be accepted;
            anything at or below it is reported as "Unknown".

    Returns:
        A list of ``(bbox, name, score)`` tuples, one per detected face, in
        the order produced by ``detect_and_embed``. The list is empty when
        there is no image or no face.
    """
    if img is None:
        return []

    faces, test_embeddings = detect_and_embed(img)
    if test_embeddings is None:
        return []

    # One nearest neighbour per query face.
    scores, ids = index.search(test_embeddings, 1)

    results = []
    for face, match_id, score in zip(faces, ids[:, 0], scores[:, 0]):
        # The range check also rejects ids without a name entry (for instance
        # a negative id) before indexing into id_to_name.
        is_known = 0 <= match_id < len(id_to_name) and score > threshold
        matched_name = id_to_name[match_id] if is_known else "Unknown"
        results.append((face.bbox, matched_name, score))

    return results

def rtsp_camera_thread(frame_queue, stop_event=None):
    """Read frames from the RTSP stream into `frame_queue` until told to stop.

    Meant to run in a background thread so that slow processing in the
    consumer does not hold up frame capture.

    Args:
        frame_queue: ``queue.Queue`` that receives frames. When it is full the
            oldest queued frame is discarded so the consumer sees recent frames.
        stop_event: Optional ``threading.Event``. Once set, the loop exits and
            the capture is released. When omitted the loop runs until the
            process ends.
    """
    if stop_event is None:
        stop_event = threading.Event()  # never set: run until the process exits

    cam = cv2.VideoCapture(RTSP_URL)
    if not cam.isOpened():
        print("❌ Error: Could not open RTSP stream.")
        cam.release()
        return

    try:
        while not stop_event.is_set():
            ret, frame = cam.read()
            if not ret:
                # Back off briefly instead of spinning at full CPU while the
                # stream is stalled; wakes up early if a stop is requested.
                stop_event.wait(0.05)
                continue

            if frame_queue.full():
                try:
                    frame_queue.get_nowait()  # drop the stale frame
                except queue.Empty:
                    pass  # the consumer took it first; nothing to drop
            try:
                frame_queue.put_nowait(frame)
            except queue.Full:
                pass  # slot was refilled by someone else; skip this frame
    finally:
        cam.release()

def real_time_recognition():
    """Show the RTSP stream with recognized faces annotated until 'q' is pressed.

    Starts ``rtsp_camera_thread`` in a daemon thread, runs ``search_face`` on
    each frame it delivers and draws a box plus "name (score)" label per face
    (green for known names, red for "Unknown"). On exit the capture thread is
    asked to stop and all OpenCV windows are closed. The loop also ends if the
    capture thread has exited and no frame is left to show.
    """
    frame_queue = queue.Queue(maxsize=1)
    stop_event = threading.Event()
    worker = threading.Thread(
        target=rtsp_camera_thread,
        args=(frame_queue, stop_event),
        daemon=True,
    )
    worker.start()

    print("📷 Starting real-time face recognition. Press 'q' to quit.")

    try:
        while True:
            try:
                frame = frame_queue.get_nowait()
            except queue.Empty:
                if not worker.is_alive():
                    # The capture thread is gone (e.g. the stream could not be
                    # opened); without this the loop would spin forever with
                    # no window to receive the 'q' key.
                    break
                frame = None

            if frame is not None:
                results = search_face(frame)

                for bbox, name, score in results:
                    x1, y1, x2, y2 = map(int, bbox)
                    color = (0, 255, 0) if name != "Unknown" else (0, 0, 255)

                    cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                    cv2.putText(frame, f"{name} ({score:.2f})", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

                cv2.imshow("Real-time Face Recognition", frame)

            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Stop the capture thread so repeated sessions do not pile up threads
        # that keep the camera connection open.
        stop_event.set()
        worker.join(timeout=2)
        cv2.destroyAllWindows()

def save_db():
    """Write the face database to its JSON file and the FAISS index to its binary file."""
    with open(db_file, 'w') as handle:
        handle.write(json.dumps(face_db))

    faiss.write_index(index, faiss_index_file)

    print("💾 Database saved!")

def load_db():
    """Load the face database from JSON and rebuild the FAISS index from it.

    The JSON file is treated as the single source of truth: the index is
    recreated empty and filled from ``face_db`` so that row ``i`` of the index
    always corresponds to ``id_to_name[i]``. The saved FAISS index file is not
    read, because loading it and then adding the JSON embeddings on top would
    store every vector twice (growing again on each load/save cycle) and leave
    rows that no longer line up with ``id_to_name``.
    """
    global face_db, index, id_to_name

    if os.path.exists(db_file):
        with open(db_file, 'r') as f:
            face_db = json.load(f)

    # Start from an empty index so nothing is duplicated.
    index = faiss.IndexFlatIP(embedding_dim)
    id_to_name.clear()

    all_embeddings = []
    for name, embeddings in face_db.items():
        embeddings = np.array(embeddings, dtype=np.float32)
        if embeddings.size == 0:
            # An empty entry has no vectors to index and would break vstack.
            continue
        all_embeddings.append(embeddings)
        id_to_name.extend([name] * embeddings.shape[0])

    if all_embeddings:
        index.add(np.vstack(all_embeddings))

    print(f"✅ Loaded {len(face_db)} faces from database!")

def main():
    """Run the interactive text menu until the user chooses to exit.

    Loads the saved database first, then repeatedly offers: enroll from an
    image file, enroll via camera capture, search an image file, start
    real-time recognition, or exit. Unreadable images and unrecognized menu
    choices are reported and the menu is shown again.
    """
    load_db()

    while True:
        print("\nOptions:")
        print("1️⃣ Add a face from an image")
        print("2️⃣ Capture and add a face")
        print("3️⃣ Search for a face in an image")
        print("4️⃣ Start real-time recognition")
        print("5️⃣ Exit")
        choice = input("Enter your choice: ").strip()

        if choice == '1':
            name = input("Enter name: ").strip()
            img_path = input("Enter image path: ").strip()
            add_face(name, img_path)
        elif choice == '2':
            capture_and_save(input("Enter name: ").strip())
        elif choice == '3':
            img_path = input("Enter image path: ").strip()
            img = cv2.imread(img_path)
            # imread returns None for a missing or unreadable file; report it
            # instead of passing None on to the face model.
            if img is None:
                print(f"❌ Error reading {img_path}")
                continue
            print(search_face(img))
        elif choice == '4':
            real_time_recognition()
        elif choice == '5':
            break
        else:
            print("❌ Invalid choice, please enter a number from 1 to 5.")

# Start the interactive menu only when this file is executed directly, not when
# it is imported as a module.
if __name__ == "__main__":
    main()


Applied providers: ['CUDAExecutionProvider', 'CPUExecutionProvider'], with options: {'CPUExecutionProvider': {}, 'CUDAExecutionProvider': {'sdpa_kernel': '0', 'use_tf32': '1', 'fuse_conv_bias': '0', 'prefer_nhwc': '0', 'tunable_op_max_tuning_duration_ms': '0', 'enable_skip_layer_norm_strict_mode': '0', 'tunable_op_tuning_enable': '0', 'tunable_op_enable': '0', 'use_ep_level_unified_stream': '0', 'device_id': '0', 'has_user_compute_stream': '0', 'gpu_external_empty_cache': '0', 'cudnn_conv_algo_search': 'EXHAUSTIVE', 'cudnn_conv1d_pad_to_nc1d': '0', 'gpu_mem_limit': '18446744073709551615', 'gpu_external_alloc': '0', 'gpu_external_free': '0', 'arena_extend_strategy': 'kNextPowerOfTwo', 'do_copy_in_default_stream': '1', 'enable_cuda_graph': '0', 'user_compute_stream': '0', 'cudnn_conv_use_max_workspace': '1'}}
find model: /home/admin1/.insightface/models/buffalo_l/1k3d68.onnx landmark_3d_68 ['None', 3, 192, 192] 0.0 1.0
Applied providers: ['CUDAExecutionProvider', 'CPUExecutionProvider'],

  P = np.linalg.lstsq(X_homo, Y)[0].T # Affine matrix. 3 x 4



Options:
1️⃣ Add a face from an image
2️⃣ Capture and add a face
3️⃣ Search for a face in an image
4️⃣ Start real-time recognition
5️⃣ Exit
