# 🌍 Overview

This demo is a minimalistic MLOps project intended to showcase how to put ML workflows in production. It features: 

- A feature engineering pipeline that loads data and prepares it for training.
- A training pipeline that loads the preprocessed dataset and trains a model.
- A batch inference pipeline that runs predictions on the trained model with new data.
- A stack switching and leveraging of Sagemaker step operator to outsource training to Cloud
- An analysis of training artifacts and their lineage (including connection with W&B)

<img src="_assets/pipeline_overview.png" width="50%" alt="Pipelines Overview">

# 👶 Step 0. Install Requirements

Let's install ZenML to get started. First we'll install the latest version of
ZenML as well as the `sklearn` and `xgboost` integration of ZenML:

In [None]:
! pip3 install -r requirements.txt
! zenml integration install sklearn xgboost -y
! zenml connect --url https://1cf18d95-zenml.cloudinfra.zenml.io 
! zenml model delete breast_cancer_classifier -y

import IPython
IPython.Application.instance().kernel.do_shutdown(restart=True)

In [None]:
# Initialize ZenML and set the default stack
!zenml init
!zenml stack set local-sagemaker-step-operator-wandb

In [None]:
!zenml stack describe local-sagemaker-step-operator-wandb

In [None]:
# Do the imports at the top
from zenml import Model
from zenml.client import Client
from zenml.logger import get_logger

from pipelines import training, inference

logger = get_logger(__name__)

# Initialize the ZenML client to fetch objects from the ZenML Server
client = Client()

# ⌚ Step 1: Training pipeline

Now that we have our data it makes sense to train some models to get a sense of
how difficult the task is. The Breast Cancer dataset is sufficiently large and complex 
that it's unlikely we'll be able to train a model that behaves perfectly since the problem 
is inherently complex, but we can get a sense of what a reasonable baseline looks like.

We'll start with two simple models, a SGD Classifier and a Random Forest
Classifier, both batteries-included from `sklearn`. We'll train them both on the
same data and then compare their performance.

<img src="_assets/cloud_mcp.png" width="60%" alt="Model Control Plane">

In [None]:
# let's have a look at model training step
%pycat steps/model_evaluator.py

Our two training steps both return different kinds of classifier
models, so we use the generic `ClassifierMixin` type hint for the return type.

ZenML allows you to load any version of any dataset that is tracked by the framework
directly into a pipeline using the `Client().get_artifact_version` interface. This is very convenient
in this case, as we'd like to send our preprocessed dataset from the older pipeline directly
into the training pipeline.

In [None]:
# let's have a look at training pipeline
%pycat pipelines/training.py

The end goal of this quick baseline evaluation is to understand which of the two
models performs better. We'll use the `evaluator` step to compare the two
models. This step takes in the model from the trainer step, and computes its score
over the testing set.

Soon you will see that it is relatively easy to train ML models using ZenML pipelines. But it can be somewhat clunky to track
all the models produced as you develop your experiments and use-cases. Luckily, ZenML offers a *Model Control Plane*,
which is a central register of all your ML models.

You can easily create a ZenML Model and associate it with your pipelines using the `Model` object:

In [None]:
pipeline_settings = {}

# Lets add some metadata to the model to make it identifiable
pipeline_settings["model"] = Model(
    name="breast_cancer_classifier",
    license="Apache 2.0",
    description="A breast cancer classifier",
)

In [None]:
# Let's train the XGBoost model and tag the version name with "xgboost"
pipeline_settings["model"].tags = ["breast_cancer", "classifier", "xgboost"]

# Use an XGBoost model with fixed seed.
training.with_options(enable_cache=False,**pipeline_settings)(
    model_type="xgboost",
    random_state=42
)

xgboost_run = client.get_pipeline("training").last_run

In [None]:
# Let's train the SGD model and tag the version name with "sgd"
pipeline_settings["model"].tags = ["breast_cancer", "classifier", "sgd"]

# Use a SGD classifier
sgd_run = training.with_options(enable_cache=True,**pipeline_settings)(
    model_type="sgd",
    random_state=42
)

sgd_run = client.get_pipeline("training").last_run

You can see from the logs already how our model training went: the
`XGBClassifier` performed considerably better than the `SGDClassifier`.
We can use the ZenML `Client` to verify this:

In [None]:
# The evaluator returns a float value with the accuracy
xgboost_run.steps["model_evaluator"].output.load() >= sgd_run.steps["model_evaluator"].output.load()

Running both pipelines has created two associated **model versions**.
You can list your ZenML model and their versions as follows:

In [None]:
zenml_model = client.get_model("breast_cancer_classifier")

versions = zenml_model.versions

print(f"Model {zenml_model.name} has {len(versions)} versions")

print(f"The latest two versions are: {versions[-2].version, versions[-1].version}")

The interesting part is that ZenML went ahead and linked all artifacts produced by the
pipelines to that model version, including the two pickle files that represent our
SGD and RandomForest classifier. We can see all artifacts directly from the model
version object:

In [None]:
# Let's load the XGBoost version
xgboost_zenml_model_version = client.list_model_versions("breast_cancer_classifier", tag="xgboost")[-1]

# We can now load our classifier directly as well
xgboost_classifier = xgboost_zenml_model_version.get_artifact("breast_cancer_classifier").load()

xgboost_classifier

If you are a [ZenML Cloud](https://zenml.io/cloud) user, you can see all of this visualized in the dashboard:

<img src="_assets/cloud_mcp_screenshot.png" width="70%" alt="Model Control Plane">

There is a lot more you can do with ZenML models, including the ability to
track metrics by adding metadata to it, or having them persist in a model
registry. However, these topics can be explored more in the
[ZenML docs](https://docs.zenml.io).

For now, we will use the ZenML model control plane to promote our best
model to `production`. You can do this by simply setting the `stage` of
your chosen model version to the `production` tag.

In [None]:
# Set our best classifier to production
xgboost_zenml_model_version.set_stage("production", force=True)

Of course, normally one would only promote the model by comparing to all other model
versions and doing some other tests. But that's a bit more advanced use-case. See the
[e2e_batch example](https://github.com/zenml-io/zenml/tree/main/examples/e2e) to get
more insight into that sort of flow!

Once the model is promoted, we can now consume the right model version in our
batch inference pipeline directly. Let's see how that works.

# 🫅 Step 2: Consuming the model in production

The batch inference pipeline simply takes the model marked as `production` and runs inference on it
with `live data`. The critical step here is the `inference_predict` step, where we load the model in memory
and generate predictions:

<img src="_assets/inference_pipeline.png" width="45%" alt="Inference pipeline">

In [None]:
# let's have a look at training pipeline
%pycat steps/inference_predict.py


Apart from the loading the model, we must also load the preprocessing pipeline that we ran in feature engineering,
so that we can do the exact steps that we did on training time, in inference time. Let's bring it all together:

In [None]:
# let's have a look at training pipeline
%pycat pipelines/inference.py

The way to load the right model is to pass in the `production` stage into the `Model` config this time.
This will ensure to always load the production model, decoupled from all other pipelines:

In [None]:
pipeline_settings = {"enable_cache": False}

# Lets add some metadata to the model to make it identifiable
pipeline_settings["model"] = Model(
    name="breast_cancer_classifier",
    version="production", # We can pass in the stage name here!
)

In [None]:
# the `with_options` method allows us to pass in pipeline settings
#  and returns a configured pipeline
inference.with_options(**pipeline_settings)(random_state=42, target="target")

ZenML automatically links all artifacts to the `production` model version as well, including the predictions
that were returned in the pipeline. This completes the MLOps loop of training to inference:

In [None]:
# Fetch production model
production_model_version = client.get_model_version("breast_cancer_classifier", "production")

# Get the predictions artifact
production_model_version.get_artifact("predictions").load()

You can also see all predictions ever created as a complete history in the dashboard:

<img src="_assets/cloud_mcp_predictions.png" width="70%" alt="Model Control Plane">

# 🐙 Step 3: Analyzing results

In [None]:
sgd_model_version = client.list_model_versions("breast_cancer_classifier",tag="sgd")[-1]
xgboost_model_version = client.list_model_versions("breast_cancer_classifier",tag="xgboost")[-1]
print(f"SGD version is staged as `{sgd_model_version.stage}`")
print(f"XGBoost version is staged as `{xgboost_model_version.stage}`")

At first, let's pull some meta information collected during models evaluation stage. To recall we used this step as evaluator:
```python
@step
def model_evaluator(
    model: ClassifierMixin,
    dataset_trn: pd.DataFrame,
    dataset_tst: pd.DataFrame,
    min_train_accuracy: float = 0.0,
    min_test_accuracy: float = 0.0,
    target: Optional[str] = "target",
) -> float:
    # Calculate the model accuracy on the train and test set
    trn_acc = model.score(...)
    tst_acc = model.score(...)

    ...
    
    predictions = model.predict(dataset_tst.drop(columns=[target]))
    metadata = {
        "train_accuracy": float(trn_acc),
        "test_accuracy": float(tst_acc),
        "confusion_matrix": confusion_matrix(dataset_tst[target], predictions)
        .ravel()
        .tolist(),
    }
    log_model_metadata(metadata={"wandb_url": wandb.run.url})
    log_artifact_metadata(
        metadata=metadata,
        artifact_name="breast_cancer_classifier",
    )

    wandb.log({"train_accuracy": metadata["train_accuracy"]})
    wandb.log({"test_accuracy": metadata["test_accuracy"]})
    wandb.log(
        {
            "confusion_matrix": wandb.sklearn.plot_confusion_matrix(
                dataset_tst[target], predictions, ["No Cancer", "Cancer"]
            )
        }
    )
    return float(tst_acc)
```
First we pull Accuracy metrics out of both model version for comparison:

In [None]:
sgd_clf_metadata = sgd_model_version.get_artifact("breast_cancer_classifier").run_metadata
xgboost_clf_metadata = xgboost_model_version.get_artifact("breast_cancer_classifier").run_metadata
print(f"SGD{' (production)' if sgd_model_version.stage == 'production' else ''} metrics: train={sgd_clf_metadata['train_accuracy'].value*100:.2f}% test={sgd_clf_metadata['test_accuracy'].value*100:.2f}%")
print(f"XGBoost{' (production)' if xgboost_model_version.stage == 'production' else ''} metrics: train={xgboost_clf_metadata['train_accuracy'].value*100:.2f}% test={xgboost_clf_metadata['test_accuracy'].value*100:.2f}%")

Now lets' plot collected Confusion Matrixes:

In [None]:
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

def plot_confusion_matrix(metadata_pointer, tp: str,ax):
    confusion_matrix = np.array(metadata_pointer["confusion_matrix"].value, dtype=float).reshape((2,2))
    confusion_matrix /= np.sum(confusion_matrix)
    sns.heatmap(confusion_matrix, annot=True,fmt='.2%',cmap="coolwarm",ax=ax)
    ax.set_title(f"{tp} confusion matrix")
    ax.set_ylabel("Ground Label")
    ax.set_xlabel("Predicted Label")

fig, ax = plt.subplots(1,2,figsize=(15,4))
plot_confusion_matrix(sgd_clf_metadata, "SGD",ax[0])
plot_confusion_matrix(xgboost_clf_metadata, "RF",ax[1])

So far we were able to collect all the information we tracked using Model Control Plane, but we also had Weights&Biases tracking enabled - let's dive into.

Thanks to Model Control Plane metadata we establish a nice connection between those 2 entities:

In [None]:
print(f'SGD version: {sgd_model_version.run_metadata["wandb_url"].value}')
print(f'RF version: {xgboost_model_version.run_metadata["wandb_url"].value}')

With Model Control Plane we can also easily track lineage of artifacts and pipeline runs:

In [None]:
for artifact_name, versions in sgd_model_version.data_artifacts.items():
    if versions:
        print(f"Existing version of `{artifact_name}`:")
        for version_name, artifact_ in  versions.items():
            print(version_name, artifact_.data_type.attribute)

In [None]:
for run_name, run_ in sgd_model_version.pipeline_runs.items():
    print(run_name, run_.id)

# 🙏 Step 4: Switching to production

Let's run all the moving pieces we navigated in the previous steps using production ready python script `run.py`

The next thing we want to do is to switch to production on the cloud

<img src="_assets/sagemaker_stack.png" width="60%" alt="Sagemaker step_op stack">

In [None]:
!zenml stack set sagemaker-pipelines-wandb
!zenml stack describe sagemaker-pipelines-wandb

In [None]:
!python3 run.py --training-pipeline