# MNIST Digits Dataset

## Importing The Data

In [1]:
def load_data():
    """Load the MNIST dataset and save the train and test arrays as files.

    The four arrays returned by ``keras.datasets.mnist.load_data()`` are
    written to ``data/x_train.npy``, ``data/y_train.npy``,
    ``data/x_test.npy`` and ``data/y_test.npy``. The ``data`` directory is
    created when it does not exist yet. The shapes of all four arrays are
    printed before saving.
    """
    # Imports are kept inside the function because it is wrapped as a
    # pipeline component with kfp's create_component_from_func elsewhere in
    # this file.
    import os

    import numpy as np
    from tensorflow import keras

    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

    # Check the shape of the data.
    print("<==== data shape ====>")
    print(f"x_train: {x_train.shape} || y_train: {y_train.shape}")
    print(f"x_test: {x_test.shape} || y_test: {y_test.shape}")

    # NOTE(review): "data" is a relative path, while the pipeline mounts the
    # shared volume at /data. This presumably relies on the container's
    # working directory being "/" -- confirm.
    # exist_ok avoids the check-then-create race of os.path.exists().
    os.makedirs("data", exist_ok=True)

    # Save the train and test data as files.
    arrays = {
        "x_train": x_train,
        "y_train": y_train,
        "x_test": x_test,
        "y_test": y_test,
    }
    for name, array in arrays.items():
        np.save(f"data/{name}.npy", array)

## Prepare The Data

In [2]:
def prepare_data():
    """Reshape and normalize the saved image arrays, overwriting the files.

    Loads ``data/x_train.npy`` and ``data/x_test.npy``, reshapes each array
    to the 4D tensor shape ``(-1, 28, 28, 1)``, divides the values by 255 and
    saves the results back to the same two files.

    The arrays are converted to float32 before the division so the files
    written to disk are not silently promoted to float64.

    Note:
        The input files are overwritten with the normalized data, so calling
        this function a second time on the same files divides by 255 again.
    """
    # Import is kept inside the function because it is wrapped as a pipeline
    # component with kfp's create_component_from_func elsewhere in this file.
    import numpy as np

    paths = ("data/x_train.npy", "data/x_test.npy")

    # Load and transform everything first so that no file is overwritten
    # unless both inputs were read and reshaped successfully.
    prepared = [
        np.load(path).reshape(-1, 28, 28, 1).astype(np.float32) / 255
        for path in paths
    ]

    for path, images in zip(paths, prepared):
        np.save(path, images)

## Model Building

In [3]:
def build_model():
    """Build, train, save, evaluate and upload the digits-recognizer CNN.

    Steps, in order:

    1. Build a ``Sequential`` model for ``(28, 28, 1)`` inputs: three
       ``Conv2D(64, (3, 3), relu)`` + ``MaxPool2D(2, 2)`` stages, ``Flatten``,
       ``Dense(64, relu)``, ``Dense(32, relu)`` and ``Dense(10, softmax)``.
       It is compiled with the adam optimizer, sparse categorical
       cross-entropy loss and the accuracy metric.
    2. Train for one epoch on ``data/x_train.npy`` / ``data/y_train.npy``.
    3. Save the trained model to ``/tmp/models/digits-recognizer``.
    4. Evaluate on ``data/x_test.npy`` / ``data/y_test.npy`` and print the
       test loss and accuracy.
    5. Upload every file of the saved model to the GCS bucket ``hxyro``
       under the prefix ``models/digits-recognizer/1/``.
    """
    # Imports are kept inside the function because it is wrapped as a
    # pipeline component with kfp's create_component_from_func elsewhere in
    # this file.
    import os

    import numpy as np
    from google.cloud import storage
    from tensorflow import keras

    model_dir = "/tmp/models/digits-recognizer"
    bucket_name = "hxyro"
    # NOTE(review): the trailing "1" is presumably the model version
    # directory expected by the TensorFlow predictor -- confirm.
    remote_prefix = "models/digits-recognizer/1"

    # Build the model.
    model = keras.models.Sequential([
        keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28, 28, 1)),
        keras.layers.MaxPool2D(2, 2),
        keras.layers.Conv2D(64, (3, 3), activation='relu'),
        keras.layers.MaxPool2D(2, 2),
        keras.layers.Conv2D(64, (3, 3), activation='relu'),
        keras.layers.MaxPool2D(2, 2),
        keras.layers.Flatten(),
        keras.layers.Dense(64, activation='relu'),
        keras.layers.Dense(32, activation='relu'),
        keras.layers.Dense(10, activation='softmax'),
    ])

    # Compile the model.
    model.compile(
        optimizer="adam",
        loss="sparse_categorical_crossentropy",
        metrics=['accuracy'],
    )

    # Load the training dataset and train the model.
    x_train = np.load("data/x_train.npy")
    y_train = np.load("data/y_train.npy")

    print("\n<==== model training ====>")
    model.fit(x=x_train, y=y_train, epochs=1)

    # Save the trained model to the local file system.
    keras.models.save_model(model, model_dir)

    # Load the testing dataset and evaluate the model.
    x_test = np.load("data/x_test.npy")
    y_test = np.load("data/y_test.npy")

    test_loss, test_acc = model.evaluate(x=x_test, y=y_test, verbose=0)
    print(f"Test Loss: {test_loss:.4f} || Test Accuracy: {test_acc:.4f}")

    # Upload the model to GCS; required for model serving.
    client = storage.Client()
    # Look the bucket up once instead of once per uploaded file.
    bucket = client.get_bucket(bucket_name)

    for dirpath, _dirnames, filenames in os.walk(model_dir):
        for filename in filenames:
            local_file_path = os.path.join(dirpath, filename)
            relative_file_path = os.path.relpath(local_file_path, model_dir)
            # Blob names always use forward slashes, whatever the local OS.
            blob_name = os.path.join(remote_prefix, relative_file_path).replace("\\", "/")
            bucket.blob(blob_name).upload_from_filename(local_file_path)
            print(f'Successfully uploaded {local_file_path} to GCS: {blob_name}')


## Model Serving

In [4]:
def model_serving():
    """Create a KServe InferenceService for the digits-recognizer model.

    Builds a ``v1beta1`` InferenceService named ``digits-recognizer`` in the
    namespace returned by ``kserve.utils.get_default_target_namespace()``,
    with a TensorFlow predictor whose storage URI is
    ``gs://hxyro/models/digits-recognizer``, and submits it with
    ``KServeClient.create``.
    """
    # Imports are kept inside the function because it is wrapped as a
    # pipeline component with kfp's create_component_from_func elsewhere in
    # this file.
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1TFServingSpec

    namespace = utils.get_default_target_namespace()
    name = 'digits-recognizer'
    api_version = constants.KSERVE_GROUP + '/v1beta1'

    # NOTE(review): this annotation presumably disables Istio sidecar
    # injection for the service's pods -- confirm it is still required.
    metadata = client.V1ObjectMeta(
        name=name,
        namespace=namespace,
        annotations={'sidecar.istio.io/inject': 'false'},
    )
    predictor = V1beta1PredictorSpec(
        tensorflow=V1beta1TFServingSpec(
            storage_uri="gs://hxyro/models/digits-recognizer",
        ),
    )
    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    # NOTE(review): create() presumably fails when a service with this name
    # already exists in the namespace -- verify behavior on pipeline re-runs.
    kserve_client = KServeClient()
    kserve_client.create(isvc)

## Build Pipeline

In [5]:
import kfp
import kfp.dsl as dsl

# Wrap each step function as a pipeline component factory. Every step uses
# the same base image; extra packages are listed only for the steps that
# request them.
_BASE_IMAGE = "hxyro/tensorflow-full"
_component_from_func = kfp.components.create_component_from_func

create_step_load_data = _component_from_func(
    func=load_data,
    base_image=_BASE_IMAGE,
)
create_step_prepare_data = _component_from_func(
    func=prepare_data,
    base_image=_BASE_IMAGE,
)
create_step_build_model = _component_from_func(
    func=build_model,
    base_image=_BASE_IMAGE,
    packages_to_install=['google-cloud-storage==2.8.0'],
)
create_step_serve_model = _component_from_func(
    func=model_serving,
    base_image=_BASE_IMAGE,
    packages_to_install=['kserve==0.10.1'],
)
# Define the pipeline
@dsl.pipeline(
    name='Kubeflow Pipeline',
    description='A sample pipeline that build and train digits-recognizer model',
)
def build_pipeline():
    # Pipeline definition; it takes no parameters. The four steps run
    # strictly one after another, and the first three mount the same volume
    # at /data.

    # Define the 5Gi volume that is mounted into the data and training steps.
    volume_op = dsl.VolumeOp(
        name="t-vol",
        resource_name="t-vol",
        size="5Gi",
        modes=dsl.VOLUME_MODE_RWM,
    )

    # Define the steps and their dependency graph:
    # load -> prepare -> build/train -> serve.
    load_step = create_step_load_data().add_pvolumes({'/data': volume_op.volume})

    prepare_step = create_step_prepare_data().add_pvolumes({'/data': volume_op.volume})
    prepare_step = prepare_step.after(load_step)

    train_step = create_step_build_model().add_pvolumes({'/data': volume_op.volume})
    train_step = train_step.after(prepare_step)

    serve_step = create_step_serve_model().after(train_step)

    # Disable caching for every step (optional).
    for step in (load_step, prepare_step, train_step, serve_step):
        step.execution_options.caching_strategy.max_cache_staleness = "P0D"

    
# Compile build_pipeline into the package file pipeline.yaml.
kfp.compiler.Compiler().compile(
    package_path='pipeline.yaml',
    pipeline_func=build_pipeline,
)