# Multiprocessing in Python

What happens if we need to do a lot of computation that is mostly the same type?

Basically: How can we assign different tasks to different cores or processors?

## Multiprocessing systems/interfaces

There are a few types of multiprocessing interfaces we can use:

| Interface | When to use |
|-----------|-------------|
| multiprocessing | The classic built-in multiprocessing library. |
| concurrent.futures.ProcessPoolExecutor | The modern way to launch parallel tasks. Usable for everything that takes a long time and needs to run on only one computer. It can sometimes be slow, though. |
| multiprocess | An improved version of the built-in `multiprocessing` library. A bit more manual than the pools. |
| mpi4py.futures.MPIPoolExecutor | The modern way to run scalable parallel tasks on computer clusters. Use for long tasks that need to run on more than one computer. |
| mpi4py classic | A bit beyond the scope of this class. |
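
As a rough sketch of what the `concurrent.futures.ProcessPoolExecutor` row refers to, the snippet below maps a function over a list using an executor. The names `square` and `run_in_executor` are made up for illustration, and the `if __name__ == "__main__":` guard is an assumption that keeps the example safe on platforms that start workers by re-importing the main module.

```python
from concurrent.futures import ProcessPoolExecutor


def square(value: int) -> int:
    """Hypothetical stand-in for a long-running task."""
    return value * value


def run_in_executor(values: list[int], max_workers: int = 2) -> list[int]:
    """Map square() over values using a pool of worker processes."""
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # executor.map() hands each item to a worker process
        return list(executor.map(square, values))


if __name__ == "__main__":
    print(run_in_executor([1, 2, 3, 4]))
```

The executor is used as a context manager, so its worker processes are shut down when the `with` block ends.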


### Using the `multiprocessing.Pool` interface

To illustrate the use of the `multiprocessing` interface, we'll show a demonstration of the different components of the interface.

Functions or software applications that can run completely independently based only on the inputs given to them are perfectly parallel (formerly known as [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel)).

Thus, we recommend that you choose a perfectly parallel software application for this project, since it makes the effects easier to illustrate.  Perfectly parallel applications can run completely within their own memory address space, which makes things simpler for you as a software developer.

We'll use a sample function below that simply wastes time.  Although it is not useful in itself, it is clear in how it operates, so it illustrates the `multiprocessing` concepts more easily.

In [None]:
import time

def long_running_function(some_value: int) -> int:
    """
    Performs a long-running function that wastes time for obvious throughput improvements.
    
    :param some_value: some integer value in seconds that we will sleep for
    :returns: the number of seconds this function took to complete (roughly)
    """
    print(f"running long_running_function with {some_value}")
    time.sleep(some_value)
    print(f"completed long_running_function with {some_value}")
    return some_value

Now that we have this function, let's say that our software application just requires calling this function on some data items in order to process them.  How long would it take to process this data serially using our long-running function?

In [None]:
items_to_process = [3, 5, 10, 15, 17, 20]

In [None]:
%%time

return_items = []
for item in items_to_process:
    return_val = long_running_function(item)
    return_items.append(return_val)

This took way longer than it needed to!  You may be wondering about the `%%time` command.  It simply records both the Wall (clock) time and the CPU time and reports them to you.  We can see that our function was not computationally difficult since the CPU time was extremely low, so it can definitely benefit from parallel computing even if we do not have more than two CPU cores.  The operating system will switch between the processes for us if we do not have enough cores.

As you may remember, data transfer (input/output, or I/O) to and from disks or network devices triggers interrupts, so the CPU and operating system do not hang on a function that is waiting for a transfer.  The system can just do something else until the transfer interrupt comes back.

It is similar for the `sleep()` function, which hands control back to the operating system until the timer expires.  Note that if you do something computationally expensive (difficult) that requires a lot of CPU time, then you will need many CPU cores to see a speedup (as you will see in Component 3 of this project).
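
The difference between Wall time and CPU time can also be sketched outside of the notebook. The snippet below is a minimal illustration, assuming `time.perf_counter()` as the wall clock and `time.process_time()` as the CPU clock of the current process; the helper names `measure` and `busy_loop` are made up for this example.

```python
import time


def measure(func, *args):
    """Return (result, wall_seconds, cpu_seconds) for one call of func."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    result = func(*args)
    cpu_seconds = time.process_time() - cpu_start
    wall_seconds = time.perf_counter() - wall_start
    return result, wall_seconds, cpu_seconds


def busy_loop(iterations: int) -> int:
    """CPU-bound work: the process is computing the whole time."""
    total = 0
    for i in range(iterations):
        total += i * i
    return total


# Sleeping is dominated by waiting, so CPU time stays far below Wall time
_, sleep_wall, sleep_cpu = measure(time.sleep, 0.2)
print(f"sleep: wall={sleep_wall:.3f}s cpu={sleep_cpu:.3f}s")

# A busy loop keeps the CPU occupied, so the two times are typically close
_, busy_wall, busy_cpu = measure(busy_loop, 200_000)
print(f"busy:  wall={busy_wall:.3f}s cpu={busy_cpu:.3f}s")
```

A task that looks like the sleep case is a good candidate for running in more processes than there are cores, while a task that looks like the busy loop is limited by the number of cores.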

Now, we'll present some code that runs the processing in parallel instead:

In [None]:
%%time

import multiprocessing as mp

with mp.Pool(processes=4) as pool:
    p_return_items = pool.map(long_running_function, items_to_process)

What are the Wall time and CPU times now?  Was the Wall time much shorter?  How do they compare to the times from before?

You might have noticed a couple of things:

- the `print()` statements are all jumbled -- this is because several processes write to the same output at once, and the output is handled on a "first characters first served" basis
- the `with` keyword in Python, which uses a context manager so that you don't have to call `close()` functions on any handles yourself (think of it as an easier way to allocate/deallocate resources simply by scope)
- the `Pool` object, which creates a pool of *worker* processes, which are simply copies of the Python interpreter with your code loaded (typically created via `fork()` on Linux and via `spawn` on Windows and macOS, depending on the Python version)
- the `processes=4` parameter passed to the `Pool()` constructor function, which indicates the number of workers to use
- the call to `pool.map()`, which allows us to use the higher-order `map()` function in parallel, which accepts a function as its first input parameter and an iterable (such as a list) as its second input parameter.  As you may have assumed, each item in the iterable becomes the input parameter passed to the function that runs on each worker.
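
One property of `pool.map()` worth seeing in isolation is that it returns results in the order of the inputs, not in the order in which the workers finish. The sketch below is a small illustration of that; the function `slow_times_ten` is hypothetical, and the `if __name__ == "__main__":` guard is an assumption that keeps the example safe where workers are started by re-importing the main module.

```python
import multiprocessing as mp
import time


def slow_times_ten(value: int) -> int:
    """Hypothetical task in which larger inputs finish sooner."""
    time.sleep((3 - value) * 0.1)
    return value * 10


if __name__ == "__main__":
    inputs = [0, 1, 2]
    with mp.Pool(processes=3) as pool:
        parallel = pool.map(slow_times_ten, inputs)
    serial = list(map(slow_times_ten, inputs))
    # Same values in the same order as the built-in map(), even though
    # the worker with the largest input most likely finished first
    print(parallel == serial)
```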

What happens if we double the number of worker processes?

In [None]:
%%time

with mp.Pool(processes=8) as pool:
    p_return_items = pool.map(long_running_function, items_to_process)

What was the Wall time now?  Can we improve the Wall time any further?  Why or why not?

What about the CPU time?  Was it shorter or longer?

If we look, we see that the user CPU time has only gone up a little bit, but the system CPU time has gone up a lot more.  This means that the operating system is doing more work, which is understandable as we might not have exclusive access to 8 CPU cores.

The Wall time, on the other hand, is now only slightly longer than the longest function run (which is supposed to run for 20 seconds).  We can see the significant improvement in throughput now!
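
The Wall times above can be estimated with a small simulation. This is an idealized sketch, not a description of how `Pool` schedules work internally: it assumes that tasks are handed out one at a time, in order, to whichever worker becomes free first, and it ignores all startup and communication overhead. The function name `estimate_wall_time` is made up for this example.

```python
import heapq


def estimate_wall_time(durations: list[float], num_workers: int) -> float:
    """Estimate the Wall time for running tasks on num_workers workers."""
    if num_workers < 1:
        raise ValueError("num_workers must be at least 1")
    # heap of the times at which each worker becomes free
    free_at = [0.0] * num_workers
    finish = 0.0
    for duration in durations:
        start = heapq.heappop(free_at)   # earliest available worker
        end = start + duration
        finish = max(finish, end)
        heapq.heappush(free_at, end)
    return finish


items_to_process = [3, 5, 10, 15, 17, 20]
for workers in (1, 4, 8):
    print(workers, estimate_wall_time(items_to_process, workers))
```

Under these assumptions, the six tasks need 70 seconds on one worker, 25 seconds on four workers, and 20 seconds on eight workers. Adding more workers beyond six cannot help here, because the longest single task still takes 20 seconds.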

#### Using `multiprocessing.Pool.starmap`

So [`Pool.map()`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.map) works pretty well if our function takes only one input parameter.  A realistic function, though, will often take several input parameters.

In this case, we can use [`Pool.starmap()`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool.starmap) instead, by giving each function call a tuple (an immutable sequence) of parameter inputs that `Pool.starmap()` will unpack into input parameters for us:

In [None]:
def long_running_function_enhanced(some_value: int, some_other_value: str) -> int:
    """
    Performs a long-running function that wastes time for obvious throughput improvements.
    
    :param some_value: some integer value in seconds that we will sleep for
    :param some_other_value: some string value that we will just use to print
    :returns: the number of seconds this function took to complete (roughly)
    """
    print(f"running long_running_function_enhanced with {some_value}, {some_other_value}")
    time.sleep(some_value)
    print(f"completed long_running_function_enhanced with {some_value}, {some_other_value}")
    return some_value


enh_items_to_process = [(3, 'a'), (5, 'b'), (10, 'c'),
                        (15, 'd'), (17, 'e'), (20, 'f')]

In [None]:
%%time

with mp.Pool(processes=8) as pool:
    pe_return_items = pool.starmap(
        long_running_function_enhanced,
        enh_items_to_process)
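
The unpacking that `Pool.starmap()` performs can be seen without any worker processes by using its serial counterpart, `itertools.starmap()`. The sketch below is only an illustration: the function `describe` is hypothetical, and `zip()` is shown as one convenient way to build the argument tuples from two separate lists.

```python
from itertools import starmap


def describe(seconds: int, label: str) -> str:
    """Hypothetical two-parameter function (no sleeping, for brevity)."""
    return f"{label}:{seconds}"


seconds = [3, 5, 10]
labels = ['a', 'b', 'c']

# zip() pairs up the two lists into the tuples that starmap expects
arg_tuples = list(zip(seconds, labels))

# Each tuple is unpacked into positional arguments: describe(3, 'a'), ...
unpacked = list(starmap(describe, arg_tuples))
explicit = [describe(*args) for args in arg_tuples]

print(arg_tuples)
print(unpacked == explicit)
```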

### Basic Linear Algebra Problems in Multiprocessing

This is a note if you are planning on augmenting code that uses matrix algebra libraries (very likely if you are planning on augmenting the suggested examples).

It turns out that the Math Kernel Library (MKL), or any other Basic Linear Algebra Subprograms (BLAS) implementation, will sometimes try to be sneaky and do multithreading on its own.  This can conflict with your own worker processes by putting more threads on the CPU cores than they can run at once, so make sure to set the following environment variables before importing numpy or any other packages:

In [None]:
import os
os.environ['OMP_NUM_THREADS'] = '1'
os.environ['MKL_NUM_THREADS'] = '1'

# now you can import numpy and other packages
import numpy as np
import scipy.stats
import multiprocessing as mp
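
If the same settings are needed in several scripts, they can be wrapped in a small helper. The sketch below is one possible way to do that; the helper name `limit_blas_threads` is made up, and `OPENBLAS_NUM_THREADS` is an addition that is not in the cell above (it is the corresponding variable for numpy builds that use OpenBLAS).

```python
import os

# OMP_NUM_THREADS and MKL_NUM_THREADS come from the cell above;
# OPENBLAS_NUM_THREADS is an addition for OpenBLAS-based numpy builds.
THREAD_ENV_VARS = ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS")


def limit_blas_threads(num_threads: int = 1) -> dict[str, str]:
    """Set the thread-count variables and return what was set.

    This should be called before numpy (or scipy) is imported, since the
    libraries typically read these variables when they are first loaded.
    """
    settings = {name: str(num_threads) for name in THREAD_ENV_VARS}
    os.environ.update(settings)
    return settings


applied = limit_blas_threads(1)
print(applied)
```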

### Converting serial code to parallel code

As discussed earlier, parallelizing serial code is easiest when the problem is perfectly parallel.  This means that we can split our problem in such a way that the workers can work independently on some portion of our problem, and then we just assemble the results at the end.

In this case, we distribute our data such that each worker computes a chunk of it instead.  This technique is called [map-reduce](https://courses.cs.washington.edu/courses/cse490h/07wi/readings/IntroductionToParallelProgrammingAndMapReduce.pdf), because we map a function and chunk of data to a worker, collect all of the results, and then aggregate them using a reduction technique (such as the sum, mean, min, max).

![map_reduce_image](assets/map_reduce.webp)
_Figure: The diagram illustrating this map and reduction parallel code architecture.  From: https://www.mdpi.com/1999-4893/8/3/407_
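
The map and reduce steps can be sketched serially, without any worker processes, to make the structure clear. The helper `split_into_chunks` below is made up for this illustration; in a parallel version, the `map()` call is the part that would be handed to a pool.

```python
from functools import reduce
from operator import add


def split_into_chunks(data: list[float], num_chunks: int) -> list[list[float]]:
    """Split data into num_chunks contiguous pieces of near-equal size."""
    chunk_size, remainder = divmod(len(data), num_chunks)
    chunks = []
    start = 0
    for index in range(num_chunks):
        # the first `remainder` chunks get one extra item
        end = start + chunk_size + (1 if index < remainder else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
chunks = split_into_chunks(data, num_chunks=3)
partial_sums = list(map(sum, chunks))    # map: one partial sum per chunk
total = reduce(add, partial_sums, 0.0)   # reduce: combine the partial sums
print(chunks, partial_sums, total)
```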

Let's see how this works when it comes to performing something such as a sum of a 2,000,000-item array.

In [None]:
def compute_sum(in_array: np.ndarray) -> float:
    output = 0.0
    for item in in_array:
        output += item
    return output


def p_compute_sum(input_array: np.ndarray, pool: mp.Pool,
                  num_workers: int = 4) -> float:
    """
    Distribute an 'equal' chunk of the input_array array to
      each worker, map the sum compute function to each worker 
      to compute their individual sums, then sum (reduce) the smaller
      number of partial sums.
    
    :param input_array: some large 1D array of numbers
    :param pool: an already-started pool of worker processes
    :param num_workers: number of chunks to split the array into
      (normally the number of workers in the pool)
    :returns: the sum of the floats in input_array
    """
    # pad with zeros (which do not change the sum) so that the
    #   array length is an exact multiple of num_workers
    offset = len(input_array) % num_workers
    amount_to_pad = num_workers - offset if offset > 0 else 0
    full_array = np.pad(
        array=input_array, pad_width=(0, amount_to_pad), mode='constant')
    padded_size = len(full_array)
    # one row per worker, so that each worker receives one large chunk
    reshaped_array = full_array.reshape(
        (num_workers, padded_size // num_workers))
    results = pool.map(compute_sum, reshaped_array)  # map
    sum_result = np.sum(results)  # reduce
    return sum_result


def check_sum() -> None:
    num_workers = 16
    with mp.Pool(processes=num_workers) as pool:  # start the pool first
        random_array = np.random.default_rng().uniform(
            low=-24.5, high=72.4, size=2000000)
        serial_sum = compute_sum(random_array)
        parallel_sum = p_compute_sum(random_array, pool, num_workers)
        print(f"serial result: {serial_sum}")
        print(f"parallel result: {parallel_sum}")
        # Note: use np.isclose() instead of ==
        #       since floats have limited precision
        print(np.isclose(serial_sum, parallel_sum))

In [None]:
check_sum()
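
As an alternative to padding and reshaping, `np.array_split()` can produce the chunks directly, since it accepts lengths that do not divide evenly. The sketch below is one way this might look; the names `chunk_sum` and `split_for_workers` are made up for this example, and `np.sum` replaces the Python loop inside each worker.

```python
import multiprocessing as mp

import numpy as np


def chunk_sum(chunk: np.ndarray) -> float:
    """Sum one chunk; np.sum is used here instead of a Python loop."""
    return float(np.sum(chunk))


def split_for_workers(input_array: np.ndarray,
                      num_workers: int) -> list[np.ndarray]:
    """Split into num_workers chunks whose lengths differ by at most one."""
    return np.array_split(input_array, num_workers)


if __name__ == "__main__":
    values = np.arange(10, dtype=float)
    with mp.Pool(processes=3) as pool:
        partial_sums = pool.map(chunk_sum, split_for_workers(values, 3))  # map
    print(np.isclose(sum(partial_sums), values.sum()))  # reduce and check
```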

If you perform the timings using the tools in the next notebook, you might notice that the parallel sum computation is slower than the serial one.  This is simply because computing a sum takes less time than sending the data to the worker processes.  Thus, parallelization is not always the best answer, and whether it pays off is worth discussing for each problem.
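
Part of the cost of sending data to workers can be made visible with a rough measurement. Arguments passed to `pool.map()` are pickled before they are sent, so the sketch below compares the time to pickle the array with the time to sum it using `np.sum`. It only measures serialization in the parent process (not the transfer or the unpickling in the workers), so treat the numbers as a lower bound on the overhead.

```python
import pickle
import time

import numpy as np

random_array = np.random.default_rng().uniform(
    low=-24.5, high=72.4, size=2_000_000)

# Time how long it takes to serialize the whole array
start = time.perf_counter()
payload = pickle.dumps(random_array)
pickle_seconds = time.perf_counter() - start

# Time how long it takes to simply sum the array in this process
start = time.perf_counter()
total = np.sum(random_array)
sum_seconds = time.perf_counter() - start

print(f"pickled size: {len(payload) / 1e6:.1f} MB")
print(f"pickling took {pickle_seconds:.4f}s, np.sum took {sum_seconds:.4f}s")
```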

You're now ready to use the `multiprocessing.Pool` interface!

If you'd like more information, here are some additional resources:

- https://people.duke.edu/~ccc14/sta-663-2016/19B_Threads_Processses_Concurrency.html#Using-multiprocessing
- https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool
- https://docs.python.org/3/library/multiprocessing.html#examples  (scroll down to "Using Pool")
- https://courses.cs.washington.edu/courses/cse490h/07wi/readings/IntroductionToParallelProgrammingAndMapReduce.pdf
- https://cs.boisestate.edu/~amit/teaching/530/handouts/ep.pdf (general information on parallel processing)