Python programs that do many things concurrently often need to coordinate their work.
One of the most useful arrangements for concurrent work is a pipeline of functions.

A pipeline works like an assembly line used in manufacturing. Pipelines have many
phases in serial with a specific function for each phase. New pieces of work are constantly
added to the beginning of the pipeline. Each function can operate concurrently on the
piece of work in its phase. The work moves forward as each function completes until there
are no phases remaining. This approach is especially good for work that includes blocking
I/O or subprocesses—activities that can easily be parallelized using Python (see Item 37:
“Use Threads for Blocking I/O, Avoid for Parallelism”).

For example, say you want to build a system that will take a constant stream of images
from your digital camera, resize them, and then add them to a photo gallery online. Such a
program could be split into three phases of a pipeline. New images are retrieved in the first
phase. The downloaded images are passed through the resize function in the second phase.
The resized images are consumed by the upload function in the final phase.

Imagine you had already written Python functions that execute the phases: download,
resize, upload. How do you assemble a pipeline to do the work concurrently?

The first thing you need is a way to hand off work between the pipeline phases. This can
be modeled as a thread-safe producer-consumer queue (see Item 38: “Use Lock to
Prevent Data Races in Threads” to understand the importance of thread safety in Python;
see Item 46: “Use Built-in Algorithms and Data Structures” for the deque class).


### Example 5
Here, I represent each phase of the pipeline as a Python thread that takes work from one
queue like this, runs a function on it, and puts the result on another queue. I also track how
many times the worker has checked for new input and how much work it’s completed.

### Example 6
The trickiest part is that the worker thread must properly handle the case where the input
queue is empty because the previous phase hasn’t completed its work yet. This happens
where I catch the IndexError exception below. You can think of this as a holdup in the
assembly line.
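
As a minimal sketch of that empty-queue case, the snippet below rebuilds the lock-protected deque wrapper used in this section and shows that calling get on an empty queue raises IndexError. The try_get helper is hypothetical and exists only for illustration; it is not part of the original code.

```python
from collections import deque
from threading import Lock


class MyQueue:
    """Same shape as the lock-protected deque wrapper in this section."""

    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()  # Raises IndexError when empty


def try_get(queue):
    """Hypothetical helper: (True, item) on success, (False, None) if empty."""
    try:
        return True, queue.get()
    except IndexError:
        return False, None


q = MyQueue()
empty_result = try_get(q)    # Nothing has been put yet
q.put('image-1')
full_result = try_get(q)     # Now there is one item to take
print(empty_result, full_result)
```

A worker that sees the (False, None) case has nothing better to do than sleep and try again, which is the polling behavior the rest of this section examines.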

### Example 7
Now I can connect the three phases together by creating the queues for their coordination
points and the corresponding worker threads.

### Example 8
I can start the threads and then inject a bunch of work into the first phase of the pipeline.
Here, I use a plain object instance as a proxy for the real data required by the
download function:

### Example 9
Now I wait for all of the items to be processed by the pipeline and end up in the done_queue.

### Example 10
This runs properly, but there’s an interesting side effect caused by the threads polling their
input queues for new work. The tricky part, where I catch IndexError exceptions in the
run method, executes a large number of times.


In [1]:
import logging
from pprint import pprint
from sys import stdout as STDOUT


# Example 1
def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item


# Example 2
from threading import Lock
from collections import deque

class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()


# Example 3
    def put(self, item):
        with self.lock:
            self.items.append(item)


# Example 4
    def get(self):
        with self.lock:
            return self.items.popleft()


# Example 5
from threading import Thread
from time import sleep

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0


# Example 6
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # No work to do
            except AttributeError:
                # The magic exit signal
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1


# Example 7
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]


# Example 8
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())

    
# Example 9
import time
while len(done_queue.items) < 1000:
    # Do something useful while waiting
    time.sleep(0.1)
# Stop all the threads by causing an exception in their
# run methods.
for thread in threads:
    thread.in_queue = None


# Example 10
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
      polled, 'times')

When the worker functions vary in speed, an earlier phase can prevent progress in later
phases, backing up the pipeline. This causes later phases to starve and constantly check
their input queues for new work in a tight loop. The outcome is that worker threads waste
CPU time doing nothing useful (they’re constantly raising and catching IndexError
exceptions).
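
The wasted polling can be made visible with a small sketch. It assumes a deliberately slow upstream phase (simulated here with sleep) and a consumer that polls a bare deque; the function names and timings are illustrative, not taken from the original code.

```python
from collections import deque
from threading import Thread
from time import sleep

items = deque()              # deque append/popleft are thread-safe


def slow_producer(count):
    for i in range(count):
        sleep(0.1)           # Simulate a slow upstream phase
        items.append(i)


def polling_consumer(count):
    received = 0
    empty_polls = 0
    while received < count:
        try:
            items.popleft()
        except IndexError:
            empty_polls += 1  # Woke up, found nothing, did no useful work
            sleep(0.001)
        else:
            received += 1
    return received, empty_polls


producer = Thread(target=slow_producer, args=(3,))
producer.start()
received, empty_polls = polling_consumer(3)
producer.join()
print('Received', received, 'items after', empty_polls, 'empty polls')
```

The exact number of empty polls depends on the machine and the scheduler, but it should be far larger than the number of items actually received.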

But that’s just the beginning of what’s wrong with this implementation. There are three
more problems that you should also avoid. First, determining that all of the input work is
complete requires yet another busy wait on the done_queue. Second, in Worker the
run method will execute forever in its busy loop. There’s no clean way to signal to a worker
thread that it’s time to exit; the code has to resort to setting in_queue to None so that
the next call to get raises AttributeError.

Third, and worst of all, a backup in the pipeline can cause the program to crash arbitrarily.
If the first phase makes rapid progress but the second phase makes slow progress, then the
queue connecting the first phase to the second phase will constantly increase in size. The
second phase won’t be able to keep up. Given enough time and input data, the program
will eventually run out of memory and die.

The lesson here isn’t that pipelines are bad; it’s that it’s hard to build a good
producer-consumer queue yourself.
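
The unbounded growth described above can be observed in a small sketch. It assumes a producer that never has to wait and a consumer that handles at most one item every 10 milliseconds; the names and numbers are illustrative only.

```python
from collections import deque
from threading import Event, Thread
from time import sleep

backlog = deque()            # Unbounded hand-off between two phases
stop = Event()


def slow_consumer():
    while not stop.is_set():
        try:
            backlog.popleft()
        except IndexError:
            pass             # Nothing to take this time around
        sleep(0.01)          # Simulate slow work on each item


consumer = Thread(target=slow_consumer)
consumer.start()

for i in range(10_000):      # Fast producer: nothing ever makes it wait
    backlog.append(i)

pending = len(backlog)       # Almost everything is still queued
stop.set()
consumer.join()
print('Items still pending after producing 10000:', pending)
```

Because nothing pushes back on the producer, the backlog is limited only by available memory.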



>>> Queue to the Rescue

The Queue class from the queue built-in module provides all of the functionality you
need to solve these problems.

### Example 11
Queue eliminates the busy waiting in the worker by making the get method block until
new data is available. For example, here I start a thread that waits for some input data on a
queue:

### Example 12
Even though the thread is running first, it won’t finish until an item is put on the Queue
instance and the get method has something to return.
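
A minimal sketch of the blocking behavior, with one optional variation: passing timeout to get so that the caller gives up with queue.Empty instead of waiting indefinitely. The variable names are illustrative.

```python
from queue import Empty, Queue
from threading import Thread
from time import sleep

work_queue = Queue()
results = []


def consumer():
    item = work_queue.get()      # Blocks until something is put
    results.append(item)


thread = Thread(target=consumer)
thread.start()
sleep(0.1)                       # The consumer sits blocked in get() meanwhile
still_waiting = thread.is_alive()
work_queue.put('image-1')        # Wakes the consumer up
thread.join()

# Optional variation: wait a bounded amount of time, then give up
try:
    work_queue.get(timeout=0.05)
    timed_out = False
except Empty:
    timed_out = True

print(still_waiting, results, timed_out)
```

While blocked in get, the consumer thread does not spin in a loop raising and catching exceptions the way the polling worker did.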

### Example 13
To solve the pipeline backup issue, the Queue class lets you specify the maximum
amount of pending work you’ll allow between two phases. This buffer size causes calls to
put to block when the queue is already full. For example, here I define a thread that waits
for a while before consuming a queue:

### Example 14
The wait should allow the producer thread to put both objects on the queue before the
consumer thread ever calls get. But the Queue size is one. That means the producer
adding items to the queue will have to wait for the consumer thread to call get at least
once before the second call to put will stop blocking and add the second item to the
queue.
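
The effect of the buffer size can also be seen without a second thread. The sketch below uses a timeout on put, which is an addition for demonstration purposes, so that a blocked call raises queue.Full instead of waiting forever.

```python
from queue import Full, Queue

buffer = Queue(maxsize=1)            # Room for one pending item
buffer.put('first')
was_full = buffer.full()

try:
    # With no consumer running, a plain put() would block forever here,
    # so this sketch uses a timeout to make the blocking visible.
    buffer.put('second', timeout=0.05)
    second_accepted = True
except Full:
    second_accepted = False

buffer.get()                         # A consumer frees the slot...
buffer.put('second', timeout=0.05)   # ...and the same put now succeeds
print(was_full, second_accepted, buffer.qsize())
```

In a real pipeline you would normally let put block without a timeout, since that blocking is what slows a fast producer down to the pace of the next phase.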


### Example 15
The Queue class can also track the progress of work using the task_done method. This
lets you wait for a phase’s input queue to drain and eliminates the need for polling the
done_queue at the end of your pipeline. For example, here I define a consumer thread
that calls task_done when it finishes working on an item.

### Example 16
Now, the producer code doesn’t have to join the consumer thread or poll. The producer
can just wait for the in_queue to finish by calling join on the Queue instance. Even
once it’s empty, the in_queue won’t be joinable until after task_done is called for
every item that was ever enqueued.
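
A small sketch with several items makes the guarantee concrete: by the time join returns, the consumer has called task_done once per item, so all of the work is finished. The doubling "work" and the names here are placeholders.

```python
from queue import Queue
from threading import Thread

in_queue = Queue()
finished = []


def consumer(count):
    for _ in range(count):
        item = in_queue.get()
        finished.append(item * 2)    # The "work" happens before task_done
        in_queue.task_done()


thread = Thread(target=consumer, args=(5,))
thread.start()

for i in range(5):
    in_queue.put(i)

in_queue.join()                      # Returns only after 5 task_done calls
finished_at_join = list(finished)    # Snapshot taken right after join
thread.join()
print(finished_at_join)
```

Note that the producer waits on the queue, not on the thread; the final thread.join() is only there to clean up.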

### Example 17
I can put all of these behaviors together into a Queue subclass that also tells the worker
thread when it should stop processing. Here, I define a close method that adds a special
item to the queue that indicates there will be no more input items after it:

### Example 18
Then, I define an iterator for the queue that looks for this special object and stops
iteration when it’s found. This __iter__ method also calls task_done at appropriate times,
letting me track the progress of work on the queue.

### Example 19
Now, I can redefine my worker thread to rely on the behavior of the ClosableQueue
class. The thread will exit once the for loop is exhausted.

### Example 20
Here, I re-create the set of worker threads using the new worker class:

### Example 21
After running the worker threads like before, I also send the stop signal once all the input
work has been injected by closing the input queue of the first phase.

### Example 22
Finally, I wait for the work to finish by joining each queue that connects the phases. Each
time one phase is done, I signal the next phase to stop by closing its input queue. At the
end, the done_queue contains all of the output objects as expected.
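
One possible way to combine the closing behavior with the buffer size discussed earlier is sketched below. The run_pipeline helper, its maxsize default, and the sample phase functions are assumptions made for illustration; ClosableQueue and StoppableWorker follow the definitions in this section.

```python
from queue import Queue
from threading import Thread


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return           # Cause the consuming loop to end
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))


def run_pipeline(funcs, inputs, maxsize=10):
    """Hypothetical helper: one worker per function, bounded hand-offs."""
    queues = [ClosableQueue(maxsize) for _ in funcs]
    done_queue = ClosableQueue()     # Unbounded: only read at the very end
    outs = queues[1:] + [done_queue]
    workers = [StoppableWorker(f, q_in, q_out)
               for f, q_in, q_out in zip(funcs, queues, outs)]
    for worker in workers:
        worker.start()
    for item in inputs:
        queues[0].put(item)          # Blocks while the first buffer is full
    for q in queues:                 # Close and drain each phase in order
        q.close()
        q.join()
    for worker in workers:
        worker.join()
    return [done_queue.get() for _ in range(done_queue.qsize())]


# Stand-in phase functions; real ones would download, resize, and upload
results = run_pipeline([lambda x: x + 1, lambda x: x * 2, str], range(100))
print(len(results), 'items finished')
```

The final queue is left unbounded in this sketch because nothing consumes it until the pipeline is finished; giving it a buffer size would make the last worker block forever once it filled up.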




In [2]:
# Example 11
from queue import Queue
queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get()                # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()


# Example 12
print('Producer putting')
queue.put(object())            # Runs before get() above
thread.join()
print('Producer done')


# Example 13
queue = Queue(1)               # Buffer size of 1

def consumer():
    time.sleep(0.1)            # Wait
    queue.get()                # Runs second
    print('Consumer got 1')
    queue.get()                # Runs fourth
    print('Consumer got 2')

thread = Thread(target=consumer)
thread.start()


# Example 14
queue.put(object())            # Runs first
print('Producer put 1')
queue.put(object())            # Runs third
print('Producer put 2')
thread.join()
print('Producer done')


# Example 15
in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()      # Done second
    print('Consumer working')
    # Doing work
    print('Consumer done')
    in_queue.task_done()       # Done third

Thread(target=consumer).start()


# Example 16
in_queue.put(object())         # Done first
print('Producer waiting')
in_queue.join()                # Done fourth
print('Producer done')


# Example 17
class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)


# Example 18
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()


# Example 19
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


# Example 20
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]


# Example 21
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()


# Example 22
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')

Processed Consumer waiting1000
 Consumer doneitems after polling
 3028 times
Producer putting
Producer done
Producer put 1
Consumer got 1
Producer put 2Consumer got 2
Consumer waiting
Producer done
Consumer working

Producer waitingConsumer done

Producer done
1000 items finished


* Pipelines are a great way to organize sequences of work that run concurrently using multiple Python threads.
* Be aware of the many problems in building concurrent pipelines: busy waiting, stopping workers, and memory explosion.
* The Queue class has all of the facilities you need to build robust pipelines: blocking operations, buffer sizes, and joining.