In [1]:
import cv2
import numpy as np
import os
import torch
from facenet_pytorch import MTCNN
from PIL import Image  
import time



In [2]:
import torch

# Summary of the CUDA environment: [is CUDA available, number of devices,
# name of device 0]. The device name is only queried when CUDA is available,
# because torch.cuda.get_device_name(0) raises on a machine without a usable
# GPU and would abort a fresh "Run All"; None is reported in that case.
a = [
    torch.cuda.is_available(),
    torch.cuda.device_count(),
    torch.cuda.get_device_name(0) if torch.cuda.is_available() else None,
]
a


[True, 1, 'NVIDIA GeForce RTX 3060']

In [3]:
# Run the face detector on the GPU when CUDA is available and fall back to
# the CPU otherwise, so the notebook still runs on machines without a GPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
detector = MTCNN(device=device)

In [4]:
def detect_faces_mtcnn(frame):
    """Detect faces in a single BGR frame with the module-level ``detector``.

    Args:
        frame: Image array in OpenCV's BGR channel order.

    Returns:
        A list of ``(x1, y1, x2, y2)`` tuples, one per detected box, with the
        detector's coordinates cast to integers. The list is empty when the
        detector reports no boxes.
    """
    # The frame is converted from BGR to RGB and wrapped in a PIL image
    # before being handed to the detector.
    rgb_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    detections, _ = detector.detect(rgb_image)
    if detections is None:
        return []

    def _as_corner_tuple(detection):
        # Unpacking into four names also enforces the 4-value box layout.
        left, top, right, bottom = detection.astype(int)
        return (left, top, right, bottom)

    return [_as_corner_tuple(detection) for detection in detections]


In [5]:
def non_max_suppression_fast(boxes, overlapThresh):
    """Greedily drop boxes that overlap an already-kept box too much.

    Boxes are visited in descending order of their bottom edge (``y2``). Each
    visited box is kept, and every remaining box whose intersection with it
    covers more than ``overlapThresh`` of that remaining box's own area is
    discarded. Note that the ratio is intersection / other-box area, not
    intersection-over-union.

    Args:
        boxes: Array-like of shape (N, 4) with rows ``(x1, y1, x2, y2)``.
            Plain Python sequences (e.g. a list of tuples) are accepted as
            well as NumPy arrays.
        overlapThresh: Overlap ratio above which a box is suppressed.

    Returns:
        An empty list when ``boxes`` is empty; otherwise an integer ndarray
        containing the kept boxes in the order they were picked.
    """
    boxes = np.asarray(boxes)
    if len(boxes) == 0:
        return []

    # Work in floating point so the overlap division is exact. Unsigned
    # integers are promoted too: otherwise "xx2 - xx1 + 1" wraps around
    # instead of going negative for boxes that do not intersect.
    if boxes.dtype.kind in "iu":
        boxes = boxes.astype("float")

    pick = []

    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    # The "+ 1" treats coordinates as inclusive pixel indices.
    area = (x2 - x1 + 1) * (y2 - y1 + 1)
    idxs = np.argsort(y2)

    while len(idxs) > 0:
        # Take the remaining box with the largest bottom edge and keep it.
        last = len(idxs) - 1
        i = idxs[last]
        pick.append(i)

        # Intersection rectangle between the kept box and every other
        # remaining box.
        others = idxs[:last]
        xx1 = np.maximum(x1[i], x1[others])
        yy1 = np.maximum(y1[i], y1[others])
        xx2 = np.minimum(x2[i], x2[others])
        yy2 = np.minimum(y2[i], y2[others])

        # Clamp at zero so disjoint boxes contribute no overlap.
        w = np.maximum(0, xx2 - xx1 + 1)
        h = np.maximum(0, yy2 - yy1 + 1)

        overlap = (w * h) / area[others]

        # Remove the kept box itself plus everything it suppresses.
        suppressed = np.where(overlap > overlapThresh)[0]
        idxs = np.delete(idxs, np.concatenate(([last], suppressed)))

    return boxes[pick].astype("int")


In [6]:
def process_and_apply_bounding_boxes(video_path, mask_video_path, output_dir):
    """Draw colour-coded face boxes on a video and save the annotated copy.

    Frames are read in lockstep from ``video_path`` and ``mask_video_path``.
    Faces detected in the video frame are filtered with non-maximum
    suppression. Each remaining box is drawn green when more than half of the
    corresponding mask region is near-black (every channel <= 50) and red
    otherwise. Annotated frames are written to ``output_dir`` under the input
    file's base name and shown in a preview window.

    Processing stops when either stream ends or when 'q' is pressed in the
    preview window. Captures, the writer and the preview window are released
    even if processing is interrupted (e.g. by KeyboardInterrupt).

    Args:
        video_path: Path of the video to annotate.
        mask_video_path: Path of the mask video read alongside it.
        output_dir: Directory for the annotated video; created if missing.

    Returns:
        None. A message is printed and nothing is processed when an input
        video or the output file cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    mask_cap = cv2.VideoCapture(mask_video_path)
    out = None

    try:
        if not cap.isOpened() or not mask_cap.isOpened():
            print(f"Could not open video files: {video_path} or {mask_video_path}")
            return

        # Get video properties
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)

        # The writer does not create missing directories itself.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Define the codec and create VideoWriter object
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        output_path = os.path.join(output_dir, os.path.basename(video_path))
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            # Writing to an unopened writer silently produces no output.
            print(f"Could not open output video for writing: {output_path}")
            return

        while True:
            ret, frame = cap.read()
            mask_ret, mask_frame = mask_cap.read()

            if not ret or not mask_ret:
                break

            faces_current_frame = detect_faces_mtcnn(frame)
            # Apply Non-Maximum Suppression
            nms_boxes = non_max_suppression_fast(np.array(faces_current_frame), 0.3)

            for box in nms_boxes:
                x1, y1, x2, y2 = (int(v) for v in box)

                # Clamp negative coordinates to 0: a negative index would
                # wrap around in NumPy slicing and select the wrong region.
                # Upper bounds need no clamp; slicing clips them already.
                roi = mask_frame[max(0, y1):max(0, y2), max(0, x1):max(0, x2)]
                total_pixels = roi.shape[0] * roi.shape[1]

                if total_pixels == 0:  # Avoid division by zero
                    continue

                # For mask detection, considering more black pixels as indication of a mask
                black_pixels = np.sum(np.all(roi <= [50, 50, 50], axis=2))
                black_ratio = black_pixels / total_pixels
                # Green if mask (more black), else red
                color = (0, 255, 0) if black_ratio > 0.5 else (0, 0, 255)

                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

            # Write before checking for 'q' so the frame on screen when the
            # user quits is also saved.
            out.write(frame)
            cv2.imshow("Frame", frame)
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        mask_cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()


In [10]:
def process_all_videos(base_dir, mask_dir, output_dir, checkpoint_file):
    """Process every ``.mp4`` in ``base_dir`` once, resuming from a checkpoint.

    For each video ``<name>.mp4`` the matching mask is expected at
    ``mask_dir/<name>_mask.mp4``. Videos listed in ``checkpoint_file`` are
    skipped; each newly processed video name is appended to that file right
    after it is handled, so an interrupted run can be resumed.

    Videos whose mask file is missing are reported and left out of the
    checkpoint so that they are retried on a later run.

    Args:
        base_dir: Directory containing the input ``.mp4`` videos.
        mask_dir: Directory containing the ``*_mask.mp4`` videos.
        output_dir: Directory passed on to the per-video processing step.
        checkpoint_file: Text file with one processed video name per line.
    """
    # Read already processed videos from the checkpoint file
    processed_videos = set()
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, 'r') as f:
            processed_videos = set(f.read().splitlines())

    # os.listdir order is arbitrary; sort for a reproducible processing order.
    videos = sorted(f for f in os.listdir(base_dir) if f.endswith('.mp4'))
    for video in videos:
        if video in processed_videos:
            print(f"Skipping already processed video: {video}")
            continue

        video_path = os.path.join(base_dir, video)
        # Split off only the final extension; str.replace would rewrite every
        # ".mp4" occurrence inside the file name.
        stem, ext = os.path.splitext(video)
        mask_video_path = os.path.join(mask_dir, f"{stem}_mask{ext}")

        if not os.path.isfile(mask_video_path):
            # Do not checkpoint: the video has not actually been processed.
            print(f"Skipping {video}: mask video not found at {mask_video_path}")
            continue

        print(f"Processing {video_path} with {mask_video_path}")
        process_and_apply_bounding_boxes(video_path, mask_video_path, output_dir)
        print(f"Done. Saved processed video to {output_dir}")

        # Update the checkpoint file
        with open(checkpoint_file, 'a') as f:
            f.write(f"{video}\n")

# Example usage: Windows dataset locations, written as raw strings so the
# backslashes need no escaping.
base_dir = r"D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target\train"
mask_dir = r"D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target_mask\train"
output_dir = r"D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\annotations"
# Relative path, so the checkpoint is resolved against the working directory.
checkpoint_file = "processed_videos.txt"
process_all_videos(
    base_dir=base_dir,
    mask_dir=mask_dir,
    output_dir=output_dir,
    checkpoint_file=checkpoint_file,
)

Processing D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target\train\train_00000000.mp4 with D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target_mask\train\train_00000000_mask.mp4
Done. Saved processed video to D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\annotations
Processing D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target\train\train_00000001.mp4 with D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target_mask\train\train_00000001_mask.mp4
Done. Saved processed video to D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\annotations
Processing D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target\train\train_00000002.mp4 with D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target_mask\train\train_00000002_mask.mp4
Done. Saved processed video to D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\annotations
Processing D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target\train\train_00000003.mp4 with D:\aadithyaram\Phosphene AI\FFIW10K-v1-release\target_mask\train\train_00000003_mas

KeyboardInterrupt: 

: 