In [5]:
#!/usr/bin/env python
# coding: utf-8

# Kubeflow Pipeline: Bolts Classifier 


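In this notebook, we will show how to:
- Define a Kubeflow Pipeline using the Pipelines Python SDK
- Compile the pipeline package from the pipeline definition
- Connect to the Kubeflow Pipelines API, create (or reuse) an experiment, and submit a run of the compiled package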

## Setup

In [6]:
import datetime

import kfp
import kfp.dsl as dsl
import kfp.gcp as gcp
from kubernetes import client as k8s_client

## Define Pipeline components

In [7]:
def bolts_download_op(step_name='download'):
    """Build the data-download step of the pipeline.

    What is downloaded, and where it is written, is decided by the container
    image; this function passes it no command-line arguments.

    Args:
        step_name: Name of the step as shown in the pipeline graph.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipeline-data-download:0.2``.
    """
    download_image = 'swiftdiaries/ml-pipeline-data-download:0.2'
    return dsl.ContainerOp(name=step_name, image=download_image, arguments=[])

def bolts_preprocess_op(image_dir, tfhub_module, bottleneck_dir, step_name='pre-process'):
    """Build the pre-processing step of the pipeline.

    Args:
        image_dir: Image directory, passed as ``--image_dir``.
        tfhub_module: TF-Hub module URL, passed as ``--tfhub_module``.
        bottleneck_dir: Directory passed as ``--bottleneck_dir``.
        step_name: Name of the step as shown in the pipeline graph.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipelines-bolts-preprocess:0.1``.
    """
    flag_value_pairs = (
        ('--image_dir', image_dir),
        ('--tfhub_module', tfhub_module),
        # Zero training steps: presumably the container is run only to
        # compute/cache bottlenecks into bottleneck_dir -- confirm against the image.
        ('--how_many_training_steps', "0"),
        ('--bottleneck_dir', bottleneck_dir),
    )
    cli_args = [token for pair in flag_value_pairs for token in pair]
    return dsl.ContainerOp(
        name=step_name,
        image='swiftdiaries/ml-pipelines-bolts-preprocess:0.1',
        arguments=cli_args,
    )

def bolts_train_op(image_dir, tfhub_module, model_dir, tensorboard_dir, bottleneck_dir, eval_store_file, number_of_steps, learning_rate, step_name='train'):
    """Build the training step of the pipeline.

    Args:
        image_dir: Image directory, passed as ``--image_dir``.
        tfhub_module: TF-Hub module URL, passed as ``--tfhub_module``.
        model_dir: Output model location, passed as ``--saved_model_dir``.
        tensorboard_dir: Summaries location, passed as ``--summaries_dir``.
        bottleneck_dir: Directory passed as ``--bottleneck_dir``.
        eval_store_file: Passed as ``--eval_output_dir``. NOTE(review): the
            pipeline supplies a ``.csv`` file path here despite the ``_dir``
            flag name -- confirm what the image expects.
        number_of_steps: Passed as ``--how_many_training_steps``.
        learning_rate: Passed as ``--learning_rate``.
        step_name: Name of the step as shown in the pipeline graph.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipeline-bolts-train:0.2``;
        ``--print_misclassified_test_images True`` is always appended.
    """
    option_pairs = (
        ('--image_dir', image_dir),
        ('--tfhub_module', tfhub_module),
        ('--saved_model_dir', model_dir),
        ('--summaries_dir', tensorboard_dir),
        ('--how_many_training_steps', number_of_steps),
        ('--learning_rate', learning_rate),
        ('--bottleneck_dir', bottleneck_dir),
        ('--eval_output_dir', eval_store_file),
        ('--print_misclassified_test_images', 'True'),
    )
    train_args = []
    for flag, value in option_pairs:
        train_args.extend((flag, value))
    return dsl.ContainerOp(
        name=step_name,
        image='swiftdiaries/ml-pipeline-bolts-train:0.2',
        arguments=train_args,
    )

def bolts_tensorboard_op(tensorboard_dir, step_name='tensorboard'):
    """Build the TensorBoard step of the pipeline.

    Args:
        tensorboard_dir: Summaries location, passed as ``--logdir``.
        step_name: Name of the step as shown in the pipeline graph.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipelines-bolts-tensorboard:0.1``.
    """
    logdir_args = ['--logdir', tensorboard_dir]
    tensorboard_image = 'swiftdiaries/ml-pipelines-bolts-tensorboard:0.1'
    return dsl.ContainerOp(name=step_name, image=tensorboard_image, arguments=logdir_args)

def bolts_predict_op(model_path, label_path, image_path, input_layer='Placeholder', output_layer='final_result', step_name='predict'):
    """Build the single-image prediction step of the pipeline.

    Args:
        model_path: Model graph file, passed as ``--graph``.
        label_path: Labels file, passed as ``--labels``.
        image_path: Image to classify, passed as ``--image``.
        input_layer: Graph input tensor name, passed as ``--input_layer``.
        output_layer: Graph output tensor name, passed as ``--output_layer``.
        step_name: Name of the step as shown in the pipeline graph.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipelines-bolts-predict:0.13``.
    """
    predict_flags = (
        ('--graph', model_path),
        ('--labels', label_path),
        ('--image', image_path),
        ('--input_layer', input_layer),
        ('--output_layer', output_layer),
    )
    return dsl.ContainerOp(
        name=step_name,
        image='swiftdiaries/ml-pipelines-bolts-predict:0.13',
        arguments=[part for flag_and_value in predict_flags for part in flag_and_value],
    )

# Name of the PersistentVolumeClaim shared by the workflow: the pipeline mounts
# it into every step and the deploy step is told about it via --pvc-name, so
# both must agree on this one value.
pvc_name = 'nfs'

def bolts_deploy_onprem_op(model_dir, step_name='deploy-on-prem', namespace='kubeflow', pvc=None):
    """Build the on-prem model deployment step of the pipeline.

    Args:
        model_dir: Trained model location, passed as ``--model-path``.
        step_name: Name of the step as shown in the pipeline graph.
        namespace: Kubernetes namespace passed as ``--namespace``. Defaults to
            ``'kubeflow'``, the value that used to be hard-coded.
        pvc: PVC name passed as ``--pvc-name``. ``None`` (the default) uses the
            module-level ``pvc_name`` looked up at call time, so the deploy step
            stays in sync with the volume the pipeline mounts.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipeline-deploy:0.2``
        with ``--platform onprem``.
    """
    claim = pvc_name if pvc is None else pvc
    return dsl.ContainerOp(
        name=step_name,
        image='swiftdiaries/ml-pipeline-deploy:0.2',
        arguments=[
            '--platform', 'onprem',  # cloud or onprem
            '--model-path', model_dir,
            # Argo expands {{workflow.name}} when the workflow runs, so the
            # server is named after the pipeline run.
            '--server-name', '{{workflow.name}}',
            '--pvc-name', claim,
            '--namespace', namespace,
        ]
    )

def bolts_cm_op(prediction_path, step_name='confusion_matrix'):
    """Build the confusion-matrix step of the pipeline.

    Args:
        prediction_path: Evaluation CSV, passed as ``--eval_output_csv``.
        step_name: Name of the step as shown in the pipeline graph.

    Returns:
        A ``dsl.ContainerOp`` running ``swiftdiaries/ml-pipelines-cm:0.4``.
    """
    cm_image = 'swiftdiaries/ml-pipelines-cm:0.4'
    cm_args = ['--eval_output_csv', prediction_path]
    return dsl.ContainerOp(name=step_name, image=cm_image, arguments=cm_args)

## Assemble the pipeline

In [8]:
@dsl.pipeline(
    name='Bolts Trainer',
    description='A pipeline to train and evaluate a classifier for the bolts image dataset'
)
def bolts_classifier_pipeline(
    image_dir='/mnt/workflow/small_bolt_images/',
    tfhub_module='https://tfhub.dev/google/imagenet/mobilenet_v1_100_224/quantops/feature_vector/1',
    tensorboard_dir='/mnt/workflow/tensorboard/',
    model_dir='/mnt/models/retrain/trained_models/cleu2019/',
    bottleneck_dir='/mnt/workflow/bottleneck/',
    eval_store_file='/mnt/workflow/eval/predictions_eval.csv',
    learning_rate='0.01',
    number_of_steps='2000',
    model_path='/ml/retrain/trained_models/mobilenet/1/saved_models.pb',
    labels_path='/ml/retrain/trained_models/mobilenet/1/output_labels.txt',
    image_path='/ml/retrain/201721895618_1_9.jpg',
    ):
    """Wire the bolts-classifier steps into a single Kubeflow pipeline.

    DAG: download -> preprocess -> train, then train -> {deploy_onprem,
    tensorboard, predict} and predict -> confusion matrix.

    Every step mounts the PersistentVolumeClaim named by the module-level
    ``pvc_name`` at ``/mnt/``; the ``/mnt/...`` defaults below point into it.

    Args:
        image_dir: Passed to preprocess and train (``--image_dir``).
        tfhub_module: Passed to preprocess and train (``--tfhub_module``).
        tensorboard_dir: Written by train (``--summaries_dir``), read by the
            tensorboard step (``--logdir``).
        model_dir: Written by train (``--saved_model_dir``), deployed by the
            deploy step (``--model-path``).
        bottleneck_dir: Passed to preprocess and train (``--bottleneck_dir``).
        eval_store_file: Written by train (``--eval_output_dir``), read by the
            confusion-matrix step (``--eval_output_csv``).
        learning_rate: Passed to train as a string.
        number_of_steps: Passed to train (``--how_many_training_steps``).
        model_path, labels_path, image_path: Passed to the predict step.
            NOTE(review): these default to ``/ml/...``, outside the ``/mnt/``
            mount, so predict does not read the model train writes to
            ``model_dir``; presumably they are files inside the predict
            image -- confirm.
    """
    # k8s volume resources for workflow; the claim name is the module-level
    # pvc_name, which must be the same one the deploy step is given.
    nfs_pvc = k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name=pvc_name)
    nfs_volume = k8s_client.V1Volume(name='argo-workflow', persistent_volume_claim=nfs_pvc)
    nfs_volume_mount = k8s_client.V1VolumeMount(mount_path='/mnt/', name='argo-workflow')

    # Define the individual steps (same creation order as before).
    download = bolts_download_op()
    preprocess = bolts_preprocess_op(image_dir, tfhub_module, bottleneck_dir)
    train = bolts_train_op(image_dir, tfhub_module, model_dir, tensorboard_dir,
                           bottleneck_dir, eval_store_file, number_of_steps,
                           learning_rate)
    tensorboard = bolts_tensorboard_op(tensorboard_dir)
    predict = bolts_predict_op(model_path, labels_path, image_path)
    deploy_onprem = bolts_deploy_onprem_op(model_dir)
    cm = bolts_cm_op(eval_store_file)

    # Attach the shared volume to every step so they can exchange files.
    for step in (download, preprocess, train, tensorboard, predict, deploy_onprem, cm):
        step.add_volume(nfs_volume)
        step.add_volume_mount(nfs_volume_mount)

    # Define the workflow DAG
    preprocess.after(download)
    train.after(preprocess)
    deploy_onprem.after(train)
    tensorboard.after(train)
    predict.after(train)
    cm.after(predict)

## Compile the Pipeline

In [9]:
# Compile the pipeline function into a package that can be uploaded/run.
# (In a notebook kernel __name__ is always "__main__", so this always runs.)
if __name__ == '__main__':
    from kfp import compiler
    compiler.Compiler().compile(
        bolts_classifier_pipeline,
        'bolts_hybrid.tar.gz',
    )

## Connect to Kubeflow client

In [10]:
# Client for the Kubeflow Pipelines API. With no arguments kfp.Client() uses
# its default endpoint (presumably the in-cluster service when run from a
# Kubeflow notebook server -- confirm for your deployment).
client = kfp.Client()
# Timestamp tag for this session, formatted like '2019-01-30-14-05-09'.
iteration = datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')

## Create an Experiment

In [12]:
EXPERIMENT_NAME = 'Bolts Classifier - MLA Demo'

# Reuse the experiment if it already exists, otherwise create it, so this
# cell can be re-run safely. bolts_experiment is bound in both branches.
try:
    bolts_experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except Exception:
    # The kfp docs say the lookup by name raises ValueError when no experiment
    # matches; Exception also covers other lookup failures in older kfp
    # releases. Unlike the previous bare `except:`, this no longer swallows
    # KeyboardInterrupt/SystemExit. Real API/connection errors will still
    # surface from create_experiment below.
    bolts_experiment = client.create_experiment(name=EXPERIMENT_NAME)

## Run the Pipeline

In [14]:
# Submit one run of the compiled package to the 'Bolts Classifier - MLA Demo'
# experiment. Apart from model_dir, every param below repeats the default of
# bolts_classifier_pipeline; they are listed explicitly for visibility.
experiment = client.get_experiment(experiment_name='Bolts Classifier - MLA Demo')
run = client.run_pipeline(
    experiment.id,
    # Suffix the run name with the session timestamp so runs are easy to
    # tell apart in the Kubeflow UI.
    'bolts_classifier_pipeline-' + iteration,
    'bolts_hybrid.tar.gz',
    params={
        'image_dir': '/mnt/workflow/small_bolt_images/',
        'tfhub_module': 'https://tfhub.dev/google/imagenet/mobilenet_v1_100_224/quantops/feature_vector/1',
        'tensorboard_dir': '/mnt/workflow/tensorboard/',
        # Update model_dir to a unique path each time you execute this step
        'model_dir': '/mnt/models/retrain/trained_models/cleu2019/10/',
        'bottleneck_dir': '/mnt/workflow/bottleneck/',
        'eval_store_file': '/mnt/workflow/eval/predictions_eval.csv',
        'learning_rate': '0.01',
        'number_of_steps': '2000',
        'model_path': '/ml/retrain/trained_models/mobilenet/1/saved_models.pb',
        'labels_path': '/ml/retrain/trained_models/mobilenet/1/output_labels.txt',
        # Absolute path, consistent with model_path/labels_path and with the
        # pipeline's own image_path default.
        'image_path': '/ml/retrain/201721895618_1_9.jpg',
    },
)
