# Bringing AI to Gravitational Wave Physics
## Alec Gunny, Dylan Rankin, Jeffrey Krupa, Muhammed Saleem, Tri Nguyen, Michael Coughlin, Philip Harris, Erik Katsavounidis, Steven Timm, Burt Holzman

<div class="center">
    <img src="images/institute-logos.png" height="auto" width="600px" />
</div>

<div class="padded">
    <div class="padded">
        <sub><sup><a href="https://arxiv.org/abs/2108.12430">https://arxiv.org/abs/2108.12430</a></sup></sub>
    </div>
</div>

## Overview
- Introduction
- A naive pipeline
- Inference-as-a-Service
- LIGO use cases
- Next steps

# Introduction
## Deep Learning in GW Physics

## Multi-Messenger Astrophysics (MMA)
Use signals detected in gravitational strain to alert observers of other messengers for followup

<div class="center">
    <img src="images/mma.png" height="auto" width="1000px" />
</div

## LIGO trigger system
<img src="images/trigger.png" height="auto" width="600px" class="left"/>
<img src="https://research.cs.wisc.edu/htcondor/manual/v7.6/img7.png" height="auto" width="400px" class="right" />

## LIGO use cases - DeepClean
- Noise regression in the gravitational wave detector data from instrumental artifacts and environmental contamination.
- Uses data from on-site sensors monitoring the instrumental and environmental signals (witness channels) 
- Makes noise predictions using the couplings between witness channels and  GW data. 
- Generic enough to subtract linear, non-linear, and non-stationary coupling mechanisms.
- Astrophysical signals remain unaffected by the subtraction 


<figure>
    <img src="images/deepclean-plots.png" height="auto" width="400px" />
    <figcaption>Ormiston, Rich, et al. “Noise Reduction in Gravitational-Wave Data via Deep Learning.” Physical Review Research, vol. 2, no. 3, July 2020, p. 033066. arXiv.org, doi:10.1103/PhysRevResearch.2.033066</figcaption>
</figure>

## LIGO use cases - BBHnet
- Archetypal search for binary blackhole mergers using strain data from both detectors
- Paper forthcoming

<img src="images/bbh-plot.png" height="auto" width="600px" />

## Real-time inference with deep learning
<div class="center">
    <img src="images/dc-offline-no-legend.png" height="auto" width="800px" />
</div>

- Massive reductions in processing time
- Not necessarily trivial to achieve
- In this talk, we'll outline a computing paradigm for achieving accelerated processing at scale with deep learning

# The traditional deep learning inference paradigm
### Building an example pipeline using PyTorch
- Illustrate the challenges outlined above
- Motivate the requirements of an improved paradigm

Begin with a few imports

In [1]:
import time
from concurrent.futures import ThreadPoolExecutor
from queue import Empty, Queue

import numpy as np
import torch

# some functionality will be necessarily
# buried in here. If you're interested,
# feel free to take a look
import utils

Define our model: a simple multi-layer perceptron

In [2]:
class MLP(torch.nn.Module):
    def __init__(self, input_size, hidden_sizes):
        super().__init__()

        self.layers = torch.nn.ModuleList()
        for size in hidden_sizes:
            self.layers.append(torch.nn.Linear(input_size, size))
            self.layers.append(torch.nn.ReLU())
            input_size = size

        self.layers.append(torch.nn.Linear(input_size, 1))
        self.layers.append(torch.nn.Sigmoid())

    def forward(self, x: torch.tensor) -> torch.tensor:
        for layer in self.layers:
            x = layer(x)
        return x

During training:
- instantiate an instance of this model then optimize its parameters
- export these optimized parameters for use later

In [3]:
INPUT_SIZE = 64
HIDDEN_SIZES = [256, 128, 64]
model = MLP(INPUT_SIZE, HIDDEN_SIZES)

# typically do some training here
# for i in range(num_epochs):
#    for x in dataset:
#        do_a_gradient_step(model, x)
# now our model has optimized parameters

# export these parameter values somewhere
torch.save(model.state_dict(), "model.pt")

At inference time, load in these optimized model weights and use them to map inputs to outputs

In [4]:
# build the operations that constitute a model
inference_model = MLP(INPUT_SIZE, HIDDEN_SIZES)

# set the values of the parameters of these
# operations to their optimized values
inference_model.load_state_dict(torch.load("model.pt"))

# create an array of input data to process
x = np.random.randn(INPUT_SIZE).astype("float32")
with torch.no_grad():
    # use the model infer predictions on the input data
    y = inference_model(torch.from_numpy(x))

# do some downstream processing on the data
y.numpy()

array([0.4779905], dtype=float32)

## Hetergeneous computing
- Often faster to use **accelerated** hardware like GPUs or FPGAs for computing inference
- Systems with multiple types of processors, e.g. CPUs and GPUs, referred to as **hetergeneous**

<img src="images/cpu-inference.png" height="auto" width="500px" class="left"/>
<img src="images/hetergeneous-inference.png" height="auto" width="500px" class="right">

## Heterogeneous inference

In [5]:
# move the model to the GPU
inference_model.cuda(0)

with torch.no_grad():
    # move the data on to the GPU
    x = torch.from_numpy(x).cuda(0)

    # execute inference on the GPU
    y = inference_model(x)

# move these predictions back to the CPU
# for downstream processing
y.cpu().numpy()

array([0.47799045], dtype=float32)

#### Inference on a dataset
- Generally interested in using the model for many thousands or millions of inferences
- Start with the simplest case: a dataset that can fit into memory at once

In [6]:
N = 5 * 10**5  # number of observations in our dataset
dataset = np.random.randn(N, INPUT_SIZE).astype("float32")

@torch.no_grad()
def do_some_inference(model, dataset, batch_size=8, device_index=0):
    # move the data to the GPU in bulk
    gpu_dataset = torch.from_numpy(dataset).cuda(device_index)

    # iterate through it in batches and yield predictions
    dataset = torch.utils.data.TensorDataset(gpu_dataset)
    for [x] in torch.utils.data.DataLoader(dataset, batch_size=batch_size):
        y = model(x)
        yield y.cpu().numpy()

# run through the dataset and get a rough time estimate
%time outputs = [y for y in do_some_inference(inference_model, dataset)]

CPU times: user 17.7 s, sys: 141 ms, total: 17.9 s
Wall time: 17.9 s


Seems to work pretty fast, but how well are we utilizing the GPU?

In [7]:
with utils.GpuUtilProgress(gpu_ids=0) as progbar:
    task_id = progbar.add_task("[cyan]Inference", total=N)

    outputs = []
    for y in do_some_inference(inference_model, dataset):
        outputs.append(y)
        progbar.update(task_id, advance=len(y))

output = np.concatenate(outputs, axis=0)

Output()

Yikes, only around 25%!
- GPUs are expensive, can we improve things via parallel execution (assuming we can't change the batch size)?
- First attempt: Naive (and sloppy) implementation using threading:

In [8]:
q = Queue()

def task(dataset_chunk, device_index):
    # for each inference task, create a copy of the
    # model on the indicated GPU device
    model = MLP(INPUT_SIZE, HIDDEN_SIZES).cuda(device_index)
    model.load_state_dict(torch.load("model.pt"))

    # iterate through our dataset and send
    # the results back to the main thread
    for y in do_some_inference(
        model, dataset_chunk, device_index=device_index
    ):
        q.put(y)

Run this task on multiple parallel threads (if we tried to use processes, Torch would complain):

In [9]:
num_jobs = 4

with utils.GpuUtilProgress(0) as progbar:
    task_id = progbar.add_task(f"Inference with {num_jobs} jobs", total=N)

    # create a pool of threads to do inference in parallel
    with ThreadPoolExecutor(4) as pool:
        # split the dataset into chunks and submit
        # inference on them as tasks to the pool
        [pool.submit(task, x, 0) for x in np.array_split(dataset, num_jobs)]

        # iterate through the dataset
        outputs = []
        while len(outputs) < N:
            outputs.extend(q.get())
            progbar.update(task_id, completed=len(outputs))

Output()

It looks like things actually got worse!
- Extracting good GPU performance is rarely simple or intuitive
- What next?

After spending a few hours perusing the PyTorch documentation and forums and experimenting, we come up with the following basic functional implementation:

In [10]:
def parallel_inference(X, num_gpus, jobs_per_gpu, progbar):
    num_jobs = num_gpus * jobs_per_gpu
    task_id = progbar.add_task(
        f"[cyan]{num_gpus} GPUs/{num_jobs} jobs",
        total=len(X),
        start=False
    )

    # we need special queue and value objects
    # specific to process spawning
    smp = torch.multiprocessing.get_context("spawn")
    q = smp.Queue()
    sync = smp.Value("d", 0.0)

    # pass a bunch of arguments into each
    # process that we need to spawn
    # note that we have to pass copies of some
    # of our local functions that live in `utils`
    # since we can't unpickle elsewhere functions
    # which are defined in __main__
    args = (
        X,  # the full dataset
        utils.MLP,  # the module class to use for inference
        [INPUT_SIZE, HIDDEN_SIZES],  # arguments to initialize the module
        utils.do_some_inference,  # the inference funcntion to use
        q,  # the queue to put the results in
        sync,  # a task synchronizer
        jobs_per_gpu,
        num_gpus
    )

    # spawn parallel jobs across all GPUs.
    # We have to host the `parallel_inference_task` function
    # in a separate module for the same pickling pickle
    # described above
    procs = torch.multiprocessing.spawn(
        utils.parallel_inference_task,
        args=args,
        nprocs=num_jobs,
        join=False
    )

    # wait to synchronize until all models load
    # so that we can compare throughput better
    while sync.value < num_jobs:
        time.sleep(0.01)

    # increment the synchronizer one more
    # time to kick off the jobs
    sync.value += 1
    progbar.start_task(task_id)

    # collect all the (unordered) inputs
    outputs = []
    while len(outputs) < N:
        try:
            # try to get the next result in
            # in the queue and increment everything
            outputs.extend(q.get_nowait())
            progbar.update(task_id, completed=len(outputs))
        except Empty:
            # if the queue is empty _and_ all the jobs
            # are dead, we're done here
            if procs.join(0.01):
                break

    # concatenate the outputs and return
    return np.stack(outputs, axis=0)

How does GPU usage and time-to-completion scale with the number of jobs/GPUs?

In [11]:
with utils.GpuUtilProgress([0, 1]) as progbar:
    y = parallel_inference(dataset, num_gpus=1, jobs_per_gpu=2, progbar=progbar)
    y = parallel_inference(dataset, num_gpus=1, jobs_per_gpu=4, progbar=progbar)
    y = parallel_inference(dataset, num_gpus=2, jobs_per_gpu=4, progbar=progbar)

Output()

Things seem to scale pretty well! But this is still far from ideal:
- Framework specific
    - No help if we want to extend to other frameworks
    - Torch is pretty unique in having this functionality at all
- The code is complicated and required a lot of non-physics expertise to build
    - Non-trivial to reconstruct for new applications
- Extremely contrived example, breaks down in most real use cases
    - Explore a few cases to show how

### Throughput too low
#### _The constraints of our use case demand that we further reduce processing time by an order of magnitude_
- Not obvious how to simply extend this code to multi-node
- Scaling not dynamic
    - Have to pick a level of parallelism and hope the resources are available to use it

### Throughput too high
####  _Data generation process is slow, needs to be parallelized to saturate GPU throughput_
- Low GPU utilization with local resources. Not obvious how this code can:
    - Scale to multiple clients
    - Allow other users to leverage spare cycles

### Model ensembling
#### _Connecting multiple models in a single pipeline_
<div class="center">
    <img src="images/model_sharing.png" height="auto" width="400px" class="center" />
    <img src="images/model_ensemble.png" height="auto" width="400px" class="center" />
</div>

### Model ensembling
Naive implemenation

In [12]:
@torch.no_grad()
def do_some_multi_model_inference(noise_remover, model, dataset, batch_size=8, device_index=0):
    gpu_dataset = torch.from_numpy(dataset).cuda(device_index)
    dataset = torch.utils.data.TensorDataset(gpu_dataset)

    for [x] in torch.utils.data.DataLoader(dataset, batch_size=batch_size):
        cleaned = noise_remover(x)
        y = model(cleaned)
        yield x.cpu().numpy()

noise_remover = utils.NoiseRemovalModel(INPUT_SIZE, [32, 16]).cuda(0)
with utils.GpuUtilProgress(0) as progbar:
    task_id = progbar.add_task("[cyan]Ensemble inference", total=N)
    for y in do_some_multi_model_inference(noise_remover, inference_model, dataset):
        progbar.update(task_id, advance=len(y))

Output()

## Model Ensembling

Once again, it _works_, but scaling it up is non-trivial:
- Models may require different levels of parallelism to keep any one model from bottlenecking the other (top on right)
- Most efficient implementation would have models executing asynchronously, with tensors passed between GPUs (bottom on right)
- If the models utilize different frameworks, this problem becomes exponentially harder

<div class="padded">
    <figure>
        <img src="images/bottleneck-both.png" height="auto" width="800px"/>
        <figcaption>Model 1 throughput too high for model 2, need to run more concurrent instances of model 2 to maximize throughput</figcaption>
    </figure>
    <figure>
        <img src="images/async.png" height="auto" width="800px"/>
        <figcaption>Model 2 computes inference on last output from Model 1, while Model 1 begins inference on next input</figcaption>
    </figure>
<div>

## Distribution
#### _Who do you want to use your model?_

- How much expertise should someone need to have to utilize your model in their pipeline?
- How much do they need to know about how your model is implemented?
- How will they be kept up-to-date when you retrain the model or improve the architecture?
    - Will these updates change their pipeline?
- What if they don't have access to accelerators?

## Takeaways
- Efficiently scheduling cross-platform, multi-GPU, multi-model asynchronous DL inference is hard
    - Not going to get 3 orders of magnitude with this
- Inference is just one piece of your pipeline. Really even just one line:
```python
y = model(x)
```
    How and where `model(x)` happens is largely irrelevant to everything else

So:
- Manage this piece separately to hide these details
- Scale it to meet the rate at which you can generate `x`s or how quickly you need `y`s

### Not just inference problems
- Large-scale offline validation required to quantify advantages of new research and automate model re-training
    - Faster processing times mean faster iteration on novel ideas
- Deployment on "real", uncontrolled data critical to:
    - identifying failure modes and improving understanding of model behavior
    - evaluating the correlation between validation metrics and true success criterion
        - Identify and remove signal leakage in training pipelines
        - Align training and test settings - can our algorithm optimize our latency/throughput/cost function better than its alternative?
- O4 run will collect ~2 TB/day/detector
    - Answering these questions now will ensure that we have the right combination of systems/algorithms in place to extract as much physics as possible

# Inference-as-a-Service
### An alternative paradigm

### Deep Learning Inference - Challenges
<div class="float200 padded">
    <img src="images/hardware-logos.png" width="300px" height="auto" class="right unpadded"/>
    <div class="left">
        <p>Access to and familiarity with accelerated hardware </p>
        <p>(GPUs, TPUs, FPGAs)</p>
    </div>
</div>

<div>
    <img src="images/framework-logos.png" width="300px" height="auto" class="left padded" />
    <p class="right"> Managing, leveraging, and translating across deep learning software stacks
</div>

<div></div>
<div></div>
<div class="float100">
    <img src="images/distributing-nns.png" width="250px" height="auto" class="right unpadded"/>
    <p class="left">Distributing updated models to dependent users and applications</p>
</div>

## Inference-as-a-Service
The **inference-as-a-service** (Iaas) paradigm addresses these issues
- Out-of-the-box software optimized for efficiently executing complex asynchronous workloads across devices
    - Hardware _and_ framework agnostic
- Exposes models for inference to **client** pipelines via standardized APIs
    - Pipeline code stays the same even as the model changes or the service moves
    - Centralized model repositories keep all clients on the same page
- Containerization makes deployments portable to meet workload demands
    - Minimizes environment management overhead
    - Integration with container management servicse like Kubernetes leads to easy scaling

## Deployment scenarios
- IaaS represents a _software_ model for managing inference execution on heterogenous _hardware_
- Not tied to any _particular_ hardware platform or deployment location
    - Tune to meet the needs of each use case

<div class="center">
    <img src="images/triton-ldg.png" height="auto" width="350px" class="left" />
    <img src="images/triton-cloud.png" height="auto" width="350px" class="right" />
</div>

### [Triton Inference Server](https://github.com/triton-inference-server/server)
<div class="float100">
    <img src="https://developer-blogs.nvidia.com/wp-content/uploads/2020/12/triton-1.png" width="400px" height="auto" class="right padded"/>
        <p>Off-the-shelf inference service developed and maintained by NVIDIA.</p>
</div>
<div class="unpadded">
    <list>
        <li class="explicit">Efficient scheduling of GPU resources</li>
        <li class="explicit">Multiple framework backend support</li>
        <li class="explicit">Pre-built containers released monthly to simplify dependency management</li>
        <li class="explicit">Dynamic model versioning and ensemble scheduling</li>
        <li class="explicit">Separately installable <a href="https://github.com/triton-inference-server/clien">client libraries</a>
    </list>
</div>

## Latency, Throughput, Cost
Fundamental trade-off in IaaS deployments
- **latency** - time required to complete any single inference request
- **throughput** - rate at which inference requests can be completed
- **cost** - expense incurred for resources associated with inference

<div class="padded">
    <img src="images/metrics.png" height="auto" width="700px" />
</div>

## Latency, Throughput, and Cost
- Functions of deployment environment and configuration
- Optimal value defined by constraints of use case
- Measure this function in your environment to find optimal operating point

<div class="padded">
    <img src="images/latency-throughput-cost.png" height="auto" width="700px" />
</div>

# LIGO use cases
## Results processing LIGO data with IaaS

## IaaS challenges with streaming time series

- Inference happens on fixed length **snapshots** of the time series at one moment in time
- Snapshots are sampled at some **inference sampling rate** $r \leq f_s$
    - Higher $r$ increases compute load $\rightarrow$ shift in latency/throughput/cost surface
    - Potential benefits to prediction quality, lower latency
- Overlapping data creates enormous I/O load

<img src="images/snapshotter_overlap.png" height="auto" width="600px"/>

## Challenges with streaming time series

- Introduce a model on server which maintains the most recent snapshot as a "state"
    - Only need to stream state updates of _new_ data
    - Improves I/O, but introduces a serial step which can limit parallelism

<div class="center">
    <img src="images/snapshotter_action.png" height="auto" width="180px" class="unpadded" />
</div>

## Offline Pipelines
- For each server, assign $k$ clients
    - With $n$ servers, each client is assigned $\frac{1}{nk}$ of the total dataset to process
    - Each client has an associated snapshotter state, must be routed to same server
- Inference sampling rate has no bearing on data generation rate
    - Determines the _number_ of frames needed to process for a given length of time

## DeepClean Offline
- Processed ~1 month of data during O3
<div class="center padded">
    <img src="images/dc-offline.png" height="auto" width="1200px" />
</div>

## Deepclean + BBHnet Offline
Implemented using several different frameworks:
<div class="center">
    <img src="images/ensemble.png" height="auto" width="1000px" />
</div>

## Deepclean + BBHnet Offline
- Processed ~27 hours of data during O2
- Strong scaling + elastic demand makes cloud ideal environment
    - Cost nearly constant as a function of GPU usage
    - Optimal scale $\rightarrow\infty$
    - Trade-off shifts to prediction quality from higher $r$ vs. cost, need to quantify


<img src="images/e2e-offline.png" height="800px" width="1200px" class="padded"/>

## HEPCloud
Framework from HEP community for sustained, high-throughput inference using Condor APIs
<div class="center">
    <img src="images/hepcloud.png" height="auto" width="800px" />
</div>

## DeepClean Online
- Overlapping outputs due to streaming time series
    - "Fully online" inference scheme causes negative impacts on PSD
    - Can trade off some latency for improved quality through aggregation
    - Working on improving trade-off via better training

<img src="images/dc-output-overlap.png" height="auto" width="300px" class="padded" />
<img src="images/psd-latency.png" height="auto" width="500px" />

# DeepClean Online
- Run on shared memory data replay frames
- Inference sampling rate $r$ dictates _average_ data generation rate
    - Frames become available in 1 second increments, true data generation rate is roughly a square wave
- One data stream = one snapshotter
    - Serial update causes queue build-up, no benefit to extra scale downstream
    - Optimizing this step highest priority

<img src="images/dc-online.png" height="auto" width="1200px" class="padded"/>

# Next Steps
## Taking IaaS into production at LIGO

## Expanding Scope
- IaaS model applicable to a wide range of applications
- Building tools and infrasturcture for easy integration of newer and larger models
    - <a href="https://github.com/fastmachinelearning/gw-iaas">https://github.com/fastmachinelearning/gw-iaas</a>

<div class="float100">
    <img src="images/a3d3.png" height="auto" width="400px" class="left" />
    <img src="images/venn-diagram.png" height="auto" width="400px" class="right" />
</div>

## DeepClean Online Production Deployment
<div class="center">
    <img src="images/dc-production.png" height="auto" width="800px" class="left" />
</div>

# Thank you