## Single Consumer - Multiple Publisher
### get()

`get multiple items not more than the specified batch size` **and** `get the items within the timeout`

In [1]:
from queue import Queue, Empty
import time
from threading import Thread
from threading import Event
from typing import Callable, List
import random
import uuid

from typing import Any
from dataclasses import dataclass, field
import uuid
from threading import Event

In [2]:
class BatchedQueue:
    def __init__(self, timeout=1.0, bs=1):
        self.timeout = timeout
        self.bs = bs
        self._queue: Queue = Queue()
        self._result = []
        self._event = Event()

    def get(self):
        entered_at = time.time()
        timeout = self.timeout
        bs = self.bs

        if self._queue.qsize() >= bs:
            return [self._queue.get_nowait() for _ in range(bs)]

        while (
            self._event.wait(timeout - (time.time() - entered_at))
            and self._queue.qsize() < bs
        ):
            True

        result = []
        try:
            for _ in range(bs):
                result.append(self._queue.get_nowait())
            return result
        except Empty:
            return result

    def put(self, item):
        self._queue.put(item)
        if self._event.is_set() and self.size >= self.bs:
            self._event.set()

    @property
    def size(self):
        return self._queue.qsize()

In [3]:
q = BatchedQueue(timeout=2, bs=4)

q.put(1)
q.size

1

In [4]:
t0 = time.time()
print("size", q.size)
q.get()
print("size", q.size)
time.time() - t0

size 1
size 0


2.005953073501587

## Test with a publisher

In [5]:
import random

In [6]:
q = BatchedQueue(timeout=2, bs=4)


def publisher():
    for i in range(16):
        time.sleep(random.randint(0, 1))
        q.put(random.randint(1000, 100000))


thread1 = Thread(target=publisher, daemon=True)
thread2 = Thread(target=publisher, daemon=True)
thread3 = Thread(target=publisher, daemon=True)

thread1.start()
thread2.start()
thread3.start()

In [7]:
q.size

4

In [8]:
for i in range(12):
    t0 = time.time()
    items = q.get()
    print(items)
    t1 = time.time()
    print(f"consumed in {t1-t0:.2f}")

[51250, 40816, 7903, 26584]
consumed in 0.00
[87359, 66977, 26592, 39962]
consumed in 2.01
[93101, 41389, 98259, 70794]
consumed in 0.00
[68374, 40223, 48053, 6779]
consumed in 0.00
[39984, 49248, 43617, 37299]
consumed in 2.00
[66219, 54735, 50403, 76381]
consumed in 0.00
[77249, 19677, 32807, 18666]
consumed in 0.00
[54201, 19437, 64951, 82883]
consumed in 2.01
[65781, 19257, 38511, 6116]
consumed in 0.00
[77809, 68513, 59265, 36806]
consumed in 0.00
[76478, 51127, 54841, 48616]
consumed in 2.00
[92335, 76757, 11130, 48192]
consumed in 0.00


## Batched Processor

In [9]:
@dataclass
class WaitedObject:
    value: Any = None
    completed: bool = False
    _event: Event = None

    def __post_init__(self):
        self._event = Event()

    def mark_complete(self):
        self.completed = True
        self._event.set()

    def get(self, timeout: float = None):
        if self.completed:
            self._event.clear()
            return self.value

        if self._event.is_set():
            raise Exception("Already waiting!")
        self._event.wait(timeout)
        return self.value

In [10]:
a = WaitedObject(value=1)
a.mark_complete()
a.get(1)

1

In [11]:
class BatchProcessor:
    def __init__(
        self,
        func: Callable,
        timeout=4.0,
        bs=1,
    ):
        self._batched_queue = BatchedQueue(timeout=timeout, bs=bs)
        self.func = func
        thread = Thread(target=self._process_queue)
        thread.start()
        print("Started processing")
        self._event = Event()

    def _process_queue(self):
        while True:
            t0 = time.time()
            batch: List[WaitedObject] = self._batched_queue.get()
            t1 = time.time()
            # print(f"waited {t1-t0:.2f}s for batch")
            if not batch:
                # print("no batch")
                continue
            batch_values = [b.value for b in batch]
            # print(batch_values)
            results = self.func(batch_values)
            for b in batch:
                b.mark_complete()

    def put(self, item: Any):
        waited_obj = WaitedObject(value=item)
        self._batched_queue.put(waited_obj)
        return waited_obj

In [12]:
def fake_ml_api(x):
    print(f"{len(x)} items")
    time.sleep(random.randint(0, 1))
    return random.randint(0, 1)

In [13]:
processor = BatchProcessor(fake_ml_api, timeout=4, bs=16)

Started processing


In [14]:
results = []
for i in range(32):
    x = processor.put(i + 100)
    results.append(x)

In [15]:
results

[WaitedObject(value=100, completed=False, _event=<threading.Event at 0x1070a64d0: unset>),
 WaitedObject(value=101, completed=False, _event=<threading.Event at 0x1070b3150: unset>),
 WaitedObject(value=102, completed=False, _event=<threading.Event at 0x1070b32d0: unset>),
 WaitedObject(value=103, completed=False, _event=<threading.Event at 0x1070b33d0: unset>),
 WaitedObject(value=104, completed=False, _event=<threading.Event at 0x1070b34d0: unset>),
 WaitedObject(value=105, completed=False, _event=<threading.Event at 0x1070b3650: unset>),
 WaitedObject(value=106, completed=False, _event=<threading.Event at 0x1070b3710: unset>),
 WaitedObject(value=107, completed=False, _event=<threading.Event at 0x1070b3810: unset>),
 WaitedObject(value=108, completed=False, _event=<threading.Event at 0x1070b3910: unset>),
 WaitedObject(value=109, completed=False, _event=<threading.Event at 0x1070b3a50: unset>),
 WaitedObject(value=110, completed=False, _event=<threading.Event at 0x1070b3ad0: unset>),

In [16]:
results[0].get()

16 items
16 items


100

In [17]:
results[31].get()

131