# ZenML: Create production-ready ML pipelines (Windows) 

This guide gives you a first hands-on experience with ZenML and a brief overview of its basic functionality. We start locally in a Jupyter notebook and then transition to a more robust environment with Kubeflow Pipelines.

It is designed as a practical introduction to moving from a local setup to a more production-ready MLOps stack. If you want more detail, our [full documentation](https://docs.zenml.io/) covers the concepts and how to implement them.

![zenml](assets/zenml.svg)

## Install libraries

In [1]:
# Install the ZenML CLI tool (TensorFlow is installed below as a ZenML integration)
!pip install zenml



ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.9 requires typing-extensions<4,>=3.7.4; python_version < "3.9", but you have typing-extensions 4.3.0 which is incompatible.



Collecting typing-extensions>=3.7.0
  Using cached typing_extensions-4.3.0-py3-none-any.whl (25 kB)
Installing collected packages: typing-extensions
  Attempting uninstall: typing-extensions
    Found existing installation: typing-extensions 3.10.0.2
    Uninstalling typing-extensions-3.10.0.2:
      Successfully uninstalled typing-extensions-3.10.0.2
Successfully installed typing-extensions-4.3.0

[notice] A new release of pip available: 22.1.2 -> 22.2.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
!zenml integration install kubeflow -y

In [3]:
!zenml integration install tensorflow -y

Once the installation is complete, you can create your first ZenML repository for your project. As ZenML repositories are built on top of Git repositories, you can create yours in any empty directory with:

In [4]:
# Initialize a ZenML repository
!zenml init

ZenML repository initialized at C:\Users\vsriniva\Desktop\zenml.
The local active profile was initialized to 'default' and the local active 
stack to 'default'. This local configuration will only take effect when you're 
running ZenML from the initialized repository root, or from a subdirectory. For
more information on profile and stack configuration, please visit 
https://docs.zenml.io/developer-guide/stacks-profiles-repositories.


This has created a local directory holding the configuration for your MLOps stack.

# Start with the local stack

The above commands have automatically created a local MLOps stack for you and set it to active. Let's make sure:

In [6]:
!zenml stack get
#!zenml stack set default

Running with active profile: 'default' (local)
The active stack is: 'default'


![localstack.png](assets/localstack.png)

The default stack is the local stack. It consists of:

* Orchestrator: Python Kernel
* Artifact Store: Stores all the artifacts that flow between steps.
* Metadata Store: Keeps track of all the artifacts as well as all the parameters that flow through your pipeline.

## Create your first pipeline with the local_stack

Let's start with the imports:

In [7]:
import logging
import os

import numpy as np
import tensorflow as tf

from zenml.integrations.constants import TENSORFLOW
from zenml.pipelines import pipeline
from zenml.steps import BaseStepConfig, Output, StepContext, step

## Define ZenML Steps

In the code that follows, you can see that we are defining the various steps of our pipeline. Each step is decorated with `@step`, the main abstraction that is currently available for creating pipeline steps.

The first step is an `importer` step that downloads the MNIST dataset.

In [8]:
@step
def importer() -> Output(
    X_train=np.ndarray,
    X_test=np.ndarray,
    y_train=np.ndarray,
    y_test=np.ndarray,
):
    """Download the MNIST data and store it as an artifact."""
    (X_train, y_train), (
        X_test,
        y_test,
    ) = tf.keras.datasets.mnist.load_data()
    return X_train, X_test, y_train, y_test
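
To make the decorator idea more concrete, here is a toy sketch of how a decorator can turn a plain function into a step-like object that knows its own name and inputs. `ToyStep` and `toy_step` are hypothetical names invented for this illustration; this is not how ZenML implements `@step`.

```python
import inspect
from typing import Any, Callable


class ToyStep:
    """Hypothetical stand-in for a pipeline step; not a ZenML class."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self.func = func
        self.name = func.__name__
        # Record the declared inputs so a runner could wire steps together.
        self.inputs = list(inspect.signature(func).parameters)

    def __call__(self, **kwargs: Any) -> Any:
        # Refuse to run if a declared input was not supplied.
        missing = [name for name in self.inputs if name not in kwargs]
        if missing:
            raise TypeError(f"step '{self.name}' is missing inputs: {missing}")
        return self.func(**kwargs)


def toy_step(func: Callable[..., Any]) -> ToyStep:
    """Decorator that wraps a plain function in a ToyStep."""
    return ToyStep(func)


@toy_step
def double(values: list) -> list:
    return [2 * v for v in values]


print(double.name, double.inputs)
print(double(values=[1, 2, 3]))
```

The point of the sketch is only that decorating a function gives the framework a handle on its signature, which is what allows steps to be connected by their inputs and outputs later on.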

Then we add a `normalizer` step that takes the training and test images as input and standardizes each set using its own mean and standard deviation.

In [9]:
@step
def normalizer(
    X_train: np.ndarray, X_test: np.ndarray
) -> Output(X_train_normed=np.ndarray, X_test_normed=np.ndarray):
    """Normalize digits dataset with mean and standard deviation."""
    X_train_normed = (X_train - np.mean(X_train)) / np.std(X_train)
    X_test_normed = (X_test - np.mean(X_test)) / np.std(X_test)
    return X_train_normed, X_test_normed
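
As a quick illustration of the arithmetic inside `normalizer`, the sketch below standardizes a short list of made-up pixel values using only the standard library. `standardize` and the sample values are hypothetical; `pstdev` is used because it computes the population standard deviation, which is also what `np.std` does by default.

```python
from statistics import fmean, pstdev
from typing import List


def standardize(values: List[float]) -> List[float]:
    """Subtract the mean and divide by the population standard deviation."""
    mean = fmean(values)
    std = pstdev(values)  # population std, matching np.std's default (ddof=0)
    return [(v - mean) / std for v in values]


pixels = [0.0, 64.0, 128.0, 255.0]  # hypothetical pixel intensities
normed = standardize(pixels)
print([round(v, 3) for v in normed])
```

After standardization the values have mean 0 and standard deviation 1. A constant input would have zero standard deviation and raise a `ZeroDivisionError` in this sketch.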

We then add a `trainer` step that takes the normalized data and trains a Keras model on it. The step has an associated `TrainerConfig` step configuration class. Also note how we use the `StepContext` to get the URI of the output model artifact in the Artifact Store, so that the TensorBoard logs can be stored in a `logs` directory under it.

In [10]:
class TrainerConfig(BaseStepConfig):
    """Trainer params"""

    epochs: int = 1
    lr: float = 0.001

@step
def trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
    context: StepContext,
    config: TrainerConfig,
) -> tf.keras.Model:
    """Train a neural net from scratch to recognize MNIST digits and return
    the trained model."""
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )

    log_dir = os.path.join(context.get_output_artifact_uri(), "logs")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir, histogram_freq=1
    )

    model.compile(
        optimizer=tf.keras.optimizers.Adam(config.lr),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )

    model.fit(
        X_train,
        y_train,
        epochs=config.epochs,
        callbacks=[tensorboard_callback],
    )

    return model
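
The last `Dense(10)` layer has no activation, so the model outputs raw scores (logits), and the loss is told so through `from_logits=True`. The following dependency-free sketch shows, under that reading, what happens conceptually to a single example: softmax turns logits into probabilities, the loss is the negative log-probability of the true class, and the prediction is the index of the largest logit. The helper names and the three-class logits are hypothetical, and this is not Keras's implementation.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    """Convert raw scores to probabilities (shifted by the max for stability)."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


def sparse_cross_entropy(logits: List[float], label: int) -> float:
    """Negative log-probability assigned to the integer class label."""
    return -math.log(softmax(logits)[label])


def predict(logits: List[float]) -> int:
    """The predicted class is the index of the largest logit."""
    return max(range(len(logits)), key=logits.__getitem__)


logits = [2.0, 0.5, -1.0]  # hypothetical scores for a 3-class problem
print(predict(logits))
print(round(sparse_cross_entropy(logits, label=0), 4))
```

The loss is small when the true label matches the largest logit and grows as the model puts more probability on other classes.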

Finally, we add an `evaluator` step to see how we did on the test set!

In [11]:
@step
def evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: tf.keras.Model,
) -> float:
    """Calculate the accuracy on the test set"""

    _, test_acc = model.evaluate(X_test, y_test, verbose=2)
    logging.info(f"Test accuracy: {test_acc}")
    return test_acc

## Define ZenML Pipeline

A pipeline is defined with the `@pipeline` decorator. This defines the various steps of the pipeline and specifies the dependencies between the steps, thereby determining the order in which they will be run.

In [12]:
@pipeline
def mnist_pipeline(
    importer,
    normalizer,
    trainer,
    evaluator,
):
    # Link all the steps together
    X_train, X_test, y_train, y_test = importer()
    X_trained_normed, X_test_normed = normalizer(X_train=X_train, X_test=X_test)
    model = trainer(X_train=X_trained_normed, y_train=y_train)
    evaluator(X_test=X_test_normed, y_test=y_test, model=model)
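
To see how data dependencies alone can fix the execution order, the sketch below derives an order for the four steps from a table of "which step consumes whose output", read off `mnist_pipeline`. It uses a plain topological sort; `execution_order` and `mnist_dependencies` are hypothetical names, and this is an illustration of the idea rather than ZenML's scheduling code.

```python
from collections import deque
from typing import Dict, List


def execution_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return steps ordered so that each runs after the steps it depends on."""
    remaining = {step: set(deps) for step, deps in dependencies.items()}
    ready = deque(step for step, deps in remaining.items() if not deps)
    order: List[str] = []
    while ready:
        current = ready.popleft()
        order.append(current)
        # Mark `current` as done for every step that was waiting on it.
        for step, deps in remaining.items():
            if current in deps:
                deps.remove(current)
                if not deps:
                    ready.append(step)
    if len(order) != len(remaining):
        raise ValueError("dependency cycle detected")
    return order


# Each step lists the steps whose outputs it consumes in mnist_pipeline.
mnist_dependencies = {
    "evaluator": ["normalizer", "importer", "trainer"],
    "trainer": ["normalizer", "importer"],
    "normalizer": ["importer"],
    "importer": [],
}
print(execution_order(mnist_dependencies))
```

Even though the dictionary lists the steps in reverse, the computed order is importer, normalizer, trainer, evaluator, because each step has to wait for the outputs it consumes.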

## Run the pipeline

Running the pipeline is as simple as calling the `run()` method on an instance of the defined pipeline.

In [13]:
# Initialize the pipeline
first_pipeline = mnist_pipeline(
    importer=importer(),
    normalizer=normalizer(),
    trainer=trainer(),
    evaluator=evaluator(),
)

first_pipeline.run()

Creating run for pipeline: mnist_pipeline
Cache enabled for pipeline mnist_pipeline
Using stack default to run pipeline mnist_pipeline...
Step importer has started.
Using cached version of importer.
Step importer has finished in 0.086s.
Step normalizer has started.
Using cached version of normalizer.
Step normalizer has finished in 0.041s.
Step trainer has started.
Step trainer has finished in 9.539s.
Step evaluator has started.
313/313 - 0s - loss: 0.3277 - accuracy: 0.9021 - 285ms/epoch - 911us/step

INFO:root:Test accuracy: 0.9021000266075134

Step evaluator has finished in 0.768s.
Pipeline run mnist_pipeline-30_Aug_22-15_10_51_711183 has finished in 12.608s.


## Visualize the model with TensorBoard

To visualize the model with TensorBoard, use the built-in ZenML TensorBoard visualizer, which automatically starts a TensorBoard server in the background.

In [14]:
from zenml.integrations.tensorflow.visualizers import (
    visualize_tensorboard,
    stop_tensorboard_server,
)

visualize_tensorboard(
    pipeline_name="mnist_pipeline",
    step_name="trainer",
)

To stop the TensorBoard server, you can use the `stop_tensorboard_server` utility function.

In [15]:
stop_tensorboard_server(
    pipeline_name="mnist_pipeline",
    step_name="trainer",
)

# Transitioning to Kubeflow Pipelines

We got pretty good results on the digits model that we trained, but at some point we want to leave this local notebook stack and move to a stack that looks more like production. This is where the ZenML [Kubeflow Pipelines](https://github.com/kubeflow/pipelines) integration helps!

## Pre-requisites

In order to run this example, you need to have installed:

* [Docker](https://docs.docker.com/get-docker/)
* [K3D](https://k3d.io/v5.2.1/) 
* [Kubectl](https://kubernetes.io/docs/tasks/tools/)

## Create a Kubeflow Stack

![localstack-with-kubeflow.png](assets/localstack-with-kubeflow-orchestrator.png)

In [16]:
!python -c "import zenml.environment; print(zenml.environment.get_system_details())"

ZenML version: 0.12.0
Install path: C:\Users\vsriniva\Anaconda3\Lib\site-packages\zenml
Python version: 3.8.8
Platform information: {'os': 'windows', 'windows_version_release': '10', 'windows_version': '10.0.19041', 'windows_version_service_pack': 'SP0', 'windows_version_os_type': 'Multiprocessor Free'}
Environment: native
Integrations: ['kubeflow', 'kubernetes', 'pytorch', 'scipy', 'seldon', 'sklearn', 'tensorflow']


### Register the container-registry

In [19]:
!zenml container-registry register local_reg --flavor=default --uri=localhost:5000

Running with active profile: 'default' (local)
Registered stack component with type 'container_registry' and name 'local_reg'.
Successfully registered container registry `local_reg`.


### Register the orchestrator

In [21]:
!zenml orchestrator register kubeflow_orchest --flavor=kubeflow

Running with active profile: 'default' (local)
Registered stack component with type 'orchestrator' and name 'kubeflow_orchest'.
Successfully registered orchestrator `kubeflow_orchest`.


Take the local registry and the orchestrator and register them in a stack named `local_kbflow_stack`:

In [23]:
!zenml stack register local_kbflow_stack -m default -a default -o kubeflow_orchest -c local_reg

Running with active profile: 'default' (local)
Registered stack with name 'local_kbflow_stack'.
Stack 'local_kbflow_stack' successfully registered!


Set `local_kbflow_stack` as the active stack:

In [24]:
!zenml stack set local_kbflow_stack

Running with active profile: 'default' (local)
Active stack set to: 'local_kbflow_stack'
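
One way to picture what the registration commands did: the new stack reuses the `default` metadata store and artifact store, swaps in the Kubeflow orchestrator, and adds a container registry. The sketch below models that with a small dataclass. `ToyStack` is a hypothetical type invented for this illustration (not a ZenML class), and the name of the default stack's orchestrator is assumed to be `default`.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class ToyStack:
    """Hypothetical model of a stack as a named set of components."""

    name: str
    orchestrator: str
    metadata_store: str
    artifact_store: str
    container_registry: Optional[str] = None


# Assumption: every component of the default stack is itself named "default".
default_stack = ToyStack(
    name="default",
    orchestrator="default",
    metadata_store="default",
    artifact_store="default",
)

# Mirror `zenml stack register local_kbflow_stack -m default -a default
# -o kubeflow_orchest -c local_reg`: only two components differ.
kubeflow_stack = replace(
    default_stack,
    name="local_kbflow_stack",
    orchestrator="kubeflow_orchest",
    container_registry="local_reg",
)
print(kubeflow_stack)
```

Because the pipeline code never mentions a stack, the same steps can run on either one; only the active stack changes.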


## Let's spin up the stack

In [26]:
!kubectl version

Client Version: version.Info{Major:"1", Minor:"24", GitVersion:"v1.24.2", GitCommit:"f66044f4361b9f1f96f0053dd46cb7dce5e990a8", GitTreeState:"clean", BuildDate:"2022-06-15T14:22:29Z", GoVersion:"go1.18.3", Compiler:"gc", Platform:"windows/amd64"}
Kustomize Version: v4.5.4
Server Version: version.Info{Major:"1", Minor:"24", GitVersion:"v1.24.2", GitCommit:"f66044f4361b9f1f96f0053dd46cb7dce5e990a8", GitTreeState:"clean", BuildDate:"2022-06-15T14:15:38Z", GoVersion:"go1.18.3", Compiler:"gc", Platform:"linux/amd64"}




In [27]:
!k3d version

k3d version v5.4.6
k3s version v1.24.4-k3s1 (default)


In [28]:
!zenml stack up

Running with active profile: 'default' (local)
Provisioning resources for active stack 'local_kbflow_stack'.
Provisioning resources for stack 'local_kbflow_stack'.


WARN[0000] No node filter specified
FATA[0000] Failed Cluster Configuration Validation:  failed to validate volume mount 'C:\Users\vsriniva\AppData\Roaming\zenml:C:\Users\vsriniva\AppData\Roaming\zenml': volume mount destination doesn't appear to be an absolute path: 'C' in 'C:\Users\vsriniva\AppData\Roaming\zenml:C:\Users\vsriniva\AppData\Roaming\zenml'
Error: Unable to resume resources for KubeflowOrchestrator(type=orchestrator, flavor=kubeflow, name=kubeflow_orchest, uuid=a7eaf90b-7987-4514-8608-1813ec85e0b9, custom_docker_base_image_name=None, kubeflow_pipelines_ui_port=8080, kubeflow_hostname=None, kubernetes_context=k3d-zenml-kubeflow-a7eaf90b, synchronous=False, skip_local_validations=False, skip_cluster_provisioning=False, skip_ui_daemon_provisioning=False): No resources have been provisioned for this component.


Provisioning local Kubeflow Pipelines deployment...
Creating local K3D cluster 'zenml-kubeflow-a7eaf90b'.
Command '['k3d', 'cluster', 'create', 'zenml-kubeflow-a7eaf90b', '--image', 'rancher/k3s:v1.23.5-k3s1', '--registry-create', 'k3d-zenml-kubeflow-registry.localhost:5000', '--registry-config', 'C:\\Users\\vsriniva\\AppData\\Roaming\\zenml\\kubeflow\\a7eaf90b-7987-4514-8608-1813ec85e0b9\\k3d_registry.yaml', '--volume', 'C:\\Users\\vsriniva\\AppData\\Roaming\\zenml:C:\\Users\\vsriniva\\AppData\\Roaming\\zenml']' returned non-zero exit status 1.
Unable to spin up local Kubeflow Pipelines deployment.
If you wish to spin up this Kubeflow local orchestrator manually, please enter the following commands:

> k3d cluster create zenml-kubeflow-a7eaf90b --image rancher/k3s:v1.23.5-k3s1 --registry-create k3d-zenml-kubeflow-registry.localhost:5000 --registry-config C:\Users\vsriniva\AppData\Roaming\zenml\kubeflow\a7eaf90b-7987-4514-8608-1813ec85e0b9\k3d_registry.yaml --volume C:\Users\vsriniva\A

In [29]:
!k3d cluster create zenml-kubeflow-a7eaf90b --image rancher/k3s:v1.23.5-k3s1 --registry-create k3d-zenml-kubeflow-registry.localhost:5000 --registry-config C:\Users\vsriniva\AppData\Roaming\zenml\kubeflow\a7eaf90b-7987-4514-8608-1813ec85e0b9\k3d_registry.yaml --volume C:\Users\vsriniva\AppData\Roaming\zenml:C:\Users\vsriniva\AppData\Roaming\zenml

WARN[0000] No node filter specified
FATA[0000] Failed Cluster Configuration Validation:  failed to validate volume mount 'C:\Users\vsriniva\AppData\Roaming\zenml:C:\Users\vsriniva\AppData\Roaming\zenml': volume mount destination doesn't appear to be an absolute path: 'C' in 'C:\Users\vsriniva\AppData\Roaming\zenml:C:\Users\vsriniva\AppData\Roaming\zenml'
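
The `FATA` message above complains that the destination `'C'` is not an absolute path, which suggests that the `source:destination` volume spec is being split at the colon of the Windows drive letter. The sketch below only illustrates that ambiguity with a naive splitter; it is an assumption about the cause, not k3d's actual parsing code, and the function name and paths are hypothetical.

```python
from typing import List


def split_volume_spec(spec: str) -> List[str]:
    """Naively split a `source:destination` volume spec on colons."""
    return spec.split(":")


posix_spec = "/home/me/zenml:/home/me/zenml"           # hypothetical path
windows_spec = r"C:\Users\me\zenml:C:\Users\me\zenml"  # hypothetical path

# A POSIX-style spec splits cleanly into source and destination.
print(split_volume_spec(posix_spec))

# A Windows-style spec yields four pieces, with the bare drive letters
# ("C") separated from the rest of each path.
print(split_volume_spec(windows_spec))
```

With drive letters in play, a colon-separated spec no longer has a single obvious source and destination, which is consistent with the validation error shown in the output.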


## Write the pipeline to disk

In [None]:
%%writefile run-kubeflow.py
#  Copyright (c) ZenML GmbH 2021. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at:
#
#       https://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
#  or implied. See the License for the specific language governing
#  permissions and limitations under the License.

import logging
import os

import numpy as np
import tensorflow as tf

from zenml.integrations.constants import TENSORFLOW
from zenml.pipelines import pipeline
from zenml.repository import Repository
from zenml.steps import BaseStepConfig, Output, StepContext, step


@step
def importer() -> Output(
    X_train=np.ndarray,
    X_test=np.ndarray,
    y_train=np.ndarray,
    y_test=np.ndarray,
):
    """Download the MNIST data and store it as an artifact."""
    (X_train, y_train), (
        X_test,
        y_test,
    ) = tf.keras.datasets.mnist.load_data()
    return X_train, X_test, y_train, y_test


@step
def normalizer(
    X_train: np.ndarray, X_test: np.ndarray
) -> Output(X_train_normed=np.ndarray, X_test_normed=np.ndarray):
    """Normalize digits dataset with mean and standard deviation."""
    X_train_normed = (X_train - np.mean(X_train)) / np.std(X_train)
    X_test_normed = (X_test - np.mean(X_test)) / np.std(X_test)
    return X_train_normed, X_test_normed


class TrainerConfig(BaseStepConfig):
    """Trainer params"""

    epochs: int = 1
    lr: float = 0.001


@step
def trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
    context: StepContext,
    config: TrainerConfig,
) -> tf.keras.Model:
    """Train a neural net from scratch to recognize MNIST digits and return
    the trained model."""
    model = tf.keras.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(10, activation="relu"),
            tf.keras.layers.Dense(10),
        ]
    )

    log_dir = os.path.join(context.get_output_artifact_uri(), "logs")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir, histogram_freq=1
    )

    model.compile(
        optimizer=tf.keras.optimizers.Adam(config.lr),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=["accuracy"],
    )

    model.fit(
        X_train,
        y_train,
        epochs=config.epochs,
        callbacks=[tensorboard_callback],
    )

    return model


@step
def evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: tf.keras.Model,
) -> float:
    """Calculate the accuracy on the test set"""

    _, test_acc = model.evaluate(X_test, y_test, verbose=2)
    logging.info(f"Test accuracy: {test_acc}")
    return test_acc


@pipeline(required_integrations=[TENSORFLOW], enable_cache=False)
def mnist_pipeline(
    importer,
    normalizer,
    trainer,
    evaluator,
):
    # Link all the steps together
    X_train, X_test, y_train, y_test = importer()
    X_trained_normed, X_test_normed = normalizer(X_train=X_train, X_test=X_test)
    model = trainer(X_train=X_trained_normed, y_train=y_train)
    evaluator(X_test=X_test_normed, y_test=y_test, model=model)


if __name__ == "__main__":
    # Run the pipeline
    pipeline_instance = mnist_pipeline(
        importer=importer(),
        normalizer=normalizer(),
        trainer=trainer(),
        evaluator=evaluator(),
    )
    pipeline_instance.run()


In [None]:
# Run the pipeline script on the active stack
!python run-kubeflow.py

# Post execution workflow

In [None]:
from zenml.repository import Repository

## Get repo

In [None]:
repo = Repository()

## Pipelines 

In [None]:
pipelines = repo.get_pipelines()

## Retrieve the pipeline

In [None]:
mnist_pipeline = pipelines[0]

## Get the first run

In [None]:
runs = mnist_pipeline.runs  # chronologically ordered
mnist_run = runs[0]

## Get the second run

In [None]:
kubeflow_mnist_run = runs[1]

## Get the steps (note the first step name is different)

In [None]:
mnist_run.steps

In [None]:
kubeflow_mnist_run.steps

## Check the results of the evaluator and compare

In [None]:
mnist_eval_step = mnist_run.get_step(step='evaluator')
kubeflow_mnist_eval_step = kubeflow_mnist_run.get_step(step='evaluator')

In [None]:
# One output is simply called `output`, multiple is a dict called `outputs`.
mnist_eval_step.output.read()

In [None]:
kubeflow_mnist_eval_step.output.read()
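
Once both accuracies have been read, one simple way to compare them is with an absolute tolerance, since two training runs rarely produce bit-identical metrics. The sketch below is a minimal illustration: `accuracies_match` and the tolerance of 0.02 are hypothetical choices, and the Kubeflow value is a placeholder rather than a measured result.

```python
import math


def accuracies_match(local_acc: float, remote_acc: float, abs_tol: float = 0.02) -> bool:
    """Return True if two accuracy values differ by at most `abs_tol`."""
    return math.isclose(local_acc, remote_acc, rel_tol=0.0, abs_tol=abs_tol)


local_acc = 0.9021000266075134  # test accuracy logged by the local run
kubeflow_acc = 0.91             # hypothetical placeholder, not a real result

print(accuracies_match(local_acc, kubeflow_acc))
```

In practice you would pass in the two values returned by `output.read()` and pick a tolerance that reflects how much run-to-run variation you expect.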

# Congratulations!

… and that's it! If you got here without a hiccup, you have successfully installed ZenML, set up a ZenML repo, configured a training pipeline, executed it and evaluated the results. You have also deployed said pipeline to a production MLOps stack from right within your notebook! Hurray!

However, if you had a hiccup or have suggestions or questions about our framework, you can always check our [docs](https://docs.zenml.io/) or our [GitHub](https://github.com/zenml-io/zenml), or even better, join us on our [Slack channel](https://zenml.io/slack-invite).

Cheers!

For more detailed information on all the components and steps that went into this short example, please continue reading [our more detailed documentation pages](https://docs.zenml.io/).