In [1]:
!pip install kfp

Collecting kfp
  Downloading kfp-2.13.0.tar.gz (269 kB)
     ---------------------------------------- 0.0/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     - -------------------------------------- 10.2/269.1 kB ? eta -:--:--
     -------- ---------------------------- 61.4/269.1 kB 149.3 kB/s eta 0:00:02
     -------- ---------------------------- 61.4/269.1 kB 149.3 kB/s eta 0:00:02
     -------- ---------------------------- 61.4/269.1 kB 149.3 kB/s eta 0:00:02
     ----------- ------------------------- 81.9/269.1 kB 153.2 kB/s eta 0:00:02
     --------------- ------------

In [2]:
import os
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics, ClassificationMetrics

In [3]:
!pip show kfp

Name: kfp
Version: 2.13.0
Summary: Kubeflow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: 
Location: C:\Users\suvam\anaconda3_2\Lib\site-packages
Requires: click, docstring-parser, google-api-core, google-auth, google-cloud-storage, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, PyYAML, requests-toolbelt, tabulate, urllib3
Required-by: 


In [4]:
# An explicit, pinned image is required here: the SDK's default base image is
# a bare Python image, so the numpy / tensorflow imports below would fail.
@dsl.component(base_image="tensorflow/tensorflow:2.15.0")
def load_dataset(
    x_train_artifact: Output[Dataset],
    x_test_artifact: Output[Dataset],
    y_train_artifact: Output[Dataset],
    y_test_artifact: Output[Dataset]
):
    '''
    Download MNIST through Keras and store each split as its own artifact.

    Every array is written in NumPy's .npy format directly at the artifact
    path, so downstream components can read it back with ``np.load``.

    Args:
        x_train_artifact: Receives the training inputs.
        x_test_artifact: Receives the test inputs.
        y_train_artifact: Receives the training labels.
        y_test_artifact: Receives the test labels.
    '''
    import numpy as np
    from tensorflow import keras

    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    splits = (
        (x_train, x_train_artifact),
        (y_train, y_train_artifact),
        (x_test, x_test_artifact),
        (y_test, y_test_artifact),
    )
    for array, artifact in splits:
        # Saving through an open file handle keeps np.save from appending
        # ".npy" to the artifact path, and removes the need for a temporary
        # file plus os.rename(), which can fail when /tmp and the artifact
        # path are on different filesystems.
        with open(artifact.path, "wb") as handle:
            np.save(handle, array)

  return component_factory.create_component_from_func(


In [5]:
# An explicit, pinned image is required here: the SDK's default base image is
# a bare Python image without numpy. The TensorFlow image is reused so this
# step runs on the same image as the other components in this notebook.
@dsl.component(base_image="tensorflow/tensorflow:2.15.0")
def preprocessing(
    metrics: Output[Metrics],
    x_train_processed: Output[Dataset],
    x_test_processed: Output[Dataset],
    x_train_artifact: Input[Dataset],
    x_test_artifact: Input[Dataset]
):
    '''
    Reshape the input arrays to (N, 28, 28, 1) and divide them by 255.

    Args:
        metrics: Receives the number of samples in each processed split.
        x_train_processed: Receives the processed training inputs (.npy format).
        x_test_processed: Receives the processed test inputs (.npy format).
        x_train_artifact: Raw training inputs, readable with ``np.load``.
        x_test_artifact: Raw test inputs, readable with ``np.load``.
    '''
    import numpy as np

    # Load the raw arrays from the artifact store
    x_train = np.load(x_train_artifact.path)
    x_test = np.load(x_test_artifact.path)

    # Reshape to 28x28 px greyscale images with an explicit channel axis
    # (channels = 1), which is the layout the Conv2D input layer expects.
    x_train = x_train.reshape(-1, 28, 28, 1)
    x_test = x_test.reshape(-1, 28, 28, 1)

    # Normalize: pixel values are expected to lie in 0-255, so dividing by 255
    # maps them to the 0-1 range.
    x_train = x_train / 255
    x_test = x_test / 255

    # Log split sizes using Kubeflow metrics artifacts
    metrics.log_metric("Len x_train", x_train.shape[0])
    metrics.log_metric("Len x_test", x_test.shape[0])

    # Save the processed features to the artifact store. Writing through an
    # open file handle keeps np.save from appending ".npy" to the artifact
    # path and avoids an os.rename() from /tmp, which can fail when the two
    # paths are on different filesystems.
    with open(x_train_processed.path, "wb") as handle:
        np.save(handle, x_train)

    with open(x_test_processed.path, "wb") as handle:
        np.save(handle, x_test)

In [6]:
# The image tag is pinned: the unpinned tag tracks the newest release, and
# Keras 3 based releases reject model.save() on a path without a ".keras" or
# ".h5" extension, which artifact paths do not have.
@dsl.component(base_image="tensorflow/tensorflow:2.15.0")
def model_building(ml_model: Output[Model]):
    '''
    Define the (untrained) model architecture and save it as an artifact.

    Keeping the architecture in its own step makes it simple to change the
    model without touching the other, independent pipeline steps.

    Args:
        ml_model: Receives the saved, uncompiled Keras model.
    '''
    from tensorflow import keras

    # Model definition: one convolution + pooling stage followed by a small
    # dense classifier with a 10-way softmax output.
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28, 28, 1)))
    model.add(keras.layers.MaxPooling2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dense(32, activation='relu'))
    model.add(keras.layers.Dense(10, activation='softmax'))

    model.save(ml_model.path)

In [7]:
# The image tag is pinned: the unpinned tag tracks the newest release, and
# Keras 3 based releases refuse to load or save a model at a path without a
# ".keras" / ".h5" extension, which artifact paths do not have.
@dsl.component(base_image="tensorflow/tensorflow:2.15.0", packages_to_install=['scikit-learn'])
def model_training(
    ml_model: Input[Model],
    x_train_processed: Input[Dataset], x_test_processed: Input[Dataset],
    y_train_artifact: Input[Dataset], y_test_artifact: Input[Dataset],
    hyperparameters: dict,
    metrics: Output[Metrics],
    classification_metrics: Output[ClassificationMetrics],
    model_trained: Output[Model]
):
    """
    Train the model with the Keras API and export evaluation metrics.

    Args:
        ml_model: Saved model architecture to load and train.
        x_train_processed: Training inputs, readable with ``np.load``.
        x_test_processed: Test inputs, readable with ``np.load``.
        y_train_artifact: Training labels, readable with ``np.load``.
        y_test_artifact: Test labels, readable with ``np.load``.
        hyperparameters: Must contain ``"lr"`` (learning rate) and
            ``"num_epochs"``; values are converted with ``float`` / ``int``.
        metrics: Receives the test loss and test accuracy.
        classification_metrics: Receives the 10x10 confusion matrix.
        model_trained: Receives the trained model under a ``/1/`` subfolder.

    Raises:
        KeyError: If ``hyperparameters`` lacks ``"lr"`` or ``"num_epochs"``.
    """
    from tensorflow import keras
    import tensorflow as tf
    import numpy as np
    from sklearn.metrics import confusion_matrix

    # Load dataset
    x_train = np.load(x_train_processed.path)
    x_test = np.load(x_test_processed.path)
    y_train = np.load(y_train_artifact.path)
    y_test = np.load(y_test_artifact.path)

    # Load model structure
    model = keras.models.load_model(ml_model.path)

    # Read the hyperparameters supplied to the pipeline (e.g. tuned by Katib)
    lr = float(hyperparameters["lr"])
    no_epochs = int(hyperparameters["num_epochs"])

    # Compile the model for 10-class classification with integer labels
    model.compile(
        optimizer=tf.keras.optimizers.SGD(learning_rate=lr),
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"]
    )

    # Fit the model on the training split
    model.fit(
        x=x_train,
        y=y_train,
        epochs=no_epochs,
        batch_size=20,
    )

    # Test the model against the test dataset.
    # Returns the loss value & metrics values for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test, y=y_test)

    # Build a confusion matrix. Passing the labels explicitly guarantees a
    # 10x10 matrix that lines up with the category names logged below, even
    # if some digit is absent from both the labels and the predictions.
    class_ids = list(range(10))
    y_predict = model.predict(x=x_test)
    y_predict = np.argmax(y_predict, axis=1)
    cmatrix = confusion_matrix(y_test, y_predict, labels=class_ids).tolist()

    numbers_list = [str(i) for i in class_ids]

    # Log confusion matrix
    classification_metrics.log_confusion_matrix(numbers_list, cmatrix)

    # Kubeflow metrics export (cast to plain floats for serialization)
    metrics.log_metric("Test loss", float(model_loss))
    metrics.log_metric("Test accuracy", float(model_accuracy))

    # Add a /1/ version subfolder for TF Serving and save the model to the
    # artifact store
    model_trained.uri = model_trained.uri + '/1/'
    keras.models.save_model(model, model_trained.path)

In [8]:
@dsl.pipeline(
    name='mnist-classifier-dev',
    description='Detect digits'
)
def mnist_pipeline(hyperparameters: dict):
    # Stage 1: fetch the raw train/test splits.
    dataset_step = load_dataset()
    raw_outputs = dataset_step.outputs

    # Stage 2: reshape and normalize the input arrays only; the labels are
    # passed to training unchanged.
    prep_step = preprocessing(
        x_train_artifact=raw_outputs["x_train_artifact"],
        x_test_artifact=raw_outputs["x_test_artifact"],
    )
    prep_outputs = prep_step.outputs

    # Stage 3: build the untrained architecture (takes no inputs from the
    # data steps).
    architecture_step = model_building()

    # Stage 4: train and evaluate using the pipeline-level hyperparameters.
    train_step = model_training(
        ml_model=architecture_step.outputs["ml_model"],
        x_train_processed=prep_outputs["x_train_processed"],
        x_test_processed=prep_outputs["x_test_processed"],
        y_train_artifact=raw_outputs["y_train_artifact"],
        y_test_artifact=raw_outputs["y_test_artifact"],
        hyperparameters=hyperparameters,
    )

    # TODO: serving_task = model_serving(model_trained=train_step.outputs["model_trained"])

In [9]:
# Compile the pipeline function into a YAML package file.
kfp.compiler.Compiler().compile(
    pipeline_func=mnist_pipeline,
    package_path='mnist_pipeline.yaml',
)