## Parallel Version Solved Using Reduction

In [1]:
# Import the necessary libraries
import os
import cv2
import numpy as np
import time
from joblib import Parallel, delayed
from multiprocessing import Manager, Lock, Value

In [2]:
# Where the images are read from and where the filtered results are written.
# Both are relative paths; point them at your own folders as needed.
input_folder = 'cars'
output_folder = 'cars_filtered'

# Ensure the destination exists up front (no error if it is already there).
os.makedirs(output_folder, exist_ok=True)

In [3]:
# Function to process rows (done in parallel)
def process_row(row_index, image, filter_kernel, filter_size, height, width, shared_dict, lock,cores):
    """Compute one output row of the kernel-weighted sum over ``image``.

    For every column ``j`` of row ``row_index`` the kernel is centred on
    ``(row_index, j)`` and the products ``image[r, c] * filter_kernel[k, l]``
    are accumulated. Taps that fall outside ``0 <= r < height`` or
    ``0 <= c < width`` are skipped, i.e. they contribute nothing.

    Parameters
    ----------
    row_index : int
        Index of the row to compute.
    image : 2-D indexable (``image[r, c]``)
        Input image.
    filter_kernel : 2-D indexable (``filter_kernel[k, l]``)
        Kernel weights, ``filter_size`` x ``filter_size``.
    filter_size : int
        Side length of the kernel.
    height, width : int
        Bounds used for the border check and the length of the output row.
    shared_dict : mapping
        Shared counters; ``"row_processed"`` is incremented by one per call.
    lock : context manager
        Held while ``shared_dict`` is updated.
    cores : int
        Not used by this function; accepted so callers can pass it through.

    Returns
    -------
    numpy.ndarray
        ``float32`` array of length ``width``.
    """
    half = filter_size // 2
    result = np.zeros(width, dtype=np.float32)

    for out_col in range(width):
        acc = 0.0
        for k in range(filter_size):
            src_row = row_index + k - half
            # Whole kernel row lies outside the image: nothing to add.
            if src_row < 0 or src_row >= height:
                continue
            for l in range(filter_size):
                src_col = out_col + l - half
                if src_col < 0 or src_col >= width:
                    continue
                acc += image[src_row, src_col] * filter_kernel[k, l]
        result[out_col] = acc

    # Critical section: `+=` on the mapping is a read followed by a write,
    # so it is done under the lock to keep concurrent updates from being lost.
    with lock:
        shared_dict["row_processed"] += 1

    return result

In [4]:
# Main function with parallel processing of rows using joblib
def parallel_process(image, shared_dict, lock,cores):
    """Filter ``image`` with a fixed 3x3 kernel, computing each row as a joblib task.

    Parameters
    ----------
    image : 2-D array
        Input image; its shape is unpacked as ``(height, width)``.
    shared_dict : mapping
        Shared counters, forwarded unchanged to ``process_row``.
    lock : context manager
        Lock guarding ``shared_dict``, forwarded unchanged to ``process_row``.
    cores : int
        Passed to joblib as ``n_jobs`` and forwarded to ``process_row``.

    Returns
    -------
    numpy.ndarray
        ``float32`` array built from the per-row results, one entry per image row.
    """
    n_rows, n_cols = image.shape

    # Fixed 3x3 weights. They sum to 1.3, so the output is not
    # brightness-preserving.
    kernel = np.array(
        [[0.1, 0.2, 0.1],
         [0.2, 0.1, 0.2],
         [0.1, 0.2, 0.1]],
        dtype=np.float32,
    )
    kernel_size = kernel.shape[0]

    # One task per image row, dispatched across `cores` joblib workers.
    row_task = delayed(process_row)
    tasks = (
        row_task(r, image, kernel, kernel_size, n_rows, n_cols, shared_dict, lock, cores)
        for r in range(n_rows)
    )
    rows = Parallel(n_jobs=cores)(tasks)

    # Reduction step: stack the independently computed rows into one image.
    return np.array(rows, dtype=np.float32)

In [5]:
# Function to process and save each image (Done in parallel)
def process_image(image_path, shared_dict, lock,cores):
    """Edge-filter one image file and save the result into ``output_folder``.

    Pipeline: read as grayscale -> 3x3 Gaussian blur -> Sobel gradient
    magnitude -> scale by 1/255 -> custom row-parallel filter
    (``parallel_process``) -> clip to [0, 1] -> convert to 8-bit -> write to
    ``output_folder`` under the input's base file name.

    Parameters
    ----------
    image_path : str
        Path of the image to read.
    shared_dict : mapping
        Shared counters; ``"processed_image_count"`` is incremented by one
        only after the result has been written successfully.
    lock : context manager
        Held while ``shared_dict`` is updated; also forwarded to
        ``parallel_process``.
    cores : int
        Forwarded to ``parallel_process``.

    Returns
    -------
    None
        If the image cannot be read or the result cannot be written, a
        message is printed and the image is not counted.
    """
    # Reading the image in grayscale
    image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)

    if image is None:
        print(f"Could not open or find the image: {image_path}")
        return

    # Applying Gaussian Blur
    blurred_image = cv2.GaussianBlur(image, (3,3), 0)

    # Computing the elevation map using Sobel operator for edge detection
    sobel_x = cv2.Sobel(blurred_image, cv2.CV_64F, 1, 0, ksize=3)
    sobel_y = cv2.Sobel(blurred_image, cv2.CV_64F, 0, 1, ksize=3)

    # Combining the gradients to obtain the magnitude of the gradient
    magnitude_gradient = cv2.magnitude(sobel_x, sobel_y)

    # Scaling by 1/255 as float32. The gradient magnitude is not bounded by
    # 255, so values above 1.0 are possible here; they are clipped below.
    image = magnitude_gradient.astype(np.float32) / 255.0

    # Applying parallel custom filter operation
    output_image = parallel_process(image,shared_dict,lock,cores)

    # Clipping the output to stay in the range [0, 1]
    output_image = np.clip(output_image, 0.0, 1.0)

    # Converting the processed image back to 8-bit (0-255)
    processed_image = (output_image * 255).astype(np.uint8)

    # Saving the processed image. cv2.imwrite reports failure through its
    # boolean return value, so check it instead of assuming the file exists.
    output_image_path = os.path.join(output_folder, os.path.basename(image_path))
    if not cv2.imwrite(output_image_path, processed_image):
        print(f"Could not write the image: {output_image_path}")
        return

    # Critical section: `+=` on the mapping is a read followed by a write,
    # so it is done under the lock to keep concurrent updates from being lost.
    with lock:
        shared_dict["processed_image_count"] +=1

In [6]:
# List of image paths to process.
# The extension check is case-insensitive so files such as "IMG_01.JPG" are
# not silently skipped, and the list is sorted because os.listdir returns
# entries in arbitrary order.
image_paths = sorted(
    os.path.join(input_folder, filename)
    for filename in os.listdir(input_folder)
    if filename.lower().endswith(('.jpg', '.jpeg', '.png', '.bmp'))
)

# Worker counts to benchmark; every image is reprocessed once per entry.
core_counts=[1,2,3,4,6,8,10,12,14,16]
for cores in core_counts:
    # Manager provides a way to share data safely between processes by allowing access to a shared object.
    # A fresh manager per run means the counters start from zero each time.
    with Manager() as manager:

        shared_dict = manager.dict({"processed_image_count": 0, "row_processed": 0})
        lock = manager.Lock()

        print(f"\nTesting with {cores} cores:")
        # perf_counter is monotonic, so the measured interval is not affected
        # by system clock adjustments during the run.
        Start_time = time.perf_counter()

        # Parallelizing the image processing using joblib
        Parallel(n_jobs=cores)(delayed(process_image)(image_path,shared_dict,lock,cores) for image_path in image_paths)

        # Printing the timing and the shared counters for this run
        print(f"Total time taken by the execution of this code for all images using Joblib: {time.perf_counter() - Start_time:.2f}s")
        print(f"Processed Images: {shared_dict['processed_image_count']}")
        print(f"Processed Rows: {shared_dict['row_processed']}")


Testing with 1 cores:
Total time taken by the execution of this code for all images using Joblib: 367.88s
Processed Images: 200
Processed Rows: 96000

Testing with 2 cores:
Total time taken by the execution of this code for all images using Joblib: 194.66s
Processed Images: 200
Processed Rows: 96000

Testing with 3 cores:
Total time taken by the execution of this code for all images using Joblib: 142.93s
Processed Images: 200
Processed Rows: 96000

Testing with 4 cores:
Total time taken by the execution of this code for all images using Joblib: 136.57s
Processed Images: 200
Processed Rows: 96000

Testing with 6 cores:
Total time taken by the execution of this code for all images using Joblib: 171.38s
Processed Images: 200
Processed Rows: 96000

Testing with 8 cores:
Total time taken by the execution of this code for all images using Joblib: 182.34s
Processed Images: 200
Processed Rows: 96000

Testing with 10 cores:
Total time taken by the execution of this code for all images using Jo