In [1]:
%%HTML
<h1>End-to-end TF Kubeflow pipeline on Vertex AI</h1>

<img width=800 src='images/architecture.png'/>

This is the <a href='https://github.com/GoogleCloudPlatform/vertex-pipelines-end-to-end-samples'>example</a> architecture we are building overall. In this notebook, we create and run a Kubeflow pipeline on Vertex AI in which every step, from data preparation to training, is done in TensorFlow.

In [1]:
%%HTML
<h2>Useful links</h2>

This notebook is based on the examples in the following links:
<br/>
<a href="https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline">How to submit a Kubeflow pipeline to Vertex AI</a>
<br/>
<a href="https://medium.com/@lorenzo.colombi/kubeflow-pipeline-v2-tutorial-end-to-end-mnist-classifier-example-dc66714c2649">Kubeflow pipeline code example</a>
<br/>
<a href="https://drive.google.com/file/d/1vykV30ic2CrJxTnD9Cie1ISEfTgczPKm/view?usp=drive_link">How to compile a pipeline</a>
<br/>
<a href="https://medium.com/@outsidenoxvodafone/how-to-use-google-cloud-vertex-ai-to-build-a-ml-pipeline-using-kubeflow-de61efa26fb3">Building an ML pipeline on Vertex AI with Kubeflow</a>


In [3]:
%%bash

# Quote the requirement: an unquoted > is treated by the shell as output redirection
pip install "kfp>2"

In [1]:
import os
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics, ClassificationMetrics

In [17]:
%%HTML

<h3> Project Config</h3>

In [2]:
PIPELINE_NAME = 'mnist-vertex-pipelines'
GCS_BUCKET_NAME = 'mlops-heavy-workflow-bucket'

PROJECT_ID = 'mlops-heavy-workflow-project'
LOCATION = 'us-central1'
JOBID = "training-pipeline-1"
ENABLE_CACHING = False

# Root path for the pipeline artifacts.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: gs://mlops-heavy-workflow-bucket/pipeline_root/mnist-vertex-pipelines


In [19]:
%%HTML
<h3>Load Dataset</h3>
In this step, we simply load the MNIST dataset from Keras. We use Kubeflow input/output artifacts, which pass data between pipeline steps and can be inspected in the pipeline's DAG (directed acyclic graph) view in the UI. Both input and output artifacts must be declared as function parameters.
<br/>

For this step, we use TensorFlow as the base container image. Remember that every step (a Python function) runs in its own container, so we need to choose an image that already has all the necessary modules installed, or list the additional packages to install.
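
An artifact's <code>uri</code> is a <code>gs://</code> address, which Python's built-in <code>open()</code> does not understand. As far as I know, KFP artifacts also expose a <code>path</code> attribute that maps the URI onto a local <code>/gcs/</code> mount inside the container. The helper below is a hypothetical re-implementation of that mapping, written only to illustrate the idea; the function name, the mount prefix, and the example URI are assumptions and not part of this notebook.

```python
GCS_SCHEME = "gs://"
LOCAL_MOUNT = "/gcs/"  # assumed Cloud Storage mount point inside the step container


def gcs_uri_to_local_path(uri: str) -> str:
    """Map gs://bucket/object to /gcs/bucket/object; leave other paths untouched."""
    if uri.startswith(GCS_SCHEME):
        return LOCAL_MOUNT + uri[len(GCS_SCHEME):]
    return uri


# Hypothetical artifact URI, for illustration only.
example_uri = "gs://mlops-heavy-workflow-bucket/pipeline_root/x_train_artifact"
print(gcs_uri_to_local_path(example_uri))
```

If the mount is available, the local path can be opened with ordinary file APIs; otherwise a GCS-aware reader is needed for the <code>gs://</code> URI.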



In [4]:
@dsl.component(base_image="tensorflow/tensorflow", packages_to_install=['google-cloud-storage'])
def load_dataset(
    x_train_artifact: Output[Dataset], x_test_artifact: Output[Dataset],
    y_train_artifact: Output[Dataset], y_test_artifact: Output[Dataset],
    bucket_name: str,
):
    
    import numpy as np
    from tensorflow import keras
    import os
    from tensorflow.python.lib.io import file_io
    import pickle
    from google.cloud import storage
   
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
    
    # Write through TensorFlow's FileIO: the artifact URIs are gs:// paths, which is
    # presumably why the built-in open() was throwing errors.
    # The context manager closes (and therefore flushes) each file.
    for artifact, array in [
        (x_train_artifact, x_train),
        (x_test_artifact, x_test),
        (y_train_artifact, y_train),
        (y_test_artifact, y_test),
    ]:
        with file_io.FileIO(artifact.uri, 'wb') as f:
            np.save(f, array)
    

In [20]:
%%HTML
<h3> Preprocessing / Feature engineering</h3>
This step takes as input the Dataset artifacts created by the previous step. It is a simplified mock of a preprocessing stage that reshapes and rescales the arrays; I included it to make the pipeline more realistic. The processed arrays are then saved, once again, as output artifacts.
<br/>
Here we use a Metrics object as an output for the first time. It is a simple artifact for storing scalar metrics as key-value pairs.
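
To make the reshape and normalization concrete, here is a minimal pure-Python sketch on a tiny 2x2 "image". It is only a stand-in for the NumPy calls used in the component (<code>reshape(-1,28,28,1)</code> and division by 255); the function name and the sample pixel values are made up for illustration.

```python
def reshape_and_normalize(flat_pixels, height, width):
    """Return a height x width x 1 nested list with values scaled from 0-255 to 0-1."""
    if len(flat_pixels) != height * width:
        raise ValueError("pixel count does not match height * width")
    image = []
    for row in range(height):
        start = row * width
        # Each pixel becomes a one-element list: the single greyscale channel.
        image.append([[pixel / 255] for pixel in flat_pixels[start:start + width]])
    return image


tiny = reshape_and_normalize([0, 51, 204, 255], height=2, width=2)
print(tiny)
```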



In [6]:
@dsl.component(base_image="tensorflow/tensorflow")
def preprocessing(metrics : Output[Metrics], x_train_processed : Output[Dataset], x_test_processed: Output[Dataset],
                  x_train_artifact: Input[Dataset], x_test_artifact: Input[Dataset]):
    ''' 
    just reshape and normalize data
    '''
    import numpy as np
    import os
    from io import BytesIO
    import tensorflow as tf
    from tensorflow.python.lib.io import file_io
    
    # Load the data from the artifact store

    x_train = np.load(BytesIO(file_io.read_file_to_string(x_train_artifact.uri, binary_mode=True)))
    x_test = np.load(BytesIO(file_io.read_file_to_string(x_test_artifact.uri, binary_mode=True)))

    # Reshape the data to 28x28-pixel greyscale images (channels = 1), as the Keras API expects
    x_train = x_train.reshape(-1,28,28,1)
    x_test = x_test.reshape(-1,28,28,1)
    # Normalize the data: each pixel has a value in 0-255, so dividing by 255 gives values in 0-1
    x_train = x_train / 255
    x_test = x_test / 255
    
    # Log metrics using Kubeflow artifacts
    metrics.log_metric("Len x_train", x_train.shape[0])
    metrics.log_metric("Len x_test", x_test.shape[0])
    
    # Save the features to the artifact store
    np.save(file_io.FileIO(x_train_processed.uri, 'w'),x_train)
    np.save(file_io.FileIO(x_test_processed.uri, 'w'),x_test)


In [7]:
%%HTML

Artifacts serve a dual purpose: they transfer data between steps and they make that data visible in the UI. Moreover, each artifact's metadata is recorded in the metadata store, while its contents are written under the pipeline root.

In [21]:
%%HTML

<h3> Model Definition</h3>

The goal of the next step is to specify the neural network architecture. This way, if we want to change the model structure in the future, we only need to change this component. The other components are independent of it and use the model as defined here.
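
As a sanity check on the architecture defined in the component below, the layer output shapes and parameter counts can be worked out by hand. This sketch assumes the Keras defaults for these layers (no padding, stride 1 for the convolution, pooling stride equal to the pool size); the helper functions are hypothetical and exist only for this arithmetic.

```python
def conv2d(shape, filters, kernel):
    """Unpadded convolution with stride 1: returns (output_shape, parameter_count)."""
    h, w, c = shape
    out_shape = (h - kernel + 1, w - kernel + 1, filters)
    params = kernel * kernel * c * filters + filters  # weights + biases
    return out_shape, params


def max_pool(shape, pool):
    """Pooling with stride equal to the pool size; it has no parameters."""
    h, w, c = shape
    return (h // pool, w // pool, c)


def dense(units_in, units_out):
    """Fully connected layer: returns (output_units, parameter_count)."""
    return units_out, units_in * units_out + units_out


shape, p_conv = conv2d((28, 28, 1), filters=64, kernel=3)  # 26 x 26 x 64
shape = max_pool(shape, pool=2)                            # 13 x 13 x 64
flat = shape[0] * shape[1] * shape[2]                      # Flatten
n, p_d1 = dense(flat, 64)
n, p_d2 = dense(n, 32)
n, p_out = dense(n, 10)
total_params = p_conv + p_d1 + p_d2 + p_out
print(flat, total_params)
```

Almost all of the parameters sit in the first dense layer, so that is the layer to shrink if the model needs to be smaller.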

In [9]:
@dsl.component(base_image="tensorflow/tensorflow")
def model_building(ml_model : Output[Model]):
    '''
    Define the model architecture.
    Keeping it in its own component makes the architecture easy to change
    while all the other steps stay independent of it.
    '''
    from tensorflow import keras
    import tensorflow as tf
    import os
    from pathlib import Path
    
    #model definition
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dense(32, activation='relu'))

    model.add(keras.layers.Dense(10, activation='softmax'))
    
    #saving model
    model.save(ml_model.uri+".keras")

In [22]:
%%html

<h3>Model Training</h3>
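
The training component below fits the model, evaluates it on the test set, and logs a confusion matrix. As a minimal pure-Python sketch of that last part, here is a stand-in for <code>np.argmax</code> and scikit-learn's <code>confusion_matrix</code>, using made-up probabilities for three classes instead of ten. Rows are true labels and columns are predicted labels.

```python
def argmax(values):
    """Index of the largest value (the first one on ties)."""
    return max(range(len(values)), key=values.__getitem__)


def confusion_matrix(y_true, y_pred, num_classes):
    """matrix[t][p] counts the samples with true label t that were predicted as p."""
    matrix = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(y_true, y_pred):
        matrix[t][p] += 1
    return matrix


# Made-up softmax outputs for four samples and three classes.
probabilities = [
    [0.9, 0.1, 0.0],
    [0.2, 0.7, 0.1],
    [0.1, 0.6, 0.3],
    [0.0, 0.2, 0.8],
]
y_true = [0, 1, 2, 2]
y_pred = [argmax(row) for row in probabilities]
cm = confusion_matrix(y_true, y_pred, num_classes=3)
accuracy = sum(cm[i][i] for i in range(3)) / len(y_true)
print(cm, accuracy)
```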


In [10]:
@dsl.component(base_image="tensorflow/tensorflow", packages_to_install=['scikit-learn'])
def model_training(
    ml_model : Input[Model],
    x_train_processed : Input[Dataset], x_test_processed: Input[Dataset],
    y_train_artifact : Input[Dataset], y_test_artifact :Input[Dataset],
    metrics: Output[Metrics], classification_metrics: Output[ClassificationMetrics], model_trained: Output[Model]
    ):
    """
    Train the model with the Keras API, evaluate it on the test set,
    and export the model metrics.
    """
    from tensorflow import keras
    import tensorflow as tf
    import numpy as np
    import os
    import glob
    from sklearn.metrics import confusion_matrix
    from io import BytesIO
    from tensorflow.python.lib.io import file_io
    from pathlib import Path
    
    #load dataset
    
    x_train = np.load(BytesIO(file_io.read_file_to_string(x_train_processed.uri, binary_mode=True)))
    x_test = np.load(BytesIO(file_io.read_file_to_string(x_test_processed.uri, binary_mode=True)))
    y_train = np.load(BytesIO(file_io.read_file_to_string(y_train_artifact.uri, binary_mode=True)))
    y_test = np.load(BytesIO(file_io.read_file_to_string(y_test_artifact.uri, binary_mode=True)))
    
    #load model structure
    model = keras.models.load_model(ml_model.uri+".keras")
    
    # Hyperparameters are hard-coded here; they could instead come from a tuning step (e.g. Katib)
    lr = 0.001  # float(hyperparameters["lr"])
    no_epochs = 1  # int(hyperparameters["num_epochs"])
    
    # Compile the model for 10-class classification with integer labels
    model.compile(tf.keras.optimizers.SGD(learning_rate=lr),
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

    
    #fit the model and return the history while training
    history = model.fit(
      x=x_train,
      y=y_train,
      epochs=no_epochs,
      batch_size=20,
    )

     
    # Test the model against the test dataset
    # Returns the loss value & metrics values for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)
    
    # Build a confusion matrix
    y_predict = model.predict(x=x_test)
    y_predict = np.argmax(y_predict, axis=1)
    cmatrix = confusion_matrix(y_test, y_predict)
    cmatrix = cmatrix.tolist()
    numbers_list = ['0','1','2','3','4','5','6','7','8','9']
    # Log the confusion matrix
    classification_metrics.log_confusion_matrix(numbers_list,cmatrix)
  
    # Export the metrics through Kubeflow
    metrics.log_metric("Test loss", model_loss)
    metrics.log_metric("Test accuracy", model_accuracy)
    
    #keras.models.save_model(model,model_trained.uri + '.keras')
    
    # Save the model in the SavedModel (saved_model.pb) format, which is required for deploying to Vertex AI.
    # https://cloud.google.com/vertex-ai/docs/model-registry/import-model#aiplatform_upload_model_sample-python_vertex_ai_sdk
    tf.saved_model.save(model, model_trained.uri)

In [25]:
%%HTML

<h3>Deploying the trained model to the Model Registry and an Endpoint</h3>

<a href="https://medium.com/@wardarahim25/step-by-step-guide-to-creating-and-deploying-custom-ml-pipelines-with-gcp-vertex-ai-part-2-3be6e314bc48#1592">Step-by-step guide to creating and deploying custom ML pipelines with Vertex AI (part 2)</a>
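
The deploy call in the component below passes <code>traffic_split={"0": 100}</code>. In the Vertex AI SDK the key <code>"0"</code> stands for the model being deployed, and the percentages are expected to add up to 100. The checker below is a hypothetical helper, not part of the SDK, that sketches this rule; the deployed model ID in the canary example is made up.

```python
def is_valid_traffic_split(split):
    """True if every share is an integer in 0-100 and the shares add up to 100."""
    return (
        len(split) > 0
        and all(isinstance(share, int) and 0 <= share <= 100 for share in split.values())
        and sum(split.values()) == 100
    )


# Send all traffic to the newly deployed model (what this notebook does).
all_to_new = {"0": 100}

# Hypothetical canary rollout: 10% to the new model, 90% to an already deployed one.
canary = {"0": 10, "1234567890": 90}

print(is_valid_traffic_split(all_to_new), is_valid_traffic_split(canary))
```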

In [12]:
@dsl.component(packages_to_install=['google-cloud-aiplatform'])
def deploy_to_endpoint(
    project: str,
    display_name: str,
    location: str,
    model: Input[Model],
    vertex_model: Output[Model],
    vertex_endpoint: Output[Model]
):
    from google.cloud import aiplatform as vertex_ai
    from pathlib import Path
    
    # Return the existing Vertex AI Endpoint with this display name, or create one if none exists.
    def create_endpoint():
        endpoints = vertex_ai.Endpoint.list(
            filter='display_name="{}"'.format(display_name),
            order_by='create_time desc',
            project=project,
            location=location,
        )
        if len(endpoints) > 0:
            endpoint = endpoints[0]  # most recently created
        else:
            endpoint = vertex_ai.Endpoint.create(
                display_name=display_name,
                project=project,
                location=location
            )
        return endpoint

    endpoint = create_endpoint()
    
    # Upload the trained model to the Vertex AI Model Registry, or add a new version
    # to an already uploaded model with the same display name.
    def upload_model():
        listed_model = vertex_ai.Model.list(
            filter='display_name="{}"'.format(display_name),
            project=project,
            location=location,
        )
        if len(listed_model) > 0:
            model_version = listed_model[0]  # existing model to add a new version to
            model_upload = vertex_ai.Model.upload(
                    display_name=display_name,
                    parent_model=model_version.resource_name,
                    artifact_uri=model.uri,
                    serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest',
                    location=location,  # was gcp_region, which is not defined in this component
                    serving_container_predict_route="/predict",
                    serving_container_health_route="/health"
            )
        else:
            model_upload = vertex_ai.Model.upload(
                    display_name=display_name,
                    artifact_uri=model.uri,
                    serving_container_image_uri='us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-11:latest',
                    location=location,
                    serving_container_predict_route="/predict",
                    serving_container_health_route="/health"
            )
            
        return model_upload
    
    uploaded_model = upload_model()
    
    # Save data to the output params
    vertex_model.uri = uploaded_model.resource_name

    # Deploys trained model to Vertex AI Endpoint
    model_deploy = uploaded_model.deploy(
        machine_type='n1-standard-4',
        endpoint=endpoint,
        traffic_split={"0": 100},
        deployed_model_display_name=display_name,
    )

    # Save data to the output params
    vertex_endpoint.uri = model_deploy.resource_name
    



In [26]:
%%HTML

<h3>Pipeline Definition</h3>

Finally, we join all the pieces into a single pipeline. Note how the outputs of each step are passed as inputs to the steps that depend on them.
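
Passing one step's output to another is what defines the execution order. The sketch below writes down the dependencies implied by the pipeline function in the next cell and asks the standard library for one valid order. It only illustrates the resulting DAG and is not how KFP or Vertex AI schedule steps internally.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "load_dataset": set(),
    "model_building": set(),
    "preprocessing": {"load_dataset"},
    "model_training": {"load_dataset", "preprocessing", "model_building"},
    "deploy_to_endpoint": {"model_training"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Steps with no dependency between them, such as <code>load_dataset</code> and <code>model_building</code>, can run in parallel.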

In [14]:
TEMPLATE_PATH = "mnist_pipeline.json"

@dsl.pipeline(
    name='mnist-classifier-dev',
    description='Detect digits')
def mnist_pipeline(bucket_name: str, project: str, display_name: str, location: str):
    load_task = load_dataset(bucket_name=bucket_name)
    load_task.set_caching_options(False)
    preprocess_task = preprocessing(
        x_train_artifact = load_task.outputs["x_train_artifact"],
        x_test_artifact = load_task.outputs["x_test_artifact"]
    )
    preprocess_task.set_caching_options(False)
    model_building_task = model_building()
    model_building_task.set_caching_options(False)
    training_task = model_training(
        ml_model = model_building_task.outputs["ml_model"],
        x_train_processed = preprocess_task.outputs["x_train_processed"],
        x_test_processed = preprocess_task.outputs["x_test_processed"],
        y_train_artifact = load_task.outputs["y_train_artifact"],
        y_test_artifact = load_task.outputs["y_test_artifact"]
    )
    training_task.set_caching_options(False)
    deploy_to_endpoint(
        model = training_task.outputs['model_trained'],
        project = project,
        location = location, 
        display_name = display_name
    )
    
kfp.compiler.Compiler().compile(mnist_pipeline, package_path=TEMPLATE_PATH)

In [15]:
%%HTML

<h3>Submitting to Vertex AI</h3>
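
The run name in the job output of this section looks like the pipeline name followed by a timestamp. Here is a minimal sketch of building such an ID yourself, for example to replace the fixed <code>JOBID</code> above. The helper name and the validation pattern are my assumptions; I believe <code>aip.PipelineJob</code> accepts the result through its <code>job_id</code> argument, but check the SDK reference before relying on it.

```python
import re
from datetime import datetime, timezone

# Assumed format for job IDs: a lowercase letter, then lowercase letters, digits or hyphens.
JOB_ID_PATTERN = re.compile(r"^[a-z][-a-z0-9]{0,127}$")


def make_job_id(pipeline_name, now=None):
    """Build '<pipeline_name>-<YYYYMMDDHHMMSS>' and check it against the assumed format."""
    now = now or datetime.now(timezone.utc)
    job_id = f"{pipeline_name}-{now:%Y%m%d%H%M%S}"
    if not JOB_ID_PATTERN.match(job_id):
        raise ValueError(f"invalid job id: {job_id!r}")
    return job_id


print(make_job_id("mnist-classifier-dev"))
```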

In [16]:
import google.cloud.aiplatform as aip

aip.init(
    project=PROJECT_ID,
    location=LOCATION,
)

PIPELINE_PARAMS = {
    "bucket_name":GCS_BUCKET_NAME,
    "display_name":PIPELINE_NAME,
    "project":PROJECT_ID,
    "location":LOCATION
    
}

# Prepare the pipeline job
job = aip.PipelineJob(
    enable_caching=ENABLE_CACHING,
    display_name=PIPELINE_NAME,
    template_path=TEMPLATE_PATH,
    pipeline_root=PIPELINE_ROOT,
    parameter_values=PIPELINE_PARAMS
)

job.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/605364818408/locations/us-central1/pipelineJobs/mnist-classifier-dev-20240529080737
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/605364818408/locations/us-central1/pipelineJobs/mnist-classifier-dev-20240529080737')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/mnist-classifier-dev-20240529080737?project=605364818408
