In [1]:
!python3 -m pip install kfp kfp-server-api kserve kfp-pipeline-spec --upgrade --user

Collecting kfp-pipeline-spec
  Using cached kfp_pipeline_spec-0.2.0-py3-none-any.whl (12 kB)
Collecting argparse>=1.4.0
  Using cached argparse-1.4.0-py2.py3-none-any.whl (23 kB)
Installing collected packages: argparse
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfserving 0.5.1 requires azure-storage-blob<=2.1.0,>=1.3.0, but you have azure-storage-blob 12.9.0 which is incompatible.[0m[31m
[0mSuccessfully installed argparse-1.4.0


In [2]:
# 
# Creating a ML pipeline with the MNIST digits dataset
# KFPv1 Example with lightweight Python components only
#
import kfp
from kfp import dsl
import kfp.components as components
from typing import NamedTuple

In [3]:
pip list

Package                  Version
------------------------ -------------------
absl-py                  0.11.0
adal                     1.2.7
aiohttp                  3.8.3
aiohttp-cors             0.7.0
aiorwlock                1.3.0
aiosignal                1.3.1
alog                     0.9.13
anyio                    3.6.1
argon2-cffi              21.3.0
argon2-cffi-bindings     21.2.0
asgiref                  3.6.0
asttokens                2.0.5
astunparse               1.6.3
async-timeout            4.0.2
attrs                    21.4.0
avro                     1.11.0
azure-common             1.1.28
azure-core               1.26.2
azure-identity           1.12.0
azure-storage-blob       12.9.0
azure-storage-common     2.1.0
azure-storage-file-share 12.7.0
Babel                    2.10.3
backcall                 0.2.0
beautifulsoup4           4.11.1
bleach                   5.0.1
blessed                  1.19.1
bokeh                    2.3.2
boto3                    1.26.62
botocor

In [4]:
'''
    Fetch the dataset from the MinIO bucket, separate it into train and test sets, and save the splits back to the bucket so that other components can access them.
    Returns a named tuple 'Outputs' that other components can also consume when this function is used in a pipeline.
'''
def get_data_batch() -> NamedTuple('Outputs', [('datapoints_training', float),('datapoints_test', float),('dataset_version', str)]):
    """Download MNIST from MinIO, publish its train/test splits, and report their sizes.

    The archive ``mnist.npz`` is fetched from the ``mlpipeline`` bucket, and each
    of its four arrays is written back to the bucket as ``x_train``, ``y_train``,
    ``x_test`` and ``y_test`` so that later pipeline steps can load them.

    All imports live inside the function because it is converted into a
    standalone Kubeflow Pipelines component.

    Returns:
        An ``Outputs`` named tuple with the number of training samples, the
        number of test samples (both as floats) and the dataset version string.
    """
    print("getting data")
    from collections import namedtuple
    from minio import Minio
    import numpy as np

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them through the component's environment instead.
    minio_client = Minio(
        "10.43.114.82:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    minio_client.fget_object(minio_bucket,"mnist.npz","/tmp/mnist.npz")

    def load_data():
        # NOTE(review): allow_pickle=True can execute arbitrary code if the
        # archive is not trusted; confirm the bucket content is controlled.
        with np.load("/tmp/mnist.npz", allow_pickle=True) as f:
            x_train, y_train = f["x_train"], f["y_train"]
            x_test, y_test = f["x_test"], f["y_test"]

        return (x_train, y_train), (x_test, y_test)

    # Read the four arrays from the downloaded archive
    (x_train, y_train), (x_test, y_test) = load_data()

    # Save every split as a .npy file and upload it under its own object name.
    splits = {
        "x_train": x_train,
        "y_train": y_train,
        "x_test": x_test,
        "y_test": y_test,
    }
    for object_name, array in splits.items():
        local_path = f"/tmp/{object_name}.npy"
        np.save(local_path, array)
        minio_client.fput_object(minio_bucket, object_name, local_path)

    dataset_version = "1.0"

    print(f"x_train shape: {x_train.shape}")
    print(f"y_train shape: {y_train.shape}")

    print(f"x_test shape: {x_test.shape}")
    print(f"y_test shape: {y_test.shape}")

    # Return the declared named tuple rather than a bare list so the fields
    # can be read by name.
    Outputs = namedtuple('Outputs', ['datapoints_training', 'datapoints_test', 'dataset_version'])
    return Outputs(float(x_train.shape[0]), float(x_test.shape[0]), dataset_version)
 

In [5]:
"""
    Dummy function to use as an example for parallel running.
"""
def get_latest_data():
    """Placeholder step that only prints a message.

    It exists so the pipeline has a second, independent task to demonstrate
    parallel execution.
    """
    message = "Adding latest data"
    print(message)
    

In [6]:
"""
    Fetch the train and test image splits from the MinIO bucket, reshape and normalise them (a data-processing step), and write the processed arrays back to the bucket so that other components can use them to build the model.
"""
def reshape_data():
    """Reshape and normalise the image splits stored in MinIO.

    Downloads the ``x_train`` and ``x_test`` objects from the ``mlpipeline``
    bucket, reshapes each to ``(-1, 28, 28, 1)``, divides the values by 255
    and uploads the results under the same object names.
    """
    print("reshaping data")

    from minio import Minio
    import numpy as np

    bucket = "mlpipeline"
    storage = Minio(
        "10.43.114.82:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )

    object_names = ("x_train", "x_test")

    # Download both image arrays from MinIO.
    images = {}
    for object_name in object_names:
        local_file = "/tmp/" + object_name + ".npy"
        storage.fget_object(bucket, object_name, local_file)
        images[object_name] = np.load(local_file)

    # Add the single greyscale channel expected by the Keras API, then divide
    # the 0-255 pixel values by 255 so they fall into the 0-1 range.
    for object_name in object_names:
        images[object_name] = images[object_name].reshape(-1, 28, 28, 1) / 255

    # Write the processed arrays back under the same object names.
    for object_name in object_names:
        local_file = "/tmp/" + object_name + ".npy"
        np.save(local_file, images[object_name])
        storage.fput_object(bucket, object_name, local_file)

In [8]:
"""
    Build the model with the Keras API, fetch the training dataset from the MinIO bucket, train the model on it, and export the model and its parameters to the bucket.
"""
def model_building(
    no_epochs:int = 1,
    optimizer: str = "adam"
) -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]):
    """Train, evaluate and export the MNIST digit classifier.

    Builds a small convolutional network with Keras, trains it on the training
    split stored in MinIO, evaluates it on the test split, and uploads the
    exported model to ``models/detect-digits/1/`` in the ``mlpipeline`` bucket.

    All imports and helpers live inside the function because it is converted
    into a standalone Kubeflow Pipelines component.

    Args:
        no_epochs: Number of training epochs.
        optimizer: Name of the Keras optimizer used to compile the model.

    Returns:
        A named tuple of two JSON strings: ``mlpipeline_ui_metadata`` (a
        confusion matrix plus a markdown model report) and
        ``mlpipeline_metrics`` (test accuracy and test loss).
    """
    import json
    import os
    import posixpath
    from collections import namedtuple

    from tensorflow import keras
    import tensorflow as tf
    from minio import Minio
    import numpy as np
    import pandas as pd

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them through the component's environment instead.
    minio_client = Minio(
        "10.43.114.82:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    # A deliberately simple network: the goal is to show how pipeline
    # components work, not to build the best possible model.
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))

    model.add(keras.layers.Dense(32, activation='relu'))

    model.add(keras.layers.Dense(10, activation='softmax')) # one output per class, digits 0-9

    # Capture the textual model summary so it can be shown in the UI report
    stringlist = []
    model.summary(print_fn=lambda x: stringlist.append(x))
    metric_model_summary = "\n".join(stringlist)

    # Compile for multi-class classification with integer labels
    model.compile(optimizer=optimizer,
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

    # Get training set from the minio bucket.
    minio_client.fget_object(minio_bucket,"x_train","/tmp/x_train.npy")
    x_train = np.load("/tmp/x_train.npy")

    minio_client.fget_object(minio_bucket,"y_train","/tmp/y_train.npy")
    y_train = np.load("/tmp/y_train.npy")

    # Train the model
    model.fit(
      x=x_train,
      y=y_train,
      epochs=no_epochs,
      batch_size=20,
    )

    # Get test set from the minio bucket
    minio_client.fget_object(minio_bucket,"x_test","/tmp/x_test.npy")
    x_test = np.load("/tmp/x_test.npy")

    minio_client.fget_object(minio_bucket,"y_test","/tmp/y_test.npy")
    y_test = np.load("/tmp/y_test.npy")

    # Loss and accuracy of the trained model on the test set
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)

    # Confusion matrix: the model outputs 10 scores per sample, and the index
    # of the highest score is the predicted digit.
    test_predictions = model.predict(x=x_test)
    test_predictions = np.argmax(test_predictions,axis=1)

    confusion_matrix = tf.math.confusion_matrix(labels=y_test,predictions=test_predictions)
    confusion_matrix = confusion_matrix.numpy()
    vocab = list(np.unique(y_test))

    # Flatten the matrix into (target, predicted, count) rows for the CSV
    data = [
        (vocab[target_index], vocab[predicted_index], count)
        for target_index, target_row in enumerate(confusion_matrix)
        for predicted_index, count in enumerate(target_row)
    ]

    df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    cm_csv = df_cm.to_csv(header=False, index=False)

    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {'name': 'target', 'type': 'CATEGORY'},
                    {'name': 'predicted', 'type': 'CATEGORY'},
                    {'name': 'count', 'type': 'NUMBER'},
                  ],
                "target_col" : "actual",
                "predicted_col" : "predicted",
                "source": cm_csv,
                "storage": "inline",
                "labels": [0,1,2,3,4,5,6,7,8,9]
            },
            {
                'storage': 'inline',
                'source': '''# Model Overview
## Model Summary

```
{}
```

## Model Performance

**Accuracy**: {}
**Loss**: {}

'''.format(metric_model_summary,model_accuracy,model_loss),
                'type': 'markdown',
            }
        ]
    }

    metrics = {
      'metrics': [{
          'name': 'model_accuracy',
          'numberValue':  float(model_accuracy),
          'format' : "PERCENTAGE"
        },{
          # The loss is not a ratio, so report the raw value instead of a percentage.
          'name': 'model_loss',
          'numberValue':  float(model_loss),
          'format' : "RAW"
        }]}

    ### Save model to minio bucket

    keras.models.save_model(model,"/tmp/detect-digits")

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """Upload every file below ``local_path`` to ``bucket_name`` under ``minio_path``."""
        if not os.path.isdir(local_path):
            raise NotADirectoryError(local_path)

        # Strip surrounding slashes so joined object keys never contain "//".
        remote_root = minio_path.strip("/")
        for dir_path, _dir_names, file_names in os.walk(local_path):
            relative_dir = os.path.relpath(dir_path, local_path)
            for file_name in file_names:
                relative_file = os.path.normpath(os.path.join(relative_dir, file_name))
                relative_file = relative_file.replace(os.sep, "/")  # Replace \ with / on Windows
                remote_path = posixpath.join(remote_root, relative_file)
                minio_client.fput_object(
                    bucket_name, remote_path, os.path.join(dir_path, file_name))

    upload_local_directory_to_minio("/tmp/detect-digits",minio_bucket,"models/detect-digits/1/") # 1 for version 1

    print("Saved model to minIO")

    output = namedtuple('Output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return output(json.dumps(metadata),json.dumps(metrics))

In [9]:
"""
    Creating a kserve instance.
    KServe enables serverless inferencing on Kubernetes and provides performant, high abstraction interfaces for common machine learning (ML) frameworks like TensorFlow, XGBoost, scikit-learn, PyTorch, and ONNX to solve production model serving use cases.
"""
def model_serving():
    """Create a KServe InferenceService for the exported digits model.

    The service is named ``digits-recognizer-<timestamp>``, is created in the
    namespace returned by ``utils.get_default_target_namespace()`` and uses a
    TensorFlow predictor whose storage URI is
    ``s3://mlpipeline/models/detect-digits/``.
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import (
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1TFServingSpec,
    )
    from datetime import datetime

    target_namespace = utils.get_default_target_namespace()

    # A timestamp suffix gives every deployment its own service name.
    timestamp = datetime.now().strftime("%Y-%m-%d--%H-%M-%S")
    service_name = 'digits-recognizer-{}'.format(timestamp)

    api_version = constants.KSERVE_GROUP + '/' + 'v1beta1'

    service_metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=target_namespace,
        annotations={'sidecar.istio.io/inject':'false'},
    )

    # The predictor loads the saved model from the MinIO bucket.
    model_source = V1beta1TFServingSpec(
        storage_uri="s3://mlpipeline/models/detect-digits/")
    predictor = V1beta1PredictorSpec(
        service_account_name="sa-minio-kserve",
        tensorflow=model_source,
    )

    inference_service = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=service_metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    # Deploy the service.
    kserve_client = KServeClient()
    kserve_client.create(inference_service)

In [9]:
# Turn each function defined earlier into a pipeline component and save its
# specification as a YAML file so it can be shared across teams and reused.
comp_get_data_batch = components.create_component_from_func(
    get_data_batch,
    base_image='python:3.8.10',
    packages_to_install=['tensorflow==2.5.3', 'minio==6.0.2', 'numpy==1.19.5'],
    output_component_file='component_get_batch_data.yaml',
)

comp_get_latest_data = components.create_component_from_func(
    get_latest_data,
    base_image='python:3.8.10',
    output_component_file='component_get_latest_data.yaml',
)

comp_reshape_data = components.create_component_from_func(
    reshape_data,
    base_image='python:3.8.10',
    packages_to_install=['minio==6.0.2', 'numpy==1.19.5'],
    output_component_file='component_reshape_data.yaml',
)

comp_model_building = components.create_component_from_func(
    model_building,
    base_image='python:3.8.10',
    packages_to_install=['tensorflow==2.5.3', 'minio==6.0.2', 'numpy==1.19.5', 'pandas==1.2.4'],
    output_component_file='component_build_model.yaml',
)

comp_model_serving = components.create_component_from_func(
    model_serving,
    base_image='python:3.8.10',
    packages_to_install=['kserve==0.8.0', 'kubernetes==12.0.1', 'protobuf==3.19.4'],
    output_component_file='component_serve_model.yaml',
)

In [10]:
# from kfp import compiler
# cmplr = compiler.Compiler()
# cmplr.compile(comp_get_data_batch, package_path='get_batch_data.yaml')

In [11]:
# cmplr.compile(comp_get_latest_data, package_path='get_latest_data.yaml')
# cmplr.compile(comp_reshape_data, package_path='reshape_data.yaml')

In [17]:
# import sys
# from pathlib import Path

# # in jupyter (lab / notebook), based on notebook path
# module_path = str(Path.cwd())

In [18]:
# module_path

'/home/jovyan/kubeflow_example'

## This is how we create a pipeline

In [19]:
@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def output_test(no_epochs,optimizer):
    """Assemble the digits-recognizer pipeline from saved component YAML files.

    Steps: fetch the data batch and the (dummy) latest data in parallel,
    reshape the data, build the model, then serve it.

    Every component is loaded from a YAML file in the components directory.
    That directory defaults to
    ``/home/jovyan/kubeflow_example/pipeline/components`` and can be overridden
    with the ``PIPELINE_COMPONENTS_DIR`` environment variable.

    Args:
        no_epochs: Number of training epochs passed to the model-building step.
        optimizer: Optimizer name passed to the model-building step.
    """
    import os

    # NOTE(review): the component-creation cell writes its YAML files with
    # relative names; make sure they are present in this directory.
    components_dir = os.environ.get(
        "PIPELINE_COMPONENTS_DIR",
        "/home/jovyan/kubeflow_example/pipeline/components",
    )

    def load_step(file_name):
        # load_component_from_file() returns a factory function; a component
        # can also be loaded from a URL or from text.
        return components.load_component_from_file(
            os.path.join(components_dir, file_name))

    # The in-memory factories (comp_get_data_batch(), comp_reshape_data(), ...)
    # could be called here instead of loading the YAML files.
    step1_1 = load_step("component_get_batch_data.yaml")()
    step1_2 = load_step("component_get_latest_data.yaml")()

    # Reshaping waits for both data steps, which run in parallel.
    step2 = load_step("component_reshape_data.yaml")()
    step2.after(step1_1, step1_2)

    step3 = load_step("component_build_model.yaml")(no_epochs,optimizer)
    step3.after(step2)

    step4 = load_step("component_serve_model.yaml")()
    step4.after(step3)

    # Disable reuse of cached step results so every run re-executes each step.
    # Useful when changing things and re-running tests; on a first run the
    # cache is empty anyway, so these lines make no difference.
    for step in (step1_1, step1_2, step2, step3, step4):
        step.execution_options.caching_strategy.max_cache_staleness = "P0D"

In [20]:
import kfp
# Evaluate the installed KFP SDK version so the notebook records which release it ran against.
kfp.__version__
# kfp.Client().list_experiments()

'1.8.18'

# Main function to run your experiments
We create a new experiment called "pipeline_components_test" and run it with the pipeline defined in the previous cell. It is also possible to export the pipeline as a YAML file and use that file to create and run an experiment.

In [21]:
if __name__ == "__main__":
    client = kfp.Client()

    # Values for the pipeline parameters.
    arguments = {"no_epochs": 1, "optimizer": "adam"}

    # 1: submit a run straight from the pipeline function.
    # Any other value: compile the pipeline to YAML and upload it as a pipeline version.
    run_directly = 1

    if run_directly != 1:
        kfp.compiler.Compiler().compile(
            pipeline_func=output_test,
            package_path='output_test.yaml',
        )
        client.upload_pipeline_version(
            pipeline_package_path='output_test.yaml',
            pipeline_version_name="0.4",
            pipeline_name="pipeline test",
            description="just for testing",
        )
    else:
        client.create_run_from_pipeline_func(
            output_test,
            arguments=arguments,
            experiment_name="pipeline_components_test",
        )

In [None]:
import kfp

In [None]:
# Create a KFP client with its default connection settings (no arguments are passed).
client = kfp.Client()
# kfp.components.list_experiments(page_token='', page_size=10, sort_by='')


In [None]:
# Display the experiments returned by the client created in the previous cell.
client.list_experiments()