# Prerequisites

Let's make sure the Kubeflow Pipelines SDK (kfp) is installed:

In [1]:
! pip show kfp

Name: kfp
Version: 1.0.0
Summary: KubeFlow Pipelines SDK
Home-page: UNKNOWN
Author: google
Author-email: None
License: UNKNOWN
Location: /usr/local/lib/python3.6/dist-packages
Requires: Deprecated, click, requests-toolbelt, google-auth, kubernetes, jsonschema, tabulate, cloudpickle, google-cloud-storage, strip-hints, PyYAML, kfp-server-api
Required-by: 


In [2]:
%%writefile minio_secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: minio-s3-secret
  annotations:
     serving.kubeflow.org/s3-endpoint: minio-service.kubeflow:9000
     serving.kubeflow.org/s3-usehttps: "0" # Default: 1. Must be 0 when testing with MinIO!
type: Opaque
data:
  AWS_ACCESS_KEY_ID: bWluaW8=
  AWS_SECRET_ACCESS_KEY: bWluaW8xMjM=
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: default
secrets:
  - name: minio-s3-secret

Overwriting minio_secret.yaml
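The values under `data` in a Kubernetes Secret are base64-encoded, not encrypted. As a quick check, the minimal sketch below reproduces the two encoded values from the manifest using the MinIO credentials that appear later in this notebook. It uses only the Python standard library; the helper name `encode_secret_value` is ours and is not part of any Kubernetes tooling.

```python
import base64


def encode_secret_value(plain: str) -> str:
    """Return the base64 form expected under `data` in a Secret manifest."""
    return base64.b64encode(plain.encode("utf-8")).decode("ascii")


# Credentials taken from the notebook (MinIO defaults used for testing)
access_key = encode_secret_value("minio")
secret_key = encode_secret_value("minio123")
print(access_key)
print(secret_key)
```

If you change the MinIO credentials, the same helper can be used to regenerate the values for minio_secret.yaml.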


In [3]:
! kubectl apply -f minio_secret.yaml

secret/minio-s3-secret unchanged
serviceaccount/default configured


# Configure access to MinIO

Upload your dataset to MinIO

First, we configure credentials for mc, the MinIO command line client. We then use it to create a bucket, upload the dataset to it, and set the access policy so that the pipeline can download it from MinIO.

In [4]:
# download MinIO client

! wget https://dl.min.io/client/mc/release/linux-amd64/mc
! chmod +x mc
! ./mc --help

--2021-05-03 23:55:56--  https://dl.min.io/client/mc/release/linux-amd64/mc
Resolving dl.min.io (dl.min.io)... 178.128.69.202
Connecting to dl.min.io (dl.min.io)|178.128.69.202|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 20529152 (20M) [application/octet-stream]
Saving to: ‘mc’


2021-05-03 23:56:06 (1.95 MB/s) - ‘mc’ saved [20529152/20529152]

NAME:
  mc - MinIO Client for cloud storage and filesystems.

USAGE:
  mc [FLAGS] COMMAND [COMMAND FLAGS | -h] [ARGUMENTS...]

COMMANDS:
  alias      set, remove and list aliases in configuration file
  ls         list buckets and objects
  mb         make a bucket
  rb         remove a bucket
  cp         copy objects
  mirror     synchronize object(s) to a remote site
  cat        display object contents
  head       display first 'n' lines of an object
  pipe       stream STDIN to an object
  share      generate URL for temporary access to an object
  find       search for objects
  sql        run sql queries on 

**a. Connect to the MinIO server**

In [5]:
! ./mc alias set minio http://minio-service.kubeflow:9000 minio minio123

[m[32mAdded `minio` successfully.[0m
[0m

**The command below clears out the bucket, which is useful after repeated test runs have filled it up.**

In [6]:
! ./mc rm --recursive --force minio/bird

[m[32;1mRemoving `minio/bird/datasets.tar.gz`[0m.
[0m

**b. Create a bucket to store your data and export your model to MinIO**

**Make sure you clear this bucket once you are done running your pipeline**

In [7]:
! ./mc mb minio/bird

mc: <ERROR> Unable to make bucket `minio/bird`. Your previous request to create the named bucket succeeded and you already own it.

**c. Upload the dataset to your bucket in MinIO.**

In [8]:
! tar --dereference -czf datasets.tar.gz ./datasets
! ./mc cp datasets.tar.gz minio/bird/datasets.tar.gz
! ./mc policy set download minio/bird

...ts.tar.gz:  1.65 GiB / 1.65 GiB ┃▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓┃ 114.84 MiB/s 14s
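For readers who prefer to build the archive from Python, the packing step can be sketched with the standard library's tarfile module. This is an illustration only: the helper name `make_archive` and the tiny stand-in directory are assumptions, and `dereference=True` is used to mirror the `--dereference` flag of the tar command above.

```python
import os
import tarfile
import tempfile


def make_archive(source_dir: str, archive_path: str) -> None:
    """Pack source_dir into a gzip-compressed tarball, following symlinks."""
    # dereference=True stores the files that symlinks point to,
    # rather than the links themselves.
    top_level = os.path.basename(os.path.normpath(source_dir))
    with tarfile.open(archive_path, mode="w:gz", dereference=True) as tar:
        tar.add(source_dir, arcname=top_level)


# Tiny stand-in for the real ./datasets directory
workdir = tempfile.mkdtemp()
sample_dir = os.path.join(workdir, "datasets")
os.makedirs(os.path.join(sample_dir, "train"))
with open(os.path.join(sample_dir, "train", "example.txt"), "w") as f:
    f.write("placeholder")

archive = os.path.join(workdir, "datasets.tar.gz")
make_archive(sample_dir, archive)

# List what ended up in the archive
with tarfile.open(archive, mode="r:gz") as tar:
    members = sorted(tar.getnames())
print(members)
```

The archive keeps `datasets` as its top-level directory, so extracting it yields a `datasets` folder under the extraction target.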

# MinIO Server URL and Credentials

In [9]:
MINIO_SERVER='minio-service.kubeflow:9000'
MINIO_ACCESS_KEY='minio'
MINIO_SECRET_KEY='minio123'

# Implement Kubeflow Pipelines Components

In this pipeline, we have the following components:

* Download the bird dataset
* Train the TensorFlow model
* Evaluate the trained model
* Export the trained model
* Serve the model

In [10]:
from typing import NamedTuple
import kfp
import kfp.components as components
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath #helps define the input & output between the components
import kubeflow.fairing.utils
NAMESPACE = kubeflow.fairing.utils.get_default_target_namespace()

### Component 1: Download the bird Data Set

In [11]:
def download_dataset(minio_server: str, data_dir: OutputPath(str)):
    """Download the bird data set to the KFP volume to share it among all steps"""
    import urllib.request
    import tarfile
    import os
    import subprocess

    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
        
    # this url leads to the bucket
    url = f'http://{minio_server}/bird/datasets.tar.gz'
    stream = urllib.request.urlopen(url)
    tar = tarfile.open(fileobj=stream, mode="r|gz")
    tar.extractall(path=data_dir)
    
    subprocess.call(["ls", "-lha", data_dir])
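The mode "r|gz" used in download_dataset reads the archive as a forward-only stream, which is why the HTTP response can be handed to tarfile directly without first saving it to disk. The sketch below shows the same pattern against an in-memory archive, so it can be tried without a MinIO server. The helper name `extract_stream` and the sample file are assumptions made for illustration.

```python
import io
import os
import tarfile
import tempfile


def extract_stream(stream, target_dir: str) -> None:
    """Extract a gzip-compressed tar archive from a forward-only stream."""
    os.makedirs(target_dir, exist_ok=True)
    # "r|gz" processes the archive sequentially, so `stream` only needs read().
    with tarfile.open(fileobj=stream, mode="r|gz") as tar:
        tar.extractall(path=target_dir)


# Build a small archive in memory to stand in for the HTTP response body.
payload = b"placeholder"
buffer = io.BytesIO()
with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
    info = tarfile.TarInfo(name="datasets/train/example.txt")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))
buffer.seek(0)

target = tempfile.mkdtemp()
extract_stream(buffer, target)
extracted = os.path.join(target, "datasets", "train", "example.txt")
print(os.path.exists(extracted))
```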

### Component 2: Train the Model

In [12]:
def train_model(data_dir: InputPath(str), model_dir: OutputPath(str)):
    """Trains a classifier on top of a frozen ResNet101V2 base for 5 epochs
    using a pre-downloaded dataset.
    Once trained, the model is persisted to `model_dir`."""

    import os
    import tensorflow as tf
    import tensorflow_datasets as tfds
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense,Conv2D,MaxPool2D,Flatten,Dropout,BatchNormalization,Activation
    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    
    # import the ResNet101V2 model from tf.keras, consistent with the other Keras imports
    from tensorflow.keras.applications import ResNet101V2
    convlayer=ResNet101V2(input_shape=(224,224,3),weights='imagenet',include_top=False)
    convlayer.trainable = False
    
    # model architecture
    model=Sequential()
    model.add(convlayer)
    model.add(Dropout(0.5))
    model.add(Flatten())
    model.add(BatchNormalization())
    model.add(Dense(2048,kernel_initializer='he_uniform'))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(1024,kernel_initializer='he_uniform'))
    model.add(BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dropout(0.5))
    model.add(Dense(265,activation='softmax'))
    
    # model parameters
    opt=tf.keras.optimizers.RMSprop(learning_rate=0.0001)
    model.compile(loss='sparse_categorical_crossentropy',metrics=['accuracy'],optimizer=opt)
    
    # model summary
    model.summary()
    
     
    # load the train dataset
    train_dir, train_info = tfds.load(
        "bird",
        split="train",
        shuffle_files=True,
        as_supervised=True,
        with_info=True,
        download=True,
        data_dir=f"{data_dir}/datasets",
    )
    
    # generate batches of the tensor images of the training set
    train_datagen = ImageDataGenerator(rescale=1/255)
    train_dir = train_datagen.flow_from_directory(train_dir,
                                                target_size=(224,224),
                                                classes=['ALBATROSS'],
                                                color_mode='rgb',
                                                class_mode='sparse',batch_size=256)
    
    train_dir = train_dir.cache()
    train_dir = train_dir.shuffle(train_info.splits["train"].num_examples)
    train_dir = train_dir.prefetch(tf.data.experimental.AUTOTUNE)
   
    # load the validation dataset
    valid_dir, valid_info = tfds.load(
        "bird",
        split="valid",
        shuffle_files=True,
        as_supervised=True,
        with_info=True,
        download=True,
        data_dir=f"{data_dir}/datasets",
    )
   
    
    # generate batches of the tensor images of the validation set
    val_datagen = ImageDataGenerator(rescale=1/255)
    valid_dir = val_datagen.flow_from_directory(valid_dir,
                                              target_size=(224,224),
                                              classes=['ALBATROSS'],
                                              color_mode='rgb',
                                              class_mode='sparse',batch_size=256)
    
    valid_dir = valid_dir.cache()
    valid_dir = valid_dir.shuffle(valid_info.splits["valid"].num_examples)
    valid_dir = valid_dir.prefetch(tf.data.experimental.AUTOTUNE)
   
    # fit the model
    model.fit(train_dir, validation_data=valid_dir, epochs=5)
    
    # save the model
    model.save(model_dir)
    print(f"Model saved {model_dir}")
    print(os.listdir(model_dir))

### Component 3: Evaluate the model

In [13]:
def evaluate_model(
    data_dir: InputPath(str), model_dir: InputPath(str), metrics_path: OutputPath(str)
) -> NamedTuple("EvaluationOutput", [("mlpipeline_metrics", "Metrics")]):
    """Loads a saved model from file and uses a pre-downloaded dataset for evaluation.
    Model metrics are written to `metrics_path` and also returned as
    `mlpipeline_metrics` so that Kubeflow Pipelines can display them."""
    
    import json
    import os
    import tensorflow as tf
    import tensorflow_datasets as tfds
    from tensorflow.keras.preprocessing.image import ImageDataGenerator
    from collections import namedtuple
       
    # load the test dataset
    test_dir, test_info = tfds.load(
        "bird",
        split="test",
        shuffle_files=True,
        as_supervised=True,
        with_info=True,
        download=True,
        data_dir=f"{data_dir}/datasets",
    )
    
    # generate batches of the tensor images of the test set
    test_datagen = ImageDataGenerator(rescale=1/255)
    test_dir = test_datagen.flow_from_directory(test_dir,
                                                target_size=(224,224),
                                                color_mode='rgb',
                                                classes=['ALBATROSS'],
                                                class_mode='sparse',batch_size=256)
    
    
    test_dir = test_dir.cache()
    test_dir = test_dir.shuffle(test_info.splits["test"].num_examples)
    test_dir = test_dir.prefetch(tf.data.experimental.AUTOTUNE)
    
    # load saved model and evaluate on the test set
    model = tf.keras.models.load_model(model_dir)
    (loss, accuracy) = model.evaluate(test_dir)
    
    metrics = {
        "metrics": [
            # loss is unbounded, so report it as a raw number rather than a percentage
            {"name": "loss", "numberValue": float(loss), "format": "RAW"},
            {"name": "accuracy", "numberValue": float(accuracy), "format": "PERCENTAGE"},
        ]
    }
    
    # Save the metrics as a json file 
    with open(metrics_path, "w") as f:
        json.dump(metrics, f)

    out_tuple = namedtuple("EvaluationOutput", ["mlpipeline_metrics"])

    return out_tuple(json.dumps(metrics))
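The metrics payload can be checked in isolation before running the whole pipeline. The following is a minimal sketch, assuming the KFP v1 metrics layout of a top-level "metrics" list whose entries carry a name, a numeric value, and a format of either "RAW" or "PERCENTAGE". The helper name `build_metrics` and the sample numbers are ours; reporting loss as "RAW" is a choice made in this sketch because loss is not bounded to [0, 1].

```python
import json


def build_metrics(loss: float, accuracy: float) -> dict:
    """Assemble a metrics payload in the layout used by evaluate_model."""
    return {
        "metrics": [
            # Loss is an unbounded value, so it is reported as a raw number.
            {"name": "loss", "numberValue": float(loss), "format": "RAW"},
            # Accuracy lies in [0, 1] and is displayed as a percentage.
            {"name": "accuracy", "numberValue": float(accuracy), "format": "PERCENTAGE"},
        ]
    }


# Example values only, not results from this pipeline.
metrics_json = json.dumps(build_metrics(0.25, 0.5))
print(metrics_json)
```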

### Component 4: Export the Model

In [14]:
def export_model(
    model_dir: InputPath(str),
    metrics: InputPath(str),
    export_bucket: str,
    model_name: str,
    model_version: int,
    minio_server: str,
    minio_access_key: str,
    minio_secret_key: str,
):
    import os
    import boto3
    from botocore.client import Config
    

    s3 = boto3.client(
        "s3",
        endpoint_url=f'http://{minio_server}',
        aws_access_key_id=minio_access_key,
        aws_secret_access_key=minio_secret_key,
        config=Config(signature_version="s3v4"),
    )

    # Create export bucket if it does not yet exist
    response = s3.list_buckets()
    export_bucket_exists = False

    print(response , export_bucket)
    for bucket in response["Buckets"]:
        if bucket["Name"] == export_bucket:
            export_bucket_exists = True

    if not export_bucket_exists:
        s3.create_bucket(ACL="public-read-write", Bucket=export_bucket)

    # Save model files to S3
    for root, dirs, files in os.walk(model_dir):
        for filename in files:
            local_path = os.path.join(root, filename)
            s3_path = os.path.relpath(local_path, model_dir)

            s3.upload_file(
                local_path,
                export_bucket,
                f"{model_name}/{model_version}/{s3_path}",
                ExtraArgs={"ACL": "public-read"},
            )

    response = s3.list_objects(Bucket=export_bucket)
    print(f"All objects in {export_bucket}:")
    for file in response["Contents"]:
        print("{}/{}".format(export_bucket, file["Key"]))
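To see which object keys export_model produces without talking to MinIO, the path logic can be run on its own. This is a sketch under assumptions: the helper name `object_keys` is hypothetical, and the two empty files merely stand in for a saved model directory. The keys follow the `model_name/model_version/relative_path` pattern from the upload loop above.

```python
import os
import tempfile


def object_keys(model_dir: str, model_name: str, model_version: int) -> list:
    """List the object keys export_model would use for files under model_dir."""
    keys = []
    for root, _dirs, files in os.walk(model_dir):
        for filename in files:
            local_path = os.path.join(root, filename)
            rel_path = os.path.relpath(local_path, model_dir)
            # Object keys use "/" regardless of the local path separator.
            rel_key = rel_path.replace(os.sep, "/")
            keys.append(f"{model_name}/{model_version}/{rel_key}")
    return sorted(keys)


# Stand-in for a saved model directory
model_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(model_dir, "variables"))
for rel in ("saved_model.pb", os.path.join("variables", "variables.index")):
    with open(os.path.join(model_dir, rel), "w") as f:
        f.write("")

keys = object_keys(model_dir, "bird", 1)
print(keys)
```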

### Component 5: Serve Model

In [15]:
import kfp.compiler as compiler
from kfp import components

# Load the KFServing component definition; the pipeline below calls the resulting factory as `kfserving`
kfserving = components.load_component_from_file("kfserving-component.yaml")

# How to Combine the Components into a Pipeline

Up to this point we have not yet used the Kubeflow Pipelines SDK!

With our four Python components (i.e. self-contained functions) defined and the KFServing component loaded, we can wire up the dependencies with Kubeflow Pipelines.

The call components.func_to_container_op(f, base_image=img)(*args) has the following ingredients:
* f is the Python function that defines a component
* img is the base (Docker) image used to package the function
* *args lists the arguments to f

What the *args mean is best explained by going forward through the graph:
* downloadOp is the very first step and has no dependencies; it therefore has no InputPath. Its output (i.e. OutputPath) is stored in data_dir.
* trainOp needs the data downloaded from downloadOp and its signature lists data_dir (input) and model_dir (output). So, it depends on downloadOp.output (i.e. the previous step's output) and stores its own outputs in model_dir, which can be used by another step. downloadOp is the parent of trainOp, as required.
* evaluateOp's function takes three arguments: data_dir (i.e. downloadOp.output), model_dir (i.e. trainOp.output), and metrics_path, which is where the function stores its evaluation metrics. That way, evaluateOp can only run after the successful completion of both downloadOp and trainOp.
* exportOp runs the function export_model, which accepts eight parameters: model_dir, metrics, export_bucket, model_name, model_version, minio_server, minio_access_key, and minio_secret_key. Where do we get model_dir from? It is simply trainOp.output. Similarly, metrics is evaluateOp.output. The remaining six arguments are regular Python arguments that are static for the pipeline: they do not depend on any step's output being available. Hence, they are defined without using InputPath.
* kfservingOp is loaded from the external component. It consumes no output from the other steps, so its order of execution must be specified explicitly: kfservingOp.after(exportOp) assigns exportOp as its parent.
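The dependency graph described above can be written down as plain data and ordered with the standard library, which is a quick way to sanity-check the wiring before compiling. This sketch is independent of the KFP SDK, which derives the graph from the step outputs and the explicit `after` call in the pipeline code. It assumes Python 3.9 or later for `graphlib`.

```python
from graphlib import TopologicalSorter  # available from Python 3.9

# Each step maps to the set of steps it must wait for.
dependencies = {
    "downloadOp": set(),
    "trainOp": {"downloadOp"},
    "evaluateOp": {"downloadOp", "trainOp"},
    "exportOp": {"trainOp", "evaluateOp"},
    "kfservingOp": {"exportOp"},  # set explicitly via kfservingOp.after(exportOp)
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because every step depends on the one before it, only one valid ordering exists: the pipeline runs as a chain.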

In [16]:
def train_and_serve(
    data_dir: str,
    model_dir: str,
    export_bucket: str,
    model_name: str,
    model_version: int,
    minio_server: str,
    minio_access_key: str,
    minio_secret_key: str,
):
    # For GPU support, please add the "-gpu" suffix to the base image
    BASE_IMAGE = "mavencodev/minio:v.0.1"

    downloadOp = components.func_to_container_op(
        download_dataset, base_image=BASE_IMAGE
    )(minio_server)

    trainOp = components.func_to_container_op(train_model, base_image=BASE_IMAGE)(
        downloadOp.output
    )

    evaluateOp = components.func_to_container_op(evaluate_model, base_image=BASE_IMAGE)(
        downloadOp.output, trainOp.output
    )

    exportOp = components.func_to_container_op(export_model, base_image=BASE_IMAGE)(
        trainOp.output, evaluateOp.output, export_bucket, 
        model_name, model_version, minio_server, minio_access_key, minio_secret_key
    )
    
    kfservingOp = kfserving(
        action="apply",
        default_model_uri=f"s3://{export_bucket}/{model_name}",
        model_name="bird",
        namespace= NAMESPACE,
        framework="tensorflow",
    )
    kfservingOp.after(exportOp)

Note that func_to_container_op does not build new Docker images. Each step runs in a container started from BASE_IMAGE, and the source code of its Python function is embedded in the component definition and executed inside that container. Each component can use a different base image though. This may come in handy if you want to have reusable components for automatic data and/or model analysis (e.g. to investigate bias).

We still have to define the pipeline itself. Our train_and_serve function defines dependencies but we must use the KFP domain-specific language (DSL) to register the pipeline with its components:

In [17]:
def op_transformer(op):
    op.add_pod_annotation(name="sidecar.istio.io/inject", value="false")
    return op


@dsl.pipeline(
    name="End-to-End bird Pipeline",
    description="A sample pipeline to demonstrate multi-step model training, evaluation and export",
)
def bird_pipeline(
    model_dir: str = "/train/model",
    data_dir: str = "/train/data",
    export_bucket: str = "bird",
    model_name: str = "bird",
    model_version: int = 1,
):
    MINIO_SERVER='minio-service.kubeflow:9000'
    MINIO_ACCESS_KEY='minio'
    MINIO_SECRET_KEY='minio123'
    
    
    train_and_serve(
        data_dir=data_dir,
        model_dir=model_dir,
        export_bucket=export_bucket,
        model_name=model_name,
        model_version=model_version,
        minio_server=MINIO_SERVER,
        minio_access_key=MINIO_ACCESS_KEY,
        minio_secret_key=MINIO_SECRET_KEY,
    )
    
    dsl.get_pipeline_conf().add_op_transformer(op_transformer)

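With that in place, let's compile the pipeline directly from our notebook. The compiled package, bird002.yaml, can then be uploaded to Kubeflow Pipelines or submitted as a run under the run and experiment names defined here: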

In [18]:
pipeline_func = bird_pipeline
run_name = pipeline_func.__name__ + " run"
experiment_name = "End-to-End-Demo"

kfp.compiler.Compiler().compile(pipeline_func,  'bird002.yaml')

[I 210504 00:05:39 driver:124] Generating grammar tables from /usr/lib/python3.6/lib2to3/Grammar.txt
[I 210504 00:05:39 driver:124] Generating grammar tables from /usr/lib/python3.6/lib2to3/PatternGrammar.txt
