In [None]:
import warnings
warnings.filterwarnings('ignore')
%load_ext autoreload
%autoreload 2
from absl import logging as absl_logging
absl_logging.set_verbosity(-10000)

Let's begin by initializing ZenML in our directory. For simplicity, we start with a local stack and transition to other stacks later. Running the following cell does both.

In [None]:
!zenml init
!zenml stack set local_stack

We will start by looking at the definition of the pipeline we want to build. This gives an overview of what we want to achieve and how we plan to get there. After that, we'll dive into the details of some of the more interesting steps.

## The Training Pipeline

We will use a simple MNIST image recognition problem to show how ZenML can help you build a complete solution: importing and processing data, training and evaluating a model, and finally deploying the model only if the data it was trained on has not drifted.

The code below shows how the pipeline for this example is designed.
- First, the importer step loads the data.
- Next, we normalize it to make it ready for training.
- The next steps train and evaluate the model.
- We then split the training data into a reference set and a new set.
- The drift detector step passes both sets to Evidently, which generates a report on whether drift has occurred.
- The deployment trigger turns this report into a deployment decision, which we pass to the model deployer and the Discord bot so they can take the appropriate actions.

In [None]:
from zenml.pipelines import pipeline

# Path to the pip requirements for this example (assumed location; adjust to your project)
requirements_file = "requirements.txt"

@pipeline(enable_cache=True, requirements_file=requirements_file)
def continuous_deployment_pipeline(
    importer,
    normalizer,
    trainer,
    evaluator,
    drift_splitter,
    drift_detector,
    deployment_trigger,
    model_deployer,
    discord_bot,
):
    # Link all the steps' artifacts together
    x_train, y_train, x_test, y_test = importer()
    x_train_normed, x_test_normed = normalizer(x_train=x_train, x_test=x_test)

    # Train and evaluate the model on the normalized data
    model = trainer(x_train=x_train_normed, y_train=y_train)
    accuracy = evaluator(x_test=x_test_normed, y_test=y_test, model=model)

    # Split the raw training data and check it for drift with Evidently;
    # the detector's second output is not used here
    reference_dataset, new_dataset = drift_splitter(x_train)
    drift_report, _ = drift_detector(reference_dataset, new_dataset)

    # Turn the drift report into a deployment decision and act on it
    deployment_decision = deployment_trigger(drift_report)
    model_deployer(deployment_decision)
    discord_bot(deployment_decision)

### Linking Steps with data

Looking at the relations defined above, you can see that ZenML uses data to link the steps: the outputs of one step become the inputs of the next. All of these inputs and outputs (which we call artifacts) are persisted by ZenML and made available to the steps that need them, without you having to configure any of this plumbing yourself.
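
To make the idea concrete, here is a toy, framework-free sketch of data-linked steps. Each step's return values are saved in a dictionary that stands in for an artifact store, and downstream steps receive them as inputs. `ArtifactStore`, `run_step`, `toy_importer` and `toy_normalizer` are hypothetical names that only mimic the importer → normalizer link; they are not ZenML APIs, and ZenML persists artifacts to the artifact store of your stack rather than to memory.

```python
from typing import Any, Callable, Dict, Tuple


class ArtifactStore:
    """Hypothetical in-memory stand-in for a persistent artifact store."""

    def __init__(self) -> None:
        self._artifacts: Dict[str, Any] = {}

    def save(self, step_name: str, output_name: str, value: Any) -> str:
        # Each artifact gets a URI built from the producing step and output name
        uri = f"{step_name}/{output_name}"
        self._artifacts[uri] = value
        return uri

    def load(self, uri: str) -> Any:
        return self._artifacts[uri]


def run_step(store: ArtifactStore, name: str, fn: Callable[..., Tuple[Any, ...]],
             output_names: Tuple[str, ...], **input_uris: str) -> Dict[str, str]:
    """Load the inputs from the store, run the step, save every output, return the URIs."""
    inputs = {arg: store.load(uri) for arg, uri in input_uris.items()}
    outputs = fn(**inputs)
    return {out: store.save(name, out, val) for out, val in zip(output_names, outputs)}


def toy_importer() -> Tuple[list, list]:
    # Two tiny "images" of pixel intensities in [0, 255]
    return [0, 128, 255], [255, 64, 0]


def toy_normalizer(x_train: list, x_test: list) -> Tuple[list, list]:
    # Scale pixel values into [0, 1]
    return [v / 255 for v in x_train], [v / 255 for v in x_test]


store = ArtifactStore()
imported = run_step(store, "importer", toy_importer, ("x_train", "x_test"))
normed = run_step(store, "normalizer", toy_normalizer, ("x_train_normed", "x_test_normed"),
                  x_train=imported["x_train"], x_test=imported["x_test"])
print(store.load(normed["x_train_normed"]))
```

The normalizer never receives the importer function itself, only the URIs of its outputs; that data dependency is what links the two steps.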

### Composability

You can get creative with the way you want your pipeline to work, and ZenML lets you seamlessly switch between different implementations of your steps. Notice that all the steps are passed to the pipeline function as arguments, which means you won't have to change the pipeline code at all to implement new functionality (for example, when using a new importer for your data)!
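
The same pattern can be sketched in plain Python: the pipeline function fixes the wiring, and any implementation with a matching signature can be plugged in. All names below (`toy_pipeline`, `list_importer`, `synthetic_importer`, `mean_feature_trainer`) are hypothetical illustrations, not ZenML code.

```python
from typing import Callable, List, Tuple

# A dataset here is just (features, labels)
Dataset = Tuple[List[float], List[int]]


def toy_pipeline(importer: Callable[[], Dataset], trainer: Callable[[Dataset], float]) -> float:
    """The wiring stays fixed; only the step implementations passed in change."""
    data = importer()
    return trainer(data)


def list_importer() -> Dataset:
    # Hypothetical importer returning features and labels
    return [0.1, 0.9, 0.4], [0, 1, 0]


def synthetic_importer() -> Dataset:
    # A drop-in replacement with the same output signature
    return [0.2, 0.8], [0, 1]


def mean_feature_trainer(data: Dataset) -> float:
    # Stand-in "model": the mean of the features
    features, _ = data
    return sum(features) / len(features)


print(toy_pipeline(list_importer, mean_feature_trainer))
print(toy_pipeline(synthetic_importer, mean_feature_trainer))
```

Swapping `list_importer` for `synthetic_importer` required no change to `toy_pipeline`, which is the property the paragraph above describes.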

## Let's look at the Steps

So far, you've seen the big picture of what we want to build. Now, let's dive into the steps to see the options ZenML provides for building logic into your execution flow. We'll start by looking at how we detect drift using Evidently.

### Integrating Evidently
Evidently is an open-source tool that lets you easily compute drift on your data.

At its core, Evidently's drift detection functions take a reference dataset and compare it with a separate comparison dataset. Both are passed in as pandas DataFrames, though CSV inputs are also possible. ZenML wraps this functionality in several standardized steps and also provides an easy way to use Evidently's visualizations, which Evidently calls 'Dashboards'.
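
As a rough illustration of what a per-feature drift check does, the sketch below compares a reference column with a comparison column using the two-sample Kolmogorov-Smirnov statistic, a common choice for numerical features, and the standard large-sample critical value at a significance level of 0.05. It then applies a hypothetical dataset-level rule (drift if at least half of the columns drifted). Evidently's own tests, thresholds and rules may differ; `ks_statistic`, `column_drifted` and `toy_drift_report` are hypothetical names, and the code uses plain lists instead of pandas to stay dependency-free.

```python
import bisect
import math
from typing import Dict, List, Sequence


def ks_statistic(reference: Sequence[float], current: Sequence[float]) -> float:
    """Two-sample KS statistic: the largest gap between the two empirical CDFs."""
    ref, cur = sorted(reference), sorted(current)
    d = 0.0
    # The empirical CDFs only change at sample points, so checking those finds the maximum gap
    for x in ref + cur:
        f_ref = bisect.bisect_right(ref, x) / len(ref)
        f_cur = bisect.bisect_right(cur, x) / len(cur)
        d = max(d, abs(f_ref - f_cur))
    return d


def column_drifted(reference: Sequence[float], current: Sequence[float],
                   c_alpha: float = 1.358) -> bool:
    """Flag drift when D exceeds c(alpha) * sqrt((n + m) / (n * m)).

    c_alpha = 1.358 is the usual large-sample critical value for alpha = 0.05.
    """
    n, m = len(reference), len(current)
    return ks_statistic(reference, current) > c_alpha * math.sqrt((n + m) / (n * m))


def toy_drift_report(reference: Dict[str, List[float]], current: Dict[str, List[float]],
                     share: float = 0.5) -> dict:
    """Hypothetical dataset-level rule: drift if at least `share` of the columns drifted."""
    per_column = {col: column_drifted(reference[col], current[col]) for col in reference}
    drifted_share = sum(per_column.values()) / len(per_column)
    return {"columns": per_column, "dataset_drift": drifted_share >= share}


ref = {"pixel_mean": [i / 100 for i in range(100)]}
same = {"pixel_mean": [i / 100 + 0.005 for i in range(100)]}
shifted = {"pixel_mean": [i / 100 + 0.5 for i in range(100)]}
print(toy_drift_report(ref, same)["dataset_drift"])     # False
print(toy_drift_report(ref, shifted)["dataset_drift"])  # True
```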

If you're working on any kind of machine learning problem with an ongoing training loop that takes in new data, you'll want to guard against drift. Machine learning pipelines are built on top of data inputs, so if your model was trained on a certain distribution of data, it is worth checking whether new data still follows it. You have less control over incoming data, and since things often change in the real world, you should have a plan for knowing when they have shifted. Evidently offers a growing set of features that help you monitor not only data drift but also other key aspects such as target drift.

In [None]:
# First we need to install evidently to our python environment
!zenml integration install evidently -f

In [None]:
# ZenML provides some standard steps for the Evidently integration
from zenml.integrations.evidently.steps import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)

# Create a config object for the Evidently step;
# here we choose the data drift profile section
evidently_drift_detector_config = EvidentlyProfileConfig(
    column_mapping=None,
    profile_sections=["datadrift"],
)

### Splitting the data for drift detection

Now, let's look at the split logic and the purpose it serves. In this example, we train on the MNIST dataset from Keras. Since MNIST is not a time series, there is no natural "before" and "after" to compare. One way of splitting the data is to pick an arbitrary row, take all samples before it as the reference dataset, and take the rows after it as the new data.

We have the option of adding noise to the dataset to control whether drift occurs.
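
A minimal sketch of that split, assuming the images have been flattened into rows of floats. `split_at_row` is a hypothetical stand-in for what `reference_data_splitter` might do with its `row` and `add_noise` settings; the assumption that Gaussian noise is added only to the new data, and the `noise_std` and `seed` parameters, are illustrative choices, and the real step may apply noise differently.

```python
import random
from typing import List, Optional, Tuple

Row = List[float]


def split_at_row(data: List[Row], row: int, add_noise: bool = False,
                 noise_std: float = 0.1, seed: Optional[int] = None) -> Tuple[List[Row], List[Row]]:
    """Rows before `row` become the reference set; the remaining rows become the new set.

    When add_noise is True, Gaussian noise is added to the new set only (an assumption),
    which shifts its distribution away from the reference set.
    """
    if not 0 < row < len(data):
        raise ValueError("row must fall strictly inside the dataset")
    # Copy rows so the caller's data is never modified
    reference = [list(r) for r in data[:row]]
    new = [list(r) for r in data[row:]]
    if add_noise:
        rng = random.Random(seed)
        new = [[v + rng.gauss(0.0, noise_std) for v in r] for r in new]
    return reference, new


data = [[float(i), float(i) / 2] for i in range(10)]
reference, new = split_at_row(data, row=6)
print(len(reference), len(new))  # 6 4
noisy_ref, noisy_new = split_at_row(data, row=6, add_noise=True, seed=0)
print(noisy_ref == reference, noisy_new == new)  # True False
```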

In [None]:
from steps.splitter import reference_data_splitter, TrainingSplitConfig

drift_data_split_config = TrainingSplitConfig(
    row=30000,
    add_noise=True)

### Deploying the model conditionally

Pipelines in ZenML are flexible enough to support complex workflows, such as conditional execution of steps, or parallel execution when the orchestrator allows it.
In our example, we want to deploy the model if no drift is detected and to send a message to a Discord channel if it is.

We use the MLflow deployer step from ZenML's MLflow integration, which takes a boolean input that decides whether the model is deployed.
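
Stripped of the framework, this branching amounts to one boolean decision driving two actions. `deploy_model` and `send_discord_alert` are hypothetical placeholders that only record what they would do; in the pipeline these roles are played by the MLflow deployer step and the Discord step, which receive the decision as an input artifact.

```python
from typing import List


def deploy_model(deploy_decision: bool, log: List[str]) -> None:
    # Placeholder for the deployer: only acts when the decision is True
    if deploy_decision:
        log.append("model deployed")
    else:
        log.append("deployment skipped")


def send_discord_alert(deploy_decision: bool, log: List[str]) -> None:
    # Placeholder for the Discord step: only posts when drift blocked the deployment
    if not deploy_decision:
        log.append("alert: drift detected, model not deployed")


for decision in (True, False):
    log = []
    deploy_model(decision, log)
    send_discord_alert(decision, log)
    print(decision, log)
```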

In [None]:
from zenml.integrations.mlflow.steps import mlflow_deployer_step

model_deployer = mlflow_deployer_step(name="model_deployer")

We define a deployment trigger step that takes the report from the Evidently drift detector step and returns `True` (deploy) if no drift was detected in the dataset, and `False` otherwise.

In [None]:
from zenml.steps import step

@step
def deployment_trigger(
    drift_report: dict,
) -> bool:
    """Implements a simple model deployment trigger that looks at the
    drift report and deploys only if no drift was detected."""
    # Dataset-level drift flag from Evidently's data drift profile
    drift = drift_report["data_drift"]["data"]["metrics"]["dataset_drift"]

    # True means "deploy": only deploy when there is no dataset drift
    return not drift
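
To check the trigger logic without running Evidently, the plain function below mirrors the intent stated in the step's docstring (deploy only when there is no drift) and is exercised on hand-built dictionaries. These dictionaries contain only the nested keys the trigger reads (`data_drift` → `data` → `metrics` → `dataset_drift`); a real Evidently profile holds many more fields. `should_deploy` and `fake_report` are hypothetical names.

```python
def should_deploy(drift_report: dict) -> bool:
    """Undecorated copy of the trigger logic: deploy only when no dataset drift was found."""
    drift = drift_report["data_drift"]["data"]["metrics"]["dataset_drift"]
    return not drift


def fake_report(dataset_drift: bool) -> dict:
    # Minimal stand-in with just the nested path the trigger reads
    return {"data_drift": {"data": {"metrics": {"dataset_drift": dataset_drift}}}}


print(should_deploy(fake_report(False)))  # True
print(should_deploy(fake_report(True)))   # False
```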

The output of the deployment trigger is then passed both to the model deployer we initialized earlier and to a step that posts messages to a Discord channel. Since neither of these steps depends on the other, they can be executed in parallel.
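
Because neither step consumes the other's output, an orchestrator is free to run them concurrently. The sketch below imitates that with `concurrent.futures.ThreadPoolExecutor`; `deploy` and `notify` are hypothetical stand-ins that just sleep, and whether the two steps really run in parallel in ZenML depends on the orchestrator in your stack.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def deploy(decision: bool) -> str:
    time.sleep(0.2)  # pretend deployment takes a while
    return "deployed" if decision else "skipped"


def notify(decision: bool) -> str:
    time.sleep(0.2)  # pretend posting to Discord takes a while
    return "no alert" if decision else "alert sent"


decision = False
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=2) as pool:
    # Both independent steps are submitted at once and run on separate threads
    deploy_future = pool.submit(deploy, decision)
    notify_future = pool.submit(notify, decision)
    results = (deploy_future.result(), notify_future.result())
elapsed = time.perf_counter() - start
print(results)  # ('skipped', 'alert sent')
# The two sleeps overlap, so this stays well below the 0.4s a sequential run needs
print(f"took {elapsed:.2f}s")
```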

## Running the pipeline
Once the pipeline and the steps are defined, we can create an instance of the training pipeline and pass our step implementations to it. The `run()` method of the pipeline submits it to an orchestrator (for now, a local one).
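
Before running anything, an orchestrator has to derive a valid execution order from the data links. The sketch below does this for the step names of `continuous_deployment_pipeline` using the standard library's `graphlib.TopologicalSorter` (Python 3.9+). The dependency map was written by hand from the pipeline function above, and the sketch only illustrates the idea; it makes no claim about how ZenML's orchestrators work internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the steps whose artifacts it consumes
dependencies = {
    "importer": set(),
    "normalizer": {"importer"},
    "trainer": {"normalizer", "importer"},               # x_train_normed, y_train
    "evaluator": {"normalizer", "importer", "trainer"},  # x_test_normed, y_test, model
    "drift_splitter": {"importer"},                      # x_train
    "drift_detector": {"drift_splitter"},
    "deployment_trigger": {"drift_detector"},
    "model_deployer": {"deployment_trigger"},
    "discord_bot": {"deployment_trigger"},
}

sorter = TopologicalSorter(dependencies)
sorter.prepare()
batches = []
while sorter.is_active():
    # Steps whose inputs are all available; steps in one batch could run in parallel
    ready = sorted(sorter.get_ready())
    batches.append(ready)
    sorter.done(*ready)

for number, batch in enumerate(batches, start=1):
    print(number, batch)
```

The last batch contains both `discord_bot` and `model_deployer`, matching the observation that these two steps can run side by side.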

Import all the steps and pipelines.

In [None]:
from steps.deployment_trigger import deployment_trigger
from steps.discord_bot import discord_alert
from steps.evaluator import tf_evaluator
from steps.importer import importer_mnist
from steps.normalizer import normalizer
from steps.trainer import TrainerConfig, tf_trainer  # type: ignore [import]

from zenml.integrations.mlflow.steps import mlflow_deployer_step
from zenml.services import load_last_service_from_step
from steps.splitter import reference_data_splitter, TrainingSplitConfig
from pipelines.training_pipeline import continuous_deployment_pipeline
from zenml.integrations.mlflow.steps import MLFlowDeployerConfig

from zenml.integrations.evidently.steps import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)

Now, define the configurations needed by the split step and the Evidently data drift step. Once the configs are defined, we can create the pipeline instance and call `run()` on it! 🚀

In [None]:
drift_data_split_config = TrainingSplitConfig(
    row=30000,
    add_noise=True,
)

evidently_profile_config = EvidentlyProfileConfig(
    column_mapping=None,
    profile_sections=["datadrift"],
)

model_deployer = mlflow_deployer_step(name="model_deployer")


def main(
    epochs: int = 5,
    lr: float = 0.003,
    min_accuracy: float = 0.92,  # currently unused by the drift-based trigger
    stop_service: bool = False,
):
    if stop_service:
        # Stop the prediction server started by the last pipeline run, then exit
        service = load_last_service_from_step(
            pipeline_name="continuous_deployment_pipeline",
            step_name="model_deployer",
            running=True,
        )
        if service:
            service.stop(timeout=10)
        return

    # Initialize a continuous deployment pipeline run
    deployment = continuous_deployment_pipeline(
        importer=importer_mnist(),
        normalizer=normalizer(),
        trainer=tf_trainer(config=TrainerConfig(epochs=epochs, lr=lr)),
        evaluator=tf_evaluator(),
        drift_splitter=reference_data_splitter(config=drift_data_split_config),
        drift_detector=EvidentlyProfileStep(config=evidently_profile_config),
        deployment_trigger=deployment_trigger(),
        model_deployer=model_deployer(config=MLFlowDeployerConfig(workers=3)),
        discord_bot=discord_alert(),
    )

    # Run the pipeline on the orchestrator of the active stack
    deployment.run()


# Train, check for drift and (conditionally) deploy the model
main()

## Post-execution Workflow