# Parallel Python -- Multiprocess(ing)

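The `multiprocessing` module lets us run Python code in multiple separate processes instead of multiple threads. Each process runs its own Python interpreter with its own Global Interpreter Lock (GIL). CPU-bound work can therefore run in parallel, sidestepping the GIL that prevents threads from executing Python bytecode at the same time.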

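An advantage of this approach is that each process has its own independent memory space. There are no conflicts from multiple threads trying to access the same memory location at the same time. The flip side is that data must be explicitly sent between processes (for example through a `Queue` or a `Pool`), which means it has to be serialized.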

Since I am using a Jupyter Notebook, there is an incompatibility with the standard `multiprocessing` module. Instead, I will use a fork of it called `multiprocess`, which serializes objects with `dill` instead of `pickle` and works in Jupyter. Instructions for installing a clone of our Anaconda environment that includes `multiprocess` and can be loaded into Jupyter Notebook are in the last slide.
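The sketch below shows one concrete limitation of the standard `pickle`. It is an illustration, not necessarily the exact failure behind the incompatibility above. `pickle` stores functions by reference (module plus qualified name), so it cannot serialize lambdas or functions defined inside other functions; `dill`, which `multiprocess` uses, can. The helper `is_picklable` is a hypothetical name, and the sketch uses only the standard library.

```python
import pickle

def module_level(x):
    """A top-level function: pickle stores it by reference (module + name)."""
    return 2 * x

def make_nested():
    """Return a function defined inside another function (a 'local object')."""
    def nested(x):
        return 2 * x
    return nested

def is_picklable(obj):
    """Return True if the standard-library pickle can serialize obj."""
    try:
        pickle.dumps(obj)
    except Exception:  # PicklingError or AttributeError, depending on the Python version
        return False
    return True

print(is_picklable(module_level))     # True when run as a script
print(is_picklable(make_nested()))    # False: local objects cannot be looked up by name
print(is_picklable(lambda x: 2 * x))  # False: '<lambda>' cannot be looked up by name
```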

### Example: A Serial Program

The most basic tool in the multiprocessing module is the `Process` class. First, let's write a simple serial (single-process) program that generates a list of 16 random five-character strings.

```python
import random
import string

random.seed(123)

n = 16 # iterations

# define a string generator function
def rand_string(length):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    return rand_str

# Let's generate n 5-character strings
result = [ rand_string(5) for x in range(n) ]

# Print our result
print(result)
```

```
['drfXA', 'rg153', 'cyIJv', 'v2dki', 'vJvSp', 'ka5BX', 'f4Mye', 'auUCg', '5cfQj', 'iY6bs', '6BKEq', 'E1cXt', 'vHZEn', '0MOHK', 'Z9uaz', '5XPGB']
```


## Parallelizing our Code

This code segment is **embarrassingly parallel**: each call to `rand_string` is independent of the others, so it is trivial to parallelize. Each string can be generated by a separate process running the same function, and the final list can be assembled at the end.

### Determining the number of compute processors on a system

We can query the number of processors on a system so that we make full use of our resources. For this demo, we count only physical cores and ignore the extra 'logical' cores created by the CPU's simultaneous multithreading (SMT).

```python
import psutil

# logical=False: count physical cores only, not the logical cores added by SMT
# (psutil returns None if the count cannot be determined)
num_procs = psutil.cpu_count(logical=False)

print(num_procs)
```

```
8
```
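On a shared machine or a cluster node, the total core count is not always what your job is allowed to use. The sketch below uses only the standard library and assumes a POSIX-like system. `os.sched_getaffinity(0)` (Linux only) returns the set of CPUs this process may run on, and `os.cpu_count()` returns the system's logical CPU count. Both count logical CPUs, unlike `psutil.cpu_count(logical=False)` above. The helper name `usable_cpus` is hypothetical.

```python
import os

def usable_cpus():
    """Return how many logical CPUs this process may run on (at least 1)."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: respects CPU affinity masks, e.g. those set by a job scheduler
        return len(os.sched_getaffinity(0))
    # Elsewhere: fall back to the system-wide logical CPU count (which may be None)
    return os.cpu_count() or 1

print(usable_cpus())
```

On Python 3.13 and later, `os.process_cpu_count()` provides a similar per-process count.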


### Introducing Multiprocess(ing): The `Process` class

The `Process` class lets us explicitly spawn a new process that runs a given function, its `target`. Each process must be started with `.start()`, and we wait for it to finish with `.join()`.

### Example: Using `multiprocess.Process` to Parallelize our Program

```python
import multiprocess as mp
import random
import string

# Note: this seeds only the parent process. Each child process gets its own
# random state, so the strings below differ from run to run.
random.seed(123)

n = 16 # iterations

# Define an output queue
# Queue allows communication between processes using picklable data (.put() and .get() methods)
# Our processes only need to communicate when aggregating their output
output = mp.Queue()

# Define the worker function: it takes the queue as an argument and puts its result there
def rand_string(length, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put(rand_str)

# Set up a list of processes that we want to run
# mp.Process is called to spawn a process
# target = the function we would like to run
# args = tuple of arguments that are accepted by the function
processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(n)]

# Each process needs to be started
for p in processes:
    p.start()

# Get the results from the output queue *before* joining: a process that has
# put data on a queue may not exit until that data has been consumed
results = [output.get() for p in processes]

# Wait until each process has finished before moving on!
for p in processes:
    p.join()

print(results)
```

```
['zyY19', 'Jly8S', 'TI4s0', 'ej12j', 'sP8PR', 'BPmbD', 'OJtSb', '1s4hm', 'ihYCw', 'PzAqL', 'Aa6Em', 'LsUWx', 'IxO1z', 'AI7Py', 'dOura', 'ngprt']
```
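Outside Jupyter, the same pattern works with the standard-library `multiprocessing` module. A script should create processes only under an `if __name__ == "__main__":` guard. With the `spawn` start method (the default on Windows and macOS), each child re-imports the main module, and an unguarded top level would try to start processes again. The sketch below assumes a plain `.py` file and mirrors the cell above. It drains the queue before joining, because a process that has put data on a queue may not exit until that data is consumed. `ALPHABET` and `run` are illustrative names.

```python
import multiprocessing as mp
import random
import string

ALPHABET = string.ascii_lowercase + string.ascii_uppercase + string.digits

def rand_string(length, output):
    """Put one random string of `length` characters on the output queue."""
    output.put(''.join(random.choice(ALPHABET) for _ in range(length)))

def run(n, length=5):
    """Start n processes, collect one string from each, then join them."""
    output = mp.Queue()
    processes = [mp.Process(target=rand_string, args=(length, output)) for _ in range(n)]
    for p in processes:
        p.start()
    results = [output.get() for _ in processes]  # drain the queue before joining
    for p in processes:
        p.join()
    return results

if __name__ == "__main__":
    print(run(16))
```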


#### Order of Results

The output queue is not ordered by process: items come out in whatever order the processes put them in. This can be a problem if the order of the results matters.

Luckily, we can give each process a rank, return the rank alongside the result, and sort the results by rank afterwards.

```python
import multiprocess as mp
import random
import string

random.seed(123)  # seeds only the parent; child processes get their own random state

output = mp.Queue()

# Let's create a rank parameter so we can assign a rank to each result
def rand_string(length, rank, output):
    """ Generates a random string of numbers, lower- and uppercase chars. """
    rand_str = ''.join(random.choice(
                        string.ascii_lowercase
                        + string.ascii_uppercase
                        + string.digits)
                   for i in range(length))
    output.put((rank, rand_str))

processes = [mp.Process(target=rand_string, args=(5, x, output)) for x in range(n)]

for p in processes:
    p.start()

# Collect results before joining: a process that has put data on a queue
# may not exit until that data has been consumed
results = [output.get() for p in processes]

for p in processes:
    p.join()

# Sort the results by the rank of each process (the first element of each tuple)
results.sort()
# Uncomment to keep only the strings and drop the ranks:
# results = [r[1] for r in results]

print(results)
```

```
[(0, 'oQIAH'), (1, 'ypRgy'), (2, '29s4u'), (3, 'm4kWP'), (4, 'YMwy9'), (5, 'M9Z9P'), (6, 'eCgtu'), (7, '1Snx4'), (8, 'MaXLc'), (9, 'aCEne'), (10, '1sD9f'), (11, 'GQSJw'), (12, 'BUFVI'), (13, 'g0UJ3'), (14, 'UwvM6'), (15, 'm5jAO')]
```
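Each child process gets its own random state, so the `random.seed(123)` call in the parent does not make these strings reproducible. Run the cell again and you get different strings. If reproducibility matters, one option is to derive a seed from each task's rank and give each task its own `random.Random` instance. This is an assumption, not the author's method. The sketch uses the standard-library `multiprocessing` and `Pool.starmap`. The function name `seeded_rand_string`, the base seed, and the pool size are illustrative.

```python
import multiprocessing as mp
import random
import string

ALPHABET = string.ascii_lowercase + string.ascii_uppercase + string.digits

def seeded_rand_string(length, rank, base_seed=123):
    """Return (rank, random string) from a generator seeded with base_seed + rank."""
    rng = random.Random(base_seed + rank)  # independent, reproducible stream per task
    return rank, ''.join(rng.choice(ALPHABET) for _ in range(length))

if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        # starmap unpacks each (length, rank) tuple into the function's arguments
        results = pool.starmap(seeded_rand_string, [(5, rank) for rank in range(16)])
    print(results)  # same strings on every run; already in rank order
```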


## `Pool` class: Assigning data to worker processes

The `Pool` class applies, or maps, a function to existing data, which can be broken up into chunks.

 - Adding more worker processes allows us to process multiple chunks in parallel.
 - The more workers, the smaller each worker's share of the data can be made.
 - The smaller each share, the sooner the job can be completed, as long as the overhead of sending chunks to the workers does not dominate.
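The pure-Python sketch below makes the chunking idea concrete by splitting a list into one contiguous chunk per worker. `split_into_chunks` is a hypothetical helper for illustration. `Pool` does its own chunking internally, and `Pool.map` lets you set the chunk size through its `chunksize` argument.

```python
def split_into_chunks(data, n_chunks):
    """Split a sequence into n_chunks contiguous pieces whose lengths differ by at most 1."""
    size, remainder = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # The first `remainder` chunks each take one extra element
        end = start + size + (1 if i < remainder else 0)
        chunks.append(data[start:end])
        start = end
    return chunks

data = list(range(10))
for workers in (1, 2, 4):
    print(workers, [len(chunk) for chunk in split_into_chunks(data, workers)])
# 1 [10]
# 2 [5, 5]
# 4 [3, 3, 2, 2]
```

Each chunk would be handed to one worker, so as the number of workers grows, each chunk shrinks.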

Let's define a very simple function that returns a number raised to the power of 3.

```python
def cube(x):
    return x**3
```

Next, we'll assemble our pool of workers, with one worker per physical core (`num_procs` from earlier).

```python
# Let's define our pool of workers!

pool = mp.Pool(processes=num_procs)
```

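We will use the `pool.apply()` method to apply a function to a range of inputs, one input at a time.

 - the `apply()` method requires a function and an argument tuple
 - the function is `cube`
 - the argument tuple is `(x,)`
 - the list comprehension calls `apply()` once for each input
 - each call blocks until its result is ready, so only one task runs at a time and this loop gets no speedup from the pool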

```python
# Generate the cubes of 0-255

results = [pool.apply(cube, args=(x,)) for x in range(256)]
print(results)
```

```
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091, ...]
```

Now let's use the `pool.map()` method to apply a function over a range of data. This may sound very similar to `pool.apply()`. However, `map()` takes the whole input at once, splits it into chunks, and hands the chunks to the workers, so the calls really do run in parallel.

- `map()` requires a function and an iterable of data to apply it to
- the function is `cube`
- the data is `range(256)`, i.e. the integers 0 to 255
- the function is applied to each element, and the results are returned in the same order as the input
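`map()` picks a chunk size automatically, but its `chunksize` argument lets you set it yourself. Fewer, larger chunks mean less communication between processes; smaller chunks help balance the load when some inputs take longer than others. The sketch below uses the standard-library `multiprocessing`. The pool size of 4 and the chunk size of 32 are arbitrary choices for illustration.

```python
import multiprocessing as mp

def cube(x):
    return x ** 3

if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        # 256 inputs in chunks of 32 -> 8 tasks sent to the workers
        results = pool.map(cube, range(256), chunksize=32)
    print(results[:5], results[-1])  # [0, 1, 8, 27, 64] 16581375
```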

```python
# Generate the cubes of 0-255

# Notice how this code requires no list comprehension and is cleaner

x = range(256)

results = pool.map(cube, x)
print(results)
```

```
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091, ...]
```

## Synchronous and Asynchronous Computing

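The `.apply()` and `.map()` methods are examples of synchronous computing: each call blocks until its results are ready.

There are asynchronous variants of these methods, `.apply_async()` and `.map_async()`, that do not wait. They return immediately with a placeholder object, and the actual results are retrieved later.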

```python
# Asynchronous method

results = [pool.apply_async(cube, args=(x,)) for x in range(256)]
print(results)
```

```
[<multiprocess.pool.ApplyResult object at 0x7fffec5018d0>, <multiprocess.pool.ApplyResult object at 0x7fffec501780>, <multiprocess.pool.ApplyResult object at 0x7fffec5019e8>, <multiprocess.pool.ApplyResult object at 0x7fffec501a58>, <multiprocess.pool.ApplyResult object at 0x7fffec501ba8>, <multiprocess.pool.ApplyResult object at 0x7fffec501d68>, <multiprocess.pool.ApplyResult object at 0x7fffec501e80>, <multiprocess.pool.ApplyResult object at 0x7fffec501f28>, <multiprocess.pool.ApplyResult object at 0x7fffec513048>, <multiprocess.pool.ApplyResult object at 0x7fffec513128>, <multiprocess.pool.ApplyResult object at 0x7fffec513208>, <multiprocess.pool.ApplyResult object at 0x7fffec5132e8>, <multiprocess.pool.ApplyResult object at 0x7fffec5133c8>, <multiprocess.pool.ApplyResult object at 0x7fffec5134a8>, <multiprocess.pool.ApplyResult object at 0x7fffec513588>, <multiprocess.pool.ApplyResult object at 0x7fffec513668>, <multiprocess.pool.ApplyResult object at 0x7fffec513748>, ...]
```

**Yikes!**

No need to worry. Printing the list just shows each object's representation, including its memory address. `apply_async()` does not return the value itself. Instead, it returns an `AsyncResult` object (displayed here under its implementation class name, `ApplyResult`), which is a handle to a result that may not have been computed yet. The value is retrieved with the object's `.get()` method.

There are also some cool methods associated with `AsyncResult`!

In [13]:
# We can check if these asynchronous processes have finished

ready = [p.ready() for p in results]
print(ready)

[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, Tru

In [14]:
# We can also see if each process was successful, i.e. no errors!

success = [p.successful() for p in results]
print(success)

[True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, Tru

```python
# The .get() method returns the results!
# It blocks until each result is ready (an optional timeout, in seconds, can be passed)

output = [p.get() for p in results]
print(output)
```

```
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091, ...]
```

You may notice that the results are in the correct order. That is because we call `.get()` on the `AsyncResult` objects in the order the tasks were submitted, and each one returns the result of its own task. The tasks themselves may finish in any order, but collecting the results this way preserves the input order. If you instead want results as soon as they finish, `pool.imap_unordered()` yields them in completion order, and there **order is not guaranteed**.
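The sketch below makes the difference between submission order and completion order visible. It assumes the standard-library `multiprocessing` and a hypothetical `slow_cube` whose delay shrinks as its input grows. Collecting `AsyncResult` handles in submission order always returns the input order. `Pool.imap_unordered()` instead yields results as the tasks complete.

```python
import multiprocessing as mp
import time

def slow_cube(x):
    """Return (x, x**3) after a delay that shrinks as x grows,
    so later tasks tend to finish first."""
    time.sleep(0.05 * (4 - x))
    return x, x ** 3

if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        handles = [pool.apply_async(slow_cube, (x,)) for x in range(4)]
        in_submission_order = [h.get() for h in handles]
        in_completion_order = list(pool.imap_unordered(slow_cube, range(4)))
    print(in_submission_order)  # [(0, 0), (1, 1), (2, 8), (3, 27)]
    print(in_completion_order)  # often reversed, but this order is not guaranteed
```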

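```python
# Make sure your pool of workers is closed once finished:
# close() stops new tasks from being submitted, join() waits for the workers to exit

pool.close()
pool.join()
```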

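Instead of calling `.close()` yourself, you can use a `Pool` object as a context manager. Leaving the `with` block calls `.terminate()`, which stops the workers immediately, so collect all of your results inside the block:

```python
with mp.Pool(processes=num_procs) as pool:
    ...  # submit tasks and collect their results here
```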

Let's try that!

```python
with mp.Pool(processes=num_procs) as pool:

    results = pool.map_async(cube, range(256)) # Using asynchronous map

    output = results.get()                     # get our results (still inside the block)
    print(output)
```

```
[0, 1, 8, 27, 64, 125, 216, 343, 512, 729, 1000, 1331, 1728, 2197, 2744, 3375, 4096, 4913, 5832, 6859, 8000, 9261, 10648, 12167, 13824, 15625, 17576, 19683, 21952, 24389, 27000, 29791, 32768, 35937, 39304, 42875, 46656, 50653, 54872, 59319, 64000, 68921, 74088, 79507, 85184, 91125, 97336, 103823, 110592, 117649, 125000, 132651, 140608, 148877, 157464, 166375, 175616, 185193, 195112, 205379, 216000, 226981, 238328, 250047, 262144, 274625, 287496, 300763, 314432, 328509, 343000, 357911, 373248, 389017, 405224, 421875, 438976, 456533, 474552, 493039, 512000, 531441, 551368, 571787, 592704, 614125, 636056, 658503, 681472, 704969, 729000, 753571, 778688, 804357, 830584, 857375, 884736, 912673, 941192, 970299, 1000000, 1030301, 1061208, 1092727, 1124864, 1157625, 1191016, 1225043, 1259712, 1295029, 1331000, 1367631, 1404928, 1442897, 1481544, 1520875, 1560896, 1601613, 1643032, 1685159, 1728000, 1771561, 1815848, 1860867, 1906624, 1953125, 2000376, 2048383, 2097152, 2146689, 2197000, 2248091, ...]
```

## Project Gutenberg Reader

Let's use everything we've learned to tackle one last problem.

I have consolidated 18,792 books from [Project Gutenberg](https://www.gutenberg.org/) into a large 10GB binary file.

Using Python, let's count how many times some very important words appear, as exact whitespace-separated tokens, in all of these books:
 - `dog`
 - `cat`
 - `girl`
 - `boy`
 

## Strategy #1 Serial Code

<img src = 'img/single_process_reader.png' >

```python
%%timeit -n1 -r1

# location of this file
f = "/project/shared/biohpc_training/large_txt.bin"

# Create a Python 'generator'
# It reads the large binary file lazily, yielding one 256 KiB chunk at a time
def read_in_chunks(file_object, chunk_size=256*1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 256k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

dogs  = 0
cats  = 0
boys  = 0
girls = 0

# Open the file, read in chunks sequentially, and count the number of word instances.
# Caveat: a word that straddles two chunks is split in two, so the counts can be
# slightly off; the parallel version below behaves the same way.
with open(f, "rb") as fin:
    for chunk in read_in_chunks(fin):
        line_list = chunk.decode('latin-1').strip().split()

        dogs  += line_list.count("dog")
        cats  += line_list.count("cat")
        boys  += line_list.count("boy")
        girls += line_list.count("girl")

print(dogs, cats, boys, girls)
```

```
48021 16392 130761 174912
2min 43s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```
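If exact counts matter, one way to avoid splitting words at chunk boundaries is to carry the trailing partial word of each chunk over to the next one. This is an assumption, not the method used above. Decoding each chunk separately is safe here because `latin-1` maps every byte to exactly one character. The names `read_words_in_chunks` and `count_words` are hypothetical, and `io.BytesIO` stands in for the real file.

```python
import io

def read_words_in_chunks(file_object, chunk_size=256 * 1024):
    """Yield lists of whitespace-separated words, never splitting a word across chunks."""
    leftover = ""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        text = leftover + data.decode("latin-1")
        words = text.split()
        # If the chunk ends mid-word, hold that partial word back for the next chunk
        leftover = words.pop() if not text[-1].isspace() else ""
        yield words
    if leftover:
        yield [leftover]

def count_words(file_object, targets, chunk_size=256 * 1024):
    """Count exact occurrences of each target word in a binary file object."""
    counts = dict.fromkeys(targets, 0)
    for words in read_words_in_chunks(file_object, chunk_size):
        for target in targets:
            counts[target] += words.count(target)
    return counts

# A tiny chunk size forces words to straddle chunk boundaries
sample = io.BytesIO(b"hotdog dog cat\ndog")
print(count_words(sample, ["dog", "cat"], chunk_size=4))  # {'dog': 2, 'cat': 1}
```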


## Strategy #2 Parallelized Workers

<img src = 'img/multi_process_reader.png' >

```python
%%timeit -n1 -r1

# A large binary file is broken into chunks, which are fed to a pool of worker processes
# (the faster of our two strategies)

import multiprocess as mp
import psutil

# logical=False: count physical cores only, not the logical cores added by SMT
num_procs = psutil.cpu_count(logical=False)

# location of our data
f = "/project/shared/biohpc_training/large_txt.bin"

# Create a Python 'generator'
# It reads the large binary file lazily, yielding one 256 KiB chunk at a time
def read_in_chunks(file_object, chunk_size=256*1024):
    """Lazy function (generator) to read a file piece by piece.
    Default chunk size: 256k."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

# Our counter function: tallies the target words in one decoded chunk
def count_objects(chunk):
    dogs  = 0
    cats  = 0
    boys  = 0
    girls = 0

    line_list = chunk.strip().split()

    dogs  += line_list.count("dog")
    cats  += line_list.count("cat")
    boys  += line_list.count("boy")
    girls += line_list.count("girl")

    return dogs, cats, boys, girls

# Open our large file and read it in chunks.
# Note: this decodes the whole file (about 10 GB of text) into an in-memory list
# before any counting starts
with open(f, "rb") as fin:
    chunks = [chunk.decode('latin-1') for chunk in read_in_chunks(fin)]


# Here we map our function onto the chunks
# Our worker pool does all the hard work for us
# No need to spawn Process objects manually

def multi_read(n):
    print('Creating pool with %d processes\n' % n)
    with mp.Pool(n) as p:
        results = p.map(count_objects, chunks)
    return results

results = multi_read(num_procs)

all_dogs  = 0
all_cats  = 0
all_boys  = 0
all_girls = 0

for dogs, cats, boys, girls in results:
    all_dogs  += dogs
    all_cats  += cats
    all_boys  += boys
    all_girls += girls

print(all_dogs, all_cats, all_boys, all_girls)
```

```
Creating pool with 8 processes

48021 16392 130761 174912
37.9 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)
```
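The cell above builds the complete list of decoded chunks in the parent before the pool starts. A possible variation, not timed here, passes the chunk generator straight to `Pool.imap_unordered()`. Chunks are then handed to workers while the file is still being read, and decoding happens inside the workers. The sketch uses the standard-library `multiprocessing` with an in-memory `io.BytesIO` standing in for the real file. `count_chunk` and `parallel_count` are hypothetical names, and the sketch keeps the original chunk-boundary behaviour.

```python
import io
import multiprocessing as mp

TARGETS = ("dog", "cat", "boy", "girl")

def read_in_chunks(file_object, chunk_size=256 * 1024):
    """Lazily yield raw byte chunks from a binary file object."""
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data

def count_chunk(raw_chunk):
    """Decode one raw chunk in the worker and count each target word."""
    words = raw_chunk.decode("latin-1").split()
    return tuple(words.count(target) for target in TARGETS)

def parallel_count(file_object, processes, chunk_size=256 * 1024):
    """Stream chunks to a pool of workers and sum their per-chunk counts."""
    totals = [0] * len(TARGETS)
    with mp.Pool(processes) as pool:
        # The generator is consumed as tasks are dispatched; no full list is built up front
        for counts in pool.imap_unordered(count_chunk, read_in_chunks(file_object, chunk_size)):
            totals = [total + c for total, c in zip(totals, counts)]
    return dict(zip(TARGETS, totals))

if __name__ == "__main__":
    # Stand-in data; use open(path, "rb") for a real file
    sample = io.BytesIO(b"the dog chased the cat\nthe boy and the girl\n" * 1000)
    print(parallel_count(sample, processes=2, chunk_size=4096))
```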


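This isn't a 1:1 scale-up: 8 processes cut the runtime from 2 min 43 s (163 s) to 37.9 s, roughly a 4.3× speedup rather than 8×. Still, it is a nice speedup! Part of the gap comes from work that is still serial or extra. The whole file is read and decoded in the parent process before the pool starts, and every chunk has to be sent to a worker.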
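Amdahl's law gives one rough way to read these numbers. It assumes that a fraction $p$ of the runtime parallelizes perfectly over $N$ workers and the rest stays serial. Solving for $p$ with the measured times suggests that roughly 88% of the work was effectively parallel. Treat this as a back-of-the-envelope model, not a measurement:

$$
\begin{aligned}
S(N) &= \frac{1}{(1-p) + p/N} \quad\Longrightarrow\quad p = \frac{1 - 1/S}{1 - 1/N} \\
S &= \frac{163\ \text{s}}{37.9\ \text{s}} \approx 4.30, \qquad N = 8 \\
p &\approx \frac{1 - 0.2325}{1 - 0.125} = \frac{0.7675}{0.875} \approx 0.88
\end{aligned}
$$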