#### 1. **Model Runner Step Sync And Async Testing**

Notebook tests `ModelRunnerStep` within a serving graph to enable real-time monitoring and drift detection with MLRun. 
The main focus is to test te preformance of sync and async usage.

In [1]:
# Import mlrun and create project instance
import mlrun

image = "mlrun/mlrun"
project_name = "monitored-model-runner-asyncio"
project = mlrun.get_or_create_project(project_name, context="./",user_project=True, allow_cross_project=True)

> 2025-10-27 15:15:11,848 [info] Project loaded successfully: {"project_name":"monitored-model-runner-asyncio-matanz"}


In [2]:
# Import tools
import pandas as pd
from sklearn.svm import SVC
import pickle
from sklearn.datasets import load_iris
from mlrun.features import Feature

In [3]:
# Train simple SVM model on Iris dataset, save it and reformat the DS as list
iris = load_iris()
clf = SVC()           
clf.fit(iris.data, iris.target)
with open("SVM.pkl", "wb") as fh:
    pickle.dump(clf, fh)
iris_data = iris["data"].tolist()

# load the dataset again as a DF
iris = load_iris()
train_set = pd.DataFrame(
    iris["data"],
    columns=["sepal_length_cm", "sepal_width_cm", "petal_length_cm", "petal_width_cm"],
)

# Create a Model Artifact in the project using the trained model
model_name = "SVM"
model_artifact = project.log_model(
    model_name,
    model_file="SVM.pkl",
    training_set=train_set,
    framework="sklearn",
    outputs=[Feature(name="label")],
)

#### 5. Define your function and ModelRunnerStep

Define functions to all the edge cases

In [4]:
# Config the code path and the serving function sync and async
sync_code_path = r"model_class_sync.py"
bouth_code_path = r"model_class_bouth.py"

async_sync_function = project.set_function(func=sync_code_path,image="mlrun/mlrun",kind="serving",name="async")
bouth_function = project.set_function(func=bouth_code_path,image="mlrun/mlrun",kind="serving",name="bouth")

In [5]:
from mlrun.serving.states import ModelRunnerStep

model_runner_step = ModelRunnerStep(
    name="my_runner", model_selector="MyModelSelector",model_selector_parameters={"name":"my-selector"})

for i in range(10):
    model_runner_step.add_model(
        model_class="MyModel",
        endpoint_name=f"my-{i}-model",
        model_artifact=model_artifact,
        input_path="inputs.here",
        result_path="outputs",
        outputs=["label"],
        execution_mechanism="asyncio",
    )

#### 6. Build graphs to all the edge cases

description


In [17]:
async_graph_defined = bouth_function.set_topology("flow",engine="async")
async_graph_defined.to("MyPreprocessStep").to(model_runner_step).to("MyEnrichStep").respond()
async_graph_defined.plot()

async_graph_undefined = async_sync_function.set_topology("flow",engine="async")
async_graph_undefined.to("MyPreprocessStep").to(model_runner_step).to("MyEnrichStep").respond()
async_graph_undefined.plot()


MLRunInvalidArgumentError: graph topology is already set, cannot be overwritten

#### Run using mock

descteption

In [7]:
# 4. create mock server and test it locally (no k8s / deployment)

async_undefined_mock_server = bouth_function.to_mock_server()
async_defined_mock_server = async_sync_function.to_mock_server()

> 2025-10-27 15:15:14,290 [info] Server graph after adding system steps: {"graph":"{'MyPreprocessStep': {'class_name': 'MyPreprocessStep', 'kind': 'task'}, 'my_runner': {'endpoint_type': 1, 'after': ['MyPreprocessStep'], 'model_endpoint_creation_strategy': 'skip', 'class_name': 'mlrun.serving.ModelRunner', 'kind': 'model_runner', 'class_args': {'model_selector': ('MyModelSelector', {'name': 'my-selector'}), execution_mechanism_by_model_name: {'my-0-model': 'asyncio', 'my-1-model': 'asyncio', 'my-2-model': 'asyncio', 'my-3-model': 'asyncio', 'my-4-model': 'asyncio', 'my-5-model': 'asyncio', 'my-6-model': 'asyncio', 'my-7-model': 'asyncio', 'my-8-model': 'asyncio', 'my-9-model': 'asyncio'}, models: {'my-0-model': ('MyModel', {'artifact_uri': 'store://models/monitored-model-runner-asyncio-matanz/SVM#0@4ae95284-200e-4389-829b-e52066e2a117^55b5294e6a2414d628289c2d4c63f37bf0dd8c2a', 'name': 'my-0-model'}), 'my-1-model': ('MyModel', {'artifact_uri': 'store://models/monitored-model-runner-asyn

In [8]:
async_undefined_mock_server
async_defined_mock_server


<mlrun.serving.server.GraphServer at 0x7f53da05d220>

In [18]:
from random import choice
from datetime import datetime

iris_data = iris["data"].tolist()
data_point = choice(iris_data)
print(f"Data point:{data_point}")

time_before = datetime.now()
print("Before async_sync_function invoke:", time_before.strftime("%H:%M:%S.%f"))
async_sync_response = async_undefined_mock_server.test(
    "/",
    body={
        "models": None,
        "inputs": [data_point, data_point],
    },
)
time_after = datetime.now()
print("After invoke:", time_after.strftime("%H:%M:%S.%f"))

# Compute total time difference
total_time = time_after - time_before
print("Total time:", total_time.total_seconds(), "seconds")

Data point:[6.4, 2.7, 5.3, 1.9]
Before async_sync_function invoke: 15:22:58.753550
async
async
async
async
async
async
async
async
async
async
After invoke: 15:22:59.762416
Total time: 1.008866 seconds


In [10]:
# async_sync_response

In [11]:
iris_data = iris["data"].tolist()
data_point = choice(iris_data)
print(f"Data point:{data_point}")


time_before = datetime.now()
print("Before async_sync_function invoke:", time_before.strftime("%H:%M:%S.%f"))
async_sync_response = async_defined_mock_server.test(
    "/",
    body={
        "models": None,
        "inputs": [data_point, data_point],
    },
)
time_after = datetime.now()
print("After invoke:", time_after.strftime("%H:%M:%S.%f"))

# Compute total time difference
total_time = time_after - time_before
print("Total time:", total_time.total_seconds(), "seconds")

Data point:[5.8, 2.7, 5.1, 1.9]
Before async_sync_function invoke: 15:15:15.985020
After invoke: 15:15:15.986699
Total time: 0.001679 seconds


In [12]:
# async_sync_response

#### 7. Deploying Your Function

Running this cell will deploy your serving function to the cluster. This also deploys the real-time monitoring functions for your project, which are configured to track the serving function's performance and detect model drift.

In [13]:
async_sync_function.deploy()
bouth_function.deploy()

> 2025-10-27 15:15:16,004 [info] Starting remote function deploy
2025-10-27 15:15:17  (info) Deploying function
2025-10-27 15:15:17  (info) Building
2025-10-27 15:15:17  (info) Staging files and preparing base images
2025-10-27 15:15:17  (warn) Using user provided base image, runtime interpreter version is provided by the base image
2025-10-27 15:15:17  (info) Building processor image
2025-10-27 15:16:43  (info) Build complete
2025-10-27 15:16:51  (info) Function deploy complete
> 2025-10-27 15:16:58,327 [info] Model endpoint creation task completed with state succeeded
> 2025-10-27 15:16:58,328 [info] Successfully deployed function: {"external_invocation_urls":["monitored-model-runner-asyncio-matanz-async.default-tenant.app.cust-cs-il.iguazio-cd0.com/"],"internal_invocation_urls":["nuclio-monitored-model-runner-asyncio-matanz-async.default-tenant.svc.cluster.local:8080"]}
> 2025-10-27 15:16:58,384 [info] Starting remote function deploy
2025-10-27 15:16:58  (info) Deploying function
20

'http://monitored-model-runner-asyncio-matanz-bouth.default-tenant.app.cust-cs-il.iguazio-cd0.com/'

In [14]:
iris_data = iris["data"].tolist()
data_point = choice(iris_data)
print(f"Data point:{data_point}")


time_before = datetime.now()
print("Before async_sync_function invoke:", time_before.strftime("%H:%M:%S.%f"))
async_sync_response = async_sync_function.invoke(
    "/",
    body={
        "models": None,
        "inputs": [data_point, data_point],
    },
)
time_after = datetime.now()
print("After invoke:", time_after.strftime("%H:%M:%S.%f"))

# Compute total time difference
total_time = time_after - time_before
print("Total time:", total_time.total_seconds(), "seconds")

Data point:[5.9, 3.2, 4.8, 1.8]
Before async_sync_function invoke: 15:18:29.668815
After invoke: 15:18:29.748434
Total time: 0.079619 seconds


In [15]:
# async_sync_response

In [16]:
iris_data = iris["data"].tolist()
data_point = choice(iris_data)
print(f"Data point:{data_point}")


print("Before bouth_function invoke:", datetime.now().strftime("%H:%M:%S.%f"))
bouth_response = bouth_function.invoke(
    "/",
    body={
        "models": None,
        "inputs": [data_point, data_point],
    },
)
time_after = datetime.now()
print("After invoke:", time_after.strftime("%H:%M:%S.%f"))

# Compute total time difference
total_time = time_after - time_before
print("Total time:", total_time.total_se "seconds")

SyntaxError: invalid syntax (<ipython-input-16-d2d48890bc78>, line 19)

In [None]:
bouth_response

# TODO
2. Check if we get a warning when we try to run async when there is no "async predict" 
    
3. Check if the async work as expected

4. Try to add different prompts to the same model using async, and inspect the behavior
