https://www.machinelearningplus.com/python/parallel-processing-python/

https://www.machinelearningplus.com/

In [1]:
import numpy as np
from time import time
import multiprocessing as mp
%time

# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 500, size=[200000, 5])
data = arr.tolist()
data[:5]

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.53 µs


[[62, 240, 109, 311, 8],
 [20, 179, 439, 182, 240],
 [445, 3, 139, 246, 436],
 [125, 362, 470, 329, 0],
 [179, 366, 368, 269, 471]]

In [2]:
# Solution Without Paralleization

def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

In [3]:
%%time
results = []
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))

print(results[:10])

[1, 0, 0, 0, 0, 1, 0, 0, 0, 0]
CPU times: user 89.3 ms, sys: 3.97 ms, total: 93.3 ms
Wall time: 92.5 ms


In [4]:
# %%time
# # Parallelizing using Pool.apply()

# # Step 1: Init multiprocessing.Pool()
# pool = mp.Pool(mp.cpu_count())

# # Step 2: `pool.apply` the `howmany_within_range()`
# results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]

# # Step 3: Don't forget to close
# pool.close()    

# print(results[:10])

In [5]:
%%time
# Parallelizing using Pool.map()

# Redefine, with only 1 mandatory argument.
def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

pool = mp.Pool(mp.cpu_count())

results = pool.map(howmany_within_range_rowonly, [row for row in data])

pool.close()

print(results[:10])

[1, 0, 0, 0, 0, 1, 0, 0, 0, 0]
CPU times: user 76.7 ms, sys: 20.4 ms, total: 97.1 ms
Wall time: 191 ms


In [6]:
%%time
pool = mp.Pool(mp.cpu_count())

results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])

pool.close()

print(results[:10])

[1, 0, 0, 0, 0, 1, 0, 0, 0, 0]
CPU times: user 209 ms, sys: 44.7 ms, total: 254 ms
Wall time: 335 ms


In [7]:
# Parallel processing with Pool.apply_async()
pool = mp.Pool(mp.cpu_count())

results = []

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    results.append(result)


# # Step 3: Use loop to parallelize
# for i, row in enumerate(data):
#     pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# # Step 4: Close Pool and let all the processes complete    
# pool.close()
# pool.join()  # postpones the execution of next line of code until all processes in the queue are done.

# # Step 5: Sort results [OPTIONAL]
# results.sort(key=lambda x: x[0])
# results_final = [r for i, r in results]

# print(results_final[:10])

In [8]:
# Parallel processing with Pool.apply_async() without callback function
pool = mp.Pool(mp.cpu_count())

results = []

# call apply_async() without callback
result_objects = [pool.apply_async(howmany_within_range2, args=(i, row, 4, 8)) for i, row in enumerate(data)]

# result_objects is a list of pool.ApplyResult objects
results = [r.get()[1] for r in result_objects]

pool.close()
pool.join()
print(results[:10])

[1, 0, 0, 0, 0, 1, 0, 0, 0, 0]


pandas

In [9]:
import numpy as np
import pandas as pd
import multiprocessing as mp

df = pd.DataFrame(np.random.randint(3, 10, size=[5, 2]))
print(df.head())

   0  1
0  8  8
1  8  8
2  4  3
3  6  5
4  3  5


In [16]:
# Row wise Operation
def hypotenuse(row):
    return round(row[1]**2 + row[2]**2, 2)**0.5

with mp.Pool(4) as pool:
    result = pool.imap(hypotenuse, df.itertuples(name=False), chunksize=10)
    output = [round(x, 2) for x in result]

print(output)

ValueError: Type names and field names cannot be a keyword: 'False'

In [11]:
# Column wise Operation
def sum_of_squares(column):
    return sum([i**2 for i in column[1]])

with mp.Pool(2) as pool:
    result = pool.imap(sum_of_squares, df.iteritems(), chunksize=10)
    output = [x for x in result]

print(output) 

[189, 187]
