# Enabling Deep Learning in Gravitational Wave Physics With Inference-as-a-Service
## Alec Gunny

## Deep Learning in Gravitational Wave Astronomy
<img src="images/gw-dl-use-cases.png" height="auto" width="980px" style="top:10px;"/>

## Inference Time Challenges
<div class="float200 padded">
    <img src="images/hardware-logos.png" width="300px" height="auto" class="right unpadded"/>
    <p class="left">Access to and familiarity with accelerated hardware</p>
</div>

<div>
    <img src="images/framework-logos.png" width="300px" height="auto" class="left padded" />
    <p class="right">Managing, leveraging, and translating across deep learning software stacks</p>
</div>

<div></div>
<div></div>
<div class="float100">
    <img src="images/distributing-nns.png" width="250px" height="auto" class="right unpadded"/>
    <p class="left">Distributing updated models to dependent users and applications</p>
</div>

## A simple PyTorch example
- Naive inference pipeline for a model trained using PyTorch
- Pipeline itself loads the model and data onto the GPU and executes inference

In [1]:
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue

import numpy as np
import torch

import utils

Instantiate a simple model, "train" it, then save it for inference:

In [2]:
class MLP(torch.nn.Module):
    """Multi-layer perceptron: Linear+ReLU hidden layers and a Linear+Sigmoid head.

    Args:
        input_size: Number of features in the last dimension of the input.
        hidden_sizes: Width of each hidden layer, in order.

    The head maps to a single feature, squashed into (0, 1) by the sigmoid.
    """

    def __init__(self, input_size, hidden_sizes):
        super().__init__()

        # a ModuleList (rather than a plain list) registers every layer's
        # parameters with the module, so they show up in `state_dict()`
        self.layers = torch.nn.ModuleList()
        for size in hidden_sizes:
            self.layers.append(torch.nn.Linear(input_size, size))
            self.layers.append(torch.nn.ReLU())
            input_size = size

        self.layers.append(torch.nn.Linear(input_size, 1))
        self.layers.append(torch.nn.Sigmoid())

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply every layer to ``x`` in order and return the result."""
        for layer in self.layers:
            x = layer(x)
        return x

# network dimensions shared by every cell below
INPUT_SIZE = 64
HIDDEN_SIZES = [256, 128, 64]

# build the network directly on GPU 0
model = MLP(INPUT_SIZE, HIDDEN_SIZES)
model = model.cuda(0)

# (training would happen here) -> persist the weights for inference
torch.save(obj=model.state_dict(), f="model.pt")

At inference time, load the saved weights into a fresh model and use that model to map inputs to outputs:

In [3]:
# rebuild the network on GPU 0, then restore the saved weights into it
inference_model = MLP(INPUT_SIZE, HIDDEN_SIZES)
inference_model = inference_model.cuda(0)
inference_model.load_state_dict(torch.load("model.pt"))

# draw one random float32 input vector on the host
x = np.random.randn(INPUT_SIZE).astype(np.float32)

with torch.no_grad():
    # host -> device, then a forward pass entirely on the GPU
    x = torch.from_numpy(x).cuda(0)
    y = inference_model(x)

# device -> host so the result is displayed as a numpy array
y.cpu().numpy()

array([0.47049186], dtype=float32)

- Generally interested in using the model for many thousands or millions of inferences
- Start with the simplest case: a dataset that can fit into memory at once

In [4]:
dataset = np.random.randn(500_000, 64).astype(np.float32)

@torch.no_grad()
def do_some_inference(model, dataset, batch_size=8, device_index=0):
    """Yield ``model``'s predictions for ``dataset`` one batch at a time.

    The whole array is copied to GPU ``device_index`` up front. Each batch of
    ``batch_size`` rows is then run through ``model``, and the result is
    copied back to the host as a numpy array. Gradient tracking is disabled
    by the decorator.
    """
    # one bulk host -> device transfer instead of one per batch
    device_data = torch.from_numpy(dataset).cuda(device_index)
    loader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(device_data), batch_size=batch_size
    )

    for (batch,) in loader:
        yield model(batch).cpu().numpy()

# run through the dataset and get a rough time estimate;
# `%time` is an IPython line magic that reports CPU and wall time
# for this single statement (see the output below)
%time outputs = [y for y in do_some_inference(inference_model, dataset)]

CPU times: user 17.6 s, sys: 205 ms, total: 17.8 s
Wall time: 17.8 s


Ok, but how well are we utilizing the GPU?

In [5]:
# same single-process pipeline, now wrapped in a progress display that
# presumably also shows utilization of GPU 0
with utils.GpuUtilProgress(gpu_ids=0) as progbar:
    task_id = progbar.add_task("[cyan]Inference", total=len(dataset))

    outputs = []
    for y in do_some_inference(model, dataset):
        progbar.update(task_id, advance=len(y))
        outputs.append(y)

# stitch the per-batch 2-D prediction arrays back into one array
output = np.vstack(outputs)

Output()

- Not very well! GPUs are expensive: can we improve utilization through parallel execution (assuming we can't change the batch size)?
- First attempt: Naive (and sloppy) implementation using threading

In [6]:
# thread-safe channel that the worker threads push prediction batches into
q = Queue()


def task(dataset_chunk):
    """Thread worker: run a private copy of the model over ``dataset_chunk``.

    Loads the saved weights into a new model on GPU 0 and pushes every
    prediction batch onto the shared queue ``q``.
    """
    worker_model = MLP(INPUT_SIZE, HIDDEN_SIZES)
    worker_model = worker_model.cuda(0)
    worker_model.load_state_dict(torch.load("model.pt"))

    for batch_output in do_some_inference(worker_model, dataset_chunk):
        q.put(batch_output)

# Split the data four ways and run each chunk in its own thread against the
# same GPU. Keep the futures so that an exception raised inside a worker is
# re-raised here. Otherwise the loop below would block forever waiting on
# results that will never arrive.
with utils.GpuUtilProgress(0) as progbar, ThreadPoolExecutor(4) as pool:
    task_id = progbar.add_task("Inference with 4 jobs", total=len(dataset))
    futures = [pool.submit(task, x) for x in np.array_split(dataset, 4)]

    outputs = []
    while len(outputs) < len(dataset):
        try:
            y = q.get(timeout=0.1)
        except Empty:
            # nothing arrived recently: surface any worker failure
            for future in futures:
                if future.done():
                    future.result()  # re-raises the worker's exception, if any
            continue
        progbar.update(task_id, advance=len(y))
        # `extend` adds one entry per row, so len(outputs) counts samples
        outputs.extend(y)

Output()

Not much help. What next?

After spending a few hours perusing the PyTorch documentation, we come up with the following basic functional implementation:

In [7]:
def parallel_inference(X, num_gpus, jobs_per_gpu, progbar):
    """Run inference on ``X`` with ``jobs_per_gpu`` processes on each of ``num_gpus`` GPUs.

    Args:
        X: Full input array. It is forwarded to every spawned process; how
            each job picks its share is up to ``utils.parallel_inference_task``.
        num_gpus: Number of GPUs to spread the jobs across.
        jobs_per_gpu: Number of processes to launch per GPU.
        progbar: Progress display. A task is added to it and advanced as
            outputs arrive.

    Returns:
        All jobs' outputs concatenated along axis 0. Results are collected
        in arrival order, so they are not guaranteed to line up with the
        rows of ``X``.

    Raises:
        Propagates the exception raised by ``procs.join`` if a spawned
        process fails.
    """
    num_jobs = num_gpus * jobs_per_gpu
    task_id = progbar.add_task(
        f"[cyan]{num_gpus} GPUs/{num_jobs} jobs",
        total=len(X),
        start=False
    )

    # objects shared with spawned processes must come from the
    # matching "spawn" multiprocessing context
    smp = torch.multiprocessing.get_context("spawn")
    q = smp.Queue()
    sync = smp.Value("d", 0.0)

    # Arguments forwarded to every spawned process. They are pickled, and
    # functions/classes defined in __main__ (i.e. in this notebook) can't be
    # unpickled in a child, so we pass the copies that live in `utils`.
    args = (
        X,  # the full dataset
        utils.MLP,  # the module class to use for inference
        [INPUT_SIZE, HIDDEN_SIZES],  # arguments to initialize the module
        utils.do_some_inference,  # the inference function to use
        q,  # the queue to put the results in
        sync,  # a task synchronizer
        jobs_per_gpu,
        num_gpus
    )

    # spawn the jobs without blocking. `parallel_inference_task` lives in
    # `utils` for the same pickling reason described above
    procs = torch.multiprocessing.spawn(
        utils.parallel_inference_task,
        args=args,
        nprocs=num_jobs,
        join=False
    )

    # Wait until every model has loaded so that we can compare throughput
    # better. Polling with `procs.join` instead of `time.sleep` means a job
    # that crashes while loading raises here rather than hanging this loop.
    while sync.value < num_jobs:
        if procs.join(0.01):
            break

    # increment the synchronizer one more time to kick off the jobs. Hold
    # the lock so the read-modify-write is atomic
    with sync.get_lock():
        sync.value += 1
    progbar.start_task(task_id)

    # collect all the (unordered) outputs
    outputs = []
    finished = False
    while True:
        try:
            y = q.get_nowait()
        except Empty:
            # Only stop once the queue is empty AND the jobs are known to be
            # done. A job can put its last batch while we wait in `join`,
            # so after `join` reports completion we drain the queue again.
            if finished:
                break
            finished = procs.join(0.01)
            continue
        progbar.update(task_id, advance=len(y))
        outputs.append(y)

    # concatenate the outputs and return
    return np.concatenate(outputs, axis=0)

Run this at a few different scales to see what we can achieve:

In [8]:
# compare throughput at three levels of parallelism:
# (GPUs, jobs per GPU) = (1, 2), (1, 4), (2, 4)
with utils.GpuUtilProgress([0, 1]) as progbar:
    for num_gpus, jobs_per_gpu in [(1, 2), (1, 4), (2, 4)]:
        y = parallel_inference(
            dataset,
            num_gpus=num_gpus,
            jobs_per_gpu=jobs_per_gpu,
            progbar=progbar,
        )

Output()

It manages to increase GPU utilization pretty well! But:
- Framework specific
    - No help if we want to extend to other frameworks
    - Torch is pretty unique in having this functionality at all
- The code is complicated and required a lot of non-physics expertise to build
    - Non-trivial to reconstruct for new applications
- Extremely contrived example, breaks down in most real use cases
    - Explore a few cases to show how

## Throughput too low
#### _The constraints of our use case demand that we further reduce processing time by an order of magnitude_
- Not obvious how to simply extend this code to multi-node
- Scaling not dynamic
    - Have to pick a level of parallelism and hope the resources are available to use it

## Throughput too high
####  _Data generation process is slow, needs to be parallelized to saturate GPU throughput_

In [9]:
class ThrottledDataset(torch.utils.data.IterableDataset):
    """Iterable dataset that hands out one row at a time, with an artificial delay.

    Simulates a slow data-generation process: before each element is
    returned, iteration sleeps for ``delay`` seconds.

    Args:
        x: numpy array. It is copied once to GPU ``device_index``.
        device_index: CUDA device to hold the data on.
        delay: Seconds to sleep per element. Defaults to 1 ms.
    """

    def __init__(self, x, device_index=0, delay=0.001):
        self.x = torch.from_numpy(x).cuda(device_index)
        self.delay = delay

    def __iter__(self):
        # A generator gives every call to iter() its own cursor. Storing the
        # cursor on the dataset and returning `self` would make simultaneous
        # iterations steal elements from one another.
        for row in self.x:
            time.sleep(self.delay)
            yield row


@torch.no_grad()
def do_some_throttled_inference(model, dataset, batch_size=8, device_index=0):
    """Yield predictions for ``dataset``, reading rows through the slow ``ThrottledDataset``.

    Rows are grouped into batches of ``batch_size`` and run through
    ``model``. Each batch's output is yielded as a numpy array on the host.
    Gradient tracking is disabled by the decorator.
    """
    loader = torch.utils.data.DataLoader(
        ThrottledDataset(dataset, device_index), batch_size=batch_size
    )
    for batch in loader:
        predictions = model(batch)
        yield predictions.cpu().numpy()

## Throughput too high
Reimplement parallel inference with our new inference function

In [10]:
def throttled_parallel_inference(X, num_gpus, jobs_per_gpu, progbar):
    """Parallel inference on ``X`` using the throttled (slow-input) inference function.

    Launches ``jobs_per_gpu`` processes on each of ``num_gpus`` GPUs. Each
    process runs ``utils.do_some_throttled_inference``.

    Args:
        X: Full input array. It is forwarded to every spawned process; how
            each job picks its share is up to ``utils.parallel_inference_task``.
        num_gpus: Number of GPUs to spread the jobs across.
        jobs_per_gpu: Number of processes to launch per GPU.
        progbar: Progress display. A task is added to it and advanced as
            outputs arrive.

    Returns:
        All jobs' outputs concatenated along axis 0, in arrival order (not
        necessarily aligned with the rows of ``X``).

    Raises:
        Propagates the exception raised by ``procs.join`` if a spawned
        process fails.
    """
    num_jobs = num_gpus * jobs_per_gpu
    task_id = progbar.add_task(
        f"[cyan]{num_gpus} GPUs/{num_jobs} jobs",
        total=len(X),
        start=False
    )

    # objects shared with spawned processes must come from the
    # matching "spawn" multiprocessing context
    smp = torch.multiprocessing.get_context("spawn")
    q = smp.Queue()
    sync = smp.Value("d", 0.0)

    # Arguments forwarded to every spawned process. They are pickled, and
    # functions/classes defined in __main__ can't be unpickled in a child,
    # so we pass the copies that live in `utils`.
    args = (
        X,  # the full dataset
        utils.MLP,  # the module class to use for inference
        [INPUT_SIZE, HIDDEN_SIZES],  # arguments to initialize the module
        utils.do_some_throttled_inference,  # the inference function to use
        q,  # the queue to put the results in
        sync,  # a task synchronizer
        jobs_per_gpu,
        num_gpus
    )

    # spawn the jobs without blocking. `parallel_inference_task` lives in
    # `utils` for the same pickling reason described above
    procs = torch.multiprocessing.spawn(
        utils.parallel_inference_task,
        args=args,
        nprocs=num_jobs,
        join=False
    )

    # Wait until every model has loaded so that we can compare throughput
    # better. Polling with `procs.join` instead of `time.sleep` means a job
    # that crashes while loading raises here rather than hanging this loop.
    while sync.value < num_jobs:
        if procs.join(0.01):
            break

    # increment the starter to kick off the jobs. Hold the lock so the
    # read-modify-write is atomic
    with sync.get_lock():
        sync.value += 1
    progbar.start_task(task_id)

    # collect all the (unordered) outputs
    outputs = []
    finished = False
    while True:
        try:
            y = q.get_nowait()
        except Empty:
            # Only stop once the queue is empty AND the jobs are known to be
            # done. A job can put its last batch while we wait in `join`,
            # so after `join` reports completion we drain the queue again.
            if finished:
                break
            finished = procs.join(0.01)
            continue
        progbar.update(task_id, advance=len(y))
        outputs.append(y)

    # concatenate the outputs and return
    return np.concatenate(outputs, axis=0)

## Throughput too high
Low GPU utilization with local resources. Scale out or allow other users to leverage spare cycles

In [11]:
# the throttled pipeline is slow, so only use the first 20,000 rows for the demo
x = dataset[:20_000]
with utils.GpuUtilProgress([0, 1]) as progbar:
    # one GPU, with 1, 2 and then 4 concurrent jobs
    for jobs_per_gpu in (1, 2, 4):
        y = throttled_parallel_inference(x, 1, jobs_per_gpu, progbar=progbar)

Output()

## Model ensembling
#### _Connecting multiple models in a single pipeline_
<div class="center">
    <img src="images/model_sharing.png" height="auto" width="400px" class="center" />
    <img src="images/model_ensemble.png" height="auto" width="400px" class="center" />
</div>

## Model ensembling
Naive implementation

In [12]:
@torch.no_grad()
def do_some_multi_model_inference(models, dataset, batch_size=8, device_index=0):
    """Chain ``models`` over each batch of ``dataset`` and yield the final outputs.

    The dataset is copied to GPU ``device_index`` in one go. Each batch of
    ``batch_size`` rows is fed to ``models[0]``, whose output feeds
    ``models[1]``, and so on. The last output is yielded as a numpy array.
    Gradient tracking is disabled by the decorator.
    """
    rows = torch.utils.data.TensorDataset(
        torch.from_numpy(dataset).cuda(device_index)
    )
    for (batch,) in torch.utils.data.DataLoader(rows, batch_size=batch_size):
        out = batch
        for stage in models:
            out = stage(out)
        yield out.cpu().numpy()

# two-stage pipeline: a noise-removal network feeding the MLP from above
noise_remover = utils.NoiseRemovalModel(INPUT_SIZE, [32, 16]).cuda(0)
models = [noise_remover, inference_model]

with utils.GpuUtilProgress(0) as progbar:
    total_rows = len(dataset)
    task_id = progbar.add_task("[cyan]Ensemble inference", total=total_rows)

    # outputs are discarded: this cell only tracks progress/utilization
    for y in do_some_multi_model_inference(models, dataset):
        progbar.update(task_id, advance=len(y))

Output()

## Model Ensembling

Naive implementation works, but extending to parallel execution efficiently is non-trivial:

- Each model should execute asynchronously
- Each model will need _different_ levels of parallelism to alleviate bottlenecks
- Models should pass tensors between GPUs to best utilize hardware
- If the models utilize different frameworks, this problem becomes exponentially harder

<figure>
    <img src="images/bottleneck-both.png" height="auto" width="800px"/>
    <figcaption>Model 1 throughput too high for model 2, need to run more concurrent instances of model 2 to maximize throughput</figcaption>
</figure>

## Distribution
#### _Who do you want to be able to use your model?_

- How much expertise should someone need to have to utilize your model in their pipeline?
- How much do they need to know about how your model is implemented?
- How will they be kept up-to-date when you retrain the model or improve the architecture?
    - Will these updates change their pipeline?
- What if they don't have access to accelerators?

#### Takeaways so far:
- Efficiently scheduling cross-platform, multi-GPU, multi-model asynchronous DL inference is hard
- Inference is just one piece of your pipeline—often just a single line:
```python
y = model(x)
```
    How and where `model(x)` happens is largely irrelevant to everything else

So:
- Manage this piece separately to hide these details
- Scale it to meet the rate at which you can generate `x`s or how quickly you need `y`s

## Inference-as-a-Service
The **inference-as-a-service** (IaaS) paradigm addresses these issues
- Out-of-the-box software optimized for efficiently executing complex asynchronous workloads across devices
    - Hardware _and_ framework agnostic
- Exposes models for inference to **client** pipelines via standardized APIs
    - Pipeline code stays the same even as the model changes or the service moves
    - Centralized model repositories keep all clients on the same page
- Containerization makes deployments portable to meet workload demands
    - Minimizes environment management overhead
    - Integration with container management services like Kubernetes makes scaling easy

Traditional pipeline pseudocode
```python
# need to define the architecture somewhere
# that users can get access to it
from model_zoo import MyModel

# load in some parameters to make sure we
# initialize the model correctly
with open("path/to/init/args.pickle", "rb") as f:
    args = pickle.load(f)
model = MyModel(**args)

# load in the latest checkpoint we know of
model.load_weights("path/to/latest/weights.h5")

for x in dataset:
    # this syntax will differ depending on the framework
    x = move_array_to_gpu(x)
    y = model(x)
    y = move_output_to_cpu(y)
    do_downstream_postprocessing(y)
```

IaaS
```python
import tritonclient.grpc as triton

# connect to the service at some url
# pipeline never needs to touch model itself
client = triton.InferenceServerClient("0.0.0.0:8001")

# build a protobuf message representing the input
metadata = client.get_model_metadata("my_model").inputs[0]
input = triton.InferInput(
    metadata.name, metadata.shape, metadata.datatype
)

for x in dataset:
    input.set_data_from_numpy(x)

    # use the latest available version of the model
    y = client.infer("my_model", inputs=[input])
    do_downstream_postprocessing(y)
```

The difference is that, _as is_, this code gets you:
- As much scale as you want
- on whatever hardware you want
- using whatever backend framework you want
- wherever you want
- and can receive updates without interrupting service

```python
import tritonclient.grpc as triton

# connect to the service at some url
# pipeline never needs to touch model itself
client = triton.InferenceServerClient("0.0.0.0:8001")

# build a protobuf message representing the input
metadata = client.get_model_metadata("my_model").inputs[0]
input = triton.InferInput(
    metadata.name, metadata.shape, metadata.datatype
)

for x in dataset:
    input.set_data_from_numpy(x)

    # use the latest available version of the model
    y = client.infer("my_model", inputs=[input])
    do_downstream_postprocessing(y)
```

In [13]:
from gravswell import quiver as qv

# create a model repository at the local path "my-repo" and register a model
# entry for the ONNX platform (no versions have been exported yet)
repo = qv.ModelRepository("my-repo")
entry = repo.add("my-model", platform=qv.Platform.ONNX)
# display the config generated so far (just name and platform, see below)
entry.config

name: "my-model"
platform: "onnxruntime_onnx"

In [14]:
# move the model to the CPU before exporting it
inference_model.to("cpu")
# export the weights as a new version of the entry. A `None` dimension in
# `input_shapes` becomes a variable `dims: -1` in the config, and the output
# tensor is named "y" (see the displayed config below)
export_path = entry.export_version(
    inference_model,
    input_shapes={"x": (None, INPUT_SIZE)},
    output_names=["y"]
)
entry.config

name: "my-model"
platform: "onnxruntime_onnx"
input {
  name: "x"
  data_type: TYPE_FP32
  dims: -1
  dims: 64
}
output {
  name: "y"
  data_type: TYPE_FP32
  dims: -1
  dims: 1
}

In [15]:
# show the repository layout: a directory per model containing config.pbtxt
# and one numbered subdirectory per exported version
utils.print_tree("my-repo")

my-repo/
    my-model/
        1/
            model.onnx
        config.pbtxt


In [16]:
# exporting again adds version 2 next to version 1 (see the tree below)
entry.export_version(inference_model)
utils.print_tree("my-repo")

my-repo/
    my-model/
        1/
            model.onnx
        2/
            model.onnx
        config.pbtxt


In [17]:
# ask for 4 concurrent instances of the model; the config gains an
# `instance_group` block (shown below with `kind: KIND_GPU`)
entry.config.add_instance_group(count=4)
entry.config

name: "my-model"
platform: "onnxruntime_onnx"
input {
  name: "x"
  data_type: TYPE_FP32
  dims: -1
  dims: 64
}
output {
  name: "y"
  data_type: TYPE_FP32
  dims: -1
  dims: 1
}
instance_group {
  count: 4
  kind: KIND_GPU
}

In [18]:
# clean up the demo repository created above (presumably removes "my-repo")
repo.delete()

In [19]:
# same workflow, but the repository lives in a Google Cloud Storage bucket
repo = qv.ModelRepository("gs://ligo-quiver-demo")
entry = repo.add("my-model", platform=qv.Platform.ONNX)
entry.config.add_instance_group(count=4)
export_path = entry.export_version(
    inference_model,
    input_shapes={"x": (None, INPUT_SIZE)},
    output_names=["y"]
)
# `!` runs a shell command from the notebook: list the model's bucket contents
! gsutil ls gs://ligo-quiver-demo/my-model

gs://ligo-quiver-demo/my-model/config.pbtxt
gs://ligo-quiver-demo/my-model/1/


In [20]:
from gravswell.cloudbreak import google as cb
from google.cloud import container_v1 as container

# manager for clusters in the given Google Cloud project and zone
manager = cb.ClusterManager(project="gunny-multi-instance-dev", zone="us-east4-b")

# cluster spec (google.cloud.container_v1) with a default two-node pool
cluster_config = container.Cluster(
    name="ligo-demo-cluster",
    node_pools=[container.NodePool(
        name="default-pool",
        initial_node_count=2,
        config=container.NodeConfig()
    )]
)
cluster = manager.create_resource(cluster_config)
# install GPU drivers on the cluster ahead of adding GPU nodes
cluster.deploy_gpu_drivers()

Output()

In [21]:
# add a one-node pool whose node has 16 vCPUs and 4 T4 GPUs
node_pool_config = container.NodePool(
    name="tritonserver-t4-pool",
    initial_node_count=1,
    config=cb.create_gpu_node_pool_config(
        vcpus=16,
        gpus=4,
        gpu_type="t4"
    )
)
node_pool = cluster.create_resource(node_pool_config)

Output()

In [22]:
# deploy the Triton server from the "triton.yaml" template (image tag 20.11),
# serving models from the bucket populated above, then block until it's up
cluster.deploy(
    "triton.yaml",
    name="tritonserver",
    tag="20.11",
    bucket="ligo-quiver-demo",
    gpus=4,
    vcpus=15  # leave at least one of the node's 16 vCPUs for Kubernetes itself
)
cluster.k8s_client.wait_for_deployment("tritonserver")

Output()

In [23]:
import tritonclient.grpc as triton

# wait for the service to get an IP, then connect to its gRPC port (8001)
ip = cluster.k8s_client.wait_for_service("tritonserver")
client = triton.InferenceServerClient(f"{ip}:8001")

# explicit checks rather than `assert`, which is skipped under `python -O`
if not client.is_server_live():
    raise RuntimeError(f"Triton server at {ip}:8001 is not live")
if not client.is_model_ready("my-model"):
    raise RuntimeError("model 'my-model' is not ready on the Triton server")

Output()

In [24]:
batch_size = 8

# build one reusable input message, pinning the model's variable (-1)
# batch dimension to `batch_size`
metadata = client.get_model_metadata("my-model").inputs[0]
shape = [i if i != -1 else batch_size for i in metadata.shape]
infer_input = triton.InferInput(metadata.name, shape, metadata.datatype)

# errors from failed requests, handed from the callback to the main thread
failures = Queue()

with utils.GpuUtilProgress() as progress:
    task_id = progress.add_task("Inference", total=len(dataset))

    def callback(result, error):
        # Invoked once per request. A failed request passes an error instead
        # of a usable result. Record it rather than crash here, which would
        # leave the wait loop below spinning forever.
        if error is not None:
            failures.put(error)
            return
        y = result.as_numpy("y")
        progress.update(task_id, advance=len(y))

    # np.split needs equal chunks, i.e. len(dataset) divisible by batch_size
    num_batches = len(dataset) // batch_size
    for x in np.split(dataset, num_batches):
        infer_input.set_data_from_numpy(x)
        client.async_infer("my-model", inputs=[infer_input], callback=callback)

    # wait for every request to complete, surfacing the first failure
    while not progress._tasks[task_id].finished:
        if not failures.empty():
            raise failures.get()
        time.sleep(0.1)

Output()