In [None]:
from queue import Queue
from threading import Thread
import time

# Basic queue

In [None]:
# Unbounded FIFO queue (maxsize=0 means "no size limit") shared by the
# producer and the consumer thread.
queue = Queue(maxsize=0)

In [None]:
def consumer():
    """Wait for a single item on the module-level ``queue`` and report progress.

    Prints a message before blocking in ``queue.get()`` and another one once
    an item has been received. The item itself is discarded.
    """
    print('Consumer waiting')
    # Only the hand-off matters here, not the value that was handed over.
    _ = queue.get()
    print('Consumer done')

In [None]:
# Run the consumer on a background thread; it blocks in queue.get() until
# the producer below puts an item.
thread = Thread(target=consumer)
thread.start()

print('producer putting')
# Hand a single placeholder object to the waiting consumer.
queue.put(object())
# Wait for the consumer thread to finish before reporting completion.
thread.join()
print('Producer done')

## Queue with max size 1

In [None]:
# Bounded queue: holds at most one item, so put() blocks while it is full.
queue = Queue(maxsize=1)

In [None]:
def consumer():
    """Take two items from the module-level ``queue``, reporting each one.

    Sleeps for 0.1 seconds first so that the producer gets a chance to run
    ahead of the consumer.
    """
    time.sleep(0.1)
    for count in (1, 2):
        # The retrieved item is discarded; only the ordering is of interest.
        queue.get()
        print('Consumer got %d' % count)

In [None]:
# Start the consumer in the background; it sleeps briefly and then takes
# two items from the queue.
thread = Thread(target=consumer)
thread.start()

In [None]:
# Producer side for the size-1 queue. The first put() succeeds immediately
# on an empty queue.
queue.put(object())
print('Producer put 1')
# The queue holds at most one item, so this put() blocks if the consumer
# has not yet removed the first item.
queue.put(object())
print('Producer put 2')
# Wait for the consumer thread to take both items and exit.
thread.join()
print('Producer done')

## Use a SENTINEL item to close the queue

In [None]:
class ClosableQueue(Queue):
    """A ``Queue`` that can be closed by the producer and iterated by a consumer.

    Iterating over the queue yields items as they arrive and stops once the
    ``SENTINEL`` object placed by ``close()`` is retrieved. ``task_done()`` is
    called once for every item retrieved (the sentinel included), so
    ``join()`` returns after everything up to the sentinel has been handled.
    """

    # Unique marker object; compared by identity, so it cannot collide with
    # any real item put on the queue.
    SENTINEL = object()

    def close(self):
        """Enqueue the sentinel to tell the iterating consumer to stop."""
        marker = self.SENTINEL
        self.put(marker)

    def __iter__(self):
        """Yield items until the sentinel is retrieved.

        Each retrieved item is marked done after the consumer finishes with
        it, i.e. when the generator is resumed or closed.
        """
        finished = False
        while not finished:
            entry = self.get()
            try:
                finished = entry is self.SENTINEL
                if not finished:
                    yield entry
            finally:
                # Runs for real items and for the sentinel alike, keeping the
                # unfinished-task count in step with the number of get() calls.
                self.task_done()

In [None]:
class StoppableWorker(Thread):
    """Thread that applies ``func`` to each item of ``in_queue``.

    Results are put on ``out_queue``. The thread finishes when iteration over
    ``in_queue`` ends.
    """

    def __init__(self, func, in_queue, out_queue):
        """Store the callable and the two queues.

        Args:
            func: Callable invoked with each input item.
            in_queue: Iterable source of items.
            out_queue: Object with a ``put()`` method that receives results.
        """
        super().__init__()
        self.func, self.in_queue, self.out_queue = func, in_queue, out_queue

    def run(self):
        """Process input items one by one until the input is exhausted."""
        for task in self.in_queue:
            outcome = self.func(task)
            self.out_queue.put(outcome)

In [None]:
def simulate_work(i):
    """Stand-in for real work: print ``i`` and return it unchanged."""
    print(i)
    return i

In [None]:
# Input and output queues for the pipeline, plus the list of worker threads
# (a single worker, created but not yet started).
in_queue, out_queue = ClosableQueue(), ClosableQueue()
threads = [
    StoppableWorker(func=simulate_work, in_queue=in_queue, out_queue=out_queue),
]

In [None]:
# Launch every worker thread, then feed the input queue with the integers 0-9.
for thread in threads:
    thread.start()
for i in range(10):
    in_queue.put(i)

In [None]:
# One more item, added before the queue is closed, so the worker still picks it up.
in_queue.put(10)

In [None]:
# Signal the worker to stop, then block until every queued item (the
# sentinel included) has been marked done before counting the results.
in_queue.close()
in_queue.join()
# print() already separates its arguments with a space, so the literal must
# not start with one (the original printed a doubled space).
print(out_queue.qsize(), 'items finished')

In [None]:
# Put an item after close(): the sentinel has already ended the worker's
# loop, so nothing will consume this item.
in_queue.put(11)

In [None]:
# Report what is still sitting in the input queue. print() already separates
# its arguments with a space, so the literal must not start with one (the
# original printed a doubled space).
print(in_queue.qsize(), 'items left in queue')