## Chicago Taxi Example using Tensorflow Extended (not ready)

In [None]:
> Note: This tutorial is not ready yet; please skip it.

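The Chicago Taxi example demonstrates the end-to-end workflow and the steps required to analyze, validate and transform data, and to train, analyze and serve a model. It uses:

- TensorFlow Data Validation (TFDV) to analyze and validate the data
- TensorFlow Transform (TFT) to preprocess the features
- TensorFlow to train the model
- TensorFlow Model Analysis (TFMA) to analyze the trained model
- TensorFlow Serving to serve the model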


Note: This site provides applications using data that has been modified for use from its original source, www.cityofchicago.org, the official website of the City of Chicago. The City of Chicago makes no claims as to the content, accuracy, timeliness, or completeness of any of the data provided at this site. The data provided at this site is subject to change at any time. It is understood that the data provided at this site is being used at one’s own risk.


## The dataset

This sample is based on the model-analysis example 
[here](https://github.com/tensorflow/tfx/tree/master/tfx/examples/chicago_taxi).

The sample trains and analyzes a model based on the 
[Taxi Trips dataset](https://data.cityofchicago.org/Transportation/Taxi-Trips/wrvz-psew)
released by the City of Chicago.

The Kubeflow website has a very detailed explanation of Kubeflow components; see [Introduction to the Pipelines SDK](https://www.kubeflow.org/docs/pipelines/sdk/sdk-overview/) for details.

## Prerequisite

TFX doesn't support S3 yet, so all data has to be shared through a network file system. We need to prepare a PersistentVolume for it; the pipeline below mounts the PersistentVolumeClaim `efs-claim` at `/mnt` in every step.

## Build Pipeline

In [None]:
import kfp.dsl as dsl
import datetime
from kubernetes.client.models import V1EnvVar
from kubernetes import client as k8s_client

def dataflow_tf_data_validation_op(
        inference_data: 'GcsUri',
        validation_data: 'GcsUri',
        column_names: 'GcsUri[text/json]',
        key_columns,
        project: 'GcpProject',
        mode,
        validation_output: 'GcsUri[Directory]',
        step_name='validation'):
    """Build the data-validation step as a ``dsl.ContainerOp``.

    Every argument except ``step_name`` is forwarded to the container as a
    command-line flag value.

    Args:
        inference_data: Value for ``--csv-data-for-inference``.
        validation_data: Value for ``--csv-data-to-validate``.
        column_names: Value for ``--column-names``.
        key_columns: Value for ``--key-columns``.
        project: Value for ``--project``.
        mode: Value for ``--mode``.
        validation_output: Base location; the step's ``--output`` is
            ``<validation_output>/{{workflow.name}}/validation``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` declaring two file outputs: ``schema``
        (``/schema.txt``) and ``validation``
        (``/output_validation_result.txt``).
    """
    # '{{workflow.name}}' is passed through literally; presumably the
    # workflow engine substitutes it at run time.
    output_dir = '%s/{{workflow.name}}/validation' % validation_output

    container_args = [
        '--csv-data-for-inference', inference_data,
        '--csv-data-to-validate', validation_data,
        '--column-names', column_names,
        '--key-columns', key_columns,
        '--project', project,
        '--mode', mode,
        '--output', output_dir,
    ]
    declared_outputs = {
        'schema': '/schema.txt',
        'validation': '/output_validation_result.txt',
    }
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfdv:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
        file_outputs=declared_outputs,
    )

def dataflow_tf_transform_op(
        train_data: 'GcsUri',
        evaluation_data: 'GcsUri',
        schema: 'GcsUri[text/json]',
        project: 'GcpProject',
        preprocess_mode,
        preprocess_module: 'GcsUri[text/code/python]',
        transform_output: 'GcsUri[Directory]',
        step_name='preprocess'):
    """Build the preprocessing (transform) step as a ``dsl.ContainerOp``.

    Args:
        train_data: Value for ``--train``.
        evaluation_data: Value for ``--eval``.
        schema: Value for ``--schema``.
        project: Value for ``--project``.
        preprocess_mode: Value for ``--mode``.
        preprocess_module: Value for ``--preprocessing-module``.
        transform_output: Base location; the step's ``--output`` is
            ``<transform_output>/{{workflow.name}}/transformed``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` with one file output, ``transformed``
        (``/output.txt``).
    """
    # '{{workflow.name}}' is kept as a literal placeholder in the path.
    output_dir = '%s/{{workflow.name}}/transformed' % transform_output

    container_args = [
        '--train', train_data,
        '--eval', evaluation_data,
        '--schema', schema,
        '--project', project,
        '--mode', preprocess_mode,
        '--preprocessing-module', preprocess_module,
        '--output', output_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tft:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
        file_outputs={'transformed': '/output.txt'},
    )


def tf_train_op(
        transformed_data_dir,
        schema: 'GcsUri[text/json]',
        learning_rate: float,
        hidden_layer_size: int,
        steps: int,
        target: str,
        preprocess_module: 'GcsUri[text/code/python]',
        training_output: 'GcsUri[Directory]',
        step_name='training'):
    """Build the training step as a ``dsl.ContainerOp``.

    Args:
        transformed_data_dir: Value for ``--transformed-data-dir``.
        schema: Value for ``--schema``.
        learning_rate: Value for ``--learning-rate``.
        hidden_layer_size: Value for ``--hidden-layer-size``.
        steps: Value for ``--steps``.
        target: Value for ``--target``.
        preprocess_module: Value for ``--preprocessing-module``.
        training_output: Base location; the step's ``--job-dir`` is
            ``<training_output>/{{workflow.name}}/train``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` with one file output, ``train``
        (``/output.txt``).
    """
    # '{{workflow.name}}' is kept as a literal placeholder in the path.
    job_dir = '%s/{{workflow.name}}/train' % training_output

    container_args = [
        '--transformed-data-dir', transformed_data_dir,
        '--schema', schema,
        '--learning-rate', learning_rate,
        '--hidden-layer-size', hidden_layer_size,
        '--steps', steps,
        '--target', target,
        '--preprocessing-module', preprocess_module,
        '--job-dir', job_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-tf-trainer:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
        file_outputs={'train': '/output.txt'},
    )

def dataflow_tf_model_analyze_op(
        model: 'TensorFlow model',
        evaluation_data: 'GcsUri',
        schema: 'GcsUri[text/json]',
        project: 'GcpProject',
        analyze_mode,
        analyze_slice_column,
        analysis_output: 'GcsUri',
        step_name='analysis'):
    """Build the model-analysis step as a ``dsl.ContainerOp``.

    Args:
        model: Value for ``--model``.
        evaluation_data: Value for ``--eval``.
        schema: Value for ``--schema``.
        project: Value for ``--project``.
        analyze_mode: Value for ``--mode``.
        analyze_slice_column: Value for ``--slice-columns``.
        analysis_output: Base location; the step's ``--output`` is
            ``<analysis_output>/{{workflow.name}}/analysis``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` with one file output, ``analysis``
        (``/output.txt``).
    """
    # '{{workflow.name}}' is kept as a literal placeholder in the path.
    output_dir = '%s/{{workflow.name}}/analysis' % analysis_output

    container_args = [
        '--model', model,
        '--eval', evaluation_data,
        '--schema', schema,
        '--project', project,
        '--mode', analyze_mode,
        '--slice-columns', analyze_slice_column,
        '--output', output_dir,
    ]
    # NOTE(review): this image is pinned to a different tag than the other
    # steps in this file - confirm that is intentional.
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tfma:2c2445df83fa879387a200747cc20f72a7ee9727',
        arguments=container_args,
        file_outputs={'analysis': '/output.txt'},
    )


def dataflow_tf_predict_op(
        evaluation_data: 'GcsUri',
        schema: 'GcsUri[text/json]',
        target: str,
        model: 'TensorFlow model',
        predict_mode,
        project: 'GcpProject',
        prediction_output: 'GcsUri',
        step_name='prediction'):
    """Build the batch-prediction step as a ``dsl.ContainerOp``.

    Args:
        evaluation_data: Value for ``--data``.
        schema: Value for ``--schema``.
        target: Value for ``--target``.
        model: Value for ``--model``.
        predict_mode: Value for ``--mode``.
        project: Value for ``--project``.
        prediction_output: Base location; the step's ``--output`` is
            ``<prediction_output>/{{workflow.name}}/predict``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` with one file output, ``prediction``
        (``/output.txt``).
    """
    # '{{workflow.name}}' is kept as a literal placeholder in the path.
    output_dir = '%s/{{workflow.name}}/predict' % prediction_output

    container_args = [
        '--data', evaluation_data,
        '--schema', schema,
        '--target', target,
        '--model', model,
        '--mode', predict_mode,
        '--project', project,
        '--output', output_dir,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-dataflow-tf-predict:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
        file_outputs={'prediction': '/output.txt'},
    )


def confusion_matrix_op(predictions: 'GcsUri', output: 'GcsUri', step_name='confusion_matrix'):
    """Build the confusion-matrix step as a ``dsl.ContainerOp``.

    Args:
        predictions: Value for ``--predictions``.
        output: Base location; the step's ``--output`` is
            ``<output>/{{workflow.name}}/confusionmatrix``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` that declares no file outputs.
    """
    # '{{workflow.name}}' is kept as a literal placeholder in the path.
    output_dir = '%s/{{workflow.name}}/confusionmatrix' % output

    # The lambda is handed to the container as source text, not evaluated here.
    target_lambda = """lambda x: (x['target'] > x['fare'] * 0.2)"""

    container_args = [
        '--output', output_dir,
        '--predictions', predictions,
        '--target_lambda', target_lambda,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-local-confusion-matrix:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
    )


def roc_op(predictions: 'GcsUri', output: 'GcsUri', step_name='roc'):
    """Build the ROC step as a ``dsl.ContainerOp``.

    Args:
        predictions: Value for ``--predictions``.
        output: Base location; the step's ``--output`` is
            ``<output>/{{workflow.name}}/roc``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` that declares no file outputs.
    """
    # '{{workflow.name}}' is kept as a literal placeholder in the path.
    output_dir = '%s/{{workflow.name}}/roc' % output

    # The lambda is handed to the container as source text, not evaluated here.
    target_lambda = """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0"""

    container_args = [
        '--output', output_dir,
        '--predictions', predictions,
        '--target_lambda', target_lambda,
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-local-roc:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
    )


def kubeflow_deploy_op(model: 'TensorFlow model', tf_server_name, step_name='deploy'):
    """Build the model-deployment step as a ``dsl.ContainerOp``.

    Args:
        model: Base location of the trained model; the step's
            ``--model-export-path`` is ``<model>/export/export``.
        tf_server_name: Value for ``--server-name``.
        step_name: Name given to the resulting op.

    Returns:
        A ``dsl.ContainerOp`` that declares no file outputs.
    """
    export_path = '%s/export/export' % model

    container_args = [
        '--model-export-path', export_path,
        '--server-name', tf_server_name,
        # The claim and cluster names are fixed values in this sample.
        '--pvc-name', 'efs-claim',
        '--cluster-name', 'eks-cluster',
    ]
    return dsl.ContainerOp(
        name=step_name,
        image='gcr.io/ml-pipeline/ml-pipeline-kubeflow-deployer:a277f87ea1d4707bf860d080d06639b7caf9a1cf',
        arguments=container_args,
    )


@dsl.pipeline(
  name='TFX Taxi Cab Classification Pipeline Example',
  description='Example pipeline that does classification with model analysis based on a public BigQuery dataset.'
)
def taxi_cab_classification(
    output,
    project,
    column_names='/mnt/taxi/column-names.json',
    key_columns='trip_start_timestamp',
    train='/mnt/taxi/train.csv',
    evaluation='/mnt/taxi/eval.csv',
    mode='local',
    preprocess_module='/mnt/taxi/preprocessing.py',
    learning_rate=0.1,
    hidden_layer_size='1500',
    steps=3000,
    analyze_slice_column='trip_start_hour'):
    # Wires the steps together: validation produces the schema used by every
    # later data step; preprocessing feeds training; the trained model feeds
    # analysis, prediction and deployment; the predictions feed the
    # confusion-matrix and ROC steps. All steps write under `output`.

    # '{{workflow.uid}}' is kept as a literal placeholder in the server name.
    tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'

    validate_step = dataflow_tf_data_validation_op(
        inference_data=train,
        validation_data=evaluation,
        column_names=column_names,
        key_columns=key_columns,
        project=project,
        mode=mode,
        validation_output=output,
    )
    schema = validate_step.outputs['schema']

    transform_step = dataflow_tf_transform_op(
        train_data=train,
        evaluation_data=evaluation,
        schema=schema,
        project=project,
        preprocess_mode=mode,
        preprocess_module=preprocess_module,
        transform_output=output,
    )

    train_step = tf_train_op(
        transformed_data_dir=transform_step.output,
        schema=schema,
        learning_rate=learning_rate,
        hidden_layer_size=hidden_layer_size,
        steps=steps,
        target='tips',
        preprocess_module=preprocess_module,
        training_output=output,
    )
    trained_model = train_step.output

    analyze_step = dataflow_tf_model_analyze_op(
        model=trained_model,
        evaluation_data=evaluation,
        schema=schema,
        project=project,
        analyze_mode=mode,
        analyze_slice_column=analyze_slice_column,
        analysis_output=output,
    )

    predict_step = dataflow_tf_predict_op(
        evaluation_data=evaluation,
        schema=schema,
        target='tips',
        model=trained_model,
        predict_mode=mode,
        project=project,
        prediction_output=output,
    )
    predictions = predict_step.output

    confusion_matrix_step = confusion_matrix_op(predictions=predictions, output=output)
    roc_step = roc_op(predictions=predictions, output=output)
    deploy_step = kubeflow_deploy_op(model=trained_model, tf_server_name=tf_server_name)

    # Every step gets the 'efs-claim' PersistentVolumeClaim mounted at /mnt,
    # which is where the default input paths above point.
    all_steps = (
        validate_step,
        transform_step,
        train_step,
        analyze_step,
        predict_step,
        confusion_matrix_step,
        roc_step,
        deploy_step,
    )
    for step in all_steps:
        # Fresh mount/volume objects per step, as in the unrolled version.
        mount = k8s_client.V1VolumeMount(mount_path='/mnt', name='data-storage')
        step_with_mount = step.add_volume_mount(mount)
        claim = k8s_client.V1PersistentVolumeClaimVolumeSource(claim_name='efs-claim')
        volume = k8s_client.V1Volume(name='data-storage', persistent_volume_claim=claim)
        step_with_mount.add_volume(volume)

if __name__ == '__main__':
  import kfp.compiler as compiler

  # Compile the pipeline into '<this file>.zip'.
  # A notebook kernel runs this cell with __name__ == '__main__' but does not
  # define __file__, so fall back to a fixed name (relative to the working
  # directory) instead of failing with NameError. When run as a script the
  # output path is unchanged.
  source_path = globals().get('__file__', 'taxi_cab_classification.py')
  compiler.Compiler().compile(taxi_cab_classification, source_path + '.zip')

After a successful installation, the `dsl-compile` command should be available. You can verify this by running the following command:

In [7]:
!which dsl-compile

/opt/conda/bin/dsl-compile
