In [1]:
import os
import time
import random
import multiprocessing as mp
import logging

In [2]:
# Configure the root logger at DEBUG level so the logging.debug() calls
# used later in the notebook are emitted.
logging.basicConfig(level=logging.DEBUG)

get the number of CPU cores

In [4]:
# Number of CPUs reported by multiprocessing; reused below to size a Pool.
nb_cores = mp.cpu_count()

In [5]:
# Report the detected core count.
print('{} cores'.format(nb_cores))

8 cores


helpers

In [6]:
def f(n):
    """Print ``n`` as ``n=<value>`` and hand the same value back.

    Used as a minimal process target: the printed line shows that the
    function actually ran.
    """
    message = 'n={}'.format(n)
    print(message)
    return n

In [7]:
def square(x):
    """Return ``x`` multiplied by itself."""
    product = x * x
    return product

# `Process()`

In [8]:
# Run f(0) in a child process and wait for that child to exit.
process = mp.Process(target=f, args=(0,))
process.start()
process.join()

n=0


# `Pool()`  

> "The Pool can take the number of processes as a parameter. [...] If we do not provide any value, then the number returned by os.cpu_count is used." -- https://zetcode.com/python/multiprocessing/

In [11]:
# Explicit Pool lifecycle: create, map, close, join.
pool = mp.Pool(nb_cores)
try:
    data = pool.map(square, range(1, 5))
finally:
    # Always release the worker processes, even if map() raises.
    pool.close()
    pool.join()
print(data)

[1, 4, 9, 16]


In [13]:
# Same computation, with the pool managed by a ``with`` block.
with mp.Pool(processes=5) as pool:
    data = pool.map(square, range(1, 5))
print(data)

[1, 4, 9, 16]


# `Queue()`  

> "Queue objects are a FIFO data structure that are thread and process safe which make them perfect for passing data between different processes without potentially corrupting data." -- https://tutorialedge.net/python/python-multiprocessing-tutorial/

In [19]:
# Put four integers on a queue and read them back in the same process.
queue = mp.Queue()
for n in range(4):
    # Plain loop: put() is called only for its side effect.
    queue.put(n)
state = [queue.get() for _ in range(4)]
print(state)

[0, 1, 2, 3]


In [21]:
def put(queue, n):
    """Put ``n`` on ``queue``, then print ``put <n>``."""
    queue.put(n)
    message = 'put {}'.format(n)
    print(message)

In [22]:
def get(queue):
    """Take one item from ``queue``, print ``got <item>`` and return the item.

    Uses the queue's default ``get()``, so the call waits until an item is
    available.
    """
    item = queue.get()
    message = 'got {}'.format(item)
    print(message)
    return item

In [6]:
queue = mp.Queue()

# The child process puts 9 on the queue; the parent reads it back and
# only then waits for the child to exit.
process = mp.Process(target=put, args=(queue, 9))
process.start()

print(queue.get())
process.join()

9


In [7]:
# The parent queues one item up front; the child process takes it off the
# queue and prints it.
queue = mp.Queue()
queue.put(6)

process = mp.Process(target=get, args=(queue,))
process.start()
process.join()

got 6


In [15]:
# Several producer processes each put one value on a shared queue;
# the parent collects the values.
queue = mp.Queue()

processes = {}

nb_producers = 4
# One producer per value in range(nb_producers).
processes['producers'] = [mp.Process(target=put, args=(queue, n))
                          for n in range(nb_producers)]

for producer in processes['producers']:
    producer.start()

# Drain the queue before joining: a process that has put items on a queue
# waits for them to be flushed before it terminates, so the parent should
# read one result per producer first and join afterwards.
results = [queue.get() for _ in processes['producers']]

for producer in processes['producers']:
    producer.join()

print(results)

[0, 1, 2, 3]


In [23]:
# Several consumer processes each take one value from a pre-filled queue.
queue = mp.Queue()

processes = {}

nb_consumers = 4

# Each consumer takes exactly one item, so queue one item per consumer;
# with fewer items a consumer would block forever and join() would hang.
for n in range(nb_consumers):
    queue.put(n)

processes['consumers'] = [mp.Process(target=get, args=(queue,))
                          for n in range(nb_consumers)]

for consumer in processes['consumers']:
    consumer.start()
for consumer in processes['consumers']:
    consumer.join()

got 0
got 1
got 2
got 3
finished


In [31]:
# Producers fill the queue first; consumers are started afterwards.
queue = mp.Queue()

processes = {}

nb_producers = 3
processes['producers'] = [mp.Process(target=put, args=(queue, n))
                          for n in range(nb_producers)]

nb_consumers = 2
processes['consumers'] = [mp.Process(target=get, args=(queue,))
                          for n in range(nb_consumers)]

# Plain loops instead of list comprehensions: start()/join() are called
# only for their side effects, and a trailing comprehension would be
# displayed as a meaningless list of None values.
for producer in processes['producers']:
    producer.start()
for producer in processes['producers']:
    producer.join()

# Three items were queued but only two consumers run, so one item is left
# on the queue when this cell finishes.
for consumer in processes['consumers']:
    consumer.start()
for consumer in processes['consumers']:
    consumer.join()

put 0
put 1
put 2
got 0
got 1


[None, None]

In [None]:
# Producer/consumer run: all producers finish before any consumer starts.
queue = mp.Queue()

processes = {}

nb_producers = 3
processes['producers'] = [mp.Process(target=put, args=(queue, n))
                          for n in range(nb_producers)]

nb_consumers = 2
processes['consumers'] = [mp.Process(target=get, args=(queue,))
                          for n in range(nb_consumers)]

# start()/join() return nothing useful, so drive them with plain loops
# rather than building throwaway lists.
for producer in processes['producers']:
    producer.start()
for producer in processes['producers']:
    producer.join()

# Only nb_consumers of the nb_producers queued items are consumed here.
for consumer in processes['consumers']:
    consumer.start()
for consumer in processes['consumers']:
    consumer.join()

In [None]:
# class process(object):
    
#     def __init__(self):
    
#         self.max = 4
#         self.queue  = mp.Queue()
#         self.pool      = mp.Pool(self.max)
        
#     def put(self, n):
#         self.queue.put(n)
#         print('put {}'.format(n))

#     def get(self):
#         n = self.queue.get()
#         print('got {}'.format(n))
#         return n
    
# # class consumer(object):

# #     def get(self):
# #         n = self.queue.get()
# #         print('got {}'.format(n))
# #         return n
    
# # class producer(object):
        
# #     def put(self, n):
# #         self.queue.put(n)
# #         print('put {}'.format(n))

In [52]:
# https://stackoverflow.com/questions/17241663/filling-a-queue-and-managing-multiprocessing-in-python
# 
# This spawns 3 processes (in addition to the parent process). Each child executes the worker function (`worker_main` in the linked answer, `consume` here). It is a simple loop that gets a new item from the queue on each iteration. Workers block if nothing is ready to process.
# 
# At startup, all 3 processes sleep until the queue is fed with some data. When an item is available, one of the waiting workers gets it and starts to process it. After that, it tries to get another item from the queue, waiting again if nothing is available...


# Sentinel pushed through a queue to tell the process reading it to stop.
# None is safe here because this demo only ever queues integers, and
# ``is None`` still holds after the value has crossed a process boundary.
STOP = None


def consume(queue_in, queue_out):
    """Forward items from ``queue_in`` to ``queue_out`` until ``STOP`` arrives.

    Passed as the ``initializer`` of the pool below, so every pool worker
    runs this loop as soon as it starts.
    """
    pid = os.getpid()
    logging.debug("{} is ready".format(pid))
    while True:
        item = queue_in.get(block=True)
        if item is STOP:
            # Returning ends the initializer, which lets the pool worker
            # exit once the pool is closed.
            break
        queue_out.put(item)


def read(queue):
    """Log every item taken from ``queue`` until ``STOP`` arrives."""
    pid = os.getpid()
    while True:
        item = queue.get(block=True)
        if item is STOP:
            break
        logging.debug("{} got {}".format(pid, item))


queue_in = mp.Queue()
queue_out = mp.Queue()

nb_consumers = 3
pool = mp.Pool(nb_consumers, consume, (queue_in, queue_out))

nb_items = 5
for item in range(nb_items):
    logging.debug("inserting {}".format(item))
    queue_in.put(item)

process = mp.Process(target=read, args=(queue_out,))
process.start()

# Orderly shutdown. Without it the pool workers and the reader loop forever,
# and the non-daemon reader keeps the interpreter from exiting.
# 1. One sentinel per worker: each worker stops after reading exactly one.
for worker_index in range(nb_consumers):
    queue_in.put(STOP)
pool.close()
pool.join()

# 2. All workers have exited, so everything they forwarded is already on
#    queue_out; the reader sees this sentinel last.
queue_out.put(STOP)
process.join()

DEBUG:root:38534 is ready
DEBUG:root:38535 is ready
DEBUG:root:38538 is ready
DEBUG:root:inserting 0
DEBUG:root:inserting 1
DEBUG:root:inserting 2
DEBUG:root:inserting 3
DEBUG:root:inserting 4
DEBUG:root:38549 got 0
DEBUG:root:38549 got 1
DEBUG:root:38549 got 3
DEBUG:root:38549 got 2
DEBUG:root:38549 got 4


In [8]:
class example(object):
    """Small queue pipeline built on ``multiprocessing``.

    Pool workers run :meth:`consume` (as the pool initializer) and move items
    from ``queue_in`` to ``queue_out``; a separate process runs :meth:`read`
    and logs every item that reaches ``queue_out``.

    Typical use: ``fill()`` / ``start()`` in either order, then ``stop()``
    to shut the child processes down.
    """

    # Sentinel that makes consume()/read() leave their loops. None is safe
    # because fill() only queues integers.
    STOP = None

    def __init__(self):
        """Create the queues and the (not yet started) reader process."""
        self.max = 4                      # number of pool workers
        self.queue_in = mp.Queue()
        self.queue_out = mp.Queue()
        self.pool = None                  # created in start()
        self.process = mp.Process(target=self.read)

    def start(self):
        """Launch the pool workers and the reader process."""
        # The initializer is used as the worker loop: each of the
        # ``self.max`` workers runs consume() as soon as it starts.
        self.pool = mp.Pool(processes=self.max, initializer=self.consume)
        self.process.start()

    def fill(self, nb_items):
        """Queue the integers ``0 .. nb_items - 1`` on ``queue_in``."""
        for item in range(nb_items):
            logging.debug("inserting {}".format(item))
            self.queue_in.put(item)

    def consume(self):
        """Forward items from ``queue_in`` to ``queue_out`` until ``STOP`` arrives."""
        pid = os.getpid()
        logging.debug("{} is ready".format(pid))
        while True:
            item = self.queue_in.get(block=True)
            if item is self.STOP:
                break
            self.queue_out.put(item)

    def read(self):
        """Log every item taken from ``queue_out`` until ``STOP`` arrives."""
        pid = os.getpid()
        while True:
            item = self.queue_out.get(block=True)
            if item is self.STOP:
                break
            logging.debug("{} got {}".format(pid, item))

    def stop(self):
        """Shut down the workers and the reader once queued items are processed.

        Does nothing if :meth:`start` has not been called (or ``stop`` has
        already run).
        """
        if self.pool is None:
            return
        # One sentinel per worker: each worker stops after reading one.
        for _ in range(self.max):
            self.queue_in.put(self.STOP)
        self.pool.close()
        self.pool.join()
        self.pool = None
        # Workers have exited, so everything they forwarded is already on
        # queue_out; the reader sees this sentinel last.
        self.queue_out.put(self.STOP)
        self.process.join()
            

# Build the pipeline, queue five items on queue_in, then start the pool
# workers and the reader process.
e = example()
e.fill(nb_items=5)
e.start()

DEBUG:root:inserting 0
DEBUG:root:inserting 1
DEBUG:root:inserting 2
DEBUG:root:inserting 3
DEBUG:root:inserting 4
DEBUG:root:51941 is ready
DEBUG:root:51944 is ready
DEBUG:root:51948 is ready
DEBUG:root:51951 is ready
DEBUG:root:51957 got 0
DEBUG:root:51957 got 1
DEBUG:root:51957 got 2
DEBUG:root:51957 got 3
DEBUG:root:51957 got 4


# `Manager()`

https://pymotw.com/2/multiprocessing/communication.html

In [None]:
def worker(d, key, value):
    """Store ``value`` under ``key`` in the mapping ``d``."""
    d[key] = value


# Share one manager-backed dict between several processes; each process
# writes a single key (i -> 2 * i).
mgr = mp.Manager()
d = mgr.dict()
num_processes = 4
processes = [
    mp.Process(target=worker, args=(d, key, 2 * key))
    for key in range(num_processes)
]

# Start every process before waiting on any of them.
for process in processes:
    process.start()
for process in processes:
    process.join()

print(d)