# Queue 101

In [1]:
from queue import Queue, Full, Empty, PriorityQueue, LifoQueue
import threading
import itertools
import time

## Core Features of Threading

### 1. Key Functions of Queue

In [None]:
try:
    q = Queue()                 # unbounded queue
    q = Queue(maxsize=3)        # bounded

    item = [1, 2, 3]
    q.put(item)                 # blocks (sleeps indefinitely) if queue full
    q.put(item, timeout=2.0)    # blocks until timeout and raises queue.Full if no slot in time         
    q.put_nowait(item)          # instead of blocking, raises queue.Full if full

    val = q.get()               # blocks if empty
    q.task_done()               # call once per item retrieved via get()

    val = q.get(timeout=1.0)    # blocks until timeout and raises queue.Empty if no item in time
    q.task_done()               # call once per item retrieved via get()

    val = q.get_nowait()        # instead of blocking, raises queue.Empty if empty
    q.task_done()               # call once per item retrieved via get()

    q.join()                    # wait until task_done called for all queued items
    print("task done")
    
except Full:
    print("queue.Full error")

except Empty:
    print("queue.Empty error")

task done


### 2. Single Producer -> Single Consumer

In [None]:
def producer(q: Queue):
    print("producer start")

    for i in range(5):
        q.put(i)
        print(f"produced: {i}")
        time.sleep(1.5)
    q.put(None)  # None sentinel is used as a common shutdown signal for consumer

def consumer(q: Queue):
    print("consumer start")
    
    while True:
        item = q.get()
        if item is None:
            q.task_done() # always pair each get with exactly one task_done()
            break
        print(f"consumed: {item}")
        q.task_done() # always pair each get with exactly one task_done()
        time.sleep(1)


q = Queue(maxsize=3)

t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

q.join()   # wait until all tasks are marked done

t1.join()
t2.join()

producer start
produced: 0
consumer start
consumed: 0
produced: 1consumed: 1

produced: 2consumed: 2

produced: 3consumed: 3

produced: 4consumed: 4



### 3. Multiple Consumers

In [6]:
def worker(q: Queue, worker_id: int):
    while True:
        item = q.get() # block (sleep indefinitely) until an item is received
        if item is None:
            q.task_done()
            return
        # process item
        print(f"worker {worker_id} processed {item}")
        q.task_done()
        time.sleep(3)

q = Queue(maxsize=50)  # bounded queue for backpressure

workers = [
    threading.Thread(target=worker, args=(q, i), daemon=True)
    for i in range(4)
]

for w in workers:
    w.start()

# enqueue jobs
for job in range(20):
    q.put(job)

# stop workers (one sentinel per worker)
for _ in workers:
    q.put(None)

q.join()  # wait until all jobs + sentinels processed


worker 0 processed 0worker 1 processed 1
worker 2 processed 2
worker 3 processed 3

worker 2 processed 4worker 3 processed 5
worker 1 processed 6
worker 0 processed 7

worker 3 processed 8worker 2 processed 9
worker 0 processed 10
worker 1 processed 11

worker 2 processed 12
worker 0 processed 13
worker 1 processed 14
worker 3 processed 15
worker 0 processed 16worker 1 processed 17
worker 2 processed 18

worker 3 processed 19


### 4. Bounded Queue for Backpressure

In [None]:
q = Queue(maxsize=2)
consumed = []

def push_latest(q, num_items):
    for item in range(num_items):
        if item == num_items - 1:
            q.put(None)
            print("pushed None sentinel")
            break
        try:
            q.put_nowait(item) # instead of blocking, raises queue.Full if full
            print(f"item {item} pushed to queue")
        except Full:
            print(f"Queue full, dropping item: {item}") # drop if full (or implement "replace oldest" policy)

        time.sleep(1)

def consumer(q):    
    while True:
        item = q.get()
        if item is None:
            q.task_done() # always pair each get with exactly one task_done()
            print("None consumed")
            consumed.append(item)
            break
        print(f"consumed: {item}")
        consumed.append(item)
        q.task_done() # always pair each get with exactly one task_done()
        time.sleep(3)

t1 = threading.Thread(target=push_latest, args=(q,20))
t2 = threading.Thread(target=consumer, args=(q,))

t1.start()
t2.start()

q.join()   # wait until all tasks are marked done

t1.join()
t2.join()

print(f"consumed items: {consumed}")

item 0 pushed to queue
consumed: 0
item 1 pushed to queue
item 2 pushed to queue
consumed: 1item 3 pushed to queue

Queue full, dropping item: 4
Queue full, dropping item: 5
consumed: 2
item 6 pushed to queue
Queue full, dropping item: 7
Queue full, dropping item: 8
consumed: 3
item 9 pushed to queue
Queue full, dropping item: 10
Queue full, dropping item: 11
consumed: 6
item 12 pushed to queue
Queue full, dropping item: 13
Queue full, dropping item: 14
consumed: 9
item 15 pushed to queue
Queue full, dropping item: 16
Queue full, dropping item: 17
consumed: 12
item 18 pushed to queue
consumed: 15
pushed None sentinel
consumed: 18
None consumed
consumed items: [0, 1, 2, 3, 6, 9, 12, 15, 18, None]


### 5. Priority Scheduling

In [None]:
pq = PriorityQueue()
counter = itertools.count() # counter for tiebreakers

pq.put((4, next(counter), "A"))
pq.put((3, next(counter), "B"))
pq.put((3, next(counter), "C"))

print(pq.get())  # (1, "urgent")
print(pq.get())  # (5, "medium")
print(pq.get())  # (10, "low priority")

(3, 1, 'B')
(3, 2, 'C')
(4, 0, 'A')


### Quick Reminders

1. empty(), qsize() are not reliable for correctness
- In multithreading code, queue state change immediately.
- Use blocking get() with timeouts or sentinel shutdowns instead

2. Don't swallow exceptions without calling task_done()
- Even if process fails and goes through exception, you must call task_done() (often structured with try else finally)

3. join() requires task_done()
- Always put task_done() for each get() if you want to use join(), otherwise code blocks forever

4. Use queue.Queue() for threads, not asyncio
- asyncio has its own queue module, asyncio.Queue

5. Queues are great boundaries
- It decouples production rate from consumption rate, I/O from CPU, and pipelines into stages