In [18]:
# 
# Creating a ML pipeline with the MNIST digits dataset
# KFPv1 Example with lightweight Python components only
#

import kfp
from kfp import dsl
import kfp.components as components

from typing import NamedTuple
def get_data_batch() -> NamedTuple('Outputs', [('datapoints_training', float),('datapoints_test', float),('dataset_version', str)]):
    """
    Function to get dataset and load it to minio bucket
    """
    print("getting data")
    from tensorflow import keras
    import numpy as np
    import json
    import boto3
    import logging
    from botocore.exceptions import ClientError
    from botocore.client import Config    
    import os
    
    FB_DATA_VIP_URL = "http://10.21.127.71" #URL with IP or hostname of your FlashBlade's data VIP
    FB_S3_ACCESS_KEY = "PSFBSAZQJMLBOLBJPJKGGDPOKNBFJKNKEKCIBDBKC"
    FB_S3_SECRET_KEY = "A634182A4ada951da+10d6/A0421BC11f17471eDMKE"

    s3 = boto3.client('s3', aws_access_key_id=FB_S3_ACCESS_KEY, aws_secret_access_key=FB_S3_SECRET_KEY, endpoint_url=FB_DATA_VIP_URL)

    os.mkdir('/tmp/data')
    fb_bucket = "demo-pipeline-fb"

    s3.download_file(fb_bucket,'mnist.npz','/tmp/data/mnist.npz')
    
    def load_data():
        with np.load("/tmp/data/mnist.npz", allow_pickle=True) as f:
            x_train, y_train = f["x_train"], f["y_train"]
            x_test, y_test = f["x_test"], f["y_test"]

        return (x_train, y_train), (x_test, y_test)
    
    # Get MNIST data directly from library
    (x_train, y_train), (x_test, y_test) = load_data()

    # save to numpy file, store in FlashBlade bucket
    def upload_files_to_s3(local_folder, bucket_name, s3_prefix=''):
        s3 = boto3.client('s3', aws_access_key_id=FB_S3_ACCESS_KEY, aws_secret_access_key=FB_S3_SECRET_KEY, endpoint_url=FB_DATA_VIP_URL)
    # Walk through the local folder and upload each file to the S3 bucket
        for root, _, files in os.walk(local_folder):
            for file in files:
                local_file_path = os.path.join(root, file)
                s3_key = os.path.join(s3_prefix, os.path.relpath(local_file_path, local_folder))
                # Upload the file to the S3 bucket
                s3.upload_file(local_file_path, bucket_name, s3_key)
    
    # save to numpy file, store in FB
    np.save("/tmp/data/x_train.npy",x_train)
    np.save("/tmp/data/y_train.npy",y_train)
    np.save("/tmp/data/x_test.npy",x_test)
    np.save("/tmp/data/y_test.npy",y_test)
    upload_files_to_s3("/tmp/data",fb_bucket,"") 
        
    dataset_version = "1.0"
    print(f"x_train shape: {x_train.shape}")
    print(f"y_train shape: {y_train.shape}")

    print(f"x_test shape: {x_test.shape}")
    print(f"y_test shape: {y_test.shape}")
    
    from collections import namedtuple
    divmod_output = namedtuple('Outputs', ['datapoints_training', 'datapoints_test', 'dataset_version'])
    return [float(x_train.shape[0]),float(x_test.shape[0]),dataset_version]
    
def get_latest_data():
    """
    Dummy functions for showcasing
    """
    print("Adding latest data")
    
    
def reshape_data():
    """
    Reshape the data for model building
    """
    print("reshaping data")
    
    from tensorflow import keras
    import numpy as np
    import json
    import boto3
    import logging
    from botocore.exceptions import ClientError
    from botocore.client import Config    
    import os
    
    FB_DATA_VIP_URL = "http://10.21.127.71" #URL with IP or hostname of your FlashBlade's data VIP
    FB_S3_ACCESS_KEY = "PSFBSAZQJMLBOLBJPJKGGDPOKNBFJKNKEKCIBDBKC"
    FB_S3_SECRET_KEY = "A634182A4ada951da+10d6/A0421BC11f17471eDMKE"

    s3 = boto3.client('s3', aws_access_key_id=FB_S3_ACCESS_KEY, aws_secret_access_key=FB_S3_SECRET_KEY, endpoint_url=FB_DATA_VIP_URL)

    fb_bucket = "demo-pipeline"
    os.mkdir('/tmp/data')
    
    def upload_files_to_s3(local_folder, bucket_name, s3_prefix=''):
        s3 = boto3.client('s3', aws_access_key_id=FB_S3_ACCESS_KEY, aws_secret_access_key=FB_S3_SECRET_KEY, endpoint_url=FB_DATA_VIP_URL)
    # Walk through the local folder and upload each file to the S3 bucket
        for root, _, files in os.walk(local_folder):
            for file in files:
                local_file_path = os.path.join(root, file)
                s3_key = os.path.join(s3_prefix, os.path.relpath(local_file_path, local_folder))
                # Upload the file to the S3 bucket
                s3.upload_file(local_file_path, bucket_name, s3_key)

    s3.download_file(fb_bucket,'x_train.npy','/tmp/data/x_train.npy')
    x_train = np.load("/tmp/data/x_train.npy")
    s3.download_file(fb_bucket,'x_test.npy','/tmp/data/x_test.npy')
    x_test = np.load("/tmp/data/x_test.npy")
    
    # reshaping the data
    # reshaping pixels in a 28x28px image with greyscale, canal = 1. This is needed for the Keras API
    x_train = x_train.reshape(-1,28,28,1)
    x_test = x_test.reshape(-1,28,28,1)

    # normalizing the data
    # each pixel has a value between 0-255. Here we divide by 255, to get values from 0-1
    x_train = x_train / 255
    x_test = x_test / 255
    
    np.save("/tmp/data/x_train.npy",x_train)
    np.save("/tmp/data/x_test.npy",x_test)
    upload_files_to_s3("/tmp/data",fb_bucket,"") 
    
    print(f"train X shape: {x_train.shape}")
    print(f"test X shape: {x_test.shape}")

def model_building(
    no_epochs:int = 1,
    optimizer: str = "adam"
) -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]):
    """
    Build the model with Keras API
    Export model parameters
    """
    import tensorflow as tf
    from tensorflow import keras
    import numpy as np
    import pandas as pd
    import json
    import boto3
    import logging
    from botocore.exceptions import ClientError
    from botocore.client import Config    
    import os
    FB_DATA_VIP_URL = "http://10.21.127.71" #URL with IP or hostname of your FlashBlade's data VIP

    FB_S3_ACCESS_KEY = "PSFBSAZQJMLBOLBJPJKGGDPOKNBFJKNKEKCIBDBKC"
    FB_S3_SECRET_KEY = "A634182A4ada951da+10d6/A0421BC11f17471eDMKE"

    s3 = boto3.client('s3', aws_access_key_id=FB_S3_ACCESS_KEY, aws_secret_access_key=FB_S3_SECRET_KEY, endpoint_url=FB_DATA_VIP_URL)

    fb_bucket = "demo-pipeline"
    os.mkdir('/tmp/data')
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))

    model.add(keras.layers.Dense(32, activation='relu'))

    model.add(keras.layers.Dense(10, activation='softmax')) #output are 10 classes, numbers from 0-9

    #show model summary - how it looks
    stringlist = []
    model.summary(print_fn=lambda x: stringlist.append(x))
    metric_model_summary = "\n".join(stringlist)
    
    #show model summary - how it looks
    model.summary()
    
    #compile the model - we want to have a binary outcome
    model.compile(optimizer=optimizer,
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])
    
    s3.download_file(fb_bucket,'x_train.npy','/tmp/data/x_train.npy')
    x_train = np.load("/tmp/data/x_train.npy")
    
    s3.download_file(fb_bucket,'y_train.npy','/tmp/data/y_train.npy')
    y_train = np.load("/tmp/data/y_train.npy")
    
    #fit the model and return the history while training
    history = model.fit(
      x=x_train,
      y=y_train,
      epochs=no_epochs,
      batch_size=20,
    )

    s3.download_file(fb_bucket,'x_test.npy','/tmp/data/x_test.npy')
    x_test = np.load("/tmp/data/x_test.npy")
    
    s3.download_file(fb_bucket,'y_test.npy','/tmp/data/y_test.npy')
    y_test = np.load("/tmp/data/y_test.npy")

    # Test the model against the test dataset
    # Returns the loss value & metrics values for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)
    
    # Confusion Matrix

    # Generates output predictions for the input samples.
    test_predictions = model.predict(x=x_test)

    # Returns the indices of the maximum values along an axis.
    test_predictions = np.argmax(test_predictions,axis=1) # the prediction outputs 10 values, we take the index number of the highest value, which is the prediction of the model

    # generate confusion matrix
    confusion_matrix = tf.math.confusion_matrix(labels=y_test,predictions=test_predictions)
    confusion_matrix = confusion_matrix.numpy()
    vocab = list(np.unique(y_test))
    data = []
    for target_index, target_row in enumerate(confusion_matrix):
        for predicted_index, count in enumerate(target_row):
            data.append((vocab[target_index], vocab[predicted_index], count))

    df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    cm_csv = df_cm.to_csv(header=False, index=False)
    
    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {'name': 'target', 'type': 'CATEGORY'},
                    {'name': 'predicted', 'type': 'CATEGORY'},
                    {'name': 'count', 'type': 'NUMBER'},
                  ],
                "target_col" : "actual",
                "predicted_col" : "predicted",
                "source": cm_csv,
                "storage": "inline",
                "labels": [0,1,2,3,4,5,6,7,8,9]
            },
            {
                'storage': 'inline',
                'source': '''# Model Overview
## Model Summary

```
{}
```

## Model Performance

**Accuracy**: {}
**Loss**: {}

'''.format(metric_model_summary,model_accuracy,model_loss),
                'type': 'markdown',
            }
        ]
    }
    
    metrics = {
      'metrics': [{
          'name': 'model_accuracy',
          'numberValue':  float(model_accuracy),
          'format' : "PERCENTAGE"
        },{
          'name': 'model_loss',
          'numberValue':  float(model_loss),
          'format' : "PERCENTAGE"
        }]}
    
    ### Save model to minIO
    
    keras.models.save_model(model,"/tmp/detect-digits")
    
    def upload_files_to_s3(local_folder, bucket_name, s3_prefix=''):
    # Walk through the local folder and upload each file to the S3 bucket
        for root, _, files in os.walk(local_folder):
            for file in files:
                local_file_path = os.path.join(root, file)
                s3_key = os.path.join(s3_prefix, os.path.relpath(local_file_path, local_folder))
                # Upload the file to the S3 bucket
                s3.upload_file(local_file_path, bucket_name, s3_key)

    upload_files_to_s3("/tmp/detect-digits",fb_bucket,"models/detect-digits/1")      

    for key in s3.list_objects(Bucket=fb_bucket)['Contents']:
        print(key['Key'])
        
    from collections import namedtuple
    output = namedtuple('output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return output(json.dumps(metadata),json.dumps(metrics))

def model_serving():
    """
    Create kserve instance
    """
    from kubernetes import client 
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1TFServingSpec
    from datetime import datetime

    namespace = utils.get_default_target_namespace()

    now = datetime.now()
    v = now.strftime("%Y-%m-%d--%H-%M-%S")

    name='digits-recognizer-{}'.format(v)
    kserve_version='v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    isvc = V1beta1InferenceService(api_version=api_version,
                                   kind=constants.KSERVE_KIND,
                                   metadata=client.V1ObjectMeta(
                                       name=name, namespace=namespace, annotations={'sidecar.istio.io/inject':'false'}),
                                   spec=V1beta1InferenceServiceSpec(
                                   predictor=V1beta1PredictorSpec(
                                       service_account_name="sa-fb-kserve",
                                       tensorflow=(V1beta1TFServingSpec(
                                           storage_uri="s3://demo-pipeline/models/detect-digits/"))))
    )
    KServe = KServeClient()
    KServe.create(isvc)

comp_get_data_batch = components.create_component_from_func(get_data_batch,base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0",packages_to_install=['boto3'])
comp_get_latest_data = components.create_component_from_func(get_latest_data,base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0",packages_to_install=['boto3'])
comp_reshape_data = components.create_component_from_func(reshape_data,base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0",packages_to_install=['boto3'])
comp_model_building = components.create_component_from_func(model_building,base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0",packages_to_install=['boto3'])
comp_model_serving = components.create_component_from_func(model_serving,base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0",packages_to_install=['boto3','kserve==0.8.0.1'])


@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def output_test(no_epochs,optimizer):
    step1_1 = comp_get_data_batch()
    step1_2 = comp_get_latest_data()
    
    step2 = comp_reshape_data()
    step2.after(step1_1)
    step2.after(step1_2)
    
    step3 = comp_model_building(no_epochs,optimizer)
    step3.after(step2)
    
    step4 = comp_model_serving()
    step4.after(step3)


if __name__ == "__main__":
    client = kfp.Client()

    arguments = {
        "no_epochs" : 1,
        "optimizer": "adam"
    }

    run_directly = 0
    
    if (run_directly == 1):
        client.create_run_from_pipeline_func(output_test,arguments=arguments,experiment_name="testfb")
    else:
        kfp.compiler.Compiler().compile(pipeline_func=output_test,package_path='fb_output_demo.yaml')
        client.upload_pipeline_version(pipeline_package_path='fb_output_demo.yaml',pipeline_version_name="0.4",pipeline_name="pipeline test")

ApiException: (400)
Reason: Bad Request
HTTP response headers: HTTPHeaderDict({'date': 'Fri, 03 Nov 2023 17:47:03 GMT', 'content-length': '976', 'content-type': 'text/plain; charset=utf-8', 'x-envoy-upstream-service-time': '24', 'server': 'envoy'})
HTTP response body: {"error_message":"Failed to create a pipeline version due to error reading pipeline id","error_details":"Failed to create a pipeline version due to error reading pipeline id\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*PipelineUploadServer).uploadPipelineVersion\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/pipeline_upload_server.go:195\ngithub.com/kubeflow/pipelines/backend/src/apiserver/server.(*PipelineUploadServer).UploadPipelineVersionV1\n\t/go/src/github.com/kubeflow/pipelines/backend/src/apiserver/server/pipeline_upload_server.go:161\nnet/http.HandlerFunc.ServeHTTP\n\t/usr/local/go/src/net/http/server.go:2122\ngithub.com/gorilla/mux.(*Router).ServeHTTP\n\t/go/pkg/mod/github.com/gorilla/mux@v1.8.0/mux.go:210\nnet/http.serverHandler.ServeHTTP\n\t/usr/local/go/src/net/http/server.go:2936\nnet/http.(*conn).serve\n\t/usr/local/go/src/net/http/server.go:1995\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1598"}
