# Item 70: Use `Queue` to Coordinate Work Between Threads

1. Be aware of the many problems in building concurrent pipelines: busy waiting, telling workers to stop, knowing when work is done, and memory explosion.
2. The `Queue` class has all the facilities you need to build robust pipelines: blocking operations, buffer sizes, joining, and shutdown.


In [None]:
from collections import deque
from threading import Lock, Thread
import time

class MyQueue:
    """A bare-bones thread-safe FIFO: a ``deque`` protected by a ``Lock``.

    This is intentionally naive. ``get`` never blocks or waits for data;
    calling it on an empty queue lets the ``IndexError`` raised by
    ``deque.popleft`` escape to the caller, who must handle it (for
    example by polling again later).
    """

    def __init__(self):
        # Both attributes are public: callers read ``items`` directly.
        self.lock = Lock()
        self.items = deque()

    def put(self, item):
        """Append *item* at the tail while holding the lock."""
        with self.lock:
            self.items.append(item)

    def get(self):
        """Remove and return the item at the head of the queue.

        Raises:
            IndexError: if the queue is currently empty.
        """
        with self.lock:
            head = self.items.popleft()
        return head
        
class Worker(Thread):
    """Polling pipeline stage that moves items from one queue to the next.

    It repeatedly tries ``in_queue.get()``. Each item it receives is passed
    through ``func``, and the result is ``put`` on ``out_queue``. An empty
    input queue (signalled by ``IndexError``) causes a short sleep before
    the next poll. The loop has no exit condition, so the thread never
    finishes on its own.

    Attributes:
        polled_count: number of ``get`` attempts, successful or not.
        work_done: number of items processed and forwarded.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue, self.out_queue = in_queue, out_queue
        self.polled_count = self.work_done = 0

    def run(self):
        """Poll ``in_queue`` forever, forwarding ``func(item)`` downstream."""
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                # Nothing queued yet: back off briefly, then poll again.
                time.sleep(0.01)
                continue
            self.out_queue.put(self.func(item))
            self.work_done += 1

def download(item):
    """Placeholder download stage: hand *item* back unchanged."""
    return item

def resize(item):
    """Placeholder resize stage: hand *item* back unchanged."""
    return item

def upload(item):
    """Placeholder upload stage: hand *item* back unchanged."""
    return item
    
# Build a three-stage pipeline (download -> resize -> upload). Each Worker
# reads from one MyQueue and writes its results to the next one.
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()

# Feed 1000 placeholder work items into the first stage.
for _ in range(1000):
    download_queue.put(object())

# Nothing notifies the main thread when the pipeline has finished, so it
# must busy-wait on done_queue itself. (This reads done_queue.items
# directly, without taking done_queue.lock.)
while len(done_queue.items) < 1000:
    # Do something useful while waiting
    ...

# Each Worker increments polled_count on every get() attempt, including
# attempts that find an empty queue, so `polled` exceeds `processed`.
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f"Processed {processed} items after "
      f"polling {polled} times")
# NOTE: Worker.run never exits, so these threads keep polling after this
# point. Because they are not daemon threads, in a plain script they would
# keep the interpreter from exiting.
    
    

Processed 1000 items after polling 3011 times


The implementation above has three problems:
1. First, determining that all of the input work is complete requires yet another busy wait on the `done_queue`. 
2. Second, in `Worker`, the `run` method will execute forever in its busy loop (there's no obvious way to signal to a worker thread that it's time to exit).
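3. Third, and worst of all, a backup in the pipeline can cause the program to crash arbitrarily: if one stage runs slower than the stage feeding it, its input queue grows without bound until the program runs out of memory.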


The `Queue` class from the `queue` built-in module provides all the functionality you need to solve the problems outlined above. (`Queue.shutdown()` and the `ShutDown` exception used below were added in Python 3.13.)


In [None]:
# Here I show how a thread continues to process work after `shutdown` is called:

from queue import ShutDown, Queue
from threading import Thread

my_queue2 = Queue()

def consumer():
    while True:
        try:
            item = my_queue2.get()
        except ShutDown:
            print("Terminating!")
            return
        else:
            print("Got item", item)
            my_queue2.task_done()
            

thread = Thread(target=consumer)
# Queue all four items and shut the queue down *before* starting the
# consumer. shutdown() (without immediate=True) still lets get() hand back
# items that are already queued; get() raises ShutDown only once the queue
# is empty, and that is what makes consumer() return.
my_queue2.put(1)
my_queue2.put(2)
my_queue2.put(3)
my_queue2.put(4)
my_queue2.shutdown()

thread.start()
# Returns once consumer() has called task_done() for every queued item.
my_queue2.join()
# consumer() returns on ShutDown, so this join() completes.
thread.join()
print("Done!")

In [None]:
# Bring all of these behaviors together

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue 
    
    def run(self):
        while True:
            try:
                item = self.in_queue.get()
            except ShutDown:
                return
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.in_queue.task_done()
                
def download(item):
    """Placeholder download stage: hand *item* back unchanged."""
    return item

def resize(item):
    """Placeholder resize stage: hand *item* back unchanged."""
    return item

def upload(item):
    """Placeholder upload stage: hand *item* back unchanged."""
    return item
    
# Per-stage queues. The two intermediate queues are bounded (maxsize 100).
# When one is full, the upstream worker blocks in put() until the next
# stage catches up, so a backlog cannot grow without limit.
download_queue = Queue()
resize_queue = Queue(100)
upload_queue = Queue(100)
done_queue = Queue()

threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()

# Feed 1000 placeholder work items into the first stage.
for _ in range(1000):
    download_queue.put(object())

# Shut the pipeline down front to back. After shutdown(), a worker can still
# get() the items already queued, and join() waits until every one of them
# has been task_done()'d. StoppableWorker calls task_done() only after
# putting the result downstream. So once a queue's join() returns, the next
# queue has received all of its input and can be shut down in turn.
download_queue.shutdown()
download_queue.join()

resize_queue.shutdown()
resize_queue.join()

upload_queue.shutdown()
upload_queue.join()

done_queue.shutdown() 

# Drain the results in the main thread: get() keeps returning the queued
# items after shutdown and raises ShutDown once done_queue is empty.
counter = 0
while True:
    try:
        item = done_queue.get()
    except ShutDown:
        break
    else:
        done_queue.task_done()
        counter += 1


# Every drained item was task_done()'d above, so this returns immediately.
done_queue.join()

# All input queues are shut down and empty, so each worker's next get()
# raises ShutDown and its run() returns; join() waits for that.
for thread in threads:
    thread.join()

print(counter, "items finished")
    

-------> 0
1000 items finished
