<a href="https://colab.research.google.com/github/ghisford/Machine_learning/blob/main/MLOps_zenML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [26]:
!pip install "zenml[server]"
!zenml integration install sklearn -y
!pip install pyparsing==2.4.7 # required for colab

import IPython

# automatically restart kernel
IPython.Application.instance().kernel.do_shutdown(restart= True)

[1;35mNumExpr defaulting to 2 threads.[0m
[2K[32m⠇[0m Installing integrations...


{'status': 'ok', 'restart': True}

In [27]:
!pip install kaleido




In [1]:
!pip install sqlalchemy>=2.0

In [None]:
!pip install tensorflow-probability --upgrade

In [None]:

!pip install typing-extensions==4.7.0

On colab, you need an ngrok account to view some of the visualizations later.
Create an account and copy your authtoken

# Example Experimentation ML Code
This is how you would normally write your ML code and feel like a superstar

In [2]:
import numpy as np
from sklearn.base import ClassifierMixin
from sklearn.svm import SVC
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split


def train_test() -> None:
  """ Train and test a Scikit-learn SVC classifier on digits"""
  digits = load_digits()
  data = digits.images.reshape((len(digits.images), -1))
  X_train, X_test, y_train, y_test = train_test_split(data, digits.target, test_size= 0.2,
                                                      shuffle = False)
  model = SVC(gamma = 0.001)
  model.fit(X_train, y_train)
  test_acc = model.score(X_test, y_test)
  print(f"Test accuracy: {test_acc}")

train_test()

Test accuracy: 0.9583333333333334


# Turning experiments into ML pipelines with ZenML
In practice, ML workflows are much more complicated than that. you need to write code that you will use multiple times for this like preprocessing, and evaluation using different datasets and models.

In [3]:
!rm -rf .zen
!zenml init

[1;35mNumExpr defaulting to 2 threads.[0m
[?25l[1;35mInitializing the ZenML global configuration version to 0.50.0[0m
[32m⠋[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠙[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠹[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠸[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠼[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠴[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠇[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠏[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠋[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠙[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠹[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠸[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠼[0m Initializing ZenML repository at /content.
[2K[1A[2K[32m⠴[0m Initiali

In [11]:
from zenml import step
from typing_extensions import Annotated
import pandas as pd
from typing import Tuple

@step
def importer() -> Tuple[
    Annotated[np.ndarray, "X_train"],
    Annotated[np.ndarray, "X_test"],
    Annotated[np.ndarray, "y_train"],
    Annotated[np.ndarray, "y_test"]
]:

  """ Load the digits dataset as numpy arrays """
  digits = load_digits()
  data = digits.images.reshape((len(digits.images), -1))
  X_train, X_test, y_train, y_test = train_test_split(data, digits.target,
                                                      test_size= 0.2, shuffle= False)

  return X_train, X_test, y_train, y_test




@step

def svc_trainer(
    X_train: np.ndarray,
    y_train: np.ndarray,
) -> ClassifierMixin:
  """Train an sklearn SVC classifier"""
  model = SVC(gamma= 0.001)
  model.fit(X_train, y_train)
  return model


@step

def evaluator(
 X_test: np.ndarray,
 y_test: np.ndarray,
 model: ClassifierMixin,

) -> float:
  """Calculate the test set accuracy of an sklearn model"""
  test_acc = model.score(X_test, y_test)
  print(f"Test accuracy: {test_acc}")
  return test_acc


1. Next we use ZenML's pipeline decorator to connect all of our steps into an ML pipeline.
2. The pipeline definition does not depend on the concrete step functions we defined above. It merely establishes a recipe for how data moves through the steps.
3. This means we can replace steps as we wish e.g. run the pipeline with different models to compare their performance

In [5]:
from zenml import pipeline

@pipeline
def digits_pipeline():
  """Links all the steps together in a pipeline"""
  X_train, X_test, y_train, y_test = importer()
  model = svc_trainer(X_train= X_train, y_train= y_train)
  evaluator(X_test= X_test, y_test= y_test, model= model)



# Running ZenML Pipelines
Finally, we initialize our pipeline with concrete step functions and call the run() method to run it

In [23]:
digits_svc_pipeline= digits_pipeline()
digits_svc_pipeline.run(unlisted= True)

[1;35mInitiating a new run for the pipeline: [0m[1;36mdigits_pipeline[1;35m.[0m
[1;35mReusing registered version: [0m[1;36m(version: 2)[1;35m.[0m
[1;35mExecuting a new run.[0m
[1;35mUsing user: [0m[1;36mdefault[1;35m[0m
[1;35mUsing stack: [0m[1;36mdefault[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35m  artifact_store: [0m[1;36mdefault[1;35m[0m
[1;35mUsing cached version of [0m[1;36mimporter[1;35m.[0m
[1;35mStep [0m[1;36mimporter[1;35m has started.[0m
[1;35mUsing cached version of [0m[1;36msvc_trainer[1;35m.[0m
[1;35mLinking artifact [0m[1;36moutput[1;35m to model [0m[1;36mNone[1;35m version [0m[1;36mNone[1;35m implicitly.[0m
[1;35mStep [0m[1;36msvc_trainer[1;35m has started.[0m
[1;35mUsing cached version of [0m[1;36mevaluator[1;35m.[0m
[1;35mLinking artifact [0m[1;36moutput[1;35m to model [0m[1;36mNone[1;35m version [0m[1;36mNone[1;35m implicitly.[0m
[1;35mStep [0m[1;36mevaluator[1;35m h

AttributeError: ignored

## CONGRATULATIONS! YOU ARE THE INSPIRATION OF THE YOUTH, VOICE OF THE MONEY, THE KINGKONG.
That, my friend, was your first ever pipeline

Let us visualize the pipeline we have just run in zenml's dashboard. To do so, run
##zenml up
to spin up a zenml dashboard locally.
log in with username "default" and empty password and navigate to the Runs' tab in the pipelines section

In [19]:
from zenml.environment import Environment
if Environment.in_google_colab():
  !pip install pyngrok
  !ngrok authtoken {2ZDxA4sbG2R6D4tOBT5kcwOw27g_5mzhhxuRUD1hevmY8F8XW}

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [21]:
!ngrok config add-authtoken 2ZDxA4sbG2R6D4tOBT5kcwOw27g_5mzhhxuRUD1hevmY8F8XW

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [24]:
from zenml.environment import Environment


def start_zenml_dashboard(port= 8237):
  if Environment.in_google_colab():
    from pyngrok import ngrok
    public_url = ngrok.connect(port)
    print(r"\xlb[3lIn Colab, use this URL instead: {public_url}:\xrb[On")
    !zenml up --blocking --port {port}

  else:
    !zenml up --port {port}

start_zenml_dashboard()

INFO:pyngrok.ngrok:Opening tunnel named: http-8237-ca09df94-76e8-4199-995b-539b168cefc1


[1;35mOpening tunnel named: http-8237-ca09df94-76e8-4199-995b-539b168cefc1[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:54+0000 lvl=info msg="no configuration paths supplied"


[1;35mt=2023-12-07T18:19:54+0000 lvl=info msg="no configuration paths supplied"[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:54+0000 lvl=info msg="using configuration at default config path" path=/root/.config/ngrok/ngrok.yml


[1;35mt=2023-12-07T18:19:54+0000 lvl=info msg="using configuration at default config path" path=/root/.config/ngrok/ngrok.yml[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:54+0000 lvl=info msg="open config file" path=/root/.config/ngrok/ngrok.yml err=nil


[1;35mt=2023-12-07T18:19:54+0000 lvl=info msg="open config file" path=/root/.config/ngrok/ngrok.yml err=nil[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:54+0000 lvl=info msg="starting web service" obj=web addr=127.0.0.1:4040 allow_hosts=[]


[1;35mt=2023-12-07T18:19:54+0000 lvl=info msg="starting web service" obj=web addr=127.0.0.1:4040 allow_hosts=[][0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg="client session established" obj=tunnels.session obj=csess id=17904b327b89


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg="client session established" obj=tunnels.session obj=csess id=17904b327b89[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg="tunnel session started" obj=tunnels.session


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg="tunnel session started" obj=tunnels.session[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg=start pg=/api/tunnels id=bfba45402e277ebe


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg=start pg=/api/tunnels id=bfba45402e277ebe[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg=end pg=/api/tunnels id=bfba45402e277ebe status=200 dur=401.091µs


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg=end pg=/api/tunnels id=bfba45402e277ebe status=200 dur=401.091µs[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg=start pg=/api/tunnels id=c9523dd387e45464


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg=start pg=/api/tunnels id=c9523dd387e45464[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg=end pg=/api/tunnels id=c9523dd387e45464 status=200 dur=113.07µs


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg=end pg=/api/tunnels id=c9523dd387e45464 status=200 dur=113.07µs[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg=start pg=/api/tunnels id=1bd8f2639970ba4d


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg=start pg=/api/tunnels id=1bd8f2639970ba4d[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg="started tunnel" obj=tunnels name=http-8237-ca09df94-76e8-4199-995b-539b168cefc1 addr=http://localhost:8237 url=https://2566-34-90-54-97.ngrok-free.app


\xlb[3lIn Colab, use this URL instead: {public_url}:\xrb[On
[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg="started tunnel" obj=tunnels name=http-8237-ca09df94-76e8-4199-995b-539b168cefc1 addr=http://localhost:8237 url=https://2566-34-90-54-97.ngrok-free.app[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:19:55+0000 lvl=info msg=end pg=/api/tunnels id=1bd8f2639970ba4d status=201 dur=112.847144ms


[1;35mt=2023-12-07T18:19:55+0000 lvl=info msg=end pg=/api/tunnels id=1bd8f2639970ba4d status=201 dur=112.847144ms[0m
[1;35mNumExpr defaulting to 2 threads.[0m
[1;35mDeploying a local ZenML server with name 'local'.[0m
[1;35mStarting ZenML Server as blocking process... press CTRL+C once to stop it.[0m
[32mINFO[0m:     Started server process [[36m48421[0m]
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Uvicorn running on [1mhttp://127.0.0.1:8237[0m (Press CTRL+C to quit)


INFO:pyngrok.process.ngrok:t=2023-12-07T18:20:37+0000 lvl=info msg="received stop request" obj=app stopReq="{err:<nil> restart:false}"


[1;35mt=2023-12-07T18:20:37+0000 lvl=info msg="received stop request" obj=app stopReq="{err:<nil> restart:false}"[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:20:37+0000 lvl=info msg="session closing" obj=tunnels.session err=nil


[1;35mt=2023-12-07T18:20:37+0000 lvl=info msg="session closing" obj=tunnels.session err=nil[0m


INFO:pyngrok.process.ngrok:t=2023-12-07T18:20:37+0000 lvl=info msg="accept failed" obj=tunnels.session obj=csess id=17904b327b89 err="reconnecting session closed"


[1;35mt=2023-12-07T18:20:37+0000 lvl=info msg="accept failed" obj=tunnels.session obj=csess id=17904b327b89 err="reconnecting session closed"[0m
[32mINFO[0m:     Shutting down
[32mINFO[0m:     Finished server process [[36m48421[0m]
[31mERROR[0m:    Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 686, in lifespan
    await receive()
  File "/usr/local/lib/python3.10/dist-packages/uvicorn/lifespan/on.py", line 137, in receive
    return await self.receive_queue.get()
  File "/usr/lib/python3.10/asyncio/queues.py", line 159, in get
    await getter
asyncio.exceptions.CancelledError

