In [None]:
# Container image passed below to model monitoring, the serving function, and
# the monitoring application.
image = "docker.io/yacouby1/mlrun:unstable"

In [None]:
%config Completer.use_jedi = False
import os
import pandas as pd
from sklearn.datasets import load_iris
import mlrun
from mlrun import import_function, get_dataitem, get_or_create_project

# Name of the MLRun project used throughout this notebook.
project_name = "int-mm-v233"
# Fetch the project (or create it) with the current directory as its context.
project = get_or_create_project(project_name, context="./")

In [None]:
# Turn on model monitoring for the project using the custom image; the
# histogram data-drift application is explicitly not deployed.
project.enable_model_monitoring(
    base_period=1,
    image=image,
    deploy_histogram_data_drift_app=False,
)

In [None]:
# The pre-trained Iris model can be downloaded with the line below. Note that it
# saves to "model.pkl", whereas the model logged further down is read from
# "model-iris.pkl".
# get_dataitem("https://s3.wasabisys.com/iguazio/models/iris/model.pkl").download("model.pkl")

# Build the training set from the Iris feature matrix (features only).
feature_columns = [
    "sepal_length_cm",
    "sepal_width_cm",
    "petal_length_cm",
    "petal_width_cm",
]
iris = load_iris()
train_set = pd.DataFrame(iris["data"], columns=feature_columns)

model_name = "RandomForestClassifier"

# Log the model through the projects API so that it is available through the feature store API
# TODO: log training dataset
project.log_model(
    model_name,
    model_file="model-iris.pkl",
    training_set=train_set,
)

# Deploy

In [None]:
def deply_serv(image=None, monitoring=True, key="my_model", model_name="RandomForestClassifier"):
    """Import the hub model server, attach the logged model to it, and deploy it.

    Args:
        image: Container image for the serving function and, when monitoring is
            enabled, for the tracking policy's stream and batch images. When
            ``None`` no image override is applied, so whatever the imported
            function and the tracking policy already define is left untouched.
        monitoring: When true, enable tracking on the serving function.
        key: Unused; kept so that existing callers passing it keep working.
        model_name: Name of the model in the project's model store; also used
            as the model's name on the serving function.

    Returns:
        The serving function object after ``deploy()`` has been called.
    """
    # Import the serving function from the function hub
    serving_fn = import_function(
        "hub://v2_model_server", project=project_name, new_name="serving"
    )

    # Add the model to the serving function's routing spec
    serving_fn.add_model(
        model_name, model_path=f"store://models/{project_name}/{model_name}:latest"
    )

    if monitoring:
        tracking_policy = {"default_batch_intervals": "0 */2 * * *"}
        if image is not None:
            # Only override the monitoring images when one was supplied;
            # passing None here would replace the policy's own values with None.
            tracking_policy["stream_image"] = image
            tracking_policy["default_batch_image"] = image
        serving_fn.set_tracking(tracking_policy=tracking_policy)

    if image is not None:
        # Avoid clobbering the image of the imported function with None when
        # the caller relies on the default argument.
        serving_fn.spec.build.image = image
        serving_fn.spec.image = image
    serving_fn.spec.build.requirements = ["scikit-learn"]

    # Deploy the function
    serving_fn.deploy()
    return serving_fn

In [None]:
# Deploy the serving function on the custom image; monitoring keeps its default (enabled).
serving_fn = deply_serv(image=image)

In [None]:
# Slack notification attached to the alert.
slack_notification = {
    "kind": "slack",
    "name": "",
    "message": "A drift was detected",
    "severity": "warning",
    "when": ["now"],
    "condition": "failed",
    "secret_params": {"webhook": "https://hooks.slack.com"},
}
notifications = [slack_notification]

# Alert triggered by "drift_detected" events on model entities of this project
# (entity id "*" — presumably a wildcard; confirm).
alert_config = mlrun.common.schemas.AlertConfig(
    project=project_name,
    name="drift_alert",
    summary="A drift was detected",
    severity="low",
    entity={"kind": "model", "project": project_name, "id": "*"},
    trigger={"events": ["drift_detected"]},
    criteria=None,
    notifications=notifications,
)
alert_data = alert_config.dict()

# NOTE(review): the config is stored under "slack_drift_alert" while the
# AlertConfig itself is named "drift_alert" — confirm the mismatch is intended.
project.create_alert_config("slack_drift_alert", alert_data)

# Invoke the model

In [None]:
import json
from time import sleep
from random import choice, uniform

# Send 200 inference requests to the deployed model; every request carries the
# same randomly picked Iris sample twice.
iris = load_iris()
iris_data = iris["data"].tolist()

model_name = "RandomForestClassifier"
serving_1 = project.get_function("serving")
infer_path = f"v2/models/{model_name}/infer"

for _ in range(200):
    data_point = choice(iris_data)
    payload = json.dumps({"inputs": [data_point, data_point]})
    serving_1.invoke(infer_path, payload)
    # Short, randomly chosen pause between consecutive requests.
    sleep(choice([0.01, 0.04]))

# APPLICATION REGISTRATION

In [None]:
# Register the monitoring application: the "MyApp" class is set as a model
# monitoring function named "myAppv2" that uses the custom image.
app = project.set_model_monitoring_function(
    application_class="MyApp",
    name="myAppv2",
    image=image,
)

In [None]:
# Deploy the monitoring application function registered above as `app`.
project.deploy_function(app)

# USER APPLICATION CODE

In [None]:
# mlrun: start-code

In [None]:
import mlrun
from mlrun.model_monitoring.application import (
    ModelMonitoringApplicationBase,
    ModelMonitoringApplicationResult,
)
from mlrun.datastore.targets import ParquetTarget
import typing
import pandas as pd
import json
from mlrun.artifacts import (
    Artifact,
    DatasetArtifact,
    PlotlyArtifact,
    TableArtifact,
    update_dataset_meta,
)
import os
import random
from mlrun.artifacts.manager import ArtifactManager, extend_artifact_path

from mlrun.datastore import store_manager


class MyApp(ModelMonitoringApplicationBase):
    """Toy monitoring application that reports a randomly chosen drift status.

    Each tracking call prints the head of the incoming frames, logs the current
    sample statistics as a table artifact, draws a random score and maps it to
    a result status.
    """

    def __init__(self):
        # Placeholder application name.
        self.name = "a"

    def do_tracking(
        self,
        application_name: str,
        sample_df_stats: pd.DataFrame,
        feature_stats: pd.DataFrame,
        sample_df: pd.DataFrame,
        start_infer_time: pd.Timestamp,
        end_infer_time: pd.Timestamp,
        latest_request: pd.Timestamp,
        endpoint_id: str,
        output_stream_uri: str,
    ) -> typing.Union[
        ModelMonitoringApplicationResult, list[ModelMonitoringApplicationResult]
    ]:
        """Produce one monitoring result for the given window.

        Only ``sample_df_stats``, ``feature_stats`` and ``sample_df`` are read
        (printed / logged); the remaining arguments are accepted but unused.

        Returns:
            A single ``ModelMonitoringApplicationResult`` named
            ``"data_drift_test"`` whose status is derived from a random score
            drawn uniformly from [-1, 5]:
            below 0 -> irrelevant, below 3 -> no_detection,
            below 4 -> potential_detection, otherwise detected.
        """
        print("sample_df_stats.head()")
        print(sample_df_stats.head())
        print("feature_stats.head()")
        print(feature_stats.head())
        print("sample_df.head()")
        print(sample_df.head())

        self.context.log_artifact(TableArtifact("current_stats", df=sample_df_stats))

        # Local alias to keep the status mapping readable.
        result_status = (
            mlrun.common.schemas.model_monitoring.constants.ResultStatusApp
        )
        drift_result = random.uniform(-1, 5)
        if drift_result < 0:
            status = result_status.irrelevant
        elif drift_result < 3:
            status = result_status.no_detection
        elif drift_result < 4:
            status = result_status.potential_detection
        else:
            status = result_status.detected

        return ModelMonitoringApplicationResult(
            name="data_drift_test",
            # Fixed placeholder value; it is not derived from drift_result.
            value=7.23,
            kind=0,
            # Report the status computed above instead of always "detected".
            status=status,
            extra_data={},
        )