### Data ETL

Load video annotations

In [1]:
import os
import pandas as pd
import numpy as np

# Specify the directory path
project_dir = os.path.dirname(os.getcwd())
annotations_file = os.path.join(project_dir,'data','annotations','video_annotations_combined_43407.csv')  # Change to your desired directory

# Read the annotations file
annotations = pd.read_csv(annotations_file)

Locate annotated video files.

In [2]:
import sys
sys.path.append('./utils')
from utils_functions import find_videos


# Video directory
video_dir = r'D:\MYPROJECTS_GITHUB2\socialvision_GPT_annotations\videos'

# Find the videos that are in the annotations file
matched_videos = find_videos(video_dir, target_names=list(annotations['video_name']))
print(f'Found {len(matched_videos)} videos')   # Found 43408 videos

Found 43407 videos


Process video files to extract frames, apply transformation per frame, and save in a tensor of shape num_frames, channels, height, width.

In [13]:
import torch
from tqdm import tqdm
from models import load_transform
from utils_functions import extract_frames

# Specify how many frames to extract from each video
num_frames = 16

# Load transformation pipeline
transform = load_transform()

# Create an output directory for storing result files
output_dir = os.path.join(project_dir,'data','processed_videos')
os.makedirs(output_dir, exist_ok=True)

# Iterate through each video in `matched_videos`
for video in tqdm(matched_videos, desc="Processing videos"):
    # Construct an output filename based on the video name
    video_name = os.path.basename(video)
    base_name, _ = os.path.splitext(video_name)
    output_path = os.path.join(output_dir, f"{base_name}.pt")

    # Skip processing if the output file already exists
    if os.path.exists(output_path):
        # print(f"Skipping {video_name} (already processed).")
        continue

    # Extract frames from video
    try:
        frames = extract_frames(video, num_frames=num_frames)
    except Exception as e:
        print(f"Error processing {video_name}: {e}")
        continue
    
    # Apply transformations to each frame
    frames = [transform(frame) for frame in frames]

    # Stack frames to create a tensor of shape [num_frames, channels, height, width]
    video_tensor = torch.stack(frames, dim=1)

    # Save the tensor to a .pt file
    torch.save(video_tensor, output_path)

Processing videos:   3%|▎         | 1125/43407 [00:00<00:02, 15207.87it/s]


ValueError: Number of frames requested exceeds total frames in the video.

In [43]:
import torch
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor

from models import load_transform
from utils_functions import extract_frames

def process_single_video(video_path, num_frames, output_dir):
    """
    Extract frames, apply transforms, and save the resulting tensor to .pt file.
    Skips processing if the .pt file already exists.
    """
    # Create the output file path based on the video filename
    video_name = os.path.basename(video_path)
    base_name, _ = os.path.splitext(video_name)
    output_path = os.path.join(output_dir, f"{base_name}.pt")

    # Skip if this file has already been processed
    if os.path.exists(output_path):
        return f"Skipping (file exists): {video_path}"

    # Load transforms (must be picklable if using ProcessPoolExecutor)
    transform = load_transform()

    # Extract frames from the video
    frames = extract_frames(video_path, num_frames=num_frames)

    # Apply transformations to each frame
    frames = [transform(frame) for frame in frames]

    # Stack frames to create a tensor of shape [num_frames, channels, height, width]
    video_tensor = torch.stack(frames, dim=1)

    # Ensure the output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Save the tensor to a .pt file
    torch.save(video_tensor, output_path)
    return f"Processed: {video_path}"

def worker(args):
    """
    A top-level worker function to unpack the arguments and call the real processing function.
    This must be defined at the module level (not nested) so it can be pickled.
    """
    video_path, num_frames, output_dir = args
    return process_single_video(video_path, num_frames, output_dir)

def process_videos_in_parallel(matched_videos, num_frames=16, output_dir="processed_videos"):
    """
    Use ProcessPoolExecutor to process each video in matched_videos in parallel.
    Skips any videos that already have a .pt file in the output_dir.
    """
    # Create output directory if needed
    os.makedirs(output_dir, exist_ok=True)

    # Prepare a list of (args) for each video
    jobs = [(video_path, num_frames, output_dir) for video_path in matched_videos]

    # Use a ProcessPoolExecutor to run tasks in parallel
    with ProcessPoolExecutor() as executor:
        # Pass the 'worker' function as the first argument, and our jobs list as the second
        results_iter = executor.map(worker, jobs)

        # Wrap results in a tqdm progress bar
        for result in tqdm(results_iter, total=len(jobs), desc="Processing videos"):
            # Optionally, print each result to see "Processed" or "Skipping"
            # print(result)
            pass

# Create an output directory for storing result files
output_dir = os.path.join(project_dir,'data','processed_videos')
os.makedirs(output_dir, exist_ok=True)

# Process all videos in parallel, skipping those that are already processed
process_videos_in_parallel(
    matched_videos,
    num_frames=16,
    output_dir=output_dir
)

BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

In [9]:
import torch
from tqdm import tqdm
from concurrent.futures import ProcessPoolExecutor

from models import load_transform
from utils_functions import extract_frames

# Specify how many frames to extract from each video
num_frames = 16

# Load transformation pipeline
transform = load_transform()

# Create an output directory for storing result files
output_dir = os.path.join(project_dir, 'data', 'processed_videos')
os.makedirs(output_dir, exist_ok=True)

def process_video(video):
    """
    Extract frames from a single video, transform them,
    create a tensor, and save it to disk (unless it already exists).
    """
    # Construct an output filename based on the video name
    video_name = os.path.basename(video)
    base_name, _ = os.path.splitext(video_name)
    output_path = os.path.join(output_dir, f"{base_name}.pt")

    # Skip if the file is already there
    if os.path.isfile(output_path):
        print(f"Skipping {video_name}. Output already exists: {output_path}")
        return output_path

    # Extract frames from video
    frames = extract_frames(video, num_frames=num_frames)

    # Stack frames to create a tensor of shape [num_frames, channels, height, width]
    video_tensor = torch.stack(frames, dim=1)

    # Save the tensor to a .pt file
    torch.save(video_tensor, output_path)

    return output_path  # Just to have a return value if needed

In [10]:
# Cell 2: The main guard
if __name__ == "__main__":


    # Video directory
    video_dir = r'D:\MYPROJECTS_GITHUB2\socialvision_GPT_annotations\videos'

    # Find the videos that are in the annotations file
    matched_videos = find_videos(video_dir, target_names=list(annotations['video_name']))
    print(f'Found {len(matched_videos)} videos')   # Found 43408 videos


    with ProcessPoolExecutor(max_workers=4) as executor:
        list(tqdm(
            executor.map(process_video, matched_videos),
            total=len(matched_videos),
            desc="Processing videos in parallel"
        ))

Found 43407 videos


Processing videos in parallel:   0%|          | 0/43407 [00:07<?, ?it/s]


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.