# The multiprocessing Module

## Multiprocessing in Python

The multiprocessing module was added in Python 2.6. Because it spins up processes instead of threads, it avoids the Global Interpreter Lock (GIL) and can take full advantage of the multiple processors on a machine.

In [1]:
%cd 29_multiprocessing_module

/Users/miesner.jacob/python-for-programmers-educative/Module 4 - Advanced Concepts in Python/29_multiprocessing_module


In [2]:
# Creating a series of processes that call the same function

import os
from multiprocessing import Process

# We define our doubler function in a python file and import it
from doubler import doubler

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []

    # Define and start processes simultaneously
    for index, number in enumerate(numbers):
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        proc.start()

    # Tell Python to wait for processes to terminate
    for proc in procs:
        proc.join()

5 doubled to 10 by process id: 46476
10 doubled to 20 by process id: 46477
15 doubled to 30 by process id: 46478
20 doubled to 40 by process id: 46479
25 doubled to 50 by process id: 46480
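The `doubler` function is imported from a file that is not shown here. A minimal sketch of what `doubler.py` might contain, assuming it doubles its input and prints the result together with `os.getpid()`; the return value is an addition for illustration, and the original body may differ:

```python
import os


def doubler(number):
    """Double a number and report which process did the work.

    Assumed implementation: the real doubler.py is not shown in this section.
    """
    result = number * 2
    proc_id = os.getpid()  # ID of the process running this call
    print(f"{number} doubled to {result} by process id: {proc_id}")
    return result
```

Keeping the function in an importable module matters because, under the spawn start method, child processes re-import the target function by name.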


In [3]:
# Running processes with a more readable name than an ID number

# Importing updated function
from doubler import doubler_with_process_name

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []

    for index, number in enumerate(numbers):
        proc = Process(target=doubler_with_process_name, args=(number,))
        procs.append(proc)
        proc.start()

    proc = Process(target=doubler_with_process_name, name='Test', args=(2,))
    proc.start()
    procs.append(proc)

    for proc in procs:
        proc.join()

5 doubled to 10 by: Process-6
10 doubled to 20 by: Process-7
15 doubled to 30 by: Process-8
20 doubled to 40 by: Process-9
25 doubled to 50 by: Process-10
2 doubled to 4 by: Test
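The output above shows default names of the form `Process-N`, plus the explicit `Test` name passed through the `name` argument. A hedged sketch of how `doubler_with_process_name` could obtain that name, assuming it relies on `multiprocessing.current_process()` (the original function body is not shown):

```python
from multiprocessing import current_process


def doubler_with_process_name(number):
    """Double a number and report the name of the current process.

    Assumed implementation: the real function lives in doubler.py.
    """
    result = number * 2
    proc_name = current_process().name  # e.g. 'Process-6', or a custom name
    print(f"{number} doubled to {result} by: {proc_name}")
    return result
```

Called directly rather than through a `Process`, the function reports the name of whichever process invoked it.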


## Locks

The multiprocessing module includes locks, just like the threading module. All you need to do is import Lock, acquire it, do something, and then release it.

In [4]:
from multiprocessing import Lock

# Import function to acquire lock, print item, then release lock
from printer import printer

if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()
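The `printer` function comes from a file that is not shown. A minimal sketch of the acquire, work, release pattern described above, assuming `printer` simply prints the item while holding the lock:

```python
from multiprocessing import Lock


def printer(item, lock):
    """Print an item while holding the lock (assumed implementation)."""
    lock.acquire()
    try:
        print(item)
    finally:
        # Release even if print raises, so other processes are not blocked
        lock.release()
```

Since a `Lock` is also a context manager, `with lock: print(item)` is an equivalent and shorter way to write the same body.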

## Logging

Logging processes is a little different from logging threads. The reason is that Python’s logging package doesn’t use process-shared locks, so messages from different processes can end up interleaved.

In [5]:
import logging
import multiprocessing

if __name__ == '__main__':
    lock = Lock()
    items = ['tango', 'foxtrot', 10]
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    for item in items:
        p = Process(target=printer, args=(item, lock))
        p.start()
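As a compact variant of the setup above, `multiprocessing.log_to_stderr()` accepts an optional level and returns the module's logger, so the handler and the level can be configured in one call. The sketch below also logs from inside a worker function; `logged_doubler` is a hypothetical helper, not part of the original files:

```python
import logging
import multiprocessing


def logged_doubler(number):
    """Hypothetical worker: double a number and log the result."""
    logger = multiprocessing.get_logger()
    result = number * 2
    logger.info("%s doubled to %s", number, result)
    return result


# Attach a stderr handler to the multiprocessing logger and set its level
logger = multiprocessing.log_to_stderr(logging.INFO)

# Called here in the current process; it could equally be a Process target
logged_doubler(4)
```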

## The Pool Class

The Pool class represents a pool of worker processes. It has methods that allow you to offload tasks to those processes.

In [9]:
from multiprocessing import Pool
from doubler import doubler_vanilla

# Create 3 workers and map our function & input to the workers
if __name__ == '__main__':
    numbers = [5, 10, 20]
    pool = Pool(processes=3)
    print(pool.map(doubler_vanilla, numbers))

[10, 20, 40]


[INFO/SpawnPoolWorker-21] process shutting down
[INFO/SpawnPoolWorker-22] process shutting down
[INFO/SpawnPoolWorker-21] process exiting with exitcode 0
[INFO/SpawnPoolWorker-22] process exiting with exitcode 0
[INFO/SpawnPoolWorker-23] process shutting down
[INFO/SpawnPoolWorker-23] process exiting with exitcode 0
[INFO/SpawnPoolWorker-24] child process calling self.run()
[INFO/SpawnPoolWorker-25] child process calling self.run()
[INFO/SpawnPoolWorker-26] child process calling self.run()
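The cell above never shuts the pool down explicitly. A hedged sketch of the same `map` call using the pool as a context manager, which terminates the workers when the block exits. The body of `doubler_vanilla` is an assumption (the real one is in `doubler.py`), and `double_all` is a hypothetical helper:

```python
from multiprocessing import Pool


def doubler_vanilla(number):
    """Assumed implementation: return the doubled value without printing."""
    return number * 2


def double_all(numbers, workers=3):
    """Hypothetical helper: double every number using a pool of workers."""
    # Leaving the with-block terminates the pool's worker processes
    with Pool(processes=workers) as pool:
        # map blocks until all results are ready and preserves input order
        return pool.map(doubler_vanilla, numbers)


if __name__ == '__main__':
    print(double_all([5, 10, 20]))
```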


In [10]:
# Use apply_async to submit a single task, then retrieve its result with .get()

if __name__ == '__main__':
    pool = Pool(processes=3)
    result = pool.apply_async(doubler_vanilla, (25,))
    print(result.get(timeout=1))

50


[INFO/SpawnPoolWorker-24] process shutting down
[INFO/SpawnPoolWorker-25] process shutting down
[INFO/SpawnPoolWorker-24] process exiting with exitcode 0
[INFO/SpawnPoolWorker-25] process exiting with exitcode 0
[INFO/SpawnPoolWorker-26] process shutting down
[INFO/SpawnPoolWorker-26] process exiting with exitcode 0
[INFO/SpawnPoolWorker-27] child process calling self.run()
[INFO/SpawnPoolWorker-28] child process calling self.run()
[INFO/SpawnPoolWorker-29] child process calling self.run()
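`apply_async` returns immediately, so several tasks can be submitted before any result is collected. A minimal sketch under the same assumption about `doubler_vanilla`; `double_async` and the 10-second timeout are illustrative choices, not taken from the original:

```python
from multiprocessing import Pool


def doubler_vanilla(number):
    """Assumed implementation: return the doubled value without printing."""
    return number * 2


def double_async(numbers, workers=3, timeout=10):
    """Hypothetical helper: submit one task per number, then gather results."""
    with Pool(processes=workers) as pool:
        # Each call returns an AsyncResult right away
        pending = [pool.apply_async(doubler_vanilla, (n,)) for n in numbers]
        # get() blocks until the result arrives or the timeout expires
        return [result.get(timeout=timeout) for result in pending]


if __name__ == '__main__':
    print(double_async([5, 10, 25]))
```

If a result does not arrive within the timeout, `get()` raises `multiprocessing.TimeoutError`, so a tight value such as `timeout=1` can fail on a slow machine.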


## Process Communication

The multiprocessing module has two primary mechanisms for communication between processes: Queues and Pipes. The Queue implementation is both thread-safe and process-safe, so that is what we will use in the example below.

In [14]:
from multiprocessing import Queue

sentinel = -1

# Creates data to be consumed and waits for the consumer
# to finish processing
from process_communication import creator


# Consumes some data and works on it
# In this case, all it does is double the input
from process_communication import my_consumer

if __name__ == '__main__':
    q = Queue()
    data = [5, 10, 13, -1]
    process_one = Process(target=creator, args=(data, q))
    process_two = Process(target=my_consumer, args=(q,))
    process_one.start()
    process_two.start()

    q.close()
    q.join_thread()

    process_one.join()
    process_two.join()

Creating data and putting it on the queue
data found to be processed: 5
10
data found to be processed: 10
20
data found to be processed: 13
26
data found to be processed: -1
-2


[INFO/Process-30] child process calling self.run()
[INFO/Process-30] process shutting down
[INFO/Process-30] process exiting with exitcode 0
[INFO/Process-31] child process calling self.run()
[INFO/Process-31] process shutting down
[INFO/Process-31] process exiting with exitcode 0
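The `creator` and `my_consumer` functions are imported from a file that is not shown. A hedged sketch of a producer and consumer built around a sentinel value follows. Note one deliberate difference: the output above suggests the original consumer doubles the sentinel before stopping, whereas this sketch checks for the sentinel first. The `SENTINEL` constant and the returned list are additions for illustration:

```python
from multiprocessing import Queue

SENTINEL = -1  # assumed marker meaning "no more data"


def creator(data, q):
    """Put every item of data on the queue (assumed implementation)."""
    print("Creating data and putting it on the queue")
    for item in data:
        q.put(item)


def my_consumer(q):
    """Double items from the queue until the sentinel arrives."""
    results = []
    while True:
        data = q.get()  # blocks until an item is available
        if data == SENTINEL:
            break
        print(f"data found to be processed: {data}")
        processed = data * 2
        print(processed)
        results.append(processed)
    return results
```

These functions can be passed as `Process` targets exactly as in the cell above; the return value of `my_consumer` is discarded in that case and only matters when the function is called directly.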
