# Multiprocessing in Python

## Multiprocessing with Multiprocessing library

In [14]:
import multiprocessing
import time

In [98]:
# def cpu_bound(number):
#     # print(multiprocessing.current_process().name)
#     return sum(i * i for i in range(number))

In [98]:
def find_sums(numbers):
    """Run ``cpu_bound`` over every item of ``numbers`` in a process pool.

    Args:
        numbers: Iterable of inputs; each one is passed to ``cpu_bound``.

    Returns:
        list: The list produced by ``Pool.map``, one entry per input.
    """
    with multiprocessing.Pool() as workers:
        # Pool.map blocks until every input has been processed, so the list
        # is complete before the pool is torn down on exit.
        return workers.map(cpu_bound, numbers)

In [101]:
# Twenty consecutive workloads: 5_000_000 through 5_000_019.
numbers = list(range(5_000_000, 5_000_020))

In [10]:
# Time one full run of find_sums over `numbers` using a perf_counter delta.
start_time = time.perf_counter()
find_sums(numbers)  # return value is discarded; only the elapsed time is reported
duration = time.perf_counter() - start_time
print(f"Duration {duration} seconds")

Duration 9.166558523022104 seconds


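**The above works as long as no input raises an error. With `pool.map`, a single failing input makes the whole call raise, and there is no place to put a try/except around individual items so that the inputs that can be processed still are. The solution below submits each input separately with `apply_async`.**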

In [103]:
# Same workloads plus one deliberately invalid (non-numeric) entry at the end.
numbers_w_str = [*numbers, 'str']

In [None]:
def find_sums(numbers):
    """Compute ``cpu_bound`` for each input in a process pool, skipping failures.

    Every input is submitted as its own task, so an input that makes
    ``cpu_bound`` raise only loses that one result instead of aborting the
    whole batch. Each skipped input is reported through ``logging`` rather
    than being dropped silently.

    Args:
        numbers: Iterable of inputs; each one is passed to ``cpu_bound`` as
            its single positional argument.

    Returns:
        list: Results of the inputs that succeeded, in submission order.
        Inputs whose task raised are left out.
    """
    import logging  # local import so the function does not rely on file-level imports

    logger = logging.getLogger(__name__)
    result = []
    with multiprocessing.Pool() as pool:
        # Submit everything first so the tasks run concurrently; keep each
        # input next to its handle so a failure can be attributed to it.
        pending = [
            (number, pool.apply_async(cpu_bound, (number,)))
            for number in numbers
        ]
        for number, async_result in pending:
            try:
                # get() re-raises in the parent whatever the task raised.
                value = async_result.get()
            except Exception as exc:
                logger.warning("Skipping %r: %s", number, exc)
            else:
                result.append(value)
    return result

In [10]:
# Time one full run of find_sums over `numbers_w_str` (the list that ends with
# a non-numeric entry) using a perf_counter delta.
start_time = time.perf_counter()
find_sums(numbers_w_str)  # return value is discarded; only the elapsed time is reported
duration = time.perf_counter() - start_time
print(f"Duration {duration} seconds")

Duration 9.166558523022104 seconds


## Multiprocessing with concurrent.futures library

In [2]:
from concurrent.futures import ProcessPoolExecutor, as_completed

**The code below returns a generator that yields the results. A generator can only be iterated once: after it has been consumed it is exhausted and yields nothing more (it does not become `None`).**

In [13]:
# Time executor.map over `numbers`. `result` holds the lazy iterator returned
# by map; it is not consumed in this cell.
start_time = time.perf_counter()
with ProcessPoolExecutor() as executor:
    result = executor.map(cpu_bound, numbers)
# Leaving the `with` block shuts the executor down and waits for the submitted
# work, so the elapsed time below covers the whole computation.
print(f"Duration {time.perf_counter()-start_time} seconds")

Duration 9.324354574957397 seconds


**Despite the try/except block, the loop doesn't run to completion. The problem with `map` is that if any input raises an error, iteration over the results stops at that point.**

In [103]:
# Copy of the workloads with one deliberately invalid (non-numeric) entry appended.
numbers_w_str = list(numbers) + ['str']

In [61]:
# Demonstrates that a try/except inside the loop body cannot rescue a failing
# input when the results come from executor.map.
with ProcessPoolExecutor() as executor:
    r = executor.map(cpu_bound, numbers)
    # A worker's exception is re-raised when the `for` statement pulls the
    # next item out of `r`, i.e. outside the `try` below, so it is not caught
    # and the loop ends there.
    # NOTE(review): the recorded output ends in a TypeError for 'str', which
    # suggests this cell was run with `numbers_w_str` — confirm which list is
    # intended here.
    for i,j in enumerate(r):
        try:
            print(f"Result: {j}")
        except Exception:
            print(f"Can't process result for: {numbers[i]}")

Result: 41666654166667500000
Result: 41666679166667500000
Result: 41666704166677500001
Result: 41666729166697500005
Result: 41666754166727500014
Result: 41666779166767500030
Result: 41666804166817500055
Result: 41666829166877500091
Result: 41666854166947500140
Result: 41666879167027500204
Result: 41666904167117500285
Result: 41666929167217500385
Result: 41666954167327500506
Result: 41666979167447500650
Result: 41667004167577500819
Result: 41667029167717501015
Result: 41667054167867501240
Result: 41667079168027501496
Result: 41667104168197501785
Result: 41667129168377502109


TypeError: 'str' object cannot be interpreted as an integer

**To run the entire loop even when some inputs fail, submit each input with `executor.submit` instead.**

In [84]:
# Submit one task per input; each submit() call returns a Future.
with ProcessPoolExecutor() as executor:
    r = [executor.submit(cpu_bound, num) for num in numbers]
# The `with` block has exited, so every future is already finished here.
# as_completed does not preserve input order (the recorded output is not in
# input order).
for i in (as_completed(r)):
    try:
        # result() re-raises the task's exception here, inside the try, so one
        # failing input does not stop the loop.
        print(f"Result: {i.result()}")
    except Exception:
        pass
# `r` is in submission order, so zipping it with the inputs pairs each future
# with its number; keep the inputs whose future holds an exception.
# NOTE(review): the recorded output reports 'str' as the failing input, which
# suggests this cell was run with `numbers_w_str` — confirm which list is
# intended here.
[f"The number for which it can't be processed: {j}" for i,j in zip(r,numbers) if i.exception()]

Result: 41667104168197501785
Result: 41666754166727500014
Result: 41666854166947500140
Result: 41667079168027501496
Result: 41666654166667500000
Result: 41667129168377502109
Result: 41666779166767500030
Result: 41667029167717501015
Result: 41666954167327500506
Result: 41667054167867501240
Result: 41666929167217500385
Result: 41666679166667500000
Result: 41666804166817500055
Result: 41666879167027500204
Result: 41666704166677500001
Result: 41667004167577500819
Result: 41666979167447500650
Result: 41666904167117500285
Result: 41666729166697500005
Result: 41666829166877500091


['str']

## Multiprocessing with multiple function arguments

In [None]:
def resize_image(input_path, output_path, size=(512,512)):
    """Placeholder for the image-resizing worker (body elided in this notebook)."""
    ...

# One (input, output, size) job per entry of image_dir. `image.name` keeps the
# original file name: `image.suffix` already starts with a dot, so
# f"{image.stem}.{image.suffix}" would produce names such as "photo..jpg".
jobs = [(image, output_dir / image.name, (512, 512))
        for image in image_dir.iterdir()]
# zip(*jobs) yields nothing for an empty directory, which would make the
# three-way unpacking fail, so fall back to three empty tuples.
images, output_paths, sizes = zip(*jobs) if jobs else ((), (), ())

# executor.submit forwards any number of positional arguments to the target,
# so each task receives its own (input_path, output_path, size) triple.
# Exceptions raised inside resize_image are stored on the returned futures;
# call future.result() on them to surface failures.
with ProcessPoolExecutor() as executor:
    futures = [executor.submit(resize_image, image, output_path, size)
               for image, output_path, size in zip(images, output_paths, sizes)]

**This can be done with the apply_async of multiprocessing too.**

In [None]:
import logging

def resize_image(input_path, output_path, size=(512,512)):
    """Placeholder for the image-resizing worker (body elided in this notebook)."""
    ...

# One (input, output, size) job per entry of image_dir. `image.name` keeps the
# original file name: `image.suffix` already starts with a dot, so
# f"{image.stem}.{image.suffix}" would produce names such as "photo..jpg".
jobs = [(image, output_dir / image.name, (512, 512))
        for image in image_dir.iterdir()]
# zip(*jobs) yields nothing for an empty directory, which would make the
# three-way unpacking fail, so fall back to three empty tuples.
images, output_paths, sizes = zip(*jobs) if jobs else ((), (), ())

logger = logging.getLogger(__name__)
with multiprocessing.Pool() as pool:
    # apply_async takes the positional arguments as one tuple per task.
    async_results = [pool.apply_async(resize_image, (image, output_path, size))
                     for image, output_path, size in zip(images, output_paths, sizes)]
    # Leaving the `with` block calls pool.terminate(), which stops the workers
    # without finishing outstanding tasks, so wait for every result first.
    for image, async_result in zip(images, async_results):
        try:
            # get() blocks until the task is done and re-raises its exception.
            async_result.get()
        except Exception as exc:
            logger.warning("Could not resize %s: %s", image, exc)

## Multiprocessing for Pandas Apply

### With Multiprocessing Standard library

In [5]:
from typing import Union, Callable
import multiprocessing as mp
import numpy as np
import pandas as pd
from tqdm import tqdm

def parallel_apply(
        df_or_s: Union[pd.DataFrame, pd.Series],
        func: Callable,
        desc: str,
        n_jobs: int = mp.cpu_count()-1,
        progress_bar: bool = True
    ) -> Union[pd.DataFrame, pd.Series]:
    """Apply ``func`` to row-wise chunks of a DataFrame/Series in parallel.

    The input is cut into ``2 * n_jobs`` consecutive chunks, ``func`` is
    called on each chunk in a ``multiprocessing`` pool, and the per-chunk
    results are concatenated in chunk order. ``func`` therefore receives a
    DataFrame or Series (typically it calls ``.apply()`` on it) and must be
    picklable so it can be sent to the worker processes.

    Args:
        df_or_s (Union[pd.DataFrame, pd.Series]): Dataframe or series to
        process.
        func (Callable): Function called on each chunk; must return a
        DataFrame or Series that ``pd.concat`` can combine.
        desc (str): TQDM description kwarg.
        n_jobs (int, optional): Number of worker processes. Values below 1
        are treated as 1. Defaults to mp.cpu_count()-1.
        progress_bar (bool, optional): Show a tqdm progress bar that
        advances as chunks finish. Defaults to True.

    Returns:
        Union[pd.DataFrame, pd.Series]: The concatenation of ``func``'s
        results over all chunks.
    """
    # cpu_count() - 1 is 0 on a single-core machine, and both mp.Pool and
    # np.array_split reject 0, so always keep at least one worker.
    n_jobs = max(1, n_jobs)
    n_chunks = 2 * n_jobs

    # Split row positions rather than the pandas object itself: calling
    # np.array_split directly on a DataFrame goes through DataFrame.swapaxes,
    # which is deprecated since pandas 2.1.
    positions = np.array_split(np.arange(len(df_or_s)), n_chunks)
    chunks = [df_or_s.iloc[idx] for idx in positions]

    with mp.Pool(n_jobs) as pool:
        # imap yields results in input order as they become available, so a
        # progress bar around it tracks finished chunks. Wrapping the input
        # of pool.map would not: map consumes its whole input up front.
        results = pool.imap(func, chunks)
        if progress_bar:
            results = tqdm(results, total=n_chunks, desc=desc)
        ret_list = list(results)

    return pd.concat(ret_list)


### With Joblib library

In [6]:
from typing import Union, Callable
import joblib
import numpy as np
import pandas as pd
from tqdm import tqdm

def parallel_apply(
        df_or_s: Union[pd.DataFrame, pd.Series],
        func: Callable,
        desc: str,
        n_jobs: int = joblib.cpu_count()-1,
        progress_bar: bool = True
    ) -> Union[pd.DataFrame, pd.Series]:
    """Apply ``func`` to row-wise chunks of a DataFrame/Series with joblib.

    The input is cut into consecutive chunks (two per worker), ``func`` is
    called on each chunk through ``joblib.Parallel``, and the per-chunk
    results are concatenated in chunk order. ``func`` therefore receives a
    DataFrame or Series (typically it calls ``.apply()`` on it).

    Args:
        df_or_s (Union[pd.DataFrame, pd.Series]): Dataframe or series to
        process.
        func (Callable): Function called on each chunk; must return a
        DataFrame or Series that ``pd.concat`` can combine.
        desc (str): TQDM description kwarg.
        n_jobs (int, optional): Number of workers, following joblib's
        convention (negative values count back from the number of CPUs).
        0 is treated as 1. Defaults to joblib.cpu_count()-1.
        progress_bar (bool, optional): Wrap the chunks in a tqdm progress
        bar. Defaults to True.

    Returns:
        Union[pd.DataFrame, pd.Series]: The concatenation of ``func``'s
        results over all chunks.
    """
    # joblib.cpu_count() - 1 is 0 on a single-core machine and joblib rejects
    # n_jobs == 0, so fall back to a single worker.
    if n_jobs == 0:
        n_jobs = 1
    # Negative n_jobs is valid for joblib (-1 means "all CPUs"), so size the
    # chunking from the resolved worker count rather than the raw argument.
    n_chunks = 2 * max(1, joblib.effective_n_jobs(n_jobs))

    # Split row positions rather than the pandas object itself: calling
    # np.array_split directly on a DataFrame goes through DataFrame.swapaxes,
    # which is deprecated since pandas 2.1.
    positions = np.array_split(np.arange(len(df_or_s)), n_chunks)
    chunks = [df_or_s.iloc[idx] for idx in positions]
    if progress_bar:
        chunks = tqdm(chunks, desc=desc)

    # Parallel and delayed are referenced through the joblib module because
    # only `import joblib` is in scope.
    with joblib.Parallel(n_jobs=n_jobs, mmap_mode="r+") as parallel:
        ret_list = parallel(joblib.delayed(func)(chunk) for chunk in chunks)

    return pd.concat(ret_list)