# Item 72: Avoid creating new `Thread` instances for on-demand fan-out

You know what I feel like doing? Counting blue pixels in Big Buck Bunny frames.

In [None]:
import asyncio
import threading
from time import sleep
from urllib.request import urlretrieve
from tempfile import TemporaryDirectory
from threading import Lock
import os
import httpx


class BluePixels:
    """Shared blue-pixel tally plus a local copy of the sample video.

    Constructing an instance downloads the 320x180 Big Buck Bunny clip into a
    fresh temporary directory. The directory is cleaned up when the instance
    is finalized.

    Each accessor takes ``lock`` only for its own single read or write, so a
    ``get`` followed by a ``set`` is two separate critical sections.
    """

    def __init__(self):
        self.lock = Lock()
        self.blue_pixel_count = 0

        # Keep the TemporaryDirectory object itself so __del__ can clean it up.
        self._video_dir = TemporaryDirectory()
        self._video_file = os.path.join(self._video_dir.name, "bb.mp4")

        source_url = "https://download.blender.org/peach/bigbuckbunny_movies/BigBuckBunny_320x180.mp4"

        # Write the response to disk chunk by chunk as it streams in.
        with httpx.stream("GET", source_url) as download:
            download.raise_for_status()
            with open(self._video_file, "wb") as sink:
                sink.writelines(download.iter_bytes())

    def __del__(self):
        self._video_dir.cleanup()

    def get_video_path(self):
        """Return the path of the downloaded video file."""
        with self.lock:
            path = self._video_file
        return path

    def get_blue_pixel_count(self) -> int:
        """Return the current tally."""
        with self.lock:
            current = self.blue_pixel_count
        return current

    def set_blue_pixel_count(self, value: int):
        """Overwrite the tally with ``value``."""
        with self.lock:
            self.blue_pixel_count = value


# Module-level instance reused by the cells below; constructing it downloads the video.
counter = BluePixels()

In [None]:
import cv2
import os
import numpy as np
import time

# A pixel only counts as "blue" when its red and green values are at or below these limits.
max_allowed_red = 50
max_allowed_green = 50


def count_blue_pixels(counter_ref: BluePixels):
    """Sequential baseline: walk the video frame by frame and tally blue pixels.

    A pixel is counted when its blue value is at least 50 and its red and green
    values do not exceed ``max_allowed_red`` / ``max_allowed_green``. Each
    frame's count is added to ``counter_ref`` and the final total is printed.

    No OpenCV window is ever opened here, so the only resource to tear down is
    the capture itself.

    Args:
        counter_ref: Supplies the video path and stores the running total.
    """
    cap = cv2.VideoCapture(counter_ref.get_video_path())
    try:
        while True:
            did_read_frame, frame = cap.read()

            if not did_read_frame:
                break

            # OPENCV QUIZ TIME: what's going on in these 3 lines?
            blue = frame[:, :, 0]
            green = frame[:, :, 1]
            red = frame[:, :, 2]

            blue_mask = blue >= 50
            red_mask = red <= max_allowed_red
            green_mask = green <= max_allowed_green

            # PAUSE: to the readers: what is going on here?
            blue_pixels = blue_mask & red_mask & green_mask
            blue_count = cv2.countNonZero(blue_pixels.astype(np.uint8))

            counter_ref.set_blue_pixel_count(
                blue_count + counter_ref.get_blue_pixel_count()
            )
    finally:
        # Release the capture even if processing a frame raises.
        cap.release()

    print(f"Blue pixels: {counter_ref.get_blue_pixel_count()}")


# Wall-clock timing of the single-threaded baseline.
start = time.time()
count_blue_pixels(counter)
print("took", time.time() - start)

#### Nice code nerd, sure is SLOW!!! /speeds off on a motorcycle/

In [None]:
# Obviously there's a BIT of I/O happening here. Maybe we can juice it up a bit with some threads:


def count_em(counter_ref: BluePixels, frame: np.ndarray):
    """Count the blue pixels in one frame and add them to the shared total.

    This is the per-frame unit of work handed to worker threads, so the update
    of the shared total has to be atomic.

    Args:
        counter_ref: Shared counter; its ``lock`` guards ``blue_pixel_count``.
        frame: One decoded video frame, indexed as ``frame[row, col, channel]``.
    """
    blue = frame[:, :, 0]
    green = frame[:, :, 1]
    red = frame[:, :, 2]

    blue_mask = blue >= 50
    red_mask = red <= max_allowed_red
    green_mask = green <= max_allowed_green

    blue_pixels = blue_mask & red_mask & green_mask
    blue_count = cv2.countNonZero(blue_pixels.astype(np.uint8))

    # Hold the lock across the whole read-modify-write. Calling the getter and
    # then the setter takes the lock twice, and another thread can slip its own
    # update in between the two calls, silently losing counts.
    with counter_ref.lock:
        counter_ref.blue_pixel_count += blue_count


class MyCapturedVideo:
    """A ``cv2.VideoCapture`` whose frame reads are serialized by a lock.

    The underlying capture is released when the wrapper is finalized.
    """

    def __init__(self, video_path: str):
        self.lock = Lock()
        self._cap = cv2.VideoCapture(video_path)

    def __del__(self):
        self._cap.release()

    def get_frame(self) -> tuple[bool, np.ndarray]:
        """Read the next frame; returns ``(did_read, frame)`` as the capture reports it."""
        with self.lock:
            did_read, frame = self._cap.read()
        return did_read, frame


def threaded_count_blue_pixels(counter_ref: BluePixels, max_threads: int):
    """Fan out one brand-new thread per frame, then join them all.

    Every frame read from the video is handed to its own ``threading.Thread``
    running ``count_em``. Each frame is read exactly once per loop iteration,
    so no frame is skipped.

    Args:
        counter_ref: Supplies the video path and accumulates the total.
        max_threads: Safety valve. ``threads`` holds every thread started so
            far (finished or not); once it grows past this number the loop
            prints a warning and stops reading frames.
    """
    cap = MyCapturedVideo(counter_ref.get_video_path())

    threads = []

    while True:
        did_read_frame, frame = cap.get_frame()

        if not did_read_frame:
            break
        print(f"Thread count: {len(threads)}")

        if len(threads) > max_threads:
            print("Too many threads!")
            break

        thread = threading.Thread(target=count_em, args=(counter_ref, frame))
        threads.append(thread)
        thread.start()  # FAN-OUT, FOR I/O FREEDOM

        # Someone remind eric to uncomment this, to demonstrate thread-runtime impact on memory footprint growth
        # time.sleep(0.01)

    for thread in threads:
        thread.join()  # Fan-in

    print(f"Blue pixels: {counter_ref.get_blue_pixel_count()}")

In [None]:
counter.set_blue_pixel_count(0)

# if we use more than 2 gigs for threads, stop the program
# (floor division keeps this an int, matching the `max_threads: int` parameter)
max_threads = 2_000_000_000 // 8_000_000  # each thread takes about 8mb

start = time.time()
threaded_count_blue_pixels(counter, max_threads)
print("took", time.time() - start)

#### Yeah
You can see where this is going wrong, right?

Think of how many frames are in a video. It's a _lot_.

Threads take memory (1-8MB), and they have startup overhead time. This isn't just a Python problem; it affects most languages that deal with threads (note that green threads are a bit different) ((green threads are what Kotlin ❤️‍🔥 and hideous awful Golang 🤢 use))

(((Golang = 🗑️)))

Pop Quiz, evaluate these statements:
1. Threading is hard, but thankfully, exceptions thrown in a Python Thread are propagated out to the main thread.
2. Golang really isn't _that_ bad.
3. The overhead of starting threads, combined with their memory footprint, make them a poor choice for large-scale fan-out.

## Item 73: Understand How Using `Queue` for Concurrency Requires Refactoring
Remember queues?

In [None]:
from queue import ShutDown
from threading import Thread


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            try:
                frame = self.in_queue.get()
            except ShutDown:
                return
            else:
                result = self.func(frame)
                self.out_queue.put(result)
                self.in_queue.task_done()

In [None]:
def count_em_2(frame: np.ndarray):
    """Return the number of blue pixels in ``frame`` without touching shared state.

    A pixel qualifies when its blue value is at least 50 and its red and green
    values are within ``max_allowed_red`` / ``max_allowed_green``.
    """
    blue, green, red = frame[:, :, 0], frame[:, :, 1], frame[:, :, 2]

    is_blue = (blue >= 50) & (red <= max_allowed_red) & (green <= max_allowed_green)

    return cv2.countNonZero(is_blue.astype(np.uint8))

In [None]:
from queue import Queue

# Module-level queues used by the queue pipeline: frames go in, per-frame counts come out.
in_queue = Queue()
out_queue = Queue()

# NOTE: for this one we won't even use the counter to accumulate, since the queues
# save us from needing this locking behavior. It is still zeroed here.
counter.set_blue_pixel_count(0)


def queued_count_blue_pixels(counter_ref: BluePixels):
    """Count blue pixels with a fixed set of 5 queue-fed worker threads.

    Frames are pushed onto the module-level ``in_queue``; each worker runs
    ``count_em_2`` and puts the per-frame count on ``out_queue``. Once every
    frame has been processed the counts are summed and the total is printed.

    ``in_queue`` is shut down at the end, so it cannot accept further items
    after this function returns.

    Args:
        counter_ref: Only used for the video path; the total is accumulated
            locally rather than in the shared counter.
    """
    cap = MyCapturedVideo(counter_ref.get_video_path())

    threads = []
    for _ in range(5):
        worker = StoppableWorker(count_em_2, in_queue, out_queue)
        worker.start()
        threads.append(worker)

    try:
        while True:
            did_read_frame, frame = cap.get_frame()
            if not did_read_frame:
                break

            in_queue.put(frame)

        # After join() every frame has been counted and its result queued.
        in_queue.join()
        total_blue_pixels = sum(out_queue.get() for _ in range(out_queue.qsize()))
    finally:
        # Stop the workers even if reading or summing failed part-way.
        in_queue.shutdown()
        for thread in threads:
            thread.join()

    # Report the total computed from the workers' results, not the unused shared counter.
    print(f"Blue pixels: {total_blue_pixels}")


# Wall-clock timing of the queue pipeline, including worker startup and shutdown.
start = time.time()
queued_count_blue_pixels(counter)
print("took", time.time() - start)

### Seems nice right? Or at least, nicer?

This works better than the one-thread-per-unit-of-work fan-out from Item 72. But it's still got issues. For example, let's say I wanted to count green and red pixels too. I'd have to copy/paste a lot of that queue pipeline code.

Another area where this falls short: why did I choose 5 workers? Did I masterfully calculate the precise number of workers for maximum efficiency on my machine?

No. I guessed. And that's not a GREAT way to approach concurrency, since concurrency typically only comes in the mix when we want more efficiency.

## Item 74: Consider ThreadPoolExecutor When Threads Are Necessary for Concurrency
There's a reason Temporal `Worker`s don't ask you to pass them some `Queue` instances when they need to execute blocking code.

Heck, Java folks have probably been waiting for this section. BEHOLD: `ThreadPoolExecutor`

In [None]:
from concurrent.futures import ThreadPoolExecutor

# Start from zero: the pooled workers add to the shared counter.
counter.set_blue_pixel_count(0)


def pooled_count_blue_pixels(
    counter_ref: BluePixels, pool: ThreadPoolExecutor, count_em=count_em
):
    """Count blue pixels by submitting one task per frame to an existing pool.

    Args:
        counter_ref: Supplies the video path and accumulates the total.
        pool: Executor the per-frame tasks are submitted to; the caller owns
            its lifetime.
        count_em: Per-frame worker called as ``count_em(counter_ref, frame)``.
    """
    cap = MyCapturedVideo(counter_ref.get_video_path())

    futures = []

    while True:
        did_read_frame, frame = cap.get_frame()
        if not did_read_frame:
            break
        futures.append(pool.submit(count_em, counter_ref, frame))

    for future in futures:
        # result() blocks until the task is done and re-raises anything it raised.
        future.result()

    # Report from the counter that was passed in, not the module-level one.
    print(f"Blue pixels: {counter_ref.get_blue_pixel_count()}")


# The timer starts after the pool exists and is read before the pool shuts down.
with ThreadPoolExecutor(max_workers=5) as pool:
    start = time.time()
    pooled_count_blue_pixels(counter, pool)
    print("took", time.time() - start)

In [None]:
# Also, ThreadPoolExecutor handles exceptions in a saner way than Threads
def my_crappy_func():
    """Always raise, to show how a failure inside a pool task surfaces."""
    raise Exception("oh no")


with ThreadPoolExecutor(max_workers=5) as pool:
    task = pool.submit(my_crappy_func)
    # result() re-raises the task's exception here, in the calling thread.
    task.result()

ThreadPoolExecutor is a good solution for things like blocking file I/O. But it doesn't really scale up for things like web servers.

## Item 75: Achieve Highly Concurrent I/O with Coroutines
**SPOILER ALERT**: This is the stuff that backs FastAPI (and every other ASGI compliant server)

Coroutines operate similarly to generators (remember those?): they pause on `await` and resume as soon as the _awaitable_ they are waiting on has resolved. This is similar to how `yield` works in generators.

Coroutines don't have the baggage that Threads do: they have a low memory footprint, and their call-cost is _functionally_ equivalent to a function call. Fast baby.

They also don't need all this complicated locking/synchronizing logic.

In [None]:
import asyncio


async def aio_count_em(frame: np.ndarray) -> int:
    """Coroutine version of the per-frame blue-pixel count.

    The body contains no ``await``, so once started it runs to completion
    without handing control back to the event loop.
    """
    blue_channel = frame[:, :, 0]
    green_channel = frame[:, :, 1]
    red_channel = frame[:, :, 2]

    blue_enough = blue_channel >= 50
    low_red_and_green = (red_channel <= max_allowed_red) & (
        green_channel <= max_allowed_green
    )

    qualifying = blue_enough & low_red_and_green
    return cv2.countNonZero(qualifying.astype(np.uint8))


# Again, just reusing BluePixels here because it already has the video downloaded to disk
async def aio_count_blue_pixels(counter_ref: BluePixels, aio_count_em=aio_count_em):
    """Count blue pixels by creating one coroutine per frame and gathering them.

    Args:
        counter_ref: Only used for the video path; the total is summed from
            the coroutines' return values.
        aio_count_em: Coroutine function called as ``aio_count_em(frame)`` that
            returns the blue-pixel count for that frame.

    NOTE: every decoded frame stays referenced by its pending coroutine until
    ``gather`` runs, so peak memory grows with the number of frames.
    """
    cap = cv2.VideoCapture(counter_ref.get_video_path())

    tasks = []
    try:
        while True:
            did_read_frame, frame = cap.read()
            if not did_read_frame:
                break
            task = aio_count_em(frame)  # note the lack of await
            # GUESS TIME: what do you think the type of `task` is?
            tasks.append(task)
    finally:
        # The capture is only needed while reading; release it even on error.
        cap.release()

    total = sum(await asyncio.gather(*tasks))
    print(f"Blue pixels: {total}")


# Wall-clock timing of the coroutine version; relies on the notebook allowing top-level `await`.
start = time.time()
await aio_count_blue_pixels(counter)
print("took", time.time() - start)

#### What stands out to you about that example?
1. it was slower than the ThreadPoolExecutor: why?
2. was it easier to read?

In [None]:
# Let's simulate some latency, as if we were reading over a network
def count_em_3(counter_ref: BluePixels, frame: np.ndarray):
    """Thread-side worker with fake latency: block for 1 ms, then defer to ``count_em``."""
    simulated_latency_seconds = 0.001
    time.sleep(simulated_latency_seconds)
    outcome = count_em(counter_ref, frame)
    return outcome


async def aio_count_em_2(frame: np.ndarray) -> int:
    """Coroutine worker with fake latency: await a 1 ms sleep, then defer to ``aio_count_em``."""
    simulated_latency_seconds = 0.001
    await asyncio.sleep(simulated_latency_seconds)
    blue_total = await aio_count_em(frame)
    return blue_total


start = time.time()
with ThreadPoolExecutor(max_workers=5) as pool:
    pooled_count_blue_pixels(counter, pool, count_em=count_em_3)
    print("Thread Pool took", time.time() - start)

start = time.time()
await aio_count_blue_pixels(counter, aio_count_em=aio_count_em_2)
print("Async took", time.time() - start)

Assuming that demo worked as expected 👀, coroutines were _a lot_ faster. This is because the sleep in each coroutine yielded execution back to the event loop, so the loop carried on with a different coroutine and all of the simulated waits overlapped. The threads did something similar: they blocked, which triggered a release of the GIL. But the pool only had 5 workers, so at most 5 waits could overlap at a time, and on top of that each switch between threads has a higher overhead cost. Python threads **are** system threads, and therefore the OS had to save/restore CPU registers, per-thread stack state, etc. $$$

Thought experiment for the group: would the Java threading equivalent be faster than both of these?