In [None]:
# Attempt to port these components to the KFP v2 SDK

from tensorflow import keras
from minio import Minio
import numpy as np
import json
import kfp
from kfp import dsl
import os
from kfp.dsl import Output, Dataset, Model, Metrics, ClassificationMetrics


@dsl.component( base_image="tensorflow/tensorflow",packages_to_install=['minio'])
def load_dataset(minio_url : str = "10.152.183.148:9000", minio_access_key : str ="minio", minio_secret : str ="minio123" , minio_bucket : str = "mlpipeline"):
    '''
    Load MNIST via ``keras.datasets.mnist.load_data()`` and upload its splits to MinIO.

    The four arrays are stored unprocessed, each as a ``.npy`` file, under the
    object names ``x_train``, ``y_train``, ``x_test`` and ``y_test`` in
    ``minio_bucket``. The later pipeline steps read them back from there.

    Args:
        minio_url: ``host:port`` of the MinIO server (plain HTTP, ``secure=False``).
        minio_access_key: MinIO access key.
        minio_secret: MinIO secret key.
        minio_bucket: bucket the arrays are written to.

    Returns:
        Nothing. The step's only effect is the objects written to MinIO.
    '''
    import os
    from minio import Minio
    import numpy as np
    from tensorflow import keras

    minio_client = Minio(
        minio_url,
        access_key=minio_access_key,
        secret_key=minio_secret,
        secure=False
    )

    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Local scratch directory for the .npy files. Create it so np.save does not
    # depend on the container's working directory already containing "tmp/".
    os.makedirs("tmp", exist_ok=True)

    # Save each split to a numpy file and upload it under its own object name.
    splits = {
        "x_train": x_train,
        "y_train": y_train,
        "x_test": x_test,
        "y_test": y_test,
    }
    for object_name, array in splits.items():
        local_path = f"tmp/{object_name}.npy"
        np.save(local_path, array)
        minio_client.fput_object(minio_bucket, object_name, local_path)

    print(f"x_train shape: {x_train.shape}")
    print(f"y_train shape: {y_train.shape}")

    print(f"x_test shape: {x_test.shape}")
    print(f"y_test shape: {y_test.shape}")

    print("Success!")

@dsl.component(base_image="tensorflow/tensorflow", packages_to_install=['minio'])
def preprocessing(
        metrics : Output[Metrics],
        minio_url : str = "10.152.183.148:9000", minio_access_key : str ="minio", minio_secret : str ="minio123" , minio_bucket : str = "mlpipeline"
        
):
    '''
    Reshape and normalize the MNIST images stored in MinIO, in place.

    Downloads ``x_train`` and ``x_test`` from ``minio_bucket`` and reshapes them
    from ``(len, 28, 28)`` to ``(len, 28, 28, 1)``. The extra axis is the single
    grey-scale channel the Keras Conv2D model expects. Raw integer pixel values
    are then scaled from 0-255 to 0-1, and both arrays are uploaded again under
    the same object names. The labels (``y_*``) are not touched.

    Args:
        metrics: KFP metrics artifact. Receives the number of train and test images.
        minio_url: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret: MinIO secret key.
        minio_bucket: bucket holding the dataset objects.
    '''
    from minio import Minio
    import numpy as np

    minio_client = Minio(
        minio_url,
        access_key=minio_access_key,
        secret_key=minio_secret,
        secure=False
    )

    def _prepare(images):
        """Return images shaped (len, 28, 28, 1) with pixel values in [0, 1]."""
        images = images.reshape(-1, 28, 28, 1)
        # The result overwrites the original objects in MinIO. If this step
        # runs again on its own output (e.g. the upstream load_dataset step was
        # cached), the pixels are already floats in [0, 1]. Only scale raw
        # integer pixels so they are never divided by 255 twice.
        if np.issubdtype(images.dtype, np.integer):
            images = images / 255
        return images

    print("getting data from minio")

    # Load data from MinIO.
    minio_client.fget_object(minio_bucket,"x_train","tmp/x_train.npy")
    x_train = np.load("tmp/x_train.npy")
    minio_client.fget_object(minio_bucket,"x_test","tmp/x_test.npy")
    x_test = np.load("tmp/x_test.npy")

    x_train = _prepare(x_train)
    x_test = _prepare(x_test)

    print(f"x_train shape: {x_train.shape}")
    print(f"x_test shape: {x_test.shape}")

    metrics.log_metric("Len x_train", x_train.shape[0])
    # This is the size of the test images. It was previously mislabelled "Len y_train".
    metrics.log_metric("Len x_test", x_test.shape[0])

    # Save the preprocessed arrays back to MinIO under the same names.
    np.save("tmp/x_train.npy",x_train)
    minio_client.fput_object(minio_bucket,"x_train","tmp/x_train.npy")
    np.save("tmp/x_test.npy",x_test)
    minio_client.fput_object(minio_bucket,"x_test","tmp/x_test.npy")

    print("Success")

@dsl.component(base_image="tensorflow/tensorflow", packages_to_install=['minio'])
def model_building(minio_url : str = "10.152.183.148:9000", minio_access_key : str ="minio", minio_secret : str ="minio123" , minio_bucket : str = "mlpipeline"):
    '''
    Define the CNN architecture and upload it, not yet compiled, to MinIO.

    Keeping the architecture in its own step makes it easy to change the model
    while the other steps stay independent. The model is stored as
    ``models/detect-digits-notcompiled/model.keras`` in ``minio_bucket``.
    hyperparameter_search and model_training download it from there and
    compile it themselves.

    Args:
        minio_url: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret: MinIO secret key.
        minio_bucket: bucket the model file is uploaded to.
    '''
    import os
    from tensorflow import keras
    from minio import Minio

    minio_client = Minio(
        minio_url,
        access_key=minio_access_key,
        secret_key=minio_secret,
        secure=False
    )

    # Model definition: one conv + max-pool stage on 28x28x1 images, then a
    # small dense head ending in a 10-way softmax (one output per digit).
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dense(32, activation='relu'))

    model.add(keras.layers.Dense(10, activation='softmax'))

    # Print the architecture to the step logs.
    model.summary()

    # Save locally (creating the scratch directory if needed), then upload.
    os.makedirs("tmp", exist_ok=True)
    model.save('./tmp/model.keras')
    minio_client.fput_object(minio_bucket, "models/detect-digits-notcompiled/model.keras", "./tmp/model.keras")

    print("Success")
    
@dsl.component(packages_to_install=['minio','kubeflow-katib'])
def hyperparameter_search(minio_url : str = "10.152.183.148:9000", minio_access_key : str ="minio", minio_secret : str ="minio123" , minio_bucket : str = "mlpipeline") -> dict:
    '''
    Tune the learning rate and the number of epochs with a Katib experiment.

    Starts a Katib experiment whose trials run ``katib_search_supportfunc``.
    It then polls until the experiment succeeds (or fails) and returns the
    hyperparameters of the optimal trial.

    Args:
        minio_url: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret: MinIO secret key.
        minio_bucket: bucket holding the uncompiled model and the dataset.
        All four MinIO settings are forwarded to every trial.

    Returns:
        dict mapping each tuned hyperparameter name ("lr", "num_epoch") to the
        value Katib reports for the optimal trial. model_training converts the
        values with float()/int().

    Raises:
        RuntimeError: if the experiment fails, or Katib reports no optimal trial.
    '''
    import kubeflow.katib as katib
    import time
    from datetime import datetime

    def katib_search_supportfunc(parameters : dict):
        """
        Katib objective for one trial: load, compile, train and evaluate the model.

        Katib runs this function in its own trial container. It must therefore
        be self-contained: it has its own imports and cannot read the enclosing
        component's variables. Besides "lr" and "num_epoch", ``parameters``
        carries the MinIO settings. The .get() fallbacks are the pipeline's
        default values.

        The results are printed to stdout as ``loss=<v>`` and ``accuracy=<v>``.
        These match the metric names the experiment below is configured to collect.
        """
        from tensorflow import keras
        import tensorflow as tf
        from minio import Minio
        import numpy as np

        # Read the hyperparameters and convert them explicitly: Katib may pass
        # them as strings.
        lr = float(parameters["lr"])
        num_epoch = int(parameters["num_epoch"])

        print("lr:", lr)
        print("num_epoch:",num_epoch)

        minio_client = Minio(
            parameters.get("minio_url", "10.152.183.148:9000"),
            access_key=parameters.get("minio_access_key", "minio"),
            secret_key=parameters.get("minio_secret", "minio123"),
            secure=False
        )
        minio_bucket = parameters.get("minio_bucket", "mlpipeline")
        print("getting data from minio")

        # Load the uncompiled architecture uploaded by model_building.
        minio_client.fget_object(minio_bucket, "models/detect-digits-notcompiled/model.keras", "./tmp/model_from_minio.keras")
        model = keras.models.load_model("./tmp/model_from_minio.keras")

        # 10-class digit classification with integer labels, hence sparse
        # categorical cross-entropy.
        print("compiling model")
        model.compile(tf.keras.optimizers.SGD(learning_rate=lr),
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])

        model.summary()

        # Load the dataset from MinIO.
        minio_client.fget_object(minio_bucket,"x_train","tmp/x_train.npy")
        x_train = np.load("tmp/x_train.npy")
        minio_client.fget_object(minio_bucket,"y_train","tmp/y_train.npy")
        y_train = np.load("tmp/y_train.npy")
        minio_client.fget_object(minio_bucket,"x_test","tmp/x_test.npy")
        x_test = np.load("tmp/x_test.npy")
        minio_client.fget_object(minio_bucket,"y_test","tmp/y_test.npy")
        y_test = np.load("tmp/y_test.npy")

        model.fit(
          x=x_train,
          y=y_train,
          epochs=num_epoch,
          batch_size=20,
        )

        # Evaluate on the test split: returns the loss and then the accuracy metric.
        model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)

        # Report the metrics on stdout as name=value lines.
        print("loss="+str(model_loss))
        print("accuracy="+str(model_accuracy))

    # Search space for the tuned hyperparameters.
    parameters = {
        "lr": katib.search.double(min=0.1, max=0.2),
        "num_epoch": katib.search.int(min=1, max=2),
        # Non-search entries are forwarded to the objective unchanged by
        # KatibClient.tune(). This is how the trials get the MinIO settings.
        # NOTE(review): the secret therefore appears in clear text in the
        # Katib Experiment spec.
        "minio_url": minio_url,
        "minio_access_key": minio_access_key,
        "minio_secret": minio_secret,
        "minio_bucket": minio_bucket,
    }

    # Start the Katib experiment under a timestamped, unique name.
    now = datetime.now()
    date_time = now.strftime("-%m-%d-%Y--%H-%M-%S")
    exp_name = "tune-mnist-example" + date_time
    katib_client = katib.KatibClient()

    katib_client.tune(
        name=exp_name,
        packages_to_install = ["minio"],
        base_image = "tensorflow/tensorflow",
        objective=katib_search_supportfunc, # Objective function.
        parameters=parameters, # HyperParameters to tune (plus pass-through settings).
        algorithm_name="cmaes", # Algorithm to use.
        objective_metric_name="accuracy", # Katib is going to optimize "accuracy".
        additional_metric_names=["loss"], # Katib also collects these metrics.
        max_trial_count=2, # Trial threshold (max number of trainings).
        parallel_trial_count=2)

    status = katib_client.is_experiment_succeeded(exp_name)
    print(f"Katib Experiment is Succeeded: {status}\n")

    # Poll until the experiment succeeds. Stop if it fails; otherwise
    # "succeeded" never becomes true and this loop would never end.
    while not status:
        if katib_client.is_experiment_failed(exp_name):
            raise RuntimeError(f"Katib experiment {exp_name} failed")
        print("Waiting for experiment end...")
        time.sleep(10)
        status = katib_client.is_experiment_succeeded(exp_name)
    print("Exeperiment ended")

    best_hps = katib_client.get_optimal_hyperparameters(exp_name)

    if best_hps is None:
        print("Can't get best hyperparameter error - Exit")
        # Raise rather than sys.exit(). A bare sys.exit() exits with status 0,
        # so the step would not be marked as failed.
        raise RuntimeError(f"Katib experiment {exp_name} reported no optimal trial")

    print("Current Optimal Trial\n")
    print(best_hps)

    # Read the optimal trial's parameter assignments from its plain-dict form
    # instead of parsing the printed representation.
    result = {
        assignment["name"]: assignment["value"]
        for assignment in best_hps.to_dict()["parameter_assignments"]
    }

    print("Best HPs extracted:",result)

    return result

@dsl.component(base_image="tensorflow/tensorflow", packages_to_install=['minio','scikit-learn'])
def model_training(
    metrics: Output[Metrics], classification_metrics: Output[ClassificationMetrics],
    hyperparameters : dict, minio_url : str = "10.152.183.148:9000", minio_access_key : str ="minio", minio_secret : str ="minio123" , minio_bucket : str = "mlpipeline"
    ):
    """
    Train the model with the tuned hyperparameters, log its metrics and publish it.

    Steps:
    - Download the uncompiled model and the dataset from MinIO.
    - Compile with SGD(lr) and fit for ``num_epoch`` epochs.
    - Evaluate on the test split and log the loss, the accuracy and a 10x10
      confusion matrix to KFP.
    - Upload the trained model as a TensorFlow SavedModel under
      ``models/detect-digits/1`` in ``minio_bucket`` (version directory "1").

    Args:
        metrics: KFP metrics artifact. Receives the test loss and accuracy.
        classification_metrics: KFP artifact. Receives the confusion matrix.
        hyperparameters: dict with keys "lr" and "num_epoch" (the output of
            hyperparameter_search). Values are converted with float()/int().
        minio_url: ``host:port`` of the MinIO server (plain HTTP).
        minio_access_key: MinIO access key.
        minio_secret: MinIO secret key.
        minio_bucket: bucket holding the model and the dataset.
    """
    from tensorflow import keras
    import tensorflow as tf
    from minio import Minio
    import numpy as np
    import os
    from sklearn.metrics import confusion_matrix

    num_classes = 10  # digits 0-9, matching the model's 10-unit softmax

    # Best hyperparameters found by Katib.
    lr = float(hyperparameters["lr"])
    no_epochs = int(hyperparameters["num_epoch"])

    minio_client = Minio(
        minio_url,
        access_key=minio_access_key,
        secret_key=minio_secret,
        secure=False
    )
    print("getting data from minio")

    # Load the uncompiled architecture from MinIO.
    minio_client.fget_object(minio_bucket, "models/detect-digits-notcompiled/model.keras", "./tmp/model_from_minio.keras")
    model = keras.models.load_model("./tmp/model_from_minio.keras")
    model.summary()

    # 10-class digit classification with integer labels, hence sparse
    # categorical cross-entropy.
    model.compile(tf.keras.optimizers.SGD(learning_rate=lr),
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

    def _fetch_array(object_name):
        """Download one .npy object from MinIO into tmp/ and load it."""
        local_path = f"tmp/{object_name}.npy"
        minio_client.fget_object(minio_bucket, object_name, local_path)
        return np.load(local_path)

    x_train = _fetch_array("x_train")
    y_train = _fetch_array("y_train")
    x_test = _fetch_array("x_test")
    y_test = _fetch_array("y_test")

    model.fit(
      x=x_train,
      y=y_train,
      epochs=no_epochs,
      batch_size=20,
    )

    # The model outputs one score per class. The argmax index is the predicted digit.
    y_predict = np.argmax(model.predict(x=x_test), axis=1)

    # Evaluate on the test split: returns the loss and then the accuracy metric.
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)

    # Pin the labels so the matrix is always num_classes x num_classes and lines
    # up with the category names below, even if a digit never occurs.
    class_labels = list(range(num_classes))
    cmatrix = confusion_matrix(y_test, y_predict, labels=class_labels).tolist()
    print(len(cmatrix))
    print(len(cmatrix[0]))

    # Kubeflow metrics export.
    metrics.log_metric("Test loss", model_loss)
    metrics.log_metric("Test accuracy", model_accuracy)
    classification_metrics.log_confusion_matrix([str(label) for label in class_labels], cmatrix)

    # Save the trained model as a SavedModel directory. Keras 2 writes it with
    # save_model(). Keras 3 rejects paths without a .keras/.h5 extension
    # (ValueError) and offers model.export() for SavedModel output instead.
    export_dir = "tmp/detect-digits"
    try:
        keras.models.save_model(model, export_dir)
    except ValueError:
        model.export(export_dir)

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """Upload every file under local_path, mirroring its tree below minio_path."""
        if not os.path.isdir(local_path):
            raise NotADirectoryError(local_path)
        for root, _dirs, files in os.walk(local_path):
            for file_name in files:
                local_file = os.path.join(root, file_name)
                relative = os.path.relpath(local_file, local_path).replace(os.sep, "/")
                minio_client.fput_object(bucket_name, f"{minio_path}/{relative}", local_file)

    upload_local_directory_to_minio(export_dir, minio_bucket, "models/detect-digits/1") # 1 for version 1

    print("Saved trained model to minIO")
    print("Success")
    
@dsl.component(packages_to_install=['kserve','kubernetes'])
def model_serving(
    model_name: str = "digits-recognizer",
    storage_uri: str = "s3://mlpipeline/models/detect-digits/",
    service_account_name: str = "sa-minio-kserve",
):
    """
    (Re)deploy the trained model as a KServe InferenceService with a TF Serving predictor.

    Any existing InferenceService with the same name in KServe's default target
    namespace is deleted first. A new one is then created that reads the model
    from ``storage_uri``. The Istio sidecar is disabled through an annotation.

    Args:
        model_name: name of the InferenceService.
        storage_uri: S3 URI of the directory containing the numbered SavedModel
            version folders. model_training uploads version 1 below
            models/detect-digits/ in the mlpipeline bucket by default.
        service_account_name: service account set on the predictor.
            Presumably it carries the MinIO/S3 credentials (TODO confirm).
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1TFServingSpec
    import time

    namespace = utils.get_default_target_namespace()

    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=model_name,
            namespace=namespace,
            annotations={'sidecar.istio.io/inject': 'false'},
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name=service_account_name,
                tensorflow=V1beta1TFServingSpec(storage_uri=storage_uri),
            )
        ),
    )

    kserve_client = KServeClient()
    # Try to delete the previous deployment so create() below does not clash
    # with it. Failure is expected on the first run, when none exists yet.
    try:
        kserve_client.delete(name=model_name, namespace=namespace)
    except Exception as err:
        print("Non posso eliminare")  # "Cannot delete"
        print(f"Delete failed: {err}")
    else:
        print("Modello precedente eliminato")  # "Previous model deleted"
        # Give the old resource time to go away before re-creating it.
        time.sleep(10)

    kserve_client.create(isvc)


@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def mnist_pipeline(minio_url : str = "10.152.183.148:9000", minio_access_key : str ="minio", minio_secret : str ="minio123" , minio_bucket : str = "mlpipeline"):
    """
    Wire the six MNIST steps into a linear pipeline.

    The steps exchange data through MinIO objects, not KFP artifacts, so KFP
    cannot see those dependencies:
    - The explicit ``.after()`` calls enforce the execution order.
    - Caching is disabled on every task. A cache hit would skip the MinIO
      side effect a later step relies on. For example, the input-less
      model_serving step would otherwise never redeploy a newly trained model.
    """
    minio_args = dict(
        minio_url=minio_url,
        minio_access_key=minio_access_key,
        minio_secret=minio_secret,
        minio_bucket=minio_bucket,
    )
    step1 = load_dataset(**minio_args)
    step2 = preprocessing(**minio_args)
    step3 = model_building(**minio_args)
    step4 = hyperparameter_search(**minio_args)
    step5 = model_training(hyperparameters=step4.output, **minio_args)
    step6 = model_serving()

    step2.after(step1)
    step3.after(step2)
    step4.after(step3)
    step5.after(step4)
    step6.after(step5)

    for task in (step1, step2, step3, step4, step5, step6):
        task.set_caching_options(False)
 
# main
# MinIO configuration: the single source of truth for the pipeline arguments below.
minio_url="10.152.183.148:9000"
minio_access_key="minio"
minio_secret="minio123"
minio_bucket="mlpipeline"

print("start")

# Run-time arguments for mnist_pipeline. Only the commented-out
# create_run_from_pipeline_func call below uses them.
arguments = {
    "minio_url": minio_url,
    "minio_access_key": minio_access_key,
    "minio_secret": minio_secret,
    "minio_bucket": minio_bucket,
}

# Token used to authenticate to the KFP API server. Strip surrounding
# whitespace so a trailing newline cannot end up in the auth header.
with open(os.environ['KF_PIPELINES_SA_TOKEN_PATH'], "r") as f:
    TOKEN = f.read().strip()

client = kfp.Client(
    existing_token=TOKEN,
    host='http://ml-pipeline.kubeflow.svc.cluster.local:8888',
)
namespace="kubeflow-user-example-com"
kfp.compiler.Compiler().compile(pipeline_func=mnist_pipeline,package_path='./mnist-pipeline.yaml')

# Alternative actions, kept for manual use from the notebook:
#client.create_run_from_pipeline_func(mnist_pipeline,arguments=arguments,experiment_name="test",namespace="kubeflow-user-example-com",enable_caching=False)
#client.upload_pipeline(pipeline_package_path='output_test.yaml',pipeline_version_name="0.4",pipeline_name="mnist video tutorial")
#client.upload_pipeline(pipeline_package_path='./mnist-pipeline.yaml',pipeline_name="mnist video tutorial",namespace = namespace)
client.upload_pipeline_version(pipeline_package_path='./mnist-pipeline.yaml',pipeline_name="mnist video tutorial",pipeline_version_name= "2 uso artifact store")
