# Part1: Data handling

### Data splitting

In [2]:
import json
import os
import shutil

def organize_data(json_file, videos_folder, output_folder):
    """Organizes video data into training, validation, and testing folders based on JSON file."""

    try:
        with open(json_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: JSON file '{json_file}' not found.")
        return
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in '{json_file}'.")
        return

    os.makedirs(os.path.join(output_folder, "training_data"), exist_ok=True)
    os.makedirs(os.path.join(output_folder, "validation_data"), exist_ok=True)
    os.makedirs(os.path.join(output_folder, "testing_data"), exist_ok=True)

    for entry in data:
        for instance in entry['instances']:
            video_id = instance['video_id']
            video_path = os.path.join(videos_folder, f"{video_id}.mp4")
            split = instance['split']

            if not os.path.exists(video_path):
                print(f"Warning: Video file '{video_path}' not found. Skipping.")
                continue

            if split == "train":
                destination = os.path.join(output_folder, "training_data", f"{video_id}.mp4")
            elif split == "val":
                destination = os.path.join(output_folder, "validation_data", f"{video_id}.mp4")
            elif split == "test":
                destination = os.path.join(output_folder, "testing_data", f"{video_id}.mp4")
            else:
                print(f"Warning: Unknown split '{split}' for video '{video_id}'. Skipping.")
                continue

            try:
                shutil.copy2(video_path, destination)  # copy2 preserves metadata
                print(f"Copied '{video_id}.mp4' to '{split}' folder.")
            except shutil.Error as e:
                print(f"Error copying '{video_id}.mp4': {e}")

def main():
    json_file_path = 'WLASL_100.json'  # Path to your JSON file
    videos_folder_path = 'videos'  # Path to your videos folder
    output_directory = 'organized_data'  # Path to the output directory

    organize_data(json_file_path, videos_folder_path, output_directory)

if __name__ == "__main__":
    main()

Copied '69241.mp4' to 'train' folder.
Copied '07069.mp4' to 'train' folder.
Copied '07068.mp4' to 'train' folder.
Copied '07070.mp4' to 'train' folder.
Copied '07074.mp4' to 'train' folder.
Copied '69302.mp4' to 'val' folder.
Copied '17710.mp4' to 'train' folder.
Copied '65540.mp4' to 'train' folder.
Copied '17711.mp4' to 'train' folder.
Copied '17712.mp4' to 'train' folder.
Copied '17713.mp4' to 'test' folder.
Copied '17709.mp4' to 'train' folder.
Copied '17720.mp4' to 'train' folder.
Copied '17721.mp4' to 'train' folder.
Copied '17722.mp4' to 'train' folder.
Copied '17723.mp4' to 'train' folder.
Copied '17724.mp4' to 'val' folder.
Copied '12328.mp4' to 'train' folder.
Copied '12312.mp4' to 'val' folder.
Copied '12311.mp4' to 'train' folder.
Copied '12313.mp4' to 'train' folder.
Copied '12314.mp4' to 'train' folder.
Copied '12315.mp4' to 'val' folder.
Copied '12316.mp4' to 'train' folder.
Copied '12317.mp4' to 'train' folder.
Copied '12318.mp4' to 'train' folder.
Copied '12319.mp4' to

### Data processing

#### Videos renaming

In [6]:
def load_labels(json_file):
    """Loads video labels, creating a dictionary with lists of video IDs per gloss."""
    try:
        with open(json_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: JSON file '{json_file}' not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in '{json_file}'.")
        return None

    label_data = {}
    for item in data:
        gloss = item["gloss"]
        for instance in item["instances"]:
            video_id = instance["video_id"]
            if gloss not in label_data:
                label_data[gloss] = []
            label_data[gloss].append(video_id)
    return label_data

In [7]:
def rename_videos(data_dir, label_data):
    """Renames videos, adding a unique counter for each instance of a gloss."""
    for subfolder in ["training_data", "validation_data", "testing_data"]:
        subfolder_path = os.path.join(data_dir, subfolder)
        for gloss, video_ids in label_data.items():
            for i, video_id in enumerate(video_ids):
                filename = f"{video_id}.mp4"
                old_path = os.path.join(subfolder_path, filename)
                if os.path.exists(old_path):  # Check if the file exists before renaming
                    new_filename = f"{gloss}_{video_id}_{i+1}.mp4"  # Add instance counter
                    new_path = os.path.join(subfolder_path, new_filename)
                    os.rename(old_path, new_path)
                    print(f"Renamed: {old_path} -> {new_path}")
                else:
                    print(f"Warning: Video file '{filename}' not found in '{subfolder_path}'.")

In [11]:
# constant directory names
json_file_path = 'WLASL_100.json'
data_dir = 'organized_data'

In [12]:
label_data = load_labels(json_file_path)
if label_data is not None:
    rename_videos(data_dir, label_data)



#### Video information

In [27]:
def load_video_info(json_file):
    """Loads video information such as FPS and frame range."""
    try:
        with open(json_file, 'r') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: JSON file '{json_file}' not found.")
        return None
    except json.JSONDecodeError:
        print(f"Error: Invalid JSON format in '{json_file}'.")
        return None

    video_info = {}
    for item in data:
        gloss = item["gloss"]
        for instance in item["instances"]:
            video_id = instance["video_id"]
            fps = instance["fps"]
            frame_start = instance["frame_start"]
            frame_end = instance["frame_end"]
            split = instance["split"]
            instance_id = instance["instance_id"]

            if video_id not in video_info:
                video_info[video_id] = {
                    "gloss": gloss,
                    "fps": fps,
                    "frame_start": frame_start,
                    "frame_end": frame_end,
                    "split": split,
                    "instance_id": instance_id
                }
    return video_info

In [28]:
# this variable contain the {gloss, fps, frame_start, frame_end, split} information for each instance
video_info = load_video_info(json_file_path)

In [29]:
print(video_info)

{'69241': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': -1, 'split': 'train', 'instance_id': 0}, '65225': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': -1, 'split': 'train', 'instance_id': 1}, '68011': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': -1, 'split': 'train', 'instance_id': 2}, '68208': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': 60, 'split': 'train', 'instance_id': 3}, '68012': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': -1, 'split': 'train', 'instance_id': 4}, '70212': {'gloss': 'book', 'fps': 25, 'frame_start': 2150, 'frame_end': 2249, 'split': 'val', 'instance_id': 5}, '70266': {'gloss': 'book', 'fps': 25, 'frame_start': 3732, 'frame_end': 3852, 'split': 'train', 'instance_id': 6}, '07085': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': -1, 'split': 'train', 'instance_id': 7}, '07086': {'gloss': 'book', 'fps': 25, 'frame_start': 1, 'frame_end': -1, 'split': 'train', 'instance_id': 8}, '

#### Frame extraction

In [15]:
import cv2

def extract_frames(video_path, fps, frame_start, frame_end):
    """Extracts frames from a video, handling variable frame_end values."""
    cap = cv2.VideoCapture(video_path)
    frames = []
    current_frame = 0

    if not cap.isOpened():
        print(f"Error: Could not open video file {video_path}")
        return []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if current_frame >= frame_start:
            frames.append(frame)
            if frame_end != -1 and current_frame >= frame_end:
                break  # Stop if frame_end is specified and reached

        current_frame += 1

    cap.release()
    return frames

def downsample_or_upsample_frames(frames, current_fps, target_fps):
    """Downsamples or upsamples frames based on the current and target FPS."""
    if current_fps == target_fps:
        return frames

    if current_fps < target_fps:
        # Upsample: Repeat frames to increase frame rate
        factor = target_fps // current_fps
        new_frames = []
        for frame in frames:
            new_frames.extend([frame] * factor)
        return new_frames
    else:
        # Downsample: Skip frames to decrease frame rate
        factor = current_fps // target_fps
        return frames[::factor]

def create_sequences(frames, sequence_length):
    """Creates sequences of frames from a list of frames."""
    sequences = []
    for i in range(len(frames) - sequence_length + 1):
        sequence = frames[i:i + sequence_length]
        sequences.append(sequence)
    return sequences

In [31]:
import torch
import torchvision.models as models
import torchvision.transforms as transforms
import numpy as np

def extract_resnet_features(frames, resnet_model, transform):
    """Extracts ResNet features from a sequence of frames."""
    features = []
    for frame in frames:
        img_tensor = transform(frame)
        img_tensor = img_tensor.unsqueeze(0)  # Add batch dimension

        with torch.no_grad():
            output = resnet_model(img_tensor)

        features.append(output.numpy())  # Adjust this based on your ResNet architecture
    return features

def preprocess_and_extract_features(data_dir, video_info, target_fps=25, sequence_length=32):
    """Preprocesses videos, extracts frames, and features."""
    resnet_model = models.resnet50(pretrained=True)
    resnet_model.eval()
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    all_features = []
    all_labels = []

    for video_id, info in video_info.items():
        video_path = os.path.join(data_dir, f"{info['split']}_data", f"{info['gloss']}_{video_id}_{info['instance_id']}.mp4")
        if os.path.exists(video_path):
            frames = extract_frames(video_path, info['fps'], info['frame_start'], info['frame_end'])
            frames = downsample_or_upsample_frames(frames, info['fps'], target_fps)
            sequences = create_sequences(frames, sequence_length)

            for sequence in sequences:
                features = extract_resnet_features(sequence, resnet_model, transform)
                all_features.append(features)
                all_labels.append(info['gloss'])
        else:
            print(f"Warning: Video file '{video_path}' not found.")

    return np.array(all_features), np.array(all_labels)

In [32]:
all_features, all_labels = preprocess_and_extract_features(data_dir, video_info)





: 

In [22]:
print(all_features)


[]


In [23]:
print(all_labels)

[]
