<a href="https://colab.research.google.com/github/MartijnRoozendaal1/vvr-prediction/blob/main/video_data_preprocessing_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Step 1: Initialize Parameters and Face Alignment Network (FAN)

In [None]:
# Import necessary libraries
!pip install face-alignment
import cv2
import face_alignment
import numpy as np
import os
from tqdm import tqdm
import torch

# Path to the folder containing infrared video files
video_folder = '/content/drive/MyDrive/InfraredVideos'

# Initialize the Face Alignment Network (FAN) for facial landmark detection
device = 'cuda' if torch.cuda.is_available() else 'cpu'
fa = face_alignment.FaceAlignment(face_alignment.LandmarksType.TWO_D, device=device)


Step 2: Helper Functions

In [None]:
def extract_roi(frame, landmarks):
    """
    Extracts regions of interest (ROIs) from a frame using facial landmarks.
    ROIs include the eyes, nose, and cheeks.
    """
    landmarks = landmarks.astype(int)  # Convert landmarks to integers
    rois = {
        'eyes': frame[min(landmarks[36:42, 1]):max(landmarks[36:42, 1]),
                      min(landmarks[36:42, 0]):max(landmarks[36:42, 0])],
        'nose': frame[min(landmarks[27:36, 1]):max(landmarks[27:36, 1]),
                      min(landmarks[27:36, 0]):max(landmarks[27:36, 0])],
        'cheeks': frame[min(landmarks[48:54, 1]):max(landmarks[48:54, 1]),
                        min(landmarks[48:54, 0]):max(landmarks[48:54, 0])]
    }
    return rois

def resample_frames(video_frames, target_count=25):
    """
    Resamples frames to ensure a uniform count across videos.
    If the number of frames is insufficient, repeats existing frames.
    """
    if len(video_frames) >= target_count:
        return video_frames[:target_count]  # Truncate to target_count
    repeat_count = target_count - len(video_frames)
    return video_frames + video_frames[:repeat_count]


Step 3: Process Videos and Extract ROIs

In [None]:
# Dictionary to store processed frames
processed_frames = {}

# List to track excluded videos
excluded_videos = []

# Statistics dictionary to track processing details
video_stats = {"total_videos": 0, "total_frames": 0, "frames_per_video": []}

# Iterate through each video in the folder
for video_name in tqdm(os.listdir(video_folder)):
    if not video_name.endswith(('.mp4', '.wmv')):
        continue  # Skip non-video files

    video_stats["total_videos"] += 1
    video_path = os.path.join(video_folder, video_name)

    # Open video file
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Select 25 evenly spaced frames
    selected_frames_indices = np.linspace(0, frame_count - 1, 25).astype(int)
    video_frames = []

    for idx in range(frame_count):
        ret, frame = cap.read()
        if not ret or idx not in selected_frames_indices:
            continue

        # Convert frame to RGB for FAN
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        detected_landmarks = fa.get_landmarks(rgb_frame)

        if detected_landmarks is None:
            continue  # Skip frames without detected faces

        landmarks = detected_landmarks[0]  # Extract first face's landmarks

        # Crop ROIs and normalize the frame
        rois = extract_roi(frame, landmarks)
        normalized_frame = cv2.normalize(frame, None, alpha=0, beta=255, norm_type=cv2.NORM_MINMAX)

        # Append processed frame to video frames list
        video_frames.append({'aligned_frame': normalized_frame, 'rois': rois})

    cap.release()

    # Handle videos with insufficient frames
    if len(video_frames) < 3:
        excluded_videos.append(video_name)
        continue
    else:
        video_frames = resample_frames(video_frames)

    # Save processed frames and update stats
    processed_frames[video_name] = video_frames
    video_stats["frames_per_video"].append(len(video_frames))
    video_stats["total_frames"] += len(video_frames)


Step 4: Save Processed Data

In [None]:
# Save processed frames to a file
np.save('/content/drive/MyDrive/Processed_Infrared_Frames_FAN.npy', processed_frames)

# Save excluded videos to a text file
with open('/content/drive/MyDrive/Excluded_Videos.txt', 'w') as f:
    for video in excluded_videos:
        f.write(f"{video}\n")

# Print processing summary
print("Processing complete. Saved processed frames.")
print(f"Total Videos: {video_stats['total_videos']}")
print(f"Total Frames Processed: {video_stats['total_frames']}")
print(f"Average Frames Per Video: {np.mean(video_stats['frames_per_video']):.2f}")
print(f"Excluded Videos: {len(excluded_videos)}")


Step 5: Post-Processing (Filtering and Statistics)

In [None]:
# Load processed frames
processed_frames_path = '/content/drive/MyDrive/Processed_Infrared_Frames_FAN.npy'
processed_frames = np.load(processed_frames_path, allow_pickle=True).item()

# Filter videos with fewer than 25 frames
filtered_frames = {video_id: frames for video_id, frames in processed_frames.items() if len(frames) >= 25}

# Save filtered frames
filtered_frames_path = '/content/drive/MyDrive/Filtered_Infrared_Frames_FAN.npy'
np.save(filtered_frames_path, filtered_frames)

# Display statistics
removed_videos = {video_id: len(frames) for video_id, frames in processed_frames.items() if len(frames) < 25}
print(f"Removed {len(removed_videos)} videos with fewer than 25 frames:")
for video_id, frame_count in removed_videos.items():
    print(f"Video ID: {video_id}, Frame Count: {frame_count}")

print(f"Total videos after filtering: {len(filtered_frames)}")