In [9]:
import numpy as np
import time
from multiprocessing import Process, Queue
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

In [10]:
# Fixed seed so every run of the notebook draws the same benchmark input.
SEED = 42
# Number of elements to sort; also reused as the exclusive upper bound of the values.
AMOUNT = 5_000_000
# Seed NumPy's global (legacy) random state before the data is drawn.
np.random.seed(SEED)
# Benchmark input: AMOUNT random integers in the range [0, AMOUNT).
vector = np.random.randint(0, AMOUNT, size=AMOUNT)

In [11]:
class ParallelMergeSort:
    """Merge sort that sorts the two halves of its input concurrently.

    Three strategies are provided: raw ``Process`` objects, a
    ``ProcessPoolExecutor`` and a ``ThreadPoolExecutor``.  Every strategy keeps
    halving the input until ``max_depth`` levels of recursion are reached and
    then hands the remaining slice to the serial ``merge_sort``.  Each level
    combines its two sorted halves with ``merge``.

    NOTE(review): the ``Process`` strategy uses a local closure as the process
    target; this presumably relies on the "fork" start method, since closures
    cannot be pickled -- confirm before running on spawn-based platforms.
    """

    def __init__(self, max_depth=4):
        # Recursion level at which splitting stops and serial sorting takes over.
        self.max_depth = max_depth

    def parallel_merge_sort_process_pool_task(self, args):
        """Adapter for ``Executor.map``: unpack ``args`` into the process-pool sort."""
        return self.parallel_merge_sort_process_pool(*args)

    def parallel_merge_sort_thread_pool_task(self, args):
        """Adapter for ``Executor.map``: unpack ``args`` into the thread-pool sort."""
        return self.parallel_merge_sort_thread_pool(*args)

    def parallel_merge_sort_process(self, arr, depth=0):
        """Sort ``arr`` by giving each half to its own ``Process``.

        Args:
            arr: sequence to sort.
            depth: current recursion level (callers normally leave it at 0).

        Returns:
            ``arr`` itself when it has fewer than two elements, otherwise the
            result of merging the two sorted halves.
        """
        if len(arr) < 2:
            return arr
        if depth >= self.max_depth:
            return merge_sort(arr)

        def sort_half(half, level, out_queue):
            # Runs in the child process: sort the slice and ship it back.
            out_queue.put(self.parallel_merge_sort_process(half, level))

        split = len(arr) // 2
        queues = [Queue(), Queue()]
        children = [
            Process(target=sort_half, args=(half, depth + 1, out_queue))
            for half, out_queue in zip((arr[:split], arr[split:]), queues)
        ]

        for child in children:
            child.start()
        # Results are fetched (left first, then right) before join(); joining
        # first could block while a child still holds unflushed queue data.
        left, right = [out_queue.get() for out_queue in queues]
        for child in children:
            child.join()

        return merge(left, right)

    def parallel_merge_sort_process_pool(self, arr, depth=0):
        """Sort ``arr`` with a two-worker ``ProcessPoolExecutor`` per level.

        Args:
            arr: sequence to sort.
            depth: current recursion level (callers normally leave it at 0).

        Returns:
            ``arr`` itself when it has fewer than two elements, otherwise the
            result of merging the two sorted halves.
        """
        if len(arr) < 2:
            return arr
        if depth >= self.max_depth:
            return merge_sort(arr)

        split = len(arr) // 2
        jobs = [(arr[:split], depth + 1), (arr[split:], depth + 1)]
        with ProcessPoolExecutor(max_workers=2) as executor:
            left, right = executor.map(self.parallel_merge_sort_process_pool_task, jobs)

        return merge(left, right)

    def parallel_merge_sort_thread_pool(self, arr, depth=0):
        """Sort ``arr`` with a two-worker ``ThreadPoolExecutor`` per level.

        Args:
            arr: sequence to sort.
            depth: current recursion level (callers normally leave it at 0).

        Returns:
            ``arr`` itself when it has fewer than two elements, otherwise the
            result of merging the two sorted halves.
        """
        if len(arr) < 2:
            return arr
        if depth >= self.max_depth:
            return merge_sort(arr)

        split = len(arr) // 2
        jobs = [(arr[:split], depth + 1), (arr[split:], depth + 1)]
        with ThreadPoolExecutor(max_workers=2) as executor:
            left, right = executor.map(self.parallel_merge_sort_thread_pool_task, jobs)

        return merge(left, right)

def merge_sort(arr):
    """Serial top-down merge sort.

    Args:
        arr: sliceable sequence of mutually comparable items.

    Returns:
        ``arr`` itself when it holds fewer than two elements; otherwise the
        result of ``merge`` applied to the recursively sorted halves.
    """
    size = len(arr)
    if size < 2:
        return arr
    half = size // 2
    # The left half is sorted before the right half.
    return merge(merge_sort(arr[:half]), merge_sort(arr[half:]))

def merge(left, right):
    """Merge two already-sorted sequences into one sorted list.

    The merge is stable: when an element of ``left`` compares equal to an
    element of ``right``, the one from ``left`` is emitted first, so a merge
    sort built on this function keeps equal items in their original order.

    Args:
        left: sorted, indexable sequence.
        right: sorted, indexable sequence.

    Returns:
        A new list containing every element of ``left`` and ``right`` in
        ascending order.
    """
    merged = []
    i = j = 0
    while i < len(left) and j < len(right):
        # `<=` (not `<`) so that ties are resolved in favour of the left run.
        if left[i] <= right[j]:
            merged.append(left[i])
            i += 1
        else:
            merged.append(right[j])
            j += 1
    # At most one of the two tails is non-empty; append whatever remains.
    merged.extend(left[i:])
    merged.extend(right[j:])
    return merged

In [12]:
# Benchmark: time the serial merge sort and each parallel strategy on the same
# input, then check that every strategy produced the same ordering.
# time.perf_counter() is used instead of time.time() because it is monotonic
# and meant for measuring elapsed intervals.
start_time = time.perf_counter()
usual_sort = merge_sort(vector)
end_time = time.perf_counter()
print("Serial:", end_time - start_time)

merge_parallel = ParallelMergeSort(max_depth=4)

start_time = time.perf_counter()
process_vector = merge_parallel.parallel_merge_sort_process(vector)
end_time = time.perf_counter()
print("Process:", end_time - start_time)

start_time = time.perf_counter()
process_vector_pool = merge_parallel.parallel_merge_sort_process_pool(vector)
end_time = time.perf_counter()
print("Process Pool:", end_time - start_time)

start_time = time.perf_counter()
thread_vector_pool = merge_parallel.parallel_merge_sort_thread_pool(vector)
end_time = time.perf_counter()
print("Thread Pool:", end_time - start_time) # Takes longer because of the GIL

# Prints True only if all three parallel results match the serial result.
print(np.array_equal(process_vector, usual_sort) and np.array_equal(process_vector_pool, usual_sort) and np.array_equal(thread_vector_pool, usual_sort))

Serial: 17.112558603286743
Process: 12.988418817520142
Process Pool: 10.892355680465698
Thread Pool: 18.764776468276978
True
