In [1]:
import os
import sys
import cv2
import torch
from torchvision.transforms import functional as F
from torchvision import transforms
from PIL import Image
from torchvision.models.detection import fasterrcnn_resnet50_fpn

# YOLO dependencies
sys.path.append(r"C:\Users\ybr5070\yolov7")  # Update this to point to your YOLOv7 path
from models.experimental import attempt_load
from utils.general import non_max_suppression

# Select the compute device: use CUDA when PyTorch reports it as available,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Preprocessing applied to every cropped face before it is written to disk:
# resize to 224x224, then convert the image to a tensor.
transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])

# Save and resize face detection to 224x224 (without the bounding box edges)
def save_and_resize_frame(frame, bbox, output_dir, frame_time):
    """Crop a face from ``frame``, resize it with ``transform`` and save it as a PNG.

    Args:
        frame: Image array indexed as ``frame[row, col]``. The crop is passed
            through ``cv2.COLOR_BGR2RGB``, i.e. it is treated as a BGR image.
        bbox: ``(x, y, w, h)`` box; every value is truncated to ``int``.
        output_dir: Directory the PNG is written into (not created here).
        frame_time: Timestamp in seconds; used to build the filename and
            returned unchanged.

    Returns:
        ``(save_path, frame_time)`` when an image was written, or ``None`` when
        the box does not overlap the frame (nothing is written in that case).
    """
    x, y, w, h = [int(v) for v in bbox]
    frame_height, frame_width = frame.shape[:2]

    # Clamp the box to the frame. A negative start coordinate used directly as
    # a slice bound would be interpreted as "counted from the end" and produce
    # a wrong (or empty) crop instead of the visible part of the box.
    left, top = max(x, 0), max(y, 0)
    right, bottom = min(x + w, frame_width), min(y + h, frame_height)
    if right <= left or bottom <= top:
        return None

    face = frame[top:bottom, left:right]  # Crop the face region
    face_pil = Image.fromarray(cv2.cvtColor(face, cv2.COLOR_BGR2RGB))
    face_resized = transform(face_pil)

    # Zero-padded timestamp in seconds (e.g. tracked_0012.34.png) so that the
    # filenames have a uniform width and sort chronologically.
    timestamp_str = f"{frame_time:07.2f}"
    save_path = os.path.join(output_dir, f"tracked_{timestamp_str}.png")

    # ``transform`` yields a tensor; convert it back to a PIL image to save it.
    face_resized_pil = transforms.ToPILImage()(face_resized)
    face_resized_pil.save(save_path)

    return save_path, frame_time  # Filename and timestamp for mapping

# Load models
def initialize_models(yolo_model_path, faster_rcnn_model_path):
    """Load the YOLO and Faster R-CNN detectors onto ``device``.

    Args:
        yolo_model_path: Path to the YOLO weights, loaded with ``attempt_load``.
        faster_rcnn_model_path: Path to a state dict for a
            ``fasterrcnn_resnet50_fpn`` model built with ``num_classes=2``.

    Returns:
        ``(yolo_model, faster_rcnn_model)``.
    """
    yolo_model = attempt_load(yolo_model_path, map_location=device)

    faster_rcnn_model = fasterrcnn_resnet50_fpn(pretrained=False, num_classes=2).to(device)
    # Remap the checkpoint's tensors onto the active device while loading.
    # Without ``map_location`` a checkpoint saved from a GPU cannot be loaded
    # on a CPU-only machine, even though ``device`` already falls back to CPU.
    state_dict = torch.load(faster_rcnn_model_path, map_location=device)
    faster_rcnn_model.load_state_dict(state_dict)

    return yolo_model, faster_rcnn_model

# Weighted bounding box calculation
def dynamic_weighted_bounding_boxes(yolo_bbox, yolo_conf, frcnn_bbox, frame_width, frame_height, size_threshold=0.5):
    """Blend a YOLO box and a Faster R-CNN box into one ``(x, y, w, h)`` box.

    Both input boxes are indexed as ``(x1, y1, x2, y2)``. The YOLO box is
    returned on its own unless the YOLO confidence is below 0.2 *and* the YOLO
    box area is less than ``size_threshold`` times the Faster R-CNN box area;
    in that case the corners are mixed 0.4 (YOLO) / 0.6 (Faster R-CNN).
    A Faster R-CNN box with non-positive area counts as an area ratio of 0.

    ``frame_width`` and ``frame_height`` are accepted but not used.

    Returns:
        Tuple ``(x, y, width, height)`` of ints (corner values are truncated
        with ``int`` before width/height are derived).
    """
    def _area(box):
        return (box[2] - box[0]) * (box[3] - box[1])

    yolo_area = _area(yolo_bbox)
    frcnn_area = _area(frcnn_bbox)
    if frcnn_area > 0:
        area_ratio = yolo_area / frcnn_area
    else:
        area_ratio = 0

    # Only a low-confidence YOLO box that is also comparatively small gets
    # pulled towards the Faster R-CNN box; everything else is pure YOLO.
    if not (yolo_conf >= 0.2) and area_ratio < size_threshold:
        yolo_weight, frcnn_weight = 0.4, 0.6
    else:
        yolo_weight, frcnn_weight = 1.0, 0.0

    left, top, right, bottom = (
        int(yolo_bbox[i] * yolo_weight + frcnn_bbox[i] * frcnn_weight)
        for i in range(4)
    )

    return (left, top, right - left, bottom - top)

# Process frame for YOLO detection
def process_frame_yolo(frame, model, target_size=(512, 320), conf_thres=0.2, iou_thres=0.8):
    """Run the YOLO model on one OpenCV frame and apply non-max suppression.

    Args:
        frame: BGR image array as produced by OpenCV.
        model: YOLO model; it is switched to eval mode on every call.
        target_size: ``(width, height)`` passed to ``cv2.resize``.
        conf_thres: Confidence threshold passed to ``non_max_suppression``.
        iou_thres: IoU threshold passed to ``non_max_suppression``.

    Returns:
        ``(prediction, resized_frame)``: the output of ``non_max_suppression``
        and the resized frame (still BGR). Box coordinates in ``prediction``
        refer to ``resized_frame``, not to the original ``frame``.
    """
    resized_frame = cv2.resize(frame, target_size)

    # OpenCV delivers BGR, while the YOLOv7 training/detection pipeline feeds
    # the network RGB. Convert only the copy that goes into the model; the
    # returned frame keeps OpenCV's channel order.
    rgb_frame = cv2.cvtColor(resized_frame, cv2.COLOR_BGR2RGB)
    tensor_frame = F.to_tensor(rgb_frame).unsqueeze(0).to(device)

    model.eval()
    with torch.no_grad():
        raw_output = model(tensor_frame)
        # The model may return a tuple; its first element is what NMS consumes.
        if isinstance(raw_output, tuple):
            raw_output = raw_output[0]
        prediction = non_max_suppression(raw_output, conf_thres, iou_thres)
    return prediction, resized_frame

# Process frame for Faster R-CNN detection
def process_frame_frcnn(frame, model):
    """Run the Faster R-CNN model on a single frame at its native resolution.

    The frame is converted to a tensor as-is (no resizing, no channel
    reordering), wrapped into a batch of one and moved to ``device``. The
    model is put into eval mode and executed without gradient tracking.

    Returns:
        Whatever ``model`` returns for the one-image batch.
    """
    # NOTE(review): channel order is passed through unchanged; if ``frame`` is
    # a BGR OpenCV image, confirm that matches how the weights were trained.
    batch = F.to_tensor(frame)[None].to(device)
    model.eval()
    with torch.no_grad():
        return model(batch)

# Detect and track faces using YOLO as the primary detector and Faster R-CNN as fallback
def _detect_face_bbox(frame, yolo_model, faster_rcnn_model, yolo_size=(512, 320), min_yolo_conf=0.2):
    """Return a face box for ``frame`` as ``(x, y, w, h)`` ints, or ``None`` if neither model finds one."""
    frame_height, frame_width = frame.shape[:2]
    bbox, yolo_conf = None, 0.0

    # YOLO (primary). It runs on a resized copy, so its box has to be scaled
    # back to the coordinates of the original frame.
    yolo_prediction, _ = process_frame_yolo(frame, yolo_model, yolo_size)
    if yolo_prediction and len(yolo_prediction[0]) > 0:
        # First row of the first image's detections
        # (presumably the highest-scoring one - TODO confirm NMS ordering).
        best = yolo_prediction[0][0]
        x1, y1, x2, y2 = best[:4].cpu().numpy()
        yolo_conf = best[4].item()

        scale_x = frame_width / yolo_size[0]
        scale_y = frame_height / yolo_size[1]
        x1, x2 = int(x1 * scale_x), int(x2 * scale_x)
        y1, y2 = int(y1 * scale_y), int(y2 * scale_y)
        bbox = (x1, y1, x2 - x1, y2 - y1)

    # Faster R-CNN (fallback) when YOLO found nothing or is not confident.
    # If the fallback finds a box it replaces the YOLO box; otherwise any
    # low-confidence YOLO box is kept.
    if bbox is None or yolo_conf < min_yolo_conf:
        frcnn_prediction = process_frame_frcnn(frame, faster_rcnn_model)
        boxes = frcnn_prediction[0]['boxes']
        if len(boxes) > 0:
            x1, y1, x2, y2 = (int(v) for v in boxes[0].cpu().numpy())
            bbox = (x1, y1, x2 - x1, y2 - y1)

    return bbox


def detect_and_track(video_path, yolo_model, faster_rcnn_model, subject_output_dir, redetect_interval=20, fps=30):
    """Detect a face in a video, track it with KCF and save one crop per tracked frame.

    Detection runs on every ``redetect_interval``-th frame and on every frame
    for which no tracker has been initialised yet. Whenever a detection is
    found, a fresh KCF tracker is started from it; if none is found, an
    already running tracker is left untouched.

    Args:
        video_path: Path of the video file to read.
        yolo_model: Primary detector.
        faster_rcnn_model: Fallback detector.
        subject_output_dir: Directory that receives the cropped face images.
        redetect_interval: Number of frames between forced re-detections.
        fps: Frame rate used to turn a frame index into seconds. Pass ``None``
            to use the rate reported by the video (30 if it reports none).

    Returns:
        Dict mapping each saved image path to its timestamp in seconds, or
        ``None`` if the video could not be opened.
    """
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print(f"Error opening video file: {video_path}")
        cap.release()
        return

    # Saved image path -> timestamp (seconds)
    image_to_timestamp_map = {}
    tracker = None  # Created on the first successful detection
    frame_count = 0

    try:
        if fps is None:
            # CAP_PROP_FPS is 0 when the backend cannot determine the rate.
            fps = cap.get(cv2.CAP_PROP_FPS) or 30

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_time = frame_count / fps  # Time in seconds of the current frame

            if frame_count % redetect_interval == 0 or tracker is None:
                detected_bbox = _detect_face_bbox(frame, yolo_model, faster_rcnn_model)
                if detected_bbox is not None:
                    tracker = cv2.legacy.TrackerKCF_create()
                    tracker.init(frame, detected_bbox)

            if tracker is not None:
                success, tracked_bbox = tracker.update(frame)
                if success:
                    # The crop helper returns None when there is nothing to
                    # save; skip such frames instead of failing on unpacking.
                    saved = save_and_resize_frame(frame, tracked_bbox, subject_output_dir, frame_time)
                    if saved is not None:
                        save_path, timestamp = saved
                        image_to_timestamp_map[save_path] = timestamp

            frame_count += 1
    finally:
        # Always release the capture, even if detection or saving raised.
        cap.release()

    return image_to_timestamp_map

# Process videos in a directory
def process_videos(directory_path, yolo_model, faster_rcnn_model, output_dir, fps=30):
    """Run ``detect_and_track`` on every video file found in a directory.

    Files whose name ends in ``.mp4``, ``.avi`` or ``.mov`` (case-insensitive)
    are processed in sorted filename order. Each video gets its own output
    sub-directory named after the file without its extension.

    Args:
        directory_path: Directory that is scanned (non-recursively) for videos.
        yolo_model: Primary detector, forwarded to ``detect_and_track``.
        faster_rcnn_model: Fallback detector, forwarded to ``detect_and_track``.
        output_dir: Root directory for the per-video output folders.
        fps: Frame rate forwarded to ``detect_and_track``.

    Returns:
        Dict mapping each processed video filename to the value returned by
        ``detect_and_track`` for that video.
    """
    results = {}

    # Sorted so that the processing order does not depend on the file system.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.lower().endswith(('.mp4', '.avi', '.mov')):
            continue

        video_path = os.path.join(directory_path, filename)
        subject_name = os.path.splitext(filename)[0]  # Filename without extension
        subject_output_dir = os.path.join(output_dir, subject_name)

        # Create the subject's output directory if it is missing.
        os.makedirs(subject_output_dir, exist_ok=True)

        print(f"Processing video: {filename}")
        # Keep the mapping instead of discarding it, so callers can use it.
        results[filename] = detect_and_track(video_path, yolo_model, faster_rcnn_model, subject_output_dir, fps=fps)

    return results
            
# Configuration: model weights, input videos and output root
yolo_model_path = r"C:\Users\ybr5070\yolov7\runs\train\exp4\weights\best.pt"
faster_rcnn_model_path = r"C:\Users\ybr5070\Desktop\HomeBytes\frcnn_final.pth"
output_dir = r"C:\Users\ybr5070\Documents\PS1_face"
video_dir = r"C:\Users\ybr5070\Documents\video"

# Ensure the output directory exists (no separate existence check needed)
os.makedirs(output_dir, exist_ok=True)

# Initialize models
yolo_model, faster_rcnn_model = initialize_models(yolo_model_path, faster_rcnn_model_path)

# Process the videos once the models are loaded. The result is bound to a name
# so that the call is not the cell's trailing expression.
video_results = process_videos(video_dir, yolo_model, faster_rcnn_model, output_dir)

Fusing layers... 
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
RepConv.fuse_repvgg_block
IDetect.fuse


  return _VF.meshgrid(tensors, **kwargs)  # type: ignore[attr-defined]


Processing video: R01_006_V4_PS1_fixed.mp4
