Python provides several implementations of a first-in-first-out (FIFO) queue; the most
notable is `queue.Queue`.

In [8]:
import queue

q = queue.Queue()
q.put(1)
q.get()

1
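
For comparison, two other standard-library options also behave as FIFO queues: `collections.deque` (using `append` and `popleft`) and `queue.SimpleQueue`. The following is only a minimal sketch; the variable names are illustrative and not part of the original notebook.

```python
import collections
import queue

# collections.deque: append on the right, pop from the left for FIFO order.
d = collections.deque()
d.append(1)
d.append(2)
first_from_deque = d.popleft()

# queue.SimpleQueue: an unbounded FIFO queue without task tracking
# (it has no task_done() or join()).
s = queue.SimpleQueue()
s.put(1)
s.put(2)
first_from_simple = s.get()

print(first_from_deque, first_from_simple)
```

Unlike these two, `queue.Queue` also offers `task_done()` and `join()`, which the examples that follow rely on.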

This queue is thread-safe, so a single instance can be shared between multiple threads.

In [9]:
import threading
import time
q = queue.Queue()

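def worker(name: str, q: queue.Queue, stop: threading.Event, lock: threading.Lock):
    # Poll with a timeout so the thread can notice `stop` even when the queue is
    # empty; a bare q.get() would block forever once all items are consumed.
    while not stop.is_set():
        try:
            item = q.get(timeout=0.1)
        except queue.Empty:
            continue
        with lock:
            print(f"{name} is working on {item}", flush=True)
        time.sleep(1.0)  # Artificial delay
        q.task_done()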

stop = threading.Event()
lock = threading.Lock()
t1 = threading.Thread(target=worker, args=("Worker 1", q, stop, lock))
t2 = threading.Thread(target=worker, args=("Worker 2", q, stop, lock))

for item in range(30):
    q.put(item)

t1.start()
t2.start()

q.join()
print("All work complete")
stop.set()


Worker 1 is working on 0
Worker 2 is working on 1
Worker 1 is working on 2
Worker 2 is working on 3
Worker 1 is working on 4
Worker 2 is working on 5
Worker 1 is working on 6
Worker 2 is working on 7
Worker 1 is working on 8
Worker 2 is working on 9
Worker 2 is working on 10
Worker 1 is working on 11
Worker 2 is working on 12
Worker 1 is working on 13
Worker 1 is working on 14
Worker 2 is working on 15
Worker 1 is working on 16
Worker 2 is working on 17
Worker 2 is working on 18
Worker 1 is working on 19
Worker 2 is working on 20
Worker 1 is working on 21
Worker 2 is working on 22
Worker 1 is working on 23
Worker 1 is working on 24
Worker 2 is working on 25
Worker 2 is working on 26
Worker 1 is working on 27
Worker 1 is working on 28
Worker 2 is working on 29
All work complete
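
Because `q.get()` blocks while the queue is empty, a worker needs some way to learn that no more items are coming. One common approach, sketched here under assumptions, is to enqueue one sentinel value per worker. The names `SENTINEL`, `sentinel_worker`, and `run_workers` are hypothetical and do not appear in the original notebook.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker meaning "no more work"

def sentinel_worker(q: queue.Queue, results: list, lock: threading.Lock):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()  # account for the sentinel so q.join() can return
            break
        with lock:
            results.append(item)
        q.task_done()

def run_workers(items, n_workers: int = 2) -> list:
    q = queue.Queue()
    results = []
    lock = threading.Lock()
    threads = [
        threading.Thread(target=sentinel_worker, args=(q, results, lock))
        for _ in range(n_workers)
    ]
    for t in threads:
        t.start()
    for item in items:
        q.put(item)
    for _ in threads:
        q.put(SENTINEL)  # one sentinel per worker
    q.join()             # wait until every item (and sentinel) is marked done
    for t in threads:
        t.join()         # every worker has exited its loop at this point
    return results

processed = run_workers(range(10))
print(sorted(processed))
```

This sketch assumes `None` is never a legitimate work item; a dedicated `object()` instance could serve as the sentinel otherwise.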


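Note that the two worker threads split the items pushed to the queue between them, so
each item is processed by exactly one worker. This loosely resembles "work-stealing",
although that term usually describes schedulers in which each worker has its own queue.
An alternative form may be desired: a queue that supports `get` calls for each worker so
that every worker receives its own copy of each item instead.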

To implement this, we can reuse `queue.Queue` in a simple wrapper.

In [11]:
class PublishQueue:
    """A multi-consumer queue wrapper: each published item goes to every registered queue."""
    def __init__(self):
        self.queues = []
    
    def register(self) -> queue.Queue:
        """Adds a new consumer to this queue, returning a queue.Queue for `get` calls."""
        new_queue = queue.Queue()
        self.queues.append(new_queue)
        return new_queue

    def publish(self, item):
        """Puts the item on each registered queue"""
        for q in self.queues:
            q.put(item)
    
    def join(self):
        """Joins each registered queue"""
        for q in self.queues:
            q.join()

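def worker(name: str, q: queue.Queue, stop: threading.Event, lock: threading.Lock):
    # Poll with a timeout so the thread can notice `stop` even when the queue is
    # empty; a bare q.get() would block forever once all items are consumed.
    while not stop.is_set():
        try:
            item = q.get(timeout=0.1)
        except queue.Empty:
            continue
        with lock:
            print(f"{name} is working on {item}", flush=True)
        time.sleep(1.0)  # Artificial delay
        q.task_done()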
        
q = PublishQueue()
stop = threading.Event()
lock = threading.Lock()
t1 = threading.Thread(target=worker, args=("Worker 1", q.register(), stop, lock))
t2 = threading.Thread(target=worker, args=("Worker 2", q.register(), stop, lock))

for item in range(30):
    q.publish(item)

t1.start()
t2.start()

q.join()
print("All work complete")
stop.set()

Worker 1 is working on 0
Worker 2 is working on 0
Worker 1 is working on 1
Worker 2 is working on 1
Worker 1 is working on 2
Worker 2 is working on 2
Worker 1 is working on 3
Worker 2 is working on 3
Worker 1 is working on 4
Worker 2 is working on 4
Worker 1 is working on 5
Worker 2 is working on 5
Worker 1 is working on 6
Worker 2 is working on 6
Worker 1 is working on 7
Worker 2 is working on 7
Worker 1 is working on 8
Worker 2 is working on 8
Worker 1 is working on 9
Worker 2 is working on 9
Worker 1 is working on 10
Worker 2 is working on 10
Worker 1 is working on 11
Worker 2 is working on 11
Worker 1 is working on 12
Worker 2 is working on 12
Worker 1 is working on 13
Worker 2 is working on 13
Worker 1 is working on 14
Worker 2 is working on 14
Worker 1 is working on 15
Worker 2 is working on 15
Worker 1 is working on 16
Worker 2 is working on 16
Worker 1 is working on 17
Worker 2 is working on 17
Worker 1 is working on 18
Worker 2 is working on 18
Worker 1 is working on 19
Worker

Now each worker gets its own queue, but the calling thread still sees a single queue.
We can improve the definition further by adhering more closely to the `queue.Queue`
interface.
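
As a reminder of the interface being mirrored, the sketch below exercises the producer-side and consumer-side methods of a bounded `queue.Queue`. The variable names are illustrative only.

```python
import queue

bounded = queue.Queue(maxsize=1)  # holds at most one item

was_empty = bounded.empty()
bounded.put_nowait("a")
is_full = bounded.full()

# put_nowait raises queue.Full instead of blocking when there is no free slot.
try:
    bounded.put_nowait("b")
    overflowed = False
except queue.Full:
    overflowed = True

item = bounded.get_nowait()

# get_nowait raises queue.Empty instead of blocking when nothing is available.
try:
    bounded.get_nowait()
    underflowed = False
except queue.Empty:
    underflowed = True

print(was_empty, is_full, overflowed, item, underflowed)
```

A wrapper that forwards `put`, `put_nowait`, `empty`, and `full` to several such queues inherits these blocking and exception semantics from each underlying queue.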

In [12]:
class PublishQueue:
    """The send side of a single-producer, multi-consumer (SPMC) queue. The receive
    side is the queue object returned by `register`."""
    def __init__(self):
        self.queues = []
    
    def empty(self) -> bool:
        """Return `True` if all queues are empty, `False` otherwise."""
        return all([q.empty() for q in self.queues])
    
    def full(self) -> bool:
        """Return `True` if all queues are full, `False` otherwise."""
        return all([q.full() for q in self.queues])
    
    def put(self, item, block=True, timeout=None):
        for q in self.queues:
            q.put(item, block=block, timeout=timeout)
    
    def put_nowait(self, item):
        for q in self.queues:
            q.put_nowait(item)

    # NOTE: No `get` or associated consumer-side functions.
    def join(self):
        for q in self.queues:
            q.join()

    def register(self, q=None) -> queue.Queue:
        """Adds a new consumer to this queue, returning a queue.Queue or subclass for
        `get` calls.
        
        Registers `q` if not `None`, otherwise initializes a new `queue.Queue`
        with default size. `q` must implement the functions of a `queue.Queue`
        object, and be thread-safe.
        """
        # The parameter is named `q` so that it does not shadow the `queue` module;
        # otherwise `queue.Queue()` below would be looked up on `None` and fail.
        if q is None:
            q = queue.Queue()
        
        self.queues.append(q)
        return q

Any variant, such as `PriorityQueue` or `LifoQueue`, can be supplied via the `register`
function, allowing for customizability. This single-producer, multi-consumer queue, in
which every consumer receives every item, can be used to implement a simplified
publisher-subscriber system using polling.
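
To illustrate registering queue variants, the sketch below uses a condensed stand-in class (hypothetically named `BroadcastSketch`, with only `register` and `put`) so that the block is self-contained. It is not the full `PublishQueue` defined above, and the helper `drain` is likewise an assumption made for the example.

```python
import queue

class BroadcastSketch:
    """Condensed stand-in: put() copies each item to every registered queue."""

    def __init__(self):
        self.queues = []

    def register(self, q=None):
        if q is None:
            q = queue.Queue()
        self.queues.append(q)
        return q

    def put(self, item):
        for q in self.queues:
            q.put(item)

def drain(q):
    """Collect everything currently on q (single-threaded use only)."""
    out = []
    while not q.empty():
        out.append(q.get_nowait())
    return out

pub = BroadcastSketch()
fifo = pub.register()                       # default queue.Queue
lifo = pub.register(queue.LifoQueue())      # last in, first out
prio = pub.register(queue.PriorityQueue())  # smallest item first

for item in (3, 1, 2):
    pub.put(item)

fifo_order = drain(fifo)  # [3, 1, 2]
lifo_order = drain(lifo)  # [2, 1, 3]
prio_order = drain(prio)  # [1, 2, 3]
print(fifo_order, lifo_order, prio_order)
```

Every subscriber receives the same three items, but each one sees them in the order defined by its own queue type.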