 ============================================================================== \
 Copyright 2020 Google LLC. This software is provided as-is, without warranty \
 or representation for any use or purpose. Your use of it is subject to your \
 agreement with Google. \
 ============================================================================== 
 
 Author: Elvin Zhu, Chanchal Chatterjee \
 Email: elvinzhu@google.com \
<img src="img/google-cloud-icon.jpg" alt="Drawing" style="width: 200px;"/>

In [1]:
# !python3 -m pip install kfp

In [2]:
import os
import kfp
import yaml
import kfp.components as comp
import kfp.dsl as dsl
from typing import NamedTuple
from kfp.compiler import compiler

In [3]:
def data_preprocess(
    bucket_name: str,
    input_file: str,
    target_column: str,
    ) -> NamedTuple('PreprocessOutput', 
              [
                  ('x_train_name', str),
                  ('x_test_name', str),
                  ('y_train_name', str),
                  ('y_test_name', str),
                  ('n_classes', int),
              ]):
    """Preprocess a raw CSV and write stratified train/test splits to GCS.

    Steps: drop the ``LOAN_SEQUENCE_NUMBER`` id column, one-hot encode the
    string-typed feature columns, count the target classes, perform a 90/10
    stratified split, fill missing feature values with 0 and write four CSV
    files under ``gs://<bucket_name>/data_split/``.

    Args:
        bucket_name: Name of the GCS bucket (without the ``gs://`` scheme)
            that receives the split files.
        input_file: Path of the raw CSV file; passed to ``pandas.read_csv``.
        target_column: Name of the label column in the CSV.

    Returns:
        A ``PreprocessOutput`` named tuple with the four output file paths
        (``x_train_name``, ``x_test_name``, ``y_train_name``,
        ``y_test_name``) and ``n_classes``, the number of distinct labels.
    """
    # Imports are kept inside the function body; the function is packaged
    # on its own by func_to_container_op further down in this file.
    from collections import namedtuple
    from sklearn.model_selection import train_test_split
    import pandas as pd
    import os
    import logging

    id_column = 'LOAN_SEQUENCE_NUMBER'

    logging.info("Preprocessing raw data:")
    logging.info(" => Drop id column:")
    logging.info(" => One hot encoding categorical features")
    logging.info(" => Count number of classes")
    logging.info(" => Perform train/test split")

    logging.info("Reading raw data file: {}".format(input_file))
    dataset = pd.read_csv(input_file)
    # Drop unique id column which is not useful for ML
    logging.info("Drop unique id column which is not an useful feature for ML: {}".format(id_column))
    dataset.drop([id_column], axis=1, inplace=True)

    # Convert categorical feature columns into one-hot encodings.
    # The target column is excluded: encoding it here would remove it from
    # the frame and every later dataset[target_column] lookup would fail
    # when the labels are strings.
    logging.info("Convert categorical columns into one-hot encodings")
    str_cols = [
        col for col in dataset.columns
        if dataset[col].dtype == 'object' and col != target_column
    ]
    for col in str_cols:
        logging.info("categorical feature: {}".format(col))
    dataset = pd.get_dummies(dataset, columns=str_cols)

    # Count number of unique classes
    logging.info("Count number of unique classes ...")
    n_classes = dataset[target_column].nunique()
    logging.info("No. of Classes: {}".format(n_classes))

    # Split with a small test size so as to allow our model to train on more data
    logging.info("Perform train/test split ...")
    x_train, x_test, y_train, y_test = train_test_split(
        dataset.drop(target_column, axis=1),
        dataset[target_column],
        test_size=0.1,
        random_state=1,
        shuffle=True,
        stratify=dataset[target_column],
        )

    # Fill NaN values with zeros
    x_train = x_train.fillna(0)
    x_test = x_test.fillna(0)

    logging.info("Get feature/label shapes ...")
    logging.info("x_train shape = {}".format(x_train.shape))
    logging.info("x_test shape = {}".format(x_test.shape))
    logging.info("y_train shape = {}".format(y_train.shape))
    logging.info("y_test shape = {}".format(y_test.shape))

    # Build data split file names from the input file name. GCS URIs always
    # use '/', so they are formatted directly instead of going through
    # os.path.join, whose separator depends on the host OS.
    base_name, ext_name = os.path.splitext(os.path.basename(input_file))
    split_dir = "gs://{}/data_split".format(bucket_name.strip("/"))
    x_train_name = "{}/{}_x_train{}".format(split_dir, base_name, ext_name)
    x_test_name = "{}/{}_x_test{}".format(split_dir, base_name, ext_name)
    y_train_name = "{}/{}_y_train{}".format(split_dir, base_name, ext_name)
    y_test_name = "{}/{}_y_test{}".format(split_dir, base_name, ext_name)

    # Save split data to gcs
    logging.info("Saving data ...")
    x_train.to_csv(x_train_name, index=False)
    x_test.to_csv(x_test_name, index=False)

    # The preprocessing for label column is different 
    # between tensorflow and XGBoost models.
    # Encode the labels once over the full dataset and slice by split index
    # so both label files have one column per class, in the same order,
    # even if a rare class is absent from one of the splits.
    labels_one_hot = pd.get_dummies(dataset[target_column])
    labels_one_hot.loc[y_train.index].to_csv(y_train_name, index=False, header=None)
    labels_one_hot.loc[y_test.index].to_csv(y_test_name, index=False, header=None)

    logging.info("x_train saved to {}".format(x_train_name))
    logging.info("y_train saved to {}".format(y_train_name))
    logging.info("x_test saved to {}".format(x_test_name))
    logging.info("y_test saved to {}".format(y_test_name))
    logging.info("finished")

    PreprocessOutput = namedtuple('PreprocessOutput', 
        ['x_train_name', 'x_test_name', 'y_train_name', 'y_test_name', 'n_classes'])
    return PreprocessOutput(
        x_train_name=x_train_name,
        x_test_name=x_test_name,
        y_train_name=y_train_name,
        y_test_name=y_test_name,
        n_classes=n_classes,
    )

In [4]:
# !gsutil cp ./config/config_hpt.yaml gs://tuti_asset/config/config_hpt.yaml
# !gsutil cp ./config/config_hpt.yaml gs://tuti_asset/config/config.yaml

In [5]:
def hypertune(
        job_name: str,
        bucket_name: str,
        job_folder_name: str,
        region: str,
        train_feature_path: str,
        train_label_path: str,
        val_feature_path: str,
        val_label_path: str,
        epochs: int,
        config_yaml: str = None,
    ) -> NamedTuple('TrainOutput', 
              [('response', str), ('job_name', str)]):
    """Submit a hyperparameter-tuning training job through the gcloud CLI.

    The job is submitted under the name ``<job_name>_hpt``; its job
    directory is ``gs://<bucket_name>/<job_folder_name>/<job_name>`` (built
    from the name without the suffix).

    Args:
        job_name: Base job name; ``_hpt`` is appended for the submitted job.
        bucket_name: GCS bucket used for the job directory.
        job_folder_name: Folder inside the bucket used for the job directory.
        region: Region passed to ``gcloud ... submit training --region``.
        train_feature_path: Forwarded to the trainer as ``--train_feature_name``.
        train_label_path: Forwarded to the trainer as ``--train_label_name``.
        val_feature_path: Forwarded to the trainer as ``--test_feature_name``.
        val_label_path: Forwarded to the trainer as ``--test_label_name``.
        epochs: Forwarded to the trainer as ``--epochs``.
        config_yaml: Optional YAML text; when given it overwrites the
            default config file before the job is submitted.

    Returns:
        A ``TrainOutput`` named tuple: ``response`` is the stdout of
        ``gcloud ai-platform jobs describe`` and ``job_name`` is the
        submitted (suffixed) job name.

    Raises:
        subprocess.CalledProcessError: If the submit command exits with a
            non-zero status.
    """
    from collections import namedtuple
    import subprocess
    import logging

    job_dir = 'gs://{}/{}/{}'.format(
        bucket_name,
        job_folder_name,
        job_name,
        )
    job_name = job_name + "_hpt"
    package_path = "/pipelines/component/trainer"
    module_name = "trainer.train_hpt"

    job_config = "/pipelines/component/config/config_hpt.yaml"
    # if user input config yaml, then replace the default
    if config_yaml is not None:
        with open(job_config, 'w') as fout:
            fout.write(config_yaml)

    logging.info("JOB_NAME = {} ".format(job_name))
    logging.info("JOB_DIR = {} ".format(job_dir))
    logging.info("JOB_CONFIG = {} ".format(job_config))

    # check=True: fail this step immediately when the submission is
    # rejected, instead of handing a non-existent job to later steps.
    subprocess.run([
        "gcloud", "ai-platform", "jobs", "submit", "training",
        job_name,
        "--package-path", package_path,
        "--module-name", module_name,
        "--python-version", "3.7",
        "--runtime-version", "2.2",
        "--job-dir", job_dir,
        "--region", region,
        "--config", job_config,
        "--",
        "--train_feature_name", train_feature_path,
        "--train_label_name", train_label_path,
        "--test_feature_name", val_feature_path,
        "--test_label_name", val_label_path,
        "--epochs", str(epochs),
        ], stdout=subprocess.PIPE, check=True)

    # The description of the freshly submitted job is what gets returned.
    response = subprocess.run([
        "gcloud", "ai-platform", "jobs", "describe", job_name,
        ], stdout=subprocess.PIPE)

    TrainOutput = namedtuple('TrainOutput',['response', 'job_name'])

    return TrainOutput(response=response.stdout.decode(), job_name=job_name)

In [6]:
def get_job_status(
        response: str,
        job_name: str,
        time_out: int = 9000, # timeout after 2.5 hours by default
        time_sleep: int = 60, # check status every minute by default
    ) -> NamedTuple('LRO_Output', 
              [('response', str), ('status', bool)]):
    """Poll ``gcloud ai-platform jobs describe`` until the job finishes.

    Args:
        response: Not read; it is replaced by the latest ``describe``
            output on every poll.
        job_name: Name of the job to poll.
        time_out: Maximum number of seconds to keep polling.
        time_sleep: Number of seconds to wait between two polls.

    Returns:
        An ``LRO_Output`` named tuple holding the last ``describe`` output
        (``response``) and ``status=True``.

    Raises:
        RuntimeError: If the job reports state ``FAILED`` or ``CANCELLED``.
        TimeoutError: If the job has not reached ``SUCCEEDED`` within
            ``time_out`` seconds.
    """
    from collections import namedtuple
    import subprocess
    import time
    import yaml
    import logging

    # Terminal states that can never turn into SUCCEEDED; waiting out the
    # full timeout on them would only delay the pipeline failure.
    failed_states = ('FAILED', 'CANCELLED')

    time0 = time.time()
    status = False
    while time.time() - time0 < time_out:
        completed = subprocess.run([
            "gcloud", "ai-platform", "jobs", "describe", job_name,
            ], stdout=subprocess.PIPE)
        response = completed.stdout.decode()
        # safe_load returns None for empty output (e.g. when the describe
        # call itself failed); treat anything that is not a mapping as
        # "state unknown" and keep polling.
        response_dict = yaml.safe_load(response)
        state = response_dict.get('state') if isinstance(response_dict, dict) else None
        if state == 'SUCCEEDED':
            status = True
            break
        if state in failed_states:
            raise RuntimeError(
                "Job {} ended in state {}:\n{}".format(job_name, state, response))
        logging.info("Checking status ...")
        logging.info(response)
        time.sleep(time_sleep)
    if not status:
        raise TimeoutError("No successful job found. Timeout after {} seconds".format(time_out))

    LRO_Output = namedtuple('LRO_Output',['response', 'status'])
    return LRO_Output(response=response, status=status)

In [7]:
def get_hyperparameter(
        project_id: str,
        job_name: str, 
        status: bool,
    ) -> NamedTuple('Ghp_Output', 
              [('model_depth', int), ('dropout_rate', float), ('learning_rate', float), ('batch_size', int)]):
    """Fetch a finished tuning job and return the best trial's hyperparameters.

    Args:
        project_id: Project that owns the job.
        job_name: Name of the hyperparameter-tuning job.
        status: Not read by the function body; in the pipeline it carries
            the output of the job-status step.

    Returns:
        A ``Ghp_Output`` named tuple with ``model_depth``, ``dropout_rate``,
        ``learning_rate`` and ``batch_size`` taken from the best trial, as
        they appear in the API response.

    Raises:
        ValueError: If no trial in the response reports an objective value.
    """
    from googleapiclient import discovery
    import json
    import logging
    from collections import namedtuple

    # Define the project id and the job id and format it for the api request
    job_id = 'projects/{}/jobs/{}'.format(project_id, job_name)

    # Build the service
    ml = discovery.build('ml', 'v1', cache_discovery=False)
    # Execute the request and pass in the job id
    request = ml.projects().jobs().get(name=job_id).execute()
    # Print response
    logging.info(json.dumps(request, indent=4))

    trials = request['trainingOutput']['trials']
    # Only trials that actually report an objective value can be ranked.
    scored_trials = [
        trial for trial in trials
        if (trial.get('finalMetric') or {}).get('objectiveValue') is not None
    ]
    if not scored_trials:
        raise ValueError(
            "Job {} has no trial with a final objective value".format(job_name))

    # Rank by objective value in the direction of the tuning goal.
    # MAXIMIZE is assumed when the response does not carry a goal, which
    # matches the descending sort this function used before.
    goal = (
        (request.get('trainingInput') or {})
        .get('hyperparameters', {})
        .get('goal', 'MAXIMIZE')
    )
    pick_best = min if goal == 'MINIMIZE' else max
    best_trial = pick_best(
        scored_trials,
        key=lambda trial: trial['finalMetric']['objectiveValue'],
    )
    logging.info("Best trial: {}".format(json.dumps(best_trial)))

    best_hyperparameters = best_trial['hyperparameters']
    model_depth = best_hyperparameters['model_depth']
    dropout_rate = best_hyperparameters['dropout_rate']
    learning_rate = best_hyperparameters['learning_rate']
    batch_size = best_hyperparameters['batch_size']

    Ghp_Output = namedtuple('Ghp_Output',['model_depth', 'dropout_rate', 'learning_rate', 'batch_size'])
    return Ghp_Output(model_depth=model_depth, dropout_rate=dropout_rate, learning_rate=learning_rate, batch_size=batch_size )  

In [8]:
def train(
        job_name: str,
        bucket_name: str,
        job_folder_name: str,
        region: str,
        train_feature_path: str,
        train_label_path: str,
        val_feature_path: str,
        val_label_path: str,
        model_depth: int,
        dropout_rate: float,
        learning_rate: float,
        batch_size: int,
        epochs: int,
        config_yaml: str = None,
    ) -> NamedTuple('TrainOutput', 
              [('response', str), ('job_name', str)]):
    """Submit the final training job through the gcloud CLI.

    The job directory is ``gs://<bucket_name>/<job_folder_name>/<job_name>``.

    Args:
        job_name: Name of the training job to submit.
        bucket_name: GCS bucket used for the job directory.
        job_folder_name: Folder inside the bucket used for the job directory.
        region: Region passed to ``gcloud ... submit training --region``.
        train_feature_path: Forwarded to the trainer as ``--train_feature_name``.
        train_label_path: Forwarded to the trainer as ``--train_label_name``.
        val_feature_path: Forwarded to the trainer as ``--test_feature_name``.
        val_label_path: Forwarded to the trainer as ``--test_label_name``.
        model_depth: Forwarded to the trainer as ``--model_depth``.
        dropout_rate: Forwarded to the trainer as ``--dropout_rate``.
        learning_rate: Forwarded to the trainer as ``--learning_rate``.
        batch_size: Forwarded to the trainer as ``--batch_size``.
        epochs: Forwarded to the trainer as ``--epochs``.
        config_yaml: Optional YAML text; when given it overwrites the
            default config file before the job is submitted.

    Returns:
        A ``TrainOutput`` named tuple: ``response`` is the stdout of
        ``gcloud ai-platform jobs describe`` and ``job_name`` is the
        submitted job name.

    Raises:
        subprocess.CalledProcessError: If the submit command exits with a
            non-zero status.
    """
    from collections import namedtuple
    import subprocess
    import logging

    job_dir = 'gs://{}/{}/{}'.format(
        bucket_name,
        job_folder_name,
        job_name,
        )
    package_path = "/pipelines/component/trainer"
    module_name = "trainer.train"
    job_config = "/pipelines/component/config/config.yaml"
    logging.info("JOB_NAME = {} ".format(job_name))
    logging.info("JOB_DIR = {} ".format(job_dir))
    logging.info("JOB_CONFIG = {} ".format(job_config))
    # if user input config yaml, then replace the default
    if config_yaml is not None:
        with open(job_config, 'w') as fout:
            fout.write(config_yaml)

    # check=True: fail this step immediately when the submission is
    # rejected, instead of handing a non-existent job to later steps.
    subprocess.run([
        "gcloud", "ai-platform", "jobs", "submit", "training",
        job_name,
        "--job-dir", job_dir,
        "--package-path", package_path,
        "--module-name", module_name,
        "--region", region,
        "--python-version", "3.7",
        "--runtime-version", "2.2",
        "--config", job_config,
        "--",
        "--train_feature_name", train_feature_path,
        "--train_label_name", train_label_path,
        "--test_feature_name", val_feature_path,
        "--test_label_name", val_label_path,
        "--model_depth", str(model_depth),
        "--dropout_rate", str(dropout_rate),
        "--learning_rate", str(learning_rate),
        "--batch_size", str(batch_size),
        "--epochs", str(epochs),
    ], stdout=subprocess.PIPE, check=True)

    # The description of the freshly submitted job is what gets returned.
    response = subprocess.run([
        "gcloud", "ai-platform", "jobs", "describe", job_name,
    ], stdout=subprocess.PIPE)

    TrainOutput = namedtuple('TrainOutput',['response', 'job_name'])

    return TrainOutput(response=response.stdout.decode(), job_name=job_name)

In [9]:
def deploy(
    bucket_name: str,
    job_folder_name: str,
    job_name: str,
    model_name: str,
    model_version: str,
    region:str,
    model_framework:str,
    model_description: str,
    status: bool,
    ):
    """Create a model (if missing) and a new model version via gcloud.

    The version is created from the artifacts under
    ``gs://<bucket_name>/<job_folder_name>/<job_name>``.

    Args:
        bucket_name: GCS bucket that holds the job directory.
        job_folder_name: Folder inside the bucket that holds the job directory.
        job_name: Name of the training job whose directory is deployed.
        model_name: Name of the model to create or reuse.
        model_version: Name of the version to create.
        region: Region used for every gcloud call in this function.
        model_framework: Value passed to ``--framework``.
        model_description: Value passed to ``--description``.
        status: Not read by the function body; in the pipeline it carries
            the output of the job-status step.

    Returns:
        A ``DeployOutput`` named tuple whose ``response`` field is the
        stdout of the ``versions create`` command.

    Raises:
        subprocess.CalledProcessError: If any gcloud command exits with a
            non-zero status.
    """
    from collections import namedtuple
    import subprocess

    latest_model_dir = "gs://{}/{}/{}".format(bucket_name, job_folder_name, job_name)

    # Check if model exists. The listing uses the same region as the create
    # calls below; looking in one region and creating in another would make
    # the existence check meaningless.
    listing = subprocess.run([
            "gcloud", "ai-platform", "models", "list",
            "--region", region,
        ], stdout=subprocess.PIPE, check=True)
    # First line is the table header; the model name is the first column.
    rows = listing.stdout.decode().split("\n")[1:]
    list_of_models = [row.split()[0] for row in rows if row.strip()]

    # create model if not exists
    if model_name not in list_of_models:
        subprocess.run([
            "gcloud", "ai-platform", "models", "create",
            model_name,
            "--region", region,
            "--enable-logging",
        ], stdout=subprocess.PIPE, check=True)

    # create model version; check=True so a failed deployment fails the step
    response = subprocess.run([
        "gcloud","beta", "ai-platform", "versions", "create",
        model_version,
        "--model", model_name,
        "--origin", latest_model_dir,
        "--region", region,
        "--python-version", "3.7",
        "--runtime-version", "2.2",
        "--framework", model_framework,
        "--description", model_description,
    ], stdout=subprocess.PIPE, check=True)

    DeployOutput = namedtuple('DeployOutput',['response'])

    return DeployOutput(response=response.stdout.decode())

### Compile Python functions into components

In [10]:
# Turn each Python function above into a container-based pipeline op and
# save its component spec as a YAML file under component_dir.
component_dir = "./components"

# The YAML specs are written into this directory, so make sure it exists
# before the first component is built (no-op when it is already there).
os.makedirs(component_dir, exist_ok=True)


# Data preprocessing component
base_image = "gcr.io/deeplearning-platform-release/tf2-gpu.2-1"
yaml_name = '{}/preprocess.yaml'.format(component_dir)

preprocess_op = comp.func_to_container_op(
    data_preprocess, 
    output_component_file=yaml_name,
    base_image=base_image)


# Hyperparameter tuning component (runs on the project's trainer image)
base_image = "gcr.io/img-seg-3d/trainer-tf:v1"
yaml_name = '{}/train_hpt.yaml'.format(component_dir)

hypertune_op = comp.func_to_container_op(
    hypertune, 
    output_component_file=yaml_name,
    base_image=base_image)


# Job status polling component
base_image = "gcr.io/deeplearning-platform-release/tf2-gpu.2-1"
yaml_name = '{}/lro.yaml'.format(component_dir)

lro_op = comp.func_to_container_op(
    get_job_status, 
    output_component_file=yaml_name,
    base_image=base_image)


# Best-hyperparameter lookup component
base_image = "gcr.io/deeplearning-platform-release/tf2-gpu.2-1"
yaml_name = '{}/ghp.yaml'.format(component_dir)

ghp_op = comp.func_to_container_op(
    get_hyperparameter, 
    output_component_file=yaml_name,
    base_image=base_image)


# Final training component (trainer image pinned by digest)
base_image = "gcr.io/img-seg-3d/trainer-tf@sha256:9565ad78f43f05b69fb6afe86b43f61627b76407f998b7de00ada6b886cb05d8"
yaml_name = '{}/train.yaml'.format(component_dir)

train_op = comp.func_to_container_op(
    train, 
    output_component_file=yaml_name,
    base_image=base_image)


# Model deployment component
base_image = "gcr.io/deeplearning-platform-release/tf2-gpu.2-1"
yaml_name = '{}/deploy.yaml'.format(component_dir)

deploy_op = comp.func_to_container_op(
    deploy, 
    output_component_file=yaml_name,
    base_image=base_image)


### Compile KFP pipeline

In [11]:
@dsl.pipeline(
    name='generic prediction pipeline',
    description='A pipeline that performs generic seismic image segmentation.',
)
def train_pipeline(
    job_name: str,
    project_id: str,
    region: str,
    user_name: str,
    bucket_name: str,
    input_file: str,
    job_folder_name: str,
    target_column: str,
    deployed_model_name: str,
    deployed_model_version: str,
    deployed_model_description: str,
    config_yaml_hpt: str,
    config_yaml: str,
    epochs_hpt: int,
    epochs_final: int,
    ):
    # Pipeline flow:
    #   preprocess -> hypertune -> wait -> read best hyperparameters
    #              -> final training -> wait -> deploy
    # user_name is accepted as a pipeline parameter but is not passed to
    # any step.

    # Step 1: split the raw CSV into train/test feature and label files.
    prep = preprocess_op(
        bucket_name=bucket_name,
        input_file=input_file,
        target_column=target_column,
    )

    # Both training steps read the same four split files.
    split_paths = dict(
        train_feature_path=prep.outputs['x_train_name'],
        train_label_path=prep.outputs['y_train_name'],
        val_feature_path=prep.outputs['x_test_name'],
        val_label_path=prep.outputs['y_test_name'],
    )

    # Step 2: submit the hyperparameter-tuning job.
    tuning = hypertune_op(
        job_name=job_name,
        bucket_name=bucket_name,
        job_folder_name=job_folder_name,
        region=region,
        **split_paths,
        epochs=epochs_hpt,
        config_yaml=config_yaml_hpt,
    )

    # Step 3: block until the tuning job has finished.
    tuning_wait = lro_op(
        response=tuning.outputs['response'],
        job_name=tuning.outputs['job_name'],
    )

    # Step 4: read the selected hyperparameters; the status input makes
    # this step run after the wait step.
    best_params = ghp_op(
        project_id=project_id,
        job_name=tuning.outputs['job_name'],
        status=tuning_wait.outputs['status'],
    )

    # Step 5: submit the final training job with those hyperparameters.
    final_training = train_op(
        job_name=job_name,
        bucket_name=bucket_name,
        job_folder_name=job_folder_name,
        region=region,
        **split_paths,
        model_depth=best_params.outputs['model_depth'],
        dropout_rate=best_params.outputs['dropout_rate'],
        learning_rate=best_params.outputs['learning_rate'],
        batch_size=best_params.outputs['batch_size'],
        epochs=epochs_final,
        config_yaml=config_yaml,
    )

    # Step 6: block until the final training job has finished.
    training_wait = lro_op(
        response=final_training.outputs['response'],
        job_name=final_training.outputs['job_name'],
    )

    # Step 7: deploy the trained model; the status input makes this step
    # run after the wait step.
    deploy_op(
        status=training_wait.outputs['status'],
        bucket_name=bucket_name,
        job_folder_name=job_folder_name,
        job_name=final_training.outputs['job_name'],
        region='global',
        model_framework='tensorflow',
        model_name=deployed_model_name,
        model_version=deployed_model_version,
        model_description=deployed_model_description,
    )
    
# Compile the pipeline function into a package that can be uploaded to,
# or run on, a Kubeflow Pipelines host.
pipeline_pkg_path = "./train_pipeline.tar.gz"

pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(train_pipeline, package_path=pipeline_pkg_path)

### Run KFP pipeline on AI Platform hosted Kubernetes cluster

In [12]:
# ============== Uncomment to run the pipeline ==============

# from datetime import datetime
# from pytz import timezone
# my_timezone = 'US/Pacific'

# # Read the config files so their contents can be passed as pipeline parameters
# config_hpt = "./config/config_hpt.yaml"
# with open(config_hpt, 'r') as fin:
#     config_yaml_hpt = fin.read()
    
# config = "./config/config.yaml"
# with open(config, 'r') as fin:
#     config_yaml = fin.read()
    
# # Define pipeline input
# params = {
#     "job_name": 'tf_train_elvinzhu_{}'.format(
#         datetime.now(timezone(my_timezone)).strftime("%m%d%y_%H%M")
#         ),
#     "project_id": 'img-seg-3d',
#     "region": 'us-central1',
#     "user_name": 'elvinzhu',
#     "job_folder_name": 'tf_train_job',
#     "bucket_name": 'tuti_job',
#     "input_file": 'gs://tuti_asset/datasets/mortgage_structured.csv',
#     "target_column": 'TARGET',
#     "deployed_model_name": "kfp_tf_model",
#     "deployed_model_version": "kfp_tf_bst_v0_2",
#     "deployed_model_description": "best_tensorflow_hpt",
#     "config_yaml_hpt": config_yaml_hpt,
#     "config_yaml": config_yaml,
#     "epochs_hpt": 5, # No. of epochs for training in hypertune
#     "epochs_final": 10, # No. of epochs for final training
# }
# kfp_host_name = '6ff530db99970db2-dot-us-central2.pipelines.googleusercontent.com'
# kfp_exp_name = 'tensorflow_ai_platform'
# kfp_run_name = 'demo_tensorflow'

# client = kfp.Client(host=kfp_host_name) 
# # Create Experiment GROUP
# exp = client.create_experiment(name = kfp_exp_name)
# # Create Experiment RUN
# run = client.run_pipeline(exp.id, kfp_run_name, pipeline_pkg_path, params=params)