What is the maximum number of parallel processes you can run?

In [1]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  16
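
`mp.cpu_count()` reports the logical CPUs in the machine, which may be more than the current process is allowed to use (for example inside a container or a cluster job with restricted CPU affinity). The following is a minimal sketch of a more conservative count; the helper name `usable_cpu_count` is hypothetical, and `os.sched_getaffinity` is only available on some platforms such as Linux, so the sketch falls back to `mp.cpu_count()` elsewhere.

```python
import multiprocessing as mp
import os


def usable_cpu_count():
    """Return the number of CPUs this process may run on (hypothetical helper)."""
    # os.sched_getaffinity is not available on every platform (e.g. not on macOS).
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # Fallback: total logical CPUs in the machine.
    return mp.cpu_count()


print("Logical CPUs in the machine:", mp.cpu_count())
print("CPUs usable by this process:", usable_cpu_count())
```

Passing the smaller number to `mp.Pool()` avoids starting more workers than can actually run at once.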


**Two types of execution**
What is Synchronous and Asynchronous execution?

**Synchronous**
* processes complete in the same order in which they were started; this is achieved by locking the main program until each process has finished

**Asynchronous**
* does not involve locking
* the order of the results may get mixed up, but the work usually finishes sooner
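
The difference can be sketched with `Pool.apply()` (blocking) and `Pool.apply_async()` (non-blocking). This is an illustration only; `square`, `run_sync` and `run_async` are hypothetical names chosen for the sketch.

```python
import multiprocessing as mp


def square(x):
    return x * x


def run_sync(pool, values):
    # apply() blocks until each call has finished, so the calls run one by one.
    return [pool.apply(square, args=(v,)) for v in values]


def run_async(pool, values):
    # apply_async() returns an AsyncResult immediately; the work proceeds in
    # the background and .get() waits for the individual result.
    pending = [pool.apply_async(square, args=(v,)) for v in values]
    return [p.get() for p in pending]


if __name__ == "__main__":
    with mp.Pool(2) as pool:
        print(run_sync(pool, [1, 2, 3]))
        print(run_async(pool, [1, 2, 3]))
```

Calling `.get()` on the handles in submission order returns the values in that order, even if the workers finished them in a different one.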

There are 2 main objects in `multiprocessing` for implementing parallel execution of a function: the `Pool` class and the `Process` class.

1. Pool Class
    1. Synchronous execution
        * Pool.map() and Pool.starmap()
        * Pool.apply()
    2. Asynchronous execution
        * Pool.map_async() and Pool.starmap_async()
        * Pool.apply_async()
2. Process Class
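
The rest of these notes use `Pool`. For comparison, here is a minimal sketch of the `Process` class, which starts one process per task and needs an explicit channel (here an `mp.Queue`) to return results. The names `count_in_range` and `run_with_processes` are hypothetical, and starting one process per row is only sensible for a handful of rows.

```python
import multiprocessing as mp


def count_in_range(index, row, minimum, maximum, queue):
    """Count the numbers of `row` within [minimum, maximum] and report them."""
    count = sum(1 for n in row if minimum <= n <= maximum)
    # Tag the count with its index so the parent can restore the input order.
    queue.put((index, count))


def run_with_processes(rows, minimum=4, maximum=8):
    queue = mp.Queue()
    procs = [
        mp.Process(target=count_in_range, args=(i, row, minimum, maximum, queue))
        for i, row in enumerate(rows)
    ]
    for p in procs:
        p.start()
    # Read one result per process before joining, so no child is left
    # waiting to flush its data to the queue.
    tagged = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return [count for _, count in sorted(tagged)]


if __name__ == "__main__":
    print(run_with_processes([[3, 1, 0, 3, 9], [1, 4, 5, 7, 5], [8, 8, 5, 9, 8]]))
```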

In [2]:
# Problem statement: count how many numbers in each row fall within a given range
import numpy as np
from time import time

# Prepare data
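# NOTE: this line constructs a RandomState and discards it; it does not seed
# np.random, so the data below differs between runs.
# Use np.random.seed(100) instead for reproducible data.
np.random.RandomState(100)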
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()
data[:5]

[[3, 1, 0, 3, 9],
 [1, 4, 5, 7, 5],
 [8, 8, 5, 9, 8],
 [7, 1, 0, 3, 5],
 [6, 0, 1, 3, 1]]

In [3]:
# Solution Without Parallelization

def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers in `row` lie between `minimum` and `maximum` (inclusive)"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

results = []
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))

print(results[:10])

[0, 4, 4, 2, 1, 3, 3, 3, 2, 3]


**How to parallelize any function?**
* take a function that has to be run many times and run those calls in parallel on different processors
* initialize a `Pool` with n processes and pass the function you want to parallelize to one of the `Pool` parallelization methods
* `multiprocessing.Pool()` provides the `apply()`, `map()` and `starmap()` methods to run a function in worker processes

**What’s the difference between `apply()` and `map()`?**
* Both `apply()` and `map()` take the function to be parallelized as the main argument.
* `apply()` takes an `args` argument holding the parameters to pass to the function being parallelized; it handles one call at a time and blocks until that call returns
* `map()` takes only one iterable as an argument and calls the function once per element
* `map()` suits simple operations over a single iterable and is usually faster, because it hands the elements to the workers in chunks
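
One way around the one-iterable limit of `map()` is to freeze the extra arguments with `functools.partial`. This is a sketch of an alternative, not the approach used in the cells of this notebook; `count_rows` is a hypothetical helper, and the worker is rewritten compactly with `sum()`.

```python
import multiprocessing as mp
from functools import partial


def howmany_within_range(row, minimum, maximum):
    """Count the numbers of `row` that lie within [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


def count_rows(rows, minimum, maximum, processes=2):
    # partial() fixes `minimum` and `maximum`, leaving a one-argument callable
    # that Pool.map() can apply to every row.
    worker = partial(howmany_within_range, minimum=minimum, maximum=maximum)
    with mp.Pool(processes) as pool:
        return pool.map(worker, rows)


if __name__ == "__main__":
    print(count_rows([[3, 1, 0, 3, 9], [1, 4, 5, 7, 5]], 4, 8))
```

The `partial` object can be sent to the workers because the wrapped function is defined at module level.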

## Parallelizing using Pool.apply()


In [4]:
# Parallelizing using Pool.apply()

import multiprocessing as mp

# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())

# Step 2: `pool.apply` the `howmany_within_range()`
# Note: apply() blocks until each call returns, so this loop processes one row at a time.
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]

# Step 3: Don't forget to close
pool.close()

print(results[:10])

[0, 4, 4, 2, 1, 3, 3, 3, 2, 3]


## Parallelizing using Pool.map()
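* `Pool.map()` accepts only one iterable as an argument
* so the function is redefined below with defaults for `minimum` and `maximum`, leaving `row` as the only mandatory argument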

In [5]:
# Parallelizing using Pool.map()
import multiprocessing as mp

# Redefine, with only 1 mandatory argument.
def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

pool = mp.Pool(mp.cpu_count())

results = pool.map(howmany_within_range_rowonly, data)

pool.close()

print(results[:10])

[0, 4, 4, 2, 1, 3, 3, 3, 2, 3]


## Parallelizing using Pool.starmap()
* Like `Pool.map()`, `Pool.starmap()` accepts only one iterable as an argument
* but in `starmap()`, each element of that iterable is itself an iterable
* put the arguments for the function being parallelized into this inner iterable, in the order the function expects; they are unpacked when the function is called
* in effect, `Pool.starmap()` is a version of `Pool.map()` whose function accepts multiple arguments
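
The unpacking can be illustrated without any processes: the sequential `itertools.starmap` performs the same `func(*args)` call for each inner iterable. The two sample rows below are taken from the data shown earlier; `Pool.starmap()` does the same unpacking but spreads the calls across worker processes.

```python
from itertools import starmap


def howmany_within_range(row, minimum, maximum):
    """Count the numbers of `row` that lie within [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


rows = [[3, 1, 0, 3, 9], [1, 4, 5, 7, 5]]
arg_tuples = [(row, 4, 8) for row in rows]

# Each tuple is unpacked into positional arguments: func(*args).
unpacked = [howmany_within_range(*args) for args in arg_tuples]

# itertools.starmap does the same unpacking, one tuple at a time.
same_thing = list(starmap(howmany_within_range, arg_tuples))

print(unpacked, same_thing)
```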

In [6]:
# Parallelizing with Pool.starmap()
import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())

results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])

pool.close()

print(results[:10])

[0, 4, 4, 2, 1, 3, 3, 3, 2, 3]


# Asynchronous Parallel Processing
* `apply_async()`, `map_async()` and `starmap_async()`
* let you execute the processes in parallel asynchronously
* the next task can start as soon as a worker is free, without waiting for earlier tasks to finish
* there is no guarantee that tasks finish in input order: callbacks fire in completion order, although `map_async().get()` and `starmap_async().get()` still return the results in input order
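
A minimal sketch of `map_async()`: the call returns an `AsyncResult` handle at once, and `get()` later blocks until every row is done. The helper name `count_rows_async` is hypothetical, and the worker is a compact rewrite of the row-only counting function.

```python
import multiprocessing as mp


def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    """Count the numbers of `row` that lie within [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


def count_rows_async(rows, processes=2):
    with mp.Pool(processes) as pool:
        # map_async() returns immediately with an AsyncResult handle.
        handle = pool.map_async(howmany_within_range_rowonly, rows)
        # ... the main program is free to do other work here ...
        # get() blocks until all rows are processed; the list it returns
        # follows the order of `rows`.
        return handle.get()


if __name__ == "__main__":
    sample = [[3, 1, 0, 3, 9], [1, 4, 5, 7, 5], [8, 8, 5, 9, 8], [7, 1, 0, 3, 5]]
    print(count_rows_async(sample))
```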

## Parallelizing with Pool.apply_async()
* is very similar to `apply()`, but does not block
* to collect the results as they complete, provide a callback function that says how each computed result should be stored
* the results arrive in a jumbled order, showing that the processes did not complete in the order in which they were started
* so we define a new `howmany_within_range2()` that accepts and returns the iteration number (`i`) as well, and then sort the final results

In [7]:
# Parallel processing with Pool.apply_async()

import multiprocessing as mp

results = []

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Returns `i` and how many numbers in `row` lie between `minimum` and `maximum` (inclusive)"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    results.append(result)


# Step 3: Create the pool only after the functions above are defined.
# With the fork start method, workers see only what exists in `__main__`
# when the pool is created; creating it earlier leads to
# "AttributeError: Can't get attribute 'howmany_within_range2'".
pool = mp.Pool(mp.cpu_count())

# Step 4: Use loop to parallelize
for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# Step 5: Close Pool and let all the processes complete
pool.close()
pool.join()  # blocks until all tasks in the queue are done

# Step 6: Sort results [OPTIONAL]
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])

[Output of a run in which the pool was created before `howmany_within_range2` was defined. All 16 workers (ForkPoolWorker-49 to ForkPoolWorker-64) printed interleaved, truncated tracebacks of the same form; one representative copy follows.]

Process ForkPoolWorker-49:
Traceback (most recent call last):
  File "/local/pkg/python/root-python-3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  [...]
  File "/local/pkg/python/root-python-3.7/lib/python3.7/multiprocessing/pool.py", line 110, in worker
    task = get()
  File "/local/pkg/python/root-python-3.7/lib/python3.7/multiprocessing/queues.py", line 354, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'howmany_within_range2' on <module '__main__'>

KeyboardInterrupt: 

In [9]:
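# Pool has no clear() method; the error recorded for this cell came from such a call:
#   AttributeError: 'Pool' object has no attribute 'clear'
# close() (or terminate(), to stop the workers immediately) shuts a pool down.
pool.close()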

# References
https://www.machinelearningplus.com/python/parallel-processing-python/
