- Item 36: Use `subprocess` to Manage Child Processes
- Item 37: Use `Threads` for Blocking I/O, Avoid for Parallelism
- Item 38: Use Lock to Prevent Data Races in `Threads`
- Item 39: Use `Queue` to Coordinate Work Between `Threads`
- Item 40: Consider `Coroutines` to Run Many Functions Concurrently
- Item 41: Consider `concurrent.futures` for True Parallelism

In [1]:
# Preamble to mimic book environment
import logging
from pprint import pprint
from sys import stdout as STDOUT

## Item 36: Use `subprocess` to Manage Child Processes

In [3]:
# Example: subprocess
import subprocess
proc = subprocess.Popen(
    ['echo', 'Hello from the child!'],
    stdout=subprocess.PIPE)
out, err = proc.communicate()  # Read the child's output and wait for it to exit
print(out.decode('utf-8'))

Hello from the child!
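
The `echo` command assumes a Unix-like system. Below is a minimal portable sketch. It assumes only that the running interpreter (`sys.executable`) can be launched as a child process, and uses it in place of `echo`:

```python
import subprocess
import sys

# Launch a child Python interpreter instead of relying on an external `echo`.
proc = subprocess.Popen(
    [sys.executable, '-c', 'print("Hello from the child!")'],
    stdout=subprocess.PIPE)
out, err = proc.communicate()  # err is None because stderr was not piped
message = out.decode('utf-8').strip()  # strip() drops '\n' or '\r\n'
print(message)
```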



In [4]:
# Child processes will run independently from their parent process
from time import sleep, time
proc = subprocess.Popen(['sleep', '0.3'])
while proc.poll() is None:
    print('Working...')
    # Some time-consuming work here
    sleep(0.2)

print('Exit status', proc.poll())

Working...
Working...
Exit status 0
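
`poll()` returns `None` while the child is still running. Once the child finishes, it returns the exit status, and a nonzero status is how a child reports failure. The sketch below uses a throwaway Python child as an assumed stand-in for `sleep`. That child exits with status 3:

```python
import subprocess
import sys
import time

# Hypothetical child: sleeps briefly, then exits with status 3.
child_code = 'import sys, time; time.sleep(0.2); sys.exit(3)'
proc = subprocess.Popen([sys.executable, '-c', child_code])

polls = 0
while proc.poll() is None:  # None means the child is still running
    polls += 1
    time.sleep(0.05)        # The parent is free to do other work here

print('Exit status', proc.returncode, 'after', polls, 'polls')
```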


In [5]:
# The parent process is free to run many child processes in parallel.
def run_sleep(period):
    proc = subprocess.Popen(['sleep', str(period)])
    return proc

start = time()
procs = []
for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)


for proc in procs:
    proc.communicate()
end = time()
print('Finished in %.3f seconds' % (end - start))

Finished in 0.196 seconds


In [15]:
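# Pipe data from your Python program into a subprocess and retrieve its output.
import os

def run_openssl(data):
    env = os.environ.copy()
    # A bytes value works on POSIX, where subprocess encodes the environment
    # itself; on Windows, environment values must be str.
    env['password'] = b'\xe24U\n\xd0Ql3S\x11'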
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()  # Ensure the child gets input
    return proc


procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    procs.append(proc)


for proc in procs:
    out, err = proc.communicate()
    print(out[-10:])

b'\x08\x03\xc2L\x1e\xae\xfd\xb4\nu'
b'U\x00\x01U\xd3g\x05c/Q'
b'\xbce\n\xe9U\xb0\xfe\x86\xd2A'
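
Writing to `proc.stdin` by hand works for small inputs. `communicate()` can also take the data directly through its `input` parameter. The minimal sketch below swaps `openssl` for a hypothetical Python child so that it runs anywhere:

```python
import subprocess
import sys

# Hypothetical child: reads all of stdin and writes it back upper-cased.
CHILD = 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read().upper())'

def run_upper(data):
    proc = subprocess.Popen(
        [sys.executable, '-c', CHILD],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    # communicate() writes `data`, closes stdin, and reads stdout until EOF,
    # which avoids deadlocks when the child's output fills the pipe buffer.
    out, _ = proc.communicate(input=data)
    return out

result = run_upper(b'hello pipes')
print(result)
```

The trade-off is that each `run_upper` call blocks until its child finishes. You lose the start-everything-then-collect fan-out used with `run_openssl` above.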


In [17]:
# Create chains of parallel processes, just like UNIX pipes
def run_md5(input_stdin):
    proc = subprocess.Popen(
        ['md5sum'],
        stdin=input_stdin,
        stdout=subprocess.PIPE)
    return proc


input_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    input_procs.append(proc)
    hash_proc = run_md5(proc.stdout)
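    hash_procs.append(hash_proc)
    # Give md5sum sole ownership of the pipe's read end: closing the parent's
    # copy keeps communicate() below from stealing md5sum's input and lets
    # openssl receive SIGPIPE if md5sum exits early.
    proc.stdout.close()
    proc.stdout = None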


for proc in input_procs:
    proc.communicate()
for proc in hash_procs:
    out, err = proc.communicate()
    print(out.strip())

b'0e4e433631c0afe4ae93a66daca9b677  -'
b'ace0f8d933ad03b46ec81c50dc09d658  -'
b'9cc345e9be44ff1ce20b3d946bf5bd5e  -'
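
The sketch below applies the same chaining pattern to two hypothetical Python children standing in for `openssl` and `md5sum`. That lets it check the chained digest against `hashlib` in the parent. The key step is closing the parent's copy of the first child's stdout once the second child owns it:

```python
import hashlib
import os
import subprocess
import sys

# Hypothetical stand-ins: the first child copies stdin to stdout, the second
# prints the MD5 hex digest of everything it reads.
COPY = 'import sys; sys.stdout.buffer.write(sys.stdin.buffer.read())'
MD5 = ('import sys, hashlib; '
       'print(hashlib.md5(sys.stdin.buffer.read()).hexdigest())')

data = os.urandom(10)
first = subprocess.Popen([sys.executable, '-c', COPY],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second = subprocess.Popen([sys.executable, '-c', MD5],
                          stdin=first.stdout, stdout=subprocess.PIPE)
# Only the second child should read this pipe.
first.stdout.close()
first.stdout = None

first.communicate(input=data)   # Feed the data, close stdin, wait for exit
out, _ = second.communicate()
digest = out.decode().strip()
print(digest)
```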


In [19]:
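# Use communicate's timeout parameter so a hung child can't block the parent forever
proc = run_sleep(10)
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()  # Sends SIGTERM on POSIX
    proc.wait()       # Reap the terminated child

print('Exit status', proc.poll())  # On POSIX, -15 means "killed by signal 15 (SIGTERM)"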

Exit status -15
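
`Popen.wait()` accepts the same `timeout` parameter as `communicate()`. The sketch below wraps it in a reusable helper. The name `run_with_timeout` is hypothetical. The helper kills a child that overruns and reports its status. On POSIX, a negative status `-N` means the child was ended by signal `N`, as with the `-15` above:

```python
import subprocess
import sys

def run_with_timeout(args, timeout):
    """Hypothetical helper: run a child, killing it if it exceeds timeout."""
    proc = subprocess.Popen(args)
    try:
        return proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()   # SIGKILL on POSIX, TerminateProcess on Windows
        proc.wait()   # Reap the child so it doesn't linger as a zombie
        return proc.returncode

quick = run_with_timeout([sys.executable, '-c', 'pass'], timeout=10)
slow = run_with_timeout(
    [sys.executable, '-c', 'import time; time.sleep(10)'], timeout=0.5)
print(quick, slow)
```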


### Things to Remember
- Use the `subprocess` module to run child processes and manage their input and output streams.
- **Child processes run in parallel** with the Python interpreter, enabling you to **maximize your CPU usage**.
- Use the `timeout` parameter with `communicate` to **avoid deadlocks** and **hanging child processes**.

## Item 37: Use `Threads` for Blocking I/O, Avoid for Parallelism

In [25]:
# Factoring a set of numbers in serial takes quite a long time.
def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i


from time import time
numbers = [2139079, 1214759, 1516637, 1852285]
start = time()
for number in numbers:
    list(factorize(number))
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.472 seconds


In [28]:
# Using Thread
# Although Python supports multiple threads of execution, the GIL causes only one of them to make forward progress at a time
# This demonstrates the effect of the GIL on programs running in the standard CPython interpreter.
from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))


start = time()
numbers = [2139079, 1214759, 1516637, 1852285]
threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)


for thread in threads:
    thread.join()
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.486 seconds


In [29]:
import select, socket

# select() is a blocking system call; here each call waits up to 0.1 seconds.
# The socket exists only to support Windows, which can't call select() with
# empty lists. Running these calls serially takes linearly increasing time.
def slow_systemcall():
    with socket.socket() as sock:  # Close the socket to avoid a ResourceWarning
        select.select([sock], [], [], 0.1)


start = time()
for _ in range(5):
    slow_systemcall()
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.513 seconds


In [30]:
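# Run the same system calls in parallel threads; select() releases the GIL
# while it blocks, so the calls overlap.
start = time()
threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)


def compute_helicopter_location(index):
    pass  # Placeholder for work the main thread does while the threads block

for i in range(5):
    compute_helicopter_location(i)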
for thread in threads:
    thread.join()
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.103 seconds
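
`time.sleep` is a convenient stand-in for blocking I/O. Like `select`, it releases the GIL while it waits. The minimal sketch below assumes sleep is representative of your real I/O call. Five 0.1-second waits should overlap instead of adding up to 0.5 seconds:

```python
import time
from threading import Thread

def blocking_io():
    # time.sleep releases the GIL, as a real blocking system call would.
    time.sleep(0.1)

start = time.perf_counter()
threads = [Thread(target=blocking_io) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
elapsed = time.perf_counter() - start
print('Took %.3f seconds' % elapsed)
```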


### Things to Remember
- Python `threads` **can’t run bytecode in parallel on multiple CPU cores** because of the global interpreter lock (**GIL**).
- Python `threads` are **still useful** despite the GIL because they provide an easy way **to do multiple things at seemingly the same time**.
- **Use Python threads** to make **multiple system calls in parallel**. This allows you to do **blocking I/O** at the same time as computation.

## Item 38: Use Lock to Prevent Data Races in Threads

In [35]:
# Write a program that counts many things in parallel
class Counter(object):
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset


def worker(sensor_index, how_many, counter):
    # I have a barrier in here so the workers synchronize
    # when they start counting, otherwise it's hard to get a race
    # because the overhead of starting a thread is high.
    BARRIER.wait()
    for _ in range(how_many):
        # Read from the sensor
        counter.increment(1)


from threading import Barrier, Thread
BARRIER = Barrier(5)
def run_threads(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()


how_many = 10**5
counter = Counter()
run_threads(worker, how_many, counter)
print('Counter should be %d, found %d' %
      (5 * how_many, counter.count))


Counter should be 500000, found 378708


In [36]:
offset = 5
counter.count += offset

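
# counter.count += offset is really three separate steps, and the
# interpreter can switch threads between any of them:
value = getattr(counter, 'count')
result = value + offset
setattr(counter, 'count', result)


# An interleaving that loses an update (both threads read the same value):
# Running in Thread A
value_a = getattr(counter, 'count')
# Context switch to Thread B
value_b = getattr(counter, 'count')
result_b = value_b + 1
setattr(counter, 'count', result_b)
# Context switch back to Thread A; its stale value overwrites B's update
result_a = value_a + 1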
setattr(counter, 'count', result_a)
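
The `dis` module makes the read-modify-write visible. The single `+=` statement compiles to separate load, add, and store bytecodes. Exact opcode names vary between Python versions, so this sketch relies only on `LOAD_ATTR` and `STORE_ATTR`:

```python
import dis

class Counter:
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset

# List the bytecode operations behind `self.count += offset`.
ops = [instr.opname for instr in dis.get_instructions(Counter.increment)]
print(ops)
```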

In [37]:
# Using Lock
from threading import Lock

class LockingCounter(object):
    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self, offset):
        with self.lock:
            self.count += offset


BARRIER = Barrier(5)
counter = LockingCounter()
run_threads(worker, how_many, counter)
print('Counter should be %d, found %d' %
      (5 * how_many, counter.count))

Counter should be 500000, found 500000
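
The `with self.lock:` block is shorthand for acquiring the lock and releasing it in a `finally` clause. The sketch below spells that out in a hypothetical `ExplicitLockingCounter` and checks that no increments are lost:

```python
from threading import Lock, Thread

class ExplicitLockingCounter:
    """Like LockingCounter, but spelling out what `with self.lock` does."""
    def __init__(self):
        self.lock = Lock()
        self.count = 0

    def increment(self, offset):
        self.lock.acquire()
        try:
            self.count += offset
        finally:
            self.lock.release()  # Always released, even if the body raises

def work(counter, how_many):
    for _ in range(how_many):
        counter.increment(1)

counter = ExplicitLockingCounter()
threads = [Thread(target=work, args=(counter, 10_000)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(counter.count)
```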


### Things to Remember
- Even though Python has a **global interpreter lock**, you’re still **responsible for protecting against data races** between the threads in your programs.
- Your **programs will corrupt their data structures** if you **allow multiple threads** to modify the same objects without locks.
- The **Lock class** in the `threading` built-in module is Python’s standard mutual exclusion lock implementation.

## Item 39: Use Queue to Coordinate Work Between Threads

In [39]:
# Build a system that will take a constant stream of images from your digital camera, 
# resize them, and then add them to a photo gallery online
# 1. New images are retrieved
# 2. The downloaded images are passed through the resize function
# 3. The resized images are consumed by the upload function.

In [40]:
# Stand-ins for already-written Python functions that execute each phase
def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

In [41]:
from threading import Lock
from collections import deque

# Thread-safe producer-consumer queue
class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()
    
    # The producer (your digital camera) adds new images
    # to the end of the list of pending items.
    def put(self, item):
        with self.lock:
            self.items.append(item)

    # The consumer, the first phase of your processing pipeline, 
    # removes images from the front of the list of pending items.
    def get(self):
        with self.lock:
            return self.items.popleft()

In [42]:
from threading import Thread
from time import sleep

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # No work to do
            except AttributeError:
                # The magic exit signal
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

In [43]:
# Create the queues that act as coordination points, and a worker thread for each phase.
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

In [44]:
# Start the threads, then inject a bunch of work into the first phase of the pipeline.
for thread in threads:
    thread.start()

for _ in range(1000):
    # A plain object instance stands in for the real data
    download_queue.put(object())


# Wait for all of the items to be processed by the pipeline
import time  # Note: rebinds the name `time`, which earlier cells bound to time.time
while len(done_queue.items) < 1000:
    # Do something useful while waiting
    time.sleep(0.1)
# Stop all the threads by causing an exception in their run methods:
# with in_queue set to None, in_queue.get() raises AttributeError.
for thread in threads:
    thread.in_queue = None


# Results
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
      polled, 'times')

Processed 1000 items after polling 3027 times


In [45]:
# Queue.get() blocks the consumer thread until input data is available
from queue import Queue
queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get()                # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

Consumer waiting


In [46]:
print('Producer putting')
queue.put(object())            # Runs before get() above
thread.join()
print('Producer done')

Producer putting
Consumer done
Producer done
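
`Queue.get()` blocks until an item arrives, and that is what removes the busy polling of `MyQueue`. When waiting forever is undesirable, `get` also accepts a `timeout` and raises `queue.Empty` if nothing shows up in time. A short sketch:

```python
from queue import Empty, Queue

q = Queue()
try:
    # Block for at most 0.05 seconds instead of forever.
    q.get(timeout=0.05)
except Empty:
    timed_out = True
else:
    timed_out = False
print('Timed out:', timed_out)

q.put('work')
item = q.get_nowait()  # Same as get(block=False); raises Empty if no item
print('Got', item)
```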


In [47]:
# To solve the pipeline backup issue, 
# the Queue class lets you specify the maximum amount of pending work 
# you’ll allow between two phases
queue = Queue(1)               # Buffer size of 1

def consumer():
    time.sleep(0.1)            # Wait
    queue.get()                # Runs second
    print('Consumer got 1')
    queue.get()                # Runs fourth
    print('Consumer got 2')

thread = Thread(target=consumer)
thread.start()

queue.put(object())            # Runs first
print('Producer put 1')
queue.put(object())            # Runs third
print('Producer put 2')
thread.join()
print('Producer done')

Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done
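
With a bounded queue, a blocking `put` applies back-pressure to the producer. If the producer should fail fast instead, `put(block=False)` (or `put_nowait`) raises `queue.Full`. A short sketch:

```python
from queue import Full, Queue

q = Queue(maxsize=1)
q.put('first')                    # Fits in the buffer
try:
    q.put('second', block=False)  # Buffer is full, so fail immediately
except Full:
    rejected = True
else:
    rejected = False
print('Full:', q.full(), 'rejected:', rejected)
```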


In [49]:
# The Queue class can also track the progress of work using the task_done method
in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()      # Done second
    print('Consumer working')
    # Doing work
    print('Consumer done')
    in_queue.task_done()       # Done third

Thread(target=consumer).start()


in_queue.put(object())         # Done first
print('Producer waiting')
in_queue.join()                # Done fourth
print('Producer done')

Consumer waiting
Producer waiting
Consumer working
Consumer done
Producer done


In [50]:
class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)


    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]


for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()


download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')

1000 items finished
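
Each `close()` enqueues a single sentinel, so a stage served by several workers needs one sentinel per worker. The sketch below shows that fan-out. It repeats `ClosableQueue` and `StoppableWorker` from above so the block runs on its own. The helpers `start_workers` and `stop_workers` are hypothetical names:

```python
from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))

def start_workers(count, func, in_queue, out_queue):
    workers = [StoppableWorker(func, in_queue, out_queue) for _ in range(count)]
    for worker in workers:
        worker.start()
    return workers

def stop_workers(workers, in_queue):
    # One sentinel per worker: each worker exits after consuming exactly one.
    for _ in workers:
        in_queue.close()
    for worker in workers:
        worker.join()

in_queue = ClosableQueue()
out_queue = ClosableQueue()
workers = start_workers(4, lambda x: x * x, in_queue, out_queue)
for i in range(100):
    in_queue.put(i)
stop_workers(workers, in_queue)

results = []
while not out_queue.empty():
    results.append(out_queue.get())
print(len(results), 'items finished')
```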


### Things to Remember
- **Pipelines** are a great way to **organize sequences of work** that run **concurrently** using multiple Python threads.
- Be aware of the many problems in building concurrent pipelines: **busy waiting**, **stopping workers**, and **memory explosion**.
- The `Queue` class has all of the facilities you need to build **robust pipelines**: blocking operations, buffer sizes, and joining.

## Item 40: Consider Coroutines to Run Many Functions Concurrently

In [51]:
# A generator used as a coroutine: each send() delivers a value to the paused yield expression
def my_coroutine():
    while True:
        received = yield
        print('Received:', received)

it = my_coroutine()
next(it)             # Prime the coroutine
it.send('First')
it.send('Second')

Received: First
Received: Second
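
Priming is required because a fresh generator has no paused `yield` to receive a value. Sending anything other than `None` first raises `TypeError`, and `next(it)` is equivalent to `it.send(None)`. A short sketch:

```python
def my_coroutine():
    received = []
    while True:
        value = yield
        received.append(value)
        print('Received:', value)

it = my_coroutine()
try:
    it.send('First')   # Not primed yet
except TypeError as exc:
    error = str(exc)
print(error)

it.send(None)          # Same as next(it): runs to the first yield
it.send('First')
```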


In [52]:
def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)


it = minimize()
next(it)            # Prime the generator
print(it.send(10))
print(it.send(4))
print(it.send(22))
print(it.send(-1))

10
4
4
-1
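
Forgetting the priming `next()` is an easy mistake. The sketch below automates it with a hypothetical `primed` decorator that creates the generator and advances it to its first `yield`:

```python
import functools

def primed(func):
    """Hypothetical decorator: advance a new generator to its first yield."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return wrapper

@primed
def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)

it = minimize()          # Already primed, no explicit next() needed
results = [it.send(v) for v in (10, 4, 22, -1)]
print(results)
```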


### The Game of Life

In [53]:
ALIVE = '*'
EMPTY = '-'

In [55]:
from collections import namedtuple
Query = namedtuple('Query', ('y', 'x'))

def count_neighbors(y, x):
    n_ = yield Query(y + 1, x + 0)  # North
    ne = yield Query(y + 1, x + 1)  # Northeast
    e_ = yield Query(y + 0, x + 1)  # East
    se = yield Query(y - 1, x + 1)  # Southeast
    s_ = yield Query(y - 1, x + 0)  # South
    sw = yield Query(y - 1, x - 1)  # Southwest
    w_ = yield Query(y + 0, x - 1)  # West
    nw = yield Query(y + 1, x - 1)  # Northwest
    
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

In [56]:
it = count_neighbors(10, 5)
q1 = next(it)                  # Get the first query
print('First yield: ', q1)
q2 = it.send(ALIVE)            # Send q1 state, get q2
print('Second yield:', q2)
q3 = it.send(ALIVE)            # Send q2 state, get q3
print('...')
q4 = it.send(EMPTY)
q5 = it.send(EMPTY)
q6 = it.send(EMPTY)
q7 = it.send(EMPTY)
q8 = it.send(EMPTY)
try:
    it.send(EMPTY)     # Send q8 state, retrieve count
except StopIteration as e:
    print('Count: ', e.value)  # Value from return statement

First yield:  Query(y=11, x=5)
Second yield: Query(y=11, x=6)
...
Count:  2
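
Catching `StopIteration` to read the return value is exactly what `yield from` automates. It forwards each `send()` to the sub-generator and evaluates to the sub-generator's `return` value, which is how `step_cell` reuses `count_neighbors`. A toy sketch with hypothetical `ask` and `ask_two` generators:

```python
def ask(prompt):
    # Sub-generator: yields a request and returns the value sent back.
    answer = yield prompt
    return answer

def ask_two():
    # `yield from` forwards send() calls to ask() and evaluates to its
    # return value.
    first = yield from ask('first?')
    second = yield from ask('second?')
    yield first + second

it = ask_two()
q1 = next(it)        # First request
q2 = it.send(3)      # Answer it, get the second request
total = it.send(4)   # Answer it, get the combined result
print(q1, q2, total)
```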


In [57]:
Transition = namedtuple('Transition', ('y', 'x', 'state'))

def step_cell(y, x):
    state = yield Query(y, x)
    neighbors = yield from count_neighbors(y, x)
    next_state = game_logic(state, neighbors)
    yield Transition(y, x, next_state)

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state


it = step_cell(10, 5)
q0 = next(it)           # Initial location query
print('Me:      ', q0)
q1 = it.send(ALIVE)     # Send my status, get neighbor query
print('Q1:      ', q1)
print('...')
q2 = it.send(ALIVE)
q3 = it.send(ALIVE)
q4 = it.send(ALIVE)
q5 = it.send(ALIVE)
q6 = it.send(EMPTY)
q7 = it.send(EMPTY)
q8 = it.send(EMPTY)
t1 = it.send(EMPTY)     # Send for q8, get game decision
print('Outcome: ', t1)

Me:       Query(y=10, x=5)
Q1:       Query(y=11, x=5)
...
Outcome:  Transition(y=10, x=5, state='-')


In [58]:
TICK = object()

def simulate(height, width):
    while True:
        for y in range(height):
            for x in range(width):
                yield from step_cell(y, x)
        yield TICK


class Grid(object):
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output

    def query(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def assign(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state


def live_a_generation(grid, sim):
    progeny = Grid(grid.height, grid.width)
    item = next(sim)
    while item is not TICK:
        if isinstance(item, Query):
            state = grid.query(item.y, item.x)
            item = sim.send(state)
        else:  # Must be a Transition
            progeny.assign(item.y, item.x, item.state)
            item = next(sim)
    return progeny


grid = Grid(5, 9)
grid.assign(0, 3, ALIVE)
grid.assign(1, 4, ALIVE)
grid.assign(2, 2, ALIVE)
grid.assign(2, 3, ALIVE)
grid.assign(2, 4, ALIVE)
print(grid)

---*-----
----*----
--***----
---------
---------



In [59]:
class ColumnPrinter(object):
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(row_count, len(data.splitlines()) + 1)
        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line
                if (i + 1) < len(self.columns):
                    rows[j] += ' | '
        return '\n'.join(rows)

columns = ColumnPrinter()
sim = simulate(grid.height, grid.width)
for i in range(5):
    columns.append(str(grid))
    grid = live_a_generation(grid, sim)

print(columns)

    0     |     1     |     2     |     3     |     4    
---*----- | --------- | --------- | --------- | ---------
----*---- | --*-*---- | ----*---- | ---*----- | ----*----
--***---- | ---**---- | --*-*---- | ----**--- | -----*---
--------- | ---*----- | ---**---- | ---**---- | ---***---
--------- | --------- | --------- | --------- | ---------


In [60]:
# Example 20
# This is for the introductory diagram
grid = Grid(5, 5)
grid.assign(1, 1, ALIVE)
grid.assign(2, 2, ALIVE)
grid.assign(2, 3, ALIVE)
grid.assign(3, 3, ALIVE)

columns = ColumnPrinter()
sim = simulate(grid.height, grid.width)
for i in range(5):
    columns.append(str(grid))
    grid = live_a_generation(grid, sim)

print(columns)

  0   |   1   |   2   |   3   |   4  
----- | ----- | ----- | ----- | -----
-*--- | --*-- | --**- | --*-- | -----
--**- | --**- | -*--- | -*--- | -**--
---*- | --**- | --**- | --*-- | -----
----- | ----- | ----- | ----- | -----


### Things to Remember
- `Coroutines` provide an **efficient way to run tens of thousands of functions** seemingly at the same time.
- Within a `generator`, the value of the `yield` expression will be whatever value was passed to the generator’s `send` method from the exterior code.
- Coroutines give you a powerful tool for separating the **core logic** of your program from its interaction with the surrounding environment.
- Python 2 doesn’t support `yield from` or returning values from generators.

## Item 41: Consider `concurrent.futures` for True Parallelism

In [61]:
# Greatest Common Divisor, computed by brute force
def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i


from time import time
numbers = [(1963309, 2265973), (2030677, 3814172),
           (1551645, 2229620), (2039045, 2020802)]

start = time()
results = list(map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.569 seconds


In [67]:
# Because of the GIL, multiple Python threads yield no speed improvement for CPU-bound work
from concurrent.futures import ThreadPoolExecutor

start = time()
pool = ThreadPoolExecutor(max_workers=4)
results = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.584 seconds
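
Besides `map`, both executors expose `submit()`, which returns a `Future`. `as_completed()` then yields those futures in completion order. The sketch below uses `ThreadPoolExecutor` so it runs in a notebook without pickling concerns. `ProcessPoolExecutor` offers the same interface. The sketch also cross-checks the brute-force `gcd` against `math.gcd`:

```python
import math
from concurrent.futures import ThreadPoolExecutor, as_completed

def gcd(pair):
    a, b = pair
    low = min(a, b)
    for i in range(low, 0, -1):
        if a % i == 0 and b % i == 0:
            return i

numbers = [(1963309, 2265973), (2030677, 3814172),
           (1551645, 2229620), (2039045, 2020802)]

# submit() returns one Future per call; as_completed() yields each as it finishes.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(gcd, pair): pair for pair in numbers}
    results = {}
    for future in as_completed(futures):
        results[futures[future]] = future.result()

# Cross-check the brute-force version against the standard library.
for (a, b), value in results.items():
    assert value == math.gcd(a, b)
print(results)
```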


In [68]:
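# Speed up by running gcd in child processes, each with its own interpreter and GIL
from concurrent.futures import ProcessPoolExecutor

start = time()
# Under the 'spawn' start method (the default on Windows and macOS), worker
# processes must import gcd, so it has to live in an importable module rather
# than a notebook cell.
pool = ProcessPoolExecutor(max_workers=4)  # The one change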
results = list(pool.map(gcd, numbers))
end = time()
print('Took %.3f seconds' % (end - start))

Took 0.326 seconds


Under the hood, `ProcessPoolExecutor` uses the `multiprocessing` module to do the following:

1. It takes each item from the `numbers` input data passed to `map`.
2. It serializes it into binary data using the `pickle` module.
3. It copies the serialized data from the main interpreter process to a child interpreter process over a local socket.
4. Next, it deserializes the data back into Python objects using `pickle` in the child process.
5. It then imports the Python module containing the `gcd` function.
6. It runs the function on the input data in parallel with other child processes. 
7. It serializes the result back into bytes.
8. It copies those bytes back through the socket.
9. It deserializes the bytes back into Python objects in the parent process.
10. Finally, it merges the results from multiple children into a single list to return.
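
Steps 2, 4, 7, and 9 are ordinary `pickle` round trips, so anything sent to or returned from a worker must be picklable. Functions themselves are pickled by reference to their module and name, which is why a `lambda` cannot be shipped to a worker process. A short sketch:

```python
import pickle

pair = (1963309, 2265973)

# Steps 2 and 4: arguments travel to the child as pickled bytes.
payload = pickle.dumps(pair)
restored = pickle.loads(payload)
print(len(payload), 'bytes ->', restored)

# Functions are pickled by name, so an anonymous lambda can't be sent.
try:
    pickle.dumps(lambda p: p)
except (pickle.PicklingError, AttributeError) as exc:
    print('Not picklable:', type(exc).__name__)
    picklable = False
else:
    picklable = True
```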

### Things to Remember
- Moving **CPU bottlenecks** to **C-extension modules** can be an effective way to improve performance while maximizing your investment in Python code. However, the **cost of doing so is high** and may introduce bugs.
- The `multiprocessing` module provides powerful tools that **can parallelize** certain types of Python computation with minimal effort.
- The power of `multiprocessing` is best accessed through the `concurrent.futures` built-in module and its simple `ProcessPoolExecutor` class.
- The advanced parts of the `multiprocessing` module **should be avoided** because they are so complex.