# Parallel Processing Trials

In [None]:
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


def is_prime(n):
    """Return True when ``n`` is a prime number.

    Trial division: values up to 2 are settled directly, even numbers are
    rejected, and only odd divisors from 3 through floor(sqrt(n)) are tried.
    """
    if n <= 2:
        # 2 is the only prime in this range; anything smaller is not prime.
        return n == 2
    if not n % 2:
        return False
    limit = int(math.sqrt(n))
    return all(n % divisor for divisor in range(3, limit + 1, 2))

def main():
    """Check every number in PRIMES for primality using a process pool.

    ``map`` hands ``is_prime`` calls to worker processes and yields the
    results in the same order as ``PRIMES``. Zipping the two sequences
    therefore pairs each number with its own verdict, which is printed.
    """
    with concurrent.futures.ProcessPoolExecutor() as pool:
        verdicts = pool.map(is_prime, PRIMES)
        for candidate, verdict in zip(PRIMES, verdicts):
            print('%d is prime: %s' % (candidate, verdict))


if __name__ == '__main__':
    main()

In [None]:
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419,
]


def is_prime(n):
    """Decide whether ``n`` is prime by trial division with odd divisors."""
    if n < 2:
        return False
    if n % 2 == 0:
        # Among even numbers only 2 itself is prime.
        return n == 2

    bound = int(math.floor(math.sqrt(n)))
    divisor = 3
    while divisor <= bound:
        if n % divisor == 0:
            return False
        divisor += 2
    return True

def main():
    """Check each number in PRIMES via ``submit`` and print the verdicts.

    Every task is submitted before any result is requested, so the checks
    can run concurrently. Results are then printed in the order of PRIMES.
    """
    with concurrent.futures.ProcessPoolExecutor() as pool:
        submitted = [(num, pool.submit(is_prime, num)) for num in PRIMES]
        for num, fut in submitted:
            print('%d is prime: %s' % (num, fut.result()))


if __name__ == '__main__':
    main()

In [None]:
import concurrent.futures
import math

names = ["suraj", "ankush", "david", "bastian"]


def is_suraj(name):
    """Return True only when ``name`` equals the string "suraj"."""
    return bool(name == "suraj")

def main():
    """Run ``is_suraj`` on every entry of ``names`` in a process pool.

    Each name is printed next to its result, in the original list order.
    """
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # Submit one is_suraj task per name; pairing each name with its own
        # Future below keeps the output in input order.
        futures = [executor.submit(is_suraj, name) for name in names]

        for name, future in zip(names, futures):
            print(f"{name} is: {future.result()}")


if __name__ == '__main__':
    main()

# Simple Implementation

In [None]:
import os
import cv2
import csv
import time
import concurrent.futures

def format_time(seconds):
    """Render a duration in seconds as 'H hours, M minutes, S seconds' (truncated)."""
    whole_hours, within_hour = divmod(seconds, 3600)
    whole_minutes = within_hour // 60
    leftover = seconds % 60
    return f"{int(whole_hours)} hours, {int(whole_minutes)} minutes, {int(leftover)} seconds"

def extract_frames(data_dir, output_dir, row, max_frames_per_chunk):
    """Save the frames of one annotated video segment as PNG files.

    Args:
        data_dir: Directory holding the source videos. The video path is
            ``<data_dir>/<file_id>.mp4``.
        output_dir: Split directory. Frames go into
            ``<output_dir>/<activity>/<participant_id>_<file_id>_frames_<start>_<end>_ann_<annotation_id>_chunk_<chunk_id>``,
            with '/' in file_id replaced by '_'.
        row: One annotation row with exactly seven fields: participant_id,
            file_id, annotation_id, frame_start, frame_end, activity,
            chunk_id.
        max_frames_per_chunk: Upper bound on the number of frames written
            for this segment.

    Errors are printed rather than raised, so one bad row does not abort
    the rest of the batch.
    """
    # Pre-bind so the error message below is valid even if unpacking `row`
    # itself fails (previously that raised UnboundLocalError in the handler).
    file_id = "<unknown>"
    try:
        participant_id, file_id, annotation_id, frame_start, frame_end, activity, chunk_id = row
        start, end = int(frame_start), int(frame_end)
        video_filepath = os.path.join(data_dir, file_id + '.mp4')
        new_file_id = file_id.replace("/", "_")
        updated_output_dir = os.path.join(output_dir, f'{activity}', f'{participant_id}_{new_file_id}_frames_{frame_start}_{frame_end}_ann_{annotation_id}_chunk_{chunk_id}')

        os.makedirs(updated_output_dir, exist_ok=True)
        cap = cv2.VideoCapture(video_filepath)
        try:
            # Seek straight to the first annotated frame.
            cap.set(cv2.CAP_PROP_POS_FRAMES, start)
            frame_count = 0
            for frame_num in range(start, end + 1):
                # Test the cap before reading, so at most max_frames_per_chunk
                # frames are written. A `>` test after the write saves one extra.
                if frame_count >= max_frames_per_chunk:
                    break
                ret, frame = cap.read()
                if not ret:
                    print(f"Frame number {frame_num} is missing.")
                    break
                output_filename = f'img_{frame_num:06d}.png'
                output_path = os.path.join(updated_output_dir, output_filename)
                cv2.imwrite(output_path, frame)
                frame_count += 1
        finally:
            # Release the capture handle even if reading or writing raised.
            cap.release()
    except Exception as e:
        print(f"Error processing file {file_id}: {e}")

def process_annotations_parallel(annotation_file, data_dir, root_dataset_dir, dataset_sub_dir, max_frames_per_chunk=48):
    """Extract frames for every row of one annotation CSV using a process pool.

    Args:
        annotation_file: Path to the CSV. Its first row is skipped as a header.
        data_dir: Directory holding the source ``.mp4`` videos.
        root_dataset_dir: Root of the output dataset.
        dataset_sub_dir: Split name. Frames are written under
            ``<root_dataset_dir>/<dataset_sub_dir>``.
        max_frames_per_chunk: Forwarded to ``extract_frames``.

    Failures are printed, not raised, matching the best-effort style of
    ``extract_frames``.
    """
    output_dir = os.path.join(root_dataset_dir, dataset_sub_dir)
    try:
        # newline='' is what the csv module expects for file objects, so
        # quoted fields containing line breaks are parsed correctly.
        with open(annotation_file, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # Skip header row (no-op for an empty file)
            rows = list(reader)
    except Exception as e:
        print(f"Error reading annotation file: {e}")
        return

    if not rows:
        return

    try:
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(extract_frames, data_dir, output_dir, row, max_frames_per_chunk) for row in rows]
            # wait() alone never inspects outcomes. Calling result() surfaces
            # worker-level failures such as a crashed process (BrokenProcessPool).
            for future in concurrent.futures.as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    print(f"Error in frame extraction worker: {e}")
    except Exception as e:
        print(f"Error running frame extraction for {annotation_file}: {e}")

def main():
    """Extract split_0 frames for the train, test and val annotation files.

    Prints the total wall-clock time when done.
    """
    data_dir = "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_ir"
    root_dataset_dir = "/net/polaris/storage/deeplearning/sur_data/kinect_ir_daa/split_0"
    dataset_sub_dirs = ['train', 'test', 'val']
    # Keyed by split so each output directory is built from its own CSV.
    # Zipping dataset_sub_dirs against a differently ordered list sent the
    # test CSV to train/ and the train CSV to test/.
    annotation_files = {
        'test': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.test.csv",
        'train': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.train.csv",
        'val': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.val.csv",
    }

    start_time = time.time()

    for dataset_sub_dir in dataset_sub_dirs:
        process_annotations_parallel(annotation_files[dataset_sub_dir], data_dir, root_dataset_dir, dataset_sub_dir)

    end_time = time.time()
    time_taken = end_time - start_time
    print(f"Time taken for extracting dataset frames of split_0 of kinect_ir_right_top view: {format_time(time_taken)}")


# Guarded like the other cells so that importing this module (e.g. by a
# spawned worker process) does not re-run the whole extraction.
if __name__ == '__main__':
    main()


In [None]:
"""
These implementations are not used for the extraction of the frames but they could be modified for further developments or use cases.
"""
import os
import cv2
import csv
import time
import concurrent.futures

class FrameExtractor:
    """Extract frames from video files based on annotations.

    Attributes:
        data_dir (str): Directory where video files are stored.
        root_dataset_dir (str): Root directory for storing extracted frames.
        max_frames_per_chunk (int): Maximum number of frames to extract per chunk.
    """
    def __init__(self, data_dir, root_dataset_dir, max_frames_per_chunk=48):
        self.data_dir = data_dir
        self.root_dataset_dir = root_dataset_dir
        self.max_frames_per_chunk = max_frames_per_chunk

    def _extract_frames(self, output_dir, row):
        """Extract frames for a single video file based on one annotation row.

        Args:
            output_dir: Split directory under which
                ``<activity>/<participant_id>_<file_id>_frames_..._chunk_<chunk_id>``
                is created.
            row: Annotation row with exactly seven fields: participant_id,
                file_id, annotation_id, frame_start, frame_end, activity,
                chunk_id.

        At most ``self.max_frames_per_chunk`` PNGs are written. Errors are
        printed instead of raised, so one bad row does not stop the batch.
        """
        # Pre-bind so the error message below works even if unpacking fails.
        file_id = "<unknown>"
        try:
            participant_id, file_id, annotation_id, frame_start, frame_end, activity, chunk_id = row
            start, end = int(frame_start), int(frame_end)
            video_filepath = os.path.join(self.data_dir, file_id + '.mp4')
            new_file_id = file_id.replace("/", "_")
            updated_output_dir = os.path.join(output_dir, f'{activity}', f'{participant_id}_{new_file_id}_frames_{frame_start}_{frame_end}_ann_{annotation_id}_chunk_{chunk_id}')

            os.makedirs(updated_output_dir, exist_ok=True)
            cap = cv2.VideoCapture(video_filepath)
            try:
                cap.set(cv2.CAP_PROP_POS_FRAMES, start)
                frame_count = 0
                for frame_num in range(start, end + 1):
                    # Test the cap before reading, so the output holds at most
                    # max_frames_per_chunk frames rather than one more.
                    if frame_count >= self.max_frames_per_chunk:
                        break
                    ret, frame = cap.read()
                    if not ret:
                        print(f"Frame number {frame_num} is missing.")
                        break
                    output_filename = f'img_{frame_num:06d}.png'
                    output_path = os.path.join(updated_output_dir, output_filename)
                    cv2.imwrite(output_path, frame)
                    frame_count += 1
            finally:
                # Release the capture handle even if reading or writing raised.
                cap.release()
        except Exception as e:
            print(f"Error processing file {file_id}: {e}")

    def process_annotations_parallel(self, annotation_file, dataset_sub_dir):
        """Process annotations in parallel, extracting frames for each row.

        Args:
            annotation_file: CSV path. Its first row is skipped as a header.
            dataset_sub_dir: Split name. Output goes to
                ``<root_dataset_dir>/<dataset_sub_dir>``.

        Failures are printed rather than raised.
        """
        output_dir = os.path.join(self.root_dataset_dir, dataset_sub_dir)
        try:
            # newline='' is what the csv module expects for file objects.
            with open(annotation_file, 'r', newline='') as csvfile:
                reader = csv.reader(csvfile)
                next(reader, None)  # Skip header row (no-op for an empty file)
                rows = list(reader)
        except Exception as e:
            print(f"Error reading annotation file: {e}")
            return

        if not rows:
            return

        try:
            # submit() sends the bound method self._extract_frames to a worker
            # process, so this instance must be picklable.
            with concurrent.futures.ProcessPoolExecutor() as executor:
                futures = [executor.submit(self._extract_frames, output_dir, row) for row in rows]
                # Inspect each outcome so worker-level failures are reported
                # instead of being silently dropped after wait().
                for future in concurrent.futures.as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        print(f"Error in frame extraction worker: {e}")
        except Exception as e:
            print(f"Error running frame extraction for {annotation_file}: {e}")

def format_time(seconds):
    """Break ``seconds`` into whole hours, minutes and seconds for display."""
    parts = (seconds // 3600, (seconds % 3600) // 60, seconds % 60)
    h, m, s = (int(p) for p in parts)
    return f"{h} hours, {m} minutes, {s} seconds"

def main():
    """Extract split_0 frames for train, test and val with a FrameExtractor.

    Prints the total wall-clock time when done.
    """
    data_dir = "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_ir"
    root_dataset_dir = "/net/polaris/storage/deeplearning/sur_data/kinect_ir_daa/split_0"
    dataset_sub_dirs = ['train', 'test', 'val']
    # Keyed by split so each output directory is built from its own CSV.
    # A positional zip against a differently ordered list swapped train/test.
    annotation_files = {
        'test': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.test.csv",
        'train': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.train.csv",
        'val': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.val.csv",
    }

    extractor = FrameExtractor(data_dir, root_dataset_dir)

    start_time = time.time()

    for dataset_sub_dir in dataset_sub_dirs:
        extractor.process_annotations_parallel(annotation_files[dataset_sub_dir], dataset_sub_dir)

    end_time = time.time()
    time_taken = end_time - start_time
    print(f"Time taken for extracting dataset frames of split_0 of kinect_ir_right_top view: {format_time(time_taken)}")


if __name__ == "__main__":
    main()

# Two-class OOP structure separating annotation processing from frame extraction

In [None]:
import os
import cv2
import csv
import time
import concurrent.futures

class FrameExtractor:
    """
    A class to extract frames from video files based on annotations.
    
    Attributes:
        data_dir (str): The directory where the video files are stored.
        output_dir (str): The root directory where the extracted frames will be saved.
        max_frames_per_chunk (int): The maximum number of frames to extract per chunk.
    """
    
    def __init__(self, data_dir, output_dir, max_frames_per_chunk=48):
        """
        Initializes the FrameExtractor with directories and extraction parameters.
        """
        self.data_dir = data_dir
        self.output_dir = output_dir
        self.max_frames_per_chunk = max_frames_per_chunk
    
    def extract_frames(self, row):
        """
        Extracts frames from a video file based on a row from the annotation file.

        At most ``self.max_frames_per_chunk`` frames are written as
        ``img_<frame_num>.png`` under
        ``<output_dir>/<activity>/<participant_id>_<file_id>_frames_..._chunk_<chunk_id>``.
        Errors are printed rather than raised, so one bad row does not stop
        the batch.
        
        Args:
            row (list): Annotation fields, exactly seven: participant_id,
                file_id, annotation_id, frame_start, frame_end, activity,
                chunk_id.
        """
        # Pre-bind so the error message below works even if unpacking fails.
        file_id = "<unknown>"
        try:
            participant_id, file_id, annotation_id, frame_start, frame_end, activity, chunk_id = row
            start, end = int(frame_start), int(frame_end)
            video_filepath = os.path.join(self.data_dir, file_id + '.mp4')
            new_file_id = file_id.replace("/", "_")
            updated_output_dir = os.path.join(self.output_dir, f'{activity}', f'{participant_id}_{new_file_id}_frames_{frame_start}_{frame_end}_ann_{annotation_id}_chunk_{chunk_id}')
            
            os.makedirs(updated_output_dir, exist_ok=True)
            cap = cv2.VideoCapture(video_filepath)
            try:
                cap.set(cv2.CAP_PROP_POS_FRAMES, start)
                frame_count = 0
                for frame_num in range(start, end + 1):
                    # Test the cap before reading, so the output holds at most
                    # max_frames_per_chunk frames rather than one more.
                    if frame_count >= self.max_frames_per_chunk:
                        break
                    ret, frame = cap.read()
                    if not ret:
                        print(f"Frame number {frame_num} is missing.")
                        break
                    output_filename = f'img_{frame_num:06d}.png'
                    output_path = os.path.join(updated_output_dir, output_filename)
                    cv2.imwrite(output_path, frame)
                    frame_count += 1
            finally:
                # Release the capture handle even if reading or writing raised.
                cap.release()
        except Exception as e:
            print(f"Error processing file {file_id}: {e}")

class AnnotationProcessor:
    """
    A class to process annotation files and extract frames from videos in parallel.
    
    Attributes:
        annotation_files (dict): Maps a split name (e.g. 'train') to its annotation CSV path.
        data_dir (str): The directory where the video files are stored.
        root_dataset_dir (str): The root directory where the extracted frames will be saved.
    """
    
    def __init__(self, annotation_files, data_dir, root_dataset_dir):
        """
        Initializes the AnnotationProcessor with the dataset and annotation information.
        """
        self.annotation_files = annotation_files
        self.data_dir = data_dir
        self.root_dataset_dir = root_dataset_dir
    
    def process_annotations(self, dataset_sub_dir, max_frames_per_chunk):
        """
        Processes a single annotation file in parallel using multiple processes.
        
        Args:
            dataset_sub_dir (str): Key into ``annotation_files``. It is also the
                sub-directory (e.g. 'train', 'test') for saving extracted frames.
            max_frames_per_chunk (int): The maximum number of frames to extract per chunk.

        Raises:
            KeyError: If ``dataset_sub_dir`` has no entry in ``annotation_files``.

        Reading and worker failures are printed rather than raised.
        """
        annotation_file = self.annotation_files[dataset_sub_dir]
        output_dir = os.path.join(self.root_dataset_dir, dataset_sub_dir)

        try:
            # newline='' is what the csv module expects for file objects.
            with open(annotation_file, 'r', newline='') as csvfile:
                reader = csv.reader(csvfile)
                next(reader, None)  # Skip header row (no-op for an empty file)
                rows = list(reader)
        except Exception as e:
            print(f"Error processing annotation file {annotation_file}: {e}")
            return

        if not rows:
            return

        frame_extractor = FrameExtractor(self.data_dir, output_dir, max_frames_per_chunk)
        try:
            with concurrent.futures.ProcessPoolExecutor() as executor:
                futures = [executor.submit(frame_extractor.extract_frames, row) for row in rows]
                # Inspect each outcome so worker-level failures (e.g. a crashed
                # process) are reported instead of silently dropped after wait().
                for future in concurrent.futures.as_completed(futures):
                    try:
                        future.result()
                    except Exception as e:
                        print(f"Error in frame extraction worker for {annotation_file}: {e}")
        except Exception as e:
            print(f"Error processing annotation file {annotation_file}: {e}")

def format_time(seconds):
    """Return ``seconds`` as a human-readable hours/minutes/seconds string."""
    total = seconds
    hours = int(total // 3600)
    minutes = int(total % 3600 // 60)
    secs = int(total % 60)
    return f"{hours} hours, {minutes} minutes, {secs} seconds"

def main():
    """Extract split_0 kinect_ir frames for train, test and val.

    Each split is handled by an AnnotationProcessor, and the elapsed time is
    printed at the end.
    """
    data_dir = "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_ir"
    root_dataset_dir = "/net/polaris/storage/deeplearning/sur_data/kinect_ir_daa/split_0"
    dataset_sub_dirs = ['train', 'test', 'val']
    annotation_files = {
        'test': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.test.csv",
        'train': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.train.csv",
        'val': "/home/sur06423/hiwi/vit_exp/vision_tranformer_baseline/data/kinect_color_annotation/activities_3s/kinect_ir/midlevel.chunks_90.split_0.val.csv",
    }
    max_frames_per_chunk = 48  # Customize this value as needed

    processor = AnnotationProcessor(annotation_files, data_dir, root_dataset_dir)

    started = time.time()
    for split in dataset_sub_dirs:
        print(f"Processing {split}...")
        processor.process_annotations(split, max_frames_per_chunk)

    elapsed = format_time(time.time() - started)
    print(f"Time taken for extracting dataset frames of split_0 of kinect_ir_right_top view: {elapsed}")


if __name__ == "__main__":
    main()

In [None]:
import os
import cv2
import csv
import time
import concurrent.futures
import logging

# Set up basic configuration for logging
# Configure the root logger at INFO level. %(processName)s tags each record
# with the name of the process that emitted it.
logging.basicConfig(
    format='%(asctime)s - %(processName)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

class VideoFrameExtractor:
    """
    Extracts frames from video files based on annotations and processes these annotations in parallel.

    Attributes:
        data_dir (str): Directory holding the ``<file_id>.mp4`` videos.
        root_dataset_dir (str): Root output directory; each split gets a sub-directory.
        annotation_files (dict): Maps a split name (e.g. 'train') to its annotation CSV path.
        num_workers (int): Worker process count used by ``process_annotations``.
    """
    def __init__(self, data_dir, root_dataset_dir, annotation_files, num_workers=4):
        self.data_dir = data_dir
        self.root_dataset_dir = root_dataset_dir
        self.annotation_files = annotation_files
        self.num_workers = num_workers

    def extract_frames(self, row, output_dir, max_frames_per_chunk):
        """Write at most ``max_frames_per_chunk`` PNG frames for one annotation row.

        Args:
            row: Seven fields: participant_id, file_id, annotation_id,
                frame_start, frame_end, activity, chunk_id.
            output_dir: Split directory under which
                ``<activity>/<participant_id>_<file_id>_frames_..._chunk_<chunk_id>``
                is created.
            max_frames_per_chunk: Upper bound on frames written for this row.

        Raises:
            ValueError: If ``row`` does not have exactly seven fields or the
                frame bounds are not integer strings. ``process_annotations``
                logs such failures instead of letting them abort the batch.
        """
        participant_id, file_id, annotation_id, frame_start, frame_end, activity, chunk_id = row
        start, end = int(frame_start), int(frame_end)
        video_filepath = os.path.join(self.data_dir, file_id + '.mp4')
        new_file_id = file_id.replace("/", "_")
        updated_output_dir = os.path.join(output_dir, f'{activity}', f'{participant_id}_{new_file_id}_frames_{frame_start}_{frame_end}_ann_{annotation_id}_chunk_{chunk_id}')
        
        os.makedirs(updated_output_dir, exist_ok=True)
        cap = cv2.VideoCapture(video_filepath)
        frame_count = 0
        try:
            cap.set(cv2.CAP_PROP_POS_FRAMES, start)

            logging.info(f"Starting frame extraction for {file_id} by process ID: {os.getpid()}")
            for frame_num in range(start, end + 1):
                # Test the cap before reading, so the output holds at most
                # max_frames_per_chunk frames rather than one more.
                if frame_count >= max_frames_per_chunk:
                    break
                ret, frame = cap.read()
                if not ret:
                    logging.warning(f"Frame number {frame_num} is missing in {file_id}.")
                    break
                output_filename = f'img_{frame_num:06d}.png'
                output_path = os.path.join(updated_output_dir, output_filename)
                cv2.imwrite(output_path, frame)
                frame_count += 1
        finally:
            # Release the capture handle even if reading or writing raised.
            cap.release()
        logging.info(f"Completed frame extraction for {file_id}")

    def process_annotations(self, dataset_sub_dir, max_frames_per_chunk):
        """Extract frames for every row of one split's annotation CSV in parallel.

        Args:
            dataset_sub_dir: Key into ``annotation_files``. It is also the output
                sub-directory name under ``root_dataset_dir``.
            max_frames_per_chunk: Forwarded to ``extract_frames``.

        Raises:
            KeyError: If ``dataset_sub_dir`` has no annotation file.
            OSError: If the annotation file cannot be opened.

        Per-row failures inside worker processes are logged, not raised.
        """
        annotation_file = self.annotation_files[dataset_sub_dir]
        output_dir = os.path.join(self.root_dataset_dir, dataset_sub_dir)
        
        # newline='' is what the csv module expects for file objects.
        with open(annotation_file, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)  # Skip header row; tolerate an empty file
            rows = list(reader)

        if not rows:
            logging.info(f"No annotation rows found in {annotation_file}")
            return

        with concurrent.futures.ProcessPoolExecutor(max_workers=self.num_workers) as executor:
            future_to_row = {
                executor.submit(self.extract_frames, row, output_dir, max_frames_per_chunk): row
                for row in rows
            }
            # wait() alone would drop any exception raised in a worker; inspect
            # each future so failures are logged instead of silently lost.
            for future in concurrent.futures.as_completed(future_to_row):
                error = future.exception()
                if error is not None:
                    logging.error(f"Frame extraction failed for row {future_to_row[future]}: {error}")

def format_time(seconds):
    """Format a duration in seconds as ``'<h>h:<m>m:<s>s'`` using whole units."""
    hours, rest = divmod(seconds, 3600)
    minutes = rest // 60
    return f"{int(hours)}h:{int(minutes)}m:{int(seconds % 60)}s"

def main():
    """Extract frames for every split listed in ``annotation_files`` and log the total runtime."""
    data_dir = "/path/to/your/video/files"
    root_dataset_dir = "/path/to/your/output/directory"
    annotation_files = {
        'train': "/path/to/train_annotation_file.csv",
        'test': "/path/to/test_annotation_file.csv",
        'val': "/path/to/val_annotation_file.csv",
    }
    num_workers = 4  # Adjust based on your system's capabilities
    max_frames_per_chunk = 48

    extractor = VideoFrameExtractor(data_dir, root_dataset_dir, annotation_files, num_workers)

    began = time.time()
    for split in annotation_files:
        logging.info(f"Processing {split} dataset...")
        extractor.process_annotations(split, max_frames_per_chunk)

    elapsed = format_time(time.time() - began)
    logging.info(f"Time taken for extracting dataset frames: {elapsed}")


if __name__ == "__main__":
    main()
