# From Notebook to Kubeflow Pipeline using Fashion MNIST

In this notebook, we walk through the steps of converting a machine learning model, which you may already have in a Jupyter notebook, into a Kubeflow pipeline. As an example, we use the Fashion MNIST dataset and the [Basic classification with TensorFlow](https://www.tensorflow.org/tutorials/keras/classification) example.

In this example we use:

* **Kubeflow pipelines** - [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) is a machine learning workflow platform that helps data scientists and ML engineers tackle experimentation and productionization of ML workloads. It lets users orchestrate scalable workloads through an SDK, directly from a Jupyter notebook.

* **MicroK8s** - [MicroK8s](https://microk8s.io/docs) lets you spin up a lightweight Kubernetes cluster on your local machine. It comes with Kubeflow built in.

**Note:** This notebook is to be run on a notebook server inside the Kubeflow environment. 

## Section 1: Data exploration (following the [TensorFlow tutorial](https://www.tensorflow.org/tutorials/keras/classification))

The [Fashion MNIST](https://github.com/zalandoresearch/fashion-mnist) dataset contains 70,000 grayscale images in 10 clothing categories. Each image is 28x28 pixels in size. We chose this dataset to demonstrate the functionality of Kubeflow Pipelines without introducing too much complexity in the implementation of the ML model.

To familiarize you with the dataset we will do a short exploration. It is always a good idea to understand your data before you begin any kind of analysis.

<table>
  <tr><td>
    <img src="https://tensorflow.org/images/fashion-mnist-sprite.png"
         alt="Fashion MNIST sprite"  width="600">
  </td></tr>
  <tr><td align="center">
    <b>Figure 1.</b> <a href="https://github.com/zalandoresearch/fashion-mnist">Fashion-MNIST samples</a> (by Zalando, MIT License).<br/>&nbsp;
  </td></tr>
</table>


### 1.1 Install packages

In [None]:
!python -m pip install --user --upgrade pip
!pip install --user --upgrade pandas matplotlib numpy

After the installation, we need to restart the kernel for the changes to take effect:

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

### 1.2 Import libraries

In [None]:
# TensorFlow and tf.keras
import tensorflow as tf
from tensorflow import keras

# Data exploration libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

### 1.2 Import the Fashion MNIST dataset

In [None]:
fashion_mnist = keras.datasets.fashion_mnist

(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

Each image is mapped to a single label. Since the *class names* are not included with the dataset, store them here to use later when plotting the images:

In [None]:
class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

Let's look at the format of each dataset split. The training set contains 60,000 images and the test set contains 10,000 images, each 28x28 pixels in size.

---



### 1.3 Explore the data

In [None]:
print(f'Number of training images: {train_images.shape[0]}\n')
print(f'Number of test images: {test_images.shape[0]}\n')

print(f'Image size: {train_images.shape[1:]}')


Correspondingly, there are 60,000 training labels and 10,000 test labels.

In [None]:
print(f'Number of training labels: {len(train_labels)}\n')
print(f'Number of test labels: {len(test_labels)}')

Each label is an integer between 0 and 9 corresponding to the 10 class names.

In [None]:
unique_train_labels = np.unique(train_labels)

# Pair each class name with its integer label (0-9).
for label_name, label_num in zip(class_names, unique_train_labels):
    print(f'{label_name}: {label_num}')
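
As a quick cross-check of the label encoding, the sketch below maps integer labels to class names and counts how often each class occurs. It uses a short hand-written list (`sample_labels`, a made-up stand-in for `train_labels`) so that it runs without downloading the dataset.

```python
from collections import Counter

class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
               'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

# Hypothetical stand-in for train_labels: a few integer labels in the range 0-9.
sample_labels = [9, 0, 0, 3, 0, 2, 7, 2, 5, 5]

# Translate each integer label into its class name, then count occurrences.
label_counts = Counter(class_names[label] for label in sample_labels)

for name, count in label_counts.most_common():
    print(f'{name}: {count}')
```

Replacing `sample_labels` with the real `train_labels` should report the same count for every class, since the training split is balanced across the 10 categories.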

### 1.4 Preprocess the data

To properly train the model, the data must be normalized so that each value falls between 0 and 1. Later on, this step is done inside the training script, but we show what the process looks like here.

The first image shows that the values fall in a range between 0 and 255.

In [None]:
plt.figure()
plt.imshow(train_images[0])
plt.colorbar()
plt.grid(False)
plt.show()

To scale the data we divide the training and test values by 255.

In [None]:
train_images = train_images / 255.0

test_images = test_images / 255.0
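
The scaling is plain element-wise division. Here is a minimal, dependency-free sketch of the same arithmetic on a made-up 2x2 "image" (`tiny_image` is illustrative and not part of the dataset):

```python
# Hypothetical 2x2 grayscale image with pixel intensities in the range 0-255.
tiny_image = [[0, 51],
              [204, 255]]

# Divide every pixel by 255.0 so that all values land in the range 0.0-1.0.
scaled_image = [[pixel / 255.0 for pixel in row] for row in tiny_image]

print(scaled_image)
```

NumPy applies the same division to every element of `train_images` at once, and the result is a floating-point array even though the raw pixels are stored as 8-bit integers.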

We plot the first 25 images from the training set to show that the data is in fact in the form we expect.

In [None]:
plt.figure(figsize=(10,10))
for i in range(25):
    plt.subplot(5,5,i+1)
    plt.xticks([])
    plt.yticks([])
    plt.grid(False)
    plt.imshow(train_images[i], cmap=plt.cm.binary)
    plt.xlabel(class_names[train_labels[i]])
plt.show()

## Section 2: Kubeflow pipeline building

Up to this point, all our steps are similar to what you can find in the [Basic classification with TensorFlow](https://www.tensorflow.org/tutorials/keras/classification) example. The next step in that example is to build the model. Here, however, we use the containerized approach provided by Kubeflow so that our model can be run on Kubernetes.

### 2.1 Install Kubeflow pipelines SDK

The first step is to install the Kubeflow Pipelines SDK package.



In [7]:
!pip install --user --upgrade kfp

Collecting kfp
  Downloading https://files.pythonhosted.org/packages/ee/44/537b4c8877fb90508f0cd481bac9fdef48aa539e9db2008a6c16325d59cf/kfp-0.5.1.tar.gz (119kB)
     |████████████████████████████████| 122kB 2.1MB/s eta 0:00:01
Collecting kfp-server-api<0.6.0,>=0.2.5 (from kfp)
  Downloading https://files.pythonhosted.org/packages/c3/aa/c1206ff5683c6c9f6e9dfa425b89fb9d52e2ea6b9b327e026a974a2ca4ab/kfp-server-api-0.5.0.tar.gz
Collecting Deprecated (from kfp)
  Downloading https://files.pythonhosted.org/packages/76/a1/05d7f62f956d77b23a640efc650f80ce24483aa2f85a09c03fb64f49e879/Deprecated-1.2.10-py2.py3-none-any.whl
Collecting strip-hints (from kfp)
  Downloading https://files.pythonhosted.org/packages/72/a5/1b8e2990d3f2b24a83d682607a0de5a745ad24de1b2bca33b1561c303a9c/strip-hints-0.1.9.tar.gz
Building wheels for collected packages: kfp, kfp-server-api, strip-hints
  Building wheel for kfp (setup.py) ... done
  Created wheel for kfp: filename=kfp-0.5.1-cp36-none-any.whl

After the installation, we need to restart the kernel for the changes to take effect:

In [None]:
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

Check if the install was successful:

In [1]:
!which dsl-compile

/usr/local/bin/dsl-compile


You should see **/usr/local/bin/dsl-compile** above.

### 2.2 Build Container Components

The following cells define functions that will be transformed into lightweight container components. It is recommended to look at the corresponding Fashion MNIST notebook to match what you see here to the original code.

<table>
  <tr><td>
    <img src="https://www.kubeflow.org/docs/images/pipelines-sdk-lightweight.svg"
         alt="Kubeflow Pipelines SDK lightweight components"  width="600">
  </td></tr>
  <tr><td align="center">
  </td></tr>
</table>
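
Imports must live inside the function body because a lightweight component is built from the function's source alone, so module-level names from the notebook are not available when it runs in its container. The sketch below imitates that isolation with plain `exec` in an empty namespace. It illustrates the idea only and is not how the SDK is implemented; the function names `area_outer`, `area_inner`, and `run_isolated` are made up.

```python
# Source of a function that relies on an import made outside its body.
outer_import_src = '''
def area_outer(radius):
    return math.pi * radius ** 2
'''

# Source of the same function with the import moved inside its body.
inner_import_src = '''
def area_inner(radius):
    import math
    return math.pi * radius ** 2
'''

def run_isolated(source, func_name, *args):
    """Define the function in a fresh namespace and call it, mimicking a clean container."""
    namespace = {}
    exec(source, namespace)
    return namespace[func_name](*args)

try:
    run_isolated(outer_import_src, 'area_outer', 1.0)
    outer_error = None
except NameError as err:
    outer_error = err  # 'math' was never imported in the isolated namespace

inner_result = run_isolated(inner_import_src, 'area_inner', 1.0)

print('outer import failed:', outer_error is not None)
print('inner import result:', round(inner_result, 4))
```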


In [2]:
# Import Kubeflow SDK
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [3]:
def train(data_path, model_file):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import pickle
    
    import tensorflow as tf
    from tensorflow import keras
    
    # Download the dataset and split into training and test data. 
    fashion_mnist = keras.datasets.fashion_mnist

    (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

    # Normalize the data so that the values all fall between 0 and 1.
    train_images = train_images / 255.0
    test_images = test_images / 255.0

    # Define the model using Keras.
    model = keras.Sequential([
        keras.layers.Flatten(input_shape=(28, 28)),
        keras.layers.Dense(128, activation='relu'),
        keras.layers.Dense(10)
    ])

    model.compile(optimizer='adam',
                  loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    # Run a training job with specified number of epochs
    model.fit(train_images, train_labels, epochs=10)

    # Evaluate the model and print the results
    test_loss, test_acc = model.evaluate(test_images,  test_labels, verbose=2)
    print('Test accuracy:', test_acc)

    # Save the model to the designated path.
    model.save(f'{data_path}/{model_file}')

    # Save the test_data as a pickle file to be used by the predict component.
    with open(f'{data_path}/test_data', 'wb') as f:
        pickle.dump((test_images,test_labels), f)
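
The `train` function above hands the test split to the prediction step through a pickle file. That hand-off can be tried in isolation. This small sketch assumes a temporary directory as a stand-in for `data_path` and tiny made-up lists in place of the real test arrays:

```python
import os
import pickle
import tempfile

# Hypothetical stand-ins for the real test arrays.
test_images = [[0.0, 0.5], [1.0, 0.25]]
test_labels = [9, 2]

with tempfile.TemporaryDirectory() as data_path:
    test_data_file = os.path.join(data_path, 'test_data')

    # "train" side: write both objects as one tuple.
    with open(test_data_file, 'wb') as f:
        pickle.dump((test_images, test_labels), f)

    # "predict" side: read the tuple back and unpack it.
    with open(test_data_file, 'rb') as f:
        loaded_images, loaded_labels = pickle.load(f)

print(loaded_images == test_images, loaded_labels == test_labels)
```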

In [4]:
def predict(data_path, model_file, image_number):
    
    # func_to_container_op requires packages to be imported inside of the function.
    import pickle

    import tensorflow as tf
    from tensorflow import keras

    import numpy as np
    
    # Load the saved Keras model
    model = keras.models.load_model(f'{data_path}/{model_file}')

    # Load and unpack the test_data
    with open(f'{data_path}/test_data','rb') as f:
        test_data = pickle.load(f)
    # Separate the test_images from the test_labels.
    test_images, test_labels = test_data
    # Define the class names.
    class_names = ['T-shirt/top', 'Trouser', 'Pullover', 'Dress', 'Coat',
                   'Sandal', 'Shirt', 'Sneaker', 'Bag', 'Ankle boot']

    # Define a Softmax layer to define outputs as probabilities
    probability_model = tf.keras.Sequential([model, 
                                            tf.keras.layers.Softmax()])

    # See https://github.com/kubeflow/pipelines/issues/2320 for explanation on this line.
    image_number = int(image_number)

    # Grab an image from the test dataset.
    img = test_images[image_number]

    # Add the image to a batch where it is the only member.
    img = np.expand_dims(img, 0)

    # Predict the label of the image.
    predictions = probability_model.predict(img)

    # Take the prediction with the highest probability
    prediction = np.argmax(predictions[0])

    # Retrieve the true label of the image from the test labels.
    true_label = test_labels[image_number]
    
    class_prediction = class_names[prediction]
    confidence = 100*np.max(predictions)
    actual = class_names[true_label]
    
    
    with open(f'{data_path}/result.txt', 'w') as result:
        result.write(" Prediction: {} | Confidence: {:2.0f}% | Actual: {}".format(class_prediction,
                                                                        confidence,
                                                                        actual))
    
    print('Prediction has been saved successfully!')
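
Because the model's last layer outputs raw scores (logits), `predict` wraps it in a Softmax layer before reading off a confidence. Below is a dependency-free sketch of that calculation, using made-up logits for three of the classes (the helper `softmax` is written here for illustration and is not the Keras layer):

```python
import math

def softmax(logits):
    """Convert raw scores into probabilities that sum to 1."""
    highest = max(logits)
    # Subtracting the maximum keeps the exponentials numerically stable.
    exps = [math.exp(x - highest) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]

# Hypothetical logits for a three-class subset; the real model emits 10 values.
class_names = ['T-shirt/top', 'Trouser', 'Pullover']
logits = [1.0, 3.0, 0.5]

probabilities = softmax(logits)

# Index of the most likely class and its probability as a percentage.
prediction = max(range(len(probabilities)), key=probabilities.__getitem__)
confidence = 100 * probabilities[prediction]

print('Prediction: {} | Confidence: {:2.0f}%'.format(class_names[prediction], confidence))
```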

In [5]:
# Create train and predict lightweight components.
train_op = comp.func_to_container_op(train, base_image='tensorflow/tensorflow:latest-gpu-py3')
predict_op = comp.func_to_container_op(predict, base_image='tensorflow/tensorflow:latest-gpu-py3')

### 2.3 Build Kubeflow Pipeline

Create a client to enable communication with the Pipelines API server.

In [6]:
client = kfp.Client(host='pipelines-api.kubeflow.svc.cluster.local:8888')

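Kubeflow Pipelines are created declaratively. This means that the component code is not executed when the pipeline function is defined; the function only describes the workflow, which runs once the pipeline has been compiled and submitted.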
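
Put differently, calling a component inside a pipeline function records a step instead of executing it. The toy sketch below imitates that behaviour in plain Python; `Step`, `recorded_steps`, and `toy_pipeline` are invented for illustration and are not part of the Kubeflow Pipelines SDK.

```python
recorded_steps = []

class Step:
    """Toy stand-in for a pipeline task: it stores what to run, but does not run it."""
    def __init__(self, name, func, *args):
        self.name = name
        self.func = func
        self.args = args
        recorded_steps.append(self)

def toy_pipeline():
    # Nothing is computed here; the two steps are only recorded.
    Step('double', lambda x: x * 2, 21)
    Step('greet', lambda who: f'hello {who}', 'kubeflow')

toy_pipeline()
print('recorded:', [step.name for step in recorded_steps])

# Execution happens later, in a separate phase (in Kubeflow: on the cluster).
results = {step.name: step.func(*step.args) for step in recorded_steps}
print('results:', results)
```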

Define some variables that are used as inputs at various points in the pipeline.

In [7]:
DATA_PATH = '/mnt'
MODEL_PATH = 'mnist_model.h5'
# An integer representing an image from the test set that the model will attempt to predict the label for.
IMAGE_NUMBER = 0

Our next step will be to create the various components that will make up the pipeline. Define the pipeline using the *@dsl.pipeline* decorator.

The pipeline function takes a number of parameters that are fed into our various components during execution.

A [Persistent Volume Claim](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) can be quickly created using the [VolumeOp](https://) method to save and persist data between the components. Note that while this is a great method to use locally, you could also use a cloud bucket for your persistent storage.

In [8]:
# Define the pipeline and the parameters to be fed into it.
@dsl.pipeline(
   name='MNIST Pipeline',
   description='A toy pipeline that performs mnist model training and prediction.'
)
def mnist_container_pipeline(
    data_path: str = DATA_PATH,
    model_file: str = MODEL_PATH,
    image_number: int = IMAGE_NUMBER
):

    # Define volume to share data between components.
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWM)
    
    # Create MNIST training component.
    mnist_training_container = train_op(data_path, model_file) \
                                    .add_pvolumes({data_path: vop.volume})

    # Create MNIST prediction component.
    mnist_predict_container = predict_op(data_path,
                                         model_file,
                                         image_number
                                         ) \
                                    .add_pvolumes({data_path: mnist_training_container.pvolume})
    
    # Print the result of the prediction
    mnist_result_container = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: mnist_predict_container.pvolume},
        arguments=['cat', f'{data_path}/result.txt']
    )
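
The steps have no explicit ordering calls; the order follows from each step consuming the volume handed on by the previous one. As a rough illustration of how an execution order can be derived from such dependencies, the sketch below uses the standard-library `graphlib` module (Python 3.9+). The dependency table is written by hand to mirror the pipeline above and is not produced by the SDK.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps it depends on (hand-written to mirror the pipeline).
dependencies = {
    'create_volume': set(),
    'train': {'create_volume'},
    'predict': {'train'},
    'print_prediction': {'predict'},
}

# A topological sort yields an order in which every step runs after its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(' -> '.join(execution_order))
```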

### 2.4 Run pipeline

Finally, we feed our pipeline definition into the compiler and run it as an experiment. This gives us two links at the bottom that lead to the [Kubeflow Pipelines UI](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/), where you can check logs, artifacts, and inputs/outputs, and follow the progress of your pipeline visually.

In [9]:
pipeline_func = mnist_container_pipeline

In [10]:
experiment_name = 'fashion_mnist_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {"data_path":DATA_PATH,
             "model_file":MODEL_PATH,
             "image_number": IMAGE_NUMBER}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,  
  '{}.zip'.format(experiment_name))

# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)
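
The keys of `arguments` are meant to match the parameter names of the pipeline function, and a typo in a key is easy to make. One possible pre-flight check, sketched with the standard-library `inspect` module, is shown below. The function here is a stand-in that only repeats the signature and default values from this notebook, so the snippet runs without the SDK.

```python
import inspect

def mnist_container_pipeline(data_path: str = '/mnt',
                             model_file: str = 'mnist_model.h5',
                             image_number: int = 0):
    """Stand-in with the same signature as the pipeline function; the body is omitted."""

arguments = {'data_path': '/mnt',
             'model_file': 'mnist_model.h5',
             'image_number': 0}

# Compare the argument keys with the parameter names of the function.
expected = set(inspect.signature(mnist_container_pipeline).parameters)
unknown = set(arguments) - expected
missing = expected - set(arguments)

print('unknown:', sorted(unknown), 'missing:', sorted(missing))
```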