# Multiprocessing in Python Part 1

In [6]:

import multiprocessing as mp
print(mp.cpu_count())


4


## class Pool

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]) <br><br>
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

processes is the number of worker processes to use. If processes is None then the number returned by os.cpu_count() is used.

If initializer is not None then each worker process will call initializer(*initargs) when it starts.

maxtasksperchild is the number of tasks a worker process can complete before it will exit and be replaced with a fresh worker process, to enable unused resources to be freed. The default maxtasksperchild is None, which means worker processes will live as long as the pool.

context can be used to specify the context used for starting the worker processes. Usually a pool is created using the function multiprocessing.Pool() or the Pool() method of a context object. In both cases context is set appropriately.

Note that the methods of the pool object should only be called by the process which created the pool.

apply(func[, args[, kwds]]) <br><br>
Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

apply_async(func[, args[, kwds[, callback[, error_callback]]]]) <br><br>
A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.

map(func, iterable[, chunksize]) <br><br>
A parallel equivalent of the map() built-in function (it supports only one iterable argument though, for multiple iterables see starmap()). It blocks until the result is ready.

This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. The (approximate) size of these chunks can be specified by setting chunksize to a positive integer.

Note that it may cause high memory usage for very long iterables. Consider using imap() or imap_unordered() with explicit chunksize option for better efficiency.

map_async(func, iterable[, chunksize[, callback[, error_callback]]]) <br><br>
A variant of the map() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.

imap(func, iterable[, chunksize]) <br><br>
A lazier version of map().

The chunksize argument is the same as the one used by the map() method. For very long iterables using a large value for chunksize can make the job complete much faster than using the default value of 1.

Also if chunksize is 1 then the next() method of the iterator returned by the imap() method has an optional timeout parameter: next(timeout) will raise multiprocessing.TimeoutError if the result cannot be returned within timeout seconds.

imap_unordered(func, iterable[, chunksize]) <br><br>
The same as imap() except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be “correct”.)

starmap(func, iterable[, chunksize]) <br><br>
Like map() except that the elements of the iterable are expected to be iterables that are unpacked as arguments.

Hence an iterable of [(1,2), (3, 4)] results in [func(1,2), func(3,4)].

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]) <br><br>
A combination of starmap() and map_async() that iterates over iterable of iterables and calls func with the iterables unpacked. Returns a result object.

close() <br><br>
Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

terminate() <br><br>
Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.

join() <br><br>
Wait for the worker processes to exit. One must call close() or terminate() before using join().

In [None]:

import multiprocessing as mp
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(5) as pool:
        print(pool.map(square, [1, 2, 3]))
        pool.close()
    

## class AsyncResult

class multiprocessing.pool.AsyncResult <br><br>
The class of the result returned by Pool.apply_async() and Pool.map_async().

get([timeout]) <br><br>
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().

wait([timeout]) <br><br>
Wait until the result is available or until timeout seconds pass.

ready() <br><br>
Return whether the call has completed.

successful() <br><br>
Return whether the call completed without raising an exception. Will raise ValueError if the result is not ready.

In [None]:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))

        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError
        

## class Process

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None) <br><br>
Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.

The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name (see name for more details). args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. If provided, the keyword-only daemon argument sets the process daemon flag to True or False. If None (the default), this flag will be inherited from the creating process.

By default, no arguments are passed to target.

If a subclass overrides the constructor, it must make sure it invokes the base class constructor (Process.__init__()) before doing anything else to the process.

In [3]:

import multiprocessing as mp
from multiprocessing import Process
from multiprocessing import Queue
import os

def info(title):
    print(title)
    print(f'module name: {__name__}')
    print(f'parent process: {os.getppid()}')
    print(f'process id: {os.getpid()}')

def f(name):
    info(title='f')
    print(f'Hello, {name}')

if __name__ == '__main__':
    info(title='__main__')
    mp.set_start_method(method='spawn', force=False)
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()
    

__main__
module name: __main__
parent process: 22558
process id: 25559


KeyboardInterrupt: 

## Contexts and Start Methods

spawn <br><br>
The parent process starts a fresh python interpreter process. The child process will only inherit those resources necessary to run the process objects run() method. In particular, unnecessary file descriptors and handles from the parent process will not be inherited. Starting a process using this method is rather slow compared to using fork or forkserver.

Available on Unix and Windows. The default on Windows and macOS.

fork <br><br>
The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.

Available on Unix only. The default on Unix.

forkserver <br><br>
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.

Available on Unix platforms which support passing file descriptors over Unix pipes.

In [None]:

import multiprocessing as mp
from multiprocessing import Process

def print_range(start=0, stop=0, step=1):
    print(range(start, stop, step))

def print_ascii(ascii_list):
    print([chr(asc) for asc in ascii_list])
        
if __name__ == '__main__':
    mp.set_start_method(method='spawn', force=False)
    p1 = Process(target=print_range, args=(0, 10))
    p2 = Process(target=print_ascii, args=range(97, 123))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    

In [None]:

import multiprocessing as mp
from multiprocessing import Process

if __name__ == '__main__':
    mp.set_start_method(method='fork', force=False)
    p1 = Process(target=print_range, args=(0, 10))
    p2 = Process(target=print_ascii, args=range(97, 123))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    

In [None]:

import multiprocessing as mp
from multiprocessing import Process

if __name__ == '__main__':
    mp.set_start_method(method='forkserver', force=False)    
    p1 = Process(target=print_range, args=(0, 10))
    p2 = Process(target=print_ascii, args=(97, 123))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    

## Pipe

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

In [None]:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
    
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv()) # prints "[42, None, 'hello']"
    p.join()
    