# Summary
One very popular data science example is the Taxi Cab (or Chicago Taxi) example, which predicts trips that result in tips greater than 20% of the fare. We are going to showcase the same example on Kubeflow running on HPECP 5.2. We have updated a few of the Docker images used in the example so that the UI metadata and Metrics artifacts are stored in the internal MinIO instance used by Kubeflow. We’ll explain all the changes in each of the pipeline steps.

# Importing libraries and setting up variables

In [8]:
import kfp
from kfp import components
from kfp import dsl
from kfp import gcp
from kfp import onprem
from kubernetes.client.models import V1EnvVar

In [21]:
# Environment variables that are attached to pipeline steps so they can reach
# Kubeflow's MinIO service and store the pipeline visualizations there.
minio_endpoint = V1EnvVar(name="MINIO_ENDPOINT", value="minio-service:9000")
accessKey = V1EnvVar(name="MINIO_ACCESS_KEY", value="minio")
secretKey = V1EnvVar(name="MINIO_SECRET_KEY", value="minio123")

# Module-level switches read by the pipeline definition:
#   platform - any value other than 'GCP' makes the pipeline create a PVC.
#   proxy    - when non-empty, passed to `git clone` as its http.proxy setting.
platform, proxy = "local", ""

# Downloading yaml files

In [22]:
# If the notebook sits behind an HTTP proxy, set it before fetching the
# component specs and clear it again afterwards, for example:
#   import os
#   os.environ['http_proxy'] = "http://web-proxy.corp.hpecorp.net:8080"
#   os.environ['https_proxy'] = "http://web-proxy.corp.hpecorp.net:8080"
#   ... load the components ...
#   os.environ['http_proxy'] = ""
#   os.environ['https_proxy'] = ""

# Component spec URLs, in the same order as the op factories unpacked below.
_COMPONENT_YAML_URLS = (
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/tfdv_component.yaml',
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/tft_component.yaml',
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/dnntrainer_component.yaml',
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/tfma_component.yaml',
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/predict_component.yaml',
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/confusion_matrix_component.yaml',
    'https://raw.githubusercontent.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio/main/yamls/roc_component.yaml',
)

# Each URL is downloaded and turned into an op factory, one after another.
(
    dataflow_tf_data_validation_op,
    dataflow_tf_transform_op,
    tf_train_op,
    dataflow_tf_model_analyze_op,
    dataflow_tf_predict_op,
    confusion_matrix_op,
    roc_op,
) = [components.load_component_from_url(spec_url) for spec_url in _COMPONENT_YAML_URLS]

# Creating kubeflow pipeline 

In [23]:
# Taxi Cab classification pipeline definition.
#
# Builds the pipeline graph: (on-prem only) create a shared PVC and clone the
# example repository onto it, then validate the data, transform it, train a
# model, analyze the model, generate predictions, and produce a confusion
# matrix and a ROC curve from those predictions.
#
# Parameters:
#   project              - passed to the components as `gcp_project`.
#   output               - base directory; also the PVC mount path on-prem.
#   column_names         - path of the column-names JSON file.
#   key_columns          - key column(s) handed to the validation step.
#   train / evaluation   - paths of the training and evaluation CSV files.
#   mode                 - passed to the components as `run_mode`.
#   preprocess_module    - path of the preprocessing module.
#   learning_rate, hidden_layer_size, steps - trainer settings.
#   analyze_slice_column - column the model analysis is sliced by.
#
# Reads the module-level names `platform`, `proxy`, `secretKey`, `accessKey`
# and `minio_endpoint`.
#
# NOTE(review): documented with comments rather than a docstring in case the
# KFP compiler folds the function docstring into pipeline metadata - confirm.
@dsl.pipeline(
  name='TFX Taxi Cab Classification Pipeline Example',
  description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
)
def taxi_cab_classification(
    project,
    output="/mnt/shared",
    column_names='/mnt/shared/pipelines/column-names.json',
    key_columns='trip_start_timestamp',
    train='/mnt/shared/pipelines/train.csv',
    evaluation='/mnt/shared/pipelines/eval.csv',
    mode='local',
    preprocess_module='/mnt/shared/pipelines/preprocessing.py',
    learning_rate=0.1,
    hidden_layer_size='1500',
    steps=3000,
    analyze_slice_column='trip_start_hour'
):
    # Output location handed to every step; the {{...}} placeholders are kept
    # verbatim here (presumably substituted by the workflow engine at run time).
    output_template = str(output) + '/{{workflow.uid}}/{{pod.name}}/data'
    target_lambda = """lambda x: (x['target'] > x['fare'] * 0.2)"""
    target_class_lambda = """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0"""

    # The PVC and the git checkout only exist on non-GCP platforms, so every
    # later use of `vop` / `checkout` is guarded by the same flag.
    # NOTE(review): for platform == 'GCP' no volume or credentials are attached
    # to the steps here - confirm what that target needs.
    on_prem = platform != 'GCP'

    if on_prem:
        # Create the PVC and clone the Git repo (data files and preprocessing
        # module) onto it.
        vop = dsl.VolumeOp(
            name="create_pvc",
            resource_name="pipeline-pvc",
            modes=dsl.VOLUME_MODE_RWM,
            size="1Gi"
        )

        clone_command = [
            "git", "clone",
            "https://github.com/II-VSB-II/TaxiClassificationKubeflowPipelineMinio.git",
            str(output) + "/pipelines",
        ]
        if proxy:
            # Route the clone through the configured HTTP proxy.
            clone_command += ["-c", "http.proxy={}".format(proxy)]

        checkout = dsl.ContainerOp(
            name="checkout",
            image="alpine/git:latest",
            command=clone_command,
        ).apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
        checkout.after(vop)

    # TFDV can compute descriptive statistics that provide a quick overview of
    # the data in terms of the features that are present and the shapes of
    # their value distributions.
    # Here we validate our dataset and infer our schema.
    validation = dataflow_tf_data_validation_op(
        inference_data=train,
        validation_data=evaluation,
        column_names=column_names,
        key_columns=key_columns,
        gcp_project=project,
        run_mode=mode,
        validation_output=output_template,
    )
    if on_prem:
        # The default input paths live under <output>/pipelines, which is
        # created by the clone, so validation must not start before checkout
        # has finished. Every other step consumes an output of validation
        # (directly or transitively) and is therefore ordered after it.
        validation.after(checkout)

    # Use tf.Transform for data preprocessing and feature transformations:
    # generate a boolean feature for "tip was > 20%" and save the features.
    preprocess = dataflow_tf_transform_op(
        training_data_file_pattern=train,
        evaluation_data_file_pattern=evaluation,
        schema=validation.outputs['schema'],
        gcp_project=project,
        run_mode=mode,
        preprocessing_module=preprocess_module,
        transformed_data_dir=output_template
    )

    # Train a neural network model with TensorFlow.
    training = tf_train_op(
        transformed_data_dir=preprocess.output,
        schema=validation.outputs['schema'],
        learning_rate=learning_rate,
        hidden_layer_size=hidden_layer_size,
        steps=steps,
        target='tips',
        preprocessing_module=preprocess_module,
        training_output_dir=output_template
    )

    # Analyze the model and save visualizations into MinIO.
    analysis = dataflow_tf_model_analyze_op(
        model=training.output,
        evaluation_data=evaluation,
        schema=validation.outputs['schema'],
        gcp_project=project,
        run_mode=mode,
        slice_columns=analyze_slice_column,
        analysis_results_dir=output_template
    )

    # Generate predictions into a table with its features and save the
    # visualization into MinIO.
    prediction = dataflow_tf_predict_op(
        data_file_pattern=evaluation,
        schema=validation.outputs['schema'],
        target_column='tips',
        model=training.output,
        run_mode=mode,
        gcp_project=project,
        predictions_dir=output_template
    )

    # Generate the confusion matrix and save it into MinIO.
    cm = confusion_matrix_op(
        predictions=prediction.output,
        target_lambda=target_lambda,
        output_dir=output_template
    )

    # Generate the ROC curve and save it into MinIO.
    roc = roc_op(
        predictions_dir=prediction.output,
        target_lambda=target_class_lambda,
        output_dir=output_template
    )

    # Give the MinIO connection settings to the same five steps, in the same
    # variable order, as the original chained add_env_variable calls.
    for minio_step in (training, analysis, prediction, cm, roc):
        for env_var in (secretKey, accessKey, minio_endpoint):
            minio_step.add_env_variable(env_var)

    # Named `pipeline_steps` so the `steps` pipeline parameter is not shadowed.
    pipeline_steps = [validation, preprocess, training, analysis, prediction, cm, roc]

    if on_prem:
        # Mount the PVC on each of the pipeline steps.
        for step in pipeline_steps:
            step.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))

In [24]:
# Run-time values for the parameters of taxi_cab_classification, keyed by
# parameter name.
arguments = dict(
    project="project",
    output="/mnt/shared",
    column_names="/mnt/shared/pipelines/column-names.json",
    key_columns="trip_start_timestamp",
    train="/mnt/shared/pipelines/train.csv",
    evaluation="/mnt/shared/pipelines/eval.csv",
    mode="local",
    preprocess_module="/mnt/shared/pipelines/preprocessing.py",
    learning_rate=0.1,
    hidden_layer_size="1500",
    steps=3000,
    analyze_slice_column="trip_start_hour",
)

# Compiling the pipeline into a zip file that you can upload directly to the KFP UI

In [13]:
# Compile the pipeline into a zip package for upload through the KFP UI.
# Run arguments are not passed here: the third positional parameter of
# Compiler.compile is `type_check`, so a dict in that position would only be
# read as a truthy flag. Arguments are supplied when a run is created.
kfp.compiler.Compiler().compile(taxi_cab_classification, "TaxiPipelineMinio.zip")

# Executing pipeline directly from KubeDirector notebook

In [15]:
import kfp
import json
import kfp.dsl as dsl


# # For a non-SSL system, enter the Kubeflow dashboard URL with '/pipeline' at the end.
# url = ''

# from ezmeral_kf_utils import KfSession
# K = KfSession(url)
# client=K.kf_client()

# # For an SSL-enabled system, set both the Kubeflow URL (with '/pipeline' at the end) and the location of the certificate.
# url = ''
# cert = ''

# from ezmeral_kf_utils import KfSession
# K = KfSession(url,cert)
# client=K.kf_client()


# client.create_run_from_pipeline_func(
#     taxi_cab_classification, 
#     experiment_name="Taxi Cab",
#     arguments=arguments)

# Executing directly from Kubeflow notebook

In [26]:
# Submit the pipeline function as a run in the "Taxi Cab" experiment, using
# the default in-cluster client. Left as a bare expression so the notebook
# echoes the returned run result.
kfp.Client().create_run_from_pipeline_func(
    pipeline_func=taxi_cab_classification,
    arguments=arguments,
    experiment_name="Taxi Cab",
)

RunPipelineResult(run_id=9294c3d9-aaa4-4be7-8109-6b1892f63b4c)

## Now look at the Kubeflow pipelines UI and watch the training happen! 