# End-to-end experiment: GitHub Issue Summarization

Currently, this notebook must be run from the Kubeflow JupyterHub installation, as described in the codelab.

In this notebook, we will show how to:

* Interactively define a Kubeflow Pipeline using the Pipelines Python SDK
* Submit and run the pipeline
* Add a step in the pipeline

This example pipeline trains a [Tensor2Tensor](https://github.com/tensorflow/tensor2tensor/) model on GitHub issue data, learning to predict issue titles from issue bodies. It then exports the trained model and deploys the exported model to [TensorFlow Serving](https://github.com/tensorflow/serving).
The final step in the pipeline launches a web app that interacts with the TF-Serving instance to get model predictions.

## Environment Setup

Before any experiment can be run, we need to set up and initialize the environment: make sure that all required Python modules are installed and configured.

### Imports
Set up the Python modules.

In [None]:
%reload_ext autoreload
%autoreload 2

%reload_ext nbextensions
%load_nbvars

import kfp.dsl as dsl
import kfp.gcp as gcp
import pandas as pd
from ipython_secrets import get_secret
from kfp.compiler import Compiler
from kfp.components import load_component_from_file
from os import environ
import boto3, kfp

from nbextensions.pv import use_pvc
from nbextensions.kubernetes import use_pull_secret
from nbextensions.aws import upload_to_s3

import nbextensions.utils as utils
from datetime import datetime
from urllib.parse import urlparse

import warnings
warnings.filterwarnings('ignore')

### Define global variables

Initialize the global namespace variables. It is good practice to keep all of them in one cell, so that the notebook can be configured in a single place.

To enhance readability, we advise capitalizing such variables.

In [None]:
USER = environ.get('NB_USER', 'John Doe')
TAG = 'latest'
# TAG = 'v8'

BUILD_CONTEXT = f"{TAG}/buildcontext"
TRAINING_IMAGE = f"{DOCKER_REGISTRY}/training:{TAG}"
SERVING_IMAGE = f"{DOCKER_REGISTRY}/seldon:{TAG}"
FLASK_APP_IMAGE = f"{DOCKER_REGISTRY}/flask:{TAG}"
TRAINING_ROOT = f"{MOUNT_PATH}/{TAG}/training"

DATASET_FILE = f"{TRAINING_ROOT}/dataset.csv"
MODEL_FILE = f"{TRAINING_ROOT}/training1.h5"
TITLE_PP_FILE = f"{TRAINING_ROOT}/title_preprocessor.dpkl"
BODY_PP_FILE = f"{TRAINING_ROOT}/body_preprocessor.dpkl"
TRAIN_DF_FILE = f"{TRAINING_ROOT}/traindf.csv"
TEST_DF_FILE =  f"{TRAINING_ROOT}/testdf.csv"
TRAIN_TITLE_VECS = f"{TRAINING_ROOT}/train_title_vecs.npy"
TRAIN_BODY_VECS = f"{TRAINING_ROOT}/train_body_vecs.npy"

s3 = boto3.session.Session().client('s3', endpoint_url=BUCKET_ENDPOINT)

client = kfp.Client()
try:
    exp = client.get_experiment(experiment_name=APPLICATION_NAME)
except Exception:  # experiment not found (or lookup failed): create it
    exp = client.create_experiment(APPLICATION_NAME)
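
Names such as `DOCKER_REGISTRY`, `MOUNT_PATH`, `BUCKET_ENDPOINT` and `APPLICATION_NAME` are not defined in the cell above; they are presumably injected by `%load_nbvars`. The sketch below uses made-up placeholder values for two of them to show how changing `TAG` alone moves every derived image name and file path. The helper `derive_paths` is hypothetical and exists only for illustration.

```python
# Placeholder values for illustration only; in the notebook these are
# expected to come from %load_nbvars.
DOCKER_REGISTRY = "registry.example.com/demo"
MOUNT_PATH = "/mnt/s3"


def derive_paths(tag):
    """Rebuild a few tag-dependent names the same way the cell above does."""
    training_root = f"{MOUNT_PATH}/{tag}/training"
    return {
        "BUILD_CONTEXT": f"{tag}/buildcontext",
        "TRAINING_IMAGE": f"{DOCKER_REGISTRY}/training:{tag}",
        "TRAINING_ROOT": training_root,
        "MODEL_FILE": f"{training_root}/training1.h5",
    }


paths = derive_paths("v8")
for name, value in paths.items():
    print(f"{name:15} {value}")
```

Because every artifact lives under a tag-specific prefix, runs with different tags do not overwrite each other's images or models.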

### Define build docker image pipeline

Define the build pipeline. Yes, we are using KFP to build the images that the final pipeline will itself run.

We use [Kaniko](https://github.com/GoogleContainerTools/kaniko) and Kubernetes to handle build operations. Build status can be tracked in the KFP pipeline dashboard.

The image build job could even be combined with the primary pipeline, since each step runs in its own Kubernetes pod. However, for the sake of general-purpose efficiency, we schedule the build as a separate pipeline.

In [None]:
kaniko_op = load_component_from_file('components/kaniko/deploy.yaml')

@dsl.pipeline(
  name='Pipeline images',
  description='Build images that will be used by the pipeline'
)
def build_image(
        image, 
        build_context=None, 
        dockerfile: dsl.PipelineParam=dsl.PipelineParam(name='dockerfile', value='Dockerfile')):
    kaniko_op(
        image=image,
        dockerfile=dockerfile,
        build_context=build_context
    ).apply(
        # docker registry credentials 
        use_pull_secret(secret_name=DOCKER_REGISTRY_PULL_SECRET)
    ).apply(
        # the S3 bucket volume claim is mounted here
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    )
        
Compiler().compile(build_image, '.kaniko.tar.gz')

The compiler transforms the Python DSL into an [Argo Workflow](https://argoproj.github.io/docs/argo/readme.html) and stores the generated artifacts in [`.kaniko.tar.gz`](.kaniko.tar.gz), so the pipeline can be executed multiple times, possibly with different parameters.
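
To see what a compiled package contains, the archive can be opened with the standard `tarfile` module. The following is a minimal sketch: it builds a stand-in archive so that it runs anywhere, and the member name `pipeline.yaml` is an assumption about how the compiler names the workflow manifest. To inspect the real package, pass `'.kaniko.tar.gz'` to the hypothetical helper `list_package_members` instead.

```python
import io
import os
import tarfile
import tempfile


def list_package_members(package_path):
    """Return the file names stored in a gzip-compressed tar archive."""
    with tarfile.open(package_path, mode="r:gz") as archive:
        return archive.getnames()


# Stand-in archive so the sketch is runnable without a compiled pipeline.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.tar.gz")
manifest = b"apiVersion: argoproj.io/v1alpha1\nkind: Workflow\n"
with tarfile.open(demo_path, mode="w:gz") as archive:
    info = tarfile.TarInfo(name="pipeline.yaml")  # assumed member name
    info.size = len(manifest)
    archive.addfile(info, io.BytesIO(manifest))

members = list_package_members(demo_path)
print(members)
```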

## Distributed Training
Training is an integral part of our experiment. Distributed training means that it is executed outside of the Jupyter notebook and can use the full capacity of the current cluster. To achieve this, we need to:
* Build a Docker image for training
* Define a training pipeline
* Run the experiment

### Building a training image
Once a pipeline has been defined, we can reuse it multiple times by supplying different input parameters.

The next cell uploads all files to S3 so that the pipeline can access them. Files to ignore can be customized in [kanikoignore.txt](./kanikoignore.txt). To understand the upload logic, you can review and modify [aws.py](./extensions/kaniko/aws.py).

In [None]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3
)

run = client.run_pipeline(
    exp.id, f'Build image: training:{TAG}', '.kaniko.tar.gz', 
    params={
        'image': TRAINING_IMAGE,
        'build-context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/training"
    })
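
The `ignorefile` argument suggests that some workspace files are excluded from the upload. How `upload_to_s3` applies it is not shown here, so the sketch below is only one plausible interpretation: shell-style patterns matched with the standard `fnmatch` module. The function `filter_ignored` and the sample patterns are hypothetical.

```python
from fnmatch import fnmatch


def filter_ignored(paths, patterns):
    """Keep only the paths that match none of the ignore patterns."""
    return [
        path for path in paths
        if not any(fnmatch(path, pattern) for pattern in patterns)
    ]


# Sample patterns and files, made up for illustration.
patterns = ["*.ipynb", ".git/*", "*.tar.gz"]
files = [
    "components/training/Dockerfile",
    "notebook.ipynb",
    ".git/config",
    ".kaniko.tar.gz",
]
kept = filter_ignored(files, patterns)
print(kept)
```

Keeping notebooks, compiled packages and VCS metadata out of the build context makes the upload smaller and the image build faster.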

The build can take a long time, because images used for data science tasks are often huge. In that case you might want to increase the `timeout` parameter.

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")
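
Conceptually, waiting with a timeout means polling for a status until it arrives or a deadline passes. The sketch below shows that pattern with a hypothetical helper `wait_for` and a stand-in status source; it is not the KFP client's implementation.

```python
import time


def wait_for(check, timeout, interval=1.0):
    """Poll check() until it returns a status or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        status = check()
        if status is not None:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"no result after {timeout} seconds")
        time.sleep(interval)


# Stand-in status source: reports "Succeeded" on the third poll.
polls = iter([None, None, "Succeeded"])
status = wait_for(lambda: next(polls), timeout=5, interval=0.01)
print(status)
```

With this pattern a timeout that is too small surfaces as an exception while the job itself may still be running, which is why a long image build calls for a larger value.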

### Define Pipeline
We have extracted the code for the training pipeline into a [component](components/training). The Python code that defines `training_op` and `http_download_op` can be found [here](components/training/component.py).

Below we define a pipeline that runs the training as an experiment. Every training operation (except the download) is implemented by a Python script. You can change the scripts at will; however, you will then need to rebuild the training image. The pipeline does the following:

* Download the dataset over HTTP
* Split the data into training and test sets. It can also limit the number of rows in the dataset to get faster feedback
* Preprocess the data for machine learning (clean, tokenize, and transform text into vectors)
* Apply sequence-to-sequence training with Keras. On completion, the trained model is uploaded to the S3 bucket

In [None]:
from components.training import (http_download_op, training_op)

@dsl.pipeline(
  name='Training',
  description="""
  Download dataset, 
  Split data set for training and validation, 
  Clean and preprocess data, 
  Train the model
  """
)
def training_pipeline(
    import_from: dsl.PipelineParam, 
    dataset_file: dsl.PipelineParam,
    dataset_md5: dsl.PipelineParam,
    train_df_file: dsl.PipelineParam,
    test_df_file: dsl.PipelineParam,
    title_pp_file: dsl.PipelineParam,
    body_pp_file: dsl.PipelineParam,
    train_title_vecs: dsl.PipelineParam,
    train_body_vecs: dsl.PipelineParam,
    model_file: dsl.PipelineParam,
    sample_size: dsl.PipelineParam=dsl.PipelineParam(name='sample-size', value='200'),
    learning_rate: dsl.PipelineParam=dsl.PipelineParam(name='learning-rate', value=0.001),
):  
    download = http_download_op(
        url=import_from,
        md5sum=dataset_md5,
        download_to=dataset_file
    ).apply(
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    )
    
    # Generates the training and test set. Only processes "sample-size" rows.
    process = training_op(
        script='process_data.py',
        arguments=[
            '--input_csv', dataset_file,
            '--sample_size', sample_size,
            '--output_traindf_csv', train_df_file, 
            '--output_testdf_csv', test_df_file,
        ]
    ).apply(
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    ).after(download)
    
    # Preprocess for deep learning
    preproc_for_ml = training_op(
        script='preproc.py',
        arguments=[
            '--input_traindf_csv', train_df_file,
            '--output_title_preprocessor_dpkl', title_pp_file,
            '--output_body_preprocessor_dpkl', body_pp_file,
            '--output_train_title_vecs_npy', train_title_vecs,
            '--output_train_body_vecs_npy', train_body_vecs,
        ],
        file_outputs={
            'title-example': '/tmp/train_title_raw.txt',
            'title-processed': '/tmp/train_title_vecs.txt',
            'body-example': '/tmp/train_body_raw.txt',
            'body-processed': '/tmp/train_body_vecs.txt',
        }
    ).apply(
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    ).after(process)
    
    # Training
    training = training_op(
        script='train.py',
        arguments=[
            '--input_title_preprocessor_dpkl', title_pp_file,
            '--input_body_preprocessor_dpkl', body_pp_file,
            '--input_train_title_vecs_npy', train_title_vecs,
            '--input_train_body_vecs_npy', train_body_vecs,
            '--script_name_base', '/tmp/seq2seq',
            '--output_model_h5', model_file,
            '--learning_rate', learning_rate,
            '--tempfile', "True",
        ],
        file_outputs={'train': '/tmp/seq2seq.log'},
    ).apply(
        use_pvc(name=BUCKET_PVC, mount_to=MOUNT_PATH)
    ).after(preproc_for_ml)
    
#     training.set_memory_request('2G')
#     training.set_cpu_request('1')

Compiler().compile(training_pipeline, '.training.tar.gz')
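
The `.after()` calls in the pipeline above form a simple dependency chain. As an illustration only, the same ordering can be reproduced with the standard `graphlib` module (Python 3.9 or newer); the dictionary below is written by hand from the pipeline definition and is not produced by KFP.

```python
from graphlib import TopologicalSorter

# step -> set of steps it must run after, mirroring the .after() calls
dependencies = {
    "download": set(),
    "process": {"download"},
    "preproc_for_ml": {"process"},
    "training": {"preproc_for_ml"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Since the steps exchange data only through files on the shared volume, these explicit dependencies are what keeps a step from starting before its input files exist.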

### Run the pipeline

The code below runs the pipeline and injects some pipeline parameters. We provide two versions of the data set:
* `SAMPLE_DATASET` - A data set of just over 2 megabytes. Not enough to train a useful model, but ideal for development because of the faster feedback.
* `FULL_DATASET` - A pre-created data set with all GitHub issues, about 3 gigabytes. Large enough to train a usable model.

Depending on your needs, choose one data set and pass its URL as the `import-from` pipeline parameter, together with the matching checksum as `dataset-md5`.

In [None]:
# github issues small: 2Mi data set (best for dev/test)
SAMPLE_DATASET = 'https://s3.us-east-2.amazonaws.com/asi-kubeflow-models/gh-issues/data-sample.csv'
SAMPLE_DATASET_MD5 = '916af946f2fe1d1779b26205d4d8378f'
# github issues full: 3Gi data set (best for training)
FULL_DATASET = 'https://s3.us-east-2.amazonaws.com/asi-kubeflow-models/gh-issues/data-full.csv'
FULL_DATASET_MD5 = '57dc987c04d41a94d0d9daf4d0ebf8ba'

run = client.run_pipeline(exp.id, f'Training model {TAG}: {datetime.now():%m%d-%H%M}', '.training.tar.gz',
                          params={
                              'import-from': SAMPLE_DATASET,
                              'dataset-md5': SAMPLE_DATASET_MD5,
                              'dataset-file': DATASET_FILE,
                              'title-pp-file': TITLE_PP_FILE,
                              'body-pp-file': BODY_PP_FILE,
                              'train-df-file': TRAIN_DF_FILE,
                              'test-df-file': TEST_DF_FILE,
                              'train-title-vecs': TRAIN_TITLE_VECS,
                              'train-body-vecs': TRAIN_BODY_VECS,
                              'model-file': MODEL_FILE,
                              'learning-rate': 0.001,
                              'sample-size': 100,
                          })
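
The `dataset-md5` parameter lets the download step check that the file arrived intact. The component's own code is not shown here, so the following is only a sketch of how such a check can be done with `hashlib`; the helper `md5_matches` is hypothetical, and the demo file is a stand-in for the downloaded CSV.

```python
import hashlib
import os
import tempfile


def md5_matches(path, expected_md5, chunk_size=1 << 20):
    """Hash the file in chunks and compare with the expected hex digest."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest() == expected_md5.lower()


# Stand-in file so the sketch is runnable without the real data set.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv")
with open(demo_path, "wb") as handle:
    handle.write(b"hello")

ok = md5_matches(demo_path, "5d41402abc4b2a76b9719d911017c592")
print(ok)
```

Reading in chunks keeps memory use flat, which matters for the 3 gigabyte data set.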

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

## Serving with Seldon
Prepare a container for serving.

Here we define all the variables needed for our Dockerfile template.

- `MODEL_WRAPPER`: name of the Python class that adapts the Keras model for serving
- `MODEL_NAME`: used in the Seldon deployment
- `MODEL_VERSION`: one model can be served in multiple versions simultaneously
- `SELDON_DEPLOYMENT`: name of the Kubernetes resource
- `SELDON_OAUTH_KEY`: part of the shared secret between `SeldonDeployment` and a client application
- `SELDON_OAUTH_SECRET`: part of the shared secret between `SeldonDeployment` and a client application
- `SELDON_DEPLOYMENT_REPLICAS`: number of replicas for the `SeldonDeployment` pod

In [None]:
import re

MODEL_WRAPPER = 'IssueSummarizationModel'
MODEL_NAME = re.sub(r'\W+', '-', MODEL_WRAPPER).lower()
MODEL_VERSION = TAG
SELDON_DEPLOYMENT = f"{MODEL_NAME}-{MODEL_VERSION}"
# hash the model information so that the key is predictable
SELDON_OAUTH_KEY = utils.sha1(MODEL_NAME, MODEL_VERSION, NAMESPACE)
# for the secret, hash the user-defined shared secret salted with the OAuth key
SELDON_OAUTH_SECRET = utils.sha1(SELDON_OAUTH_KEY, get_secret('USER_SECRET_FOR_MODEL'))
SELDON_APISERVER_ADDR = f"seldon-seldon-apiserver.{NAMESPACE}:8080"

SELDON_DEPLOYMENT_REPLICAS = 1
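
The sketch below traces how these names are derived. `utils.sha1` belongs to the notebook extensions and its implementation is not shown, so `sha1_of` is a hypothetical stand-in that hashes the parts joined with a colon; the namespace and secret are placeholder values.

```python
import hashlib
import re


def sha1_of(*parts):
    """Hypothetical stand-in for utils.sha1: SHA-1 over the joined parts."""
    return hashlib.sha1(":".join(parts).encode("utf-8")).hexdigest()


model_name = re.sub(r"\W+", "-", "IssueSummarizationModel").lower()
deployment = f"{model_name}-latest"

# Placeholder namespace and secret, for illustration only.
oauth_key = sha1_of(model_name, "latest", "demo-namespace")
oauth_secret = sha1_of(oauth_key, "not-a-real-secret")

print(deployment)
print(len(oauth_key), oauth_key != oauth_secret)
```

The key depends only on the model name, version and namespace, so any client can recompute it, while the secret also requires the user-supplied value.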

### Building a serving image

`SeldonDeployment` needs a Docker image that contains a model wrapper, written here in Python (other languages are supported as well).

This step builds the container image that will be served.

In [None]:
%%template components/serving/Dockerfile
FROM seldonio/seldon-core-s2i-python3

FROM {{TRAINING_IMAGE}}
RUN pip3 install --no-cache-dir -U 'seldon-core'

COPY --from=0 /microservice /microservice
COPY src/serving.py /microservice/{{MODEL_WRAPPER}}.py
COPY src/seq2seq_utils.py /microservice
COPY src/text_utils.py /microservice

WORKDIR /microservice
ENTRYPOINT ["python","-u","microservice.py"]
CMD ["{{MODEL_WRAPPER}}", "REST"]
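
The `{{NAME}}` placeholders in the Dockerfile are filled from the notebook's global variables. The `%%template` magic comes from the notebook extensions and its implementation is not shown here; the sketch below only illustrates the substitution idea with a regular expression, and the function `render` and the sample values are hypothetical.

```python
import re


def render(template, variables):
    """Replace each {{NAME}} with variables[NAME]; unknown names raise KeyError."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(variables[match.group(1)]),
        template,
    )


template = 'FROM {{TRAINING_IMAGE}}\nCMD ["{{MODEL_WRAPPER}}", "REST"]\n'
rendered = render(template, {
    "TRAINING_IMAGE": "registry.example.com/demo/training:latest",
    "MODEL_WRAPPER": "IssueSummarizationModel",
})
print(rendered)
```

Failing loudly on an undefined variable is a deliberate choice in this sketch: a silently empty `FROM` line would only show up later as a confusing build error.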

To serve the trained model, we build an image with our serving microservice. To do this, we reuse the Kaniko pipeline defined above.

In [None]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3,
)

run = client.run_pipeline(exp.id, f'Build image: serving:{TAG}', '.kaniko.tar.gz', 
                          params={
                              'image': SERVING_IMAGE,
                              'build-context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/serving"
                          })

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

### Serve model

Next we render our `SeldonDeployment` template and deploy it with `kubectl`, as we did before with the `pvc` definition. The template references the model that will be used for serving.

In [None]:
%templatefile components/serving/templates/seldon.yaml -o seldon.yaml
!kubectl apply -f seldon.yaml --wait
!kubectl get -f seldon.yaml -o jsonpath='{.status.state}'

### Validate

It can take a few minutes for the Seldon application to be deployed. Once it is up, we can send a test prediction.

Test model serving by accessing the Seldon API server. Because the Seldon API server is protected by OAuth, we first need to obtain a temporary bearer token. We get this token by providing the OAuth key and secret that were used in our `SeldonDeployment`.

In [None]:
from IPython.display import Code
import nbextensions.seldon as seldon

test_payload = {
    "data":{"ndarray": [["try to stop flask from using multiple threads"]]},
}
                         
t = seldon.get_token(
    server=SELDON_APISERVER_ADDR,
    oauth_key=SELDON_OAUTH_KEY,
    oauth_secret=SELDON_OAUTH_SECRET,
)
result = seldon.prediction(
    server=SELDON_APISERVER_ADDR,
    payload=test_payload,
    token=t,
)
if result.get('status') == 'FAILURE':
    print("Error connecting to seldon core.", 
          "This may happen when Seldon is not up and running yet.",
          "Try again later")
    display(Code(f"{result.get('reason')}: {result.get('info')}"))
else:
    display(
        pd.DataFrame.from_dict({
            'test': test_payload['data']['ndarray'][0],
            'prediction': result['data']['ndarray'][0],
    }))
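
The two-step exchange (token first, prediction second) can be sketched with the standard library alone. `nbextensions.seldon` is not shown here, so this is not its implementation; the `/oauth/token` and `/api/v0.1/predictions` paths are assumptions based on the older Seldon Core API gateway, and the server address and credentials are placeholders. The requests are only built, not sent.

```python
import base64
import json
from urllib.request import Request


def token_request(server, oauth_key, oauth_secret):
    """Build the token request: HTTP Basic auth with key and secret."""
    credentials = base64.b64encode(
        f"{oauth_key}:{oauth_secret}".encode("utf-8")
    ).decode("ascii")
    return Request(
        f"http://{server}/oauth/token",  # assumed endpoint
        data=b"grant_type=client_credentials",
        headers={"Authorization": f"Basic {credentials}"},
        method="POST",
    )


def prediction_request(server, token, payload):
    """Build the prediction request: JSON body plus the bearer token."""
    return Request(
        f"http://{server}/api/v0.1/predictions",  # assumed endpoint
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


payload = {"data": {"ndarray": [["try to stop flask from using multiple threads"]]}}
token_req = token_request("seldon.example:8080", "key", "secret")
predict_req = prediction_request("seldon.example:8080", "abc", payload)
print(token_req.full_url)
print(predict_req.full_url)
```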


## Deploy a client application

This section covers the application deployment.
- `FLASK_APP`: name of the Kubernetes deployment associated with the application
- `FLASK_REPLICAS`: number of replicas for the application deployment pod
- `GITHUB_TOKEN`: GitHub token used to access the GitHub API. It lets the application fetch a random GitHub issue

In [None]:
FLASK_APP = APPLICATION_NAME
FLASK_REPLICAS = 1
GITHUB_TOKEN = get_secret('github_token')

### Building an application container

The user application is implemented in [app.py](components/flaskapp/src/app.py). We bake this application into a Docker container and then deploy it.

In [None]:
%%template components/flaskapp/Dockerfile -v
FROM {{TRAINING_IMAGE}}
RUN pip3 install --no-cache-dir -U 'flask>=0.12.3'
COPY src/ /app
WORKDIR /app
ENTRYPOINT ["python","-u","app.py"]

In [None]:
upload_to_s3(
    destination=f"s3://{BUCKET_NAME}/{BUILD_CONTEXT}",
    ignorefile='components/kaniko/ignorefile.txt',
    workspace='.',
    s3_client=s3,
)

run = client.run_pipeline(
    exp.id, f'Build image: application:{TAG}', '.kaniko.tar.gz', 
    params={
      'image': FLASK_APP_IMAGE,
      'build-context': f"{MOUNT_PATH}/{BUILD_CONTEXT}/components/flaskapp"
    }
)

In [None]:
%%time
# block until job completion
print(f"Waiting for run: {run.id}...")
result = client.wait_for_run_completion(run.id, timeout=720).run.status
print(f"Finished with: {result}")

### Deploy a web application

The client web application is a simple Python [Flask](http://flask.pocoo.org) application. Its deployment manifest is defined in a Kubernetes deployment template file ([link](components/flaskapp/templates/application.yaml)). We render this template with the current notebook global variables and then use `kubectl` to deploy it.

For web access, the application deployment uses an Ambassador HTTP router, which is part of the Kubeflow stack.

In [None]:
%templatefile components/flaskapp/templates/application.yaml -o application.yaml
!kubectl apply -f application.yaml --wait

from IPython.display import HTML
display(HTML(f'The application is accessible <a href="/{FLASK_APP}/" target="_blank">here</a>'))

## Tear down

Upon completion, let's tear everything down.

In [None]:
# !kubectl delete -f seldon.yaml
!kubectl delete -f application.yaml