In [None]:
# Install the pinned tfx and kfp Python packages for the kernel's interpreter.
# Both pins go into a single pip invocation so the resolver checks them
# together, instead of the second install silently upgrading/downgrading
# packages that the first install selected.
# Restart the kernel after this cell so the newly installed versions are imported.
import sys
!{sys.executable} -m pip install --user -q -U -v --log /tmp/pip.log --use-feature=2020-resolver tfx==0.24.0 kfp==1.0.0
# Download skaffold and make it executable in the user bin directory that a
# later cell adds to PATH. `mkdir -p` makes sure that directory exists, and
# curl's `-f` fails on HTTP errors instead of saving the error page as the binary.
# NOTE(review): `latest` is unpinned; pin a skaffold release for reproducible builds.
!mkdir -p /home/jupyter/.local/bin && curl -fLo skaffold https://storage.googleapis.com/skaffold/releases/latest/skaffold-linux-amd64 && chmod +x skaffold && mv skaffold /home/jupyter/.local/bin/
    

Skaffold is a command line tool that facilitates continuous development for Kubernetes applications.

It automates the workflow of building, pushing, and deploying container images.

You'll see how Skaffold is used later in this notebook, when the pipeline is deployed to Kubeflow Pipelines.

In [None]:
# Confirm which TFX version is installed for *this kernel's* interpreter; a bare
# `python3` is resolved through PATH and may be a different Python installation
# than the one the packages were installed into.
!{sys.executable} -c "import tfx; print('TFX version:', tfx.__version__)"

In [None]:
# Add the user-level bin directory to PATH so that `!` shell commands can find
# pip `--user` console scripts (such as the `tfx` CLI used below) and the
# `skaffold` binary placed there. The membership check keeps the cell
# idempotent: re-running it does not append a duplicate entry each time.
import os

USER_BIN_DIR = '/home/jupyter/.local/bin'
PATH = os.environ.get('PATH', '')  # PATH as it was before this cell ran
if USER_BIN_DIR not in PATH.split(os.pathsep):
    os.environ['PATH'] = PATH + os.pathsep + USER_BIN_DIR if PATH else USER_BIN_DIR

In [None]:
# Read the GCP project id from the active gcloud configuration and export it
# as an environment variable so later `!` shell commands can use it too.
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
# Fail with an actionable message instead of an IndexError when no project is set.
if not shell_output or not shell_output[0].strip():
    raise RuntimeError('No GCP project is set in gcloud; run `gcloud config set project <PROJECT_ID>` first.')
GOOGLE_CLOUD_PROJECT=shell_output[0].strip()
%env GOOGLE_CLOUD_PROJECT={GOOGLE_CLOUD_PROJECT}
print("GCP project ID:" + GOOGLE_CLOUD_PROJECT)

In [None]:
import os

# Kubeflow Pipelines endpoint used by the `tfx` CLI commands below.
# The default is the instance this notebook was written against; set the
# KFP_ENDPOINT environment variable to target another deployment without
# editing this cell.
ENDPOINT = os.environ.get(
    'KFP_ENDPOINT',
    'https://6ce118f4a72b5e60-dot-us-west1.pipelines.googleusercontent.com')


Next, define the name of the Docker image that Skaffold will build when bundling your pipeline.



In [None]:
# Container image name in this project's gcr.io registry; it is passed to
# `tfx pipeline create --build-target-image` further down.
CUSTOM_TFX_IMAGE = ''.join(['gcr.io/', GOOGLE_CLOUD_PROJECT, '/advert-pred-pipeline'])

In [None]:
import os

# Record the directory the notebook runs in. Relative paths used below
# ('data', 'model/...', 'serving_model/...', 'kubeflow_dag_runner.py')
# resolve against it.
BASE_PATH = os.getcwd()
print('Working directory: ' + BASE_PATH)

## Experimentation Analysis


In [None]:
# Imports for running the TFX components interactively in this notebook.
import os
import pprint
import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
# Stop TensorFlow's logger from passing records up to parent/root loggers
# (presumably to avoid duplicated log lines); set before tfx is imported below.
tf.get_logger().propagate = False
# NOTE(review): `pp`, `absl`, `metadata` and `pipeline` are not used in the visible cells.
pp = pprint.PrettyPrinter()
import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
# Notebook extension that presumably provides the `%%skip_for_export` cell magic
# (its use appears commented out further down).
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [None]:
# Location of the raw advertising CSV; preview its first 10 lines (header included).
data_root = 'data'
csv_name = "advertising.csv"
data_filepath = os.path.join(data_root, csv_name)
!head -n 10 {data_filepath}

In [None]:
# Interactive context used to execute each component directly in the notebook.
# NOTE(review): no pipeline_root/metadata config is passed, so artifact and
# metadata storage follow InteractiveContext's defaults — confirm they persist
# as long as needed (e.g. for the baseline-model comparison below).
context = InteractiveContext()

In [None]:
# Ingest the CSV data under `data_root`; its 'examples' output feeds every
# downstream component. `input_base` replaces the deprecated
# `input=external_input(...)` form of specifying the input directory.
example_gen = CsvExampleGen(input_base=data_root)
context.run(example_gen)

In [None]:
# %%skip_for_export

# Inspect the first ExampleGen output artifact: its split names and storage URI.
example_artifacts = example_gen.outputs['examples'].get()
artifact = example_artifacts[0]
print(artifact.split_names, artifact.uri)

In [None]:
# Compute statistics over the examples produced by ExampleGen.
examples_channel = example_gen.outputs['examples']
statistics_gen = StatisticsGen(examples=examples_channel)
context.run(statistics_gen)

In [None]:
# Display the statistics artifact produced by StatisticsGen inline.
context.show(statistics_gen.outputs['statistics'])


In [None]:
# Infer a schema from the statistics (without inferring feature shapes) and display it.
schema_gen = SchemaGen(
    infer_feature_shape=False,
    statistics=statistics_gen.outputs['statistics'],
)
context.run(schema_gen)
context.show(schema_gen.outputs['schema'])

In [None]:
# Check the statistics against the inferred schema and display any anomalies found.
validator_inputs = dict(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'],
)
example_validator = ExampleValidator(**validator_inputs)
context.run(example_validator)
context.show(example_validator.outputs['anomalies'])

In [None]:
# Feature preprocessing lives in a separate module file that Transform loads.
advert_transform = 'model/advert-transform.py'

transform = Transform(
    module_file=advert_transform,
    schema=schema_gen.outputs['schema'],
    examples=example_gen.outputs['examples'],
)
context.run(transform)

In [None]:
# Show Transform's output channels; 'transformed_examples' and 'transform_graph'
# are consumed by the Trainer below.
transform.outputs

## Trainer
The Trainer component trains a model defined in TensorFlow/Keras.
It takes the schema, the transformed examples and transform graph, the training and evaluation arguments, and your model definition code.

In [None]:
# Model definition code lives in this module file.
advert_trainer = 'model/advert-trainer.py'
TRAIN_STEPS = 1000
EVAL_STEPS = 500

# GenericExecutor is selected for the Keras-style module
# (presumably it defines `run_fn`; confirm in model/advert-trainer.py).
trainer_executor = executor_spec.ExecutorClassSpec(GenericExecutor)

trainer = Trainer(
    module_file=advert_trainer,
    custom_executor_spec=trainer_executor,
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=TRAIN_STEPS),
    eval_args=trainer_pb2.EvalArgs(num_steps=EVAL_STEPS),
)

context.run(trainer)

## Evaluator
The Evaluator component computes model performance metrics on the evaluation split. It can also validate newly trained models against a baseline model.

This is useful when you are improving and testing new models in production. To set up the Evaluator, you need to define an evaluation configuration.

The configuration tells the Evaluator which metrics to compute, which thresholds a new model must meet, and so on. See the [TFX Evaluator guide](https://www.tensorflow.org/tfx/guide/evaluator) for details.

In [None]:
# A candidate must reach BinaryAccuracy >= 0.5; the HIGHER_IS_BETTER change
# threshold with absolute -1e-10 effectively requires it to be no worse than
# the baseline model (when one exists).
accuracy_threshold = tfma.MetricThreshold(
    value_threshold=tfma.GenericValueThreshold(
        lower_bound={'value': 0.5}),
    change_threshold=tfma.GenericChangeThreshold(
        direction=tfma.MetricDirection.HIGHER_IS_BETTER,
        absolute={'value': -1e-10}))

eval_config = tfma.EvalConfig(
    model_specs=[tfma.ModelSpec(label_key='ClickedOnAd')],
    metrics_specs=[
        tfma.MetricsSpec(metrics=[
            tfma.MetricConfig(class_name='ExampleCount'),
            tfma.MetricConfig(class_name='BinaryAccuracy',
                              threshold=accuracy_threshold),
        ])
    ])

# Resolve the latest blessed model (if any) to serve as the Evaluator's baseline.
model_resolver = ResolverNode(
    instance_name='latest_blessed_model_resolver',
    resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
    model=Channel(type=Model),
    model_blessing=Channel(type=ModelBlessing))
context.run(model_resolver)

In [None]:
# Evaluate the newly trained model against the resolved baseline using eval_config.
evaluator = Evaluator(
    eval_config=eval_config,
    baseline_model=model_resolver.outputs['model'],
    model=trainer.outputs['model'],
    examples=example_gen.outputs['examples'],
)
context.run(evaluator)

In [None]:
# Display the evaluation results (metrics) produced by the Evaluator.
context.show(evaluator.outputs['evaluation'])


Above, you can see the metrics reported by the Evaluator. If you train a new model, the performance is going to be compared with the baseline model, which in our case does not exist because this is our first model. 

The Evaluator also labels each model as BLESSED or NOT BLESSED. A BLESSED model has passed all evaluation criteria, including not performing worse than the current baseline model. Only a BLESSED model is pushed for serving by the Pusher component; for a NOT BLESSED model, the Pusher skips the push. This is what lets you automate model deployment safely.

In [None]:
# List the blessing artifact's directory; its contents record the Evaluator's
# verdict (presumably a BLESSED or NOT_BLESSED marker file).
# Uses item access on `outputs`, consistent with every other cell.
blessing_uri = evaluator.outputs['blessing'].get()[0].uri
!ls -l {blessing_uri}

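Note:
Because this is the first model in the pipeline, there is no baseline model to compare against, so only the absolute threshold (BinaryAccuracy of at least 0.5) decides whether it is blessed. If you train another model and rerun the Evaluator, the new model is also compared with the current blessed model and is marked BLESSED or NOT BLESSED accordingly.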

## Pusher
The Pusher component, which comes last in this pipeline, exports a BLESSED model to the serving directory.

In [None]:
# Export the trained model to a local serving directory, gated on the Evaluator's blessing.
serving_model_dir = 'serving_model/advert-pred'
filesystem_destination = pusher_pb2.PushDestination.Filesystem(
    base_directory=serving_model_dir)
push_destination = pusher_pb2.PushDestination(filesystem=filesystem_destination)

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=push_destination,
)
context.run(pusher)

## SET UP KUBEFLOW PIPELINE


In [None]:
# Stage the training CSV in the project's default Kubeflow Pipelines bucket.
GCS_DATA_URI = 'gs://' + GOOGLE_CLOUD_PROJECT + '-kubeflowpipelines-default/advert-pred/data/advertising.csv'
!gsutil cp data/advertising.csv {GCS_DATA_URI}

In [None]:
# NOTE(review): `tfx pipeline create` below takes the pipeline name from
# kubeflow_dag_runner.py, while `tfx run create` looks the pipeline up by this
# value — confirm the two agree.
PIPELINE_NAME = 'adclicked_pipeline'

In [None]:
# Register the pipeline defined in kubeflow_dag_runner.py with the Kubeflow
# Pipelines endpoint. --build-target-image names the container image to build
# and push (with Skaffold, as described earlier).
!tfx pipeline create  \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
--build-target-image={CUSTOM_TFX_IMAGE}

In [None]:
# Start a run of the registered pipeline on the Kubeflow Pipelines endpoint.
!tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}

If you change your pipeline code, update the deployed pipeline and start a new run with the cell below:

In [None]:
# Update the deployed pipeline from the current code, then start a new run.
# `&&` starts the run only if the update succeeded, so a failed update does not
# silently trigger a run of the previous pipeline version.
!tfx pipeline update \
--pipeline-path=kubeflow_dag_runner.py \
--endpoint={ENDPOINT} \
&& tfx run create --pipeline-name={PIPELINE_NAME} --endpoint={ENDPOINT}