# Kubeflow Pipeline: GitHub Issue Summarization using Tensor2Tensor

Currently, this notebook must be run from the Kubeflow JupyterHub installation, as described in the codelab.

In this notebook, we will show how to:

* Interactively define a Kubeflow Pipeline using the Pipelines Python SDK
* Submit and run the pipeline
* Add a step in the pipeline

This example pipeline trains a [Tensor2Tensor](https://github.com/tensorflow/tensor2tensor/) model on GitHub issue data, learning to predict issue titles from issue bodies. It then exports the trained model and deploys the exported model to [TensorFlow Serving](https://github.com/tensorflow/serving).
The final step in the pipeline launches a web app that interacts with the TF-Serving instance to get model predictions.

## Environment Setup

Before we can run any experiment, we need to set up and initialize the environment: install and configure the required Python modules, then import them.

Set up the Python modules:

In [None]:
!pip3 install --upgrade 'https://storage.googleapis.com/ml-pipeline/release/0.1.9/kfp.tar.gz' > /dev/null
!pip3 install --upgrade './extensions' > /dev/null
%load_ext extensions

All imports go here:

In [None]:
import boto3
import kfp
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.notebook

from ipython_secrets import get_secret
from kfp.compiler import Compiler

import extensions
import extensions.kaniko as kaniko
from os import environ

from extensions.kaniko import *

Set some variables. Set `WORKING_DIR` to a path under the S3 bucket you created earlier.

In [None]:
EXPERIMENT_NAME = 'Github issue summarization'

USER = environ['JUPYTERHUB_USER']
# AWS_S3_BUCKET = get_secret('AWS_S3_BUCKET')
AWS_S3_BUCKET = 'asi-mldata'

DEPLOY_WEBAPP = 'false'
DOCKER_REGISTRY = get_secret('DOCKER_REGISTRY')
DOCKER_REGISTRY_SECRET = get_secret('DOCKER_REGISTRY_SECRET')
# we need to rotate image versions until the KFP DSL supports pullSecret
DOCKER_TAG = 'v62'

# per-user working directory, used as a save point for generated data
WORKING_DIR = f"s3://{AWS_S3_BUCKET}/{USER}"

GPU_SUPPORT = False
if GPU_SUPPORT:
    TENSORFLOW_IMAGE='tensorflow/tensorflow:latest-gpu'
else:
    TENSORFLOW_IMAGE='tensorflow/tensorflow:latest'

AWS_SECRET = 'jupyter-awscreds'

USE_ACCESS_SECRET_KEYS = True
if USE_ACCESS_SECRET_KEYS:
    sess = boto3.session.Session(
        aws_access_key_id=get_secret('AWS_ACCESS_KEY_ID'),
        aws_secret_access_key=get_secret('AWS_SECRET_ACCESS_KEY')
    )
else:
    sess = boto3.session.Session()
    
aws_to_kube_secret(secret_name=AWS_SECRET, session=sess)

The Kubeflow Pipelines (KFP) system requires an "Experiment" to group pipeline runs.

To get a reference to the experiment, we use a naive but idempotent approach. If no experiment with the desired name exists, `get_experiment()` raises a `ValueError`, and in that case we create a new KFP experiment.

In [None]:
client = kfp.Client()
try:
    exp = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except ValueError:
    # no experiment with this name exists yet, so create one
    exp = client.create_experiment(EXPERIMENT_NAME)

## Prepare images

Before we can run training, we build the Docker images that our pipeline will use later.

### Prepare dockerfile templates

Dockerfiles can be rendered with the `%%template` or `%templatefile` magics. Both understand mustache `{{placeholder}}` templating syntax: each placeholder is replaced by the value of a variable defined in the user namespace or of a system environment variable.

You can use flags with the magic function:
* `-v` - show the content of the rendered file
* `-h` - show more options
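
To make the placeholder substitution concrete, here is a minimal sketch of mustache-style rendering in plain Python. It is not the implementation behind the `%%template` magic: the `render` function, the lookup order (namespace first, then environment) and the example registry name are all assumptions made for illustration.

```python
import os
import re

# Matches {{NAME}} where NAME is a plain identifier.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template, namespace, environ=os.environ):
    """Replace {{NAME}} with namespace[NAME], falling back to environ[NAME].

    Hypothetical helper: the lookup order is an assumption, not taken
    from the notebook extension.
    """
    def lookup(match):
        key = match.group(1)
        if key in namespace:
            return str(namespace[key])
        if key in environ:
            return str(environ[key])
        return match.group(0)  # leave unknown placeholders untouched

    return PLACEHOLDER.sub(lookup, template)


rendered = render(
    "FROM {{DOCKER_REGISTRY}}/library/tf:{{DOCKER_TAG}}",
    {"DOCKER_REGISTRY": "registry.example.com", "DOCKER_TAG": "v62"},
)
print(rendered)  # FROM registry.example.com/library/tf:v62
```

Placeholders that contain a dot, such as `{{workflow.name}}`, do not match the identifier pattern in this sketch and pass through unchanged.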


In [None]:
%%template Dockerfile.tf
FROM ubuntu:16.04
ENV PATH $PATH:/tools/ks/bin
RUN apt-get update -y
RUN apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip
RUN easy_install pip
RUN pip install pyyaml==3.12 six==1.11.0 requests==2.18.4 tensorflow==1.11.0
RUN pip install boto3 awscli
RUN wget -nv https://github.com/ksonnet/ksonnet/releases/download/v0.11.0/ks_0.11.0_linux_amd64.tar.gz && \
    tar -xvzf ks_0.11.0_linux_amd64.tar.gz && \
    mkdir -p /tools/ks/bin && \
    cp ./ks_0.11.0_linux_amd64/ks /tools/ks/bin && \
    rm ks_0.11.0_linux_amd64.tar.gz && \
    rm -r ks_0.11.0_linux_amd64
    
ADD https://storage.googleapis.com/kubernetes-release/release/v1.13.0/bin/linux/amd64/kubectl /usr/local/bin/kubectl
RUN chmod +x /usr/local/bin/kubectl
WORKDIR /ml

In [None]:
%%template Dockerfile.t2t
FROM tensorflow/tensorflow:latest
ENV PATH $PATH:/tools/node/bin
RUN apt-get update -y
RUN apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools \
                                                  wget unzip git
RUN easy_install pip
RUN pip install boto3 awscli
RUN pip install tensor2tensor
RUN pip install pyyaml==3.12 six==1.11.0

In [None]:
%%template Dockerfile.serve
FROM {{DOCKER_REGISTRY}}/library/tf:{{DOCKER_TAG}}
COPY serving /ml
ENTRYPOINT ["python", "/ml/deploy-tf-serve.py"]

In [None]:
%%template Dockerfile.deploy
FROM {{DOCKER_REGISTRY}}/library/tf:{{DOCKER_TAG}}
COPY deploy /ml
ENTRYPOINT ["python", "/ml/deploy-webapp.py"]

In [None]:
%%template Dockerfile.dataprep
FROM {{DOCKER_REGISTRY}}/library/t2t:{{DOCKER_TAG}}
COPY preproc /ml
RUN mkdir -p /ml/gh_data
RUN mkdir -p /ml/gh_data/tmp
WORKDIR /ml
ENTRYPOINT ["python", "/ml/datagen.py"]

In [None]:
%%template Dockerfile.webapp
FROM {{DOCKER_REGISTRY}}/library/t2t:{{DOCKER_TAG}}
# RUN pip install tensorflow_hub
RUN pip install tensorflow-serving-api
RUN pip install gunicorn
RUN pip install pandas
RUN pip install pyopenssl
COPY webapp /ml
WORKDIR /ml/app
CMD gunicorn -w 4 -b :8080 main:app

In [None]:
%%template Dockerfile.train
FROM {{TENSORFLOW_IMAGE}}
RUN apt-get update -y
RUN apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip git
RUN easy_install pip
RUN pip install boto3 awscli
RUN pip install tensor2tensor
RUN pip install tensorflow_hub
RUN pip install pyyaml==3.12 six==1.11.0
COPY training /ml
ENTRYPOINT ["python", "/ml/train_model.py"]

### Define build pipeline

Next we define the build pipeline. Yes, we are using KFP to build the images that the final pipeline will then use.

We use [Kaniko](https://github.com/GoogleContainerTools/kaniko) and Kubernetes to handle build operations. Build status can be tracked via the KFP pipeline dashboard.

In fact, the image build job could even be combined with the primary pipeline, since the steps run in separate Kubernetes pods anyway. However, for the sake of efficiency we schedule the build process as a separate pipeline.
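
Kaniko reads its build context from a gzipped tarball when the context lives in an S3 bucket. The sketch below shows how such a tarball could be assembled in memory with the standard library. It does not reproduce what the notebook's `upload_build_context_to_s3` helper does; the function names and file contents are hypothetical.

```python
import io
import tarfile


def make_build_context(files):
    """Pack {archive_name: text} into an in-memory gzipped tarball."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w:gz") as tar:
        for name, text in sorted(files.items()):
            data = text.encode("utf-8")
            info = tarfile.TarInfo(name=name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buf.getvalue()


def list_build_context(blob):
    """Return the member names stored in a gzipped tarball."""
    with tarfile.open(fileobj=io.BytesIO(blob), mode="r:gz") as tar:
        return tar.getnames()


# Placeholder contents, for illustration only.
context = make_build_context({
    "Dockerfile.serve": "FROM example/tf:v62\nCOPY serving /ml\n",
    "serving/deploy-tf-serve.py": "print('placeholder')\n",
})
print(list_build_context(context))
```

The resulting bytes could then be uploaded to the location named by `--context`, for example with a boto3 S3 client.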

In [None]:
build_ctx=f"s3://{AWS_S3_BUCKET}/{EXPERIMENT_NAME}/dockerbuild.tar.gz"
upload_build_context_to_s3(build_ctx)

def kaniko_op(name, destination, dockerfile,
              context=build_ctx, aws_secret=AWS_SECRET, 
              pull_secret=DOCKER_REGISTRY_SECRET):
    """ template function for kaniko build operation
    """
    return dsl.ContainerOp(
        name=name,
        image='gcr.io/kaniko-project/executor:latest',
        arguments=['--destination', destination,
                   '--dockerfile', dockerfile,
                   '--context', context]
    ).apply(
        use_aws_credentials(secret_name=aws_secret)
    ).apply(
        kaniko.use_pull_secret(secret_name=pull_secret)
    )
    

@dsl.pipeline(
  name='Pipeline images',
  description='Build images that will be used by the pipeline'
)
def build_images():
    t2t = kaniko_op(
        name='tensor2tensor',
        destination=f"{DOCKER_REGISTRY}/library/t2t:{DOCKER_TAG}",
        dockerfile='Dockerfile.t2t'
    )
    tf = kaniko_op(
        name='tensorflow-cpu',
        destination=f"{DOCKER_REGISTRY}/library/tf:{DOCKER_TAG}",
        dockerfile='Dockerfile.tf'
    )
    deploy = kaniko_op(
        name='launch',
        destination=f"{DOCKER_REGISTRY}/library/deploy:{DOCKER_TAG}",
        dockerfile='Dockerfile.deploy'
    )

    serve = kaniko_op(
        name='serving',
        destination=f"{DOCKER_REGISTRY}/library/serving:{DOCKER_TAG}",
        dockerfile='Dockerfile.serve'
    )
    
    dataprep = kaniko_op(
        name='dataprep',
        destination=f"{DOCKER_REGISTRY}/library/dataprep:{DOCKER_TAG}",
        dockerfile='Dockerfile.dataprep'
    )

    webapp = kaniko_op(
        name='webapp',
        destination=f"{DOCKER_REGISTRY}/library/webapp:{DOCKER_TAG}",
        dockerfile='Dockerfile.webapp'
    )
        
    kaniko_op(
        name='training',
        destination=f"{DOCKER_REGISTRY}/library/training:{DOCKER_TAG}",
        dockerfile='Dockerfile.train',
    )

    # define dependencies
    deploy.after(tf)
    serve.after(tf)
    dataprep.after(t2t)
    webapp.after(t2t)
    
Compiler().compile(build_images, 'kaniko.tar.gz')

By default, pipeline steps (`ContainerOp`) run in parallel. If you need a DAG, you can order the steps with the `after()` method.
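
To see what these dependencies imply for the build pipeline, the following sketch groups the steps into "waves" that can run in parallel. It only models the ordering; it is not how Argo schedules pods, and `execution_waves` is a hypothetical helper. The step names and edges mirror the `after()` calls in `build_images`.

```python
def execution_waves(steps, after):
    """Group steps into waves; every step in a wave can run in parallel.

    `after` maps a step to the steps it must wait for.
    """
    remaining = {step: set(after.get(step, ())) for step in steps}
    waves = []
    while remaining:
        # Steps whose dependencies have all finished are ready to run.
        ready = sorted(s for s, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("dependency cycle detected")
        waves.append(ready)
        for step in ready:
            del remaining[step]
        for deps in remaining.values():
            deps.difference_update(ready)
    return waves


steps = ["tensor2tensor", "tensorflow-cpu", "launch", "serving",
         "dataprep", "webapp", "training"]
after = {
    "launch": ["tensorflow-cpu"],
    "serving": ["tensorflow-cpu"],
    "dataprep": ["tensor2tensor"],
    "webapp": ["tensor2tensor"],
}
for wave in execution_waves(steps, after):
    print(wave)
# ['tensor2tensor', 'tensorflow-cpu', 'training']
# ['dataprep', 'launch', 'serving', 'webapp']
```

The base images and the independent `training` image build first; the images derived from `tf` and `t2t` follow once their base image has been pushed.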

The compiler transforms the Python DSL into an [Argo Workflow](https://argoproj.github.io/docs/argo/readme.html) and stores the generated artifacts in `kaniko.tar.gz`, so the pipeline can be executed multiple times, possibly with different parameters.
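
If you are curious about what the compiler produced, a sketch like the one below lists and reads the files inside a compiled package. The `read_package` helper is hypothetical, and the member name `pipeline.yaml` in the stand-in archive is an assumption about the package layout; the demo builds its own tiny archive so that it runs without a compiled pipeline.

```python
import io
import os
import tarfile
import tempfile


def read_package(path):
    """Return {member_name: text} for every regular file in a .tar.gz."""
    contents = {}
    with tarfile.open(path, mode="r:gz") as tar:
        for member in tar.getmembers():
            if member.isfile():
                raw = tar.extractfile(member).read()
                contents[member.name] = raw.decode("utf-8")
    return contents


# Stand-in archive; the file name and content are assumptions.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo.tar.gz")
    data = b"kind: Workflow\n"
    with tarfile.open(path, "w:gz") as tar:
        info = tarfile.TarInfo("pipeline.yaml")
        info.size = len(data)
        tar.addfile(info, io.BytesIO(data))
    package = read_package(path)

print(package)
```

Against a real package you would call `read_package('kaniko.tar.gz')` and inspect the returned workflow text.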

In [None]:
run = client.run_pipeline(exp.id, 'Build images', 'kaniko.tar.gz')

The build process can take a long time, because images used for data science tasks are often huge. In that case you might want to adjust the `timeout` parameter.
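
Conceptually, blocking on a run with a timeout is a polling loop with a deadline. The sketch below illustrates that idea only; it is not the KFP client's code. The `get_status` callable, the set of terminal status names and the polling interval are assumptions.

```python
import time


def wait_until_finished(get_status, timeout, poll_interval=5.0,
                        clock=time.monotonic, sleep=time.sleep):
    """Poll get_status() until it reports a terminal state or time runs out."""
    finished = {"succeeded", "failed", "skipped", "error"}  # assumed names
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status is not None and status.lower() in finished:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"run did not finish within {timeout} seconds")
        sleep(poll_interval)


# Simulated run that reports no status, then Running, then Succeeded.
statuses = iter([None, "Running", "Succeeded"])
result = wait_until_finished(lambda: next(statuses), timeout=720,
                             sleep=lambda _: None)
print(result)  # Succeeded
```

A timeout that is too short for a large image build surfaces as an exception even though the run itself may still complete, which is why the value is worth tuning.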

In [None]:
# block till completion
client.wait_for_run_completion(run.id, timeout=720).run.status

# Data Preparation

Data preparation is a simple yet powerful experiment. We download a CSV file from a remote location and process the data so that it can be consumed by our main pipeline. If needed, both pipelines can be combined. It makes sense to split the data preparation and training pipelines because data has its own lifecycle, which differs from that of model training.

## Define Pipeline


We define a pipeline that has three steps. All steps are independent and can be executed in parallel.

* `dataprep` - This step takes a data set (GitHub issues CSV file) and creates the set of artifacts that tensor2tensor requires for training
* `snapshot` - Stores a snapshot of the pre-trained model in the S3 bucket. To make life easier for tensor2tensor, we put all data necessary for training into a single bucket before training starts.
* `checkpoint` - Replicates a model checkpoint to the location where the training step expects it

In [None]:
@dsl.pipeline(
  name='Data preparation',
  description="""Extract validate transform and load data into object storage. 
  So it could be accessible by the actual training
  """
)
def prepare_data(
    data_set: dsl.PipelineParam, 
    data_gen: dsl.PipelineParam,
    data_dir: dsl.PipelineParam,
    checkpoint_dir: dsl.PipelineParam,
    # default pipeline parameter     
    snapshot_dir: dsl.PipelineParam=dsl.PipelineParam(name='snapshot-dir', value='s3://asi-kubeflow-models/github/t2t_data_gh_all/'),
    checkpoint_bak: dsl.PipelineParam=dsl.PipelineParam(name='checkpoint-bak', value='s3://asi-kubeflow-models/github/model_output_tbase.bak2019000')
):
    dataprep = dsl.ContainerOp(
        name='gen-data',
        image=f"{DOCKER_REGISTRY}/library/dataprep:{DOCKER_TAG}",
        arguments=['--data-set', data_set,
                   '--data-gen', data_gen]
    )
    snapshot = dsl.ContainerOp(
        name='repl-snapshot',
        image='mesosphere/aws-cli',
        arguments=['s3', 'sync', snapshot_dir, data_dir]
    )
    checkpoint = dsl.ContainerOp(
        name='repl-checkpoint',
        image='mesosphere/aws-cli',
        arguments=['s3', 'sync', checkpoint_bak, checkpoint_dir]
    )

    if USE_ACCESS_SECRET_KEYS:
        dataprep.apply( use_aws_credentials(secret_name=AWS_SECRET) )
        snapshot.apply( use_aws_credentials(secret_name=AWS_SECRET) )
        checkpoint.apply( use_aws_credentials(secret_name=AWS_SECRET) )

Compiler().compile(prepare_data, 'dataprep.tar.gz')

The code below runs the pipeline and injects some pipeline parameters. We provide two versions of the data set:
* `SAMPLE_DATA_SET` - A data set of just over 2 megabytes. Too small to train a useful model, but ideal for development because of the faster feedback.
* `FULL_DATA_SET` - A pre-created data set with all GitHub issues, about 3 gigabytes. Large enough to train a useful model.

Depending on your needs, choose one of the data sets and pass it as the `data-set` pipeline parameter.

In [None]:
# GitHub issues sample: about 2 MB (best for dev/test)
SAMPLE_DATA_SET = 'https://s3.us-east-2.amazonaws.com/asi-kubeflow-models/gh-issues/data-sample.csv'
# GitHub issues full data set: about 3 GB (best for training)
FULL_DATA_SET = 'https://s3.us-east-2.amazonaws.com/asi-kubeflow-models/gh-issues/data-full.csv'

run = client.run_pipeline(exp.id, 'Prepare data', 'dataprep.tar.gz',
                          params={'data-set': FULL_DATA_SET,
                                  'data-gen': f"{WORKING_DIR}/gh_data/",
                                  'data-dir': f"{WORKING_DIR}/t2t_data_gh_all/",
                                  'checkpoint-dir': f"{WORKING_DIR}/model_output_tbase.ckd"})

In [None]:
# block till completion
client.wait_for_run_completion(run.id, timeout=720).run.status

## Define a Pipeline

Authoring a pipeline is like authoring a normal Python function. The pipeline function describes the topology of the pipeline. 

Each step in the pipeline is typically a `ContainerOp`, a simple class or function describing how to interact with a Docker container image. All the container images referenced in the pipeline have already been built.

The pipeline starts by training a [Tensor2Tensor](https://github.com/tensorflow/tensor2tensor/) model, using already-preprocessed data. (More accurately, this step starts from an existing model checkpoint, then trains for a few hundred more steps.) When it finishes, it exports the model in a form suitable for serving by [TensorFlow Serving](https://github.com/tensorflow/serving/).

The next step deploys a TF-serving instance with that model.

The last step launches a web app with which you can interact with the TF-serving instance to get model predictions.

Similar to the Kaniko pipeline, we define a wrapper `tensorflow_op` around `ContainerOp` that serves as a template for our TensorFlow container operations. Such patterns improve the readability of a pipeline.

In [None]:
def tensorflow_op(name, image, arguments, file_outputs={}):
    """ template function for tensorflow or tensor2tensor container
    """
    from kubernetes.client import V1EnvVar
    
    op = dsl.ContainerOp(
        name = name,
        image = image,
        arguments = arguments,
        file_outputs = file_outputs
    ).add_env_variable(
        V1EnvVar(
            name='S3_USE_HTTPS', 
            value='1', 
    )).add_env_variable(
        V1EnvVar(
            name='S3_VERIFY_SSL', 
            value='1'
    ))
    
    if USE_ACCESS_SECRET_KEYS:
        op.apply( use_aws_credentials(secret_name=AWS_SECRET) )
    return op


@dsl.pipeline(
    name='Github issue summarization',
    description='Tensor2Tensor-based training and TF-Serving'
)
def main_experiment(
    train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=2019300),
    github_token: dsl.PipelineParam=dsl.PipelineParam(name='github-token', value='YOUR_GITHUB_TOKEN_HERE'),
    working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value=WORKING_DIR),
    checkpoint_dir: dsl.PipelineParam=dsl.PipelineParam(name='checkpoint-dir', value='s3://asi-kubeflow-models/github/model_output_tbase.bak2019000'),
    deploy_webapp: dsl.PipelineParam=dsl.PipelineParam(name='deploy-webapp', value='true'),
    data_dir: dsl.PipelineParam=dsl.PipelineParam(name='data-dir', value='s3://asi-kubeflow-models/github/t2t_data_gh_all/'),
    snapshot_dir: dsl.PipelineParam=dsl.PipelineParam(name='snapshot-dir', value='')):

    from kubernetes import client as kube_client
    
    train = tensorflow_op(
        name = 'training',
        image = f"{DOCKER_REGISTRY}/library/training:{DOCKER_TAG}",
        arguments = [ 
            "--data-dir", data_dir,
            "--checkpoint-dir", checkpoint_dir,
            "--model-dir", '%s/model_output' % working_dir,
            "--train-steps", train_steps, 
            "--deploy-webapp" , deploy_webapp],
        file_outputs={'output': '/tmp/output'})

    if GPU_SUPPORT:
        train.set_gpu_limit(1)
        
    serve = tensorflow_op(
        name = 'serving',
        image = f"{DOCKER_REGISTRY}/library/serving:{DOCKER_TAG}",
        arguments = [
            "--model_name", 'ghsumm-{{workflow.name}}',
            "--model_path", '%s/model_output/export' % working_dir,
            "--aws_secret", AWS_SECRET,
          ],
        file_outputs={'deployment': '/ml/tf-serve.yaml'}
    )
    serve.after(train)
    
    with dsl.Condition(train.output=='true'):
        webapp = tensorflow_op(
            name = 'webapp',
            image = f"{DOCKER_REGISTRY}/library/deploy:{DOCKER_TAG}",
            arguments = [
                "--model_name", 'ghsumm-%s' % ('{{workflow.name}}',),
                "--github_token", github_token,
                "--image", f"{DOCKER_REGISTRY}/library/webapp:{DOCKER_TAG}",
                "--data-dir", data_dir,
                "--aws_secret", AWS_SECRET],
            file_outputs={'deployment': '/ml/t2tapp.yaml'}
        )
        webapp.after(serve)
               
compiler.Compiler().compile(main_experiment, 'ghsumm.tar.gz')
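
In the pipeline above, the `webapp` step is gated on `train.output == 'true'`, and `train.output` is read from the file declared in `file_outputs` (`/tmp/output`). The training script itself is not shown in this notebook, so the following is only a sketch, under the assumption that the script writes the value of `--deploy-webapp` to that file. The helper names are hypothetical, and a temporary directory stands in for `/tmp`.

```python
import os
import tempfile


def write_step_output(value, path):
    """Write a single value to `path` so the pipeline can read it as a step output."""
    directory = os.path.dirname(path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(path, "w") as handle:
        handle.write(str(value))  # no trailing newline


def should_deploy(path):
    """Mirror the `train.output == 'true'` check on the file's contents."""
    with open(path) as handle:
        return handle.read() == "true"


with tempfile.TemporaryDirectory() as tmp:
    output_path = os.path.join(tmp, "output")  # stands in for /tmp/output
    write_step_output("true", output_path)
    deploy = should_deploy(output_path)
    write_step_output("false", output_path)
    skip = should_deploy(output_path)

print(deploy, skip)  # True False
```

Because the condition in this sketch is a plain string comparison, the value is written exactly, without extra whitespace.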

## Submit an experiment *run*

The call below runs the compiled pipeline, passing the working directory and the data and checkpoint locations as pipeline parameters.

In [None]:
# Run the compiled pipeline.
run = client.run_pipeline(exp.id, 'Github training', 'ghsumm.tar.gz',
                          params={'working-dir': WORKING_DIR,
#                                   'data-dir':  f"{WORKING_DIR}/gh_data/",
                                  'data-dir':  f"{WORKING_DIR}/t2t_data_gh_all/",
                                  'checkpoint-dir': f"{WORKING_DIR}/model_output_tbase.ckd",
                                  'train-steps': 2019300,
                                  'github-token': get_secret('GITHUB_TOKEN')})

In [None]:
# block till completion
c = client.wait_for_run_completion(run.id, timeout=12000)
c.run.status

![The new pipeline.](https://storage.googleapis.com/amy-jo/images/datagen_t2t_pipeline.png)

When this new pipeline finishes running, you'll be able to see your generated processed data files in S3 under the path `WORKING_DIR/gh_data`, where `WORKING_DIR` already includes your username. There isn't time in the workshop to pre-process the full dataset, but if there had been, we could have defined our pipeline to read from that generated directory for its training input.

-----------------------------
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.