In [22]:
import kfp
from kfp import dsl
import kfp.components as components
import seaborn as sns
import matplotlib.pyplot as plt
from typing import NamedTuple
def get_data_batch() -> NamedTuple('Outputs', [('datapoints_training', float),('datapoints_test', float),('dataset_version', str)]):
    """
    Download MNIST, split it and upload every split to the MinIO bucket.

    Splits produced:
      * ``x_predict``          - the first 15 test images (held out for the demo predictions)
      * ``x_test`` / ``y_test``   - the remaining test samples
      * ``x_train`` / ``y_train`` - the first 20000 training samples

    Each array is written to ``/tmp/<name>.npy`` and uploaded to the bucket
    under the object name ``<name>``.

    Returns:
        An ``Outputs`` namedtuple ``(datapoints_training, datapoints_test,
        dataset_version)``; the two counts are floats, the version is a string.
    """
    print("getting data")

    # Imports stay inside the function: it is turned into a pipeline component
    # with create_component_from_func, which packages the function on its own.
    from collections import namedtuple
    import os

    from minio import Minio
    import numpy as np
    from tensorflow.keras.datasets import mnist

    # NOTE(review): endpoint and credentials are hard-coded here and in the
    # other pipeline steps - consider injecting them (env vars / k8s secret).
    minio_client = Minio(
        "192.168.1.235:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )
    minio_bucket = "mlpipeline"

    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    x_predict = x_test[:15]
    x_test = x_test[15:]
    y_test = y_test[15:]

    x_train = x_train[:20000]
    y_train = y_train[:20000]

    # Use the absolute /tmp directory, like the downstream steps do. The
    # relative "tmp/" used before does not exist in the working directory
    # unless something else creates it, which makes np.save fail.
    tmp_dir = "/tmp"
    os.makedirs(tmp_dir, exist_ok=True)

    # save to numpy file, store in Minio (object name == array name)
    arrays = {
        "x_train": x_train,
        "y_train": y_train,
        "x_test": x_test,
        "y_test": y_test,
        "x_predict": x_predict,
    }
    for object_name, array in arrays.items():
        local_path = os.path.join(tmp_dir, f"{object_name}.npy")
        np.save(local_path, array)
        minio_client.fput_object(minio_bucket, object_name, local_path)

    dataset_version = "1.0"

    print(f"x_train shape: {x_train.shape}")
    print(f"y_train shape: {y_train.shape}")

    print(f"x_test shape: {x_test.shape}")
    print(f"y_test shape: {y_test.shape}")

    # Return the declared namedtuple (it was previously built but never used).
    outputs = namedtuple('Outputs', ['datapoints_training', 'datapoints_test', 'dataset_version'])
    return outputs(float(x_train.shape[0]), float(x_test.shape[0]), dataset_version)
    
    
def reshape_data():
    """
    Prepare the image arrays for the model.

    Downloads ``x_train``, ``x_test`` and ``x_predict`` from the MinIO bucket,
    reshapes each one to ``(-1, 28, 28, 1)``, divides the pixel values by 255
    and uploads the result back under the same object names.
    """
    print("reshaping data")

    from minio import Minio
    import numpy as np

    # NOTE(review): endpoint and credentials are hard-coded - consider injecting them.
    bucket = "mlpipeline"
    store = Minio(
        "192.168.1.235:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )

    image_sets = ("x_train", "x_test", "x_predict")

    # pull every image array from minio
    raw_arrays = {}
    for key in image_sets:
        npy_path = f"/tmp/{key}.npy"
        store.fget_object(bucket, key, npy_path)
        raw_arrays[key] = np.load(npy_path)

    # add the trailing channel axis, then scale:
    # each pixel has a value between 0-255, dividing by 255 gives values from 0-1
    prepared = {
        key: pixels.reshape(-1, 28, 28, 1) / 255
        for key, pixels in raw_arrays.items()
    }

    # write the prepared arrays back to minio, replacing the raw objects
    for key in image_sets:
        npy_path = f"/tmp/{key}.npy"
        np.save(npy_path, prepared[key])
        store.fput_object(bucket, key, npy_path)

def model_building(
    no_epochs:int = 5,
    optimizer: str = "adam"
) -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]):
    """
    Train and evaluate the digit classifier, then publish its artifacts.

    Steps:
      1. Build a small Keras CNN (Conv2D -> MaxPool -> Dense 512 -> Dense 310 -> softmax 10).
      2. Train it on ``x_train`` / ``y_train`` downloaded from MinIO.
      3. Evaluate on ``x_test`` / ``y_test`` and build a confusion matrix.
      4. Predict the held-out ``x_predict`` images, upload one PNG per image
         (``image_<i>.png``) and the predicted labels (``list_result``).
      5. Upload the trained model as ``model.h5``.

    Args:
        no_epochs: Number of training epochs.
        optimizer: Optimizer name handed to ``model.compile``.

    Returns:
        A namedtuple ``(mlpipeline_ui_metadata, mlpipeline_metrics)`` of JSON
        strings: the first carries the confusion matrix and a markdown model
        overview, the second the accuracy and loss metrics.
    """
    # Imports stay inside the function: it is turned into a pipeline component
    # with create_component_from_func, which packages the function on its own.
    from collections import namedtuple
    import json

    from tensorflow import keras
    import tensorflow as tf
    from minio import Minio
    import numpy as np
    import pandas as pd
    import matplotlib.pyplot as plt

    # NOTE(review): endpoint and credentials are hard-coded here and in the
    # other pipeline steps - consider injecting them (env vars / k8s secret).
    minio_client = Minio(
        "192.168.1.235:9000",
        access_key="minioadmin",
        secret_key="minioadmin",
        secure=False
    )
    minio_bucket = "mlpipeline"

    # Size of the softmax layer == number of digit classes.
    num_classes = 10

    model = keras.models.Sequential([
        keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        keras.layers.MaxPool2D(2, 2),
        keras.layers.Flatten(),
        keras.layers.Dense(512, activation='relu'),
        keras.layers.Dense(310, activation='relu'),
        keras.layers.Dense(num_classes, activation='softmax'),
    ])

    # capture the model summary as text so it can be shown in the pipeline UI
    stringlist = []
    model.summary(print_fn=lambda x: stringlist.append(x))
    metric_model_summary = "\n".join(stringlist)

    # compile the model - multi-class output with integer labels
    model.compile(optimizer=optimizer,
            loss="sparse_categorical_crossentropy",
            metrics=['accuracy'])

    minio_client.fget_object(minio_bucket,"x_train","/tmp/x_train.npy")
    x_train = np.load("/tmp/x_train.npy")

    minio_client.fget_object(minio_bucket,"y_train","/tmp/y_train.npy")
    y_train = np.load("/tmp/y_train.npy")

    # fit the model; honour the no_epochs pipeline parameter
    # (the epoch count used to be hard-coded to 10, ignoring the argument)
    history = model.fit(
        x=x_train,
        y=y_train,
        epochs=no_epochs,
        batch_size=128,
    )

    minio_client.fget_object(minio_bucket,"x_test","/tmp/x_test.npy")
    x_test = np.load("/tmp/x_test.npy")

    minio_client.fget_object(minio_bucket,"y_test","/tmp/y_test.npy")
    y_test = np.load("/tmp/y_test.npy")

    # Test the model against the test dataset
    # Returns the loss value & metrics values for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)

    # Confusion Matrix
    # class probabilities -> predicted class index per sample
    test_predictions = np.argmax(model.predict(x=x_test), axis=1)

    # Fix the matrix size to num_classes so that row/column indices always map
    # to the digit labels, even if a class is missing from the test labels.
    confusion_matrix = tf.math.confusion_matrix(
        labels=y_test,
        predictions=test_predictions,
        num_classes=num_classes,
    )
    confusion_matrix = confusion_matrix.numpy()

    # Predict the held-out images and export one PNG per image.
    minio_client.fget_object(minio_bucket,"x_predict","/tmp/x_predict.npy")
    images = np.load("/tmp/x_predict.npy")
    image_predict = model.predict(images)

    # one predicted label per image (first maximum wins on ties)
    list_result = np.argmax(image_predict, axis=1).tolist()

    for i, image in enumerate(images):
        # Pixels are already scaled to 0-1 by reshape_data, so plot them as-is;
        # drop the trailing channel axis for imshow.
        fig, ax = plt.subplots()
        ax.imshow(image[:, :, 0], cmap='gray', vmin=0, vmax=1)
        fig.savefig(f'/tmp/image_{i}.png', dpi = 200)
        # close the figure so images do not pile up on one canvas / in memory
        plt.close(fig)
        minio_client.fput_object(minio_bucket,f"image_{i}.png",f"/tmp/image_{i}.png")

    print(list_result)
    sr_listresult = pd.Series(list_result)
    print(sr_listresult)
    np.save("/tmp/list_result.npy", list_result)
    minio_client.fput_object(minio_bucket,"list_result","/tmp/list_result.npy")

    # Flatten the matrix into (target, predicted, count) rows for the UI.
    vocab = list(range(num_classes))
    data = [
        (vocab[target_index], vocab[predicted_index], count)
        for target_index, target_row in enumerate(confusion_matrix)
        for predicted_index, count in enumerate(target_row)
    ]

    df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    cm_csv = df_cm.to_csv(header=False, index=False)

    model_overview_md = (
        "# Model Overview\n"
        "## Model Summary\n"
        "\n"
        "```\n"
        "{}\n"
        "```\n"
        "\n"
        "## Model Performance\n"
        "\n"
        "**Accuracy**: {}\n"
        "**Loss**: {}\n"
        "\n"
    ).format(metric_model_summary,model_accuracy,model_loss)

    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {'name': 'target', 'type': 'CATEGORY'},
                    {'name': 'predicted', 'type': 'CATEGORY'},
                    {'name': 'count', 'type': 'NUMBER'},
                ],
                "target_col" : "actual",
                "predicted_col" : "predicted",
                "source": cm_csv,
                "storage": "inline",
                "labels": vocab
            },
            {
                'storage': 'inline',
                'source': model_overview_md,
                'type': 'markdown',
            }
        ]
    }

    metrics = {
      'metrics': [{
          'name': 'model_accuracy',
          'numberValue':  float(model_accuracy),
          'format' : "PERCENTAGE"
        },{
          # the loss is not a ratio, so report it as a raw number
          'name': 'model_loss',
          'numberValue':  float(model_loss),
          'format' : "RAW"
        }]}

    ### Save model to minIO
    model.save('/tmp/model_detect.h5')
    minio_client.fput_object(minio_bucket,"model.h5","/tmp/model_detect.h5")

    print("Saved model to minIO")

    output = namedtuple('Output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return output(json.dumps(metadata),json.dumps(metrics))


    
# Every pipeline step runs in the same TensorFlow notebook image.
_TF_BASE_IMAGE = "public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0"

# Wrap each plain Python function as a pipeline component.
comp_get_data_batch = components.create_component_from_func(
    get_data_batch,
    base_image=_TF_BASE_IMAGE,
)
comp_reshape_data = components.create_component_from_func(
    reshape_data,
    base_image=_TF_BASE_IMAGE,
)
comp_model_building = components.create_component_from_func(
    model_building,
    base_image=_TF_BASE_IMAGE,
)


@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def output_test(no_epochs,optimizer):
    """
    Pipeline definition: fetch data -> reshape data -> build model.

    The steps exchange data through the MinIO bucket rather than through
    component outputs, so the execution order is enforced with ``.after()``.
    """
    fetch_task = comp_get_data_batch()

    reshape_task = comp_reshape_data().after(fetch_task)

    comp_model_building(no_epochs,optimizer).after(reshape_task)


if __name__ == "__main__":
    client = kfp.Client()

    # parameter values handed to the pipeline when it is run directly
    arguments = dict(no_epochs=1, optimizer="adam")

    # 1 -> start a run right away
    # anything else -> compile the pipeline to YAML and upload it as a new version
    run_directly = 1

    if run_directly != 1:
        kfp.compiler.Compiler().compile(
            pipeline_func=output_test,
            package_path='output_test.yaml',
        )
        client.upload_pipeline_version(
            pipeline_package_path='output_test.yaml',
            pipeline_version_name="0.4",
            pipeline_name="pipeline test",
            description="just for testing",
        )
    else:
        client.create_run_from_pipeline_func(
            output_test,
            arguments=arguments,
            experiment_name="test",
        )

In [20]:
import cv2 as cv
import tensorflow as tf
from tensorflow import keras
import numpy as np
# Load the trained classifier and the input to classify.
model = keras.models.load_model('model1.keras')
user_input = r"img1.jpg"

if not user_input.endswith('.txt'):
    # Anything that is not a .txt file is treated as a path to an image file:
    # load it as a 28x28 grayscale image.
    img = tf.keras.preprocessing.image.load_img(user_input, color_mode='grayscale', target_size=(28, 28))
    img_array = tf.keras.preprocessing.image.img_to_array(img)
    # add the batch axis in front and normalize the image data
    input_data = img_array[np.newaxis, ...] / 255.0
else:
    # .txt input: comma-separated pixel values
    with open(user_input, 'r') as file:
        data = file.read().split(',')
    # Convert to a single-sample batch (assuming a 28x28 grayscale image) and normalize
    input_data = np.array(data, dtype=np.float32).reshape(1, 28, 28, 1) / 255.0

# Run the model on the prepared input
predictions = model.predict(input_data)

# Index of the highest score == predicted class (assuming a classification task)
predicted_class = np.argmax(predictions)

print(f"The predicted class is: {predicted_class}")

The predicted class is: 3
