In [1]:
import kfp
from kfp import dsl
from kfp.dsl import Input, Output, Dataset, Model, Metrics, ClassificationMetrics, Artifact
import kfp.kubernetes

In [2]:
@dsl.component(base_image="matteobrina99999/bonfiglioli_iris:latest")
def lettura(dataset_artifact: Output[Dataset]):
    """Download the raw Iris CSV from S3 and publish it as a Dataset artifact.

    The S3 endpoint and credentials come from the S3_ENDPOINT,
    S3_ACCESS_KEY_ID and S3_SECRET_ACCESS_KEY environment variables.

    Args:
        dataset_artifact: Output artifact that receives the downloaded CSV file.
    """
    # Imports stay inside the function body: a KFP lightweight component only
    # ships this function's source code to the container.
    import os
    import shutil

    import boto3
    import pandas as pd

    local_csv = 'IRIS.csv'
    bucket, key = 'iris-bucket', 'iris_example/IRIS.csv'

    client_kwargs = {
        'endpoint_url': os.getenv('S3_ENDPOINT'),
        'aws_access_key_id': os.getenv('S3_ACCESS_KEY_ID'),
        'aws_secret_access_key': os.getenv('S3_SECRET_ACCESS_KEY'),
    }
    storage = boto3.client('s3', **client_kwargs)
    storage.download_file(bucket, key, local_csv)

    # Log a short preview of the data in the step output.
    preview = pd.read_csv(local_csv).head()
    print(preview)

    shutil.move(local_csv, dataset_artifact.path)
    

In [3]:
@dsl.component(base_image="matteobrina99999/bonfiglioli_iris:latest")
def preprocessing(dataset_artifact: Input[Dataset], X_train_artifact: Output[Dataset], X_test_artifact: Output[Dataset], y_train_artifact: Output[Dataset], y_test_artifact: Output[Dataset], test_size: float = 0.3, random_state: int = 42):
    """Split the Iris CSV into train/test feature arrays and one-hot label arrays.

    Every column except ``species`` is used as a feature. ``species`` is
    label-encoded to integers and then one-hot encoded. The four resulting
    arrays are written in NumPy ``.npy`` format to the output artifacts.

    Args:
        dataset_artifact: CSV with a header row that includes a ``species`` column.
        X_train_artifact: Output for the training features.
        X_test_artifact: Output for the test features.
        y_train_artifact: Output for the one-hot training labels.
        y_test_artifact: Output for the one-hot test labels.
        test_size: Fraction of rows held out for the test split.
        random_state: Seed for the split, so reruns produce identical data.
    """
    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder

    def save_npy(array, artifact):
        # Write through a file handle: np.save() given a *path* appends ".npy"
        # when it is missing, which would not match artifact.path.
        with open(artifact.path, 'wb') as fh:
            np.save(fh, array)

    iris_data = pd.read_csv(dataset_artifact.path, header=0)
    X = iris_data.drop(columns=['species']).to_numpy()

    # LabelEncoder expects a 1-D target. Passing a one-column DataFrame
    # triggers a DataConversionWarning, so pass the Series instead.
    y_enc = LabelEncoder().fit_transform(iris_data['species'])  # text labels -> integer codes
    y_label = tf.keras.utils.to_categorical(y_enc)

    # A fixed seed makes the split reproducible. Stratifying keeps the class
    # proportions of the full dataset in both splits.
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_label,
        test_size=test_size,
        random_state=random_state,
        stratify=y_enc,
    )

    save_npy(X_train, X_train_artifact)
    save_npy(X_test, X_test_artifact)
    save_npy(y_train, y_train_artifact)
    save_npy(y_test, y_test_artifact)

In [4]:
@dsl.component(base_image="matteobrina99999/bonfiglioli_iris:latest")
def model_building(X_train_artifact: Input[Dataset], model_artifact: Output[Model]):
    """Build the (uncompiled) dense classifier and save it in HDF5 format.

    The input layer is sized from the training features. The output layer has
    3 softmax units, one per class. Compilation is left to ``model_training``.

    Args:
        X_train_artifact: Training features in ``.npy`` format; only the shape is used.
        model_artifact: Output artifact; the model is written as ``model.h5``
            in the artifact's directory.
    """
    import os

    import numpy as np
    from tensorflow import keras
    from tensorflow.keras.models import Sequential

    X_train = np.load(X_train_artifact.path)

    model = Sequential([
        keras.layers.Input(shape=X_train.shape[1:]),
        keras.layers.Dense(1000, activation='relu'),
        keras.layers.Dense(500, activation='relu'),
        keras.layers.Dense(300, activation='relu'),
        keras.layers.Dropout(0.2),
        keras.layers.Dense(3, activation='softmax'),
    ])

    # Keras chooses the save format from the file extension, so point the
    # artifact at "model.h5" in the same directory.
    # NOTE(review): relies on KFP's Artifact.path being settable so the new
    # location is what downstream steps receive - confirm with the KFP version in use.
    model_artifact.path = os.path.join(os.path.dirname(model_artifact.path), "model.h5")
    # Model.save's second positional parameter is `overwrite`; the original
    # passed the model object there by mistake.
    model.save(model_artifact.path)
    

In [5]:
@dsl.component(base_image="matteobrina99999/bonfiglioli_iris:latest")
def model_training(X_train_artifact: Input[Dataset], X_test_artifact: Input[Dataset], y_train_artifact: Input[Dataset], y_test_artifact: Input[Dataset],  model_artifact: Input[Model], trained_model_artifact: Output[Model], hyperparameters : dict):
    """Compile and train the model produced by ``model_building``.

    Args:
        X_train_artifact: Training features in ``.npy`` format.
        X_test_artifact: Test features in ``.npy`` format.
        y_train_artifact: One-hot training labels in ``.npy`` format.
        y_test_artifact: One-hot test labels in ``.npy`` format.
        model_artifact: Saved, uncompiled Keras model.
        trained_model_artifact: Output; the trained model is written as
            ``trained_model.h5`` in the artifact's directory.
        hyperparameters: Must contain ``epochs``. May contain ``batch_size``;
            when absent, Keras' default batch size is used.
    """
    import os

    import numpy as np
    from tensorflow import keras

    epochs = int(hyperparameters['epochs'])
    batch_size = hyperparameters.get('batch_size')
    batch_size = int(batch_size) if batch_size is not None else None

    X_train = np.load(X_train_artifact.path)
    X_test = np.load(X_test_artifact.path)
    y_train = np.load(y_train_artifact.path)
    y_test = np.load(y_test_artifact.path)

    model = keras.models.load_model(model_artifact.path)
    model.compile(optimizer='adam',
                  loss=keras.losses.CategoricalCrossentropy(),
                  metrics=['accuracy'])
    # Pass epochs/batch_size by keyword. fit()'s third positional parameter is
    # batch_size, so fit(X, y, epochs) trained a single epoch with
    # batch_size=epochs.
    # NOTE(review): the test split doubles as validation data, so the reported
    # val_* metrics are not an independent hold-out estimate.
    model.fit(X_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              validation_data=(X_test, y_test))

    # Keras picks the save format from the extension, so use "trained_model.h5"
    # in the same directory as the artifact.
    trained_model_artifact.path = os.path.join(
        os.path.dirname(trained_model_artifact.path), "trained_model.h5")
    # Model.save's second positional parameter is `overwrite`, not the model.
    model.save(trained_model_artifact.path)
    

In [6]:
@dsl.component(base_image="matteobrina99999/bonfiglioli_iris:latest")
def model_conversion(trained_model_artifact: Input[Model], onnx_model_artifact: Output[Model]):
    """Convert the trained Keras model to ONNX and publish it for serving.

    The ONNX model is written to the output artifact as ``model.onnx``. It is
    also uploaded to ``iris-bucket`` under ``iris_example/iris/1/model.onnx``,
    together with a model configuration at ``iris_example/iris/config.pbtxt``.

    Args:
        trained_model_artifact: Trained Keras model saved by ``model_training``.
        onnx_model_artifact: Output artifact receiving the ONNX model.
    """
    import os

    import boto3
    import onnx
    import tensorflow as tf
    import tf2onnx
    from tensorflow import keras

    model = keras.models.load_model(trained_model_artifact.path)
    # NOTE(review): presumably a workaround so tf2onnx gets a named output
    # with newer Keras versions - confirm it is still needed.
    model.output_names = ['output']
    # Assumes 4 input features (the Iris measurement columns). This must match
    # the width of the model's input layer.
    input_signature = [tf.TensorSpec([None, 4], tf.float32, name='x')]
    onnx_model, _ = tf2onnx.convert.from_keras(model, input_signature)

    # Give the artifact file an explicit ".onnx" name in the same directory.
    onnx_model_artifact.path = os.path.join(
        os.path.dirname(onnx_model_artifact.path), "model.onnx")
    onnx.save(onnx_model, onnx_model_artifact.path)

    config_text = '''name: "iris"
platform: "onnxruntime_onnx"
max_batch_size: 10000'''
    config_path = "/tmp/config.pbtxt"
    with open(config_path, "wt") as file:
        file.write(config_text)

    s3 = boto3.client('s3',
                      endpoint_url=os.getenv('S3_ENDPOINT'),
                      aws_access_key_id=os.getenv('S3_ACCESS_KEY_ID'),
                      aws_secret_access_key=os.getenv('S3_SECRET_ACCESS_KEY'))

    # S3 keys are literal strings. A leading "/" would name a different object
    # than the "iris_example/..." prefix used by the other steps.
    s3.upload_file(onnx_model_artifact.path, 'iris-bucket', 'iris_example/iris/1/model.onnx')
    s3.upload_file(config_path, 'iris-bucket', 'iris_example/iris/config.pbtxt')
    

In [7]:
@dsl.component(base_image="matteobrina99999/bonfiglioli_iris:kserve")
def model_serving(onnx_model_artifact : Input[Model]):
    """(Re)deploy the Iris ONNX model as a KServe InferenceService.

    The ONNX artifact itself is not read here; the input only orders this step
    after ``model_conversion``. The predictor loads the model from
    ``s3://iris-bucket/iris_example`` using the ``sa-minio-kserve`` service
    account. The kubeconfig used to reach the cluster is downloaded from
    ``iris-bucket/iris_example/config``. Any existing InferenceService named
    ``iris`` in namespace ``modelli`` is deleted (best effort) before a new one
    is created.

    Args:
        onnx_model_artifact: Converted model; used only as a dependency.
    """
    import os
    import time

    import boto3
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import V1beta1ONNXRuntimeSpec
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec

    # Model repository root (model_conversion uploads under iris_example/iris/).
    # NOTE(review): the kubeconfig ('iris_example/config') and IRIS.csv also live
    # under this prefix. If the storage initializer downloads the whole prefix,
    # the kubeconfig ends up inside the predictor pod; consider moving it.
    uri = 's3://iris-bucket/iris_example'

    namespace = "modelli"
    name = "iris"
    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            annotations={
                'sidecar.istio.io/inject': 'false',
                'serving.kserve.io/enable-prometheus-scraping': 'true',
            }),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name="sa-minio-kserve",
                onnx=V1beta1ONNXRuntimeSpec(storage_uri=uri))),
    )

    s3 = boto3.client('s3',
                      endpoint_url=os.getenv('S3_ENDPOINT'),
                      aws_access_key_id=os.getenv('S3_ACCESS_KEY_ID'),
                      aws_secret_access_key=os.getenv('S3_SECRET_ACCESS_KEY'))

    s3.download_file('iris-bucket', 'iris_example/config', '/tmp/config')

    kserve_client = KServeClient(config_file='/tmp/config')

    # Best-effort removal of the previous deployment: failing here (e.g. no
    # service exists yet) must not fail the step. Catch Exception instead of
    # using a bare except, so KeyboardInterrupt/SystemExit still propagate,
    # and log the reason instead of hiding it.
    try:
        kserve_client.delete(name=name, namespace=namespace)
        print("Modello precedente eliminato")
    except Exception as exc:
        print("Non posso eliminare")
        print(f"Reason: {exc}")
    # Presumably gives the deletion time to settle before re-creating a
    # service with the same name.
    time.sleep(10)

    kserve_client.create(isvc)

In [8]:
@dsl.pipeline(
    name='iris',
    description='iris-test')
def iris_pipeline(hyperparameters: dict):
    """Iris pipeline: read -> preprocess -> build -> train -> convert to ONNX -> serve.

    Args:
        hyperparameters: Forwarded unchanged to ``model_training`` (which reads ``epochs``).
    """
    lettura_task = lettura()
    preprocessing_task = preprocessing(
        dataset_artifact=lettura_task.outputs["dataset_artifact"])
    splits = preprocessing_task.outputs

    model_building_task = model_building(
        X_train_artifact=splits["X_train_artifact"])
    model_training_task = model_training(
        X_train_artifact=splits["X_train_artifact"],
        X_test_artifact=splits["X_test_artifact"],
        y_train_artifact=splits["y_train_artifact"],
        y_test_artifact=splits["y_test_artifact"],
        model_artifact=model_building_task.outputs["model_artifact"],
        hyperparameters=hyperparameters,
    )
    model_conversion_task = model_conversion(
        trained_model_artifact=model_training_task.outputs["trained_model_artifact"])
    model_serving_task = model_serving(
        onnx_model_artifact=model_conversion_task.outputs["onnx_model_artifact"])

    # Disable caching on every step so each run re-executes end to end.
    all_tasks = (lettura_task, preprocessing_task, model_building_task,
                 model_training_task, model_conversion_task, model_serving_task)
    for task in all_tasks:
        task.set_caching_options(False)

    # Steps that talk to S3 read endpoint and credentials from the
    # 'iris-secret' secret, exposed under env vars of the same names.
    s3_env = {key: key for key in ('S3_ACCESS_KEY_ID', 'S3_SECRET_ACCESS_KEY', 'S3_ENDPOINT')}
    for task in (lettura_task, model_conversion_task, model_serving_task):
        kfp.kubernetes.use_secret_as_env(task, 'iris-secret', s3_env)

In [9]:
from kfp import compiler

# Compile the pipeline into a YAML package that can be uploaded to Kubeflow Pipelines.
compiler.Compiler().compile(pipeline_func=iris_pipeline, package_path='pipeline_iris.yaml')