# Creating a reproducible pipeline using TFX

![](https://www.tensorflow.org/tfx/guide/images/prog_fin.png)
          

## Story
1. To make the work reproducible, we define the workflow as a TFX pipeline.
1. Additionally, we generate statistics from the data we use for training.
1. After training, and before pushing to the next stage (possibly integration or production), we execute a couple of tests that work as binary predicates.
   * Tesla does this with their models for autonomous driving: https://youtu.be/hx7BXih7zx8?t=776
   * Such tests could be:
     * for well-known inputs, we expect outputs known to be correct within a certain margin
     * certain scores must be reached
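
The two kinds of checks listed above can be sketched as plain Python predicates. This is only an illustration of the idea, not how TFX implements it: the function names, the stand-in model and all numbers are hypothetical.

```python
from typing import Callable, Dict, Sequence, Tuple

# A predictor maps one input row to one numeric output (hypothetical interface).
Predictor = Callable[[Sequence[float]], float]


def within_margin(predict: Predictor,
                  known_cases: Sequence[Tuple[Sequence[float], float]],
                  margin: float) -> bool:
    """True if every well-known input yields its expected output within margin."""
    return all(abs(predict(x) - expected) <= margin
               for x, expected in known_cases)


def scores_reached(scores: Dict[str, float],
                   minimum_scores: Dict[str, float]) -> bool:
    """True if every required score is present and at least its minimum."""
    return all(scores.get(name, float("-inf")) >= minimum
               for name, minimum in minimum_scores.items())


def may_push(predict: Predictor,
             known_cases: Sequence[Tuple[Sequence[float], float]],
             margin: float,
             scores: Dict[str, float],
             minimum_scores: Dict[str, float]) -> bool:
    """Binary gate: push only if all predicates hold."""
    return (within_margin(predict, known_cases, margin)
            and scores_reached(scores, minimum_scores))


# Hypothetical stand-in for a trained model: the mean of its inputs.
def toy_model(x: Sequence[float]) -> float:
    return sum(x) / len(x)


cases = [((1.0, 3.0), 2.0), ((10.0, 20.0), 15.0)]
print(may_push(toy_model, cases, margin=0.1,
               scores={"accuracy": 0.7}, minimum_scores={"accuracy": 0.6}))
```

Each predicate returns a plain boolean, so a failed check can block the push without any further interpretation.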
     

## Resources
Component descriptions are partly copied from https://www.tensorflow.org/tfx/guide#tfx_standard_components:
1. **Basics:** https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/penguin_simple.ipynb
   * ExampleGen is the initial input component of a pipeline that ingests and optionally splits the input dataset: https://www.tensorflow.org/tfx/guide/examplegen
   * Trainer trains the model: https://www.tensorflow.org/tfx/guide/trainer
   * Pusher deploys the model on a serving infrastructure: https://www.tensorflow.org/tfx/guide/pusher
1. **Data validation:** https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/penguin_tfdv.ipynb
   * https://www.tensorflow.org/tfx/guide/tfdv
   * StatisticsGen calculates statistics for the dataset: https://www.tensorflow.org/tfx/guide/statsgen
   * SchemaGen examines the statistics and creates a data schema: https://www.tensorflow.org/tfx/guide/schemagen
   * ExampleValidator looks for anomalies and missing values in the dataset: https://www.tensorflow.org/tfx/guide/exampleval
1. **Feature Engineering:** https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/penguin_tft.ipynb
   * https://www.tensorflow.org/tfx/guide/tft
   * Transform performs feature engineering on the dataset: https://www.tensorflow.org/tfx/guide/transform
1. **Model Analysis:** https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/penguin_tfma.ipynb
   * https://www.tensorflow.org/tfx/guide/tfma
   * Evaluator performs deep analysis of the training results and helps you validate your exported models, ensuring that they are "good enough" to be pushed to production: https://www.tensorflow.org/tfx/guide/evaluator

More tutorials: https://www.tensorflow.org/tfx/tutorials



In [1]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

TensorFlow version: 2.6.0
TFX version: 1.3.0


In [2]:
from absl import logging

# logging.set_verbosity(logging.INFO)
logging.set_verbosity(logging.WARNING)

In [3]:
DATA_DIR = "data"

In [4]:
!mkdir -p {DATA_DIR}


In [5]:
!curl -o {DATA_DIR}/data.csv https://raw.githubusercontent.com/embarced/notebooks/master/mlops/insurance-customers-risk-1500.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 54421  100 54421    0     0   345k      0 --:--:-- --:--:-- --:--:--  342k


In [6]:
!ls -l data

total 112
-rw-r--r-- 1 olli olli 54421 Oct 29 16:02 data.csv
-rw-r--r-- 1 olli olli 54500 Oct 27 16:09 drifted-data.csv


In [7]:
!head {DATA_DIR}/data.csv

speed,age,miles,group,risk
97.0,44.0,30.0,1,0.5976112279191053
135.0,63.0,29.0,1,0.4527103520003165
111.0,26.0,34.0,0,0.750233962021037
97.0,25.0,10.0,1,0.32524900971290915
114.0,38.0,22.0,2,0.26973096398543817
130.0,55.0,34.0,0,0.5871633471963134
118.0,40.0,51.0,0,0.8753213424169751
143.0,42.0,34.0,2,0.23665405507569381
110.0,43.0,31.0,2,0.0
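
The trainer module later in this notebook reads `age` and `speed` as float features and `group` as an integer label. A quick sanity check of the rows shown above can be sketched with the standard library alone. The helper name `check_rows` is made up for this illustration, and the sample is just the `head` output, not the full file.

```python
import csv
import io

# First rows of data.csv, copied from the `head` output above.
SAMPLE = """speed,age,miles,group,risk
97.0,44.0,30.0,1,0.5976112279191053
135.0,63.0,29.0,1,0.4527103520003165
111.0,26.0,34.0,0,0.750233962021037
97.0,25.0,10.0,1,0.32524900971290915
114.0,38.0,22.0,2,0.26973096398543817
130.0,55.0,34.0,0,0.5871633471963134
118.0,40.0,51.0,0,0.8753213424169751
143.0,42.0,34.0,2,0.23665405507569381
110.0,43.0,31.0,2,0.0
"""

FLOAT_COLUMNS = ["age", "speed"]  # features the trainer module uses
LABEL_COLUMN = "group"            # integer class label


def check_rows(text: str) -> set:
    """Checks required columns and types; returns the set of label values seen."""
    reader = csv.DictReader(io.StringIO(text))
    missing = [c for c in FLOAT_COLUMNS + [LABEL_COLUMN]
               if c not in (reader.fieldnames or [])]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    labels = set()
    for row in reader:
        for column in FLOAT_COLUMNS:
            float(row[column])  # raises ValueError if the cell is not numeric
        labels.add(int(row[LABEL_COLUMN]))
    return labels


print(sorted(check_rows(SAMPLE)))
```

The sample contains the labels 0, 1 and 2, which matches the three output units of the classifier defined later.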


In [8]:
!ls -l

total 14844
-rw-r--r--  1 olli olli 1433875 Oct 28 13:48 1_mlops_train.ipynb
-rw-r--r--  1 olli olli 1819071 Oct 28 14:03 1_mlops_train_nsl.ipynb
-rw-r--r--  1 olli olli  175864 Oct 28 14:03 2_mlops_serve.ipynb
-rw-r--r--  1 olli olli 1687116 Oct 28 14:03 3_mlops_shift.ipynb
-rw-r--r--  1 olli olli   47260 Oct 29 16:02 4_mlops_tfx_training.ipynb
drwxr-xr-x  2 olli olli    4096 Oct 29 16:00 __pycache__
drwxr-xr-x  4 olli olli    4096 Oct 28 10:40 classifier
-rw-r--r--  1 olli olli 3126552 Oct 28 10:40 classifier.h5
-rw-r--r--  1 olli olli 2891405 Oct 28 10:40 classifier.tgz
drwxr-xr-x  2 olli olli    4096 Oct 27 16:09 data
drwxr-xr-x  2 olli olli    4096 Oct 27 16:12 drifted_data
-rw-r--r--  1 olli olli 2110237 Aug 14 20:29 generate.ipynb
drwxr-xr-x  3 olli olli    4096 Oct 24 18:42 insurance
-rw-r--r--  1 olli olli   54500 Aug 14 20:29 insurance-customers-risk-1500-shift.csv
-rw-r--r--  1 olli olli   54435 Aug 14 20:29 insurance-customers-risk-1500-test.csv
-rw-r--r--  

In [9]:
_trainer_module_file = 'trainer.py'

In [10]:
%%writefile {_trainer_module_file}

from tensorflow.keras.layers import (Activation, BatchNormalization, Dense,
                                     Dropout)
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = ['age', 'speed']
_LABEL_KEY = 'group'

_TRAIN_BATCH_SIZE = 32
_EVAL_BATCH_SIZE = 32

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}

num_features = len(_FEATURE_KEYS)
dropout = 0.5

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying insurance customers into groups.

  Returns:
    A Keras Model.
  """

  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = Dense(500)(d)
    d = Activation('relu')(d)
    d = BatchNormalization()(d)
    d = Dropout(dropout)(d)

  outputs = Dense(name='output', units=3, activation='softmax')(d)

  model = keras.Model(inputs=inputs, outputs=outputs)

  model.compile(
      optimizer=keras.optimizers.Adam(),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually curated
  # version provided by the pipeline author. A schema can also be derived from
  # the TFT graph if a Transform component is used. If both are missing,
  # `schema_from_feature_spec` can be used to generate a schema from a very
  # simple feature_spec, but the returned schema will be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')

Overwriting trainer.py
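
Because `_input_fn` calls `.repeat()`, the dataset never runs out, so the amount of training is determined only by the step count the Trainer passes in as `fn_args.train_steps`. The following back-of-the-envelope sketch converts steps into approximate passes over the data, using the step counts from the pipeline definition in this notebook (5000 for training, 15 for evaluation). The row count of 1500 is an assumption taken from the CSV file name, and the 2:1 train/eval split is the `CsvExampleGen` default. Since that split is hash-based, the real counts only approximate these numbers.

```python
TOTAL_ROWS = 1500   # assumption: taken from the CSV file name
BATCH_SIZE = 32     # _TRAIN_BATCH_SIZE and _EVAL_BATCH_SIZE in trainer.py

# Default CsvExampleGen split is roughly 2:1 (train:eval).
train_rows = TOTAL_ROWS * 2 // 3
eval_rows = TOTAL_ROWS - train_rows


def approx_epochs(steps: int, rows: int, batch_size: int) -> float:
    """Approximate number of passes over `rows` examples after `steps` batches."""
    return steps * batch_size / rows


print("train passes:", approx_epochs(5000, train_rows, BATCH_SIZE))
print("eval passes:", approx_epochs(15, eval_rows, BATCH_SIZE))
```

Under these assumptions, training cycles through the training split many times, while evaluation covers a little less than one pass over the eval split.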


In [11]:
# !cat trainer.py

In [12]:
import tensorflow_model_analysis as tfma

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
  """Creates the insurance pipeline: ingest, statistics, train, evaluate, push."""
  # Brings data into the pipeline.
  example_gen = tfx.components.CsvExampleGen(input_base=data_root)


  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # Uses user-provided Python function that trains a model.
  trainer = tfx.components.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=5000),
      eval_args=tfx.proto.EvalArgs(num_steps=15))

  # Gets the latest blessed model to use as the baseline for Evaluator.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')

  # Uses TFMA to compute evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).

  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key='group')],
      slicing_specs=[
          # An empty slice spec means the overall slice, i.e. the whole dataset.
          tfma.SlicingSpec(),
          # Calculate metrics for each risk group
          tfma.SlicingSpec(feature_keys=['group']),
          ],
      metrics_specs=[
          tfma.MetricsSpec(per_slice_thresholds={
              'sparse_categorical_accuracy':
                  tfma.PerSliceMetricThresholds(thresholds=[
                      tfma.PerSliceMetricThreshold(
                          slicing_specs=[tfma.SlicingSpec()],
                          threshold=tfma.MetricThreshold(
                              value_threshold=tfma.GenericValueThreshold(
                                   lower_bound={'value': 0.6}),
                              # Change threshold will be ignored if there is no
                              # baseline model resolved from MLMD (first run).
                              change_threshold=tfma.GenericChangeThreshold(
                                  direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                                  absolute={'value': -1e-10}))
                       )]),
          })],
      )
  evaluator = tfx.components.Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)

  # Checks whether the model passed the validation steps and pushes the model
  # to a file destination if check passed.
  pusher = tfx.components.Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'], # Pass an evaluation result.
      push_destination=tfx.proto.PushDestination(
          filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  components = [
      example_gen,
      statistics_gen,
      trainer,

      # Following two components were added to the pipeline.
      model_resolver,
      evaluator,

      pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)
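
The two thresholds in `eval_config` act together as a gate for the blessing. The plain-Python sketch below mirrors the intended logic for a single metric. It is not TFMA code: the function name is made up, and treating both bounds as inclusive is an assumption.

```python
from typing import Optional


def passes_thresholds(candidate: float,
                      baseline: Optional[float] = None,
                      lower_bound: float = 0.6,
                      min_absolute_change: float = -1e-10) -> bool:
    """Sketch of the gate for a HIGHER_IS_BETTER metric such as accuracy.

    Assumption: both comparisons are inclusive.
    """
    # Value threshold: the candidate must reach the absolute lower bound.
    if candidate < lower_bound:
        return False
    # Change threshold: ignored when no baseline model exists (first run).
    if baseline is None:
        return True
    # The candidate must not be worse than the baseline by more than the
    # tolerated change (here practically "at least as good").
    return (candidate - baseline) >= min_absolute_change


print(passes_thresholds(0.70))                 # first run, above 0.6
print(passes_thresholds(0.55))                 # below the lower bound
print(passes_thresholds(0.70, baseline=0.80))  # worse than the baseline
```

With these settings a model is only blessed if its overall accuracy reaches 0.6 and it does not fall behind the previously blessed model.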

In [13]:
%%time 

pipeline = _create_pipeline(
      pipeline_name='insurance-basic',
      pipeline_root='pipeline',
      data_root=DATA_DIR,
      module_file=_trainer_module_file,
      serving_model_dir='model',
      metadata_path='metadata.db')

tfx.orchestration.LocalDagRunner().run(pipeline)



INFO:tensorflow:Assets written to: pipeline/Trainer/model/108/Format-Serving/assets


Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


CPU times: user 40 s, sys: 15.8 s, total: 55.8 s
Wall time: 36.2 s


In [14]:
from ml_metadata.proto import metadata_store_pb2
# Non-public APIs, just for showcase.
from tfx.orchestration.portable.mlmd import execution_lib

# TODO(b/171447278): Move these functions into the TFX library.

def get_latest_artifacts(metadata, pipeline_name, component_id):
  """Output artifacts of the latest run of the component."""
  context = metadata.store.get_context_by_type_and_name(
      'node', f'{pipeline_name}.{component_id}')
  executions = metadata.store.get_executions_by_context(context.id)
  latest_execution = max(executions,
                         key=lambda e:e.last_update_time_since_epoch)
  return execution_lib.get_artifacts_dict(metadata, latest_execution.id, 
                                          metadata_store_pb2.Event.OUTPUT)

# Non-public APIs, just for showcase.
from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

In [15]:
# Non-public APIs, just for showcase.
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config("metadata.db")

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  example_gen_output = get_latest_artifacts(metadata_handler, "insurance-basic", 'CsvExampleGen')
  example_artifacts = example_gen_output[standard_component_specs.EXAMPLES_KEY]

  stat_gen_output = get_latest_artifacts(metadata_handler, "insurance-basic", 'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  evaluator_output = get_latest_artifacts(metadata_handler, "insurance-basic", 'Evaluator')
  eval_artifact = evaluator_output[standard_component_specs.EVALUATION_KEY][0]

In [16]:
stats_artifacts[0].uri

'pipeline/StatisticsGen/statistics/107'

In [17]:
!ls -l {stats_artifacts[0].uri}

total 8
drwxr-xr-x 2 olli olli 4096 Oct 29 16:02 Split-eval
drwxr-xr-x 2 olli olli 4096 Oct 29 16:02 Split-train


In [18]:
!mkdir -p stats


In [19]:
!cp {stats_artifacts[0].uri}/Split-train/FeatureStats.pb stats/TrainFeatureStats.pb 

In [20]:
visualize_artifacts(stats_artifacts)

In [21]:
!ls -l pipeline/Evaluator

total 8
drwxr-xr-x 7 olli olli 4096 Oct 29 16:03 blessing
drwxr-xr-x 7 olli olli 4096 Oct 29 16:03 evaluation


In [22]:
!ls -l pipeline/Evaluator/blessing

total 20
drwxr-xr-x 2 olli olli 4096 Oct 29 16:00 103
drwxr-xr-x 2 olli olli 4096 Oct 29 16:03 109
drwxr-xr-x 2 olli olli 4096 Oct 29 15:38 85
drwxr-xr-x 2 olli olli 4096 Oct 29 15:45 91
drwxr-xr-x 2 olli olli 4096 Oct 29 15:53 97


In [23]:
!ls -l pipeline/Evaluator/blessing/91

total 0
-rw-r--r-- 1 olli olli 0 Oct 29 15:45 BLESSED
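
The Evaluator records its verdict as an empty marker file, as the listing above shows. A downstream script could therefore check the result with nothing but the file system. The helper name `is_blessed` is hypothetical, and the temporary directory only stands in for a real blessing directory such as `pipeline/Evaluator/blessing/91`.

```python
import tempfile
from pathlib import Path


def is_blessed(blessing_dir: str) -> bool:
    """True if the directory contains the `BLESSED` marker file."""
    return (Path(blessing_dir) / "BLESSED").is_file()


# Demonstration on a temporary directory instead of a real pipeline output.
with tempfile.TemporaryDirectory() as tmp:
    before = is_blessed(tmp)
    (Path(tmp) / "BLESSED").touch()
    after = is_blessed(tmp)

print(before, after)
```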


In [24]:
!ls -l pipeline/Evaluator/evaluation/91/plots

-rw-r--r-- 1 olli olli 105 Oct 29 15:45 pipeline/Evaluator/evaluation/91/plots


In [25]:
!ls -l model/

total 32
drwxr-xr-x 4 olli olli 4096 Oct 26 17:17 1635261467
drwxr-xr-x 4 olli olli 4096 Oct 26 17:32 1635262335
drwxr-xr-x 4 olli olli 4096 Oct 26 17:46 1635263194
drwxr-xr-x 4 olli olli 4096 Oct 27 15:57 1635343033
drwxr-xr-x 4 olli olli 4096 Oct 27 16:15 1635344127
drwxr-xr-x 4 olli olli 4096 Oct 27 16:49 1635346140
drwxr-xr-x 4 olli olli 4096 Oct 29 15:38 1635514703
drwxr-xr-x 4 olli olli 4096 Oct 29 15:45 1635515159
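
The version directories created by the Pusher appear to be named after the Unix timestamp of the push, which is consistent with the modification times in the listing. Under that assumption, the most recent model can be selected by taking the largest numeric name. The helper name `latest_version` is made up for this sketch.

```python
from datetime import datetime, timezone
from typing import Iterable


def latest_version(names: Iterable[str]) -> int:
    """Returns the largest purely numeric directory name as an integer."""
    versions = [int(name) for name in names if name.isdigit()]
    if not versions:
        raise ValueError("no numeric version directories found")
    return max(versions)


# Directory names copied from the listing above.
names = ["1635261467", "1635262335", "1635263194", "1635343033",
         "1635344127", "1635346140", "1635514703", "1635515159"]

latest = latest_version(names)
# Assumption: the name is a Unix timestamp in seconds.
pushed_at = datetime.fromtimestamp(latest, tz=timezone.utc)
print(latest, pushed_at.isoformat())
```

For a real directory, `names` could be obtained with `os.listdir('model')`.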


In [29]:
%matplotlib inline

In [30]:
import tensorflow_model_analysis as tfma

eval_result = tfma.load_eval_result(eval_artifact.uri)
tfma.view.render_slicing_metrics(eval_result, slicing_column='group')

SlicingMetricsViewer(config={'weightedExamplesColumn': 'example_count'}, data=[{'slice': 'group:0', 'metrics':…