In [1]:
import numpy as np
import requests
import httpx
from PIL import Image
import io
from tqdm.auto import tqdm
from tqdm.asyncio import tqdm_asyncio
import onnxruntime as ort

In [2]:
!uv pip install aiohttp

[2mUsing Python 3.11.11 environment at: /home/chris/repos/deepweeds-kerascv/.venv[0m
[2mAudited [1m1 package[0m [2min 3ms[0m[0m


In [3]:
# Number of synthetic images used per throughput run
# (the httpx run further down uses a quarter of this: BATCH_SIZE // 4).
BATCH_SIZE = 4096

In [4]:
def gen_batch(n_batch: int):
    """Build ``n_batch`` in-memory JPEG files filled with random pixel noise.

    Each image is a 256x256x3 array of uniformly sampled values cast to
    ``uint8`` and encoded as JPEG into its own ``io.BytesIO`` buffer. Every
    buffer is rewound to position 0 before it is returned, so it can be read
    from the start (e.g. as a file upload).

    Args:
        n_batch: Number of images to generate.

    Returns:
        A list of ``n_batch`` ``io.BytesIO`` objects, one per encoded image.
    """

    def _random_jpeg() -> io.BytesIO:
        # Encode a single noise image and leave the cursor at the buffer start.
        pixels = np.random.uniform(0, 255, (256, 256, 3)).astype(np.uint8)
        buffer = io.BytesIO()
        Image.fromarray(pixels).save(buffer, "JPEG")
        buffer.seek(0)
        return buffer

    return [_random_jpeg() for _ in range(n_batch)]

## 1. Get baselines running inference directly

In [5]:
def make_session(
    providers: list[str | tuple[str, dict]],
    model_path: str = "../models/MeNet.onnx",
):
    """Create an ONNX Runtime session and run it once on a random sample.

    The session is built with the requested execution providers, then invoked
    once with a random ``(1, 256, 256, 3)`` float32 tensor before being
    returned (a warm-up call, so first-call setup cost is presumably kept out
    of later timings).

    Args:
        providers: Execution providers in priority order, each either a
            provider name or a ``(name, options)`` tuple.
        model_path: Path of the ONNX model to load. Defaults to the MeNet
            model used throughout this notebook.

    Returns:
        The ``ort.InferenceSession`` that was created and warmed up.

    Warns:
        RuntimeWarning: If a requested provider is not among the providers the
            session reports as active. Without this check a benchmark could be
            labelled with one provider while actually running on another.
    """
    import warnings

    onnx_session = ort.InferenceSession(model_path, providers=providers)

    # Compare what was asked for with what the session actually ended up using.
    requested = [p if isinstance(p, str) else p[0] for p in providers or []]
    active = onnx_session.get_providers()
    missing = [name for name in requested if name not in active]
    if missing:
        warnings.warn(
            f"Requested execution provider(s) {missing} are not active; "
            f"session is using {active}",
            RuntimeWarning,
            stacklevel=2,
        )

    onnx_input_name = onnx_session.get_inputs()[0].name
    onnx_output_name = onnx_session.get_outputs()[0].name
    # Warm-up run on a throwaway sample; the result is intentionally discarded.
    preload_sample = np.random.uniform(0, 255, (1, 256, 256, 3)).astype(np.float32)
    onnx_session.run([onnx_output_name], {onnx_input_name: preload_sample})
    return onnx_session

In [6]:
# Baseline 1: TensorRT execution provider with no extra provider options.
onnx_session = make_session(providers=["TensorrtExecutionProvider"])
# Names of the model's first input and first output; the benchmark cells reuse them.
onnx_input_name, onnx_output_name = (
    onnx_session.get_inputs()[0].name,
    onnx_session.get_outputs()[0].name,
)

In [7]:
%%timeit
preload_sample = np.random.uniform(0, 255, (1, 256, 256, 3)).astype(np.float32)
_ = onnx_session.run([onnx_output_name], {onnx_input_name: preload_sample})

1.93 ms ± 37.6 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [8]:
# Build the whole batch before the loop so the progress bar tracks inference only.
images = [
    np.random.uniform(low=0, high=255, size=(1, 256, 256, 3)).astype(np.float32)
    for _ in range(BATCH_SIZE)
]
# One session.run call per single-image tensor; the outputs are discarded.
for img in tqdm(images):
    _ = onnx_session.run([onnx_output_name], {onnx_input_name: img})

  0%|          | 0/4096 [00:00<?, ?it/s]

In [9]:
# Drop the reference to the previous session before building the next one
# (presumably so its GPU resources can be released first — TODO confirm).
del onnx_session
# Same TensorRT provider, now with the `trt_fp16_enable` option switched on.
# NOTE: onnx_input_name / onnx_output_name are not refreshed here; the cells
# below reuse the names looked up from the first session.
onnx_session = make_session(
    providers=[("TensorrtExecutionProvider", {"trt_fp16_enable": True})]
)

In [10]:
%%timeit
preload_sample = np.random.uniform(0, 255, (1, 256, 256, 3)).astype(np.float32)
_ = onnx_session.run([onnx_output_name], {onnx_input_name: preload_sample})

1.71 ms ± 93.9 μs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


In [11]:
# Build the whole batch before the loop so the progress bar tracks inference only.
images = [
    np.random.uniform(low=0, high=255, size=(1, 256, 256, 3)).astype(np.float32)
    for _ in range(BATCH_SIZE)
]
# One session.run call per single-image tensor; the outputs are discarded.
for img in tqdm(images):
    _ = onnx_session.run([onnx_output_name], {onnx_input_name: img})

  0%|          | 0/4096 [00:00<?, ?it/s]

In [12]:
# Drop the reference to the previous session before building the next one
# (presumably so its GPU resources can be released first — TODO confirm).
del onnx_session
# Baseline: CUDA execution provider. onnx_input_name / onnx_output_name are
# not refreshed here; the cells below reuse the names from the first session.
onnx_session = make_session(providers=["CUDAExecutionProvider"])

In [13]:
%%timeit
preload_sample = np.random.uniform(0, 255, (1, 256, 256, 3)).astype(np.float32)
_ = onnx_session.run([onnx_output_name], {onnx_input_name: preload_sample})

3.81 ms ± 568 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [14]:
# Build the whole batch before the loop so the progress bar tracks inference only.
images = [
    np.random.uniform(low=0, high=255, size=(1, 256, 256, 3)).astype(np.float32)
    for _ in range(BATCH_SIZE)
]
# One session.run call per single-image tensor; the outputs are discarded.
for img in tqdm(images):
    _ = onnx_session.run([onnx_output_name], {onnx_input_name: img})

  0%|          | 0/4096 [00:00<?, ?it/s]

In [15]:
# Drop the reference to the previous session before building the next one.
del onnx_session
# Baseline: CPU execution provider. onnx_input_name / onnx_output_name are
# not refreshed here; the cells below reuse the names from the first session.
onnx_session = make_session(providers=["CPUExecutionProvider"])

In [16]:
%%timeit
preload_sample = np.random.uniform(0, 255, (1, 256, 256, 3)).astype(np.float32)
_ = onnx_session.run([onnx_output_name], {onnx_input_name: preload_sample})

31.5 ms ± 5.91 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [17]:
# Build the whole batch before the loop so the progress bar tracks inference only.
images = [
    np.random.uniform(low=0, high=255, size=(1, 256, 256, 3)).astype(np.float32)
    for _ in range(BATCH_SIZE)
]
# One session.run call per single-image tensor; the outputs are discarded.
for img in tqdm(images):
    _ = onnx_session.run([onnx_output_name], {onnx_input_name: img})

  0%|          | 0/4096 [00:00<?, ?it/s]

## 2. Results running inference through our fastapi endpoint

In [18]:
%%timeit

img_bytes = io.BytesIO()
data = np.random.uniform(0, 255, (256, 256, 3)).astype(np.uint8)
Image.fromarray(data).save(img_bytes, "JPEG")
img_bytes.seek(0)

url = "http://localhost:8000/predict"
files = {"file": ("image.jpg", img_bytes, "image/jpeg")}

response = requests.post(url, files=files)
assert response.status_code == 200
# print(response.status_code)
# print(response.json())


5.87 ms ± 142 μs per loop (mean ± std. dev. of 7 runs, 100 loops each)


In [19]:
async def send_image_request(image_bytes, client: httpx.AsyncClient):
    """POST one encoded image to ``/predict`` and return the parsed JSON body.

    Args:
        image_bytes: Already-encoded JPEG payload (bytes or a binary file-like
            object), uploaded as the multipart field ``file``.
        client: Async HTTP client; ``/predict`` is a relative path, so it is
            resolved against the client's configured base URL.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    files = {"file": ("image.jpg", image_bytes, "image/jpeg")}
    response = await client.post("/predict", files=files)
    # Fail loudly on error responses so failed requests are not silently counted
    # as successful predictions (the synchronous cells assert on status 200).
    response.raise_for_status()
    return response.json()

In [20]:
images = gen_batch(BATCH_SIZE // 4)
async with httpx.AsyncClient(
    base_url="http://localhost:8000/", limits=httpx.Limits(max_connections=1024)
) as client:
    res = await tqdm_asyncio.gather(
        *[send_image_request(image, client) for image in images[:]]
    )

100%|██████████| 1024/1024 [00:09<00:00, 108.86it/s]


HTTPX is unexpectedly slow here, and I don't yet know why. It doesn't make any requests when given more than 1024 coroutines, even with a reasonable connection limit (so some extra concurrency control is probably needed), and it is slower than standard `requests` in both sync and async modes.

In [21]:
# Fresh full-size batch of JPEG buffers for the synchronous `requests` loop below.
images = gen_batch(BATCH_SIZE)

In [22]:
# Sequential baseline: one blocking POST per image, each checked for HTTP 200.
predict_url = "http://localhost:8000/predict"
for img_bytes in tqdm(images):
    files = {"file": ("image.jpg", img_bytes, "image/jpeg")}
    response = requests.post(predict_url, files=files)
    assert response.status_code == 200, (response.status_code, response.content)

  0%|          | 0/4096 [00:00<?, ?it/s]

In [23]:
import aiohttp


async def aio_request(img_bytes, session):
    """POST one encoded image to the prediction endpoint using aiohttp.

    Args:
        img_bytes: Already-encoded JPEG payload, sent as the multipart form
            field ``file`` with filename ``image.jpg``.
        session: The aiohttp client session used to send the request.

    Returns:
        The decoded JSON body of the response.

    Raises:
        aiohttp.ClientResponseError: If the server answers with a 4xx/5xx status.
    """
    data = aiohttp.FormData()
    data.add_field("file", img_bytes, filename="image.jpg", content_type="image/jpeg")

    async with session.post("http://localhost:8000/predict", data=data) as response:
        # Fail loudly on error responses so failed requests are not silently
        # counted as successful predictions (the sync cells assert on status 200).
        response.raise_for_status()
        return await response.json()

In [24]:
images = gen_batch(BATCH_SIZE)
async with aiohttp.ClientSession() as session:
    res = await tqdm_asyncio.gather(
        *[aio_request(image, session) for image in images[:]]
    )

100%|██████████| 4096/4096 [00:12<00:00, 335.68it/s]


Looks like our GPU usage never goes above ~50% even at the high request rate, so data throughput / Python overhead is the hard limit - we can't feed the model fast enough. Bumping the number of processes will ease this, but annoyingly we need a separate model instance for each one; maybe this would not be a problem if we switched to a language that allows parallel threads.

Running the server with 4x uvicorn workers:

In [None]:
images = gen_batch(BATCH_SIZE)
async with aiohttp.ClientSession() as session:
    res = await tqdm_asyncio.gather(
        *[aio_request(image, session) for image in images[:]]
    )

100%|██████████| 4096/4096 [00:04<00:00, 833.67it/s]


^ Halves the runtime but quadruples the VRAM usage (still way under the limit, so we're fine here, but certainly not ideal with larger models).

Ideas to experiment with in random order:

1. Try an optimised inference server that does clever things I don't have time to implement (NVIDIA Triton, TF Serving, ...)
2. Try to figure out how to make the ONNX inference session async-compatible
3. If no luck, try to rewrite the inference service in C++ with threading & async (as opposed to async only) on the prediction endpoint