In [1]:
import kfp.compiler
from kfp import dsl
from kfp.dsl import Input, Output, Artifact

# Step 1: Download the image, preprocess it, predict, and save the raw predictions
@dsl.component(base_image="quay.io/nageshrathod/pipeline:pipeline1")
def download_and_predict(output_predictions: Output[Artifact]):
    """Classify a sample cat image with MobileNetV2 and save the raw scores.

    Downloads a fixed JPEG, converts it to RGB, resizes it to 224x224, runs
    MobileNetV2 (ImageNet weights) on it and writes the array returned by
    ``model.predict`` to the output artifact in ``.npy`` format.

    Args:
        output_predictions: KFP output artifact; the prediction array is
            written to exactly ``output_predictions.path``.

    Raises:
        requests.HTTPError: if the image download returns an error status.
        ValueError: if the downloaded bytes cannot be decoded as an image.
    """
    # Imports stay inside the function: KFP ships only this function's source
    # to the component container.
    import numpy as np
    import cv2
    import requests
    from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

    model = MobileNetV2(weights='imagenet')

    # Download the sample cat image to classify.
    url = "https://raw.githubusercontent.com/redhat-developer-demos/openshift-ai/main/2_Cat-dog-prediction/cat.jpg"
    # The timeout keeps the step from hanging on a stalled connection, and
    # raise_for_status stops an HTTP error page from reaching the decoder.
    response = requests.get(url, timeout=60)
    response.raise_for_status()

    img_array = np.array(bytearray(response.content), dtype=np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR)
    if img is None:
        # cv2.imdecode signals undecodable data by returning None, not by raising.
        raise ValueError(f"Could not decode image downloaded from {url}")

    # OpenCV decodes to BGR channel order, but the Keras ImageNet weights were
    # trained on RGB input; without this swap red and blue are exchanged.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    img = cv2.resize(img, (224, 224))

    # Build a batch containing the single image, as a float array so that
    # preprocess_input can scale it.
    data = np.expand_dims(img.astype(np.float32), axis=0)
    data = preprocess_input(data)

    # Classify.
    predictions = model.predict(data)

    # np.save appends ".npy" when given a *filename* that lacks that suffix,
    # which would put the data next to the artifact path instead of at it.
    # Writing through a file handle stores it exactly at the artifact path.
    with open(output_predictions.path, "wb") as f:
        np.save(f, predictions)

# Step 2: Load Model and Save to .h5 Format
@dsl.component(base_image="tensorflow/tensorflow:2.9.1")
def load_and_save_model(model_output: Output[Artifact]):
    """Instantiate ImageNet MobileNetV2 and save it in Keras HDF5 format.

    The model is written to ``<model_output.path>/mobilenetv2_model.h5``.
    ``convert_model_to_onnx`` reads it back from that same file name inside
    its input artifact directory.

    Args:
        model_output: KFP output artifact whose ``path`` is used as a
            directory holding the saved model.
    """
    # Imports stay inside the function: KFP ships only this function's source
    # to the component container.
    import os
    from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2

    model_instance = MobileNetV2(weights='imagenet')

    # The artifact path is used as a directory; create it explicitly (as the
    # ONNX conversion step does for its output) instead of relying on the
    # saver to create missing parent directories.
    os.makedirs(model_output.path, exist_ok=True)
    model_output_path = os.path.join(model_output.path, "mobilenetv2_model.h5")
    # The ".h5" suffix selects the HDF5 save format.
    model_instance.save(model_output_path)

# Step 3: Convert MobileNetV2 Model to ONNX Format
@dsl.component(base_image="quay.io/nageshrathod/pipeline:pipeline1")
def convert_model_to_onnx(model_input: Input[Artifact], onnx_model_output: Output[Artifact]):
    """Convert the saved Keras MobileNetV2 model to ONNX using opset 13.

    Reads ``mobilenetv2_model.h5`` from the ``model_input`` artifact directory
    and writes the serialized ONNX model as ``mobilenetv2_model.onnx`` inside
    the ``onnx_model_output`` artifact directory.
    """
    # Imports stay inside the function: KFP ships only this function's source
    # to the component container.
    import os
    import tf2onnx
    import tensorflow as tf

    h5_file = os.path.join(model_input.path, "mobilenetv2_model.h5")
    keras_model = tf.keras.models.load_model(h5_file)

    # from_keras returns a pair; only the first element (the ONNX model
    # proto) is used here.
    conversion_result = tf2onnx.convert.from_keras(keras_model, opset=13)
    onnx_proto = conversion_result[0]

    onnx_file = os.path.join(onnx_model_output.path, "mobilenetv2_model.onnx")
    # The artifact path is used as a directory; create it in case it is missing.
    os.makedirs(os.path.dirname(onnx_file), exist_ok=True)

    with open(onnx_file, "wb") as out:
        out.write(onnx_proto.SerializeToString())

# Step 4: Save ONNX Model to MinIO
@dsl.component(base_image="quay.io/nageshrathod/pipeline:pipeline1")
def save_onnx_to_minio(onnx_model_input: Input[Artifact]):
    """Upload the converted ONNX model to a MinIO (S3-compatible) bucket.

    The file ``mobilenetv2_model.onnx`` from the ``onnx_model_input`` artifact
    directory is uploaded to bucket ``mobilenet-v2`` under the key
    ``models/mobilenetv2_model.onnx``.

    Connection settings are read from the environment when set
    (``AWS_S3_ENDPOINT``, ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``),
    e.g. injected from a Kubernetes secret, so that credentials do not have
    to live in the pipeline source. The previously hard-coded values remain
    as fallbacks so existing runs keep working.
    """
    # Imports stay inside the function: KFP ships only this function's source
    # to the component container.
    import os
    import boto3
    from botocore.client import Config

    # NOTE(security): these fallback credentials are committed to source
    # control. Provide the environment variables and remove the defaults.
    endpoint_url = os.environ.get(
        'AWS_S3_ENDPOINT',
        'https://minio-api-nageshrathod-dev.apps.sandbox-abc.openshiftapps.com',
    )
    access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', 'minio')
    secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY', 'minio123')

    s3 = boto3.client(
        's3',
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key_id,
        aws_secret_access_key=secret_access_key,
        config=Config(signature_version='s3v4'),
    )

    bucket_name = 'mobilenet-v2'
    object_name = 'models/mobilenetv2_model.onnx'

    # The ONNX file sits inside the input artifact directory.
    onnx_model_file_path = os.path.join(onnx_model_input.path, "mobilenetv2_model.onnx")

    # Upload the ONNX model file to MinIO.
    s3.upload_file(onnx_model_file_path, bucket_name, object_name)

    print(f"ONNX model saved to MinIO bucket '{bucket_name}' with object name '{object_name}'.")


# Pipeline Definition
@dsl.pipeline(name="image-prediction-pipeline")
def image_prediction_pipeline():
    """Wire the four components into a strictly sequential pipeline.

    Order: predict on a sample image -> save MobileNetV2 as .h5 -> convert
    it to ONNX -> upload the ONNX file to MinIO. Each step is pinned after
    the previous one with ``.after()``, in addition to any artifact
    dependency.
    """
    predict_task = download_and_predict()

    # No artifact flows from the prediction step into the model export, so
    # only the explicit .after() orders these two steps.
    export_task = load_and_save_model().after(predict_task)

    onnx_task = convert_model_to_onnx(
        model_input=export_task.outputs['model_output'],
    ).after(export_task)

    save_onnx_to_minio(
        onnx_model_input=onnx_task.outputs['onnx_model_output'],
    ).after(onnx_task)

if __name__ == "__main__":
    # Compile the pipeline definition into a YAML package file.
    pipeline_compiler = kfp.compiler.Compiler()
    pipeline_compiler.compile(
        image_prediction_pipeline,
        package_path="image_prediction_data_pipeline.yaml",
    )