# Machine Learning Pipelines with ZenML

<em><b>Key Concepts:</b> ML pipelines, Steps</em>
<br>
In this notebook, we learn how to convert existing ML code into ML pipelines using ZenML.
<br>
Since the models are built with scikit-learn, the ZenML sklearn integration must be installed.
<br>
The commands to install it are the following:

In [1]:
%pip install "zenml[server]"
!zenml integration install sklearn -y
%pip install pyparsing==2.4.2  # Required for colab

import IPython

# Automatically restart kernel 
IPython.Application.instance().kernel.do_shutdown(restart=True)

Note: you may need to restart the kernel to use updated packages.
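
After the kernel restarts, it can help to confirm that the packages are importable before continuing. The following is a minimal sketch using only the standard library; the helper name `check_installed` is hypothetical, and `zenml` and `sklearn` are assumed to be the import names of the packages installed above.

```python
import importlib.util


def check_installed(module_names):
    """Return a dict mapping each module name to True if it can be imported."""
    return {
        name: importlib.util.find_spec(name) is not None
        for name in module_names
    }


# Assumption: "zenml" and "sklearn" are the import names of the installed packages.
for name, found in check_installed(["zenml", "sklearn"]).items():
    print(f"{name}: {'installed' if found else 'missing'}")
```

If either package is reported as missing, rerunning the install cell and restarting the kernel is the first thing to try.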




<b>Colab Note</b>: you need an ngrok account to view some of the visualizations later.
<br>
Create an account and set your auth token below.

In [1]:
NGROK_TOKEN = ""  # TODO: set your own ngrok auth token here; do not share real tokens
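
Hard-coding a token in a notebook makes it easy to leak when the notebook is shared. One alternative is sketched here, under the assumption that the token has been exported in an environment variable named `NGROK_TOKEN`. The variable name and the helper `load_ngrok_token` are illustrative and not part of ZenML or ngrok.

```python
import os


def load_ngrok_token(env=None, var_name="NGROK_TOKEN"):
    """Read the ngrok token from an environment mapping; return "" if unset."""
    env = os.environ if env is None else env
    return env.get(var_name, "").strip()


# Hypothetical usage: fall back to an empty token when the variable is not set.
NGROK_TOKEN = load_ngrok_token()
if not NGROK_TOKEN:
    print("NGROK_TOKEN is not set; Colab visualizations will need it later.")
```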

Colab setup

In [3]:
%pip install zenml
from zenml.environment import Environment

if Environment.in_google_colab():
    !pip install pyngrok
    !ngrok authtoken {NGROK_TOKEN}

Note: you may need to restart the kernel to use updated packages.


An <b>ML pipeline</b> is simply an extension of model building that includes the other steps you would typically do before or after building a model, like data acquisition, preprocessing, model deployment, or monitoring. An ML pipeline defines a step-by-step procedure for your work.
<br>
Defining an ML pipeline in code is great because:
<br>  

* We can easily rerun all our work, not just the model, which reduces bugs and makes our models easier to reproduce.
* Data and models can be versioned and tracked, so we can see at a glance which dataset a model was trained on and how it compares to other models.
* If the entire pipeline is coded up, we can automate many operational tasks, like retraining and redeploying models when the underlying problem or data changes, or rolling out new and improved models with CI/CD workflows.


# ZenML Setup
ML pipelines are defined using ZenML. ZenML is an excellent tool for this task, as it is straightforward and intuitive to use and has integrations with most of the advanced MLOps tools.
<br>
Start a fresh ML stack:

In [3]:
import shutil
# Remove any existing ZenML repository in the current directory (cross-platform)
shutil.rmtree(".zen", ignore_errors=True)
!zenml init



# Example Experimental ML Code
In this code, we train a scikit-learn SVC classifier to classify images of handwritten digits. We load the data, train a model on the training set, then evaluate it on the test set.

In [2]:
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split

def train_test() -> None:
    """Train and test a scikit-learn svc classifier on digits"""
    digits = load_digits()
    data = digits.images.reshape((len(digits.images), -1))
    X_train, X_test, y_train, y_test = train_test_split(
        data, digits.target, test_size=0.2, shuffle=False
    )
    model = SVC(gamma=0.001)
    model.fit(X_train, y_train)
    test_acc = model.score(X_test, y_test)
    print(f"Test accuracy: {test_acc}")

train_test()

Test accuracy: 0.9583333333333334
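
The reported accuracy can be sanity-checked with a little arithmetic. The digits dataset has 1,797 samples, and with `shuffle=False` the split simply holds out the tail of the data. The sketch below assumes scikit-learn's rounding for a fractional `test_size`, where the test count is rounded up and the remainder goes to training.

```python
import math

n_samples = 1797   # size of the scikit-learn digits dataset
test_size = 0.2    # same fraction as in train_test() above

# Assumption: the test count is ceil(test_size * n_samples).
n_test = math.ceil(test_size * n_samples)
n_train = n_samples - n_test

# With shuffle=False, the first n_train rows train the model and the
# last n_test rows are held out for testing.
print(f"train rows: 0..{n_train - 1}, test rows: {n_train}..{n_samples - 1}")

# The printed accuracy then corresponds to a whole number of correct predictions.
reported_acc = 0.9583333333333334
n_correct = round(reported_acc * n_test)
print(f"{n_correct} of {n_test} test digits classified correctly")
```

Because the split is not shuffled, the result is deterministic, which is why the same accuracy shows up again when the code is later run as a pipeline.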


# Turning Experiments into ML Pipelines with ZenML
We can identify three steps in our example: data loading, model training, and model evaluation. We define each of them as a ZenML pipeline step simply by moving it into its own function and decorating that function with ZenML's `@step` Python decorator.

In [6]:
from zenml import step
from typing_extensions import Annotated
import pandas as pd
from typing import Tuple

@step
def importer() -> Tuple[
    Annotated[np.ndarray, "X_train"],
    Annotated[np.ndarray, "X_test"],
    Annotated[np.ndarray, "y_train"],
    Annotated[np.ndarray, "y_test"],
]:
    """Load the digits dataset as numpy arrays"""
    digits = load_digits()
    data = digits.images.reshape((len(digits.images), -1))
    X_train, X_test, y_train, y_test = train_test_split(
        data, digits.target, test_size=0.2, shuffle=False
    )
    return X_train, X_test, y_train, y_test

@step
def svc_trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
    """Train an sklearn SVC classifier."""
    model = SVC(gamma=0.001)
    model.fit(X_train, y_train)
    return model

@step
def evaluator(
    X_test: np.ndarray,
    y_test: np.ndarray,
    model: ClassifierMixin,
) -> float:
    """Calculate the test set accuracy of an sklearn model"""
    test_acc = model.score(X_test, y_test)
    print(f"Test accuracy: {test_acc}")
    return test_acc
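
The `Annotated[...]` wrappers in `importer` attach a name to each output, which ZenML uses to label the step's output artifacts. The mechanism is plain Python typing metadata and can be inspected without ZenML. The following is a minimal sketch that uses `list` as a stand-in for `np.ndarray`, imports `Annotated` from `typing` (Python 3.9+) rather than `typing_extensions`, and defines a hypothetical helper `output_names`.

```python
from typing import Annotated, Tuple, get_args, get_origin, get_type_hints


def toy_importer() -> Tuple[
    Annotated[list, "X_train"],
    Annotated[list, "X_test"],
]:
    """Stand-in for the importer step; returns two small lists."""
    return [1, 2, 3], [4]


def output_names(func):
    """Collect the string names attached to a function's annotated outputs."""
    hints = get_type_hints(func, include_extras=True)
    return_type = hints["return"]
    # A Tuple[...] return lists one annotation per output; otherwise there is one.
    if get_origin(return_type) is tuple:
        parts = get_args(return_type)
    else:
        parts = (return_type,)
    names = []
    for part in parts:
        if get_origin(part) is Annotated:
            # get_args gives (base_type, metadata...); the name is the first metadata item.
            names.append(get_args(part)[1])
    return names


print(output_names(toy_importer))  # ['X_train', 'X_test']
```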

Similarly, we can use ZenML's `@pipeline` decorator to connect all of our steps into an ML pipeline.
<br>
The pipeline definition merely establishes a recipe for how data moves through the steps, so we can replace individual steps as we wish.

In [7]:
from zenml import pipeline

@pipeline
def digits_pipeline():
    """Link all the steps together in a pipeline."""
    X_train, X_test, y_train, y_test = importer()
    model = svc_trainer(X_train=X_train, y_train=y_train)
    evaluator(X_test=X_test, y_test=y_test, model=model)
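
The idea that a pipeline is a recipe for data flow, with replaceable steps, can be sketched in plain Python without ZenML. In the toy example below, all names are hypothetical and the data is made up. The recipe stays fixed while two different trainer functions are swapped in.

```python
from collections import Counter


def toy_importer():
    """Return a tiny labeled dataset split into train and test parts."""
    X_train, y_train = [0, 1, 2, 3], ["a", "a", "b", "a"]
    X_test, y_test = [4, 5, 6, 7], ["a", "b", "a", "a"]
    return X_train, X_test, y_train, y_test


def majority_trainer(X_train, y_train):
    """Train a 'model' that always predicts the most common training label."""
    most_common = Counter(y_train).most_common(1)[0][0]
    return lambda x: most_common


def constant_b_trainer(X_train, y_train):
    """Alternative trainer: a 'model' that always predicts 'b'."""
    return lambda x: "b"


def toy_evaluator(X_test, y_test, model):
    """Return the fraction of test labels the model predicts correctly."""
    correct = sum(model(x) == y for x, y in zip(X_test, y_test))
    return correct / len(y_test)


def run_recipe(importer, trainer, evaluator):
    """The recipe: how data flows between steps, whatever the steps are."""
    X_train, X_test, y_train, y_test = importer()
    model = trainer(X_train, y_train)
    return evaluator(X_test, y_test, model)


# Same recipe, two different training steps.
print(run_recipe(toy_importer, majority_trainer, toy_evaluator))
print(run_recipe(toy_importer, constant_b_trainer, toy_evaluator))
```

Only the trainer changes between the two calls; the importer, the evaluator, and the order in which data moves between them are untouched.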

# Running ZenML Pipelines
Calling the pipeline function starts a run, as the log output below shows. With the `zenml` API imported above, a separate `run()` call is not needed, which is why it is commented out.

In [8]:
digits_svc_pipeline = digits_pipeline()
# digits_svc_pipeline.run(unlisted=True) 

Initiating a new run for the pipeline: digits_pipeline.


None of PyTorch, TensorFlow >= 2.0, or Flax have been found. Models won't be available and only tokenizers, configuration and file/data utilities can be used.


Registered new version: (version 1).
Executing a new run.
Using user: default
Using stack: default
  orchestrator: default
  artifact_store: default
Step importer has started.
Step importer has finished in 2.614s.
Step svc_trainer has started.
Step svc_trainer has finished in 0.233s.
Step evaluator has started.
Test accuracy: 0.9583333333333334
Step evaluator has finished in 0.201s.
Run digits_pipeline-2023_12_13-19_21_41_844552 has finished in 3.425s.
You can visualize your pipeline runs in the ZenML Dashboard. In order to try it locally

To visualize the pipeline run, start ZenML's dashboard.

In [None]:
from zenml.environment import Environment

def start_zenml_dashboard(port=8237):
    if Environment.in_google_colab():
        from pyngrok import ngrok

        public_url = ngrok.connect(port)
        # \x1b[31m switches the terminal color to red; \x1b[0m resets it
        print(f"\x1b[31mIn Colab, use this URL instead: {public_url}!\x1b[0m")
        !zenml up --blocking --port {port}

    else:
        !zenml up --port {port}

start_zenml_dashboard()