In [1]:
import pathos.multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  12


+ multiprocessing does not work in jupyter notebook in Windows10

In [1]:
import numpy as np
import concurrent.futures
import time

# if __name__ == '__main__':

def do_something(seconds):
	print(f'Sleeping {seconds} second(s)...')
	time.sleep(seconds)
	# print('Done sleeping...')
	return f'Done sleeping {seconds} second(s)'

n_repeat = 3
sleep_time = 1
sleep_times = [i for i in range(n_repeat,0,-1)]

# print("------------- Normal ---------------")
# start = time.perf_counter()

# for _ in range(n_repeat):
# 	do_something(sleep_time)
# finish = time.perf_counter()
# print(f'Finished in {round(finish - start,2)} second(s)')

# print("------------- Multiprocessing ---------------")
start = time.perf_counter()

# #### Example 1
# processes = []
# n_jobs = n_repeat

# for _ in range(n_jobs):
# 	p = multiprocessing.Process(target = do_something,args = [sleep_time])
# 	p.start()
# 	processes.append(p)

# for process in processes:
# 	process.join()


# #### Example 2: processes will return results once they are finished
# with concurrent.futures.ProcessPoolExecutor() as executor:
# 	# f1 = 
# 	# f2 = executor.submit(do_something, 1)
# 	# print(f1.result())
# 	# print(f2.result())
# 	results = [executor.submit(do_something, sleep_times[_]) for _ in range(n_repeat)]
# 	for f in concurrent.futures.as_completed(results):
# 		print(f.result())

#### Example 3: processes will return results in the order they were started
with concurrent.futures.ProcessPoolExecutor() as executor:
	results = executor.map(do_something, sleep_times)
	for result in results:
		print(result)

finish = time.perf_counter()
print(f'Finished in {round(finish - start,2)} second(s)')

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

# Notes from 10745

In [7]:
#Using a Process Pool
import multiprocessing

def function_square(data):
    result = data*data
    return result

In [8]:
inputs = list(range(0,100))

In [10]:
#The total number of parallel processes is four:
pool = multiprocessing.Pool(processes=4)

In [None]:
# The multiprocessing.Pool method applies function_square to the input element to perform a simple calculation. 
# The pool.map method submits to the process pool as separate tasks
# The result of the calculation is stored in pool_outputs
pool_outputs = pool.map(function_square, inputs)

In [None]:
pool.close()

In [None]:
pool.join()

In [None]:
print ('Pool:', pool_outputs)

# Notes from https://www.machinelearningplus.com/python/parallel-processing-python/

In [24]:
import pathos.multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

Number of processors:  12


In [41]:
import numpy as np
import time

# Prepare data
np.random.RandomState(100)
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()
data[:5]

[[8, 0, 6, 8, 7],
 [2, 8, 5, 5, 3],
 [8, 4, 9, 2, 0],
 [3, 1, 2, 1, 7],
 [0, 2, 3, 8, 8]]

In [42]:
# Solution Without Paralleization

def howmany_within_range(row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count


start_time = time.time()
results = []
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
print("--- %s seconds ---" % (time.time() - start_time))

[4, 3, 2, 1, 2, 0, 4, 2, 1, 2]
--- 0.10632491111755371 seconds ---


In [43]:
# Parallelizing using Pool.apply()

import pathos.multiprocessing as mp

# Step 1: Init multiprocessing.Pool()
pool = mp.Pool(mp.cpu_count())

start_time = time.time()
# Step 2: `pool.apply` the `howmany_within_range()`
results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]

# Step 3: Don't forget to close
pool.close()    

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
print("--- %s seconds ---" % (time.time() - start_time))

[4, 3, 2, 1, 2, 0, 4, 2, 1, 2]
--- 101.71500444412231 seconds ---


In [38]:
# Parallelizing using Pool.map()
import pathos.multiprocessing as mp

# Redefine, with only 1 mandatory argument.
def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

start_time = time.time()
pool = mp.Pool(mp.cpu_count())

results = pool.map(howmany_within_range_rowonly, [row for row in data])

pool.close()

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
print("--- %s seconds ---" % (time.time() - start_time))

[2, 0, 3, 4, 4, 3, 2, 3, 3, 4]
--- 0.4538440704345703 seconds ---


In [39]:
# Parallelizing with Pool.starmap()
import pathos.multiprocessing as mp

start_time = time.time()
pool = mp.Pool(mp.cpu_count())

results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])

pool.close()

print(results[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
print("--- %s seconds ---" % (time.time() - start_time))

[2, 0, 3, 4, 4, 3, 2, 3, 3, 4]
--- 0.5691015720367432 seconds ---


In [40]:
# Parallel processing with Pool.apply_async()

import pathos.multiprocessing as mp
pool = mp.Pool(mp.cpu_count())

results = []

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Returns how many numbers lie within `maximum` and `minimum` in a given `row`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    results.append(result)

start_time = time.time()
# Step 3: Use loop to parallelize
for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# Step 4: Close Pool and let all the processes complete    
pool.close()
pool.join()  # postpones the execution of next line of code until all processes in the queue are done.

# Step 5: Sort results [OPTIONAL]
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])
#> [3, 1, 4, 4, 4, 2, 1, 1, 3, 3]
print("--- %s seconds ---" % (time.time() - start_time))

[2, 0, 3, 4, 4, 3, 2, 3, 3, 4]
--- 5.790550947189331 seconds ---


# Notes from https://stackoverflow.com/questions/20727375/multiprocessing-pool-slower-than-just-using-ordinary-functions

In [49]:
import difflib

In [51]:
difflib.get_close_matches("b","b")

['b']

In [4]:
import difflib, random, time
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) 
    for lengthofword in range(5)]) for nrofwords in range(100000)]
mainword = "hello"
mainwordlist = [mainword for each in range(50)]

In [6]:
processes = 5
def splitlist(inlist, chunksize):
    return [inlist[x:x+chunksize] for x in range(0, len(inlist), chunksize)]
# print (len(mainwordlist)/processes)
mainwordlistsplitted = splitlist(mainwordlist, int(len(mainwordlist)/processes))

In [2]:
def splitlist(inlist, chunksize):
    return [inlist[x:x+chunksize] for x in range(0, len(inlist), chunksize)]
splitlist([1,2,3,4,5],2)

[[1, 2], [3, 4], [5]]

In [None]:
from multiprocessing import Pool
import random, time
from difflib import get_close_matches

# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) 
                     for lengthofword in range(5)]) for nrofwords in range(10000)]
mainword = "hello"

# comparison function
def findclosematch(subwordlist):
    matches = get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
    if matches:
        return matches

# pool
print("pool method")
if __name__ == '__main__':
    pool = Pool(processes=3)
    t=time.time()
    result = pool.map_async(findclosematch, wordlist, chunksize=100)
    #do something with result
    for r in result.get():
        pass
    print (time.time()-t)

# normal
print ("normal method")
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
    pass
print (time.time()-t)

pool method


# Some attempt by myself

# Multiprocessing

In [1]:
import numpy as np
# instantiate and configure the worker pool
from pathos.multiprocessing import ProcessPool
import time

In [2]:
pool = ProcessPool(nodes=8)

In [3]:
B = 100000

In [None]:
base = 

In [10]:
start_time = time.time()
# do a blocking map on the chosen function
a = pool.map(pow, list(np.arange(B)), list(np.ones(B)*2))
print("--- %s seconds ---" % (time.time() - start_time))

--- 10.838655710220337 seconds ---


In [9]:
start_time = time.time()
# do a non-blocking map, then extract the results from the iterator
results = pool.imap(pow, np.arange(B), np.ones(B)*2)
a = list(results)
print("--- %s seconds ---" % (time.time() - start_time))

--- 242.35130786895752 seconds ---


In [15]:
start_time = time.time()
pool = ProcessPool(nodes=4)
# do a blocking map on the chosen function
a = [pow(i,2) for i in range(B)]
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.08654260635375977 seconds ---


# PP

In [2]:
import numpy as np
# instantiate and configure the worker pool
from pathos.pp import ParallelPool
import time

In [15]:
pool = ParallelPool(nodes=4)

### Example 1

In [16]:
B = 1000

In [17]:
start_time = time.time()
# do a blocking map on the chosen function
a = pool.map(pow, list(np.arange(B)), list(np.ones(B)*2))
print("--- %s seconds ---" % (time.time() - start_time))

--- 1.104346513748169 seconds ---


In [18]:
start_time = time.time()
# do a non-blocking map, then extract the results from the iterator
results = pool.imap(pow, np.arange(B), np.ones(B)*2)
a = list(results)
print("--- %s seconds ---" % (time.time() - start_time))

--- 1.1080365180969238 seconds ---


In [11]:
start_time = time.time()
# do a blocking map on the chosen function
a = [pow(i,2) for i in range(B)]
print("--- %s seconds ---" % (time.time() - start_time))

--- 0.044896841049194336 seconds ---


### Example 2

In [22]:
def add(x,y):
    return x+y

B = 10000
inX = list(np.arange(B))
inY = list(np.ones(B) * 2)

start_time = time.time()
# do a blocking map on the chosen function
a = pool.map(pow, inX, inY)
print("--- %s seconds ---" % (time.time() - start_time))

--- 25.86776375770569 seconds ---
