# Digit Recognizer Data Pipeline Notebook

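This notebook builds a Kubeflow data pipeline for the [Kaggle Digit Recognizer competition](https://www.kaggle.com/competitions/digit-recognizer/overview). The competition overview describes the task as follows: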

>MNIST ("Modified National Institute of Standards and Technology") is the de facto “hello world” dataset of computer vision. Since its release in 1999, this classic dataset of handwritten images has served as the basis for benchmarking classification algorithms. As new machine learning techniques emerge, MNIST remains a reliable resource for researchers and learners alike.

>In this competition, your goal is to correctly identify digits from a dataset of tens of thousands of handwritten images.

## Install necessary packages

We list all dependencies in the requirements.txt file and install them with pip.

In [1]:
%pip install -r requirements.txt --user --quiet

Note: you may need to restart the kernel to use updated packages.



If this is the first time running this pip command, restart the kernel.

## Imports

In this section, we import the packages needed in this example.  It is good practice to gather your imports into a single place.  

In [2]:
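# Imports
import kfp
import kfp.compiler  # Compiler, used to write the pipeline YAML
import kfp.dsl as dsl
import kfp.components as components
import kfp.onprem  # mount_pvc, used to attach PVCs to pipeline steps
from datetime import datetime  # used to timestamp the pipeline name
from typing import NamedTuple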



In [4]:
# Define pipeline variables and set default values
user_namespace: str = "kubeflow-user-example-com"

clone_step_container_image: str = "curtisab/ndot-jupyter-scipy:v1alpha1"
clone_step_train_pvc_existing: str = "digits-train"
clone_step_valid_pvc_existing: str = "digits-valid"

shape_step_container_image: str = "curtisab/ndot-jupyter-scipy:v1alpha1"
shape_step_train_pvc: str = "digits-train-clone"
shape_step_train_mountpoint: str = "/mnt/train"
shape_step_valid_pvc: str = "digits-valid-clone"
shape_step_valid_mountpoint: str = "/mnt/valid"

train_step_container_image: str = "curtisab/ndot-jupyter-scipy:v1alpha1"
train_step_train_pvc: str = "digits-train-clone"
train_step_train_mountpoint: str = "/mnt/train"
train_step_valid_pvc: str = "digits-valid-clone"
train_step_valid_mountpoint: str = "/mnt/valid"
train_step_model_pvc_existing: str = "digits-model"
train_step_model_mountpoint: str = "/mnt/model"

serve_step_container_image: str = "curtisab/ndot-jupyter-scipy:v1alpha1"
serve_step_model_pvc_existing: str = "digits-model"
serve_step_model_mountpoint: str = "/mnt/model"

In [5]:
# Set GPU limits; Due to SDK limitations, this must be hardcoded
train_step_num_gpu = 0
valid_step_num_gpu = 0

## Clone the data volumes
This step runs in a separate container and clones the existing training and validation volumes.

In [7]:
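def clone_step():
    """
    Clone the existing volumes
    Export clone pvc name
    """
    print("Data Clone Step")

    # NOTE: KFP runs a lightweight component function in isolation, so the
    # notebook-level variables used below must also be available inside the
    # container (for example, by passing them in as parameters).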

    from netapp_dataops.k8s import clone_volume
    
    clone_volume(sourcePvcName=clone_step_train_pvc_existing, newPvcName=train_step_train_pvc, namespace=user_namespace)
    clone_volume(sourcePvcName=clone_step_valid_pvc_existing, newPvcName=train_step_valid_pvc, namespace=user_namespace)


## Data Preparation 
The code in this component runs in a containerized environment. It processes the data and saves it back to the persistent volume claims.

In [8]:
def shape_step():

    import os
    import numpy as np
    import pandas as pd  # required for pd.read_csv below

    DATA_TRAIN_FILE = os.path.join(shape_step_train_mountpoint,'train.csv')
    TRAIN_DF = pd.read_csv(DATA_TRAIN_FILE)
    TRAIN_X = TRAIN_DF.drop('label', axis=1)
    TRAIN_Y = TRAIN_DF.label
    # Reshape image in 3 dimensions (height = 28px, width = 28px , channel = 1)... This is needed for the Keras API
    TRAIN_X = TRAIN_X.values.reshape(-1,28,28,1)
    # Normalize the data
    # Each pixel has a value between 0-255. Here we divide by 255, to get values from 0-1
    TRAIN_X = TRAIN_X /255.0
    DATA_TRAIN_X_FILE = os.path.join(shape_step_train_mountpoint, "train_x.npy")
    np.save(DATA_TRAIN_X_FILE, TRAIN_X)
    DATA_TRAIN_Y_FILE = os.path.join(shape_step_train_mountpoint, "train_y.npy")
    np.save(DATA_TRAIN_Y_FILE, TRAIN_Y)

    DATA_VALID_FILE = os.path.join(shape_step_valid_mountpoint,'valid.csv')
    VALID_DF = pd.read_csv(DATA_VALID_FILE)
    VALID_X = VALID_DF.drop('label', axis=1)
    VALID_Y = VALID_DF.label
    # Reshape image in 3 dimensions (height = 28px, width = 28px , channel = 1)... This is needed for the Keras API
    VALID_X = VALID_X.values.reshape(-1,28,28,1)
    # Normalize the data
    # Each pixel has a value between 0-255. Here we divide by 255, to get values from 0-1
    VALID_X = VALID_X /255.0 
    DATA_VALID_X_FILE = os.path.join(shape_step_valid_mountpoint, "valid_x.npy")
    np.save(DATA_VALID_X_FILE, VALID_X)
    DATA_VALID_Y_FILE = os.path.join(shape_step_valid_mountpoint, "valid_y.npy")
    np.save(DATA_VALID_Y_FILE, VALID_Y)
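
The reshape and normalization performed in this step can be checked in isolation. The following is a minimal sketch that assumes each input row holds 784 pixel values in the range 0-255, as in the competition CSV files. The helper name `shape_pixels` and the synthetic input are illustrative and are not part of the pipeline.

```python
import numpy as np


def shape_pixels(flat_pixels: np.ndarray) -> np.ndarray:
    """Reshape flat 784-value rows to (N, 28, 28, 1) and scale them to 0-1."""
    images = flat_pixels.reshape(-1, 28, 28, 1)
    return images / 255.0


# Synthetic stand-in for two rows of pixel columns (hypothetical data).
rng = np.random.default_rng(seed=0)
fake_rows = rng.integers(0, 256, size=(2, 784))

shaped = shape_pixels(fake_rows)
print(shaped.shape)  # (2, 28, 28, 1)
```

The trailing dimension of 1 is the single grayscale channel that the Keras `Conv2D` input shape `(28, 28, 1)` expects.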
   

## Model generation step
This step will execute in a separate container.  It will save the model to the model persistent volume claim.  

In [None]:
def train_step(    
    no_epochs:int = 1,
    optimizer: str = "adam"
) -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]):

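    """
    Build the model with Keras API
    Export model parameters
    """
    print("Model Generation Step")

    from tensorflow import keras
    import tensorflow as tf
    import numpy as np
    import pandas as pd
    import json
    from minio import Minio

    # Create the MinIO client before its first use in the data-loading code below
    minio_client = Minio(
            "100.65.11.110:9000",
            access_key="minio",
            secret_key="minio123",
            secure=False
        )
    minio_bucket = "mlpipeline"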

    # Construct the model structure
    
    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28,28,1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))

    model.add(keras.layers.Dense(32, activation='relu'))

    model.add(keras.layers.Dense(10, activation='softmax')) # Output are 10 classes, numbers from 0-9

    # Show model summary - how it looks
    stringlist = []
    model.summary(print_fn=lambda x: stringlist.append(x))
    metric_model_summary = "\n".join(stringlist)
    
    # Compile the model - the output is one of 10 classes, so we use sparse categorical cross-entropy
    model.compile(optimizer=optimizer,
              loss="sparse_categorical_crossentropy",
              metrics=['accuracy'])

    # Get the data
    
    minio_client.fget_object(minio_bucket,"x_train","/tmp/x_train.npy")
    x_train = np.load("/tmp/x_train.npy")
    
    minio_client.fget_object(minio_bucket,"y_train","/tmp/y_train.npy")
    y_train = np.load("/tmp/y_train.npy")
    
    # Fit the model and return the history while training
    history = model.fit(
      x=x_train,
      y=y_train,
      epochs=no_epochs,
      batch_size=20,
    )
    
    minio_client.fget_object(minio_bucket,"x_test","/tmp/x_test.npy")
    x_test = np.load("/tmp/x_test.npy")
    
    minio_client.fget_object(minio_bucket,"y_test","/tmp/y_test.npy")
    y_test = np.load("/tmp/y_test.npy")
    

    # Test the model against the test dataset
    # Returns the loss value & metrics values for the model in test mode.
    model_loss, model_accuracy = model.evaluate(x=x_test,y=y_test)
    
    # Confusion Matrix

    # Generates output predictions for the input samples.
    test_predictions = model.predict(x=x_test)

    # Returns the indices of the maximum values along an axis.
    test_predictions = np.argmax(test_predictions,axis=1) # the prediction outputs 10 values, we take the index number of the highest value, which is the prediction of the model

    # Generate confusion matrix
    confusion_matrix = tf.math.confusion_matrix(labels=y_test,predictions=test_predictions)
    confusion_matrix = confusion_matrix.numpy()
    vocab = list(np.unique(y_test))
    data = []
    for target_index, target_row in enumerate(confusion_matrix):
        for predicted_index, count in enumerate(target_row):
            data.append((vocab[target_index], vocab[predicted_index], count))

    df_cm = pd.DataFrame(data, columns=['target', 'predicted', 'count'])
    cm_csv = df_cm.to_csv(header=False, index=False)
    
    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {'name': 'target', 'type': 'CATEGORY'},
                    {'name': 'predicted', 'type': 'CATEGORY'},
                    {'name': 'count', 'type': 'NUMBER'},
                  ],
                "target_col" : "actual",
                "predicted_col" : "predicted",
                "source": cm_csv,
                "storage": "inline",
                "labels": [0,1,2,3,4,5,6,7,8,9]
            },
            {
                'storage': 'inline',
                'source': '''# Model Overview
## Model Summary

```
{}
```

## Model Performance

**Accuracy**: {}
**Loss**: {}

'''.format(metric_model_summary,model_accuracy,model_loss),
                'type': 'markdown',
            }
        ]
    }
    
    metrics = {
      'metrics': [{
          'name': 'model_accuracy',
          'numberValue':  float(model_accuracy),
          'format' : "PERCENTAGE"
        },{
          'name': 'model_loss',
          'numberValue':  float(model_loss),
          'format' : "RAW"
        }]}
    
    ### Save model to minIO
    
    keras.models.save_model(model,"/tmp/detect-digits")
    
    from minio import Minio
    import os

    minio_client = Minio(
            "100.65.11.110:9000",
            access_key="minio",
            secret_key="minio123",
            secure=False
        )
    minio_bucket = "mlpipeline"


    import glob

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        assert os.path.isdir(local_path)

        for local_file in glob.glob(local_path + '/**'):
            local_file = local_file.replace(os.sep, "/") # Replace \ with / on Windows
            if not os.path.isfile(local_file):
                upload_local_directory_to_minio(
                    local_file, bucket_name, minio_path + "/" + os.path.basename(local_file))
            else:
                remote_path = os.path.join(
                    minio_path, local_file[1 + len(local_path):])
                remote_path = remote_path.replace(
                    os.sep, "/")  # Replace \ with / on Windows
                minio_client.fput_object(bucket_name, remote_path, local_file)

    upload_local_directory_to_minio("/tmp/detect-digits",minio_bucket,"models/detect-digits/1/") # 1 for version 1
    
    print("Saved model to minIO")
    
    from collections import namedtuple
    output = namedtuple('output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return output(json.dumps(metadata),json.dumps(metrics))
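
The confusion-matrix viewer in the step above receives its data as CSV rows of `(target, predicted, count)`, one row per label pair. The following is a minimal sketch of that flattening using only the standard library, so it can be run without TensorFlow. The helper names `confusion_rows` and `rows_to_csv` and the sample labels are illustrative assumptions, not part of the pipeline.

```python
import csv
import io
from collections import Counter


def confusion_rows(y_true, y_pred, labels):
    """Return one (target, predicted, count) row for every label pair."""
    counts = Counter(zip(y_true, y_pred))
    return [(t, p, counts[(t, p)]) for t in labels for p in labels]


def rows_to_csv(rows):
    """Serialize the rows as header-less CSV text."""
    buffer = io.StringIO()
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue()


# Hypothetical labels and predictions for a three-class problem.
y_true = [0, 1, 1, 2]
y_pred = [0, 1, 2, 2]

rows = confusion_rows(y_true, y_pred, labels=[0, 1, 2])
cm_csv = rows_to_csv(rows)
print(cm_csv)
```

Pairs that never occur still get a row with a count of zero, which matches the dense matrix that `tf.math.confusion_matrix` produces.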
    



In [None]:
# def serve_step():
#     print("Model Serve Step")
#     """
#     Create kserve instance
#     """
#     from kubernetes import client 
#     from kserve import KServeClient
#     from kserve import constants
#     from kserve import utils
#     from kserve import V1beta1InferenceService
#     from kserve import V1beta1InferenceServiceSpec
#     from kserve import V1beta1PredictorSpec
#     from kserve import V1beta1TFServingSpec
#     from datetime import datetime

#     namespace = utils.get_default_target_namespace()

#     now = datetime.now()
#     v = now.strftime("%Y-%m-%d--%H-%M-%S")

#     name='digits-recognizer-{}'.format(v)
#     kserve_version='v1beta1'
#     api_version = constants.KSERVE_GROUP + '/' + kserve_version

#     isvc = V1beta1InferenceService(api_version=api_version,
#                                    kind=constants.KSERVE_KIND,
#                                    metadata=client.V1ObjectMeta(
#                                        name=name, namespace=namespace, annotations={'sidecar.istio.io/inject':'false'}),
#                                    spec=V1beta1InferenceServiceSpec(
#                                    predictor=V1beta1PredictorSpec(
#                                        tensorflow=(V1beta1TFServingSpec(
#                                            storage_uri="pvc://" + serve_step_model_pvc_existing)))) 
#                                             #QUESTION: Does this need to be something else
#     )
    
#     KServe = KServeClient()
#     KServe.create(isvc)


In [10]:
# Generate components
comp_clone = components.create_component_from_func(clone_step, base_image=clone_step_container_image,
                                                   packages_to_install=['netapp-dataops-k8s==2.4.0'])

comp_shape = components.create_component_from_func(shape_step, base_image=shape_step_container_image)

# comp_train = components.create_component_from_func(train_step, base_image=train_step_container_image)

# comp_serve = components.create_component_from_func(serve_step, base_image=serve_step_container_image,
#                                                    packages_to_install=['kserve==0.10.1'])

In [None]:
# The component factories above are plain functions and have no .apply() method.
# PVC mounts are applied to the task that each factory returns inside the pipeline.
@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def create_pipe(no_epochs, optimizer):
    step1 = comp_clone()
    step2 = comp_shape()
    step2.apply(kfp.onprem.mount_pvc(shape_step_train_pvc, 'train', shape_step_train_mountpoint))
    step2.apply(kfp.onprem.mount_pvc(shape_step_valid_pvc, 'valid', shape_step_valid_mountpoint))
    step2.after(step1)
    # step3 = comp_train(no_epochs, optimizer)
    # step3.apply(kfp.onprem.mount_pvc(train_step_train_pvc, 'train', train_step_train_mountpoint))
    # step3.apply(kfp.onprem.mount_pvc(train_step_model_pvc_existing, 'model', train_step_model_mountpoint))
    # step3.after(step2)
    # step4 = comp_serve()
    # step4.apply(kfp.onprem.mount_pvc(serve_step_model_pvc_existing, 'model', serve_step_model_mountpoint))
    # step4.after(step3)



In [None]:
if __name__ == "__main__":
    client = kfp.Client()

    arguments = {
        "no_epochs" : 1,
        "optimizer": "adam"
    }

    now = datetime.now()
    pipe_version = now.strftime("%Y-%m-%d--%H-%M-%S")
    pipe_name = "digits-pipe-" + pipe_version
    pipe_file = pipe_name + ".yaml"
    pipe_description = "A sample digit recognizer pipeline"

    # Set this to 1 to run in Kubeflow instead of creating a yaml
    run_directly = 0
    
    if (run_directly == 1):
        client.create_run_from_pipeline_func(create_pipe,arguments=arguments,experiment_name=pipe_name)
    else:
        kfp.compiler.Compiler().compile(pipeline_func=create_pipe,package_path=pipe_file)
        #client.upload_pipeline_version(pipeline_package_path=pipe_file,pipeline_version_name=pipe_version,pipeline_name=pipe_name,description=pipe_description)
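
The cell above derives the pipeline version, name, and package file from the current time. The following is a minimal sketch of that naming scheme with a fixed timestamp so the result is reproducible. The helper `pipeline_names` is an illustrative assumption and is not part of the notebook.

```python
from datetime import datetime


def pipeline_names(now: datetime, prefix: str = "digits-pipe-"):
    """Build the version string, pipeline name, and YAML file name."""
    version = now.strftime("%Y-%m-%d--%H-%M-%S")
    name = prefix + version
    return version, name, name + ".yaml"


# Fixed timestamp (hypothetical) instead of datetime.now().
version, name, package = pipeline_names(datetime(2023, 4, 1, 9, 30, 5))
print(package)  # digits-pipe-2023-04-01--09-30-05.yaml
```

Because the timestamp has one-second resolution, each compile produces a distinct file name unless two runs start within the same second.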

