# Pool Class Example

## Count #s in given range in a matrix

In [1]:
import numpy as np
from time import time 

In [22]:
#Prepare data
np.random.RandomState(100)
arr = np.random.randint(0,10, size=[8, 200000])
data = arr.tolist()
print(arr.shape)
print(len(data))
print(len(data[0]))

(8, 200000)
8
200000


### Without parallelization

In [23]:
def within_range(row, min_, max_):
    count=0
    for n in row:
        if min_<=n<=max_:
            count=count+1
    return count
result=[]
for row in data:
    result.append(within_range(row, 4, 8))
print(result)

[100120, 100217, 99623, 100092, 99541, 100275, 100033, 100009]


### Parallel approach Pool.apply()

In [24]:
import multiprocessing as mp

In [25]:
pool = mp.Pool(mp.cpu_count())

In [26]:
results = [pool.apply(within_range, args=(row, 4,8)) for row in data]

In [28]:
pool.close()

In [29]:
print(results)

[100120, 100217, 99623, 100092, 99541, 100275, 100033, 100009]


## Parallel approach Pool.map()

The main difference between `.map()` and `.apply()` is that map accepts only 1 argument that is the itterable object on which we have to run the function whereas apply accepts multiple arguments which are passed to the function which we are parellelizing

### Async Parallel Processing using apply_async()

apply_async() is very similar to apply() except that you need to provide a callback function that tells how the computed results should be stored.

However, a caveat with apply_async() is, the order of numbers in the result gets jumbled up indicating the processes did not complete in the order it was started.

To solve this we can have an itteration identifier to identify the row no and then put the answer in the specific position

In [88]:
def within_range2(row, row_i, min_, max_):
    count=0
    for n in row:
        if min_<=n<=max_:
            count=count+1
    print(row_id)
    return (row_id, count)

In [89]:
async_result=[]

In [90]:
def collect_result(result):
    global async_result
    async_result.append(result)
    print(async_result)
    print(result)

In [91]:
pool=mp.Pool(mp.cpu_count())

In [92]:
for i, row in enumerate(data):
    pool.apply_async(within_range2, args=(row, i, 4, 8), callback=collect_result)

In [93]:
pool.close()

In [94]:
pool.join()

In [95]:
print(async_result)

[]
