In [None]:
%pip install -q insightface opencv-python
%pip install -q opencv-python insightface ultralytics scikit-learn

In [1]:
%pip install -q onnxruntime # For CPU-only
%pip install -q faiss-cpu


Note: you may need to restart the kernel to use updated packages.


ERROR: Invalid requirement: '#': Expected package name at the start of dependency specifier
    #
    ^


Note: you may need to restart the kernel to use updated packages.


In [None]:
# Download the YOLO face model from the link below:
# https://github.com/akanametov/yolo-face/releases/download/v0.0.0/yolov12l-face.pt

'wget' is not recognized as an internal or external command,
operable program or batch file.


In [3]:
# !pip uninstall numpy -y
# !pip install numpy --upgrade --force-reinstall


# Embedding saving and pickle

In [None]:
import os
import cv2
import numpy as np
import pickle
from tqdm import tqdm
import faiss
from insightface.app import FaceAnalysis

# ========== CONFIG ==========
FACES_DIR = "/content/drive/MyDrive/Yolo/data/augumented"  # one sub-directory per person
CACHE_PATH = "face_embeddings_faiss_aug.pkl"               # pickled dict: embeddings / labels / paths
FAISS_INDEX_PATH = "faiss_aug.index"                       # serialized FAISS index
EMBEDDING_DIM = 512                                        # dimensionality passed to IndexFlatIP
# ============================

# ========== Initialize InsightFace ==========
face_analyzer = FaceAnalysis(name='buffalo_l', root='~/.insightface')
face_analyzer.prepare(ctx_id=0, det_size=(640, 640))

# ========== Load Cached Embeddings if Available ==========
if os.path.exists(CACHE_PATH) and os.path.exists(FAISS_INDEX_PATH):
    print("🔁 Loading cached embeddings and FAISS index...")
    # NOTE(security): pickle.load can execute arbitrary code. Only load cache
    # files that were produced by this notebook.
    with open(CACHE_PATH, "rb") as f:
        database = pickle.load(f)
    faiss_index = faiss.read_index(FAISS_INDEX_PATH)

    # The FAISS row id is used as a position in database["labels"], so the two
    # artifacts must describe the same number of vectors.
    if faiss_index.ntotal != len(database["labels"]):
        raise RuntimeError(
            f"Cache/index mismatch: {len(database['labels'])} labels in {CACHE_PATH} "
            f"but {faiss_index.ntotal} vectors in {FAISS_INDEX_PATH}. "
            "Delete both files and rebuild."
        )
else:
    print("🚀 Building embedding database from scratch...")
    database = {
        "embeddings": [],
        "labels": [],
        "paths": []
    }

    # Sorted listings make the row order (and therefore FAISS ids) reproducible
    # across runs; os.listdir returns entries in arbitrary order.
    for person_name in sorted(os.listdir(FACES_DIR)):
        person_path = os.path.join(FACES_DIR, person_name)
        if not os.path.isdir(person_path):
            continue

        for img_name in tqdm(sorted(os.listdir(person_path)), desc=f"Processing {person_name}"):
            img_path = os.path.join(person_path, img_name)
            img = cv2.imread(img_path)
            if img is None:
                # Unreadable or non-image file.
                continue

            faces = face_analyzer.get(img)
            if len(faces) == 0 or faces[0].embedding is None:
                continue

            embedding = faces[0].embedding.astype(np.float32)
            norm = np.linalg.norm(embedding)
            if not np.isfinite(norm) or norm == 0:
                # Dividing by a zero / non-finite norm would store NaN vectors
                # that corrupt every later similarity search.
                continue
            embedding /= norm  # Unit length, so inner product == cosine similarity
            database["embeddings"].append(embedding)
            database["labels"].append(person_name)
            database["paths"].append(img_path)

    if not database["embeddings"]:
        # np.array([]) has shape (0,), which FAISS cannot add; fail with a clear
        # message before any artifact is written to disk.
        raise RuntimeError(f"No face embeddings could be extracted from {FACES_DIR}")

    # Convert to NumPy array
    database["embeddings"] = np.array(database["embeddings"], dtype=np.float32)

    # Create FAISS index using cosine similarity (IndexFlatIP).
    # Built before anything is saved so a failure here leaves no orphaned cache.
    faiss_index = faiss.IndexFlatIP(EMBEDDING_DIM)
    faiss_index.add(database["embeddings"])

    # Save cache
    with open(CACHE_PATH, "wb") as f:
        pickle.dump(database, f)
    print("✅ Embeddings saved to:", CACHE_PATH)

    faiss.write_index(faiss_index, FAISS_INDEX_PATH)
    print("✅ FAISS index saved to:", FAISS_INDEX_PATH)


# Model Testing

In [None]:
import cv2
import os
import json
import numpy as np
from tqdm import tqdm
from datetime import datetime
from ultralytics import YOLO
from insightface.app import FaceAnalysis
import faiss
import pickle

# =================== CONFIG ====================
# --- Inputs ---
VIDEO_PATH = "/content/drive/MyDrive/Yolo/1752207756000_saraswathi.MP4"
YOLO_MODEL_PATH = "/content/yolov12l-face.pt"
FAISS_INDEX_PATH = "/content/drive/MyDrive/Yolo/Face_models/faiss_aug.index"
CACHE_PATH = "/content/drive/MyDrive/Yolo/Face_models/face_embeddings_faiss_aug.pkl"

# --- Outputs ---
OUTPUT_JSON_PATH = "output_detections.json"
OUTPUT_VIDEO_PATH = "output_tracked_video.mp4"

# --- Metadata written into every JSON record ---
CAMERA_ID = "entrance camera"

# --- Matching parameters ---
SIMILARITY_THRESHOLD = 0.3  # FAISS score must exceed this to assign a known label
IOU_THRESHOLD = 0.3         # minimum overlap between a tracked box and a face box
K = 1                       # number of neighbours requested from the FAISS index

# ========== AOI CONFIG ==========
# Area of interest: only boxes whose centre falls inside it are processed.
USE_POLYGON = True                # True -> AOI_POLYGON, False -> AOI_RECT
AOI_RECT = (300, 300, 900, 900)   # (x1, y1, x2, y2)

# Alternative polygon kept for reference:
#   [1040, 1294], [1448, 958], [1528, 118], [6, 138], [14, 1280]

# Polygon vertices as (x, y) pixel coordinates -- "saraswathi" video.
AOI_POLYGON = np.array(
    [
        (1596, 316),
        (1784, 326),
        (1790, 722),
        (1586, 834),
        (1598, 320),
    ],
    dtype=np.int32,
)




In [None]:
# ========== AOI Helpers ==========
def is_inside_rectangle(box, rect):
    """Return True when the centre of ``box`` lies within ``rect``.

    Args:
        box: ``(x1, y1, x2, y2)`` corners of the box to test.
        rect: ``(x1, y1, x2, y2)`` corners of the area of interest.

    Returns:
        Whether the box centre (computed with floor division) falls inside
        ``rect``; points on the rectangle edge count as inside.
    """
    left, top, right, bottom = box
    center_x, center_y = (left + right) // 2, (top + bottom) // 2

    rect_left, rect_top, rect_right, rect_bottom = rect
    within_x = rect_left <= center_x <= rect_right
    within_y = rect_top <= center_y <= rect_bottom
    return within_x and within_y

def is_inside_polygon(box, polygon):
    """Return True when the centre of ``box`` lies inside or on ``polygon``.

    Args:
        box: ``(x1, y1, x2, y2)`` corners of the box to test.
        polygon: NumPy array of polygon vertices; it is cast to int32 before
            being handed to OpenCV.

    Returns:
        Whether ``cv2.pointPolygonTest`` reports a non-negative result for the
        box centre (centre coordinates are truncated to integers first).
    """
    left, top, right, bottom = box
    center = (
        float(int((left + right) / 2)),
        float(int((top + bottom) / 2)),
    )
    contour = polygon.astype(np.int32)
    # measureDist=False: only the sign of the result is needed.
    side = cv2.pointPolygonTest(contour, center, False)
    return side >= 0


In [None]:

# ========== Load Components ==========
print("🔍 Loading models...")
model = YOLO(YOLO_MODEL_PATH)
face_analyzer = FaceAnalysis(name='buffalo_l', root='~/.insightface')
face_analyzer.prepare(ctx_id=0, det_size=(640, 640))

# NOTE(security): pickle.load can execute arbitrary code. Only load cache files
# that were produced by the embedding-building cell of this notebook.
with open(CACHE_PATH, "rb") as f:
    database = pickle.load(f)
faiss_index = faiss.read_index(FAISS_INDEX_PATH)

# FAISS row ids are used as positions in database["labels"]; a size mismatch
# would silently attach the wrong name to a face.
if faiss_index.ntotal != len(database["labels"]):
    raise RuntimeError(
        f"Cache/index mismatch: {len(database['labels'])} labels in {CACHE_PATH} "
        f"but {faiss_index.ntotal} vectors in {FAISS_INDEX_PATH}."
    )

# ========== Video Reader ==========
# Opened only to read the stream metadata needed by the writer and progress bar;
# the frames themselves come from model.track() below.
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise FileNotFoundError(f"Could not open video: {VIDEO_PATH}")

fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    # Some containers report 0/NaN; a writer created with that rate produces an
    # unplayable file, so fall back to a sane default.
    DEFAULT_FPS = 30.0
    print(f"⚠️ Could not read FPS from video, falling back to {DEFAULT_FPS}")
    fps = DEFAULT_FPS
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

# ========== Output Writer ==========
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out_writer = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, fps, (width, height))
if not out_writer.isOpened():
    cap.release()
    raise RuntimeError(f"Could not open video writer for: {OUTPUT_VIDEO_PATH}")

# ========== Track and Process ==========
all_results = []

# stream=True yields results lazily, one frame at a time; persist=True keeps
# track ids across frames.
results = model.track(
    source=VIDEO_PATH,
    stream=True,
    persist=True,
    save=False,
    tracker="bytetrack.yaml",
    conf=0.5
)


In [None]:

def _box_iou(box_a, box_b):
    """Return the intersection-over-union of two ``(x1, y1, x2, y2)`` boxes (0 if the union is empty)."""
    ax1, ay1, ax2, ay2 = box_a
    bx1, by1, bx2, by2 = box_b
    inter_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0, min(ay2, by2) - max(ay1, by1))
    inter_area = inter_w * inter_h
    union_area = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - inter_area
    return inter_area / union_area if union_area > 0 else 0


# For every tracked frame: match tracked boxes inside the AOI to InsightFace
# detections, look the face embedding up in the FAISS index, annotate the frame
# and collect one JSON record per frame.
try:
    for result in tqdm(results, desc="🚀 Processing video",
                       total=total_frames if total_frames > 0 else None):
        frame = result.orig_img
        # Wall-clock time at processing, not the position inside the video.
        timestamp = datetime.now().isoformat()
        detections = []

        # A frame without boxes is still written to the output video and still
        # gets a (empty) JSON record, so outputs stay aligned with the input.
        online_targets = result.boxes
        num_targets = len(online_targets) if online_targets is not None else 0

        # Detect faces on the clean frame, BEFORE any overlay is drawn on it,
        # so the annotations cannot influence detection or embeddings.
        faces = face_analyzer.get(frame) if num_targets else []

        # Draw AOI
        if USE_POLYGON:
            cv2.polylines(frame, [AOI_POLYGON], isClosed=True, color=(255, 255, 0), thickness=2)
        else:
            cv2.rectangle(frame, (AOI_RECT[0], AOI_RECT[1]), (AOI_RECT[2], AOI_RECT[3]), (255, 255, 0), 2)

        for i in range(num_targets):
            x1, y1, x2, y2 = map(int, online_targets.xyxy[i].tolist())
            # -1 marks a box the tracker has not assigned an id to.
            track_id = int(online_targets.id[i]) if online_targets.id is not None else -1

            # Check AOI
            if USE_POLYGON:
                if not is_inside_polygon((x1, y1, x2, y2), AOI_POLYGON):
                    continue
            else:
                if not is_inside_rectangle((x1, y1, x2, y2), AOI_RECT):
                    continue

            # Match with best face by IoU
            best_face = None
            max_iou = 0
            for face in faces:
                iou = _box_iou((x1, y1, x2, y2), face.bbox.astype(int))
                if iou > max_iou and iou > IOU_THRESHOLD:
                    best_face = face
                    max_iou = iou

            if best_face is None or best_face.embedding is None:
                continue

            emb = best_face.embedding.astype(np.float32).reshape(1, -1)
            norm = np.linalg.norm(emb, axis=1, keepdims=True)
            if not np.isfinite(norm).all() or (norm == 0).any():
                # A degenerate embedding would normalise to NaN and yield a
                # meaningless similarity score.
                continue
            emb /= norm

            D, I = faiss_index.search(emb, K)
            similarity = D[0][0]
            match_idx = int(I[0][0])

            # FAISS returns -1 when it has no neighbour to report; never use
            # that as a (negative) list index into the labels.
            if match_idx >= 0 and similarity > SIMILARITY_THRESHOLD:
                label = database["labels"][match_idx]
            else:
                label = "Unknown"

            detections.append({
                "track_id": track_id,
                "label": label,
                "confidence": round(float(similarity), 2),
                "bbox": [x1, y1, x2, y2]
            })

            # Draw on frame
            color = (0, 255, 0) if label != "Unknown" else (0, 0, 255)
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f"ID:{track_id} {label} ({similarity:.2f})", (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

        all_results.append({
            "camera_id": CAMERA_ID,
            "timestamp": timestamp,
            "detections": detections
        })

        out_writer.write(frame)
finally:
    # Always finalise the video file and free the capture, even if a frame fails.
    cap.release()
    out_writer.release()

# Save JSON output
with open(OUTPUT_JSON_PATH, "w") as f:
    json.dump(all_results, f, indent=2)

print(f"✅ JSON output saved to {OUTPUT_JSON_PATH}")
print(f"🎥 Output video saved to {OUTPUT_VIDEO_PATH}")