# Use Queue to coordinate work between threads

In [1]:
import logging
from pprint import pprint

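**Example:** Build a photo-processing system that continuously fetches photos from a digital camera, resizes them, and adds them to an online photo album. This can be done with a three-stage pipeline. The first stage fetches new images. The second stage passes the downloaded images to a resize function. The third stage hands the resized images to an upload function.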

In [2]:
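# Stand-in stages: real versions would fetch from the camera,
# resize the image, and upload it to the album.
def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item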

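Next, design a way to pass work items between the stages of the pipeline. A first attempt is a hand-rolled, lock-protected producer-consumer queue: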

In [3]:
from threading import Lock
from collections import deque

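class MyQueue(object):
    def __init__(self):
        self.items = deque()   # Pending work items, oldest first
        self.lock = Lock()     # Guards every access to self.items

    # Append a new image to the end of the pending items.
    def put(self, item):
        with self.lock:
            self.items.append(item)

    # Remove and return the image at the front of the pending items.
    # Raises IndexError if the queue is empty.
    def get(self):
        with self.lock:
            return self.items.popleft()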
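A quick standalone check of how MyQueue behaves. The class is repeated so the snippet runs on its own, and the four producer threads and item counts are arbitrary choices for illustration. Concurrent put calls lose no items because of the lock, and get on an empty queue raises IndexError from deque.popleft. That exception is the signal the Worker threads below have to catch.

```python
from collections import deque
from threading import Lock, Thread


class MyQueue:
    """Same lock-protected deque as above, repeated so this sketch runs standalone."""

    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        with self.lock:
            self.items.append(item)

    def get(self):
        with self.lock:
            return self.items.popleft()  # IndexError when empty


pending = MyQueue()

def produce(start):
    # Each producer thread adds 250 consecutive integers.
    for n in range(start, start + 250):
        pending.put(n)

producers = [Thread(target=produce, args=(i * 250,)) for i in range(4)]
for t in producers:
    t.start()
for t in producers:
    t.join()

received = []
while True:
    try:
        received.append(pending.get())
    except IndexError:
        # MyQueue reports "no work" by raising, not by waiting for an item.
        break

print(len(received), sorted(received) == list(range(1000)))  # 1000 True
```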

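Each stage of the pipeline is represented by a Python thread. Such a Worker thread takes pending work from one MyQueue, runs its function on each item, and puts the result on another MyQueue. It also counts how many times it polled its input queue and how many items it completed.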

In [4]:
from threading import Thread
from time import sleep

In [5]:
class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
        
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                # The input queue is empty: no work to do yet
                sleep(0.01)
            except AttributeError:
                # The "magic" exit signal: in_queue was set to None,
                # so None.get() raises AttributeError
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

In [6]:
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

In [7]:
# Start the threads, then add a large number of tasks
# to the first stage of the pipeline.
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())

In [8]:
import time
while len(done_queue.items) < 1000:
    # Do something useful while waiting
    time.sleep(0.1)
# Stop all the threads by causing an exception in their
# run methods, then wait for each one to exit.
for thread in threads:
    thread.in_queue = None
    thread.join()

In [9]:
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
      polled, 'times')

Processed 1000 items after polling 3033 times


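**Problems:**  
1. Busy waiting: each Worker keeps polling its input queue even when it is empty, sleeping briefly between attempts. To know when all the work is finished, the main thread also has to write yet another loop that keeps checking the number of items in done_queue.
2. The Worker's run method loops forever. There is no clean way to tell a Worker thread to stop when it is time to exit; setting in_queue to None to trigger an AttributeError is a hack.
3. If one stage of the pipeline falls behind, work piles up without bound in the queue that feeds it. Memory use keeps growing, and eventually the program can run out of memory and crash.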
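A rough sketch of the third problem, using made-up timings. A producer that is much faster than its consumer leaves a growing backlog in an unbounded queue. Here a plain collections.deque stands in for MyQueue, since its append and popleft are documented as thread-safe. The item count and the 5 ms per-item delay are arbitrary. In a long-running pipeline the backlog would keep growing instead of draining.

```python
import time
from collections import deque
from threading import Thread

backlog = deque()      # unbounded, like MyQueue
processed = []

def slow_consumer(n_items):
    # Simulate a slow stage: about 5 ms of work per item.
    while len(processed) < n_items:
        try:
            item = backlog.popleft()
        except IndexError:
            time.sleep(0.001)  # nothing to do yet; poll again
            continue
        time.sleep(0.005)
        processed.append(item)

N = 100
consumer = Thread(target=slow_consumer, args=(N,))
consumer.start()

# The fast stage produces everything almost instantly.
for i in range(N):
    backlog.append(i)
peak = len(backlog)    # nearly all items are still waiting

consumer.join()
print('backlog right after producing:', peak)
```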

## Using the Queue class to fix the flaws of the hand-rolled queue

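**Solving the first problem:** Queue's get method blocks until an item is available, so worker threads no longer need to poll their input queue over and over.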

In [10]:
from queue import Queue
queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get()                # Runs after put() below
    print('Consumer done')

thread = Thread(target=consumer)
thread.start()

Consumer waiting


In [11]:
print('Producer putting')
queue.put(object())            # Runs before get() above
thread.join()
print('Producer done')

Producer putting
Consumer done
Producer done
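By default get blocks indefinitely. When a consumer should not wait forever, get(timeout=...) blocks for at most that many seconds and then raises queue.Empty, and get_nowait raises it immediately. A small sketch; the 0.05 s timeout and the item are arbitrary.

```python
import queue
import time

q = queue.Queue()

start = time.monotonic()
try:
    q.get(timeout=0.05)          # nothing was put, so this times out
except queue.Empty:
    waited = time.monotonic() - start
    print(f'timed out after about {waited:.2f} s')

nowait_empty = False
try:
    q.get_nowait()               # same as get(block=False)
except queue.Empty:
    nowait_empty = True
    print('get_nowait: queue is empty')

q.put('photo.jpg')
item = q.get(timeout=0.05)       # an item is available, so this returns at once
print(item)                      # photo.jpg
```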


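**Solving the third problem:** To keep a lagging stage from causing a backlog, give the Queue a maximum size, which limits how many pending items it may hold. When the queue is full, put blocks until a consumer takes an item out, so two adjacent stages are smoothly coupled through the queue between them.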

In [12]:
queue = Queue(1)               # Buffer size of 1

In [13]:
def consumer():
    time.sleep(0.1)            # Wait
    queue.get()                # Runs second
    print('Consumer got 1\n')
    queue.get()                # Runs fourth
    print('Consumer got 2')

In [14]:
thread = Thread(target=consumer)
thread.start()

In [15]:
queue.put(object())            # Runs first
print('Producer put 1')
queue.put(object())            # Runs third
print('Producer put 2')
thread.join()
print('Producer done')

Producer put 1Consumer got 1


Producer put 2
Consumer got 2
Producer done
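With a maxsize, a full queue pushes back on the producer. put blocks until a consumer frees a slot, while put_nowait (or put with a timeout) raises queue.Full instead of waiting. A minimal sketch with an arbitrary buffer of two items and a made-up 0.1 s consumer delay:

```python
import queue
import time
from threading import Thread

buffer = queue.Queue(maxsize=2)
buffer.put('a')
buffer.put('b')
was_full = buffer.full()
print(was_full)                  # True

full_raised = False
try:
    buffer.put_nowait('c')       # no free slot and no waiting allowed
except queue.Full:
    full_raised = True
    print('put_nowait raised queue.Full')

def slow_consumer():
    time.sleep(0.1)
    buffer.get()                 # frees one slot after a delay

consumer = Thread(target=slow_consumer)
consumer.start()
start = time.monotonic()
buffer.put('c')                  # blocks until the consumer takes 'a'
blocked_for = time.monotonic() - start
consumer.join()
print(f'put() blocked for about {blocked_for:.1f} s')
```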


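Queue's task_done method lets you track the progress of work. The consumer calls task_done after it finishes each item it got from the queue, and the producer calls join on the queue, which blocks until every item that was put has been marked done. This replaces the polling loop over done_queue.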

In [16]:
in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()      # Done second
    print('Consumer working')
    # Doing work
    print('Consumer done')
    in_queue.task_done()       # Done third

Thread(target=consumer).start()

Consumer waiting


In [17]:
in_queue.put(object())         # Done first
print('Producer waiting')
in_queue.join()                # Done fourth
print('Producer done')

Producer waitingConsumer working
Consumer done

Producer done
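The same handshake scales to many items. join returns only once task_done has been called for every item that was put, and calling task_done more times than there were items raises ValueError. A sketch with one daemon worker thread; squaring each number is a stand-in for real work.

```python
from queue import Queue
from threading import Thread

jobs = Queue()
results = []

def worker():
    while True:
        n = jobs.get()
        results.append(n * n)    # stand-in for real work
        jobs.task_done()         # mark this item as finished

# daemon=True so this forever-looping worker does not keep the program alive.
Thread(target=worker, daemon=True).start()

for n in range(5):
    jobs.put(n)
jobs.join()                      # blocks until all 5 items are marked done
print(results)                   # [0, 1, 4, 9, 16]

raised = False
try:
    jobs.task_done()             # no outstanding items left
except ValueError as exc:
    raised = True
    print('ValueError:', exc)
```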


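**Improvement (solving the second problem):** Wrap all of the behavior above in a Queue subclass, ClosableQueue. Its close method puts a special sentinel object on the queue, and its iterator yields items until it meets the sentinel, calling task_done for every item it takes. A worker thread that loops over the queue therefore knows exactly when to stop.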

In [18]:
class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Cause the thread to exit
                yield item
            finally:
                self.task_done()
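To see what __iter__ does, the class can be exercised in a single thread. It is repeated here so the sketch runs standalone, and the file names are made up. Iteration stops at the sentinel. Because task_done runs in the finally clause for every item, including the sentinel, join returns immediately afterwards.

```python
from queue import Queue


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # stop iterating
                yield item
            finally:
                self.task_done()  # runs for real items and for the sentinel


q = ClosableQueue()
for name in ['a.jpg', 'b.jpg', 'c.jpg']:
    q.put(name)
q.close()

seen = list(q)     # consumes up to, but not including, the sentinel
q.join()           # returns at once: every get() was matched by task_done()
print(seen)        # ['a.jpg', 'b.jpg', 'c.jpg']
```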

In [19]:
class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

In [20]:
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

In [21]:
for thread in threads:
    thread.start()
for _ in range(1000):
    download_queue.put(object())
download_queue.close()

In [22]:
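download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
# Every worker has consumed its sentinel and returned, so these joins finish promptly.
for thread in threads:
    thread.join()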

1000 items finished
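This design has a consequence when a stage is served by several workers. Each close puts exactly one sentinel on the queue, and each worker stops after consuming one, so a stage with N workers needs N close calls. The sketch below assumes this multi-worker setup. The helpers start_threads and stop_threads and the worker counts are hypothetical names and values chosen for illustration, and the classes are repeated so it runs standalone.

```python
from queue import Queue
from threading import Thread


class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))


def start_threads(count, func, in_queue, out_queue):
    # Hypothetical helper: `count` workers share the same pair of queues.
    threads = [StoppableWorker(func, in_queue, out_queue) for _ in range(count)]
    for t in threads:
        t.start()
    return threads


def stop_threads(closable_queue, threads):
    # One sentinel per worker: each worker exits after consuming one.
    for _ in threads:
        closable_queue.close()
    closable_queue.join()
    for t in threads:
        t.join()


def download(item): return item
def resize(item): return item
def upload(item): return item

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

download_threads = start_threads(3, download, download_queue, resize_queue)
resize_threads = start_threads(4, resize, resize_queue, upload_queue)
upload_threads = start_threads(5, upload, upload_queue, done_queue)

for _ in range(1000):
    download_queue.put(object())

# Shut down stage by stage so each stage drains before the next is closed.
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
print(done_queue.qsize(), 'items finished')   # 1000 items finished
```

Because the sentinels sit behind all real items in FIFO order, every real item is taken before any worker sees a sentinel. A worker that has consumed a sentinel never calls get again, so each sentinel stops a different worker.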
