In [6]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import time
import numpy as np
import os
import multiprocessing
import random
import sys
import functools


In [None]:
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

In [None]:
%%time

res = [mc_pi(int(1e6)) for i in range(2)]

In [None]:
@njit()
def mc_pi_numba(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

In [None]:
%%time

res = [mc_pi_numba(int(1e6)) for i in range(2)]


In [53]:
np.array(res)

array([3.139124, 3.141576])

In [58]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_numba, [int(1e7) for i in range(1000)])

KeyboardInterrupt: 

In [55]:
np.array(list(res))

array([3.140996, 3.142452, 3.14368 , 3.14292 , 3.141316, 3.139808,
       3.139864, 3.140724, 3.142452, 3.139852])

In [63]:
%%time

with ProcessPoolExecutor(max_workers=20) as pool:
    res = pool.map(mc_pi_numba, [int(1e4) for i in range(int(1e3))])

CPU times: user 1.26 s, sys: 140 ms, total: 1.4 s
Wall time: 2.59 s


In [57]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi_numba, [int(1e4) for i in range(int(1e3))], chunksize=100)

CPU times: user 20.9 ms, sys: 24 ms, total: 45 ms
Wall time: 137 ms


In [66]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_numba, [int(1e7) for i in range(10)])

CPU times: user 26 ms, sys: 16 ms, total: 42 ms
Wall time: 754 ms


In [68]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi_numba, [int(1e4) for i in range(int(1e4))])

CPU times: user 27.7 ms, sys: 40 ms, total: 67.8 ms
Wall time: 864 ms


In [69]:
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)

In [73]:
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()

31177 0
31186 1
31197 2
31206 3
31215 4
31224 5
31233 6
31242 7
31252 8
31262 9


In [11]:
#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return result

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

def test():
    
    for j in range(6):
        PROCESSES = int(j+1) 
        print(PROCESSES)
        print('Creating pool with %d processes\n' % PROCESSES)

        test = np.random.rand(int(10),1)

        with multiprocessing.Pool(PROCESSES) as pool:

            #
            # Tests
            #

            TASKS = [(mul, (test[i], 7)) for i in range(len(test))] + \
                    [(plus, (test[i], 8)) for i in range(len(test))]

            results = [pool.apply_async(calculate, t) for t in TASKS]
            imap_it = pool.imap(calculatestar, TASKS)
            imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
            
            print(imap_it)

            print('Ordered results using pool.apply_async():')
            for r in results:
                print('\t', r.get())
            print()

            print()
            for x in imap_it:
                print(type(imap_it))
                print(x)
            print()

            print('Unordered results using pool.imap_unordered():')
            for x in imap_unordered_it:
                print('\t', x)
            print()

            print('Ordered results using pool.map() --- will block till complete:')
            for x in pool.map(calculatestar, TASKS):
                print('\t', x)
            print()

            #
            # Test error handling
            #

            print('Testing error handling:')

            try:
                print(pool.apply(f, (5,)))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from pool.apply()')
            else:
                raise AssertionError('expected ZeroDivisionError')

            try:
                print(pool.map(f, list(range(10))))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from pool.map()')
            else:
                raise AssertionError('expected ZeroDivisionError')

            try:
                print(list(pool.imap(f, list(range(10)))))
            except ZeroDivisionError:
                print('\tGot ZeroDivisionError as expected from list(pool.imap())')
            else:
                raise AssertionError('expected ZeroDivisionError')

            it = pool.imap(f, list(range(10)))
            for i in range(10):
                try:
                    x = next(it)
                except ZeroDivisionError:
                    if i == 5:
                        pass
                except StopIteration:
                    break
                else:
                    if i == 5:
                        raise AssertionError('expected ZeroDivisionError')

            assert i == 9
            print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
            print()

            #
            # Testing timeouts
            #

            print('Testing ApplyResult.get() with timeout:', end=' ')
            res = pool.apply_async(calculate, TASKS[0])
            while 1:
                sys.stdout.flush()
                try:
                    sys.stdout.write('\n\t%s' % res.get(0.02))
                    break
                except multiprocessing.TimeoutError:
                    sys.stdout.write('.')
            print()
            print()

            print('Testing IMapIterator.next() with timeout:', end=' ')
            it = pool.imap(calculatestar, TASKS)
            while 1:
                sys.stdout.flush()
                try:
                    sys.stdout.write('\n\t%s' % it.next(0.02))
                except StopIteration:
                    break
                except multiprocessing.TimeoutError:
                    sys.stdout.write('.')
            print()
            print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

1
Creating pool with 1 processes

<multiprocessing.pool.IMapIterator object at 0x7fae0baeca50>
Ordered results using pool.apply_async():
	 [6.1471196]
	 [5.35938368]
	 [4.48816956]
	 [2.04935347]
	 [1.41218664]
	 [3.8168228]
	 [5.38837387]
	 [2.16817178]
	 [6.81657977]
	 [3.92111038]
	 [8.87815994]
	 [8.76562624]
	 [8.64116708]
	 [8.29276478]
	 [8.20174095]
	 [8.5452604]
	 [8.7697677]
	 [8.30973883]
	 [8.97379711]
	 [8.56015863]


<class 'multiprocessing.pool.IMapIterator'>
[6.1471196]
<class 'multiprocessing.pool.IMapIterator'>
[5.35938368]
<class 'multiprocessing.pool.IMapIterator'>
[4.48816956]
<class 'multiprocessing.pool.IMapIterator'>
[2.04935347]
<class 'multiprocessing.pool.IMapIterator'>
[1.41218664]
<class 'multiprocessing.pool.IMapIterator'>
[3.8168228]
<class 'multiprocessing.pool.IMapIterator'>
[5.38837387]
<class 'multiprocessing.pool.IMapIterator'>
[2.16817178]
<class 'multiprocessing.pool.IMapIterator'>
[6.81657977]
<class 'multiprocessing.pool.IMapIterator'>
[3.9211103

Process ForkPoolWorker-13:
Process ForkPoolWorker-12:
Traceback (most recent call last):
  File "/home/thiago/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/thiago/anaconda3/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/home/thiago/anaconda3/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/home/thiago/anaconda3/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))


KeyboardInterrupt: 

In [25]:
#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return result

def calculatestar(args):
    return calculate(*args)

def mul(a, b, c):
    time.sleep(0.5 * random.random())
    
    if c == 'sim':
        return a * b 
    
    else:
        return a

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

def test():
    
    for j in range(2):
        PROCESSES = int(j+1) 
        print('Creating pool with %d processes\n' % PROCESSES)

        test = np.random.rand(int(10),1)

        with multiprocessing.Pool(PROCESSES) as pool:

            #
            # Tests
            #

            TASKS = [(mul, (test[i], 7 , 'sim')) for i in range(len(test))]

            map_it = np.array(pool.map(calculatestar, TASKS))
            
            print(map_it[1][0])
            print(type(map_it[1][0]))

if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Creating pool with 1 processes

6.438889608501328
<class 'numpy.float64'>
Creating pool with 2 processes

3.890960870407004
<class 'numpy.float64'>


In [2]:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
25
36
49
81
64
16
400
6748
[6747, 6749, 6747, 6748]
We lacked patience and got a multiprocessing.TimeoutError
For the moment, the pool remains available for more work
Now the pool is closed and no longer available


In [11]:
def f(a, b):
    print("{} {} {}".format(a, b, c))

In [12]:
iterable = [1, 2, 3, 4, 5]
pool = multiprocessing.Pool()
a = "hi"
b = "there"
func = functools.partial(f, a)
pool.map(func,b)
pool.close()
pool.join()

NameError: name 'c' is not defined