In [None]:
# Install the Kubeflow Pipelines SDK (latest release tarball) and Pillow into the
# user site-packages via shell escapes.
# NOTE(review): neither install is version-pinned, and Pillow is not imported by any
# cell visible in this notebook — confirm it is still required.
!pip3 install https://storage.googleapis.com/ml-pipeline/release/latest/kfp.tar.gz --upgrade --user
!pip3 install Pillow --upgrade --user

In [None]:
import json
from string import Template

import kfp
from kfp import components
from kfp.components import func_to_container_op
import kfp.dsl as dsl


### Prerequisites:
1. Update pipeline-runner roles

There's an upstream issue: the `pipeline-runner` service account doesn't have permission to create Istio virtual services.

Add the rule shown below to the `pipeline-runner` ClusterRole to avoid this error: `User "system:serviceaccount:kubeflow:pipeline-runner" cannot create resource "virtualservices" in API group "networking.istio.io" in the namespace "kubeflow"`

```shell
kubectl edit clusterrole pipeline-runner -n kubeflow
```

Add the following policy to the cluster role's `rules` list.
```yaml
- apiGroups:
  - networking.istio.io
  resources:
  - '*'
  verbs:
  - '*'
```

2. Create a secret named `aws-secret` in the `kubeflow` namespace, using AWS credentials that have the `AmazonS3FullAccess` policy.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: aws-secret
  namespace: kubeflow
type: Opaque
data:
  AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY
  AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS
```

> Note: To get the base64-encoded string, run `echo -n $AWS_ACCESS_KEY_ID | base64`


### Replace the example with your S3 bucket name

In [None]:
# Name of the S3 bucket used by this example; replace it with your own bucket.
mnist_bucket = "jiaixn-kubeflow-pipeline-data"
# Full S3 URI of that bucket; the export path given to the pipeline is built from it.
s3_bucket_path = "s3://{}".format(mnist_bucket)

### Build Kubeflow Pipeline

In [None]:
def convert_mnist_experiment_result(experiment_result) -> str:
    """
    Render a JSON list of hyperparameters as a single command-line argument string.

    ``experiment_result`` is a JSON array of objects carrying ``name`` and ``value``
    keys. Each entry is printed, formatted as ``name=value``, and the pieces are
    joined with single spaces.
    """
    # Kept as a function-local import: this function is wrapped with
    # func_to_container_op below, so it should not rely on notebook-level imports.
    import json

    rendered = []
    for hyperparameter in json.loads(experiment_result):
        print(hyperparameter)
        rendered.append("{!s}={!s}".format(hyperparameter["name"], hyperparameter["value"]))

    return " ".join(rendered)

@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning, train and inference."
)
def mnist_pipeline(
        name="mnist-{{workflow.uid}}",
        namespace="kubeflow",
        step="1000",
        s3bucketexportpath=""):
    # Pipeline definition. The steps wired together below are:
    #   1. a Katib experiment (launched through the katib-launcher component),
    #   2. a container op that converts the experiment result into CLI arguments,
    #   3. a TFJob ("train") created from a JSON template,
    #   4. a TensorFlow Serving Deployment ("deploy") and its Service ("service"),
    #   5. a web UI Deployment ("uideploy"), Service ("uiservice") and Istio
    #      VirtualService ("uivirtualservice").
    # Ordering: deploy -> service run after train; uideploy -> uiservice ->
    # uivirtualservice also start after train (independently of the serving branch).
    #
    # Parameters:
    #   name               -- used as the Katib experiment name and the TFJob name.
    #                         The default embeds "{{workflow.uid}}" — presumably an
    #                         Argo placeholder expanded at run time; TODO confirm.
    #   namespace          -- namespace written into every resource created here.
    #   step               -- value passed as --tf-train-steps to the final TFJob.
    #   s3bucketexportpath -- passed as --tf-export-dir to the TFJob and as
    #                         --model_base_path to the serving container.

    # step 1: create a Katib experiment to tune hyperparameters
    objectiveConfig = {
      "type": "minimize",
      "goal": 0.001,
      "objectiveMetricName": "loss",
    }
    algorithmConfig = {"algorithmName" : "random"}
    # Search space: a learning rate in [0.01, 0.03] and a batch size from {16, 32, 64}.
    parameters = [
      {"name": "--tf-learning-rate", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.03"}},
      {"name": "--tf-batch-size", "parameterType": "discrete", "feasibleSpace": {"list": ["16", "32", "64"]}},
    ]
    # TFJob used for each trial. The "{{...}}" fragments are left as literal text here;
    # presumably they are Go-template placeholders filled in by Katib — TODO confirm.
    rawTemplate = {
      "apiVersion": "kubeflow.org/v1",
      "kind": "TFJob",
      "metadata": {
         "name": "{{.Trial}}",
         "namespace": "{{.NameSpace}}"
      },
      "spec": {
        "tfReplicaSpecs": {
          "Chief": {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
              "spec": {
                "containers": [
                {
                  "command": [
                    "sh",
                    "-c"
                  ],
                  "args": [
                    "python /opt/model.py --tf-train-steps=200 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                  ],
                  "image": "liuhougangxa/tf-estimator-mnist",
                  "name": "tensorflow"
                }
                ]
              }
            }
          },
          "Worker": {
            "replicas": 3,
            "restartPolicy": "OnFailure",
            "template": {
              "spec": {
                "containers": [
                {
                  "command": [
                    "sh",
                    "-c"
                  ],
                  "args": [ 
                    "python /opt/model.py --tf-train-steps=200 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
                  ],
                  "image": "liuhougangxa/tf-estimator-mnist",
                  "name": "tensorflow"
                }
                ]
              }
            }
          }
        }
      }
    }
    
    # The trial template carries the TFJob above serialized as a JSON string.
    trialTemplate = {
      "goTemplate": {
        "rawTemplate": json.dumps(rawTemplate)
      }
    }

    metricsCollectorSpec = {
      "source": {
        "fileSystemPath": {
          "path": "/tmp/tf",
          "kind": "Directory"
        }
      },
      "collector": {
        "kind": "TensorFlowEvent"
      }
    }

    # The launcher component definition is fetched from the master branch at the time
    # this function runs, so its interface is not pinned to a specific release.
    katib_experiment_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')
    # NOTE(review): the configs are passed via str(), i.e. as Python reprs rather than
    # JSON (only rawTemplate goes through json.dumps) — confirm the launcher accepts this.
    op1 = katib_experiment_launcher_op(
            experiment_name=name,
            experiment_namespace=namespace,
            parallel_trial_count=3,
            max_trial_count=12,
            objective=str(objectiveConfig),
            algorithm=str(algorithmConfig),
            trial_template=str(trialTemplate),
            parameters=str(parameters),
            metrics_collector=str(metricsCollectorSpec),
            # experiment_timeout_minutes=experimentTimeoutMinutes,
            delete_finished_experiment=False)

    # step 2: create a TFJob to train the model with the best hyperparameters found by Katib
    # string.Template placeholders: $name, $namespace, $step, $s3bucketexportpath, $args.
    # Chief and Worker replicas use the same container spec and read AWS credentials
    # from the "aws-secret" secret.
    tfjobjson_template = Template("""
{
  "apiVersion": "kubeflow.org/v1",
  "kind": "TFJob",
  "metadata": {
    "name": "$name",
    "namespace": "$namespace"
  },
  "spec": {
    "tfReplicaSpecs": {
      "Chief": {
        "replicas": 1,
        "restartPolicy": "OnFailure",
        "template": {
          "metadata": {
            "annotations": {
              "sidecar.istio.io/inject": "false"
            }
          },
          "spec": {
            "containers": [
              {
                "command": [
                  "sh",
                  "-c"
                ],
                "args": [
                  "python /opt/model.py --tf-train-steps=$step --tf-export-dir=$s3bucketexportpath $args"
                ],
                "image": "liuhougangxa/tf-estimator-mnist",
                "name": "tensorflow",
                "env": [
                  {
                    "name": "AWS_REGION",
                    "value": "us-west-2"
                  },
                  {
                    "name": "AWS_ACCESS_KEY_ID",
                    "valueFrom": {
                      "secretKeyRef": {
                        "name": "aws-secret",
                        "key": "AWS_ACCESS_KEY_ID"
                      }
                    }
                  },
                  {
                    "name": "AWS_SECRET_ACCESS_KEY",
                    "valueFrom": {
                      "secretKeyRef": {
                        "name": "aws-secret",
                        "key": "AWS_SECRET_ACCESS_KEY"
                      }
                    }
                  }
                ]
              }
            ]
          }
        }
      },
      "Worker": {
        "replicas": 3,
        "restartPolicy": "OnFailure",
        "template": {
          "metadata": {
            "annotations": {
              "sidecar.istio.io/inject": "false"
            }
          },
          "spec": {
            "containers": [
              {
                "command": [
                  "sh",
                  "-c"
                ],
                "args": [
                  "python /opt/model.py --tf-train-steps=$step --tf-export-dir=$s3bucketexportpath $args"
                ],
                "image": "liuhougangxa/tf-estimator-mnist",
                "name": "tensorflow",
                "env": [
                  {
                    "name": "AWS_REGION",
                    "value": "us-west-2"
                  },
                  {
                    "name": "AWS_ACCESS_KEY_ID",
                    "valueFrom": {
                      "secretKeyRef": {
                        "name": "aws-secret",
                        "key": "AWS_ACCESS_KEY_ID"
                      }
                    }
                  },
                  {
                    "name": "AWS_SECRET_ACCESS_KEY",
                    "valueFrom": {
                      "secretKeyRef": {
                        "name": "aws-secret",
                        "key": "AWS_SECRET_ACCESS_KEY"
                      }
                    }
                  }
                ]
              }
            ]
          }
        }
      }
    }
  }
}
""")

    # Wrap the converter function as a component and feed it the Katib launcher output.
    convert_op = func_to_container_op(convert_mnist_experiment_result)
    op2 = convert_op(op1.output)

    # op2.output and the pipeline parameters are substituted as text into the template.
    # NOTE(review): presumably these are placeholders resolved by the pipeline engine
    # at run time rather than concrete values — confirm.
    tfjobjson = tfjobjson_template.substitute(
            {'args': op2.output,
             'name': name,
             'namespace': namespace,
             'step': step,
             's3bucketexportpath': s3bucketexportpath,
            })

    tfjob = json.loads(tfjobjson)

    # The success condition requires 3 succeeded Workers and 1 succeeded Chief,
    # matching the replica counts in the template above.
    train = dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob,
        success_condition='status.replicaStatuses.Worker.succeeded==3,status.replicaStatuses.Chief.succeeded==1'
    )
    # step 3: serve the trained model for inference with TensorFlow Serving
    
    # Serving Deployment. Placeholders: $servingdeploy, $namespace, $s3bucketexportpath.
    # The container exposes gRPC on port 9000 and REST on port 8500. Note that the
    # Deployment's own metadata label is "mnist", while the selector and pod labels
    # use $servingdeploy.
    deployjson_template = Template("""
{
  "apiVersion": "apps/v1",
  "kind": "Deployment",
  "metadata": {
    "labels": {
      "app": "mnist"
    },
    "name": "$servingdeploy",
    "namespace": "$namespace"
  },
  "spec": {
    "selector": {
      "matchLabels": {
        "app": "$servingdeploy"
      }
    },
    "template": {
      "metadata": {
        "annotations": {
          "sidecar.istio.io/inject": "false"
        },
        "labels": {
          "app": "$servingdeploy",
          "version": "v1"
        }
      },
      "spec": {
        "serviceAccount": "default",
        "containers": [
          {
            "args": [
              "--port=9000",
              "--rest_api_port=8500",
              "--model_name=mnist",
              "--model_base_path=$s3bucketexportpath"
            ],
            "command": [
              "/usr/bin/tensorflow_model_server"
            ],
            "env": [
              {
                "name": "AWS_REGION",
                "value": "us-west-2"
              },
              {
                "name": "AWS_ACCESS_KEY_ID",
                "valueFrom": {
                  "secretKeyRef": {
                    "name": "aws-secret",
                    "key": "AWS_ACCESS_KEY_ID"
                  }
                }
              },
              {
                "name": "AWS_SECRET_ACCESS_KEY",
                "valueFrom": {
                  "secretKeyRef": {
                    "name": "aws-secret",
                    "key": "AWS_SECRET_ACCESS_KEY"
                  }
                }
              }
            ],
            "image": "tensorflow/serving:1.15.0",
            "imagePullPolicy": "IfNotPresent",
            "livenessProbe": {
              "initialDelaySeconds": 30,
              "periodSeconds": 30,
              "tcpSocket": {
                "port": 9000
              }
            },
            "name": "mnist",
            "ports": [
              {
                "containerPort": 9000
              },
              {
                "containerPort": 8500
              }
            ],
            "resources": {
              "limits": {
                "cpu": "1",
                "memory": "1Gi"
              },
              "requests": {
                "cpu": "1",
                "memory": "1Gi"
              }
            }
          }
        ]
      }
    }
  }
}
""")
    # Fixed name shared by the serving Deployment and the Service selector below.
    servingdeploy_name = 'mnist-model'
    deployjson = deployjson_template.substitute(
            {'namespace': namespace,
             's3bucketexportpath': s3bucketexportpath,
             'servingdeploy': servingdeploy_name,
            })

    deploy = json.loads(deployjson)
    
    # Create the serving Deployment only after the training job.
    deployment = dsl.ResourceOp(
        name="deploy",
        k8s_resource=deploy,
    ).after(train)
    
    # ClusterIP Service "mnist-service" in front of the serving pods
    # (gRPC 9000, REST 8500), selecting pods by the $servingdeploy app label.
    servicejson_template = Template("""
{
  "apiVersion": "v1",
  "kind": "Service",
  "metadata": {
    "annotations": {
      "prometheus.io/path": "/monitoring/prometheus/metrics",
      "prometheus.io/port": "8500",
      "prometheus.io/scrape": "true"
    },
    "labels": {
      "app": "$servingdeploy"
    },
    "name": "mnist-service",
    "namespace": "$namespace"
  },
  "spec": {
    "ports": [
      {
        "name": "grpc-tf-serving",
        "port": 9000,
        "targetPort": 9000
      },
      {
        "name": "http-tf-serving",
        "port": 8500,
        "targetPort": 8500
      }
    ],
    "selector": {
      "app": "$servingdeploy"
    },
    "type": "ClusterIP"
  }
}
""")
    servicejson = servicejson_template.substitute(
        {'namespace': namespace, 
         'servingdeploy': servingdeploy_name,
        })

    serviceresource = json.loads(servicejson)
    
    service = dsl.ResourceOp(
        name="service",
        k8s_resource=serviceresource
    ).after(deployment)
    

    # Web UI Deployment. Placeholders: $uiname, $namespace. Pods are labelled
    # "mnist-web-ui" and the container listens on port 5000.
    uideployjson_template = Template("""
{
  "apiVersion": "apps/v1",
  "kind": "Deployment",
  "metadata": {
    "name": "$uiname",
    "namespace": "$namespace"
  },
  "spec": {
    "replicas": 1,
    "selector": {
      "matchLabels": {
        "app": "mnist-web-ui"
      }
    },
    "template": {
      "metadata": {
        "labels": {
          "app": "mnist-web-ui"
        }
      },
      "spec": {
        "containers": [
          {
            "image": "gcr.io/kubeflow-examples/mnist/web-ui:v20190112-v0.2-142-g3b38225",
            "name": "web-ui",
            "ports": [
              {
                "containerPort": 5000
              }
            ]
          }
        ],
        "serviceAccount": "default"
      }
    }
  }
}
""")
    # Fixed name shared by the UI Deployment, Service and VirtualService.
    ui_name = 'mnist-ui'
    uideployjson = uideployjson_template.substitute(
        {'namespace': namespace,
         'uiname': ui_name,
        })

    uideployresource = json.loads(uideployjson)
    
    # The UI branch starts after training, independently of the serving branch.
    uideploy = dsl.ResourceOp(
        name="uideploy",
        k8s_resource=uideployresource
    ).after(train)
    
    # ClusterIP Service mapping port 80 to the UI container's port 5000.
    uiservicejson_template = Template("""
{
  "apiVersion": "v1",
  "kind": "Service",
  "metadata": {
    "name": "$uiname",
    "namespace": "$namespace"
  },
  "spec": {
    "ports": [
      {
        "name": "http-mnist-ui",
        "port": 80,
        "targetPort": 5000
      }
    ],
    "selector": {
      "app": "mnist-web-ui"
    },
    "type": "ClusterIP"
  }
}
""")
    uiservicejson = uiservicejson_template.substitute(
        {'namespace': namespace,
         'uiname': ui_name,
        })

    uiserviceresource = json.loads(uiservicejson)
    
    uiservice = dsl.ResourceOp(
        name="uiservice",
        k8s_resource=uiserviceresource
    ).after(uideploy)
    
    # Istio VirtualService on the "kubeflow/kubeflow-gateway" gateway: requests under
    # /mnist/$namespace/ui/ are rewritten to "/" and routed to the UI Service on port 80.
    uivirtualservicejson_template = Template("""
{
  "apiVersion": "networking.istio.io/v1alpha3",
  "kind": "VirtualService",
  "metadata": {
    "name": "$uiname",
    "namespace": "$namespace"
  },
  "spec": {
    "gateways": [
      "kubeflow/kubeflow-gateway"
    ],
    "hosts": [
      "*"
    ],
    "http": [
      {
        "match": [
          {
            "uri": {
              "prefix": "/mnist/$namespace/ui/"
            }
          }
        ],
        "rewrite": {
          "uri": "/"
        },
        "route": [
          {
            "destination": {
              "host": "$uiname.$namespace.svc.cluster.local",
              "port": {
                "number": 80
              }
            }
          }
        ],
        "timeout": "300s"
      }
    ]
  }
}
""")
    uivirtualservicejson = uivirtualservicejson_template.substitute(
        {'namespace': namespace,
         'uiname': ui_name,
        })

    uivirtualserviceresource = json.loads(uivirtualservicejson)
    
    uivirtualservice = dsl.ResourceOp(
        name="uivirtualservice",
        k8s_resource=uivirtualserviceresource
    ).after(uiservice)
    

### Submit the pipeline

In [None]:
# Submit a run of the pipeline function, exporting the model under "<bucket URI>/export".
pipeline = kfp.Client().create_run_from_pipeline_func(
    mnist_pipeline,
    arguments={
        "s3bucketexportpath": '{}/export'.format(s3_bucket_path),
    },
)

### Invoke serving API via Python client

In [None]:
import tensorflow as tf
from tensorflow import keras

# Helper libraries
import numpy as np
import os
import subprocess
import argparse

import random
import json
import requests


# REST predict endpoint of the TensorFlow Serving service: host "mnist-service" in the
# "kubeflow" namespace, port 8500, model name "mnist". This cluster-local DNS name is
# only resolvable from inside the cluster.
endpoint = "http://mnist-service.kubeflow.svc.cluster.local:8500/v1/models/mnist:predict"


# Prepare test dataset
# NOTE: despite the variable name this loads keras.datasets.mnist (the digits
# dataset), not Fashion-MNIST. The name is kept because other cells may reference it.
fashion_mnist = keras.datasets.mnist
(train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()

# scale the values to 0.0 to 1.0
train_images = train_images / 255.0
test_images = test_images / 255.0

# reshape for feeding into the model
train_images = train_images.reshape(train_images.shape[0], 28, 28, 1)
test_images = test_images.reshape(test_images.shape[0], 28, 28, 1)

class_names = ['0','1','2','3','4','5','6','7','8','9']

# Pick one random test image and wrap it in a predict request body
rando = random.randint(0,len(test_images)-1)
data = json.dumps({"signature_name": "serving_default", "instances": test_images[rando:rando+1].tolist()})
print('Data: {} ... {}'.format(data[:50], data[len(data)-52:]))

# HTTP call
headers = {"content-type": "application/json"}
json_response = requests.post(endpoint, data=data, headers=headers)
# Fail with the HTTP error itself on a non-2xx reply, instead of a confusing
# KeyError on 'predictions' when the server returned an error body.
json_response.raise_for_status()
predictions = json.loads(json_response.text)['predictions']

print(predictions)

# Argument order matters: the model's prediction fills "thought this was", the
# ground-truth label fills "it was actually" (the original had these swapped).
title = 'The model thought this was a class {}, and it was actually a class {}'.format(
    predictions[0]['classes'], test_labels[rando])
print('\n')
print(title)


### Invoke serving API via UI

Open your_ALB_endpoint + `/mnist/kubeflow/ui/` to visit the MNIST UI page.