## Performance Test for Multi-Model Inference
This notebook contains examples of performance measurements for various use cases involving multiple AI models:
* Baseline: performance of each model running individually
* All models running combined in a single thread
* All models running combined in multiple threads

This notebook works with the following inference options:

1. Run inference on DeGirum Cloud Platform;
2. Run inference on DeGirum AI Server deployed on localhost or on some computer in your LAN or VPN;
3. Run inference on DeGirum ORCA accelerator directly installed on your computer.

To try a different option, uncomment exactly **one** of the `zoo = dg.connect(...)` lines in the code below and make sure the other two stay commented out.

You also need to specify your cloud API access token, cloud zoo URLs, and AI server hostname in the [env.ini](env.ini) file, located in the same directory as this notebook.

#### Specify test options here

In [None]:
import mytools

# Names of the models to benchmark
model_names = [
    "yolo_v5s_pet_det--512x512_quant_n2x_orca_1",
    "mobilenet_v1_imagenet--224x224_quant_n2x_orca_1",
    "mobilenet_v2_ssd_coco--300x300_quant_n2x_orca_1",
]

# Number of frames fed to each model per measurement (reduced when mytools reports test mode)
iterations = 10 if mytools.get_test_mode() else 100

# Model input format: True selects JPEG input, False selects bitmap input
use_jpeg = True

# When True, the preprocessing step is kept out of the timing measurements
exclude_preprocessing = True

# Eager batch sizes to test
batch_sizes = [2, 4, 8, 16]


#### Specify where you want to run your inferences

In [None]:
import degirum as dg, mytools

# Credentials and zoo location used by every dg.connect() variant below
cloud_token = mytools.get_token()  # get cloud API access token from env.ini file
cloud_zoo_url = mytools.get_cloud_zoo_url()  # get cloud zoo URL from env.ini file

#
# Please UNCOMMENT only ONE of the following lines to specify where to run AI inference
# (option 1 is active as shipped: comment it out before enabling option 2 or 3,
# otherwise the later assignment to `zoo` silently replaces the earlier one)
#

# 1. Inference on the DeGirum Cloud Platform
zoo = dg.connect(dg.CLOUD, cloud_zoo_url, cloud_token)

# 2. Inference on DeGirum AI Server deployed on localhost or on some computer in your LAN or VPN
# zoo = dg.connect(mytools.get_ai_server_hostname(), cloud_zoo_url, cloud_token)

# 3. Inference on DeGirum ORCA accelerator installed on your computer
# zoo = dg.connect(dg.LOCAL, cloud_zoo_url, cloud_token)


#### The rest of the cells below should run without any modifications

In [None]:
import threading

# Build one configured model object and one input frame per entry of model_names.
# models[i] and data[i] both belong to model_names[i].
data = []
models = []
for model_name in model_names:
    model = zoo.load_model(model_name)
    model.image_backend = "opencv"  # use the OpenCV image backend
    model.input_numpy_colorspace = "BGR"
    model._model_parameters.InputImgFmt = ["JPEG"] if use_jpeg else ["RAW"]
    model.measure_time = True  # enable time measurement
    models.append(model)

    # With exclude_preprocessing set, the image is pushed through the model's
    # preprocessor once here and the first element of its output becomes the
    # input frame; otherwise the image path itself is used as the input.
    frame = "./images/TwoCats.jpg"
    frame = model._preprocessor.forward(frame)[0] if exclude_preprocessing else frame
    data.append(frame)

# frame source used by all measurement loops
def source(mi):
    """Yield the prepared input frame of model number `mi`, `iterations` times.

    mi: index into the module-level `data` list (same order as `model_names`).
    """
    yield from (data[mi] for _ in range(iterations))


# timing results printer
def print_results(results):
    """Print a latency table and an FPS table, one row per model.

    results: dict keyed by model name. Each value is either a single
        measurement dict (it then contains the "elapsed" key), or a dict of
        such measurement dicts, one per tested batch size.
        A measurement dict holds "elapsed" (time taken to process
        `iterations` frames) and "time_stats" (whose
        'FrameTotalDuration_ms' entry exposes an `.avg` attribute).
    """

    def latency_cell(entry):
        # average total frame duration, formatted as one table cell
        return f" : {entry['time_stats']['FrameTotalDuration_ms'].avg:5.1f}"

    def fps_cell(entry):
        # frames per unit of elapsed time, formatted as one table cell
        return f" : {iterations / entry['elapsed']:5.1f}"

    batch_columns = " : ".join([f"{b:5}" for b in batch_sizes])
    header = f"\n{' ':50} : " + batch_columns + "\n"
    latency_parts = ["Latency vs batch size (ms)" + header]
    fps_parts = ["FPS vs batch size" + header]

    for model_name, model_batch_results in results.items():
        latency_parts.append(f"{model_name:50}")
        fps_parts.append(f"{model_name:50}")

        # a flat measurement dict yields one cell, a per-batch dict yields one cell per batch
        if "elapsed" in model_batch_results:
            entries = [model_batch_results]
        else:
            entries = model_batch_results.values()

        for entry in entries:
            latency_parts.append(latency_cell(entry))
            fps_parts.append(fps_cell(entry))

        latency_parts.append("\n")
        fps_parts.append("\n")

    print("".join(latency_parts))
    print("".join(fps_parts))


#### Baseline maximum throughput and lowest latency of each model when running individually

To measure throughput, batch inference mode with a large batch size is used.

To measure latency, single-frame inference mode is used.

In [None]:
def measure_baseline():
    """Measure every model on its own: batch throughput first, then single-frame latency.

    Returns a dict keyed by model name. Each value contains:
        "elapsed"    - value returned by the mytools.Timer for the batch-predict
                       pass over `iterations` frames
        "time_stats" - model.time_stats() taken after the single-frame pass
                       (statistics are reset right before that pass)
    """
    results = {}
    for name in model_names:
        results[name] = {}

    # two progress steps per model: one for the throughput pass, one for the latency pass
    progress = mytools.Progress(2 * len(model_names), speed_units="steps/s")

    for idx, name in enumerate(model_names):
        with models[idx] as model:
            model.eager_batch_size = 16
            model.frame_queue_depth = 16

            model(data[idx])  # warm-up inference, not part of any measurement

            # throughput pass: stream all frames through predict_batch and time the whole run
            timer = mytools.Timer()
            for _ in model.predict_batch(source(idx)):
                pass
            batch_elapsed = timer()
            progress.step()

            # latency pass: one blocking call per frame, statistics start from zero
            model.reset_time_stats()
            for frame in source(idx):
                model(frame)
            progress.step()

            # store throughput timing together with the latency statistics
            results[name] = {
                "elapsed": batch_elapsed,
                "time_stats": model.time_stats(),
            }

    return results


baseline_results = measure_baseline()

# Report throughput and average single-frame latency for every model
print("Baseline results:")
print(f"{'Model name':50} : {'FPS':5} : {'Latency (ms)':5}")
for model_name, result in baseline_results.items():
    baseline_fps = iterations / result["elapsed"]
    baseline_latency_ms = result["time_stats"]["FrameTotalDuration_ms"].avg
    print(f"{model_name:50} : {baseline_fps:5.1f} : {baseline_latency_ms:5.1f}")


#### Batch predict performance of all models running in parallel in multiple threads
This use case gives the highest possible throughput when switching between multiple models.

In [None]:
def measure_batch_predict_mt():
    """Measure batch-predict performance with all models running concurrently.

    For every value in `batch_sizes` one thread per model is started. Each
    thread configures its model, runs one warm-up inference, waits on a
    shared barrier so that all threads begin the timed section together, and
    then streams `iterations` frames through `predict_batch`.

    Returns:
        dict: {model name: {batch size: {"elapsed": ..., "time_stats": ...}}}.
        If a worker thread fails, its exception is reported by the threading
        machinery and the corresponding entries are missing from the result.
    """
    ret = {model_name: {} for model_name in model_names}
    nmodels = len(model_names)

    def run_one_model(mi, batch, barr):
        # batch and barr are passed explicitly rather than captured from the
        # loop below, so each thread is bound to the values it was started with
        try:
            with models[mi] as model:
                model.eager_batch_size = batch
                model.frame_queue_depth = batch

                model(data[mi])  # run model once to warm up the system
                model.reset_time_stats()

                barr.wait()  # start the timed section in all threads together
                t = mytools.Timer()
                for _ in model.predict_batch(source(mi)):
                    pass

                ret[model_names[mi]][batch] = {
                    "elapsed": t(),
                    "time_stats": model.time_stats(),
                }
        except Exception:
            # Break the barrier so that the other workers are not left waiting
            # forever for a thread that will never arrive; aborting after the
            # barrier has already been passed is harmless.
            barr.abort()
            raise

    prog = mytools.Progress(len(batch_sizes), speed_units="steps/s")
    for batch in batch_sizes:
        barr = threading.Barrier(nmodels)
        threads = [
            threading.Thread(target=run_one_model, args=(mi, batch, barr))
            for mi in range(nmodels)
        ]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        prog.step()

    return ret


# Run the multi-threaded batch-predict measurement and print its latency/FPS tables
batch_predict_mt_results = measure_batch_predict_mt()
print_results(batch_predict_mt_results)


#### Single predict performance of all models running in parallel in multiple threads
This use case gives the smallest possible latency when switching between multiple models.

In [None]:
def measure_single_predict_mt():
    """Measure single-frame predict performance with all models running concurrently.

    For every value in `batch_sizes` one thread per model is started. Each
    thread configures its model, runs one warm-up inference, waits on a
    shared barrier so that all threads begin the timed section together, and
    then runs `iterations` blocking single-frame inferences.

    Returns:
        dict: {model name: {batch size: {"elapsed": ..., "time_stats": ...}}}.
        If a worker thread fails, its exception is reported by the threading
        machinery and the corresponding entries are missing from the result.
    """
    ret = {model_name: {} for model_name in model_names}
    nmodels = len(model_names)

    def run_one_model(mi, batch, barr):
        # batch and barr are passed explicitly rather than captured from the
        # loop below, so each thread is bound to the values it was started with
        try:
            with models[mi] as model:
                model.eager_batch_size = batch
                model.frame_queue_depth = batch

                model(data[mi])  # run model once to warm up the system
                model.reset_time_stats()

                barr.wait()  # start the timed section in all threads together
                t = mytools.Timer()
                for d in source(mi):
                    model(d)

                ret[model_names[mi]][batch] = {
                    "elapsed": t(),
                    "time_stats": model.time_stats(),
                }
        except Exception:
            # Break the barrier so that the other workers are not left waiting
            # forever for a thread that will never arrive; aborting after the
            # barrier has already been passed is harmless.
            barr.abort()
            raise

    prog = mytools.Progress(len(batch_sizes), speed_units="steps/s")
    for batch in batch_sizes:
        barr = threading.Barrier(nmodels)
        threads = [
            threading.Thread(target=run_one_model, args=(mi, batch, barr))
            for mi in range(nmodels)
        ]

        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

        prog.step()

    return ret


# Run the multi-threaded single-frame measurement and print its latency/FPS tables
single_predict_mt_results = measure_single_predict_mt()
print_results(single_predict_mt_results)
