In [None]:
# Stack setup: register four components, then combine them into one stack.
# NOTE(review): re-running this cell presumably fails once the names below are
# already registered — confirm, and consider guarding it for Restart & Run All.

# Register the MLflow experiment tracker
!zenml experiment-tracker register mlflow_tracker --flavor=mlflow

# Register the MLflow model registry
!zenml model-registry register mlflow_registry --flavor=mlflow

# Register the MLflow model deployer
!zenml model-deployer register mlflow_deployer --flavor=mlflow

# Register the Evidently data validator
!zenml data-validator register evidently_validator --flavor=evidently

# Register a new stack with the new stack components
# -d / -e / -r / -dv receive the deployer, tracker, registry and validator
# registered above; -a and -o both point at components named `default`.
# `--set` is passed so that the new stack becomes the active one.
!zenml stack register quickstart_stack -a default\
                                       -o default\
                                       -d mlflow_deployer\
                                       -e mlflow_tracker\
                                       -r mlflow_registry\
                                       -dv evidently_validator\
                                       --set

# Visualize the current ZenML stack
!zenml stack describe

In [None]:
from zenml.pipelines import pipeline


@pipeline
def training_pipeline(
    training_data_loader,
    trainer,
    evaluator,
    model_register,
):
    """Train a model, evaluate it on the test split, and register it.

    Args:
        training_data_loader: Step returning (X_train, X_test, y_train, y_test).
        trainer: Step that fits a model on the training split.
        evaluator: Step that scores the fitted model on the test split.
        model_register: Step that receives the fitted model for registration.
    """
    X_train, X_test, y_train, y_test = training_data_loader()
    model = trainer(X_train=X_train, y_train=y_train)
    # The evaluator's output is not consumed by any later step in this
    # pipeline, so it is intentionally not bound to a name.
    evaluator(X_test=X_test, y_test=y_test, model=model)
    model_register(model)


@pipeline
def inference_pipeline(
    inference_data_loader,
    mlflow_model_deployer,
    predictor,
    training_data_loader,
    drift_detector,
):
    """Predict on fresh data and compare that data against the training data.

    The deployer step yields a prediction service, the predictor step queries
    it with the inference batch, and the drift detector step receives the
    training features together with the same inference batch.
    """
    incoming_batch = inference_data_loader()
    deployed_service = mlflow_model_deployer()
    predictor(deployed_service, incoming_batch)

    # Only the first output of the training loader (X_train) is used as the
    # reference dataset; the remaining three outputs are discarded.
    reference_data, _x_test, _y_train, _y_test = training_data_loader()
    drift_detector(reference_data, incoming_batch)

In [None]:
import numpy as np
import pandas as pd
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

from zenml.steps import Output, step


@step
def training_data_loader() -> Output(
    X_train=pd.DataFrame,
    X_test=pd.DataFrame,
    y_train=pd.Series,
    y_test=pd.Series,
):
    """Load iris and split it into train/test features and targets.

    Returns:
        X_train, X_test, y_train, y_test produced by an 80/20 shuffled split
        with a fixed random_state of 42.
    """
    dataset = load_iris(as_frame=True)
    features, targets = dataset.data, dataset.target
    X_train, X_test, y_train, y_test = train_test_split(
        features,
        targets,
        test_size=0.2,
        shuffle=True,
        random_state=42,
    )
    return X_train, X_test, y_train, y_test

In [None]:
@step
def inference_data_loader() -> pd.DataFrame:
    """Generate a reproducible batch of synthetic inference data.

    Returns:
        A DataFrame with 10 rows of uniform random values in [0, 10), one
        column per iris feature (same column names as the training data).
    """
    feature_names = load_iris(as_frame=True).data.columns
    # Seeded generator so that a fresh Restart & Run All yields the same
    # batch, matching the fixed random_state used for the training split.
    rng = np.random.default_rng(42)
    return pd.DataFrame(
        data=rng.random((10, len(feature_names))) * 10,  # assume range [0, 10]
        columns=feature_names,
    )

In [None]:
import mlflow

from sklearn.base import ClassifierMixin
from sklearn.svm import SVC

from zenml.client import Client

# Experiment tracker of the active stack; its name is handed to the step
# decorator below.
experiment_tracker = Client().active_stack.experiment_tracker


@step(enable_cache=False, experiment_tracker=experiment_tracker.name)
def svc_trainer_mlflow(
    X_train: pd.DataFrame,
    y_train: pd.Series,
) -> ClassifierMixin:
    """Fit an SVC (gamma=0.01) on the training data with MLflow autologging.

    Prints the accuracy on the training data and returns the fitted model.
    """
    # Enable autologging before fitting so hparams and metrics reach MLflow.
    mlflow.sklearn.autolog()
    features, labels = X_train.to_numpy(), y_train.to_numpy()
    classifier = SVC(gamma=0.01)
    classifier.fit(features, labels)
    train_acc = classifier.score(features, labels)
    print(f"Train accuracy: {train_acc}")
    return classifier

In [None]:
import mlflow

from sklearn.base import ClassifierMixin
from sklearn.tree import DecisionTreeClassifier

from zenml.client import Client
from zenml.integrations.mlflow.steps import mlflow_model_deployer_step, MLFlowDeployerParameters

# Experiment tracker of the active stack; its name is handed to the step
# decorator below.
experiment_tracker = Client().active_stack.experiment_tracker


@step(enable_cache=False, experiment_tracker=experiment_tracker.name)
def tree_trainer_mlflow(
    X_train: pd.DataFrame,
    y_train: pd.Series,
) -> ClassifierMixin:
    """Fit a default DecisionTreeClassifier with MLflow autologging.

    Prints the accuracy on the training data and returns the fitted model.
    """
    # Enable autologging before fitting so hparams and metrics reach MLflow.
    mlflow.sklearn.autolog()
    features, labels = X_train.to_numpy(), y_train.to_numpy()
    classifier = DecisionTreeClassifier()
    classifier.fit(features, labels)
    train_acc = classifier.score(features, labels)
    print(f"Train accuracy: {train_acc}")
    return classifier

In [None]:
@step
def evaluator(
    X_test: pd.DataFrame,
    y_test: pd.Series,
    model: ClassifierMixin,
) -> float:
    """Score the model on the test split, print the result, and return it."""
    features, labels = X_test.to_numpy(), y_test.to_numpy()
    test_acc = model.score(features, labels)
    print(f"Test accuracy: {test_acc}")
    return test_acc

In [None]:
@step
def deployment_trigger(test_acc: float) -> bool:
    """Return True only when the test accuracy is strictly above 0.9."""
    min_accuracy = 0.9
    return test_acc > min_accuracy

In [None]:
from zenml.integrations.mlflow.steps.mlflow_deployer import MLFlowDeployerParameters, mlflow_model_registry_deployer_step
from zenml.integrations.mlflow.steps.mlflow_registry import MLFlowRegistryParameters, mlflow_register_model_step
from zenml.model_registries.base_model_registry import ModelRegistryModelMetadata

In [None]:
from zenml.integrations.evidently.steps import (
    EvidentlyProfileParameters,
    evidently_profile_step,
)

# Configure the Evidently profile step with only the "datadrift" section and
# instantiate it under the step name "drift_detector".
evidently_profile_params = EvidentlyProfileParameters(
    profile_sections=["datadrift"],
)
drift_detector = evidently_profile_step(
    params=evidently_profile_params,
    step_name="drift_detector",
)

In [None]:
from zenml.services import BaseService
from zenml.client import Client


@step(enable_cache=False)
def prediction_service_loader() -> BaseService:
    """Load a running model server created by the training pipeline.

    Asks the active stack's model deployer for running servers that were
    created by step "model_deployer" of pipeline "training_pipeline" and
    returns the first match.

    Returns:
        The first matching running service.

    Raises:
        RuntimeError: If no running model server matches the query.
    """
    model_deployer = Client().active_stack.model_deployer
    services = model_deployer.find_model_server(
        pipeline_name="training_pipeline",
        pipeline_step_name="model_deployer",
        running=True,
    )
    # Fail with an actionable message instead of a bare IndexError when
    # nothing has been deployed yet (or the server is not running).
    if not services:
        raise RuntimeError(
            "No running model server found for step 'model_deployer' of "
            "pipeline 'training_pipeline'. Deploy a model before loading "
            "the prediction service."
        )
    return services[0]

In [None]:
@step
def predictor(
    service: BaseService,
    data: pd.DataFrame,
) -> Output(predictions=list):
    """Send the data to the prediction service and return the argmax labels.

    The result is a single-element list wrapping the list of predicted
    indices; the same value is printed before being returned.
    """
    service.start(timeout=10)  # should be a NOP if already started
    raw_output = service.predict(data.to_numpy())
    labels = raw_output.argmax(axis=-1)
    predictions = [labels.tolist()]
    print(f"Prediction is: {predictions}")
    return predictions

In [None]:
# First run: train the SVC and register it as "zenml-quickstart-model".
training_pipeline(
    training_data_loader=training_data_loader(),
    trainer=svc_trainer_mlflow(),
    evaluator=evaluator(),
    model_register=mlflow_register_model_step(
        params=MLFlowRegistryParameters(
            name="zenml-quickstart-model",
            metadata=ModelRegistryModelMetadata(
                gamma=0.01, arch="svc"
            ),
            # Plain string literal: the text has no placeholders, so the
            # f-prefix was redundant.
            description="The first run of the Quickstart pipeline.",
        )
    ),
).run(unlisted=True)

In [None]:
# Second run: train the decision tree and register it under the same model
# name as the first run.
training_pipeline(
    training_data_loader=training_data_loader(),
    trainer=tree_trainer_mlflow(),
    evaluator=evaluator(),
    model_register=mlflow_register_model_step(
        params=MLFlowRegistryParameters(
            name="zenml-quickstart-model",
            metadata=ModelRegistryModelMetadata(
                arch="decision_tree"
            ),
            # Plain string literal: the text has no placeholders, so the
            # f-prefix was redundant.
            description="The second run of the Quickstart pipeline.",
        )
    ),
).run(unlisted=True)

In [None]:
# List the models known to the model registry
!zenml model-registry models list

# List the versions registered under the name used by both training runs
!zenml model-registry models list-versions zenml-quickstart-model

In [None]:
# Deploy version "1" of the registered model and run inference against it.
registry_deployer_params = MLFlowDeployerParameters(
    registry_model_name="zenml-quickstart-model",
    registry_model_version="1",
    # or you can use the model stage if you have set it in the MLflow registry
    # registered_model_stage="None" # "Staging", "Production", "Archived"
)

inference_pipeline(
    inference_data_loader=inference_data_loader(),
    mlflow_model_deployer=mlflow_model_registry_deployer_step(
        params=registry_deployer_params,
    ),
    predictor=predictor(),
    training_data_loader=training_data_loader(),
    drift_detector=drift_detector,
).run()

In [None]:
from zenml.environment import Environment
from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri


def start_zenml_dashboard(port=8237):
    if Environment.in_google_colab():
        from pyngrok import ngrok

        public_url = ngrok.connect(port)
        print(f"\x1b[31mIn Colab, use this URL instead: {public_url}!\x1b[0m")
        !zenml up --blocking --port {port}

    else:
        !zenml up --port {port}

start_zenml_dashboard()