In [11]:
# Parallel preprocessing of the data.
# We start by converting the MP4 files to MP3 files in parallel,
# using Python's concurrent.futures for multithreading or multiprocessing, since this is a CPU-bound task.


In [12]:
import os
from pydub import AudioSegment
from tqdm import tqdm
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


In [13]:
# Directory that is walked recursively (os.walk) to find the video files to convert.
# The same path is assigned again at the top of each conversion cell below.
input_directory = '/home/mahmoud/Desktop/Projects/parallelproject/rawdataset/drive-download-20241218T203725Z-001'

In [14]:

# Directory that is walked recursively (os.walk) to find the video files to convert.
input_directory = '/home/mahmoud/Desktop/Projects/parallelproject/rawdataset/drive-download-20241218T203725Z-001'

# Define output directory for sequential conversion
# NOTE(review): the 'pre processing ' path component ends with a space -- confirm
# that this matches the intended folder name on disk.
output_directory_sequential = '/home/mahmoud/Desktop/Projects/parallelproject/pre processing /sequentialoutput'
# Create the folder (and any missing parents); exist_ok=True makes re-running the cell safe.
os.makedirs(output_directory_sequential, exist_ok=True)

# Function for Sequential Conversion
def convert_to_mp3_sequential(video_path):
    """Extract the audio of one media file and save it as MP3 (sequential variant).

    The MP3 is written into ``output_directory_sequential`` and named after the
    input file's name without its extension.

    Args:
        video_path: Path of the media file to convert.

    Returns:
        str: ``"Converted: <source> -> <target>"`` on success, or
        ``"Failed to convert <source>: <error>"`` if any exception is raised
        while loading or exporting the audio.
    """
    stem, _extension = os.path.splitext(os.path.basename(video_path))
    mp3_path = os.path.join(output_directory_sequential, f"{stem}.mp3")
    try:
        # Load the audio track and write it straight back out as MP3.
        AudioSegment.from_file(video_path).export(mp3_path, format="mp3")
        return f"Converted: {video_path} -> {mp3_path}"
    except Exception as err:
        # Best effort: report the failure as text so the caller's loop keeps going.
        return f"Failed to convert {video_path}: {err}"

# Function to process sequential conversion
def sequential_conversion():
    """Convert every video found under ``input_directory`` to MP3, one file at a time.

    Returns:
        tuple: ``(elapsed_seconds, results)``. ``elapsed_seconds`` is the
        wall-clock duration of the conversion loop only (file discovery is not
        timed). ``results`` holds the status string returned by
        ``convert_to_mp3_sequential`` for each file, in processing order.
    """
    video_extensions = (".mp4", ".mkv", ".avi", ".mov")

    # Gather all video files. The extension check is case-insensitive so files
    # such as "CLIP.MP4" are not silently skipped, and the list is sorted
    # because os.walk yields directory entries in arbitrary order.
    video_files = sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(input_directory)
        for file in files
        if file.lower().endswith(video_extensions)
    )

    # Sequential conversion. perf_counter() is a monotonic, high-resolution
    # clock, so the measurement cannot be skewed by system clock adjustments.
    start_time = time.perf_counter()
    sequential_results = [
        convert_to_mp3_sequential(video_file)
        for video_file in tqdm(video_files, total=len(video_files))
    ]
    sequential_time = time.perf_counter() - start_time

    return sequential_time, sequential_results

# Run Sequential Conversion
# The elapsed time and the per-file status strings are kept as notebook globals;
# the metrics cell further down reads both.
sequential_time, sequential_result = sequential_conversion()
print(f'Sequential Conversion Completed. Time taken: {sequential_time:.2f} seconds.')


100%|██████████| 23/23 [06:40<00:00, 17.40s/it]

Sequential Conversion Completed. Time taken: 400.26 seconds.





In [15]:
import os
import time
from concurrent.futures import ThreadPoolExecutor
from pydub import AudioSegment
from tqdm import tqdm

# Input directory: walked recursively (os.walk) to find the video files to convert.
input_directory = '/home/mahmoud/Desktop/Projects/parallelproject/rawdataset/drive-download-20241218T203725Z-001'

# Set up output directory
# NOTE(review): the 'pre processing ' path component ends with a space -- confirm
# that this matches the intended folder name on disk.
output_directory = '/home/mahmoud/Desktop/Projects/parallelproject/pre processing /multithreadedoutput'  # Folder for multithreading output
# Create the folder (and any missing parents); exist_ok=True makes re-running the cell safe.
os.makedirs(output_directory, exist_ok=True)

# Function for Parallel Conversion (Multithreading)
def convert_to_mp3_parallel(video_path):
    """Extract the audio of one media file and save it as MP3 (thread-pool variant).

    The MP3 is written into ``output_directory`` and named after the input
    file's name without its extension.

    Args:
        video_path: Path of the media file to convert.

    Returns:
        str: ``"Converted: <source> -> <target>"`` on success, or
        ``"Failed to convert <source>: <error>"`` if any exception is raised
        while loading or exporting the audio.
    """
    source_name = os.path.basename(video_path)
    stem = os.path.splitext(source_name)[0]
    target = os.path.join(output_directory, f"{stem}.mp3")
    try:
        segment = AudioSegment.from_file(video_path)
        segment.export(target, format="mp3")
        return f"Converted: {video_path} -> {target}"
    except Exception as exc:
        # Best effort: a failing file is reported as text instead of propagating
        # the exception out of the worker.
        return f"Failed to convert {video_path}: {exc}"

# Function to process parallel conversion using multithreading
def parallel_conversion_threading():
    """Convert every video found under ``input_directory`` to MP3 using a thread pool.

    Returns:
        tuple: ``(elapsed_seconds, results)``. ``elapsed_seconds`` is the
        wall-clock duration of the pooled conversion, including pool start-up
        and shutdown but not file discovery. ``results`` holds the status
        string returned by ``convert_to_mp3_parallel`` for each file, in the
        same order as the sorted input list.
    """
    video_extensions = (".mp4", ".mkv", ".avi", ".mov")

    # Gather all video files. The extension check is case-insensitive so files
    # such as "CLIP.MP4" are not silently skipped, and the list is sorted
    # because os.walk yields directory entries in arbitrary order.
    video_files = sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(input_directory)
        for file in files
        if file.lower().endswith(video_extensions)
    )

    # Parallel conversion using multithreading. perf_counter() is a monotonic,
    # high-resolution clock, so the measurement cannot be skewed by system
    # clock adjustments.
    start_time = time.perf_counter()
    # NOTE: ThreadPoolExecutor() chooses its own default worker count, which is
    # not necessarily equal to os.cpu_count().
    with ThreadPoolExecutor() as executor:
        # executor.map yields results in input order, so the progress bar
        # advances as each next-in-order conversion becomes available.
        results = list(
            tqdm(executor.map(convert_to_mp3_parallel, video_files), total=len(video_files))
        )
    parallel_time_threading = time.perf_counter() - start_time

    return parallel_time_threading, results

# Run Parallel Conversion (Multithreading)
# The elapsed time and the per-file status strings are kept as notebook globals;
# the metrics cell further down reads both.
parallel_time_threading, parallel_result_threading = parallel_conversion_threading()
print(f'Parallel Conversion (Threading) Completed. Time taken: {parallel_time_threading:.2f} seconds.')


  0%|          | 0/23 [00:00<?, ?it/s]

100%|██████████| 23/23 [00:54<00:00,  2.36s/it]


Parallel Conversion (Threading) Completed. Time taken: 54.47 seconds.


In [16]:
from concurrent.futures import ProcessPoolExecutor
import os
from pydub import AudioSegment
from tqdm import tqdm
import time

# Directory that is walked recursively (os.walk) to find the video files to convert.
input_directory = '/home/mahmoud/Desktop/Projects/parallelproject/rawdataset/drive-download-20241218T203725Z-001'

# Define output directory for multiprocessing (use absolute path)
# NOTE(review): unlike the sequential and multithreaded outputs, this folder sits
# directly under the project root rather than under 'pre processing ' -- confirm intended.
output_directory_multiprocessing = '/home/mahmoud/Desktop/Projects/parallelproject/multiprocessing_output'
# Create the folder (and any missing parents); exist_ok=True makes re-running the cell safe.
os.makedirs(output_directory_multiprocessing, exist_ok=True)

# Function for Parallel Conversion (Multiprocessing)
def convert_to_mp3_multiprocessing(video_path):
    """Extract the audio of one media file and save it as MP3 (process-pool variant).

    The MP3 is written into ``output_directory_multiprocessing`` and named
    after the input file's name without its extension.

    Args:
        video_path: Path of the media file to convert.

    Returns:
        str: ``"Converted: <source> -> <target>"`` on success, or
        ``"Failed to convert <source>: <error>"`` if any exception is raised
        while loading or exporting the audio.
    """
    file_stem = os.path.splitext(os.path.basename(video_path))[0]
    destination = os.path.join(output_directory_multiprocessing, f"{file_stem}.mp3")
    try:
        decoded = AudioSegment.from_file(video_path)
        decoded.export(destination, format="mp3")
        return f"Converted: {video_path} -> {destination}"
    except Exception as error:
        # Best effort: the failure is returned to the parent process as text
        # rather than raised.
        return f"Failed to convert {video_path}: {error}"

# Function to process parallel conversion using multiprocessing
def parallel_conversion_multiprocessing():
    """Convert every video found under ``input_directory`` to MP3 using a process pool.

    Returns:
        tuple: ``(elapsed_seconds, results)``. ``elapsed_seconds`` is the
        wall-clock duration of the pooled conversion, including pool start-up
        and shutdown but not file discovery. ``results`` holds the status
        string returned by ``convert_to_mp3_multiprocessing`` for each file,
        in the same order as the sorted input list.
    """
    video_extensions = (".mp4", ".mkv", ".avi", ".mov")

    # Gather all video files. The extension check is case-insensitive so files
    # such as "CLIP.MP4" are not silently skipped, and the list is sorted
    # because os.walk yields directory entries in arbitrary order.
    video_files = sorted(
        os.path.join(root, file)
        for root, _, files in os.walk(input_directory)
        for file in files
        if file.lower().endswith(video_extensions)
    )

    # Parallel conversion using multiprocessing. perf_counter() is a monotonic,
    # high-resolution clock, so the measurement cannot be skewed by system
    # clock adjustments.
    start_time = time.perf_counter()
    # One worker process per reported CPU (os.cpu_count() may be None, in which
    # case the executor falls back to its own default).
    # NOTE(review): worker processes must be able to resolve
    # convert_to_mp3_multiprocessing; with the "spawn" start method, functions
    # defined in a notebook cell may not be importable -- confirm on the target platform.
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        # executor.map yields results in input order, so the progress bar
        # advances as each next-in-order conversion becomes available.
        results = list(
            tqdm(executor.map(convert_to_mp3_multiprocessing, video_files), total=len(video_files))
        )
    parallel_time_multiprocessing = time.perf_counter() - start_time

    return parallel_time_multiprocessing, results

# Run Parallel Conversion (Multiprocessing)
# The elapsed time and the per-file status strings are kept as notebook globals;
# the metrics cell further down reads both.
parallel_time_multiprocessing, parallel_result_multiprocessing = parallel_conversion_multiprocessing()
print(f'Parallel Conversion (Multiprocessing) Completed. Time taken: {parallel_time_multiprocessing:.2f} seconds.')


100%|██████████| 23/23 [00:58<00:00,  2.53s/it]

Parallel Conversion (Multiprocessing) Completed. Time taken: 58.36 seconds.





In [19]:
import os

def calculate_metrics(sequential_time, parallel_time_threading, parallel_time_multiprocessing,
                      sequential_results=None, threading_results=None,
                      multiprocessing_results=None):
    """Print throughput, latency, speedup and efficiency for the three runs, then verify them.

    Args:
        sequential_time: Wall-clock seconds taken by the sequential run.
        parallel_time_threading: Wall-clock seconds taken by the thread-pool run.
        parallel_time_multiprocessing: Wall-clock seconds taken by the process-pool run.
        sequential_results: Status strings of the sequential run. When omitted,
            the notebook global ``sequential_result`` is used.
        threading_results: Status strings of the thread-pool run. When omitted,
            the notebook global ``parallel_result_threading`` is used.
        multiprocessing_results: Status strings of the process-pool run. When
            omitted, the notebook global ``parallel_result_multiprocessing`` is used.

    Returns:
        None. All metrics are printed. If there are no tasks, a notice is
        printed and nothing else happens.

    Raises:
        ValueError: If any of the elapsed times is not positive.
        Exception: If any conversion reported a failure, or if the three runs
            did not produce the same set of output file names.
    """
    # Fall back to the notebook globals so the original three-argument call keeps working.
    if sequential_results is None:
        sequential_results = sequential_result
    if threading_results is None:
        threading_results = parallel_result_threading
    if multiprocessing_results is None:
        multiprocessing_results = parallel_result_multiprocessing

    # os.cpu_count() may return None when the count cannot be determined.
    num_processors = os.cpu_count() or 1

    # Throughput and Latency
    num_tasks = len(sequential_results)  # number of tasks (audio files)

    if num_tasks == 0:
        print("No tasks to process.")
        return

    elapsed_by_run = {
        'sequential_time': sequential_time,
        'parallel_time_threading': parallel_time_threading,
        'parallel_time_multiprocessing': parallel_time_multiprocessing,
    }
    for label, elapsed in elapsed_by_run.items():
        if elapsed <= 0:
            raise ValueError(f'{label} must be a positive number of seconds, got {elapsed!r}')

    throughput_sequential = num_tasks / sequential_time
    throughput_threading = num_tasks / parallel_time_threading
    throughput_multiprocessing = num_tasks / parallel_time_multiprocessing

    # "Latency" here is the mean wall-clock time per task (total time / tasks),
    # i.e. the inverse of throughput, not the duration of an individual conversion.
    latency_sequential = sequential_time / num_tasks
    latency_threading = parallel_time_threading / num_tasks
    latency_multiprocessing = parallel_time_multiprocessing / num_tasks

    # Efficiency and Speedup. Efficiency is speedup per reported CPU, as a percentage.
    # NOTE(review): this assumes each pool used one worker per CPU; the thread
    # pool's default worker count may differ -- confirm before quoting the figure.
    speedup_threading = sequential_time / parallel_time_threading
    efficiency_threading = speedup_threading / num_processors * 100

    speedup_multiprocessing = sequential_time / parallel_time_multiprocessing
    efficiency_multiprocessing = speedup_multiprocessing / num_processors * 100

    # Print Metrics. The "Average ... Time" labels are kept for output
    # compatibility; each value is the total wall-clock time of a single run.
    print(f'Average Sequential Time: {sequential_time:.2f} seconds ({sequential_time * 1000:.2f} ms)')
    print(f'Average Parallel Time (Threading): {parallel_time_threading:.2f} seconds ({parallel_time_threading * 1000:.2f} ms)')
    print(f'Average Parallel Time (Multiprocessing): {parallel_time_multiprocessing:.2f} seconds ({parallel_time_multiprocessing * 1000:.2f} ms)')

    print(f'Throughput (Sequential): {throughput_sequential:.2f} tasks/s')
    print(f'Throughput (Threading): {throughput_threading:.2f} tasks/s')
    print(f'Throughput (Multiprocessing): {throughput_multiprocessing:.2f} tasks/s')

    print(f'Latency (Sequential): {latency_sequential:.4f} seconds/task ({latency_sequential * 1000:.2f} ms/task)')
    print(f'Latency (Threading): {latency_threading:.4f} seconds/task ({latency_threading * 1000:.2f} ms/task)')
    print(f'Latency (Multiprocessing): {latency_multiprocessing:.4f} seconds/task ({latency_multiprocessing * 1000:.2f} ms/task)')

    print(f'Speedup (Threading): {speedup_threading:.2f}')
    print(f'Efficiency (Threading): {efficiency_threading:.2f}%')

    print(f'Speedup (Multiprocessing): {speedup_multiprocessing:.2f}')
    print(f'Efficiency (Multiprocessing): {efficiency_multiprocessing:.2f}%')

    # Verify Results.
    # 1) A run only counts as successful if every status string reports a
    #    conversion; identical failures in all three runs must not pass.
    failures = [
        status
        for status in (*sequential_results, *threading_results, *multiprocessing_results)
        if not status.startswith('Converted:')
    ]
    if failures:
        raise Exception(f'{len(failures)} conversion(s) failed; first failure: {failures[0]}')

    # 2) All three runs must have produced the same output file names. The
    #    basename of a status string is the part after its last path separator,
    #    which for a "Converted: ..." string is the output file name.
    expected_names = sorted(os.path.basename(status) for status in sequential_results)
    for other_results in (threading_results, multiprocessing_results):
        if sorted(os.path.basename(status) for status in other_results) != expected_names:
            raise Exception('Results from sequential and parallel approaches do not match.')

    print('All conversions completed successfully.')


In [20]:
# Print the metrics for the three timed runs; the times come from the conversion cells above.
calculate_metrics(sequential_time, parallel_time_threading, parallel_time_multiprocessing)


Average Sequential Time: 400.26 seconds (400255.76 ms)
Average Parallel Time (Threading): 54.47 seconds (54467.99 ms)
Average Parallel Time (Multiprocessing): 58.36 seconds (58361.94 ms)
Throughput (Sequential): 0.06 tasks/s
Throughput (Threading): 0.42 tasks/s
Throughput (Multiprocessing): 0.39 tasks/s
Latency (Sequential): 17.4024 seconds/task (17402.42 ms/task)
Latency (Threading): 2.3682 seconds/task (2368.17 ms/task)
Latency (Multiprocessing): 2.5375 seconds/task (2537.48 ms/task)
Speedup (Threading): 7.35
Efficiency (Threading): 36.74%
Speedup (Multiprocessing): 6.86
Efficiency (Multiprocessing): 34.29%
All conversions completed successfully.
