<a href="https://colab.research.google.com/github/DeepFrame/VisionAI-Video/blob/main/VisionAI_Video.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Installing Dependencies

In [None]:
!pip install retina-face

Collecting retina-face
  Downloading retina_face-0.0.17-py3-none-any.whl.metadata (10 kB)
Downloading retina_face-0.0.17-py3-none-any.whl (25 kB)
Installing collected packages: retina-face
Successfully installed retina-face-0.0.17


# Importing Libraries

In [None]:
import cv2
import numpy as np
import os
from glob import glob
import json

In [None]:
from retinaface import RetinaFace

# Keyframe Extraction

In [None]:
def extract_keyframes(video_path, threshold=10.0, metadata_file="keyframes_metadata.json"):
    """Extract motion-based keyframes from a video and record them in a JSON file.

    The first frame is always kept. Every later frame is compared against the
    most recently kept keyframe (not against its immediate predecessor) using
    ``get_motion_score``; a frame whose score exceeds ``threshold`` becomes the
    new keyframe and the new comparison reference.

    Args:
        video_path: Path of the video to read with ``cv2.VideoCapture``.
        threshold: Motion score a frame must exceed to be kept.
        metadata_file: JSON file mapping video file names to their keyframe
            records. It is created if missing; an existing entry for this
            video is overwritten, entries for other videos are preserved.

    Returns:
        A list of ``(frame_index, frame)`` tuples in reading order, or an
        empty list when the video cannot be opened or its first frame cannot
        be read (the metadata file is not touched in that case).
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open {video_path}")
        return []

    keyframes = []
    keyframe_records = []
    video_name = os.path.basename(video_path)

    # The capture is released on every exit path, including the early return
    # below and any exception raised while decoding or scoring frames.
    try:
        ret, prev_frame_bgr = cap.read()
        if not ret:
            print(f"Error: Could not read first frame of {video_path}")
            return []

        prev_frame_gray = cv2.cvtColor(prev_frame_bgr, cv2.COLOR_BGR2GRAY)
        keyframes.append((0, prev_frame_bgr))
        # The first frame has nothing to be compared with, hence no score.
        keyframe_records.append({"frame_index": 0, "motion_score": None})

        frame_idx = 1
        while True:
            ret, curr_frame_bgr = cap.read()
            if not ret:
                break
            curr_frame_gray = cv2.cvtColor(curr_frame_bgr, cv2.COLOR_BGR2GRAY)
            motion_score = get_motion_score(prev_frame_gray, curr_frame_gray)

            if motion_score > threshold:
                keyframes.append((frame_idx, curr_frame_bgr))
                keyframe_records.append({
                    "frame_index": frame_idx,
                    # Cast so the NumPy scalar is JSON serializable.
                    "motion_score": float(motion_score)
                })
                # Only an accepted keyframe becomes the new reference.
                prev_frame_gray = curr_frame_gray

            frame_idx += 1
    finally:
        cap.release()

    # Merge into the existing metadata so other videos' entries survive.
    if os.path.exists(metadata_file):
        with open(metadata_file, "r") as f:
            existing = json.load(f)
    else:
        existing = {}

    existing[video_name] = keyframe_records

    with open(metadata_file, "w") as f:
        json.dump(existing, f, indent=4)

    print(f"✅ Extracted {len(keyframes)} keyframes, metadata updated in {metadata_file}")
    return keyframes

In [None]:
def get_motion_score(frame1, frame2, kernel=np.ones((9,9), dtype=np.uint8)):
    """Score how much ``frame2`` differs from ``frame1``.

    The difference ``frame2 - frame1`` (via ``cv2.subtract``) is median
    blurred, turned into a mask with an inverted Gaussian adaptive threshold,
    median blurred again and morphologically closed with ``kernel``. The score
    is the sum of the mask values divided by the number of mask pixels.

    Args:
        frame1: Reference frame; the callers in this file pass grayscale images.
        frame2: Frame to compare against the reference, same kind as ``frame1``.
        kernel: Structuring element for the closing step.

    Returns:
        The mean value of the cleaned mask.
    """
    diff = cv2.medianBlur(cv2.subtract(frame2, frame1), 3)
    binary = cv2.adaptiveThreshold(
        diff,
        255,
        cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
        cv2.THRESH_BINARY_INV,
        11,
        3,
    )
    cleaned = cv2.morphologyEx(
        cv2.medianBlur(binary, 3), cv2.MORPH_CLOSE, kernel, iterations=1
    )
    height, width = cleaned.shape[:2]
    return np.sum(cleaned) / (height * width)

def alternative_algorithm(video_path, skip_frames=10):
    """Sample a video at a fixed stride.

    Frames whose index is a multiple of ``skip_frames`` are kept (index 0,
    ``skip_frames``, ``2 * skip_frames``, ...); all other frames are decoded
    and discarded.

    Args:
        video_path: Path of the video to read with ``cv2.VideoCapture``.
        skip_frames: Stride between kept frames; must be at least 1.

    Returns:
        A list of ``(frame_index, frame)`` tuples in reading order, or an
        empty list when the video cannot be opened.

    Raises:
        ValueError: If ``skip_frames`` is smaller than 1.
    """
    # Reject a zero stride up front instead of failing with ZeroDivisionError
    # in the middle of reading, with the capture still open.
    if skip_frames < 1:
        raise ValueError(f"skip_frames must be a positive integer, got {skip_frames}")

    print(f"Applying frame-skipping algorithm to {video_path}")
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Same message as extract_keyframes so failures read consistently.
        print(f"Error: Could not open {video_path}")
        return []

    keyframes = []
    frame_idx = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx % skip_frames == 0:
                keyframes.append((frame_idx, frame))

            frame_idx += 1
    finally:
        # Release the capture even if decoding raises.
        cap.release()

    print(f"✅ Extracted {len(keyframes)} frames by skipping {skip_frames} frames each time.")
    return keyframes

In [None]:
def process_saved_frames(frames_dir="All_Frames", faces_dir="Faces_Extracted", metadata_file="faces_metadata.json"):
    """Detect faces in previously saved frames and store the crops plus metadata.

    Every sub-directory of ``frames_dir`` is treated as one video. For each
    ``.png``/``.jpg``/``.jpeg`` frame in it, ``detect_and_crop_faces`` is run
    and each returned crop is written to ``faces_dir/<video>/`` using the
    frame's own file extension. Frames with at least one face are recorded in
    the metadata.

    Args:
        frames_dir: Directory containing one sub-directory of frames per video.
        faces_dir: Directory that receives one sub-directory of crops per video.
        metadata_file: JSON file mapping video names to per-frame face records.
            It is created if missing; entries for the processed videos are
            overwritten, other entries are preserved.

    Note:
        ``frame_index`` in the metadata and in crop file names is the position
        of the frame in the sorted file list, not the frame number in the
        source video.
    """
    os.makedirs(faces_dir, exist_ok=True)
    # Only sub-directories represent videos; stray files next to them would
    # otherwise get an empty output folder and an empty metadata entry.
    frame_folders = [
        entry for entry in glob(os.path.join(frames_dir, "*")) if os.path.isdir(entry)
    ]

    if os.path.exists(metadata_file):
        with open(metadata_file, "r") as f:
            all_metadata = json.load(f)
    else:
        all_metadata = {}

    for folder in frame_folders:
        print(f"Processing frames in {folder} ...")
        video_name = os.path.basename(folder)
        video_faces_dir = os.path.join(faces_dir, video_name)
        os.makedirs(video_faces_dir, exist_ok=True)

        video_metadata = []

        frame_files = sorted(
            glob(os.path.join(folder, "*.png")) +
            glob(os.path.join(folder, "*.jpg")) +
            glob(os.path.join(folder, "*.jpeg"))
        )

        print(f"Found {len(frame_files)} frames in {video_name}")
        for f_idx, frame_file in enumerate(frame_files):
            frame = cv2.imread(frame_file)
            if frame is None:
                # Keep going, but make the skipped file visible.
                print(f"Warning: Could not read {frame_file}, skipping")
                continue

            frame_data = {
                "frame_index": f_idx,
                "frame_path": frame_file,
                "faces": []
            }

            # Crops are saved in the same format as the frame they came from.
            ext = os.path.splitext(frame_file)[1]

            faces = detect_and_crop_faces(frame)
            for idx, (face_img, bbox) in enumerate(faces):
                face_file = os.path.join(video_faces_dir, f"{video_name}_frame{f_idx:05d}_face{idx}{ext}")
                cv2.imwrite(face_file, face_img)

                face_metadata = {
                    "face_index": idx,
                    "bbox": bbox,
                    "cropped_face_path": face_file
                }
                frame_data["faces"].append(face_metadata)

            # Frames without any detected face are left out of the metadata.
            if frame_data["faces"]:
                video_metadata.append(frame_data)

        all_metadata[video_name] = video_metadata
        print(f"Processed {len(frame_files)} frames for {video_name}")

    with open(metadata_file, "w") as f:
        json.dump(all_metadata, f, indent=4)

    print(f"✅ Face detection complete! Metadata updated in {metadata_file}")


# Redirect stdout

In [None]:
import sys
import os
import contextlib

@contextlib.contextmanager
def suppress_stdout():
    """Silence ``sys.stdout`` for the duration of the ``with`` block.

    Output is sent to the null device; the previous ``sys.stdout`` is put
    back when the block exits, whether normally or through an exception.
    """
    with open(os.devnull, "w") as sink, contextlib.redirect_stdout(sink):
        yield

# Face Detection

In [None]:
def process_face_square(img, face, margin_ratio=0.2, target_size=(112, 112)):
    """Crop a square region around a detected face and resize it.

    Args:
        img: Image array; its first two dimensions are read as height, width.
        face: Mapping whose ``"facial_area"`` entry unpacks to
            ``x1, y1, x2, y2``. The values are used as slice bounds, so they
            are presumably integers - confirm against the detector output.
        margin_ratio: Fraction of the box width/height added on each side
            before squaring.
        target_size: Size passed to ``cv2.resize`` for the final crop.

    Returns:
        ``(resized_face, updated_bbox)`` where ``updated_bbox`` is
        ``[x1, y1, x2, y2]`` of the region actually cropped, as plain ints;
        ``(None, None)`` when the resulting crop is empty.
    """
    h, w = img.shape[:2]
    x1, y1, x2, y2 = face["facial_area"]

    # Grow the detector box by a margin proportional to its own size.
    bw = x2 - x1
    bh = y2 - y1
    margin_x = int(bw * margin_ratio)
    margin_y = int(bh * margin_ratio)

    # Keep the enlarged box inside the image.
    x1 = max(0, x1 - margin_x)
    y1 = max(0, y1 - margin_y)
    x2 = min(w, x2 + margin_x)
    y2 = min(h, y2 + margin_y)

    # Make the box square: prefer growing the shorter side evenly in both
    # directions; if that would leave the image, shrink the longer side
    # by the same amount instead.
    crop_w = x2 - x1
    crop_h = y2 - y1
    if crop_w > crop_h:
        diff = crop_w - crop_h
        expand_top = diff // 2
        expand_bottom = diff - expand_top
        if y1 - expand_top >= 0 and y2 + expand_bottom <= h:
            y1 -= expand_top
            y2 += expand_bottom
        else:
            x1 += diff // 2
            x2 -= (diff - diff // 2)
    elif crop_h > crop_w:
        diff = crop_h - crop_w
        expand_left = diff // 2
        expand_right = diff - expand_left
        if x1 - expand_left >= 0 and x2 + expand_right <= w:
            x1 -= expand_left
            x2 += expand_right
        else:
            y1 += diff // 2
            y2 -= (diff - diff // 2)

    # Final safety clamp before slicing.
    x1, x2 = max(0, x1), min(w, x2)
    y1, y2 = max(0, y1), min(h, y2)

    cropped_face = img[y1:y2, x1:x2]

    # A degenerate box yields an empty slice, which cannot be resized.
    if cropped_face.size == 0:
        return None, None

    resized_face = cv2.resize(cropped_face, target_size, interpolation=cv2.INTER_AREA)
    # Plain ints so the bbox can be written to JSON by the caller.
    updated_bbox = [int(x1), int(y1), int(x2), int(y2)]
    return resized_face, updated_bbox

In [None]:
def detect_and_crop_faces(frame, margin_ratio=0.2, target_size=(112,112)):
    """Run RetinaFace on a frame and return square crops of the detections.

    Detector output printed to stdout is suppressed. Any detector result that
    is not a non-empty dict is treated as "no faces".

    Args:
        frame: Image passed to ``RetinaFace.detect_faces``.
        margin_ratio: Forwarded to ``process_face_square``.
        target_size: Forwarded to ``process_face_square``.

    Returns:
        A list of ``(cropped_face, bbox)`` tuples, one per detection for which
        ``process_face_square`` produced a crop; empty when nothing was found.
    """
    with suppress_stdout():
        detections = RetinaFace.detect_faces(frame)

    if not isinstance(detections, dict) or not detections:
        return []

    results = []
    for detection in detections.values():
        face_img, bbox = process_face_square(frame, detection, margin_ratio, target_size)
        # Keep only detections that yielded a usable crop.
        if face_img is not None and bbox is not None:
            results.append((face_img, bbox))

    return results

# Directory Setup

In [None]:
# Folder the source videos are read from.
video_directory = "Videos"

# Folder that receives the extracted frames.
output_dir = "Processed_Video_Frames"
os.makedirs(output_dir, exist_ok=True)

# Folder that receives the cropped faces.
cropped_faces_directory = "Faces_Extracted_RetinaFace"
os.makedirs(cropped_faces_directory, exist_ok=True)

# Step 1: Process Video Keyframes

In [None]:
# glob returns matches in arbitrary order; sort so that the processing order
# (and therefore the log output) is the same on every run.
video_files = sorted(glob(os.path.join(video_directory, "*.mp4")))
print(f"Found {len(video_files)} videos")

# Videos shorter than this many seconds are sampled at a fixed stride;
# longer ones go through motion-based keyframe extraction.
duration_threshold_sec = 60

Found 9 videos


In [None]:
# Extract frames from every discovered video and save them as PNG files in a
# per-video folder under output_dir.
for video_path in video_files:
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    save_path = os.path.join(output_dir, video_name)
    os.makedirs(save_path, exist_ok=True)

    # Check video duration
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
    # A non-positive FPS would make the division meaningless; treat the
    # video as zero-length, which routes it to the fixed-stride sampler.
    duration_sec = frame_count / fps if fps > 0 else 0
    cap.release()

    print(f"\n\nProcessing {video_name} ... Duration: {duration_sec:.2f}s")

    # Short clips: fixed-stride sampling. Longer videos: motion-based keyframes.
    if duration_sec < duration_threshold_sec:
        keyframes = alternative_algorithm(video_path)
    else:
        keyframes = extract_keyframes(video_path, threshold=28.0)

    for i, (idx, frame) in enumerate(keyframes):
        frame_path = os.path.join(save_path, f"{video_name}_keyframe_{i}_frame{idx}.png")
        # cv2.imwrite reports failure through its return value rather than
        # by raising, so check it instead of losing frames silently.
        if not cv2.imwrite(frame_path, frame):
            print(f"Warning: Could not write {frame_path}")

print("\n\n✅ Keyframe extraction complete!")



Processing tourists ... Duration: 2.00s
Applying frame-skipping algorithm to Videos/tourists.mp4
✅ Extracted 5 frames by skipping 10 frames each time.


Processing F3 ... Duration: 336.40s
✅ Extracted 503 keyframes, metadata updated in keyframes_metadata.json


Processing S2 ... Duration: 78.17s
✅ Extracted 1302 keyframes, metadata updated in keyframes_metadata.json


Processing S1_N1 ... Duration: 60.08s
✅ Extracted 2 keyframes, metadata updated in keyframes_metadata.json


Processing F2 ... Duration: 12.71s
Applying frame-skipping algorithm to Videos/F2.mp4
✅ Extracted 31 frames by skipping 10 frames each time.


Processing F1 ... Duration: 15.38s
Applying frame-skipping algorithm to Videos/F1.mp4
✅ Extracted 37 frames by skipping 10 frames each time.


Processing F4 ... Duration: 581.73s
✅ Extracted 1270 keyframes, metadata updated in keyframes_metadata.json


Processing S3 ... Duration: 81.90s
✅ Extracted 16 keyframes, metadata updated in keyframes_metadata.json


Processing S4 .

# Step 2: Process saved frames for face detection

In [None]:
# Run face detection over every saved frame and write the crops and metadata.
process_saved_frames(frames_dir=output_dir, faces_dir=cropped_faces_directory)

print("✅ Detection and Cropping complete!")

Processing frames in Processed_Video_Frames/S1_N1 ...
Found 2 frames in S1_N1


Downloading...
From: https://github.com/serengil/deepface_models/releases/download/v1.0/retinaface.h5
To: /root/.deepface/weights/retinaface.h5
100%|██████████| 119M/119M [00:01<00:00, 74.1MB/s]


Processed 2 frames for S1_N1
Processing frames in Processed_Video_Frames/F3 ...
Found 503 frames in F3
Processed 503 frames for F3
Processing frames in Processed_Video_Frames/S3 ...
Found 16 frames in S3
Processed 16 frames for S3
Processing frames in Processed_Video_Frames/tourists ...
Found 5 frames in tourists
Processed 5 frames for tourists
Processing frames in Processed_Video_Frames/F2 ...
Found 31 frames in F2
Processed 31 frames for F2
Processing frames in Processed_Video_Frames/F1 ...
Found 37 frames in F1
Processed 37 frames for F1
Processing frames in Processed_Video_Frames/S2 ...
Found 1302 frames in S2
Processed 1302 frames for S2
Processing frames in Processed_Video_Frames/S4 ...
Found 65 frames in S4
Processed 65 frames for S4
Processing frames in Processed_Video_Frames/F4 ...
Found 1270 frames in F4
Processed 1270 frames for F4
✅ Face detection complete! Metadata updated in faces_metadata.json
✅ Detection and Cropping complete!


# Step 3: Checking frames extracted

In [None]:
import os
from collections import defaultdict
from glob import glob

# Collect every stored frame, grouped by extension in the order png, jpg, jpeg.
frame_patterns = (
    "Processed_Video_Frames/*/*.png",
    "Processed_Video_Frames/*/*.jpg",
    "Processed_Video_Frames/*/*.jpeg",
)
all_frames = []
for pattern in frame_patterns:
    all_frames.extend(glob(pattern))

print(f"Total frames stored: {len(all_frames)}")

# Tally frames by the name of the folder that contains them (one per video).
frame_count_per_video = defaultdict(int)
for frame_path in all_frames:
    parent_folder = os.path.dirname(frame_path)
    frame_count_per_video[os.path.basename(parent_folder)] += 1

print("Frames per video:")
for video, count in frame_count_per_video.items():
    print(f"{video}: {count} frames")

Total frames stored: 3231
Frames per video:
S1_N1: 2 frames
F3: 503 frames
S3: 16 frames
tourists: 5 frames
F2: 31 frames
F1: 37 frames
S2: 1302 frames
S4: 65 frames
F4: 1270 frames


# Step 4: Visualization of Frames

In [None]:
import matplotlib.pyplot as plt
import cv2
import math
import os
import re

def extract_frame_index(path):
    """Return the number following ``_frame`` in the file name of ``path``.

    Only the base name is inspected; directories are ignored. When the name
    contains no ``_frame<digits>`` part, 0 is returned.
    """
    found = re.search(r"_frame(\d+)", os.path.basename(path))
    return int(found.group(1)) if found else 0

def show_video_frames(all_frames, video_name, frames_per_row=10):
    """Plot the stored frames of one video as a grid of thumbnails.

    Args:
        all_frames: Paths of frames from any number of videos; only those
            whose parent folder is named ``video_name`` are shown.
        video_name: Name of the per-video folder to display.
        frames_per_row: Number of grid columns.

    Frames are ordered by the frame number embedded in their file name.
    Files that cannot be read are skipped and leave an empty grid cell.
    Nothing is plotted when no frame belongs to ``video_name``.
    """
    video_frames = [f for f in all_frames if os.path.basename(os.path.dirname(f)) == video_name]
    total_frames = len(video_frames)
    print(f"Showing {total_frames} frames for {video_name}")

    # Nothing to draw: avoid building an empty, zero-row figure.
    if total_frames == 0:
        return

    video_frames = sorted(video_frames, key=extract_frame_index)

    rows = math.ceil(total_frames / frames_per_row)
    plt.figure(figsize=(20, 2*rows))

    for i, frame_file in enumerate(video_frames):
        img = cv2.imread(frame_file)
        if img is None:
            continue
        # OpenCV loads images as BGR; matplotlib expects RGB.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        plt.subplot(rows, frames_per_row, i+1)
        plt.imshow(img_rgb)
        plt.title(os.path.basename(frame_file), fontsize=8)
        plt.axis("off")

    plt.suptitle(f"Frames from video: {video_name}", fontsize=14)
    plt.tight_layout()
    plt.show()


# Show one grid per video that has stored frames; only the names are needed.
for video in frame_count_per_video:
    show_video_frames(all_frames, video)

# Step 5: Visualization of Extracted Faces

In [None]:
import matplotlib.pyplot as plt
import cv2
import math
import os
import re
from glob import glob

def extract_indices(filename):
    """
    Parse the frame and face numbers out of a name such as
    video_frame00012_face1.png and return them as an ``(int, int)`` pair.

    Names without a ``_frame<digits>_face<digits>`` part yield
    ``(inf, inf)`` so that they sort after every well-formed name.
    """
    found = re.search(r"_frame(\d+)_face(\d+)", filename)
    if found is None:
        return float("inf"), float("inf")
    frame_no, face_no = found.groups()
    return int(frame_no), int(face_no)

def show_images_grid(image_files, title, images_per_row=10):
    """Plot a list of image files as a grid of thumbnails.

    Args:
        image_files: Paths of the images to show. They are ordered by the
            frame and face numbers parsed from their file names.
        title: Figure title.
        images_per_row: Number of grid columns.

    Files that cannot be read are skipped and leave an empty grid cell.
    Nothing is plotted when ``image_files`` is empty.
    """
    total = len(image_files)
    # Nothing to draw: avoid building an empty, zero-row figure.
    if total == 0:
        return

    rows = math.ceil(total / images_per_row)
    plt.figure(figsize=(20, 2*rows))

    image_files = sorted(image_files, key=lambda f: extract_indices(os.path.basename(f)))

    for i, img_file in enumerate(image_files):
        img = cv2.imread(img_file)
        if img is None:
            continue
        # OpenCV loads images as BGR; matplotlib expects RGB.
        img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        plt.subplot(rows, images_per_row, i+1)
        plt.imshow(img_rgb)
        plt.title(os.path.basename(img_file), fontsize=8)
        plt.axis("off")

    plt.suptitle(title, fontsize=14)
    plt.tight_layout()
    plt.show()

# Show the cropped faces of each video, one grid per per-video folder.
faces_dir = "Faces_Extracted_RetinaFace"
face_folders = glob(os.path.join(faces_dir, "*"))

for folder in face_folders:
    video_name = os.path.basename(folder)

    # Gather the crops grouped by extension in the order png, jpg, jpeg.
    face_files = []
    for pattern in ("*.png", "*.jpg", "*.jpeg"):
        face_files.extend(glob(os.path.join(folder, pattern)))

    # Folders without any crop are skipped.
    if face_files:
        show_images_grid(face_files, f"Detected faces from video: {video_name}", images_per_row=10)