## **A Short Introduction to Multiprocessing in Python**
by Moritz Haustein 

05.03.2024

### **Some general remarks on multiprocessing in Python**
Generally, multiprocessing allows you to split a computational task into subtasks and process them in parallel on multiple CPU cores, which can considerably shorten the processing time. One way to use multiprocessing in Python is the built-in **multiprocessing** module. <br><br>
However, there are some pitfalls when using **multiprocessing** in Python, e.g.: <br>
1) Debugging can be a challenge, as the output of **print** statements and the error messages of exceptions raised in the worker processes may not be displayed, depending on the code editor used. We will see this later in the examples! <br>
2) In Jupyter notebooks, we have to import the functions we want to run in worker processes from an external Python module (at least with the 'spawn' start method, which is the default on Windows and macOS). For this reason, I first explain each function in a code block, but import it from **codeLib.py** in the actual examples. <br>
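Outside of Jupyter, for example in a plain script, the usual way to deal with this is to define the worker function at module level and to start processes only inside an `if __name__ == "__main__":` block: with the 'spawn' start method, every child process imports the main module again, and the guard keeps the child from starting new processes itself. The following is a minimal sketch; the function names **squareValue** and **runSquares** are illustrative only and not part of **codeLib.py**.

```python
import multiprocessing as mp


def squareValue(value):
    # defined at module level, so a spawned child process can import it by name
    return value * value


def runSquares(values):
    # the pool's worker processes are only created when this function is called
    with mp.Pool(processes=2) as workerPool:
        return workerPool.map(squareValue, values)


if __name__ == "__main__":
    # a spawned child re-imports this module under a different __name__,
    # so it skips this block instead of starting new processes itself
    print(runSquares([1, 2, 3, 4]))  # prints [1, 4, 9, 16]
```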

There are even more problems that you may have to deal with, but usually you can find a solution on the internet. So don't get frustrated right at the start! <br>
Okay, after these first notes, I would like to show you in the following sections some examples of how **multiprocessing** can be used in Python. Furthermore, I have added a list of other multiprocessing tutorials at the end, which might help you to master multiprocessing in your code.


```python
#import modules
import time
import numpy as np
```


### **The example: Counting the frequency of primes in a range of values**
To demonstrate how multiprocessing works in Python, we first need a task that requires a lot of CPU time and should therefore run faster when it is executed on multiple CPU cores at the same time. Here, we will use the following CPU-bound task: counting the primes in a range of values.

We will use two functions for this:
1. **isPrimeNumber**: checks if an input value is a prime and returns True or False

2. **countPrimesInRange**: loops through all values between the start and end and calls **isPrimeNumber** for each value. If the value is a prime number, it increments our counter variable **countOfPrimes** which is eventually returned after all values have been tested.

These functions are both straightforward, but definitely not the most efficient way to accomplish the task. For example, while even values are rejected after the very first check (division by 2), for a large prime number such as 79,999 the whole for-loop in **isPrimeNumber** must be run, and this process cannot check any other value until the loop is finished.
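The following sketch makes this difference visible by counting how many divisions the loop in **isPrimeNumber** performs for a given value. The helper **countDivisionChecks** is hypothetical; it only mirrors that loop and is not used elsewhere in this tutorial.

```python
def countDivisionChecks(value):
    # mirrors the loop in isPrimeNumber, but returns how many divisors were tested
    if value < 2:
        return 0
    checks = 0
    for iCheckIfDivider in range(2, value):
        checks += 1
        if (value % iCheckIfDivider) == 0:
            break  # a divisor was found: isPrimeNumber would return False here
    return checks


print(countDivisionChecks(80000))  # 1: divisible by 2, the loop stops immediately
print(countDivisionChecks(97))     # 95: a prime, every value from 2 to 96 is tested
```

For the prime 79,999 mentioned above, the loop tests all 79,997 candidate divisors from 2 to 79,998. A common improvement, not used in this tutorial, is to test divisors only up to the square root of the value.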

In the first example, we will use only a single process for the task, i.e. this script will run on only one CPU core. This gives us a baseline to see how much multiprocessing can speed up the calculations.

```python
def isPrimeNumber(value):
    if value < 2:
        return False
    for iCheckIfDivider in range(2, value):
        if (value % iCheckIfDivider) == 0:
            return False
    return True

def countPrimesInRange(startValue, endValue):
    countOfPrimes = 0
    for iValue in range(startValue, endValue + 1): #+1 required to test the endValue, too
        if isPrimeNumber(iValue):
            countOfPrimes += 1
    return countOfPrimes

#Main script
#define range
startValue = 1
endValue = 80000

#start processing
startTime = time.perf_counter() 
countOfPrimes = countPrimesInRange(startValue, endValue)
endTime = time.perf_counter()

#print results
print("Count of primes between %i to %i is: %i\n"%(startValue, endValue, countOfPrimes))
print("\nProcessing time: %.3f seconds" %(endTime - startTime))
```

```
Count of primes between 1 to 80000 is: 7837


Processing time: 20.141 seconds
```


### **Using Multiprocessing: How to run a task in independent processes**
Now let's see how fast we are when we divide the range of values into two tasks and have them processed by two independent workers, i.e. two processes that work simultaneously on separate CPU cores.
1. For this, we first split the range of 1 to 80,000 into two tasks, **task_1** and **task_2**.

2. Next, we need to create the two workers with **mp.Process**. For this, we pass two arguments to *mp.Process*:
     - **target**: the function the process should call. Important: use only the name of the function, without '()'; with '()', the function would be called right away in the main script
     - **args**: a tuple with the input arguments for the function we have assigned as **target**

 In principle, we could use **countPrimesInRange** as the target function, but we will add some additional features in the function **countPrimesInRangeMP**:
 After counting all primes in a range, it creates and prints a message and returns **countOfPrimes** together with the **message**.

```python
import os #needed for os.getpid()

def countPrimesInRangeMP(startValue, endValue):
    countOfPrimes = countPrimesInRange(startValue, endValue) #call original counting function
    
    #print status of this worker
    processID = os.getpid() #asks the operating system for the ID of the process this function is running in
    message = "Worker (ID: %s) found %i primes from %i to %i\n"%(processID, countOfPrimes, startValue, endValue)
    print(message)

    return countOfPrimes, message #return both: the count value and the message
```

Each process created with **mp.Process** can be stored in a variable so that we can access it later; in our example, these are **worker1** and **worker2**.

The workers have three important methods we can call:
- **start()**: starts the processing. The worker will call the *countPrimesInRangeMP* function with the start and end values given by **task_1** or **task_2**.
- **join()**: waits until the worker has finished its task, i.e. it blocks our script from proceeding. Without **join()**, the main script would continue right away, e.g. it would measure the processing time before the workers are done, and **close()** would raise an error for a worker that is still running.
- **close()**: releases the resources associated with the worker. This is not mandatory, but we will follow the Scout Rule: Keep your computer clean!
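The following sketch shows these three methods on a short-lived process, using a hypothetical **sleepBriefly** function as a stand-in for a real task. It also checks two details documented for **mp.Process**: **exitcode** is **None** while the process is still running, and **close()** raises a **ValueError** if it is called before the process has finished.

```python
import multiprocessing as mp
import time


def sleepBriefly(seconds):
    # stand-in for a real task; mp.Process discards this return value
    time.sleep(seconds)
    return "done"


def demoLifecycle():
    worker = mp.Process(target=sleepBriefly, args=(0.5,))
    worker.start()
    runningExitCode = worker.exitcode  # None: the process has not finished yet
    try:
        worker.close()  # not allowed while the process is still running
        closedEarly = True
    except ValueError:
        closedEarly = False
    worker.join()  # block until the worker is done
    finishedExitCode = worker.exitcode  # 0 after a normal exit
    worker.close()  # now the resources can be released
    return runningExitCode, closedEarly, finishedExitCode


if __name__ == "__main__":
    print(demoLifecycle())  # expected: (None, False, 0)
```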

```python
import multiprocessing as mp
from codeLib import countPrimesInRangeMP 

#split range between [startValue, endValue] into 2 tasks for 2 workers (processes)
task_1 = [1, 40000]
task_2 = [40001, 80000]

#initiate two processes
startTime = time.perf_counter()
print('initate workers...')
worker1 = mp.Process(target = countPrimesInRangeMP, args = (task_1[0], task_1[1])) #args: 1st is the startValue, 2nd is the endValue
worker2 = mp.Process(target = countPrimesInRangeMP, args = (task_2[0], task_2[1]))

#start processing
print('start processing...')
worker1.start()
worker2.start()

#wait until workers are done! Actually, "join" blocks the script from proceeding.
print('wait until workers are done...')
worker1.join() 
print('worker 1: done!')
worker2.join()
print('worker 2: done!')

#close workers
print('close workers')
worker1.close()
worker2.close()

endTime = time.perf_counter()

#print results
print("Processing time: %.3f seconds" %(endTime - startTime))
```

```
initate workers...
start processing...
wait until workers are done...
worker 1: done!
worker 2: done!
close workers
Processing time: 15.642 seconds
```


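That was already faster than our single-process example, although not twice as fast. A likely reason is that the two tasks are not equally demanding: testing a large prime takes much longer than testing a small one, so the worker with the upper half of the range has more work to do. But why is the message we created in **countPrimesInRangeMP** not printed? And where are our results?
Whether you can see the messages printed by the workers depends on the IDE you are using. Unfortunately, we cannot see them in this Jupyter notebook. Also, **mp.Process** discards the return value of the target function, so there is no direct way to get the results from the workers. What we need is a way to share data with the workers!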

### **Ways of sharing data with processes**

In the next example, we will use two ways of sharing data with the workers:
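1) **mp.Value**: allows both our main script and the worker processes to access a value. We will create a **sharedValue** as an integer starting at 0 by calling **mp.Value('i', 0)**. To access the stored value, we use **sharedValue.value**. Since **+=** on this value is a read followed by a write and therefore not atomic, a worker should hold the value's lock (**sharedValue.get_lock()**) while updating it.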

2) **mp.Queue**: can store a sequence of Python objects. It operates as a first-in-first-out (FIFO) buffer, i.e. the first item stored is also the first item you get back. We will use four methods of **mp.Queue** objects in our script:
    - **put(var)**: inserts an item into the queue. <br>
    - **get()**: removes and returns the first item in the queue; by default, it waits until an item is available <br>
    - **empty()**: asks whether the queue is empty. Returns True (empty) or False (items still stored). Note that the Python documentation describes this result as not reliable while other processes may still be using the queue <br>
    - **close()**: indicates that the current process will put no more data into the queue; we call it at the end of our script, similar to **close()** for **mp.Process** <br>
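Here is a minimal sketch of both objects with a hypothetical worker function **addAndReport**: each worker adds its number to a shared **mp.Value** while holding the value's lock and puts a message into an **mp.Queue**. Because we know how many messages to expect, the main process calls **get()** once per worker instead of relying on **empty()**, and it does so before **join()**, as the Python documentation recommends for processes that put items on a queue.

```python
import multiprocessing as mp


def addAndReport(amount, sharedValue, messageQueue):
    # the lock makes the read-modify-write of "+=" safe across processes
    with sharedValue.get_lock():
        sharedValue.value += amount
    messageQueue.put("added %i" % amount)


def runSharedDemo(amounts):
    sharedValue = mp.Value('i', 0)  # 'i': signed integer, starting at 0
    messageQueue = mp.Queue()
    workers = [mp.Process(target=addAndReport, args=(a, sharedValue, messageQueue))
               for a in amounts]
    for worker in workers:
        worker.start()
    # fetch exactly one message per worker; get() blocks until an item is available
    messages = [messageQueue.get() for _ in workers]
    for worker in workers:
        worker.join()
        worker.close()
    messageQueue.close()
    return sharedValue.value, sorted(messages)


if __name__ == "__main__":
    print(runSharedDemo([1, 2, 3]))  # (6, ['added 1', 'added 2', 'added 3'])
```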


We also have to modify our counting function and save it as **countPrimesInRange_withSharedVars**: <br>
1) It now also receives the **sharedValue** and the **messageQueue** as arguments. <br>
2) After the worker has counted the primes, it adds that result to the **sharedValue**. <br>
3) Instead of printing the message, it now puts it into the **messageQueue**, so we can print it later. <br>
4) The return statement is removed, as we cannot use it anyway.

```python
import os #needed for os.getpid()

def countPrimesInRange_withSharedVars(startValue, endValue, sharedValue, messageQueue):
    countOfPrimes = countPrimesInRange(startValue, endValue) #call original counting function
    
    #add countOfPrimes to the mp.Value; the lock prevents two workers from updating it at the same time
    with sharedValue.get_lock():
        sharedValue.value += countOfPrimes
    
    #produce message
    processID = os.getpid() 
    message = "Worker (ID: %s) found %i primes from %i to %i"%(processID, countOfPrimes, startValue, endValue)
    
    #store message in mp.Queue
    messageQueue.put(message)
```

```python
import multiprocessing as mp
from codeLib import countPrimesInRange_withSharedVars 

#create shared variable between processes; stores primes count from workers
sharedValue = mp.Value('i', 0) # "i": integer, 0: starting value

#create shared queue between processes; stores messages from workers
messageQueue = mp.Queue() 

#split range between [startValue, endValue] into 2 tasks for 2 workers (processes)
task_1 = [1, 40000]
task_2 = [40001, 80000]

startTime = time.perf_counter()

print('initate workers...')
worker1 = mp.Process(target = countPrimesInRange_withSharedVars, args = (task_1[0], task_1[1], sharedValue, messageQueue)) #added sharedValue, messageQueue as arguments
worker2 = mp.Process(target = countPrimesInRange_withSharedVars, args = (task_2[0], task_2[1], sharedValue, messageQueue))

#start processing
print('start processing...')
worker1.start()
worker2.start()

#wait until workers are done! 
print('block script until processing is done...')
worker1.join()
print('worker 1: done!')
worker2.join()
print('worker 2: done!')

#close workers
print('close workers')
worker1.close()
worker2.close()

endTime = time.perf_counter()


#print results
numberOfPrimes = sharedValue.value #get the final count of primes
startValue = task_1[0]
endValue = task_2[1]
print('\n')
print("Number of primes between %i to %i is: %i\n"%(startValue, endValue, numberOfPrimes))
print("worker stated following messages:")
while not messageQueue.empty(): #loop through the whole queue by asking if it is empty
    message = messageQueue.get() #retrieve a message from the queue
    print(message)

#close the queue, too
messageQueue.close() 

print("Processing time: %.3f seconds" %(endTime - startTime))
```

```
initate workers...
start processing...
block script until processing is done...
worker 1: done!
worker 2: done!
close workers


Number of primes between 1 to 80000 is: 7837

worker stated following messages:
Worker (ID: 13620) found 4203 primes from 1 to 40000
Worker (ID: 20856) found 3634 primes from 40001 to 80000
Processing time: 16.129 seconds
```


### **Using more than two workers in an automated fashion**

Next, we will try to use more workers to improve performance even further. Of course, we could just copy and paste our code for each new worker, but it is more elegant to automate this.
For this, we need to:
1) define a variable **numberOfWorkers** that sets the number of workers we want
2) split our range into as many tasks as we have workers
3) create an empty list called **workerCollection** in which we store all new workers


```python
#how can we automatically split the range into tasks for different numbers of workers?
startValue = 1
endValue = 80000
numberOfWorkers = 4

#code for splitting the tasks
def splitRangeIntoTasks(startValue, endValue, numberOfWorkers):
    #np.array_split divides the values into numberOfWorkers (almost) equally sized arrays
    splitArrs = np.array_split(np.arange(startValue, endValue + 1), numberOfWorkers)
    #keep only the first and last value of each array as [start, end] of a task
    tasks = [[int(i[0]), int(i[-1])] for i in splitArrs]
    return tasks

tasks = splitRangeIntoTasks(startValue, endValue, numberOfWorkers)
splitArrs = np.array_split(np.arange(startValue, endValue + 1), numberOfWorkers) #recomputed here only to display the arrays

print('The splitArrs looks like:')
for iSplit in splitArrs:
    print(iSplit)
print('The tasks looks like:')
for iTask in tasks:
    print(iTask)
print('----------')
```

```
The splitArrs looks like:
[    1     2     3 ... 19998 19999 20000]
[20001 20002 20003 ... 39998 39999 40000]
[40001 40002 40003 ... 59998 59999 60000]
[60001 60002 60003 ... 79998 79999 80000]
The tasks looks like:
[1, 20000]
[20001, 40000]
[40001, 60000]
[60001, 80000]
----------
```


Next, we use a for-loop to loop through our tasks and create a worker in each iteration which we store by appending it to our **workerCollection** list.
Afterwards, we can loop through our **workerCollection** to start, join, and close the workers.

```python
import multiprocessing as mp
from codeLib import splitRangeIntoTasks, countPrimesInRange_withSharedVars #this is required for Jupyter notebook (and other IDEs; mostly on Windows) to work with multiprocessing

#create shared variable between processes; stores primes count from workers
sharedValue = mp.Value('i', 0) # "i": integer, 0: starting value

#create shared queue between processes; stores print messages from workers
messageQueue = mp.Queue() 

#define number of workers
numberOfWorkers = 4 #8

#split range between startValue/endValue into multiple tasks
tasks = splitRangeIntoTasks(startValue, endValue, numberOfWorkers)


startTime = time.perf_counter()
print('initate workers...')
workerCollection = []
for iTask in tasks: #loop through tasks
    worker = mp.Process(target = countPrimesInRange_withSharedVars, args = (iTask[0], iTask[1], sharedValue, messageQueue))
    workerCollection.append(worker)

#start processing
print('start processing...')
for iWorker in workerCollection:
    iWorker.start()

#wait until workers are done! Actually, "join" blocks the script from proceeding.
print('block script until processing is done...')
for iWorkerID, iWorker in enumerate(workerCollection):
    iWorker.join() #wait until this worker is done
    iWorker.close() #close worker directly
    print('worker %i : done and is closed!'%iWorkerID)
numberOfPrimes = sharedValue.value


endTime = time.perf_counter()

#print results
print('\n')
print("Number of primes between %i to %i is: %i\n"%(startValue, endValue, numberOfPrimes))
print("worker stated following messages:")
while not messageQueue.empty():
    message = messageQueue.get()
    print(message)
messageQueue.close()

print("Processing time: %.3f seconds" %(endTime - startTime))
```

```
initate workers...
start processing...
block script until processing is done...
worker 0 : done and is closed!
worker 1 : done and is closed!
worker 2 : done and is closed!
worker 3 : done and is closed!


Number of primes between 1 to 80000 is: 7837

worker stated following messages:
Worker (ID: 12212) found 2262 primes from 1 to 20000
Worker (ID: 15640) found 1941 primes from 20001 to 40000
Worker (ID: 21704) found 1854 primes from 40001 to 60000
Worker (ID: 10268) found 1780 primes from 60001 to 80000
Processing time: 10.325 seconds
```


Now, the processing was even faster than with only two workers!

### **Simplify things with mp.Pool**

That's a lot of code. But there is a much simpler way to use multiprocessing: the **Pool** class.

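This interface takes over a lot of responsibility for us:
- it collects the return value of each task, so we don't need shared variables or queues <br>
- it automatically starts and manages the worker processes <br>
- closing the Pool can be ensured by using it as a context manager, i.e. with the with-statement. On exit, the with-statement calls **terminate()** on the Pool, which is fine here because **starmap** only returns once all results are available <br>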

To create a multiprocessing **Pool**, you can simply call **mp.Pool(*number_of_wanted_workers*)** and store it in a *variable*; however, as we will use the context manager, we use the following syntax: **with mp.Pool(numberOfWorkers) as workerPool:**. 
The worker processes are started when the Pool is created, and we distribute our tasks to them by calling the **map** or **starmap** methods of the **Pool** class. If you look into the code below, you will hopefully see that it is quite similar to our for-loop to create and collect workers in the last section!
Importantly, **map** and **starmap** return a list with one item per task, in the same order as the tasks, i.e. each item is the return value of the function for that task. If the function returns more than one value, each item is a tuple. <br>

A final note: a common rule of thumb for CPU-bound tasks is to use as many workers as there are CPUs in your system. To get this number, we can simply call **mp.cpu_count()**!
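A small sketch of the difference between **map** and **starmap**, using the hypothetical functions **incrementValue** and **addValues**: **map** calls the function with one argument per item, while **starmap** unpacks each item (here a tuple) into several arguments. Both return the results in the order of the input items, regardless of which worker finished first. When the function returns a tuple, **zip(\*results)** splits the list of tuples into separate sequences.

```python
import multiprocessing as mp


def incrementValue(value):
    # one argument per item: suitable for map
    return value + 1


def addValues(a, b):
    # two arguments per item: suitable for starmap; returns two values (a tuple)
    return a + b, "%i + %i" % (a, b)


def runPoolDemo():
    with mp.Pool(2) as workerPool:
        mapped = workerPool.map(incrementValue, [1, 2, 3])
        starmapped = workerPool.starmap(addValues, [(1, 2), (3, 4)])
    sums, labels = zip(*starmapped)  # split the list of tuples into two tuples
    return mapped, list(sums), list(labels)


if __name__ == "__main__":
    print(runPoolDemo())  # ([2, 3, 4], [3, 7], ['1 + 2', '3 + 4'])
```

If you leave out the number of workers, **mp.Pool()** uses the number of CPUs reported by **os.cpu_count()**.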


```python
import multiprocessing as mp
from codeLib import splitRangeIntoTasks, countPrimesInRangeMP


#define number of workers; rule of thumb: for best results, use the number of CPUs available in your system
numberOfWorkers = mp.cpu_count() 
print("number of CPUs: %i" % numberOfWorkers)



#split range between startValue/endValue into multiple tasks
startValue = 1
endValue = 80000
tasks = splitRangeIntoTasks(startValue, endValue, numberOfWorkers)



startTime = time.perf_counter()
#initiate/process with Pool
print('Initate worker pool and start processing...')

with mp.Pool(numberOfWorkers) as workerPool: 
    resultsFromWorkers = workerPool.starmap(countPrimesInRangeMP, tasks) #use 'starmap' for functions with multiple arguments; use 'map' for a single argument


#merge the single results of the workers 
#resultsFromWorkers is a list; each worker returned a tuple with (numberOfPrimes, message)
countedPrimesByWorker, messages = [], []
for iNumberOfPrimes, iMessage in resultsFromWorkers:
    countedPrimesByWorker.append(iNumberOfPrimes)
    messages.append(iMessage)
numberOfPrimes = np.sum(countedPrimesByWorker)

endTime = time.perf_counter()

#print results
print('\n')
print("Number of primes from %i to %i is: %i"%(startValue, endValue, numberOfPrimes))
print("Worker returned following counts: %s" % countedPrimesByWorker)
print("worker stated following messages:")
for iMessage in messages:
    print(iMessage)

print("Processing time: %.3f seconds" %(endTime - startTime))
```

```
number of CPUs: 20
Initate worker pool and start processing...


Number of primes from 1 to 80000 is: 7837
Worker returned following counts: [550, 457, 431, 424, 400, 406, 387, 377, 392, 379, 376, 367, 373, 364, 374, 356, 361, 354, 356, 353]
worker stated following messages:
Worker (ID: 21788) found 550 primes from 1 to 4000
Worker (ID: 18780) found 457 primes from 4001 to 8000
Worker (ID: 16992) found 431 primes from 8001 to 12000
Worker (ID: 22380) found 424 primes from 12001 to 16000
Worker (ID: 20788) found 400 primes from 16001 to 20000
Worker (ID: 17036) found 406 primes from 20001 to 24000
Worker (ID: 14788) found 387 primes from 24001 to 28000
Worker (ID: 14856) found 377 primes from 28001 to 32000
Worker (ID: 10372) found 392 primes from 32001 to 36000
Worker (ID: 15164) found 379 primes from 36001 to 40000
Worker (ID: 16088) found 376 primes from 40001 to 44000
Worker (ID: 15304) found 367 primes from 44001 to 48000
Worker (ID: 5212) found 373 primes from 48001 to 52000
Worke
```

As you can see, the processing time is much shorter than in the examples above when all available CPU cores are used.
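Equal-width ranges do not necessarily contain equal amounts of work: testing a large prime takes longer than testing a small one, so the workers with the highest ranges are likely to finish last while the others are already idle. One possible remedy, sketched here under the assumption that the overhead of many small tasks is acceptable, is to split the range into many more tasks than workers and let the Pool hand them out one at a time (**chunksize=1**), so that a worker that finishes early simply takes the next task. The helper names **splitIntoBlocks** and **countPrimesBalanced** and the default of 10 tasks per worker are hypothetical choices, not part of the tutorial's **codeLib.py**.

```python
import multiprocessing as mp


def isPrimeNumber(value):
    # same trial-division test as in the tutorial
    if value < 2:
        return False
    for iCheckIfDivider in range(2, value):
        if (value % iCheckIfDivider) == 0:
            return False
    return True


def countPrimesInRange(startValue, endValue):
    # count the primes from startValue to endValue (inclusive)
    return sum(1 for iValue in range(startValue, endValue + 1) if isPrimeNumber(iValue))


def splitIntoBlocks(startValue, endValue, numberOfTasks):
    # contiguous blocks of (almost) equal width that cover the whole range
    total = endValue - startValue + 1
    bounds = [startValue + (total * k) // numberOfTasks for k in range(numberOfTasks + 1)]
    return [(bounds[k], bounds[k + 1] - 1) for k in range(numberOfTasks)]


def countPrimesBalanced(startValue, endValue, numberOfWorkers, tasksPerWorker=10):
    # many more tasks than workers; chunksize=1 hands them out one at a time,
    # so a worker that finishes early picks up the next pending task
    tasks = splitIntoBlocks(startValue, endValue, numberOfWorkers * tasksPerWorker)
    with mp.Pool(numberOfWorkers) as workerPool:
        counts = workerPool.starmap(countPrimesInRange, tasks, chunksize=1)
    return sum(counts)


if __name__ == "__main__":
    print(countPrimesBalanced(1, 1000, 4))  # prints 168
```

Whether this pays off for the full range of 1 to 80,000 depends on your system; measuring it with **time.perf_counter()** as in the examples above is the safest way to find out.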

### **More advanced: Writing your own Process classes**

Here I would like to give you an idea of how you can write your own process classes. That allows you to create your own processes that run in the background of your program and can do calculations or control devices as required. For example, you could outsource the control of a video camera: the camera can be started/stopped and collects all video frames in memory during acquisition, while your main program can do other things. <br>
However, object-oriented programming is a topic in itself, and describing exactly how it works in Python is beyond the scope of this tutorial. But there are many tutorials about it online, so just google it!
<br>
To create your own process class, you simply have to inherit the main functionality of **mp.Process** in your own class:
1) define your class as a subclass of **mp.Process**: class **Name_of_your_class(mp.Process)**
2) call **super().__init__()** in the **__init__** method of your class

Afterwards, your class has the same methods as **mp.Process**, e.g. **start()**, **join()**, and **close()**. 
Importantly, you don't pass a function to your class as **target** argument; instead, you write the code the process should execute into the **run** method, which **start()** executes in the new process.


```python
import multiprocessing as mp

class OwnWorker(mp.Process): #we inherit the basic functionality such as start, join, close methods from mp.Process
    def __init__(self): #is called when the object is created
        super().__init__() #required to inherit functions from mp.Process  
        
    def run(self): #this method is executed in the new process when start() is called
        #do stuff here
        pass
        
worker = OwnWorker()
worker.start()
worker.join()
worker.close()
```

In the next example, we will define a worker that performs a simple task: it adds 1 to each value of a task list and stores the results in **self.results**. The task list is passed to the worker during initialisation, i.e. by calling **worker = OwnWorker1(tasks)**, and stored in **self.tasks**. To start processing, we call the **start** method: **worker.start()**. 

```python
class OwnWorker1(mp.Process):
    def __init__(self, tasks):
        super().__init__()       
        self.tasks = tasks
        self.results = []

    def run(self):
        for iValue in self.tasks:
            incValue = iValue + 1
            self.results.append(incValue)
```
        

Cool, now the results will be stored in **worker.results**. But can we access those from the main script?

```python
import multiprocessing as mp
from codeLib import OwnWorker1 

tasks = [0, 1, 2, 3, 4, 5]
worker = OwnWorker1(tasks)
worker.start()
worker.join()
worker.close()
results = worker.results
print("Results: %s"%results)
```

```
Results: []
```


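Unfortunately not! When we try to access **worker.results**, we only get an empty list. The reason is that the worker runs in a separate process with its own memory: **run()** appends to the worker process's copy of **self.results**, while the object in our main script is never changed. We again need some kind of communication between the worker process and the main program, as we needed in the first examples of this tutorial. However, this time we use a **Pipe**.<br>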
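The same effect can be reproduced with a plain module-level list, as in this sketch (the names **appendInChild** and **runMemoryDemo** are hypothetical): the child process appends to its own copy of the list, and the list in the main process stays empty.

```python
import multiprocessing as mp

results = []  # every process works with its own copy of this list


def appendInChild(value):
    results.append(value)  # modifies the child's copy only


def runMemoryDemo():
    worker = mp.Process(target=appendInChild, args=(42,))
    worker.start()
    worker.join()
    worker.close()
    return list(results)  # the main process's list is unchanged


if __name__ == "__main__":
    print(runMemoryDemo())  # []
```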
By default, a pipe offers two-way communication, meaning that both the worker and the main script can send and receive data. When you call **mp.Pipe()**, you get two connection objects back, one for each end of the pipe, e.g. **pipeMain, pipeWorker = mp.Pipe()**. Both **pipeMain** and **pipeWorker** have the same methods you can call: <br>
- **send**: sends data to the other end of the pipe <br>
- **recv**: receives data that was sent from the other end; if no data is available yet, it waits until data arrives <br>
- **poll**: asks whether there is data waiting to be received. You can also define a **timeout** in seconds, i.e. **poll** waits up to that time for data from the other end. This is important because calling **recv()** when no data is coming would block your script indefinitely<br><br>
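Because both ends of a pipe are ordinary objects, their methods can be tried out within a single process before any worker is involved. In this sketch, **poll()** returns False while nothing has been sent (after waiting for the given timeout) and True once data is waiting; small messages like these fit into the pipe's buffer, so sending does not block here.

```python
import multiprocessing as mp

pipeMain, pipeWorker = mp.Pipe()  # duplex by default: both ends can send and receive

print(pipeWorker.poll(timeout=0.1))  # False: nothing has been sent yet
pipeMain.send([0, 1, 2])             # any picklable object can be sent
print(pipeWorker.poll(timeout=0.1))  # True: data is waiting
print(pipeWorker.recv())             # [0, 1, 2]

pipeWorker.send("reply")             # the other direction works as well
print(pipeMain.recv())               # reply

pipeMain.close()
pipeWorker.close()
```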

To demonstrate how you can use pipes for communication, we write a new process class named **OwnWorker2**: <br>
1) The **pipeWorker** is passed to the class during initialisation by calling **worker = OwnWorker2(pipeWorker)**. We also define a boolean variable **self.isRunning = True** in the **__init__** method, which allows us to control whether the worker stays active or stops <br>
2) In the **run** method, we define a while-loop that runs as long as **self.isRunning** is True. In the while-loop, we then ask whether any data has been sent from the main program to **pipeWorker** by calling **if self.pipeWorker.poll(timeout=0.5)**. If there is data, we receive it by calling **task = self.pipeWorker.recv()** and check which type of task was sent. If it is the string **'close'**, we set **self.isRunning** to False, which ends the while-loop and thereby the worker. If it is not a string, we presume it is a list of values that we want to process. After processing, we send the results back to the main program via **self.pipeWorker.send(results)** <br>





```python
class OwnWorker2(mp.Process):
    def __init__(self, pipeWorker):
        super().__init__()       
        self.pipeWorker = pipeWorker
        self.isRunning = True

    def run(self):
        while self.isRunning:
            if self.pipeWorker.poll(timeout=0.5): #timeout is important; without it, the loop polls non-stop and the CPU load of this core goes up to 100%; alternative: use time.sleep(0.5)
                task = self.pipeWorker.recv()
                if isinstance(task, str): #check if close signal was sent
                    if task == 'close':
                        self.isRunning = False
                else: #process the data
                    results = []
                    for iValue in task:
                        incValue = iValue + 1
                        results.append(incValue)
                    self.pipeWorker.send(results) #send once, after all values are processed
    
            #time.sleep(0.5)

        #no self.close() here: close() is meant to be called by the main program after join()
        self.pipeWorker.send('worker is closed!')
```

Now, we can write our main program. We first create our pipes. **pipeWorker** is passed to our worker during initialisation by calling **worker = OwnWorker2(pipeWorker)**, while **pipeMain** is used in the main program to communicate with the worker. After we start the worker by calling **worker.start()**, it waits for tasks. We will send two tasks to the worker by calling **pipeMain.send(task1)** and **pipeMain.send(task2)**. After each task, we check whether the worker has processed the data by calling **if pipeMain.poll(timeout=2)**, i.e. we wait up to 2 seconds, before we receive the results from the worker by calling **pipeMain.recv()**. Between the tasks, we use **time.sleep** to simulate additional computations before the next task is ready for the worker. In a final step, we send **'close'** to the worker via **pipeMain** to stop the worker.

```python
import multiprocessing as mp
import time
from codeLib import OwnWorker2


#we use a Pipe to communicate with our worker
pipeMain, pipeWorker = mp.Pipe()


#initiate worker
worker = OwnWorker2(pipeWorker)
worker.start()
print('sleep for 1 second')
time.sleep(1)

#send tasks to worker
task1 = [0, 1, 2, 3, 4, 5]
pipeMain.send(task1)
if pipeMain.poll(timeout=2):
    results = pipeMain.recv()
    print("results for task1: %s "%results)

print('sleep for 1 second')
time.sleep(1)

#send tasks to worker
task2 = [5,5,5,1]
pipeMain.send(task2)
if pipeMain.poll(timeout=2):
    results = pipeMain.recv()
    print("results for task2: %s "%results)

print('sleep for 1 second')
time.sleep(1)

#close worker
pipeMain.send('close')
if pipeMain.poll(timeout=2):
    msg = pipeMain.recv()
    print(msg)

#wait until the worker process has ended, then release its resources
worker.join()
worker.close()
```

```
sleep for 1 second
results for task1: [1, 2, 3, 4, 5, 6] 
sleep for 1 second
results for task2: [6, 6, 6, 2] 
sleep for 1 second
worker is closed!
```


As you can see, it is relatively easy to construct your own **mp.Process** class, keep it active as long as necessary, and process data whenever it is available. 

Here are some additional sources about multiprocessing in Python: <br>
https://superfastpython.com/multiprocessing-in-python/ <br>
https://zetcode.com/python/multiprocessing/ <br>
https://medium.com/@shahooda637/multi-processing-in-python-32d4b1c97354 <br>
https://pymotw.com/2/multiprocessing/basics.html <br>
https://www.machinelearningplus.com/python/parallel-processing-python/ <br>
https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a <br>
