# USING THE TRACKING METHOD

In [1]:
import cv2
import numpy as np
import os 

In [36]:
# --- Configuration ---
# Pretrained model files; all paths are relative.
_MODEL_DIR = "pretrained_models"
DET_PROTOTXT = f"{_MODEL_DIR}/deploy.prototxt.txt"
DET_MODEL = f"{_MODEL_DIR}/res10_300x300_ssd_iter_140000.caffemodel"
REC_MODEL = f"{_MODEL_DIR}/face_recognition_sface_2021dec.onnx"

# Folders scanned for reference images of the target face.
TARGET_IMAGE_FOLDERS = ["data/train", "data/train_aug"]

# Detection / recognition thresholds.
CONFIDENCE_THRESHOLD = 0.5   # minimum detector confidence for a face to be considered
RECOGNITION_THRESHOLD = 0.9  # cosine similarity needed to count as a match (tune by testing)

# Tracking behaviour.
TRACKING_ENABLED = True
TRACKER_TYPE = 'csrt'        # one of: 'boosting', 'mil', 'kcf', 'tld', 'medianflow', 'mosse', 'csrt'
RE_DETECTION_INTERVAL = 30   # run full detection again every N frames

In [38]:
# --- Load Models ---
# Face detector: Caffe network described by the prototxt/caffemodel pair
# named in the configuration.
print("[INFO] Loading face detector model...")
detector_net = cv2.dnn.readNetFromCaffe(
    DET_PROTOTXT,
    DET_MODEL,
)

# Face recognizer: ONNX network whose forward pass is used below to
# produce one embedding per face crop.
print("[INFO] Loading face recognition model...")
recognizer_net = cv2.dnn.readNetFromONNX(
    REC_MODEL,
)

print("LOADED!!!")

[INFO] Loading face detector model...
[INFO] Loading face recognition model...
LOADED!!!


In [40]:
# --- Build Target Embeddings ---
# For every readable image in TARGET_IMAGE_FOLDERS: detect faces, keep the
# most confident detection, and store its recognition embedding in
# `target_embeddings` (a list of flattened 1-D vectors).
target_embeddings = []
total_image_files = 0  # directory entries seen across all existing target folders
print(f"[INFO] Processing target images from folders: {TARGET_IMAGE_FOLDERS}...")

for folder in TARGET_IMAGE_FOLDERS:
    # A missing folder (e.g. no augmented set yet) is skipped rather than
    # aborting the whole run with an error from os.listdir.
    if not os.path.isdir(folder):
        print(f"Warning: Target folder {folder} not found, skipping")
        continue

    # os.listdir order is unspecified; sorting keeps runs reproducible.
    image_names = sorted(os.listdir(folder))
    total_image_files += len(image_names)

    for image_name in image_names:
        image_path = os.path.join(folder, image_name)
        image = cv2.imread(image_path)
        if image is None:
            print(f"Warning: Could not read image {image_path}")
            continue

        (h, w) = image.shape[:2]

        # 1. Detect face(s) in the target image
        blob = cv2.dnn.blobFromImage(cv2.resize(image, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
        detector_net.setInput(blob)
        detections = detector_net.forward()

        # If several faces are found, the most confident valid detection is
        # taken as the target.
        best_face_confidence = -1
        best_face_box = None

        for i in range(detections.shape[2]):
            confidence = detections[0, 0, i, 2]
            if confidence <= CONFIDENCE_THRESHOLD or confidence <= best_face_confidence:
                continue

            # Scale the normalised box to pixels and clamp it to the image.
            box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
            (startX, startY, endX, endY) = box.astype("int")
            startX = max(0, startX)
            startY = max(0, startY)
            endX = min(w - 1, endX)
            endY = min(h - 1, endY)

            # Only boxes with positive width and height can be cropped.
            if endX > startX and endY > startY:
                best_face_confidence = confidence
                best_face_box = (startX, startY, endX, endY)

        # 2. Extract embedding if a face was found
        if best_face_box is None:
            print(f"Warning: No face detected in {image_path} above threshold {CONFIDENCE_THRESHOLD}")
            continue

        (startX, startY, endX, endY) = best_face_box
        # The box was validated and clamped above, so this crop is never empty.
        face_roi = image[startY:endY, startX:endX]

        # Preprocess face for recognition model (specific to SFace)
        face_blob = cv2.dnn.blobFromImage(face_roi, 1.0 / 127.5, (112, 112), (127.5, 127.5, 127.5), swapRB=True)
        recognizer_net.setInput(face_blob)
        embedding = recognizer_net.forward()
        target_embeddings.append(embedding.flatten())  # Store the flattened embedding vector
        print(f"    -> Added embedding from {image_name}")

if not target_embeddings:
    # Raise instead of calling exit(): exit() terminates the interpreter
    # (and with it the notebook session) instead of reporting an error
    # that stops just this cell.
    raise RuntimeError("No target embeddings generated. Check target images and detection settings.")

print(f"[INFO] Generated {len(target_embeddings)} target embeddings from {total_image_files} images.")



[INFO] Processing target images from folders: ['data/train', 'data/train_aug']...
    -> Added embedding from 1.jpg
    -> Added embedding from 10.jpg
    -> Added embedding from 100.jpeg
    -> Added embedding from 101.jpeg
    -> Added embedding from 102.jpeg
    -> Added embedding from 103.jpeg
    -> Added embedding from 104.jpeg
    -> Added embedding from 105.jpeg
    -> Added embedding from 106.jpeg
    -> Added embedding from 107.jpeg
    -> Added embedding from 108.jpeg
    -> Added embedding from 109.jpeg
    -> Added embedding from 11.jpg
    -> Added embedding from 110.jpg
    -> Added embedding from 111.jpg
    -> Added embedding from 112.jpeg
    -> Added embedding from 113.jpeg
    -> Added embedding from 114.jpg
    -> Added embedding from 115.jpeg
    -> Added embedding from 116.jpg
    -> Added embedding from 117.jpg
    -> Added embedding from 118.jpg
    -> Added embedding from 119.jpg
    -> Added embedding from 12.jpg
    -> Added embedding from 120.jpg
    -> Add

In [41]:
# --- Video Processing ---
VIDEO_SOURCE = "data/test/vid1.mp4" # Or 0 for webcam
OUTPUT_VIDEO_PATH = "data/test/vid1_processed_tracked_aug.mp4" # Optional: Save output
BLUR_KERNEL_SIZE = (99, 99) # Must be odd numbers; larger means more blur
DEFAULT_OUTPUT_FPS = 30.0 # Used when the source does not report a frame rate

print(f"[INFO] Starting video processing from {VIDEO_SOURCE}...")
cap = cv2.VideoCapture(VIDEO_SOURCE)
if not cap.isOpened():
    # Fail loudly: otherwise the read loop would end on the first frame and
    # report "End of video stream" as if the file had been processed.
    cap.release()
    raise IOError(f"Could not open video source {VIDEO_SOURCE!r}")

# Optional: Setup Video Writer
writer = None
if OUTPUT_VIDEO_PATH:
    try:
        # Get video properties for the writer
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes
        # the output play at a different speed than the input.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps or fps != fps:  # 0 or NaN: rate not reported by the source
            fps = DEFAULT_OUTPUT_FPS
        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v') # Or 'XVID', 'MJPG', etc.
        writer = cv2.VideoWriter(OUTPUT_VIDEO_PATH, fourcc, fps, (frame_width, frame_height))
        # The writer can fail to open without raising, so check explicitly.
        if writer.isOpened():
            print(f"[INFO] Output video will be saved to {OUTPUT_VIDEO_PATH}")
        else:
            print(f"[WARNING] Could not initialize video writer: cannot open {OUTPUT_VIDEO_PATH}")
            writer.release()
            writer = None
    except Exception as e:
        # Saving is optional; keep processing and displaying without it.
        print(f"[WARNING] Could not initialize video writer: {e}")
        writer = None

def cosine_similarity(vec1, vec2):
    """Return the cosine similarity of two 1-D vectors.

    The result is ``dot(vec1, vec2) / (|vec1| * |vec2|)``. If either vector
    has zero norm the similarity is undefined, and 0.0 is returned instead
    of dividing by zero.
    """
    magnitudes = (np.linalg.norm(vec1), np.linalg.norm(vec2))
    if any(magnitude == 0 for magnitude in magnitudes):
        return 0.0
    return np.dot(vec1, vec2) / (magnitudes[0] * magnitudes[1])

# Per-video tracking state, updated by the frame loop:
#   tracker        - active tracker object, or None when nothing is tracked
#   target_bbox    - last known target box, or None
#   target_tracked - whether the target is currently considered found
#   frame_count    - number of frames fully processed so far
tracker, target_bbox = None, None
target_tracked = False
frame_count = 0

def create_tracker(tracker_type):
    """Create a new tracker from ``cv2.legacy`` for the given type name.

    ``tracker_type`` is one of 'boosting', 'mil', 'kcf', 'tld',
    'medianflow', 'mosse' or 'csrt'. Any other value prints a notice and
    falls back to a CSRT tracker.
    """
    factory_names = {
        'boosting': 'TrackerBoosting_create',
        'mil': 'TrackerMIL_create',
        'kcf': 'TrackerKCF_create',
        'tld': 'TrackerTLD_create',
        'medianflow': 'TrackerMedianFlow_create',
        'mosse': 'TrackerMOSSE_create',
        'csrt': 'TrackerCSRT_create',
    }

    factory_name = factory_names.get(tracker_type)
    if factory_name is None:
        print("Invalid tracker type. Using CSRT by default.")
        factory_name = factory_names['csrt']

    # Look the factory up only now, so only the requested tracker class
    # needs to exist in cv2.legacy.
    return getattr(cv2.legacy, factory_name)()

# cv2.GaussianBlur needs odd kernel dimensions; normalise them once here
# instead of on every frame.
blur_kernel = (BLUR_KERNEL_SIZE[0] // 2 * 2 + 1, BLUR_KERNEL_SIZE[1] // 2 * 2 + 1)

# Main loop. For each frame, either
#   * run detection + recognition (tracking disabled, target not currently
#     tracked, or every RE_DETECTION_INTERVAL-th frame), or
#   * update the tracker,
# then blur the target box, display the frame and optionally write it out.
# The try/finally guarantees the capture, writer and window are released
# even if a frame fails or the run is interrupted.
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            print("[INFO] End of video stream.")
            break

        (h, w) = frame.shape[:2]

        if not TRACKING_ENABLED or not target_tracked or (frame_count % RE_DETECTION_INTERVAL == 0):
            # Perform face detection and recognition
            blob = cv2.dnn.blobFromImage(cv2.resize(frame, (300, 300)), 1.0, (300, 300), (104.0, 177.0, 123.0))
            detector_net.setInput(blob)
            detections = detector_net.forward()

            matched_target = False

            for i in range(detections.shape[2]):
                confidence = detections[0, 0, i, 2]
                if confidence <= CONFIDENCE_THRESHOLD:
                    continue

                # Scale the normalised box to pixels, as plain Python ints,
                # and clamp it to the frame.
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                startX, startY, endX, endY = (int(v) for v in box.astype("int"))
                startX = max(0, startX)
                startY = max(0, startY)
                endX = min(w - 1, endX)
                endY = min(h - 1, endY)

                if endX <= startX or endY <= startY:
                    continue

                face_roi = frame[startY:endY, startX:endX]
                if face_roi.size == 0:
                    continue

                face_blob = cv2.dnn.blobFromImage(face_roi, 1.0 / 127.5, (112, 112), (127.5, 127.5, 127.5), swapRB=True)
                recognizer_net.setInput(face_blob)
                current_embedding = recognizer_net.forward().flatten()

                # The face matches as soon as any target embedding is similar enough.
                if any(
                    cosine_similarity(current_embedding, target_emb) > RECOGNITION_THRESHOLD
                    for target_emb in target_embeddings
                ):
                    matched_target = True
                    target_tracked = True
                    # Record the box for the blur step below. Without this the
                    # matched frame itself is never blurred (and nothing is
                    # blurred at all when tracking is disabled).
                    target_bbox = (startX, startY, endX, endY)
                    if TRACKING_ENABLED:
                        tracker = create_tracker(TRACKER_TYPE)
                        tracker.init(frame, (startX, startY, endX - startX, endY - startY))  # Tracker needs (x, y, w, h)
                    break  # Target found, no need to check other detections

            if not matched_target:
                target_tracked = False
                target_bbox = None

        elif TRACKING_ENABLED and target_tracked and tracker is not None:
            # Track the target
            success, bbox = tracker.update(frame)
            if success:
                (x, y, w_track, h_track) = [int(v) for v in bbox]
                target_bbox = (x, y, x + w_track, y + h_track)
            else:
                target_tracked = False
                target_bbox = None
                tracker = None  # Reset tracker if tracking fails

        # Apply blurring if target is identified or tracked
        if target_tracked and target_bbox is not None:
            startX, startY, endX, endY = target_bbox
            # Clamp to the frame: a tracker box may extend past the edges, and
            # negative indices would make the slice wrap around or come out
            # empty, leaving the face unblurred.
            startX, startY = max(0, startX), max(0, startY)
            endX, endY = min(w, endX), min(h, endY)
            if endX > startX and endY > startY:
                face_roi = frame[startY:endY, startX:endX]
                frame[startY:endY, startX:endX] = cv2.GaussianBlur(face_roi, blur_kernel, 0)
                cv2.rectangle(frame, (startX, startY), (endX, endY), (0, 0, 255), 2)  # Red box for target

        # Display the output frame
        cv2.imshow("Frame", frame)

        # Write frame to output video (if writer is initialized)
        if writer is not None:
            writer.write(frame)

        # Exit condition
        key = cv2.waitKey(1) & 0xFF
        if key == ord("q"):
            break

        frame_count += 1
finally:
    # --- Cleanup ---
    print("[INFO] Cleaning up...")
    cap.release()
    if writer is not None:
        writer.release()
    cv2.destroyAllWindows()
    print("[INFO] Finished.")

[INFO] Starting video processing from data/test/vid1.mp4...
[INFO] Output video will be saved to data/test/vid1_PROCESSED_TRACKED.mp4
[INFO] End of video stream.
[INFO] Cleaning up...
[INFO] Finished.
