In [1]:
import numpy as np
import subprocess

#Easy Parallel Multiprocessing in Python

With less than 10 extra lines of code, you can take your existing code and utilize all those extra cores on your computer.

Let's explore how this is done using a toy example.

### Toy Example: SVD

Define a simple function to do a Singular Value Decomposition on an input matrix.

In [2]:
def run_svd(in_array):
    return np.linalg.svd(in_array)

Generate 20 random 1000 by 1000 arrays. This is our "data".

In [3]:
array_list = []
for i in xrange(20):
    array_list.append(np.random.rand(1000,1000))

Loop through each of the 20 arrays.

In [4]:
%%time
svd_out_loop = []
for i in array_list:
    svd_out_loop.append(run_svd(i))

CPU times: user 1min 10s, sys: 963 ms, total: 1min 11s
Wall time: 1min 10s


A whole minute!? I don't know about you, but I get bored pretty quickly. Let's see if we can speed it up.

In [5]:
from multiprocessing import Pool

Initialize a multiprocessing pool with 10 cores.

In [6]:
pool = Pool(10)

Instead of looping through items, we will submit them to our multiprocesing pool, which will map our `run_svd` function across the arrays.

In [7]:
%%time
svd_out_map = pool.map(run_svd, array_list)
pool.close() # Close the pool when processing is finished.
pool.join() # Collect the results from the multiple parallel processing threads.

CPU times: user 1.21 s, sys: 869 ms, total: 2.08 s
Wall time: 9.29 s


That's much better! Now let's look at a real-world example.

### Practical Example: Smoothing fMRI data.

Let's say I have written a function to run spatial smoothing on fMRI data.

In [11]:
def run_smoothing(sub_id, file_path_template, fwhm, out_dir):
    """
    Run spatial smoothing using AFNI 3dBlurToFWHM.
    
    Parameters
    ----------
    sub_id : str
        Unique subject identifier.
    file_path_template : str
        Path to input files with with sub_id to be inserted via string formatting.
    fwhm : int
        FWHM of the 3D Gaussian kernel to use for smoothing.
    out_dir : str
        Path to an existing directory where smoothed files should be written.
    """
    try:
        func_path = file_path_template.format(sub_id)
        fname = '{}_4mm_fwhm_{}.nii.gz'.format(sub_id, str(fwhm))
        out_path = '/'.join([out_dir, fname])
        shell_out = subprocess.check_output(['3dBlurToFWHM', '-input', func_path, '-FWHM', fwhm, 
                                             '-prefix', out_path]) 
    except:
        return 'Error encountered during smoothing for subject %s.' % sub_id
        pass
    else:
        return 'Smoothing complete for %s.' % sub_id

Oh no! My `run_smoothing` function has **4 required arguments, but `Pool.map` can only pass a single argument** to the function we're mapping.

In this case, only one of the required arguments for our function changes as we iterate (`sub_id`); all other arguments stay constant. 

We could hard-code the other arguments inside the function, but that's clunky. There has to be another way.

### `functools.partial` to the rescue!

In [8]:
from functools import partial

Using `functools.partial`, we can create a "partial" function from `run_smoothing` for which all but one of the input arguments are pre-set.

In [9]:
fp_template = '/path/to/fmri_data/{}_preprocessed.nii.gz'
kernel_width = 6
o_dir = '/path/to/smoothed_data'

In [12]:
run_smoothing_partial = partial(run_smoothing, file_path_template=fp_template, fwhm=kernel_width, out_dir=o_dir)

We can then map our new function across the subject list, just as we did in the previous example.

In [None]:
subject_list = np.loadtxt('/path/to/subject_list.txt', dtype='S')

In [None]:
pool = Pool(10)
smoothing_out = pool.map(run_smoothing_partial, sub_list)
pool.close() 
pool.join() # In this case, we don't actually return anything from the function, but we join the output anyway.

The `multiprocessing.Pool` function is by far the easiest way to parallellize simple code, but it has some issues:
- It doesn't always play well with iPython or other interactive interpreters.
- It spawns a full new Python proess for each thread in the pool. This can add un-necessary overhead.

### An alternative: `IPython.parallel`

Briefly, here is how the equivalent functionality from the toy example would be achieved using `IPython.parallel`.

First, start a new IPython Notebook server. Click the "Clusters" tab, and start a new cluster with your desired number of engines (cores).

Then, in a new Notebook page, run the following code.

In [None]:
from IPython import parallel

Configure your local parallel cluster.

In [None]:
# Get the engines in our cluster.
clients = parallel.Client()
# Tell each engine to return all results at once.
clients.block = True
# Get a direct view of the cluster.
dview = clients.direct_view()
# Get a load-balanced view of the cluster.
lbview = clients.load_balanced_view()
# Set our views to run return all results at once.
dview.block = True
lbview.block = True

Define your function. Note that you have to do any imports **within the function** and explicitly push the new function to the nodes in your cluster.

In [None]:
def run_svd(in_array):
    import numpy as np
    return np.linalg.svd(in_array)
# Push to all cluster engines.
clients[:].push(dict(run_svd=run_svd))

In [None]:
# Map the array list across cluster engines.
svd_out_ipypool = lbview.map(run_svd, array_list)

### `joblib`: the easiest way to parallelize your code.

I only recently discovered [joblib](https://pythonhosted.org/joblib/index.html), but it has become my go-to method when I need to run things in parallel.

In [None]:
from joblib import Parallel, delayed

The beauty of `joblib` is that it wraps `multiprocessing.Pool` for you, so you don't have to manually create partial functions.

Instead, you pass your function and arguments to `joblib.Parallel`, and use list comprehension to define the `for` loop.

In [None]:
jl_report = Parallel(n_jobs=15, backend='threading')(
    delayed(run_smoothing)(sub_id, file_path_template, fwhm, out_dir)
    for sub_id in subject_list)

The code above uses threading, which in some cases can be more efficient than forking the job into multiple distinct processes, which requires sending data back-and-forth between processes. The [Embarrasingly parallel for loops](https://pythonhosted.org/joblib/parallel.html) page on the joblib website has more detail about how to use `joblib.Parallel` and when to use threading vs. forking. That page also discusses how to use pre-allocated memory-maps to work with data that is too large to fit in memory.

## Further Reading

- [Multiprocessing in Python: A Guided Tour](http://www.davekuhlman.org/python_multiprocessing_01.html)
- [Easily distributing a parallel IPython Notebook on a cluster](http://twiecki.github.io/blog/2014/02/24/ipython-nb-cluster/)
- [Using IPython for parallel computing](http://ipython.org/ipython-doc/2/parallel/index.html)
- [An introduction to parallel programming using Python's multiprocessing module](http://sebastianraschka.com/Articles/2014_multiprocessing_intro.html)
- [Parallelism in One Line](http://chriskiehl.com/article/parallelism-in-one-line/)