<a href="https://colab.research.google.com/github/dancher00/HPPL/blob/main/(Eng_)HPPL2025_Multiprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Multiprocessing
=======

https://www.digitalocean.com/community/tutorials/python-multiprocessing-example

https://docs.python.org/3/library/multiprocessing.html

One of the most comprehensive tutorials:
https://superfastpython.com/multiprocessing-in-python/


Processes and threads in programming:
========

* A process is an independent instance of a running program, with its own memory space and its own resources.
* A thread is the smallest unit of execution within a process. Threads within a single process share memory and resources with each other.

```
Processes:
+-------------------+    +-------------------+
| Process 1         |    | Process 2         |
| +---------------+ |    | +---------------+ |
| | Memory        | |    | | Memory        | |
| +---------------+ |    | +---------------+ |
+-------------------+    +-------------------+

Threads:
+--------------------------------------------+
| Process                                    |
| +------------+  +------------+  +---------+|
| | Thread 1   |  | Thread 2   |  | Thread 3||
| +------------+  +------------+  +---------+|
| *Shared memory for all threads*            |
+--------------------------------------------+

```
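The difference in memory sharing can be observed directly. The following is a minimal sketch, not part of the original notebook: the `counter` dictionary and the helper names `increment`, `run_in_thread`, and `run_in_process` are made up for illustration.

```python
import threading
from multiprocessing import Process

counter = {"value": 0}  # module-level state, used only for this illustration


def increment():
    """Add 1 to the counter of whichever process runs this function."""
    counter["value"] += 1


def run_in_thread():
    """Run increment() in a thread; return the caller's counter afterwards."""
    worker = threading.Thread(target=increment)
    worker.start()
    worker.join()
    return counter["value"]


def run_in_process():
    """Run increment() in a child process; return the caller's counter afterwards."""
    worker = Process(target=increment)
    worker.start()
    worker.join()
    return counter["value"]


if __name__ == "__main__":
    print("after thread: ", run_in_thread())   # the thread changed our counter
    print("after process:", run_in_process())  # the child changed only its own copy
```

Both lines should show the same value: the thread modifies the parent's `counter`, while the child process works on a separate copy, so the parent sees no further change.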

"Fork-join" concept

1. Start with one (master) process.
2. Create (fork) new processes when needed.
3. Wait for them to finish and merge (join) them back.
4. Only the master process remains.
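The four steps can be sketched with `Process.start()` and `Process.join()`. This is an illustrative sketch only: `work` and `fork_join` are hypothetical helper names, and the sleep stands in for real work.

```python
from multiprocessing import Process
from time import sleep


def work(seconds):
    """Stand-in for real work: just sleep."""
    sleep(seconds)


def fork_join(n_workers=3, seconds=0.2):
    """Fork n_workers children, join them all, and return their exit codes."""
    # Steps 1-2: the master creates and starts the children.
    children = [Process(target=work, args=(seconds,)) for _ in range(n_workers)]
    for child in children:
        child.start()
    # Step 3: join() blocks until the corresponding child has finished.
    for child in children:
        child.join()
    # Step 4: only the master is still running; exit code 0 means a clean exit.
    return [child.exitcode for child in children]


if __name__ == "__main__":
    print(fork_join())
```

After `join()` returns, `Process.exitcode` is available, which gives the master a simple way to check that every child ended normally.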

In [None]:
import multiprocessing

In [None]:
multiprocessing.cpu_count()

2
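`multiprocessing.cpu_count()` reports the CPUs in the machine, which is not necessarily the number the current process may use (for example inside a container or under CPU affinity limits). A small sketch, assuming a helper named `usable_cpu_count` (hypothetical) and falling back where `os.sched_getaffinity` is unavailable:

```python
import multiprocessing
import os


def usable_cpu_count():
    """Return the number of CPUs this process is allowed to run on."""
    if hasattr(os, "sched_getaffinity"):      # present on Linux, absent on macOS/Windows
        return len(os.sched_getaffinity(0))   # 0 means "the current process"
    return os.cpu_count() or 1                # os.cpu_count() may return None


if __name__ == "__main__":
    print("CPUs in the machine:        ", multiprocessing.cpu_count())
    print("CPUs usable by this process:", usable_cpu_count())
```

The usable count is usually the more sensible default for the size of a process pool.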

In [None]:
%%writefile process.py

from multiprocessing import Process
from time import sleep

def print_func(continent='Asia'):
    print('The name of continent is : ', continent)
    sleep(2)
    print('Done')
    return 5  # note: Process discards the target's return value

if __name__ == "__main__":  # run only when executed as a script, not when imported
    names = ['America', 'Europe', 'Africa']
    procs = []
    proc = Process(target=print_func)  # instantiating without any argument
    procs.append(proc)
    proc.start()

    # instantiating process with arguments
    for name in names:
        # print(name)
        proc = Process(target=print_func, args=(name,))
        procs.append(proc)
        proc.start()

    # complete the processes
    for proc in procs:
        proc.join()




Writing process.py


In [None]:
!time python process.py

The name of continent is :  Asia
The name of continent is :  America
The name of continent is :  Africa
The name of continent is :  Europe
Done
Done
Done
Done

real	0m2.123s
user	0m0.090s
sys	0m0.033s
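Note that the value returned by `print_func` never reaches the parent: `Process` ignores the return value of its target. One possible way to get results back is to pass a `multiprocessing.Queue` to each child. The sketch below is an assumption about how this could be done, with hypothetical names `continent_length` and `collect`.

```python
from multiprocessing import Process, Queue


def continent_length(continent, results):
    """Put (continent, length) on the queue; a plain `return` would be lost."""
    results.put((continent, len(continent)))


def collect(names):
    """Start one process per name and gather their results into a dict."""
    results = Queue()
    procs = [Process(target=continent_length, args=(name, results)) for name in names]
    for proc in procs:
        proc.start()
    # Read one result per process before joining, so the queue is drained.
    gathered = dict(results.get() for _ in procs)
    for proc in procs:
        proc.join()
    return gathered


if __name__ == "__main__":
    print(collect(["America", "Europe", "Africa"]))
```

The results arrive in whatever order the children finish, which is why each item carries the name it belongs to.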


https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool

In [None]:
%%writefile pool.py

from multiprocessing import Pool
import numpy as np

def f(x):
    return x*x

if __name__ == '__main__':
    a = np.linspace(0, 1, 2**20)
    with Pool(2) as p:
        # the third argument is chunksize: items are sent to the workers 2 at a time
        squares = p.map(f, a, 2)


Writing pool.py


In [None]:
!python pool.py
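The third argument of `Pool.map` is `chunksize`, the number of items handed to a worker in one task. A very small chunk size means many tiny tasks and a lot of inter-process traffic. The sketch below is one way to compare two chunk sizes; `square` and `timed_map` are hypothetical names, the timing includes pool start-up, and the actual numbers depend on the machine.

```python
from multiprocessing import Pool
from time import perf_counter


def square(x):
    return x * x


def timed_map(data, chunksize, processes=2):
    """Return (result, seconds) for Pool.map with the given chunksize."""
    start = perf_counter()
    with Pool(processes) as pool:
        result = pool.map(square, data, chunksize)
    return result, perf_counter() - start


if __name__ == "__main__":
    data = range(2**16)
    for chunksize in (2, 2**12):
        result, seconds = timed_map(data, chunksize)
        assert result == [x * x for x in data]  # same answer either way
        print(f"chunksize={chunksize:>5}: {seconds:.3f} s")
```

For a function as cheap as `x*x`, the communication overhead tends to dominate, so larger chunks are usually faster.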

In [None]:
%%writefile pool2.py

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Writing pool2.py


In [None]:
!python pool2.py

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
9
16
25
36
49
64
81
400
1240
[1242, 1242, 1242, 1240]
We lacked patience and got a multiprocessing.TimeoutError
For the moment, the pool remains available for more work
Now the pool is closed and no longer available


Pool object methods
=====

https://docs-python.ru/standart-library/paket-multiprocessing-python/klass-pool-modulja-multiprocessing/

    Pool.apply(): calls a function with arguments and blocks until the result is ready,
    Pool.apply_async(): asynchronous version of Pool.apply(),
    Pool.map(): parallel equivalent of the built-in map(),
    Pool.map_async(): asynchronous version of Pool.map(),
    Pool.imap(): a lazier version of Pool.map(),
    Pool.imap_unordered(): the same as Pool.imap(), but results are yielded as soon as they are ready, in arbitrary order,
    Pool.starmap(): like Pool.map(), but each element of the iterable is unpacked into arguments,
    Pool.starmap_async(): combination of Pool.starmap() and Pool.map_async(),
    Pool.close(): prevents any more tasks from being submitted to the pool,
    Pool.terminate(): stops the worker processes immediately,
    Pool.join(): waits for the worker processes to exit,
    AsyncResult: the object returned by Pool.apply_async() and Pool.map_async()
        AsyncResult.get(): returns the result when it arrives,
        AsyncResult.wait(): waits until the result is available,
        AsyncResult.ready(): checks whether the call has completed,
        AsyncResult.successful(): checks whether the call completed without raising an exception.
    The linked page also includes an example of creating, launching, and using a worker process pool,
    and a test of the basic functionality of the Pool object.
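A few of the methods listed above that the earlier scripts do not exercise, in a short sketch. The functions `power` and `square` are made up for illustration.

```python
from multiprocessing import Pool


def power(base, exponent):
    return base ** exponent


def square(x):
    return x * x


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        # starmap unpacks each tuple into positional arguments: power(2, 3), ...
        print(pool.starmap(power, [(2, 3), (3, 2), (10, 0)]))  # [8, 9, 1]

        # imap returns an iterator that yields results in input order.
        for value in pool.imap(square, range(5)):
            print(value)

        # map_async returns an AsyncResult immediately instead of blocking.
        async_result = pool.map_async(square, range(5))
        async_result.wait()               # block until all results are in
        print(async_result.ready())       # True once the call has completed
        print(async_result.successful())  # True if no exception was raised
        print(async_result.get())         # [0, 1, 4, 9, 16]
```

`successful()` should only be called once the result is ready, which is why the sketch calls `wait()` first.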

Continuation
=====

In [None]:
from multiprocessing import Process
from time import sleep

def func():
    print("Hello")
    sleep(5)
    print("Done sleeping")


In [None]:
%%timeit -n 1 -r 1
p1 = Process(target=func)
p2 = Process(target=func)

p1.start()
p2.start()

# try removing .join() and check the execution time
p1.join()
p2.join()
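The effect of removing `.join()` can also be measured outside of `%%timeit`. The following is a sketch with hypothetical helper names (`nap`, `time_start_only`, `time_start_and_join`): `start()` returns almost immediately, while `start()` followed by `join()` takes about as long as the child runs.

```python
from multiprocessing import Process
from time import perf_counter, sleep


def nap(seconds):
    sleep(seconds)


def time_start_only(seconds=1.0):
    """Time start() alone; also return the process so the caller can join it later."""
    proc = Process(target=nap, args=(seconds,))
    begin = perf_counter()
    proc.start()
    return perf_counter() - begin, proc


def time_start_and_join(seconds=1.0):
    """Time start() followed by join()."""
    proc = Process(target=nap, args=(seconds,))
    begin = perf_counter()
    proc.start()
    proc.join()
    return perf_counter() - begin


if __name__ == "__main__":
    elapsed, proc = time_start_only()
    print(f"start only:     {elapsed:.3f} s")
    proc.join()  # still wait for the child before moving on
    print(f"start and join: {time_start_and_join():.3f} s")
```

Without `join()` the measured time says nothing about how long the work took; the children simply keep running in the background.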

Interprocess data exchange
=====

Queue
-----

In [None]:
%%writefile consumer_reader.py
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Queue

# generate work
def producer(queue):
    print('Producer: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()  # can be image from disk, data from web
        # block for a random fraction of a second to simulate work
        sleep(value)
        # add to the queue
        queue.put(value)
    # all done
    queue.put(None)
    print('Producer: Done')

# consume work
def consumer(queue):
    print('Consumer: Running')
    # consume work
    while True:
        # get a unit of work
        item = queue.get()
        # check for stop
        if item is None:
            break
        # report
        print(f'>got {item}')
    # all done
    print('Consumer: Done')

# entry point
if __name__ == '__main__':
    # create the shared queue
    queue = Queue()
    # start the consumer
    consumer_process = Process(target=consumer, args=(queue,))
    consumer_process.start()
    # start the producer
    producer_process = Process(target=producer, args=(queue,))
    producer_process.start()
    # wait for all processes to finish
    producer_process.join()
    consumer_process.join()
    # single process again


Writing consumer_reader.py


In [None]:
!python consumer_reader.py

Consumer: Running
Producer: Running
>got 0.971842640546128
>got 0.32066725832916576
>got 0.17697458286042433
>got 0.9663653495704442
>got 0.560177324280211
>got 0.8238391898477898
>got 0.45191779945688715
>got 0.1436803701553504
>got 0.035843953267184925
Producer: Done
>got 0.1945241820513286
Consumer: Done
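With more than one consumer, a single `None` stops only the consumer that happens to receive it. A common approach, sketched here under the assumption of two queues and hypothetical names `consumer` and `run`, is to send one sentinel per consumer.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # same stop marker as in the example above


def consumer(name, tasks, results):
    """Square numbers taken from `tasks` until the sentinel arrives."""
    while True:
        item = tasks.get()
        if item is SENTINEL:
            break
        results.put((name, item * item))


def run(n_items=6, n_consumers=2):
    tasks, results = Queue(), Queue()
    workers = [Process(target=consumer, args=(i, tasks, results))
               for i in range(n_consumers)]
    for worker in workers:
        worker.start()
    for item in range(n_items):
        tasks.put(item)
    for _ in workers:  # one sentinel per consumer, otherwise some never stop
        tasks.put(SENTINEL)
    # Drain the results before joining; sort because arrival order is arbitrary.
    squares = sorted(results.get()[1] for _ in range(n_items))
    for worker in workers:
        worker.join()
    return squares


if __name__ == "__main__":
    print(run())  # [0, 1, 4, 9, 16, 25]
```

Which consumer handles which item is not deterministic, so the sketch sorts the results before printing them.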


Pipe
---

In [None]:
%%writefile pipe.py
# example of using a pipe between processes
from time import sleep
from random import random
from multiprocessing import Process
from multiprocessing import Pipe
import numpy as np

# generate work
def sender(connection):
    print('Sender: Running')
    # generate work
    for i in range(10):
        # generate a value
        value = random()
        # block
        sleep(value)
        # send data
        connection.send(value)
    # send a NumPy array too: any picklable object can go through the pipe
    a = np.linspace(0, 1, 2**8)
    connection.send(a)
    # all done: send the stop signal
    connection.send(None)
    print('Sender: Done')

# consume work
def receiver(connection):
    print('Receiver: Running')
    # consume work
    while True:
        # get a unit of work
        item = connection.recv()
        # report
        print(f'>receiver got {item}')
        # check for stop
        if item is None:
            break
    # all done
    print('Receiver: Done')

# entry point
if __name__ == '__main__':
    # create the pipe
    conn1, conn2 = Pipe()
    # start the sender
    sender_process = Process(target=sender, args=(conn1,))
    sender_process.start()
    # start the receiver
    receiver_process = Process(target=receiver, args=(conn2,))
    receiver_process.start()
    # wait for all processes to finish
    sender_process.join()
    receiver_process.join()

Writing pipe.py
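`Pipe()` is duplex by default, so each end can both send and receive. The script above uses it in one direction only; a request-reply exchange over the same pipe might look like the sketch below, where `square_server` and `ask` are hypothetical names.

```python
from multiprocessing import Pipe, Process


def square_server(connection):
    """Reply to each number with its square; stop when None arrives."""
    while True:
        item = connection.recv()
        if item is None:
            break
        connection.send(item * item)
    connection.close()


def ask(values):
    """Send each value to a child process and collect the replies."""
    parent_end, child_end = Pipe()         # duplex: both ends can send and receive
    server = Process(target=square_server, args=(child_end,))
    server.start()
    answers = []
    for value in values:
        parent_end.send(value)             # request
        answers.append(parent_end.recv())  # reply
    parent_end.send(None)                  # tell the server to stop
    server.join()
    return answers


if __name__ == "__main__":
    print(ask([1, 2, 3]))  # [1, 4, 9]
```

If only one direction is needed, `Pipe(duplex=False)` returns a receive-only end and a send-only end, in that order.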


In [None]:
!python pipe.py

Sender: Running
Receiver: Running
>receiver got 0.15486487517410186
>receiver got 0.7688022150906013
>receiver got 0.9390614677667868
>receiver got 0.5042946374758969
>receiver got 0.8534686761018544
>receiver got 0.5787084730825145
>receiver got 0.7041913442981438
>receiver got 0.17120911952560558
>receiver got 0.6094496809775317
>receiver got 0.13038531919792484
Sender: Done
>receiver got [0.         0.00392157 0.00784314 0.01176471 0.01568627 0.01960784
 0.02352941 0.02745098 0.03137255 0.03529412 0.03921569 0.04313725
 0.04705882 0.05098039 0.05490196 0.05882353 0.0627451  0.06666667
 0.07058824 0.0745098  0.07843137 0.08235294 0.08627451 0.09019608
 0.09411765 0.09803922 0.10196078 0.10588235 0.10980392 0.11372549
 0.11764706 0.12156863 0.1254902  0.12941176 0.13333333 0.1372549
 0.14117647 0.14509804 0.14901961 0.15294118 0.15686275 0.16078431
 0.16470588 0.16862745 0.17254902 0.17647059 0.18039216 0.18431373
 0.18823529 0.19215686 0.19607843 0.2        0.20392157 0.20784314
 0.2