# Inference-as-a-Service with `hermes`
This is intended as a brief overview of how the `hermes` libraries might be used to accelerate an inference deployment as-a-service using NVIDIA's [Triton inference server](https://developer.nvidia.com/nvidia-triton-inference-server). We'll start by showing what a vanilla, suboptimal deployment might look like to introduce all the relevant concepts, then make things slightly more complex to show how to analyze a deployment, identify its bottlenecks, and remove them.

As you can see from the `pyproject.toml`, all of the relevant `hermes` libraries are currently installed in this environment. In a production setting, you might consider breaking these up to keep environments more lightweight. For example, `hermes.quiver` might be installed in your training environment to export at train time, or might be installed in a dedicated export deployment if it involves more complex dependencies like TensorRT. Meanwhile, your inference environment might have `hermes.aeriel` and `hermes.stillwater` installed for deploying and monitoring an inference service. This is not critical to the discussion here, but worth bringing up to point out that the `hermes` libraries are not a monolith and are intended to be lightweight and composable.

## Overview
In this example, we'll begin by building a neural network, then exporting it from memory to disk in a format that Triton can serve for inference. In practice we'd train this network on some data between these steps, but since we're more interested in the inference side here, we'll pretend that training has been done elsewhere. For simplicity, we'll build a 1D convolutional network with a single output (which might be used, e.g., for binary classification).

Once our model has been properly exported, we'll spin up a Triton inference service which will load the model and expose it for inference via gRPC requests. We'll then build some dummy inference data and iterate through it to build requests to send to our inference service, aggregating its responses into a timeseries of network outputs.

We'll start with our imports. For the time being, we'll be using
- `hermes.quiver` to handle exporting our model to a format usable by Triton
- `hermes.aeriel.serve` to spin up an inference service locally using Python APIs
- `hermes.aeriel.client` to make requests to that inference service

In [1]:
import logging
import os
import shutil
import sys
import time

import numpy as np
import pandas as pd
import torch

# some plotting utilities
from bokeh.io import output_notebook, show
from bokeh.plotting import figure
from bokeh.palettes import Dark2_8 as palette

# our hermes imports
from hermes import quiver as qv
from hermes.aeriel.client import InferenceClient
from hermes.aeriel.serve import serve
from hermes.stillwater import ServerMonitor

# couple cheap local utilities
from src import utils

output_notebook()
logger = utils.get_logger()

Now let's establish some parameters for both the model we'd like to build as well as for our inference time deployment.

In [2]:
# model parameters
NUM_IFOS = 2  # number of interferometers analyzed by our model
SAMPLE_RATE = 2048  # rate at which input data to the model is sampled
KERNEL_LENGTH = 4  # length of the input to the model in seconds

# inference parameters
INFERENCE_DATA_LENGTH = 2048  # amount of data to analyze at inference time
INFERENCE_SAMPLING_RATE = 0.25  # rate at which we'll sample input windows from the inference data
INFERENCE_RATE = 250  # seconds of data we'll try to analyze per second

# convert some of these into more useful units for slicing purposes
kernel_size = int(SAMPLE_RATE * KERNEL_LENGTH)
inference_stride = int(SAMPLE_RATE / INFERENCE_SAMPLING_RATE)
inference_data_size = int(SAMPLE_RATE * INFERENCE_DATA_LENGTH)
num_inferences = (inference_data_size - kernel_size) // inference_stride + 1
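
Plugging in the numbers makes this slicing arithmetic concrete. The sketch below simply restates the parameters from the cell above (the helper name `window_counts` is ours, for illustration only) and evaluates the same formulas at the 0.25 Hz rate used first and, for comparison, at 4 Hz:

```python
SAMPLE_RATE = 2048            # Hz
KERNEL_LENGTH = 4             # seconds per input window
INFERENCE_DATA_LENGTH = 2048  # seconds of data to analyze


def window_counts(inference_sampling_rate):
    """Return (kernel_size, inference_stride, num_inferences) in samples/requests."""
    kernel_size = int(SAMPLE_RATE * KERNEL_LENGTH)                 # samples per window
    inference_stride = int(SAMPLE_RATE / inference_sampling_rate)  # samples between windows
    inference_data_size = int(SAMPLE_RATE * INFERENCE_DATA_LENGTH)
    num_inferences = (inference_data_size - kernel_size) // inference_stride + 1
    return kernel_size, inference_stride, num_inferences


print(window_counts(0.25))  # (8192, 8192, 512)
print(window_counts(4))     # (8192, 512, 8177)
```

At 0.25 Hz the windows tile the data with no overlap, so 2048 s of data yields 512 requests; at 4 Hz consecutive windows overlap by 7680 samples and the same data yields 8177 requests.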

Now let's build our extremely simple network:

In [3]:
class GlobalAvgPool(torch.nn.Module):
    def forward(self, x):
        return x.mean(axis=-1)


nn = torch.nn.Sequential(
    torch.nn.Conv1d(NUM_IFOS, 8, kernel_size=7, stride=2),
    torch.nn.ReLU(),
    torch.nn.Conv1d(8, 32, kernel_size=7, stride=2),
    torch.nn.ReLU(),
    torch.nn.Conv1d(32, 64, kernel_size=7, stride=2),
    torch.nn.ReLU(),
    torch.nn.Conv1d(64, 128, kernel_size=7, stride=2),
    torch.nn.ReLU(),
    torch.nn.Conv1d(128, 256, kernel_size=7, stride=2),
    torch.nn.ReLU(),
    GlobalAvgPool(),
    torch.nn.Linear(256, 1024),
    torch.nn.ReLU(),
    torch.nn.Linear(1024, 1)
)

# INSERT TRAINING CODE HERE
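
As a sanity check on the architecture, we can trace the time dimension through the five unpadded, stride-2 convolutions using the output-length formula from the `torch.nn.Conv1d` documentation. This is plain Python, and the helper name is hypothetical:

```python
def conv1d_output_length(length, kernel_size=7, stride=2, padding=0, dilation=1):
    # L_out = floor((L_in + 2*padding - dilation*(kernel_size - 1) - 1) / stride) + 1
    return (length + 2 * padding - dilation * (kernel_size - 1) - 1) // stride + 1


length = 8192  # SAMPLE_RATE * KERNEL_LENGTH
lengths = []
for _ in range(5):  # five Conv1d layers, each kernel_size=7, stride=2
    length = conv1d_output_length(length)
    lengths.append(length)

print(lengths)  # [4093, 2044, 1019, 507, 251]
```

`GlobalAvgPool` then averages over the remaining 251 time steps, leaving one value for each of the 256 channels, which is why the first `torch.nn.Linear` layer takes 256 input features.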

Ok, now the set-up work is done. We have a "trained" neural network, and we're ready to export it for inference. Enter `hermes`. One of the key concepts in as-a-service inference is the idea of a **model repository**, a local (or cloud-based) directory with a prescribed structure that hosts all the versions of all the models to be served for inference.

Maintaining this prescribed structure, as well as the configs that Triton needs to be able to map to named inputs and outputs, can be onerous and non-trivial, so `hermes.quiver` was built to take the headache out of building and maintaining these repositories. In this next step, we'll build a model repository (clearing it beforehand in case you run this notebook multiple times), add a new entry to it for the network that we just built, then export the current version of this network to that repository as an [ONNX](https://onnx.ai/) binary which Triton can load and serve for inference.
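
For intuition, here is roughly what that prescribed structure looks like on disk: per Triton's conventions, each model gets a directory holding a `config.pbtxt` plus one numbered subdirectory per version containing the model file. The sketch below builds that layout by hand in a temporary directory. It is only an illustration of the structure, not how `hermes.quiver` writes it; the helper name is hypothetical, the config text is copied from the `model.config` output shown further down, and `model.onnx` is an empty placeholder rather than a real ONNX binary.

```python
import tempfile
from pathlib import Path

# config text copied from the `model.config` output shown in this notebook
CONFIG_TEXT = """name: "my-classifier"
platform: "onnxruntime_onnx"
input {
  name: "hoft"
  data_type: TYPE_FP32
  dims: -1
  dims: 2
  dims: 8192
}
output {
  name: "prob"
  data_type: TYPE_FP32
  dims: -1
  dims: 1
}
"""


def build_repo_layout(root, model_name, version, config_text):
    """Create <root>/<model>/config.pbtxt and <root>/<model>/<version>/model.onnx."""
    model_dir = Path(root) / model_name
    version_dir = model_dir / str(version)
    version_dir.mkdir(parents=True, exist_ok=True)
    (model_dir / "config.pbtxt").write_text(config_text)
    # empty placeholder: a real repository holds the exported ONNX binary here
    (version_dir / "model.onnx").touch()
    # list everything under the repository root, relative to it
    return sorted(p.relative_to(root).as_posix() for p in Path(root).rglob("*"))


with tempfile.TemporaryDirectory() as root:
    for path in build_repo_layout(root, "my-classifier", 1, CONFIG_TEXT):
        print(path)
```

The `my-classifier/1/model.onnx` path printed here matches the path that `export_version` returns below.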

In [4]:
# let's make sure we're starting with a fresh repo
repo_path = "model-repo"
utils.clear_repo(repo_path)

# initialize a blank model repository
repo = qv.ModelRepository(repo_path)
assert len(repo.models) == 0  # this attribute will get updated as we add models

# create a new entry in the repo for our model
model = repo.add("my-classifier", platform=qv.Platform.ONNX)
assert len(repo.models) == 1
assert model == repo.models["my-classifier"]

# now export our current version of the network to this entry.
# Since we haven't exported any versions of this model yet,
# Triton needs to know what names to give the inputs and
# outputs and what shapes to expect, so we have to specify
# them explicitly this first time.
# Note that -1 indicates variable length batch dimension.
model.export_version(
    nn,
    input_shapes={"hoft": (-1, NUM_IFOS, kernel_size)},
    output_names=["prob"]
)

'my-classifier/1/model.onnx'

Note that our model is associated with a `Config` object that describes the metadata Triton requires:

In [5]:
model.config

name: "my-classifier"
platform: "onnxruntime_onnx"
input {
  name: "hoft"
  data_type: TYPE_FP32
  dims: -1
  dims: 2
  dims: 8192
}
output {
  name: "prob"
  data_type: TYPE_FP32
  dims: -1
  dims: 1
}

and that each model can be associated with multiple versions corresponding to different weight values, or even wholesale different architectures. The only requirement is that every version maps the same named inputs to the same named outputs, with matching shapes and data types:

In [6]:
model.versions

[1]

And with that, our model is ready to be served for inference! In the code below, we'll use `hermes.aeriel.serve` to spin up a [Singularity container](https://docs.sylabs.io/guides/3.5/user-guide/introduction.html) on a single GPU (index 0), which will run a Triton inference service in the background. Once the `serve` context exits, the service and the container running it will both be spun down.

Once the server is up and running, we'll use `hermes.aeriel.client` to establish a client connection to it, then iterate through our dummy data and make requests using this connection. The requests are made asynchronously and the responses parsed in a callback thread. The parsed responses are made available in the main thread through a queue which can be succinctly accessed by `InferenceClient.get()`, which will return `None` if there are no responses to be returned.
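
The asynchronous request/callback/queue flow described above can be mimicked with the standard library alone. The toy client below is a hypothetical sketch, not how `hermes.aeriel.client.InferenceClient` is implemented: a local function stands in for the Triton server, a single worker thread plays the role of the callback thread, and `get()` mirrors the documented behavior of returning `None` when no responses are waiting.

```python
import queue
from concurrent.futures import ThreadPoolExecutor

import numpy as np


class ToyAsyncClient:
    """Hypothetical stand-in for an async inference client."""

    def __init__(self, model_fn, callback=None):
        self.model_fn = model_fn  # plays the role of the remote model
        self.callback = callback or self._passthrough
        self._queue = queue.Queue()
        # one worker thread processes requests and runs the callback
        self._pool = ThreadPoolExecutor(max_workers=1)

    @staticmethod
    def _passthrough(y, request_id, sequence_id):
        # mirror the (y, request_id, sequence_id) tuple unpacked in this notebook
        return y, request_id, sequence_id

    def infer(self, x, request_id=0, sequence_id=0):
        # return immediately; the work happens on the worker thread
        def _run():
            y = self.model_fn(x)
            result = self.callback(y, request_id, sequence_id)
            if result is not None:  # only non-None callback results are queued
                self._queue.put(result)

        self._pool.submit(_run)

    def get(self):
        # non-blocking: None means nothing is ready yet
        try:
            return self._queue.get_nowait()
        except queue.Empty:
            return None

    def close(self):
        # wait for all outstanding requests to finish
        self._pool.shutdown(wait=True)
```

For example, `ToyAsyncClient(lambda x: x.sum(axis=(1, 2))[:, None])` returns one `(1, 1)` output per `(1, 2, N)` input, the same shapes our classifier works with.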

Note that below, we'll do our inference in batch sizes of 1. In principle, we could get better throughput by increasing that batch size (at the cost of some latency), but as we'll see momentarily there are more pressing issues we need to address first.

In [7]:
# Start by spinning up an inference service.
# Note that this will print the singularity command
# used to start the service. Due to a bug in singularity,
# this is currently unavoidable (the alternative is an
# unnecessary warning), but it's probably good to note
# what is really happening under the hood anyway.
# The `instance` returned by the context is an object
# representing the running singularity container instance.
with serve("model-repo", gpus=[0]) as instance:
    # Do our data generation in parallel while the server
    # spins up. This will obviously be pretty fast, but
    # for more complicated data generation steps this can
    # be a good way to parallelize your efforts.
    hoft = np.random.randn(NUM_IFOS, inference_data_size).astype("float32")

    # now wait until the inference service is online and
    # ready to receive requests before we attempt to connect
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    # establish a client connection to the inference service
    # and infer the names and shapes of the inputs it expects
    client = InferenceClient(
        "localhost:8001",
        model_name="my-classifier",
        model_version=1  # can use -1 to imply latest version
    )

    # client context establishes a streaming connection to
    # the inference service. Since we're not yet streaming
    # updates but making requests individually, we don't
    # technically need this, but it's a good habit to get into.
    with client:
        # now iterate through our inference timeseries at the
        # prescribed stride and send these inputs to the server
        # for inference.
        for i in range(int(num_inferences)):
            start = i * inference_stride
            stop = start + kernel_size

            # add a dummy dimension for the batch
            kernel = hoft[:, start: stop][None]
            client.infer(kernel)

            # sleep to maintain the prescribed data rate
            # and avoid overloading the network
            time.sleep(1 / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

        # now that we've submitted all our inference requests,
        # start pulling them from the output queue as they
        # become available.
        results = []
        while len(results) < num_inferences:
            response = client.get()
            if response is not None:
                y, request_id, sequence_id = response
                results.append(y[:, 0])
        logger.info("Inference complete!")

# concatenate all the responses into a single timeseries
results = np.concatenate(results)

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest peachy_salad_1710


We can see from our logs that we roughly hit our inference rate target. We fall a little below it, since our sleep only sets an upper bound on the request rate and we still have to pay for things like serializing each gRPC request, but that's not too shabby.

In principle, that does it: you now have everything you need to run inference-as-a-service with `hermes`. But there are some considerations you might think about that could help improve both network performance and throughput. For some inspiration, let's take a look at what our timeseries of network responses looks like:

In [8]:
p = figure(
    title="Network response vs. time",
    x_axis_label="Time from start [s]",
    y_axis_label="NN Output",
    width=600,
    height=300,
    tools="",
)
times = np.arange(num_inferences) / INFERENCE_SAMPLING_RATE
times += KERNEL_LENGTH  # to indicate where the right edge of the kernel is

p.line(times, results, line_width=2.0)
show(p)

So, as we should have expected, it's just a timeseries of more or less random data. What is worth noting about this timeseries, however, is the rate at which it is sampled: 0.25 Hz means that for shorter-duration events like binary black hole mergers, we only get to make one prediction on each event. Surely there could be some benefit to taking predictions from multiple overlapping windows containing the same event and aggregating them somehow. Let's increase our inference sampling rate to, say, 4 Hz and see how this impacts our throughput.
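
The one-prediction-per-event claim can be checked by brute force. This hypothetical helper counts how many windows, spaced `stride` samples apart, contain a given sample; for a sample away from the edges of the data the answer works out to `KERNEL_LENGTH * INFERENCE_SAMPLING_RATE`:

```python
SAMPLE_RATE = 2048
KERNEL_LENGTH = 4


def windows_covering(sample_idx, kernel_size, stride, num_windows):
    """Count windows [i * stride, i * stride + kernel_size) containing sample_idx."""
    return sum(
        1
        for i in range(num_windows)
        if i * stride <= sample_idx < i * stride + kernel_size
    )


kernel_size = SAMPLE_RATE * KERNEL_LENGTH
sample_idx = 100_000  # an arbitrary sample in the middle of the data
print(windows_covering(sample_idx, kernel_size, stride=8192, num_windows=512))  # 1 at 0.25 Hz
print(windows_covering(sample_idx, kernel_size, stride=512, num_windows=8177))  # 16 at 4 Hz
```

So at 4 Hz every moment of data is seen by 16 different windows, each placing it at a different position in the kernel.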

For this next round of inference, I'm going to add a little more complication up front in exchange for slightly more elegance at inference time. Rather than iterating through responses after all our inference requests have been submitted, I'll set up a callback that aggregates our responses into an array in real time in the callback thread, then returns the array once it's complete.

In [9]:
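class Callback:
    def __init__(self, num_inferences):
        self.y = np.zeros((num_inferences,))
        self.num_received = 0

    def __call__(self, response, request_id, sequence_id):
        # place each response at the index given by its request id
        self.y[request_id] = response[0, 0]
        self.num_received += 1

        # responses aren't guaranteed to arrive in order, so only
        # return the array (which puts it in the client's output
        # queue) once every request has been accounted for
        if self.num_received == len(self.y):
            return self.y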


# reset some of our parameters with a new inference sampling rate
INFERENCE_SAMPLING_RATE = 4
inference_stride = int(SAMPLE_RATE / INFERENCE_SAMPLING_RATE)
num_inferences = (inference_data_size - kernel_size) // inference_stride + 1
callback = Callback(num_inferences)

# from here things will look more or less the same
with serve("model-repo", gpus=[0]) as instance:
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    # instantiate client with custom callback
    client = InferenceClient(
        "localhost:8001",
        model_name="my-classifier",
        model_version=1,
        callback=callback
    )

    with client:
        for i in range(int(num_inferences)):
            start = i * inference_stride
            stop = start + kernel_size
            kernel = hoft[:, start: stop][None]

            # pass explicit request ids this time
            # for the callback to use
            client.infer(kernel, request_id=i)
            time.sleep(1 / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

    # now wait until the callback returns its
    # filled out array to the client's queue
    while True:
        results = client.get()
        if results is not None:
            logger.info("Inference complete!")
            break

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest goodbye_pancake_5706


And now let's take a look at this timeseries:

In [10]:
p = figure(
    title="Network response vs. time",
    x_axis_label="Time from start [s]",
    y_axis_label="NN Output",
    width=600,
    height=300,
    tools="",
)
times = np.arange(num_inferences) / INFERENCE_SAMPLING_RATE
times += KERNEL_LENGTH

p.line(times, results, line_width=2.0)
show(p)

So things work with higher-frequency inference, but our logs indicate that we're now falling well short of our intended inference rate target (at the time of writing, I'm seeing a rate of ~145 data seconds / second). Why is that? Is our network capable of handling the desired number of requests per second?

For diagnosing these sorts of questions, Triton makes some inference metrics available at port 8002 by default, which clients can query for per-model information like cumulative time spent queueing and executing requests, as well as cumulative inference counts. The `hermes.stillwater` library has a `ServerMonitor` class which requests and organizes these metrics from potentially many servers in a separate process (to not slow down inference) and writes them to a local log file.
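
To see what kind of data sits behind that port, here is a minimal sketch, not `hermes.stillwater`'s actual implementation, that parses Triton's Prometheus-format metrics text for a single model version and turns two cumulative snapshots into per-request averages over the interval between them. The metric names (`nv_inference_request_success`, `nv_inference_queue_duration_us`, and the `nv_inference_compute_*_duration_us` family) follow Triton's metrics documentation, but it's worth checking them against the Triton version in your container; all helper names here are hypothetical.

```python
import re
import urllib.request

# one Prometheus sample line, e.g.
# nv_inference_queue_duration_us{model="my-classifier",version="1"} 1234
SAMPLE_LINE = re.compile(r"^(\w+)\{([^}]*)\}\s+(\S+)$")
LABEL = re.compile(r'(\w+)="([^"]*)"')


def fetch_metrics(host="localhost", port=8002):
    """Download the raw Prometheus text from Triton's metrics endpoint."""
    with urllib.request.urlopen(f"http://{host}:{port}/metrics") as response:
        return response.read().decode("utf-8")


def parse_model_metrics(text, model, version="1"):
    """Collect {metric_name: value} for a single model version."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip HELP/TYPE comments
            continue
        match = SAMPLE_LINE.match(line)
        if match is None:
            continue
        name, label_text, value = match.groups()
        labels = dict(LABEL.findall(label_text))
        if labels.get("model") == model and labels.get("version") == version:
            values[name] = float(value)
    return values


def interval_averages(before, after):
    """Average per-request time (us) in each step between two cumulative snapshots."""
    key = "nv_inference_request_success"
    count = after[key] - before[key]
    if count == 0:
        return None
    averages = {}
    for step in ["queue", "compute_input", "compute_infer", "compute_output"]:
        metric = f"nv_inference_{step}_duration_us"
        averages[step] = (after[metric] - before[metric]) / count
    return averages
```

Polling `fetch_metrics()` at a fixed interval and differencing successive parses yields the same kind of per-interval count and cumulative-time columns that the monitor writes to its CSV below.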

Let's run the same snippet from above with a monitor in place and take a look at how queuing latency evolves over time. If requests spend longer and longer queuing over time, then our network is acting as a bottleneck and we need to scale it up. Otherwise, something else is going wrong.
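
One way to turn "longer and longer" into a number is to fit a straight line to average queue latency versus time and look at the slope. The helper below is a hypothetical sketch of that heuristic; the default threshold of 1 us of added queue latency per second is an arbitrary choice, not something Triton or `hermes` prescribes.

```python
import numpy as np


def queue_is_growing(times_s, queue_us, threshold_us_per_s=1.0):
    """Least-squares fit of queue latency vs. time; flag a positive trend."""
    slope, _intercept = np.polyfit(
        np.asarray(times_s, dtype=float),
        np.asarray(queue_us, dtype=float),
        1,  # degree-1 polynomial, i.e. a straight line
    )
    return slope > threshold_us_per_s


times = np.arange(20) * 0.1           # monitor samples every 100 ms
flat = np.full(20, 50.0)              # steady ~50 us of queueing
ramp = 50.0 + 200.0 * times           # queue grows by 200 us every second
print(queue_is_growing(times, flat))  # False
print(queue_is_growing(times, ramp))  # True
```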

In [11]:
callback = Callback(num_inferences)
with serve("model-repo", gpus=[0]) as instance:
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    client = InferenceClient(
        "localhost:8001",
        model_name="my-classifier",
        model_version=1,
        callback=callback
    )
    monitor = ServerMonitor(
        model_name="my-classifier",
        ips="localhost",
        filename="non-streaming_single-model_server-stats.csv",
        model_version=1,
        name="monitor"
    )

    with client, monitor:
        for i in range(int(num_inferences)):
            start = i * inference_stride
            stop = start + kernel_size
            kernel = hoft[:, start: stop][None]

            client.infer(kernel, request_id=i)
            time.sleep(1 / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

        # wait for the callback under the context this
        # time to let the monitor keep doing its thing
        while True:
            results = client.get()
            if results is not None:
                logger.info("Inference complete!")
                break

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest stinky_pastry_3512


Now let's load in the CSV that our monitor produced and take a look at the data it captured.

In [12]:
df = pd.read_csv("non-streaming_single-model_server-stats.csv")
df

| | timestamp | ip | model | count | queue | compute_input | compute_infer | compute_output | request |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.663720e+09 | localhost | my-classifier | 2 | 7691253 | 7346 | 7685740 | 60 | 15384601 |
| 1 | 1.663720e+09 | localhost | my-classifier | 251 | 1889819993 | 22845 | 7898638 | 2164 | 1897756693 |
| 2 | 1.663720e+09 | localhost | my-classifier | 462 | 3261910440 | 22645 | 151057 | 4121 | 3262109736 |
| 3 | 1.663720e+09 | localhost | my-classifier | 451 | 2887317378 | 24483 | 146749 | 3998 | 2887515642 |
| 4 | 1.663720e+09 | localhost | my-classifier | 463 | 2660231453 | 23813 | 148424 | 3828 | 2660429533 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 61 | 1.663720e+09 | localhost | my-classifier | 58 | 3170 | 2896 | 21323 | 418 | 30811 |
| 62 | 1.663720e+09 | localhost | my-classifier | 57 | 3005 | 2796 | 20496 | 455 | 29940 |
| 63 | 1.663720e+09 | localhost | my-classifier | 57 | 3007 | 2710 | 21033 | 483 | 29961 |
| 64 | 1.663720e+09 | localhost | my-classifier | 58 | 3250 | 2835 | 20409 | 421 | 30484 |


The `queue`, `compute_input`, `compute_infer`, and `compute_output` columns all represent different steps in the inference compute of a single request. The values in each column represent the cumulative microseconds spent on each step over all of the requests computed between each ping to the metrics service, the number of which is indicated by the `count` column. Knowing this, let's reframe some of the info in these columns in a more useful fashion for our purposes.

In [13]:
df["Time since start (s)"] = df["timestamp"] - df["timestamp"][0]
df["Average queue time (us)"] = df["queue"] / df["count"]
infer_time = df[[f"compute_{i}" for i in ["input", "infer", "output"]]].sum(axis=1)
df["Average infer time (us)"] = infer_time / df["count"]

df = df[["Time since start (s)", "Average queue time (us)", "Average infer time (us)"]]
df

| | Time since start (s) | Average queue time (us) | Average infer time (us) |
|---|---|---|---|
| 0 | 0.000000 | 3.845626e+06 | 3.846573e+06 |
| 1 | 0.101565 | 7.529163e+06 | 3.156831e+04 |
| 2 | 0.203396 | 7.060412e+06 | 3.848983e+02 |
| 3 | 0.305061 | 6.402034e+06 | 3.885366e+02 |
| 4 | 0.406572 | 5.745640e+06 | 3.802700e+02 |
| ... | ... | ... | ... |
| 61 | 6.189616 | 5.465517e+01 | 4.247759e+02 |
| 62 | 6.291206 | 5.271930e+01 | 4.166140e+02 |
| 63 | 6.392915 | 5.275439e+01 | 4.250175e+02 |
| 64 | 6.494053 | 5.603448e+01 | 4.080172e+02 |


Now let's plot the queue time as a function of time, and consider what this data tells us:

In [14]:
p = figure(
    title="Queuing latency vs. time",
    x_axis_label="Time from start of monitoring [s]",
    y_axis_label="Average queue latency in interval [us]",
    height=300,
    width=600,
    tools=""
)
p.line(
    df["Time since start (s)"],
    df["Average queue time (us)"],
    line_width=2.0
)
show(p)

Counterintuitively, our queue time starts off really high, then goes *down* over time! Moreover, the length of the x-axis, a little over 6 seconds, doesn't match up with the time delta we see between our logs above. What's going on here?

It turns out that the first couple of inferences can often take substantially longer than the rest, as Triton tries to optimize the compute kernels used to actually perform inference. So while the first request is taking several seconds to process, an enormous queue builds up while we inundate the server with follow-ups. Meanwhile, the `ServerMonitor` doesn't start saving metrics until the server-side metrics service indicates that at least one inference has completed, which explains why we see a much shorter observation window.

One trick to get around this in practice is to block until the first inference response comes back, then open the floodgates to the rest of our requests. Let's try this and then see what story our inference metrics tell us.
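
One way to implement this wait more explicitly than polling the callback's output array is a callback that sets a `threading.Event` when its first response arrives. The class below is a hypothetical sketch with the same `(response, request_id, sequence_id)` signature as the callbacks in this notebook. As a side benefit, it can't hang if the first network output happens to be exactly 0, and the lock keeps the counter consistent should the client ever invoke the callback from more than one thread.

```python
import threading

import numpy as np


class FirstResponseCallback:
    """Hypothetical callback: stores outputs by request id, signals an Event
    on the first response, and returns the full array once all have arrived."""

    def __init__(self, num_inferences):
        self.y = np.zeros((num_inferences,))
        self.num_received = 0
        self.first_response = threading.Event()
        self._lock = threading.Lock()

    def __call__(self, response, request_id, sequence_id):
        with self._lock:
            self.y[request_id] = response[0, 0]
            self.num_received += 1
            done = self.num_received == len(self.y)
        # wake up anything blocked waiting on the first response
        self.first_response.set()
        if done:
            return self.y
        return None
```

In the request loop, calling `callback.first_response.wait()` right after the first `client.infer` call (optionally with a `timeout` in seconds) would replace the `time.sleep` polling loop.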

In [15]:
callback = Callback(num_inferences)
with serve("model-repo", gpus=[0]) as instance:
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    client = InferenceClient(
        "localhost:8001",
        model_name="my-classifier",
        model_version=1,
        callback=callback
    )
    monitor = ServerMonitor(
        model_name="my-classifier",
        ips="localhost",
        filename="non-streaming_single-model-with-wait_server-stats.csv",
        model_version=1,
        name="monitor"
    )

    with client, monitor:
        for i in range(int(num_inferences)):
            start = i * inference_stride
            stop = start + kernel_size
            kernel = hoft[:, start: stop][None]

            client.infer(kernel, request_id=i)
            time.sleep(1 / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

            if not i:
                # you might imagine building a better callback with
                # a more explicit/elegant mechanism for handling this
                while not callback.y[0]:
                    time.sleep(1e-3)
                logger.info("First request completed")

        while True:
            results = client.get()
            if results is not None:
                logger.info("Inference complete!")
                break

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest bloated_lentil_1869


In [16]:
df = pd.read_csv("non-streaming_single-model-with-wait_server-stats.csv")

# go through our preparation steps
df["Time since start (s)"] = df["timestamp"] - df["timestamp"][0]
df["Average queue time (us)"] = df["queue"] / df["count"]
infer_time = df[[f"compute_{i}" for i in ["input", "infer", "output"]]].sum(axis=1)
df["Average infer time (us)"] = infer_time / df["count"]
df["Throughput (s' / s)"] = df["count"] / df["timestamp"].diff() / INFERENCE_SAMPLING_RATE

df = df[[
    "Time since start (s)",
    "Throughput (s' / s)",
    "Average queue time (us)",
    "Average infer time (us)"
]]
df

| | Time since start (s) | Throughput (s' / s) | Average queue time (us) | Average infer time (us) |
|---|---|---|---|---|
| 0 | 0.000000 | NaN | 48.833333 | 311259.750000 |
| 1 | 0.101627 | 162.357869 | 47.606061 | 386.030303 |
| 2 | 0.203340 | 157.306186 | 49.578125 | 387.828125 |
| 3 | 0.304984 | 159.872023 | 50.076923 | 379.446154 |
| 4 | 0.406557 | 159.983475 | 53.507692 | 375.384615 |
| ... | ... | ... | ... | ... |
| 137 | 13.885181 | 138.503583 | 54.696429 | 401.035714 |
| 138 | 13.986419 | 138.287978 | 55.178571 | 453.892857 |
| 139 | 14.087929 | 137.916734 | 56.303571 | 397.660714 |
| 140 | 14.189511 | 140.282006 | 57.157895 | 406.701754 |


In [17]:
p = figure(
    title="Queuing latency vs. time",
    x_axis_label="Time from start of monitoring [s]",
    y_axis_label="Average queue latency in interval [us]",
    height=300,
    width=600,
    tools=""
)
p.line(
    df["Time since start (s)"].loc[1:],
    df["Average queue time (us)"].loc[1:],
    line_width=2.0
)
show(p)

There are a couple of interesting takeaways here. First and foremost, even though we're unable to hit our target throughput, the queue latency for our model stays largely constant as a function of time. This tells us that our neural network's throughput capacity is not being saturated: we're not getting data to it fast enough, and so it's not the bottleneck in our pipeline.

We can also see this from the fact that our average throughput matches the total throughput we saw above even when we had the first request bottlenecking our pipeline: once the first request cleared, the network was able to tear through our data faster than we could replenish the queue with new requests, and so the total time taken was purely a function of how quickly we could send requests to the server.

So with this in mind, what's the existing bottleneck and how do we alleviate it? Well, think about what happened when we went from `INFERENCE_SAMPLING_RATE = 1 / KERNEL_LENGTH` to `INFERENCE_SAMPLING_RATE = KERNEL_LENGTH`: we're continuing to send `KERNEL_LENGTH`-long windows of data to the inference service with each request, but now we're doing it 16 times as often per second, with largely redundant data! It's this network I/O that's bottlenecking our pipeline now.
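
A quick back-of-the-envelope calculation shows how much data that redundancy costs. The sketch restates this notebook's parameters and counts only the raw float32 payload, ignoring gRPC and protobuf overhead:

```python
NUM_IFOS = 2
SAMPLE_RATE = 2048
KERNEL_LENGTH = 4
INFERENCE_SAMPLING_RATE = 4
INFERENCE_RATE = 250   # data seconds per second we're aiming for
BYTES_PER_SAMPLE = 4   # float32

kernel_size = SAMPLE_RATE * KERNEL_LENGTH                       # 8192 samples
stride = SAMPLE_RATE // INFERENCE_SAMPLING_RATE                 # 512 samples
requests_per_second = INFERENCE_SAMPLING_RATE * INFERENCE_RATE  # 1000

full_window_bytes = NUM_IFOS * kernel_size * BYTES_PER_SAMPLE   # 65536 per request
update_bytes = NUM_IFOS * stride * BYTES_PER_SAMPLE             # 4096 per request

print(f"full windows: {full_window_bytes * requests_per_second / 1e6:.1f} MB/s")
print(f"updates only: {update_bytes * requests_per_second / 1e6:.1f} MB/s")
print(f"reduction: {full_window_bytes // update_bytes}x")
```

At the target rate, sending full windows means roughly 65.5 MB/s of payload in 1000 requests per second, whereas sending only the new samples would need about 4.1 MB/s, a factor of 16 less.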

To alleviate this, we'll need to build a model on the server-side which can cache data we've already sent and build the windows we need with exclusively *new* data and pass these along to our neural network. `hermes.quiver` has built-in support for constructing such a **model ensemble** by adding in a "snapshotter" model on the front-end of the server which maintains the state of the most recent input snapshot.
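
Conceptually, the snapshotter's job is simple. The NumPy sketch below is a hypothetical stand-in, not `hermes.quiver`'s server-side model: it keeps the most recent `kernel_size` samples as state, drops the oldest `stride` samples on each update, and appends the new ones, so each update reproduces the next full window the non-streaming client used to send.

```python
import numpy as np


class ToySnapshotter:
    """Hypothetical NumPy stand-in for a server-side snapshotter."""

    def __init__(self, num_channels, kernel_size):
        # the state starts out as zeros until enough updates have arrived
        self.state = np.zeros((num_channels, kernel_size), dtype=np.float32)

    def update(self, update):
        stride = update.shape[-1]
        # drop the oldest `stride` samples and append the new ones
        self.state = np.concatenate([self.state[:, stride:], update], axis=-1)
        # add a batch dimension, matching the classifier's input shape
        return self.state[None]


rng = np.random.default_rng(0)
data = rng.standard_normal((2, 40)).astype(np.float32)
kernel_size, stride = 8, 2
snapshotter = ToySnapshotter(num_channels=2, kernel_size=kernel_size)

for i in range(data.shape[-1] // stride):
    window = snapshotter.update(data[:, i * stride:(i + 1) * stride])
    end = (i + 1) * stride
    # once the state is full, each window equals the corresponding data slice
    if end >= kernel_size:
        assert np.array_equal(window[0], data[:, end - kernel_size:end])
```

Note that the first `kernel_size // stride - 1` windows still contain some of the initial zeros, which is why the streaming results later need their first few entries trimmed before comparing them against the non-streaming ones.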

In [18]:
# add a new meta-model to the repository that organizes
# graphs of existing models to pass outputs from one
# as inputs to the next
ensemble = repo.add("streaming-classifier", platform=qv.Platform.ENSEMBLE)

# insert a snapshotter model at the front of this ensemble
# whose output will be passed to the input of my-classifier
classifier = repo.models["my-classifier"]
ensemble.add_streaming_inputs(
    classifier.inputs["hoft"],
    stream_size=inference_stride,
    batch_size=1
)

# we know our first request takes ~12s, so make
# sure that the snapshotter will maintain a state
# for longer than this
snapshotter = repo.models["snapshotter"]
snapshotter.config.sequence_batching.max_sequence_idle_microseconds = int(25 * 10**6)
snapshotter.config.write()

# mark the output of our classifier as the output
# of the whole ensemble and then export a "version"
# of the ensemble (basically just writes its config)
ensemble.add_output(classifier.outputs["prob"])
ensemble.export_version(None)

'streaming-classifier/1/model.empty'

Now we can run inference on our streaming model and only send the updates we need for each request, rather than the entire window!

In [19]:
# keep our inference results from earlier to
# verify that this implementation comes out the same
nonstreaming_results = results

# we need to do more inferences this time, since
# the snapshot state gets initialized to 0s, so the
# first KERNEL_LENGTH * INFERENCE_SAMPLING_RATE updates
# just function to fill the snapshot out
num_inferences = inference_data_size // inference_stride
callback = Callback(num_inferences)
with serve("model-repo", gpus=[0]) as instance:
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    client = InferenceClient(
        "localhost:8001",
        model_name="streaming-classifier",
        model_version=1,
        callback=callback
    )
    monitor = ServerMonitor(
        model_name="streaming-classifier",
        ips="localhost",
        filename="streaming_single-model_server-stats.csv",
        model_version=1,
        name="monitor"
    )

    with client, monitor:
        for i in range(int(num_inferences)):
            start = i * inference_stride
            stop = start + inference_stride  # note the smaller slice
            kernel = hoft[:, start: stop]

            # provide some additional information to
            # the inference server to allow us to keep
            # track of multiple different streams
            client.infer(
                kernel,
                request_id=i,
                sequence_id=1001,
                sequence_start=i == 0,
                sequence_end=(i + 1) == num_inferences
            )
            time.sleep(1 / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

            if not i:
                while not callback.y[0]:
                    time.sleep(1e-3)
                logger.info("First request completed")

        while True:
            results = client.get()
            if results is not None:
                logger.info("Inference complete!")
                break

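# drop the responses computed on partially filled snapshots
results = results[int(KERNEL_LENGTH * INFERENCE_SAMPLING_RATE) - 1:]

# compare with a tolerance, since floating point results
# aren't guaranteed to be bitwise identical across runs
assert np.allclose(results, nonstreaming_results)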

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest gloopy_lettuce_8926


In [20]:
p = figure(
    title="Network response vs. time",
    x_axis_label="Time from start [s]",
    y_axis_label="NN Output",
    width=600,
    height=300,
    tools="",
)
# results was trimmed above, so build the time axis from its length
times = np.arange(len(results)) / INFERENCE_SAMPLING_RATE
times += KERNEL_LENGTH

p.line(times, results, line_width=2.0)
show(p)

In [21]:
df = pd.read_csv("streaming_single-model_server-stats.csv")
df

| | timestamp | ip | model | count | queue | compute_input | compute_infer | compute_output | request |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1.663720e+09 | localhost | snapshotter | 16 | 1465 | 665 | 2155 | 641 | 5280 |
| 1 | 1.663720e+09 | localhost | my-classifier | 16 | 851 | 153 | 7390567 | 465 | 7392855 |
| 2 | 1.663720e+09 | localhost | snapshotter | 80 | 7423 | 2749 | 10116 | 3115 | 25100 |
| 3 | 1.663720e+09 | localhost | my-classifier | 79 | 3869 | 708 | 23365 | 1977 | 33034 |
| 4 | 1.663720e+09 | localhost | snapshotter | 80 | 7406 | 2956 | 12062 | 3606 | 27803 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 203 | 1.663720e+09 | localhost | my-classifier | 81 | 3055 | 1835 | 24571 | 2040 | 38768 |
| 204 | 1.663720e+09 | localhost | snapshotter | 80 | 7796 | 2809 | 9827 | 2971 | 25187 |
| 205 | 1.663720e+09 | localhost | my-classifier | 79 | 3247 | 1811 | 24295 | 1999 | 38322 |
| 206 | 1.663720e+09 | localhost | snapshotter | 79 | 4864 | 2656 | 8732 | 2267 | 20091 |


In [22]:
p = figure(
    title="Queuing latency vs. time",
    x_axis_label="Time from start of monitoring [s]",
    y_axis_label="Average queue latency in interval [us]",
    height=300,
    width=600,
    tools=""
)

for model, color in zip(["snapshotter", "my-classifier"], palette):
    subdf = df[df["model"] == model]
    time_from_start = subdf["timestamp"] - df["timestamp"].iloc[0]
    queue_time = subdf["queue"] / subdf["count"]

    p.line(
        time_from_start.iloc[1:],
        queue_time.iloc[1:],
        line_width=2.0,
        line_color=color,
        legend_label=model
    )
p.legend.location = "top_left"
show(p)

It looks like there's a slight upward trend in our snapshotter model's queue times, which means we may be bottlenecked by the rate at which we can perform the snapshot update. This is the tradeoff we make in exchange for reduced network I/O: we introduce a serial step into our processing pipeline, which limits the amount of parallelism we can take advantage of.

There are a couple of different ways we can get around this, however. We can either parallelize across multiple inference streams (or take a single stream and break it up into several), or we can batch our updates to take advantage of parallel execution on the server. I'm going to try the latter first, but later we'll see how `hermes` makes the former easy as well.

Let's export a new ensemble and associated snapshotter model that expects batched updates, then run inference with that.
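
To see what a batched update has to compute, here is a hypothetical NumPy sketch (not the server-side implementation): an update holding `batch_size * stride` new samples is appended to the current state, and `batch_size` overlapping windows are sliced out of the result, each one ending `stride` samples after the previous. The new state is simply the last window.

```python
import numpy as np


def batched_snapshot_update(state, update, stride):
    """Hypothetical batched snapshot update.

    state:  (num_channels, kernel_size) most recent samples
    update: (num_channels, batch_size * stride) new samples
    Returns (windows, new_state); windows has shape
    (batch_size, num_channels, kernel_size).
    """
    kernel_size = state.shape[-1]
    batch_size = update.shape[-1] // stride
    extended = np.concatenate([state, update], axis=-1)
    # window j ends where the (j + 1)-th stride of the update ends
    windows = np.stack([
        extended[:, (j + 1) * stride:(j + 1) * stride + kernel_size]
        for j in range(batch_size)
    ])
    # the new state is just the last window
    new_state = extended[:, -kernel_size:]
    return windows, new_state


rng = np.random.default_rng(1)
data = rng.standard_normal((2, 64)).astype(np.float32)
kernel_size, stride, batch_size = 8, 2, 4
state = np.zeros((2, kernel_size), dtype=np.float32)

batches = []
for start in range(0, data.shape[-1], batch_size * stride):
    windows, state = batched_snapshot_update(
        state, data[:, start:start + batch_size * stride], stride
    )
    batches.append(windows)
all_windows = np.concatenate(batches)  # (32, 2, 8): one window per stride
```

Each request now carries `batch_size` strides of new data and returns `batch_size` outputs, so the serial state update happens once per batch rather than once per window.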

In [23]:
batched_ensemble = repo.add("batched-streaming-classifier", platform=qv.Platform.ENSEMBLE)

# right now we can only handle batches small
# enough that the update isn't longer than
# the kernel itself, so we'll use the biggest
# batch we can with that constraint
batch_size = int(KERNEL_LENGTH * INFERENCE_SAMPLING_RATE)
classifier = repo.models["my-classifier"]
batched_ensemble.add_streaming_inputs(
    classifier.inputs["hoft"],
    stream_size=inference_stride,
    batch_size=batch_size,
    name="batched-snapshotter"
)

# we know our first request takes ~12s, so make
# sure that the snapshotter will maintain a state
# for longer than this
snapshotter = repo.models["batched-snapshotter"]
snapshotter.config.sequence_batching.max_sequence_idle_microseconds = int(25 * 10**6)
snapshotter.config.write()

# mark the output of our classifier as the output
# of the whole ensemble and then export a "version"
# of the ensemble (basically just writes its config)
batched_ensemble.add_output(classifier.outputs["prob"])
batched_ensemble.export_version(None)

'batched-streaming-classifier/1/model.empty'

In [24]:
# make a new callback that's better equipped to
# slice out a batch of responses
class BatchedCallback:
    def __init__(self, num_inferences, batch_size):
        self.y = np.zeros((num_inferences * batch_size,))
        self.batch_size = batch_size

    def __call__(self, response, request_id, sequence_id):
        # each request produces `batch_size` outputs, so request i
        # fills slots [i * batch_size, (i + 1) * batch_size)
        start = request_id * self.batch_size
        self.y[start: start + len(response)] = response[:, 0]

        # return the full timeseries once the final batch is filled in
        if (start + len(response)) >= len(self.y):
            return self.y


num_kernels = inference_data_size // inference_stride
num_inferences = num_kernels // batch_size
callback = BatchedCallback(num_inferences, batch_size)
with serve("model-repo", gpus=[0]) as instance:
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    client = InferenceClient(
        "localhost:8001",
        model_name="batched-streaming-classifier",
        model_version=1,
        callback=callback
    )
    monitor = ServerMonitor(
        model_name="batched-streaming-classifier",
        ips="localhost",
        filename="batched-streaming_single-model_server-stats.csv",
        model_version=1,
        name="monitor"
    )

    with client, monitor:
        for i in range(int(num_inferences)):
            start = i * inference_stride * batch_size
            stop = (i + 1) * inference_stride * batch_size
            kernel = hoft[:, start: stop]

            # provide some additional information to
            # the inference server to allow us to keep
            # track of multiple different streams
            client.infer(
                kernel,
                request_id=i,
                sequence_id=1001,
                sequence_start=i == 0,
                sequence_end=(i + 1) == num_inferences
            )
            time.sleep(batch_size / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

            if not i:
                while not callback.y[0]:
                    time.sleep(1e-3)
                logger.info("First request completed")

        while True:
            results = client.get()
            if results is not None:
                logger.info("Inference complete!")
                break

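# the earliest outputs come from snapshots that aren't yet completely
# filled with real data, so drop them before comparing against the
# non-streaming results
results = results[int(KERNEL_LENGTH * INFERENCE_SAMPLING_RATE) - 1:]
assert (results == nonstreaming_results).all()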

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest lovely_malarkey_1196
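
`BatchedCallback` treats the response to the final request as the signal that inference is done. If responses could ever arrive out of order, that signal would fire before the earlier batches were filled in. A slightly more defensive variant, sketched below as a hypothetical alternative (not part of `hermes`), counts distinct request IDs instead. It keeps the same `(response, request_id, sequence_id)` call signature and, like `BatchedCallback`, returns the full array only once it is complete.

```python
import numpy as np


class CountingBatchedCallback:
    """Hypothetical variant of BatchedCallback that tracks completed requests.

    Returns the full output array only once every request ID has been seen,
    regardless of the order in which responses arrive.
    """

    def __init__(self, num_inferences, batch_size):
        self.y = np.zeros((num_inferences * batch_size,))
        self.batch_size = batch_size
        self.num_inferences = num_inferences
        self.seen = set()

    def __call__(self, response, request_id, sequence_id):
        # request i fills output slots [i * batch_size, (i + 1) * batch_size)
        start = request_id * self.batch_size
        self.y[start: start + len(response)] = response[:, 0]
        self.seen.add(request_id)

        # only hand back the results once every request has responded
        if len(self.seen) == self.num_inferences:
            return self.y
        return None


# simulate three batched responses arriving out of order
demo_callback = CountingBatchedCallback(num_inferences=3, batch_size=2)
outputs = [
    demo_callback(np.full((2, 1), float(i)), request_id=i, sequence_id=1001)
    for i in (2, 0, 1)
]
print(outputs[-1])
```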


In [25]:
from bokeh.models import LinearAxis, Range1d

df = pd.read_csv("batched-streaming_single-model_server-stats.csv")
p = figure(
    title="Queuing latency vs. time",
    x_axis_label="Time from start of monitoring [s]",
    y_axis_label="Average queue latency in interval [us]",
    height=400,
    width=750,
    y_range=(0, 200),
    tools=""
)
p.extra_y_ranges = {"throughput": Range1d(0, 300)}
axis = LinearAxis(axis_label="Throughput [data s / s]", y_range_name="throughput")
p.add_layout(axis, "right")

for model, color in zip(["batched-snapshotter", "my-classifier"], palette):
    subdf = df[df["model"] == model]
    time_from_start = subdf["timestamp"] - df["timestamp"].iloc[0]

    queue_time = subdf["queue"] / subdf["count"]
    throughput = batch_size * subdf["count"] / subdf.timestamp.diff() / INFERENCE_SAMPLING_RATE

    p.line(
        time_from_start.iloc[2:],
        queue_time.iloc[2:],
        line_width=2.0,
        line_color=color,
        legend_label=model + " queue latency"
    )
    p.line(
        time_from_start.iloc[2:],
        throughput.iloc[2:],
        line_width=2.0,
        line_color=color,
        line_dash="2 2",
        legend_label=model + " throughput",
        y_range_name="throughput"
    )
p.legend.orientation = "horizontal"
p.legend.location = "bottom"
show(p)
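
The throughput curves above and the `time.sleep` in the inference loop rely on the same arithmetic. Each request carries `batch_size / INFERENCE_SAMPLING_RATE` seconds of data, so sleeping that long divided by `INFERENCE_RATE` targets `INFERENCE_RATE` seconds of data per wall-clock second. Multiplying an interval's request `count` by the same per-request quantity and dividing by the interval length recovers the achieved rate. The helper names and numbers below are hypothetical and only illustrate that relationship.

```python
def request_interval(batch_size, inference_sampling_rate, inference_rate):
    """Seconds to wait between requests to hit a target rate.

    Each request carries batch_size / inference_sampling_rate seconds of
    data; inference_rate is the target in data seconds per wall-clock second.
    """
    data_per_request = batch_size / inference_sampling_rate
    return data_per_request / inference_rate


def achieved_throughput(count, interval, batch_size, inference_sampling_rate):
    """Data seconds processed per wall-clock second in one monitoring interval.

    count is the number of requests completed in the interval and interval
    is its length in seconds, mirroring the plotting cells above.
    """
    data_per_request = batch_size / inference_sampling_rate
    return count * data_per_request / interval


# hypothetical numbers: 16 kernels per request, with a new kernel every
# 1/16 s of data, means one second of data per request
batch_size, inference_sampling_rate, inference_rate = 16, 16, 2500
sleep = request_interval(batch_size, inference_sampling_rate, inference_rate)
print(f"sleep {sleep * 1e6:.0f} us between requests")

# if that pacing were hit exactly, a 10 s interval would see
# 10 / sleep requests, i.e. the target rate
count = round(10 / sleep)
print(achieved_throughput(count, 10.0, batch_size, inference_sampling_rate))
```

Because the sleep doesn't account for time spent inside `client.infer`, the achieved throughput can come in below the target rate.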

In [26]:
INFERENCE_RATE = 2500
callback = BatchedCallback(num_inferences, batch_size)
with serve("model-repo", gpus=[0]) as instance:
    logger.info("Waiting for inference service to come online...")
    instance.wait()
    logger.info("Service ready!")

    client = InferenceClient(
        "localhost:8001",
        model_name="batched-streaming-classifier",
        model_version=1,
        callback=callback
    )
    monitor = ServerMonitor(
        model_name="batched-streaming-classifier",
        ips="localhost",
        filename="batched-streaming-2500_single-model_server-stats.csv",
        model_version=1,
        name="monitor"
    )

    with client, monitor:
        for i in range(int(num_inferences)):
            start = i * inference_stride * batch_size
            stop = (i + 1) * inference_stride * batch_size
            kernel = hoft[:, start: stop]

            # provide some additional information to
            # the inference server to allow us to keep
            # track of multiple different streams
            client.infer(
                kernel,
                request_id=i,
                sequence_id=1001,
                sequence_start=i == 0,
                sequence_end=(i + 1) == num_inferences
            )
            time.sleep(batch_size / INFERENCE_SAMPLING_RATE / INFERENCE_RATE)

            if not i:
                while not callback.y[0]:
                    time.sleep(1e-3)
                logger.info("First request completed")

        while True:
            results = client.get()
            if results is not None:
                logger.info("Inference complete!")
                break

singularity -s instance start --nv /cvmfs/singularity.opensciencegrid.org/fastml/gwiaas.tritonserver:latest crunchy_peanut_butter_6737


In [27]:
df = pd.read_csv("batched-streaming-2500_single-model_server-stats.csv")
p = figure(
    title="Queuing latency vs. time",
    x_axis_label="Time from start of monitoring [s]",
    y_axis_label="Average queue latency in interval [us]",
    height=400,
    width=750,
    y_range=(0, 400),
    tools=""
)
p.extra_y_ranges = {"throughput": Range1d(1800, 2200)}
axis = LinearAxis(axis_label="Throughput [data s / s]", y_range_name="throughput")
p.add_layout(axis, "right")

for model, color in zip(["batched-snapshotter", "my-classifier"], palette):
    subdf = df[df["model"] == model]
    time_from_start = subdf["timestamp"] - df["timestamp"].iloc[0]

    queue_time = subdf["queue"] / subdf["count"]
    throughput = batch_size * subdf["count"] / subdf.timestamp.diff() / INFERENCE_SAMPLING_RATE

    p.line(
        time_from_start.iloc[2:],
        queue_time.iloc[2:],
        line_width=2.0,
        line_color=color,
        legend_label=model + " queue latency"
    )
    p.line(
        time_from_start.iloc[2:],
        throughput.iloc[2:],
        line_width=2.0,
        line_color=color,
        line_dash="2 2",
        legend_label=model + " throughput",
        y_range_name="throughput"
    )
p.legend.location = "top_right"
show(p)