# From Notebook to Kubeflow Pipeline using Lung Disease Detection

In this notebook, we will walk you through the steps of converting a machine learning model, which you may already have in a Jupyter notebook, into a Kubeflow pipeline. As an example, we use a lung disease detection use case.

In this example we use:

* **Kubeflow pipelines** - [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) is a machine learning workflow platform that helps data scientists and ML engineers with both experimentation and productionization of ML workloads. It lets users orchestrate scalable workloads through an SDK, right from a Jupyter notebook.

**Note:** This notebook is to be run on a notebook server inside the Kubeflow environment. 

## Kubeflow pipeline building
We will use the containerized approach provided by Kubeflow so that our model runs on Kubernetes.

### 1. Install Kubeflow pipelines SDK

The first step is to install the Kubeflow Pipelines SDK package.

In [1]:
# !pip install --user --upgrade kfp

After the installation, restart the kernel for the changes to take effect.

Check if the install was successful:

In [2]:
# !which dsl-compile

You should see the path to `dsl-compile` above, for example `/usr/local/bin/dsl-compile`.
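If you prefer to check from Python instead of the shell, a minimal sketch follows. It assumes Python 3.8+ (for `importlib.metadata`) and only reports what is visible to the notebook kernel's environment; the helper name `installed_kfp_version` is hypothetical.

```python
import shutil
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple


def installed_kfp_version() -> Tuple[Optional[str], Optional[str]]:
    """Return (kfp package version, path to dsl-compile), using None for anything missing."""
    try:
        kfp_version = version("kfp")  # version of the installed kfp distribution
    except PackageNotFoundError:
        kfp_version = None
    # shutil.which searches PATH, like the `which` shell command used above
    return kfp_version, shutil.which("dsl-compile")


print(installed_kfp_version())
```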

### 2. Build Container Components

The following cells define functions that will be transformed into lightweight container components. It may help to compare them with the corresponding code in the original notebook.

In [3]:
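# Restart the kernel so that the newly installed kfp package is picked up
# (this JavaScript call relies on the classic Jupyter Notebook UI)
from IPython.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")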

<table>
  <tr><td>
    <img src="https://www.kubeflow.org/docs/images/pipelines-sdk-lightweight.svg"
         alt="Lightweight Python components in the Kubeflow Pipelines SDK"  width="600">
  </td></tr>
  <tr><td align="center">
  </td></tr>
</table>

Here, each component is a self-contained piece of code: a Python function.

Each function must be completely self-contained: no code, including imports, can be defined outside the function body. All imports must therefore go inside the body, and the imported packages must be available in the base image.

Why? Because each component runs in its own container, started from the base image. The base image must therefore contain all dependencies: any packages you install manually in the notebook are invisible to the function once it runs inside the container. The function's source code becomes the container's entrypoint, which is why all auxiliary functions must also be defined inside it. That causes some unfortunate duplication, but it also means you do not have to worry about the packaging mechanism.
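To see why imports must live inside the function body, the sketch below imitates the idea with plain Python: it executes only a function's source code in an empty namespace, much as a fresh container only has the function's code and the packages of the base image. This is a simplified model for illustration, not how the KFP SDK actually serializes components; `run_in_fresh_namespace` is a hypothetical helper.

```python
import math  # a notebook-level import; it is NOT visible inside the fresh namespace

# Source code of two versions of the same component function, as plain strings
NOT_SELF_CONTAINED = '''
def not_self_contained(x):
    return math.sqrt(x)  # relies on an import made elsewhere in the notebook
'''

SELF_CONTAINED = '''
def self_contained(x):
    import math  # the import travels with the function body
    return math.sqrt(x)
'''


def run_in_fresh_namespace(func_source: str, func_name: str, *args):
    """Hypothetical helper: define and call a function in an empty namespace,
    roughly mimicking a container that only receives the function's source."""
    namespace = {}  # no notebook globals and no earlier imports
    exec(func_source, namespace)
    return namespace[func_name](*args)


print(run_in_fresh_namespace(SELF_CONTAINED, "self_contained", 16.0))  # 4.0
try:
    run_in_fresh_namespace(NOT_SELF_CONTAINED, "not_self_contained", 16.0)
except NameError as err:
    print("Fails as expected:", err)
```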

For this pipeline, we can define three components:

- Download data set
- Train model
- Evaluate the trained model

##### Import Kubeflow SDK

In [4]:
import time
from functools import partial

import kfp
from kfp import dsl, components
from kfp.components import InputPath, OutputPath  # annotations for file-based inputs/outputs
from kfserving import utils  # provides get_default_target_namespace()



##### Define a helper that converts a Python function into a component (calling the component inside the pipeline returns a task)

In [5]:
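# Pre-bind the base image so that every component below runs in the same image,
# which must already contain TensorFlow, pandas, and scikit-learn
func_to_container_op = partial(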
    components.func_to_container_op,
    base_image='zdou001/only_tests:flower-nightly',
)
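`functools.partial` only pre-fills the `base_image` keyword, so the result can still be used as a decorator. The sketch below shows the pattern with a hypothetical stand-in, `fake_func_to_container_op`, that merely records its arguments instead of building a real component; the image name is also made up.

```python
from functools import partial


def fake_func_to_container_op(func, base_image=None):
    """Hypothetical stand-in for components.func_to_container_op: records its inputs."""
    return {"function": func.__name__, "base_image": base_image}


# Pre-bind the base image once, as the cell above does with the real function
to_component = partial(fake_func_to_container_op, base_image="example/base-image:latest")


@to_component  # equivalent to fake_func_to_container_op(my_step, base_image=...)
def my_step():
    pass


print(my_step)  # {'function': 'my_step', 'base_image': 'example/base-image:latest'}
```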

#### Component 1: Create a standalone Python function - load_task()

In [6]:
@func_to_container_op
def load_task(
    dataset_url: str,
    data_dir: OutputPath(str)
):
    """Download Hernia X-ray image data"""
    import os
    from pathlib import Path
    import urllib.request
    import tarfile

    Path(data_dir).mkdir(parents=True, exist_ok=True)

    # Stream the .tgz archive from the URL and extract it into data_dir
    with urllib.request.urlopen(dataset_url) as response:
        with tarfile.open(fileobj=response, mode="r|gz") as archive:
            archive.extractall(data_dir)
    
    print(f'data saved to {data_dir}/Hernia_sample50')
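Because `load_task` is plain Python apart from the decorator, its download logic can be checked locally before submitting a run. The sketch below reuses the same streaming `tarfile` approach on a tiny archive that it builds itself, and passes a `file://` URL, which `urllib.request.urlopen` also accepts. The helper name `download_and_extract` and the archive contents are hypothetical.

```python
import io
import tarfile
import tempfile
import urllib.request
from pathlib import Path


def download_and_extract(dataset_url: str, data_dir: str) -> None:
    """Same streaming logic as load_task, without the KFP decorator."""
    Path(data_dir).mkdir(parents=True, exist_ok=True)
    with urllib.request.urlopen(dataset_url) as response:
        # "r|gz" reads the gzip-compressed tar as a stream, without seeking
        with tarfile.open(fileobj=response, mode="r|gz") as archive:
            archive.extractall(data_dir)


with tempfile.TemporaryDirectory() as tmp:
    # Build a tiny hypothetical archive that mimics the Hernia_sample50 layout
    archive_path = Path(tmp) / "sample.tgz"
    with tarfile.open(str(archive_path), "w:gz") as archive:
        payload = b"image_index,labels\n"
        info = tarfile.TarInfo("Hernia_sample50/dataset.csv")
        info.size = len(payload)
        archive.addfile(info, io.BytesIO(payload))

    out_dir = Path(tmp) / "data"
    download_and_extract(archive_path.as_uri(), str(out_dir))  # file:// URL instead of HTTPS
    extracted = sorted(p.name for p in (out_dir / "Hernia_sample50").iterdir())

print(extracted)  # ['dataset.csv']
```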

#### Component 2: Create a standalone Python function - train_task()

In [7]:
@func_to_container_op
def train_task(
    data_dir: InputPath(str),
    batch_size: int,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_dir: OutputPath(str)):
    
    import math
    import os
    from pathlib import Path
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    import tensorflow as tf
    from tensorflow.keras.models import Sequential, load_model, Model
    from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, GlobalMaxPooling2D, GlobalAveragePooling2D, MaxPooling2D, BatchNormalization
    from tensorflow.keras.applications import VGG16, VGG19, ResNet101V2, ResNet50V2, InceptionV3, InceptionResNetV2, NASNetLarge, DenseNet121, DenseNet169, DenseNet201, Xception
    from tensorflow.keras.optimizers import Nadam, SGD, RMSprop, Adam
    from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint


    """Split data"""
    data_path = data_dir + '/Hernia_sample50' 
    df = pd.read_csv(f'{data_path}/dataset.csv', sep=",")
    df["image_index"]= df.image_index.apply(lambda x: x.replace('./datasets-registry', data_dir))
    print(df)
    print(df['labels'].value_counts())
    
    train_val_df, test_df = train_test_split(
        df,
        test_size = 0.2,
        random_state = 123,
        stratify=df['labels']
    )
    train_df, val_df = train_test_split(
        train_val_df,
        test_size = 0.2,
        random_state = 123,
        stratify=train_val_df['labels']
    )
    print(f"Number of train set = {len(train_df)}")
    print(f"Number of validation set = {len(val_df)}")
    print(f"Number of test set = {len(test_df)}")

    """Image Augmentation"""
    labels = train_df['labels'].unique()
    labels = list(labels)
    
    train_datagen = ImageDataGenerator(rescale=1./255.,
                                        rotation_range=20, 
                                        width_shift_range=0.2, 
                                        height_shift_range=0.2, 
                                        shear_range=0.3,
                                        zoom_range=0.3,
                                        horizontal_flip=True, 
                                        vertical_flip=False,
                                        fill_mode="nearest")

    val_test_datagen = ImageDataGenerator(rescale=1./255.)
    
    """Create generators from data frame"""
    train_generator = train_datagen.flow_from_dataframe(dataframe=train_df,
                                                        x_col='image_index',
                                                        y_col='labels',
                                                        target_size=(224, 224),
                                                        batch_size=batch_size,
                                                        class_mode='binary',
                                                        seed = 42,
                                                        shuffle=True,
                                                        classes = labels,
                                                        interpolation='nearest')

    val_generator = val_test_datagen.flow_from_dataframe(dataframe=val_df,
                                                            x_col='image_index',
                                                            y_col='labels',
                                                            target_size=(224, 224),
                                                            batch_size=batch_size,
                                                            class_mode='binary',
                                                            seed = 42,
                                                            classes = labels,
                                                            shuffle=True)
    
    """Define the model"""
    inceptresnet = InceptionResNetV2(
        weights='imagenet',
        input_shape=(224, 224, 3),
        include_top=False)

    x = inceptresnet.output
    x = GlobalAveragePooling2D(name="gap")(x)
    x = Dense(256, activation='elu', kernel_initializer='he_uniform')(x)
    x = BatchNormalization()(x)
    x = Dropout(dropout_rate)(x)
    pred = Dense(1, activation = "sigmoid", name="fc_out", kernel_initializer='he_uniform')(x)
    model = Model(inputs=inceptresnet.input, outputs=pred)
    
    """Train the model"""
    optimizer = Adam(learning_rate=learning_rate)
    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['accuracy'])
    
    Path(model_dir).mkdir(parents=True, exist_ok=True)
    model_saved_path = os.path.join(model_dir, "model.h5")

    checkpoint = ModelCheckpoint(model_saved_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=3, verbose=1, mode='min')

    earlyStopping = EarlyStopping(monitor='val_loss', patience=10, verbose=1, mode='min')

    callbacks_list = [checkpoint, reduce_lr, earlyStopping]

    model.fit(train_generator, 
            steps_per_epoch=math.ceil(train_generator.n/train_generator.batch_size),
            epochs=epochs,
            validation_data=val_generator,
            validation_steps=math.ceil(val_generator.n/val_generator.batch_size),
            callbacks=callbacks_list)
    
    model.summary()
    print(f'Model exported to: {model_dir}')
    print(os.listdir(model_dir))
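The two calls to `train_test_split` with `test_size = 0.2` leave roughly 64% of the images for training, 16% for validation, and 20% for testing. The sketch below reproduces that arithmetic together with the `steps_per_epoch` formula passed to `model.fit`. It assumes scikit-learn's rounding rule for a float `test_size` (the test set gets `ceil(test_size * n)` samples), and the dataset size of 100 is hypothetical.

```python
import math


def split_sizes(n_samples: int, test_size: float = 0.2):
    """Sizes from train_test_split for a float test_size when train_size is not
    given: the test set gets ceil(test_size * n) samples and the rest are kept."""
    n_test = math.ceil(test_size * n_samples)
    return n_samples - n_test, n_test


def two_stage_split(n_samples: int, test_size: float = 0.2):
    """Mirror train_task: first carve out the test set, then split the remainder again."""
    n_train_val, n_test = split_sizes(n_samples, test_size)
    n_train, n_val = split_sizes(n_train_val, test_size)
    return n_train, n_val, n_test


def steps_per_epoch(n_samples: int, batch_size: int) -> int:
    """Same formula as in model.fit above: a final partial batch still counts as a step."""
    return math.ceil(n_samples / batch_size)


n_train, n_val, n_test = two_stage_split(100)  # hypothetical dataset of 100 images
print(n_train, n_val, n_test)  # 64 16 20
print(steps_per_epoch(n_train, batch_size=32))  # 2
```

Note also that with the default `epochs=5`, `EarlyStopping(patience=10)` cannot trigger; `ModelCheckpoint(save_best_only=True)` still keeps the epoch with the lowest validation loss.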

#### Component 3: Create a standalone Python function - evaluate_task()
Evaluate the model with the following Python function. The metrics (F1 score and accuracy) are written as metrics metadata, which the Kubeflow Pipelines UI displays automatically for each run.

In [8]:
@func_to_container_op
def evaluate_task(
    data_dir: InputPath(str),
    model_dir: InputPath(str),
    mlpipeline_metrics_path: OutputPath('Metrics')):
    """Loads a saved model from file and uses a pre-downloaded dataset for evaluation.
    Model metrics are written to `mlpipeline_metrics_path`, which Kubeflow Pipelines
    picks up as the run's metrics artifact."""
    
    import math
    import os
    from pathlib import Path
    import pandas as pd
    from sklearn.model_selection import train_test_split
    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    from tensorflow.keras.models import Sequential, load_model, Model
    from sklearn.metrics import confusion_matrix, f1_score, roc_auc_score, roc_curve, auc, classification_report, accuracy_score, ConfusionMatrixDisplay
    from sklearn.metrics import cohen_kappa_score
    import json
    from collections import namedtuple

    """Load test Hernia dataset """
    data_path = data_dir + '/Hernia_sample50' 
    df = pd.read_csv(f'{data_path}/dataset.csv', sep=",")
    df["image_index"]= df.image_index.apply(lambda x: x.replace('./datasets-registry', data_dir))
    print(df)
    print(df['labels'].value_counts())
    
    train_val_df, test_df = train_test_split(
        df,
        test_size = 0.2,
        random_state = 123,
        stratify=df['labels']
    )
    train_df, val_df = train_test_split(
        train_val_df,
        test_size = 0.2,
        random_state = 123,
        stratify=train_val_df['labels']
    )
    
    """Class labels, in the same order as in train_task"""
    labels = train_df['labels'].unique()
    labels = list(labels)
    
    val_test_datagen = ImageDataGenerator(rescale=1./255.)
    
    """Create generators from data frame"""
    test_generator = val_test_datagen.flow_from_dataframe(dataframe=test_df,
                                                        x_col='image_index',
                                                        y_col='labels',
                                                        target_size=(224, 224),
                                                        batch_size=1,
                                                        class_mode='binary',
                                                        classes = labels,
                                                        shuffle=False)

    """Load the model and evaluate it using several metrics"""
    model_saved_path = os.path.join(model_dir, "model.h5")
    reloaded = load_model(model_saved_path)
    test_generator.reset()
    y_pred = reloaded.predict(test_generator, steps=(math.ceil(test_generator.n/test_generator.batch_size)), verbose=1)
    y_true = test_generator.classes
    
    reloaded.summary()
    auc = roc_auc_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred >= 0.5)
    acc = accuracy_score(y_true, y_pred >= 0.5)
    #fpr, tpr, thresholds = roc_curve(y_true, y_pred)
    #kappa is usually for imbalanced classes
    kappa_score = cohen_kappa_score(y_true, y_pred >= 0.5)

    cm = confusion_matrix(y_true, y_pred >= 0.5)    
    #TN, FP, FN, TP = confusion_matrix(y_true, y_pred >= 0.5).ravel()
    TN, FP, FN, TP = cm.ravel()
    print (TN, FP, FN, TP)

    #sensitivity or true positive rate
    sensitivity = TP/(TP+FN)
    #specificity or true negative rate
    specificity = TN/(TN+FP)
    #false positive rate
    FPR = FP/(FP+TN)
    #precision, positive predictive value
    PPV = TP/(TP+FP)
    #negative predictive value
    NPV = TN/(TN+FN)

    # print("InceptionResNetV2 model (weights=%f, img_w=%f, img_h=%f, channel=%f):" % (weights, img_w, img_h, channel))
    print ('AUC: ', round(auc, 3))
    print ('F1-score: ', round(f1, 3))
    print ('Sensitivity: ', round(sensitivity, 3))
    print ('Specificity: ', round(specificity, 3))
    print ('False positive rate:', round(FPR, 3))
    print ('PPV: ', round(PPV, 3))
    print ('NPV: ', round(NPV, 3))
    print ('Accuracy: ', round(acc, 3))
    print ('Kappa Score: ', round(kappa_score, 3))
    
        
    # Save results in the Kubeflow Pipelines metrics format
    metrics = []
    f1_dict = {
        'name': 'f1',
        'numberValue': round(f1, 4),
        'format': "PERCENTAGE"
    }
    acc_dict = {
        'name': 'accuracy',
        'numberValue': round(acc, 4),
        'format': "PERCENTAGE"
    }
    metrics.append(f1_dict)
    metrics.append(acc_dict)
        
    metrics_dict = {'metrics': metrics}
    print(metrics_dict)

    with open(mlpipeline_metrics_path, 'w') as f:
        json.dump(metrics_dict, f)
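The ratios in `evaluate_task` all come from the four confusion-matrix counts, and the results are saved as a `{'metrics': [...]}` JSON document. The sketch below reproduces both steps with plain Python so they can be checked without a trained model. The counts are hypothetical, and returning `0.0` for an empty denominator is a design choice of this sketch; the direct divisions in `evaluate_task` have no such guard.

```python
import json
import os
import tempfile


def safe_ratio(numerator: int, denominator: int) -> float:
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator else 0.0


def confusion_metrics(tn: int, fp: int, fn: int, tp: int) -> dict:
    """The ratios computed in evaluate_task, from the four confusion-matrix counts."""
    return {
        "sensitivity": safe_ratio(tp, tp + fn),  # true positive rate
        "specificity": safe_ratio(tn, tn + fp),  # true negative rate
        "fpr": safe_ratio(fp, fp + tn),          # false positive rate
        "ppv": safe_ratio(tp, tp + fp),          # precision
        "npv": safe_ratio(tn, tn + fn),          # negative predictive value
        "accuracy": safe_ratio(tp + tn, tn + fp + fn + tp),
    }


def write_kfp_metrics(path: str, values: dict) -> None:
    """Write values in the same {'metrics': [...]} layout that evaluate_task produces."""
    metrics = [
        {"name": name, "numberValue": round(value, 4), "format": "PERCENTAGE"}
        for name, value in values.items()
    ]
    with open(path, "w") as f:
        json.dump({"metrics": metrics}, f)


m = confusion_metrics(tn=8, fp=2, fn=1, tp=9)  # hypothetical counts
print(round(m["sensitivity"], 3), round(m["specificity"], 3))  # 0.9 0.8

with tempfile.TemporaryDirectory() as tmp:
    out_path = os.path.join(tmp, "mlpipeline-metrics.json")
    write_kfp_metrics(out_path, {"accuracy": m["accuracy"]})
    with open(out_path) as f:
        written = json.load(f)

print(written)  # {'metrics': [{'name': 'accuracy', 'numberValue': 0.85, 'format': 'PERCENTAGE'}]}
```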

### 3. Combine the Components into a Pipeline

Note that so far you have only defined the individual components; they are not yet connected into a pipeline.

With the three components (i.e., self-contained functions) defined, wire up the dependencies with Kubeflow Pipelines.

The call `components.func_to_container_op(f, base_image=img)(*args)` has the following ingredients:

- `f` is the Python function that defines a component
- `img` is the base (Docker) image used to package the function
- `*args` lists the arguments to `f`

What the `*args` mean is best explained by going forward through the graph:

- `downloadOp` is the first step and has no dependencies; it therefore has no `InputPath`. Its output (i.e., `OutputPath`) is stored in `data_dir`.
- `trainOp` needs the data downloaded by `downloadOp`, and its signature lists `data_dir` (input) and `model_dir` (output). It depends on `downloadOp.output` (i.e., the previous step's output) and stores its own outputs in `model_dir`, which can be used by another step. `downloadOp` is therefore the parent of `trainOp`, as required.
- `evaluateOp`'s function takes three arguments: `data_dir` (i.e., `downloadOp.output`), `model_dir` (i.e., `trainOp.output`), and `mlpipeline_metrics_path`, which is where the function stores its evaluation metrics. Kubeflow Pipelines supplies output paths itself, so only the two inputs are passed in the call. Because it consumes both outputs, `evaluateOp` can only run after `downloadOp` and `trainOp` have completed successfully.
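The ordering rules above amount to a topological sort of the task graph. The sketch below reproduces that reasoning with Python's standard `graphlib` module (Python 3.9+). It only models how the data dependencies constrain the order; it is not how Kubeflow Pipelines actually schedules the tasks on the cluster.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks whose outputs it consumes
dependencies = {
    "downloadOp": set(),
    "trainOp": {"downloadOp"},                # uses downloadOp.output
    "evaluateOp": {"downloadOp", "trainOp"},  # uses both outputs
}

run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)  # ['downloadOp', 'trainOp', 'evaluateOp']
```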

#### 3.1 Build Kubeflow Pipeline

The next step is to define the pipeline itself, which connects the components, using the *@dsl.pipeline* decorator.

The pipeline function takes a number of parameters that are fed into the components during execution. Kubeflow Pipelines are defined declaratively: when the pipeline is compiled, calling the pipeline function only builds the graph of tasks, and the component code itself does not run until the pipeline is executed on the cluster.
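The declarative idea can be illustrated with a toy model: calling the pipeline function only creates placeholder tasks that record their inputs, and passing one task's `output` to another records a dependency rather than real data. `PlaceholderTask` and `toy_pipeline` below are hypothetical and much simpler than the KFP SDK's own task objects.

```python
class PlaceholderTask:
    """Hypothetical toy task: records a step and its inputs instead of running it."""

    def __init__(self, name, *inputs):
        self.name = name
        self.inputs = inputs
        self.executed = False  # nothing runs while the graph is being built

    @property
    def output(self):
        # A reference to this task's future output, not the data itself
        return f"<output of {self.name}>"


def toy_pipeline(dataset_url):
    download = PlaceholderTask("download", dataset_url)
    train = PlaceholderTask("train", download.output)
    evaluate = PlaceholderTask("evaluate", download.output, train.output)
    return [download, train, evaluate]


graph = toy_pipeline("https://example.com/data.tgz")
for task in graph:
    print(task.name, task.inputs, task.executed)
```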

Define the pipeline and the parameters to be fed into it:

In [9]:
@dsl.pipeline(
    name='Lung Disease Detection Pipeline',
    description='Lung Disease Detection Pipeline to be executed on KubeFlow.',
)
def lung_disease_detection_pipeline(
    dataset_url='https://github.com/ZoieD/Hernia_X-ray_images_sample/raw/main/Hernia_sample50.tgz',
    batch_size=32,
    dropout_rate=0.5,
    learning_rate=0.001,
    epochs=5,
    namespace=utils.get_default_target_namespace(),       
):
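    # Each *_task(...) call creates a task; nothing runs until the pipeline is executed
    downloadOp = load_task(dataset_url)

    # Passing downloadOp.output already makes trainOp depend on downloadOp;
    # .after() states the ordering explicitly
    trainOp = train_task(downloadOp.output, batch_size, dropout_rate, learning_rate, epochs)
    trainOp.after(downloadOp)
    # trainOp.container.set_gpu_limit(1)

    # evaluateOp consumes both outputs, so it waits for downloadOp and trainOp
    evaluateOp = evaluate_task(downloadOp.output, trainOp.output)
    # evaluateOp.after(trainOp)
    # evaluateOp.container.set_gpu_limit(1)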


#### 3.2 Run pipeline

Finally, we feed our pipeline definition into the compiler and run it as an experiment. This produces two links in the cell output that lead to the [Kubeflow Pipelines UI](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/), where you can check logs, artifacts, and inputs/outputs, and follow the progress of your pipeline visually.

Create a client to enable communication with the Pipelines API server.

In [10]:
client = kfp.Client()

[E 220920 13:41:35 _satvolumecredentials:51] Failed to read a token from file '/var/run/secrets/kubeflow/pipelines/token' ([Errno 2] No such file or directory: '/var/run/secrets/kubeflow/pipelines/token').
[W 220920 13:41:35 _client:372] Failed to set up default credentials. Proceeding without credentials...


Compile and run the pipeline:

In [11]:
# Compile the pipeline into a YAML package (which can also be uploaded through the UI)
kfp.compiler.Compiler().compile(lung_disease_detection_pipeline, 'lung_disease_detection_pipeline.yaml')

pipeline_func = lung_disease_detection_pipeline
namespace = utils.get_default_target_namespace()
experiment_name = 'lung_disease_detection_pipeline' + '_' + namespace
run_name = pipeline_func.__name__ + ' run'

# Submit a run; arguments={} keeps the default pipeline parameter values
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name + '-' + time.strftime("%Y%m%d-%H%M%S"),
    arguments={},
)

