In [1]:
import os
import shutil


def create_model_files(k):
    """Write the Seldon ``Model`` manifest for the ``add10-<k>`` model.

    The manifest is written to ``./models/add10-<k>.yaml``. It references the
    model artefact through a ``gs://`` storage URI, so this function does not
    create a local model directory.

    Args:
        k: Index of the model. It is used as the ``add10-<k>`` suffix of both
            the manifest file name and the model name inside the manifest.

    Returns:
        None. A manifest that already exists at the target path is left
        untouched.
    """
    model_yaml_path = f"./models/add10-{k}.yaml"
    if os.path.exists(model_yaml_path):
        return

    # Create the target directory on demand so that a missing ./models folder
    # does not make open() fail with FileNotFoundError.
    os.makedirs(os.path.dirname(model_yaml_path), exist_ok=True)

    # The final whitespace-only fragment is part of the original template and
    # is kept so the generated file stays byte-identical.
    manifest = (
        "apiVersion: mlops.seldon.io/v1alpha1\n"
        "kind: Model\n"
        "metadata:\n"
        f"  name: add10-{k}\n"
        "spec:\n"
        '  storageUri: "gs://seldon-models/scv2/examples/latency-tests/mlserver/add10"\n'
        "  requirements:\n"
        "  - mlserver\n"
        "  - python\n"
        "        "
    )
    with open(model_yaml_path, "w") as model_yaml:
        model_yaml.write(manifest)


def remove_model_files(k):
    """Delete the local files of the ``add10-<k>`` model, if they exist.

    Two paths are cleaned up, in this order: the ``./models/add10-<k>``
    directory tree and the ``./models/add10-<k>.yaml`` manifest. Paths that do
    not exist are skipped silently.

    Args:
        k: Index of the model whose files should be removed.
    """
    # Each entry pairs a path with the call that knows how to delete it.
    cleanup_plan = (
        (f"./models/add10-{k}", shutil.rmtree),
        (f"./models/add10-{k}.yaml", os.remove),
    )
    for target, delete in cleanup_plan:
        if os.path.exists(target):
            delete(target)


def create_pipeline_file(k):
    """Write a Seldon ``Pipeline`` manifest that chains ``k`` add10 models.

    The manifest is written to ``./pipelines/add10-<k>.yaml`` and describes a
    pipeline named ``pipeline-add10-<k>``. Step ``add10-0`` comes first; every
    following step ``add10-<i>`` takes ``add10-<i-1>`` as input and maps its
    ``outputs.sum`` tensor to ``INPUT``. The pipeline output is the last step,
    ``add10-<k-1>``.

    Args:
        k: Number of chained steps. Must be at least 1.

    Returns:
        None. A manifest that already exists at the target path is left
        untouched.

    Raises:
        ValueError: If ``k`` is smaller than 1, because the output step would
            be ``add10-<k-1>``, which is not one of the declared steps.
    """
    if k < 1:
        raise ValueError(f"A pipeline needs at least one step, got k={k}")

    pipeline_yaml_path = f"./pipelines/add10-{k}.yaml"
    if os.path.exists(pipeline_yaml_path):
        return

    # Collect the fragments and join once instead of growing a string.
    fragments = [
        "apiVersion: mlops.seldon.io/v1alpha1\n"
        "kind: Pipeline\n"
        "metadata:\n"
        f"  name: pipeline-add10-{k}\n"
        "spec:\n"
        "  steps:\n"
        "    - name: add10-0\n"
    ]
    for i in range(1, k):
        fragments.append(
            f"    - name: add10-{i}\n"
            "      inputs:\n"
            f"        - add10-{i - 1}\n"
            "      tensorMap:\n"
            f"        add10-{i - 1}.outputs.sum: INPUT\n"
        )
    fragments.append(
        "  output:\n"
        "    steps:\n"
        f"    - add10-{k - 1}\n"
    )

    # Create the target directory on demand so that a missing ./pipelines
    # folder does not make open() fail with FileNotFoundError.
    os.makedirs(os.path.dirname(pipeline_yaml_path), exist_ok=True)
    with open(pipeline_yaml_path, "w") as pipeline_yaml:
        pipeline_yaml.write("".join(fragments))


def remove_pipeline_file(k):
    """Delete the ``./pipelines/add10-<k>.yaml`` manifest if it exists.

    Args:
        k: Number of hops that identifies the pipeline manifest to remove.
    """
    manifest = f"./pipelines/add10-{k}.yaml"
    if not os.path.exists(manifest):
        # Nothing to clean up.
        return
    os.remove(manifest)

In [25]:
import time
import json

def many_models_experiment(n_hops=1, n_repeats=1):
    for i in range(n_hops):
        create_model_files(i)

    create_pipeline_file(n_hops)

    for i in range(n_hops):
        !seldon model load -f ./models/add10-{i}.yaml

    !seldon pipeline load -f ./pipelines/add10-{n_hops}.yaml

    input_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
    request_string = '{"model_name":"add10", "inputs":[{"name":"INPUT","contents":{"int_contents":' + str(input_list) + '},"datatype":"INT32","shape":[1,' + str(len(input_list)) + ']}]}'
    expected_output_list = [x + 10 * n_hops for x in input_list]

    times = []
    try:
        for i in range(n_repeats):
            print(f"Starting call {i}...", end='')
            start = time.time()
            output = !seldon pipeline infer 'pipeline-add10-{n_hops}' --inference-mode grpc '{request_string}'
            end = time.time()
            print(" Done!")

            # validate output
            print(output)
            response_json = json.loads(output[0])
            output_list = response_json["outputs"][0]["contents"]["fp64Contents"]
            if not expected_output_list == output_list:
                raise ValueError(f"Expected {expected_output_list} but got {output}")

            times.append(end-start)

        return times
    except Exception as e:
        print(e)
        raise
    finally:
        # for i in range(n_hops):
        #     !seldon model unload add10-{i}

        # !seldon pipeline unload pipeline-add10-{n_hops}


        for i in range (n_hops):
            remove_model_files(i)
        remove_pipeline_file(n_hops)    


def single_model_experiment(n_hops=1, n_repeats=1):
    !seldon model load -f ./models/add10.yaml

    if n_hops not in [1, 10, 100, 1000]:
        raise ValueError(f"No defined test caller for {n_hops} hops")

    !seldon model load -f ./models/test-caller{n_hops}.yaml

    !seldon pipeline load -f ./pipelines/latency-test.yaml

    input_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
    request_string = '{"model_name":"test_caller_1", "inputs":[{"name":"INPUT0","contents":{"int_contents":' + str(input_list) + '},"datatype":"INT32","shape":[1,' + str(len(input_list)) + ']}]}'
    expected_output_list = [x + 10 * n_hops for x in input_list]

    times = []
    try:
        for _ in range(n_repeats):
            print(f"Starting call {i}...", end='')
            start = time.time()
            output = !seldon pipeline infer latency-test --inference-mode grpc '{request_string}'
            end = time.time()
            print(" Done!")

            # validate output
            print(output)
            response_json = json.loads(output[0])
            output_list = response_json["outputs"][0]["contents"]["fp64Contents"]
            if not expected_output_list == output_list:
                raise ValueError(f"Expected {expected_output_list} but got {output}")

            times.append(end-start)
    
        return times
    except Exception as e:
        print(e)
        raise
    finally:
        !seldon pipeline unload latency-test

        !seldon model unload test-caller

        !seldon model unload add10

In [3]:
import numpy as np

def print_stats(times_list):
    """Print summary statistics of a list of durations, in milliseconds.

    Args:
        times_list: Durations in seconds.

    Prints one line each for the minimum, maximum, mean, median, 90th and
    99th percentile, each converted to milliseconds with one decimal place.
    """
    samples = np.array(times_list)
    summary = (
        ("Min", min(samples)),
        ("Max", max(samples)),
        ("Mean", np.mean(samples)),
        ("Median", np.median(samples)),
        ("P90", np.percentile(samples, 90)),
        ("P99", np.percentile(samples, 99)),
    )
    for label, seconds in summary:
        print(f"{label}: {seconds*1000:.1f}ms")


In [16]:
# 10 hops, 20 timed calls for each experiment.
many_models_times_10 = many_models_experiment(n_hops=10, n_repeats=20)
single_model_times_10 = single_model_experiment(n_hops=10, n_repeats=20)

{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
Starting call 0...['{"outputs":[{"name":"sum", "datatype":"FP64", "shape":["1", "16"], "contents":{"fp64Contents":[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116]}}]}']
 Done!
Starting call 1...['{"outputs":[{"name":"sum", "datatype":"FP64", "shape":["1", "16"], "contents":{"fp64Contents":[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116]}}]}']
 Done!
Starting call 2...['{"outputs":[{"name":"sum", "datatype":"FP64", "shape":["1", "16"], "contents":{"fp64Contents":[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116]}}]}']
 Done!
Starting call 3...['{"outputs":[{"name":"sum", "datatype":"FP64", "shape":["1", "16"], "contents":{"fp64Contents":[101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116]}}]}']
 Done!
Starting call 4...['{"outputs":[{"name":"sum", "datatype":"FP64", "shape":["1", "16"], "contents":{"fp64Contents":[101, 102, 103, 104, 

In [10]:
# Latency summary of the 10-hop pipeline of separate models.
print("Many hops")
print_stats(times_list=many_models_times_10)
print()
# Latency summary of the single-model experiment run with n_hops=10.
print("No hops")
print_stats(times_list=single_model_times_10)

Many hops
Min: 70.6ms
Max: 283.8ms
Mean: 89.3ms
Median: 78.6ms
P90: 85.5ms
P99: 248.2ms

No hops
Min: 49.2ms
Max: 1314.4ms
Mean: 114.3ms
Median: 51.1ms
P90: 53.3ms
P99: 1075.0ms


In [26]:
# 100 hops, 20 timed calls.
many_models_times_100 = many_models_experiment(n_hops=100, n_repeats=20)

{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
Starting call 0... Done!
['Error: rpc error: code = Unavailable desc = upstream connect error or disconnect/reset before headers. reset reason: remote reset']
Expecting value: line 1 column 1 (char 0)
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}


JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [15]:
# 100 hops, 20 timed calls for each experiment.
many_models_times_100 = many_models_experiment(n_hops=100, n_repeats=20)
single_model_times_100 = single_model_experiment(n_hops=100, n_repeats=20)

{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
Starting call 0...['Error: rpc error: code = Unavailable desc = upstream connect error or disconnect/reset before headers. reset reason: remote reset']
Expecting value: line 1 column 1 (char 0)
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}
{}


JSONDecodeError: Expecting value: line 1 column 1 (char 0)

In [None]:
# Latency summary of the 100-hop pipeline of separate models.
print("Many hops")
print_stats(times_list=many_models_times_100)
print()
# Latency summary of the single-model experiment run with n_hops=100.
print("No hops")
print_stats(times_list=single_model_times_100)

In [None]:
# many_models_times_1000 = many_models_experiment(1000, 20);
# single_model_times_1000 = single_model_experiment(1000, 20);