In [4]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

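Because of the GIL (Global Interpreter Lock), only one thread at a time can execute Python bytecode inside a single (standard CPython) interpreter process, so pure-Python code running in threads effectively uses one processor. For parallel computations we therefore need to launch multiple interpreter processes under the hood.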

Concurrency vs Parallelism:
* **Concurrency**: On a uniprocessor machine, multiple processes created with the multiprocessing package are context-switched by the OS, which results in concurrency.
* **Concurrency and Parallelism**: On a multiprocessor machine, multiple processes created with this package run on several processors at once, in addition to being context-switched, providing both concurrency and parallelism.
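A rough way to see the effect of the GIL is to run the same CPU-bound function in four threads and in four processes. This is a sketch that assumes a standard (GIL-enabled) CPython build and a machine with several cores; busy() and timed() are illustrative helpers, not library functions. The timings depend on the machine, but the process version is expected to finish noticeably faster on a multi-core CPU. Like the other examples with an if __name__ == '__main__' guard, it is meant to be run as a script rather than in a notebook cell.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool

def busy(n):
    # Pure-Python, CPU-bound loop: the thread running it holds the GIL almost all the time
    total = 0
    for i in range(n):
        total += i * i
    return total

def timed(label, mapper, jobs):
    # Apply busy() to every job with the given map-like function and report wall-clock time
    start = time.perf_counter()
    results = list(mapper(busy, jobs))
    print(f'{label}: {time.perf_counter() - start:.2f} s')
    return results

if __name__ == '__main__':
    jobs = [2_000_000] * 4
    with ThreadPoolExecutor(max_workers=4) as ex:
        by_threads = timed('4 threads  ', ex.map, jobs)
    with Pool(processes=4) as pool:
        by_processes = timed('4 processes', pool.map, jobs)
    assert by_threads == by_processes  # same answers, different wall-clock times
```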


### Ways to start a process

* **spawn** : 
The parent process starts a fresh Python interpreter process. The child process only inherits the resources necessary to run the process object's run() method. In particular, unnecessary file descriptors and handles from the parent process are not inherited. Starting a process with this method is rather slow compared to fork or forkserver.
Available on Unix and Windows; the default on Windows and macOS. It does not use Unix fork. In short, spawn creates a new process with no access to the parent's resources, so whatever the child needs has to be passed to it explicitly (it is pickled and sent to the child).

* **fork** : 
The parent process uses os.fork() to fork the Python interpreter. The child process, when it begins, is effectively identical to the parent process. All resources of the parent are inherited by the child process. Note that safely forking a multithreaded process is problematic.
Available on Unix only. It was the default on Unix (other than macOS) up to Python 3.13; Python 3.14 switched that default to forkserver.

* **forkserver** : 
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.
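The start methods can be inspected and selected from code. This sketch lists what the platform supports, prints the current default, and uses a context object to request spawn for one Pool without changing the global default (which mp.set_start_method would do). The exact list and its order depend on the platform and Python version; square() is just an illustrative worker.

```python
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    # Start methods supported here, e.g. ['fork', 'spawn', 'forkserver'] on Linux
    print(mp.get_all_start_methods())
    # The start method used when none is requested explicitly
    print(mp.get_start_method())

    # A context selects the start method only for the objects created from it
    ctx = mp.get_context('spawn')
    with ctx.Pool(2) as pool:
        print(pool.map(square, range(5)))  # [0, 1, 4, 9, 16]
```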

**JVM does not support fork** : 

* In C and many other programming languages the straightforward way to achieve multiprocessing is to fork separate processes (an explanation of the POSIX ```fork()``` call in C: https://www.csl.mtu.edu/cs4411.ck/www/NOTES/process/fork/create.html).  

* Java doesn’t support the concept of a fork (i.e. creating a copy of a running process). 

* Instead, all you can do is to start up a completely new process. 

* To create a mirror copy of your current process you’d need to start a new JVM instance with a recreated classpath and make sure that the new process reaches a state where you can get useful results from it. 

Using Java from Python via JPype:

* fork creates a copy of the original process, including the already running JVM. In this case the forked JVM does not work as expected. Older versions of JPype (0.6.x) allowed the forked copy to call startJVM, which created a big memory leak. Version 0.7.1 instead raises an exception saying that the JVM cannot be restarted.

* When using JPype with multiprocessing it is necessary to spawn rather than fork. Forked copies inherit a non-functional JVM, which leads to random issues.

* The problem lies in the nature of multiprocessing: Python can either fork or spawn a new process, and the fork option has significant problems with the JVM. The default on Linux is fork (up to Python 3.13).

* Using the spawn context (multiprocessing.get_context("spawn")) to create a spawned copy of Python allows a fresh JVM to be created. Each spawned copy is completely independent. There are examples in subrun.py in the test directory of the JPype repository on GitHub, which is what JPype uses to test different JVM options.

* If you are using **threads** (rather than processes), all threads share the same JVM and do not need to start the JVM independently.
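When every worker needs its own heavyweight runtime such as a JVM, one common pattern is a spawn-context Pool with an initializer that runs once in each fresh worker. In this sketch init_worker() is a hypothetical stand-in: with JPype it is where you would start the worker's own JVM (e.g. via jpype.startJVM()), but here it only records the worker's PID so the example runs without Java installed.

```python
import multiprocessing as mp
import os

_state = {}  # per-process module state; every spawned worker has its own copy

def init_worker():
    # Hypothetical stand-in: with JPype, a fresh JVM would be started here.
    # This sketch only records which process ran the initializer.
    _state['pid'] = os.getpid()

def task(x):
    # Runs inside a worker that has already been initialized
    return _state['pid'], x * 2

if __name__ == '__main__':
    ctx = mp.get_context('spawn')  # fresh interpreters, nothing inherited from the parent
    with ctx.Pool(2, initializer=init_worker) as pool:
        for pid, value in pool.map(task, range(4)):
            print(f'worker {pid} -> {value}')
```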



### Threads and processes:

* Both processes and threads are independent sequences of execution. 
* The typical difference is that threads (of the same process) run in a shared memory space, while processes run in separate memory spaces. 
* Threads are an operating environment feature, rather than a CPU feature (though the CPU typically has operations that make threads efficient).
* More on the differences between processes and threads: https://www.tutorialspoint.com/difference-between-process-and-thread
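The shared-versus-separate memory difference is easy to observe: a thread that modifies a module-level list changes the parent's list, while a child process only changes its own copy. A minimal sketch; counter and bump() are illustrative names.

```python
import multiprocessing as mp
import threading

counter = [0]  # module-level state

def bump():
    counter[0] += 1

if __name__ == '__main__':
    t = threading.Thread(target=bump)
    t.start()
    t.join()
    print('after thread: ', counter[0])  # 1: the thread changed the parent's list

    p = mp.Process(target=bump)
    p.start()
    p.join()
    print('after process:', counter[0])  # still 1: the child only changed its own copy
```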

Multiprocessing functionality within this package requires that the ```__main__``` module be importable by the children. Some examples, such as the multiprocessing.pool.Pool examples, will not work in the interactive interpreter. 

Forked processes work in Jupyter cells, but spawned ones do not: a spawned child cannot import functions that were defined in a notebook cell.

In [46]:
from multiprocessing import Pool, Process, Queue, Pipe, Value, Array
import multiprocessing as mp
import numpy as np
import os
import time

def f(x):
    print(f'x={x}')
    return x*x

with Pool(2) as pool:  # create a pool of 2 worker processes
    pool.map(f, np.arange(10))

# the printed lines are jumbled because both workers write to the same stdout at the same time

x=2
x=3x=0
x=4

x=5
x=1x=6
x=7

x=8
x=9


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
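One way to get readable output is to let the workers return their messages and print only in the parent. This sketch replaces the print inside f with a returned string and uses pool.imap, which yields results one by one in input order; it is meant to be run as a script.

```python
from multiprocessing import Pool
import os

def f(x):
    # Return the message instead of printing it inside the worker
    return f'x={x} computed in process {os.getpid()}', x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        # imap yields results in input order, as soon as each one is ready
        for msg, square in pool.imap(f, range(10)):
            print(msg, '->', square)  # only the parent prints, so lines do not interleave
```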

In [27]:
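p = Process(target=f, args=(np.arange(10),))  # the Process class creates one child process running f
p.start()  # start the child
p.join()  # wait for the child to finish

# why such output? args holds a single element (the whole array), so f runs once with x = the array
# and prints it; the return value x*x is discarded because Process does not return results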

x=[0 1 2 3 4 5 6 7 8 9]
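To call f once per number with plain Process objects, each process needs its own single-element args tuple. This sketch starts one process per value; collecting the squares would require one of the communication channels covered in the next cells (Queue, Pipe or shared memory).

```python
from multiprocessing import Process

def f(x):
    print(f'x={x}')
    return x * x  # discarded: Process does not send return values back to the parent

if __name__ == '__main__':
    # one child process per element, each receiving a single number
    procs = [Process(target=f, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print([p.exitcode for p in procs])  # [0, 0, 0] when every child finished normally
```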


In [31]:
# checking process ids in a multiprocessing context (two different processes can be observed)
def info(msg):
    print(msg)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

info('Hello')
print('\n')
p = Process(target=info, args=('World',))
p.start()
p.join()

Hello
parent process: 9283
process id: 9285


World
parent process: 9285
process id: 11652


### Exchanging objects between processes (Queue)

In [36]:
# using Queue to put and get variables

def f(q):
    q.put([42, None, 'hello'])

q = Queue()
p = Process(target=f, args=(q,))
p.start()
print(q.get())  # read the item before join() so the child can flush the queue and exit
p.join()

[42, None, 'hello']
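With several worker processes, a common pattern is a task queue plus a result queue, with one sentinel value per worker telling it to stop. This sketch assumes None never occurs as a real task; worker() is an illustrative name. Results are drained before join(), because a process that has put items on a queue may not terminate until they are consumed.

```python
from multiprocessing import Process, Queue

def worker(tasks, results):
    # Consume tasks until the None sentinel arrives
    for x in iter(tasks.get, None):
        results.put((x, x * x))

if __name__ == '__main__':
    tasks, results = Queue(), Queue()
    workers = [Process(target=worker, args=(tasks, results)) for _ in range(2)]
    for w in workers:
        w.start()
    for x in range(6):
        tasks.put(x)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    out = dict(results.get() for _ in range(6))  # drain the results before join()
    for w in workers:
        w.join()
    print(sorted(out.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16), (5, 25)]
```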


In [38]:
# The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). 
def f(conn):
    conn.send([42, None, 'hello'])  # each connection object has send() and recv() methods
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   
    p.join()

[42, None, 'hello']


Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
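Because the default pipe is duplex, the same pair of connections can carry a request and a reply. In this sketch the parent only uses parent_conn and the child only uses child_conn, so no end is shared; echo_upper() is an illustrative name.

```python
from multiprocessing import Process, Pipe

def echo_upper(conn):
    # The child reads from its end of the pipe and replies on the same (duplex) connection
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()  # duplex=True by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    parent_conn.send('hello')
    print(parent_conn.recv())  # HELLO
    p.join()
```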

In [40]:
# Data can be stored in a shared memory map using Value or Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)  # double
    arr = Array('i', range(10))  # int array

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
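Shared values are not automatically safe under concurrent updates: counter.value += 1 is a read-modify-write, so increments from several processes can be lost. Value objects created with the default lock=True expose get_lock(), and holding it around the update keeps the count exact. A minimal sketch; add_many() is an illustrative name.

```python
from multiprocessing import Process, Value

def add_many(counter, n):
    for _ in range(n):
        # Hold the Value's own lock so the read-modify-write is not interleaved
        with counter.get_lock():
            counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)
    procs = [Process(target=add_many, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4000
```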


Comment on Pool methods ```map, apply``` and ```apply_async```:

* in Python 2 the built-in ```apply(f, args, kwargs)``` was used; Python 3 removed it in favour of ```f(*args, **kwargs)```, so apply() is rarely used
* Pool mimics the same behaviour  
* pool.apply runs a function in one worker process, waits until it has finished and returns the result
* pool.apply_async submits the function but immediately returns an AsyncResult, so the current process can continue; the output can be retrieved later with .get(), which waits for the function to finish
* so pool.apply(..) is equivalent to pool.apply_async(..).get()
* apply_async has a callback argument which is called when the function completes (see the example below)
* pool.map :
    - equivalent to the built-in map but accepts only one iterable (!) 
    - blocks until all results are ready 
    - returns the results in the same order as the input arguments
    - splits the iterable into chunks that the workers process in parallel

* pool.map_async - similar to pool.map but does not block the parent process; it returns an AsyncResult
* pool.starmap unpacks each element with *, so multiple arguments can be passed to the function from an iterable of argument tuples
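The apply equivalences above can be checked directly: pool.apply, pool.apply_async(...).get() and a plain call with the same arguments all give the same value. Pool.apply takes the positional arguments as a tuple and keyword arguments as a dict; power() is an illustrative function.

```python
from multiprocessing import Pool

def power(base, exp=2):
    return base ** exp

if __name__ == '__main__':
    with Pool(2) as pool:
        a = pool.apply(power, (3,), {'exp': 3})              # blocks; runs in one worker
        b = pool.apply_async(power, (3,), {'exp': 3}).get()  # submit now, wait in get()
        c = power(3, exp=3)                                  # plain call with the same arguments
        print(a, b, c)  # 27 27 27
```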

### Here is a table summary of all possible combinations

| Method             | Multi-args | Concurrent | Blocking | Ordered results |
|--------------------|------------|------------|----------|-----------------|
| Pool.map           | no         | yes        | yes      | yes             |
| Pool.map_async     | no         | yes        | no       | yes             |
| Pool.apply         | yes        | no         | yes      | no              |
| Pool.apply_async   | yes        | yes        | no       | no              |
| Pool.starmap       | yes        | yes        | yes      | yes             |
| Pool.starmap_async | yes        | yes        | no       | yes             |
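The non-blocking variants return immediately with an AsyncResult; calling get() later waits and returns the results in input order. A short sketch; add() is an illustrative function and the built-in abs is used as a second worker function because it can be pickled.

```python
from multiprocessing import Pool

def add(a, b):
    return a + b

if __name__ == '__main__':
    with Pool(2) as pool:
        r1 = pool.starmap_async(add, [(1, 2), (3, 4), (5, 6)])  # returns immediately
        r2 = pool.map_async(abs, [-1, -2, -3])                  # also non-blocking
        # ... the parent could do other work here ...
        print(r1.get(timeout=10))  # [3, 7, 11], in input order
        print(r2.get(timeout=10))  # [1, 2, 3]
```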


In [47]:
# callback example for apply_async

def foo_pool(x):
    time.sleep(1)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(4):
        pool.apply_async(foo_pool, args=(i,), callback=log_result)
    pool.close()
    pool.join()
    print(result_list)  # filled in completion order, which is not guaranteed to follow i

if __name__ == '__main__':
    apply_async_with_callback()

[0, 1, 4, 9]
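apply_async also accepts an error_callback, which is called in the parent with the exception raised in the worker, so failures are not silently dropped. A sketch with an illustrative divide() function; as in the cell above, close() and join() make sure all callbacks have run before the lists are read.

```python
from multiprocessing import Pool

def divide(a, b):
    return a / b

results, errors = [], []

if __name__ == '__main__':
    with Pool(2) as pool:
        for a, b in [(1, 2), (1, 0), (3, 4)]:
            pool.apply_async(divide, (a, b),
                             callback=results.append,       # called in the parent on success
                             error_callback=errors.append)  # called in the parent with the exception
        pool.close()
        pool.join()
    print(sorted(results))                     # [0.5, 0.75]
    print([type(e).__name__ for e in errors])  # ['ZeroDivisionError']
```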


In [49]:
# some examples

def f(x):
    return x*x


with Pool(processes=2) as pool:  # start 2 worker processes

    print(pool.map(f, range(10)))

    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print(res.get(timeout=1))             # prints "400"

    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print(res.get(timeout=1))             # prints the PID of that process

    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print([res.get(timeout=1) for res in multiple_results])


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
400
12140
[12140, 12140, 12140, 12140]


In [50]:
# multiple ways to iterate over some of the function arguments and fix the others 
# say we have f(a, b), we want to fix b=10 on the fly and apply f(a, b=10) to a list of a-values


from functools import partial
from itertools import repeat
from multiprocessing import Pool

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N
        print(L)  # [2, 3, 4]

if __name__ == '__main__':
    main()

Using functools.partial is very helpful (!!): a partial object created on the fly can be pickled as long as the wrapped function is defined at module level, so it works with both fork and spawn. Local functions (e.g. defined inside another function rather than at module level) and lambdas cannot be pickled, so they cannot be sent to Pool workers.
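Since Pool sends the function to its workers with pickle, a quick way to check what can be handed to a Pool is to try pickling it directly. In this sketch make_adder() is an illustrative helper; the exact exception raised for the local function differs between Python versions, so both likely candidates are caught.

```python
import pickle
from functools import partial

def func(a, b):
    return a + b

def make_adder(b):
    def add(a):  # nested ("local") function
        return a + b
    return add

# A partial of a module-level function pickles fine: pickle stores a reference to func plus b=10
data = pickle.dumps(partial(func, b=10))
print(pickle.loads(data)(1))  # 11

# A local function cannot be pickled, so it cannot be sent to Pool workers
try:
    pickle.dumps(make_adder(10))
except (pickle.PicklingError, AttributeError) as e:
    print('cannot pickle:', type(e).__name__)
```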

# (add multithreading in python)