In [1]:
# Install the tooling used by this notebook into the user site-packages
# (`--user`); `--quiet` suppresses pip's progress output.
# NOTE(review): `kfp` is upgraded to whatever the latest release is, while
# `tfx` is pinned to 0.22.0 — confirm the two resolve to compatible versions,
# and consider pinning `kfp` so the notebook stays reproducible.
!python3 -m pip install pip --upgrade --quiet --user
!python3 -m pip install kfp --upgrade --quiet --user
!python3 -m pip install tfx==0.22.0 --quiet --user

In [2]:
# Append the user python binary directory to `PATH`. Only this one directory
# is appended; no separate directory containing `skaffold` is added here.
# The current value is read first and then re-exported with the extra entry,
# so re-running this cell appends the directory again.
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


In [3]:
import os
from typing import Text

import kfp

import tensorflow_model_analysis as tfma

from tfx.components import Evaluator
from tfx.components import CsvExampleGen
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.orchestration import data_types
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import pusher_pb2
from tfx.utils.dsl_utils import external_input



In [4]:
# Pipeline name; also used as the base name of the compiled package
# (`<pipeline_name>.tar.gz`).
pipeline_name = 'startup_success_pipeline'

# Root location for pipeline outputs. The KFP run-id placeholder is the last
# path component, so it is substituted per run.
pipeline_root = os.path.join(
    'gs://hamoye-startup-success-kubeflowpipelines-default',
    'std_mrg',
    kfp.dsl.RUN_ID_PLACEHOLDER,
)

# Location of input data, should be a GCS path under which there is a csv file.
# The previous default contained a stray single quote inside the path
# (".../'std_mrg.csv"), which pointed at a non-existent object.
# NOTE(review): the sentence above describes a directory, but this default
# names `std_mrg.csv` itself — confirm whether it is a folder or a file and
# adjust if CsvExampleGen expects the parent directory.
data_root_param = data_types.RuntimeParameter(
    name='data-root',
    default='gs://hamoye-startup-success-kubeflowpipelines-default/std_mrg.csv',
    ptype=Text,
)

In [5]:
# GCS path of the user module file, exposed as the 'module-file' runtime
# parameter.
startup_module_file_param = data_types.RuntimeParameter(
    ptype=Text,
    name='module-file',
    default='gs://hamoye-startup-success-kubeflowpipelines-default/pipeline_util.py',
)

# Number of training steps (a step count, not an epoch count), exposed as the
# 'train-steps' runtime parameter.
train_steps = data_types.RuntimeParameter(
    ptype=int, name='train-steps', default=10)

# Number of evaluation steps (a step count, not an epoch count), exposed as
# the 'eval-steps' runtime parameter.
eval_steps = data_types.RuntimeParameter(
    ptype=int, name='eval-steps', default=5)

In [6]:
# Ingest CSV examples; the input location is supplied by the
# `data_root_param` runtime parameter.
examples = external_input(data_root_param)
example_gen = CsvExampleGen(input=examples)

# Compute statistics over the ingested examples.
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'],
)

# Derive a schema from the statistics (feature-shape inference disabled).
infer_schema = SchemaGen(
    infer_feature_shape=False,
    statistics=statistics_gen.outputs['statistics'],
)

# Check the statistics against the inferred schema.
validate_stats = ExampleValidator(
    schema=infer_schema.outputs['schema'],
    statistics=statistics_gen.outputs['statistics'],
)

# Transform and Trainer both take their module file from the
# `startup_module_file_param` runtime parameter.
transform = Transform(
    module_file=startup_module_file_param,
    schema=infer_schema.outputs['schema'],
    examples=example_gen.outputs['examples'],
)

# Train/eval step counts come from the `train_steps` and `eval_steps`
# runtime parameters.
trainer = Trainer(
    module_file=startup_module_file_param,
    schema=infer_schema.outputs['schema'],
    transformed_examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    train_args={'num_steps': train_steps},
    eval_args={'num_steps': eval_steps},
)

# TFMA configuration consumed by the Evaluator component.
eval_config = tfma.EvalConfig(
    model_specs=[
        # The 'eval' signature implies an EvalSavedModel. For a serving model,
        # drop the signature (it then defaults to 'serving_default') and set a
        # label_key instead.
        tfma.ModelSpec(signature_name='eval'),
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            # Extra metrics listed here are computed in addition to any
            # metrics saved with the model.
            metrics=[tfma.MetricConfig(class_name='ExampleCount')],
            # Thresholds are keyed by metric name.
            # NOTE(review): the change threshold presumably compares against
            # a baseline model, but no baseline is passed to the Evaluator in
            # this notebook — confirm it has the intended effect.
            thresholds={
                'binary_accuracy': tfma.MetricThreshold(
                    value_threshold=tfma.GenericValueThreshold(
                        lower_bound={'value': 0.5},
                    ),
                    change_threshold=tfma.GenericChangeThreshold(
                        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                        absolute={'value': -1e-10},
                    ),
                ),
            },
        ),
    ],
    slicing_specs=[
        # An empty spec is the overall slice, i.e. the whole dataset.
        tfma.SlicingSpec(),
        # Additional slice keyed on a single feature column.
        # NOTE(review): 'trip_start_hour' looks carried over from another
        # dataset — confirm this column exists in the startup data.
        tfma.SlicingSpec(feature_keys=['trip_start_hour']),
    ],
)

# Evaluate the trained model on the ingested examples using `eval_config`.
evaluator = Evaluator(
    eval_config=eval_config,
    model=trainer.outputs['model'],
    examples=example_gen.outputs['examples'],
)

# Push the model, gated on the Evaluator's blessing, to a 'model_serving'
# directory under the pipeline root parameter.
pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=os.path.join(
                str(pipeline.ROOT_PARAMETER), 'model_serving'),
        ),
    ),
)

In [7]:
# Assemble the runner-agnostic pipeline definition: it holds the component
# graph and settings, but nothing specific to the Kubeflow runner.
dsl_pipeline = pipeline.Pipeline(
    pipeline_name=pipeline_name,
    pipeline_root=pipeline_root,
    components=[
        example_gen,
        statistics_gen,
        infer_schema,
        validate_stats,
        transform,
        trainer,
        evaluator,
        pusher,
    ],
    enable_cache=True,
    beam_pipeline_args=['--direct_num_workers=%d' % 0],
)

In [8]:
# TFX docker image used by the pipeline. For the full list of tags
# please see: https://hub.docker.com/r/tensorflow/tfx/tags
tfx_image = 'gcr.io/tfx-oss-public/tfx:0.22.0'

# Runner configuration: default Kubeflow metadata config plus the image above.
metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()
config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    tfx_image=tfx_image,
)
kfp_runner = kubeflow_dag_runner.KubeflowDagRunner(config=config)

# Compile the DSL pipeline object into a KFP pipeline package, named
# <pipeline_name>.tar.gz by default.
kfp_runner.run(dsl_pipeline)

In [9]:
# Connect to the KFP endpoint and submit the compiled pipeline package.
kfp_client = kfp.Client(
    host='76a815540f141312-dot-us-central2.pipelines.googleusercontent.com/',  # Put your KFP endpoint here
)
run_result = kfp_client.create_run_from_pipeline_package(
    pipeline_name + '.tar.gz',
    arguments={
        # Uncomment any of the following to override the defaults with a
        # custom GCS bucket, module file or training data location.
        # 'pipeline-root': 'gs://<your-gcs-bucket>/tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,
        # 'module-file': '<gcs path to the module file>',  # omit to use the default module file.
        # 'data-root': '<gcs path to the data>'  # omit to use the default data.
    },
)