**Chicago Taxi example using TensorFlow Extended (TFX)**

In [None]:

# Install (or upgrade to) the newest TFX release available to pip.
# NOTE(review): the version is unpinned; the imports in the next cell
# (e.g. ResolverNode, external_input) presumably target a specific TFX
# release — confirm and consider pinning it here.
!pip install -q -U --use-feature=2020-resolver tfx

In [None]:
# Import required packages
import os
import pprint
import tempfile
import urllib
import absl
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()
import tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
from tfx.utils.dsl_utils import external_input
from typing import List, Text
import os
import absl
import datetime
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options
import taxi_constants
import tensorflow_model_analysis as tfma
%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip

In [None]:
# Report the versions of the two core libraries used by this notebook.
print(f'TensorFlow version: {tf.__version__}')
print(f'TFX version: {tfx.__version__}')

**Pipeline path setup**

In [None]:
# Raise absl log verbosity to INFO.
absl.logging.set_verbosity(absl.logging.INFO)

# Root of the installed TFX pip package, and the Chicago Taxi Pipeline
# example directory beneath it.
_tfx_root = tfx.__path__[0]
_taxi_root = os.path.join(_tfx_root, 'examples/chicago_taxi_pipeline')

# Model serving path, created beneath a fresh temporary directory.
_serving_base_dir = tempfile.mkdtemp()
_serving_model_dir = os.path.join(_serving_base_dir, 'serving_model/taxi_simple')

**Download Taxi Trips dataset**

In [None]:
# `import urllib` alone does not guarantee that the `urllib.request`
# submodule is loaded, so import it explicitly before using it.
import urllib.request

# Download the sample CSV into a fresh temporary directory. The directory
# (`_data_root`), not the file, is what is later handed to CsvExampleGen.
# The URL is kept in `_data_url` rather than `dataset`, because `dataset` is
# bound to a TFRecordDataset further down in the notebook.
_data_root = tempfile.mkdtemp(prefix='tfx-data')
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv'
_data_filepath = os.path.join(_data_root, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)

In [None]:
# Peek at the first lines of the downloaded CSV using the shell `head` command.
!head {_data_filepath}

**Interactive Context Creation**

In [None]:
# Create the InteractiveContext that runs TFX components interactively and
# displays their outputs in the notebook.
# NOTE(review): constructed with default arguments, so the pipeline root and
# metadata store are presumably temporary — confirm if outputs must persist.
interactive_context = InteractiveContext()

**ExampleGen**

The `ExampleGen` component performs the following activities:

1.   Split data into training and evaluation sets (by default, 2/3 training + 1/3 eval)
2.   Convert data into the `tf.Example` format
3.   Copy data into the `_tfx_root` directory for other components to access


In [None]:
# Point CsvExampleGen at the directory holding the downloaded CSV and run it.
csv_input = external_input(_data_root)
example_gen = CsvExampleGen(input=csv_input)
interactive_context.run(example_gen)

In [None]:
# Inspect the ExampleGen `examples` artifact: its split names and its URI.
examples_artifact = example_gen.outputs['examples'].get()[0]
print(examples_artifact.split_names, examples_artifact.uri)

In [None]:
# Directory of the `train` split inside the ExampleGen output artifact.
examples_uri = example_gen.outputs['examples'].get()[0].uri
train_uri = os.path.join(examples_uri, 'train')

# Every file in that directory is read as a GZIP-compressed TFRecord file.
tfrecord_filename = [
    os.path.join(train_uri, file_name) for file_name in os.listdir(train_uri)
]
dataset = tf.data.TFRecordDataset(tfrecord_filename, compression_type="GZIP")

# Decode the first three serialized records into tf.train.Example protos
# and pretty-print them.
for serialized_tensor in dataset.take(3):
  record = tf.train.Example()
  record.ParseFromString(serialized_tensor.numpy())
  pp.pprint(record)

**Data Analysis with StatisticsGen** <br>
The `StatisticsGen` component computes statistics over the dataset, both for data analysis and for use by downstream components.

In [None]:
# Compute statistics over the examples emitted by ExampleGen.
statistic_gen = StatisticsGen(examples=example_gen.outputs['examples'])
# The context created earlier is bound to `interactive_context`; a bare
# `context` is never defined in this notebook.
interactive_context.run(statistic_gen)

In [None]:
# Visualization of output statistics
# (uses `interactive_context`, the name the InteractiveContext is bound to)
interactive_context.show(statistic_gen.outputs['statistics'])

**SchemaGen**

The SchemaGen component generates a schema based on data statistics.

In [None]:
# Infer a schema from the statistics computed by StatisticsGen.
schemagen = SchemaGen(
    statistics=statistic_gen.outputs['statistics'],
    infer_feature_shape=False)
# Run through `interactive_context`, the name the InteractiveContext is bound to.
interactive_context.run(schemagen)

In [None]:
# Visualization of generated schema in table format
# (uses `interactive_context`, the name the InteractiveContext is bound to)
interactive_context.show(schemagen.outputs['schema'])

**ExampleValidator** <br>
Based on the input statistics and schema, the `ExampleValidator` component detects anomalies present in the data.

In [None]:
# Check the computed statistics against the inferred schema to detect anomalies.
# The upstream components are bound to `statistic_gen` and `schemagen` in this
# notebook, and the context to `interactive_context`.
examples_validator = ExampleValidator(
    statistics=statistic_gen.outputs['statistics'],
    schema=schemagen.outputs['schema'])
interactive_context.run(examples_validator)

In [None]:
# Visualization of anomalies
# (the validator is bound to `examples_validator`, the context to `interactive_context`)
interactive_context.show(examples_validator.outputs['anomalies'])

**Transform** <br>
Based on the input data from ExampleGen, the schema from SchemaGen, and a module containing user-defined transform code, the `Transform` component performs feature engineering for both training and serving.


In [None]:
# File name that the next cell writes the shared feature constants to
# (relative path, so it lands in the current working directory).
_taxi_constant_module_file = 'taxi_constants.py'

In [None]:
%%writefile {_taxi_constant_module_file}

# Feature-key constants imported by the taxi transform and trainer modules.

# Categorical features are assumed to each have a maximum value in the dataset.
# NOTE(review): only three maxima are listed for the seven keys below; the
# trainer module pairs the two lists with zip(), so presumably only the first
# three keys receive a feature column — confirm this is intended.
MAX_CATEGORICAL_FEATURE_VALUE = [24, 31, 12]

# Features passed through the transform with only missing values filled in.
CATEGORICAL_FEATURE_KEY = ['trip_start_hour', 'trip_start_day', 'trip_start_month',
    'pickup_census_tract', 'dropoff_census_tract', 'pickup_community_area',
    'dropoff_community_area']

# Features scaled to a z-score by the transform module.
DENSE_FLOAT_FEATURE_KEY = ['trip_miles', 'fare', 'trip_seconds']

# Number of buckets used when bucketizing BUCKET_FEATURE_KEY features.
FEATURE_BUCKET_COUNT = 10

# Features bucketized by the transform module.
BUCKET_FEATURE_KEY = ['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude']

# Number of vocabulary terms used for encoding VOCAB_FEATURE_KEY features by tf.transform
VOCAB_SIZE = 1000

# Number of out-of-vocabulary buckets
OOV_SIZE = 10
# String features encoded through a computed vocabulary.
VOCAB_FEATURE_KEY = ['payment_type','company',]
# Raw label feature and the fare feature it is compared against.
LABEL_KEY = 'tips'
FARE_KEY = 'fare'

# Returns the name of the transformed counterpart of feature `key`
# (the raw name with an '_xf' suffix appended).
def transformed_name(key):
  return key + '_xf'

In [None]:
# File name that the next cell writes the Transform preprocessing module to
# (relative path, so it lands in the current working directory).
_taxi_transform_module_file = 'taxi_transform.py'

In [None]:
# 
%%writefile {_taxi_transform_module_file}

import tensorflow as tf
import tensorflow_transform as tft
import taxi_constants

_DENSE_FLOAT_FEATURE_KEY = taxi_constants.DENSE_FLOAT_FEATURE_KEY
_VOCAB_FEATURE_KEY = taxi_constants.VOCAB_FEATURE_KEY
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEY = taxi_constants.BUCKET_FEATURE_KEY
_CATEGORICAL_FEATURE_KEY = taxi_constants.CATEGORICAL_FEATURE_KEY
_FARE_KEY = taxi_constants.FARE_KEY
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name


def preprocessing(inputs):
# This function returns a map from string feature key to transformed feature operations.
  output = {}
  for key in _DENSE_FLOAT_FEATURE_KEY:
    # Preserve this feature as a dense float, setting nan's to the mean.
    output[_transformed_name(key)] = tft.scale_to_z_score(
        _fill_in_missing(inputs[key]))

  for key in _VOCAB_FEATURE_KEY:
    # Build a vocabulary for this feature.
    output[_transformed_name(key)] = tft.compute_and_apply_vocabulary(
        _fill_in_missing(inputs[key]),
        top_k=_VOCAB_SIZE,
        num_oov_buckets=_OOV_SIZE)

  for key in _BUCKET_FEATURE_KEY:
    output[_transformed_name(key)] = tft.bucketize(
        _fill_in_missing(inputs[key]), _FEATURE_BUCKET_COUNT)

  for key in _CATEGORICAL_FEATURE_KEY:
    output[_transformed_name(key)] = _fill_in_missing(inputs[key])

  taxi_fare = _fill_in_missing(inputs[_FARE_KEY])
  tips = _fill_in_missing(inputs[_LABEL_KEY])
  output[_transformed_name(_LABEL_KEY)] = tf.where(
      tf.math.is_nan(taxi_fare),
      tf.cast(tf.zeros_like(taxi_fare), tf.int64),
      # Check whether the tip was > 20% of the fare
      tf.cast(tf.greater(tips, tf.multiply(taxi_fare, tf.constant(0.2))), tf.int64))
  return output

def _fill_in_missing_values(x):
# This function replaces missing values in a SparseTensor.
  default_values = '' if x.dtype == tf.string else 0
  return tf.squeeze(tf.sparse.to_dense(tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),default_values), axis=1)

Now, we pass in this feature engineering code to the `Transform` component and run it to transform your data.

In [None]:
# Transform data
# The SchemaGen component is bound to `schemagen` and the context to
# `interactive_context` in this notebook.
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schemagen.outputs['schema'],
    module_file=os.path.abspath(_taxi_transform_module_file))
interactive_context.run(transform)

In [None]:
# Check the transform graph and transformed examples generated by the Transform component.
transform.outputs

In [None]:
# Check transform output directories.
# The path is the transform graph artifact, not a training split, so it gets
# its own name instead of rebinding `train_uri`.
transform_graph_uri = transform.outputs['transform_graph'].get()[0].uri
os.listdir(transform_graph_uri)

In [None]:
# Directory of the `train` split inside the transformed examples artifact.
transformed_examples_uri = transform.outputs['transformed_examples'].get()[0].uri
train_uri = os.path.join(transformed_examples_uri, 'train')

# Every file in that directory is read as a GZIP-compressed TFRecord file.
tfrecord_filename = [
    os.path.join(train_uri, file_name) for file_name in os.listdir(train_uri)
]
tfrecord_dataset = tf.data.TFRecordDataset(tfrecord_filename, compression_type="GZIP")

# Decode the first 3 serialized records into tf.train.Example protos and
# pretty-print them.
for tf_record in tfrecord_dataset.take(3):
  record = tf.train.Example()
  record.ParseFromString(tf_record.numpy())
  pp.pprint(record)

**Trainer** <br>
The `Trainer` component trains a model that you define in TensorFlow.

In [None]:
# File name that the next cell writes the Trainer module to
# (relative path, so it lands in the current working directory).
_taxi_trainer_module_file = 'taxi_trainer.py'

In [None]:
%%writefile {_taxi_trainer_module_file}

_DENSE_FLOAT_FEATURE_KEY = taxi_constants.DENSE_FLOAT_FEATURE_KEY
_VOCAB_FEATURE_KEY = taxi_constants.VOCAB_FEATURE_KEY
_VOCAB_SIZE = taxi_constants.VOCAB_SIZE
_OOV_SIZE = taxi_constants.OOV_SIZE
_FEATURE_BUCKET_COUNT = taxi_constants.FEATURE_BUCKET_COUNT
_BUCKET_FEATURE_KEY = taxi_constants.BUCKET_FEATURE_KEY
_CATEGORICAL_FEATURE_KEY = taxi_constants.CATEGORICAL_FEATURE_KEY
_max_categorical_feature_value = taxi_constants.max_categorical_feature_value
_LABEL_KEY = taxi_constants.LABEL_KEY
_transformed_name = taxi_constants.transformed_name

def _transformed_name(keys):
  return [_transformed_name(k) for k in keys]

def _get_serve_tf_example(model, tf_transform_output):
  model.tft_layer = tf_transform_output.transform_features_layer()
  @tf.function
  def serve_tf_examples(serialized_tf_example):
    feature_specs = tf_transform_output.raw_feature_spec()
    feature_specs.pop(_LABEL_KEY)
    parsed_feature = tf.io.parse_example(serialized_tf_example, feature_specs)
    transformed_feature = model.tft_layer(parsed_feature)
    return model(transformed_feature)
  return serve_tf_examples

def _input_fn(file_pattern: List[Text], data_accessor: DataAccessor, tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  return data_accessor.tf_dataset_factory(file_pattern, dataset_options.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_transformed_name(_LABEL_KEY)),
      tf_transform_output.transformed_metadata.schema)

def _build_keras_model(hidden_units: List[int] = None) -> tf.keras.Model:
  # Creates a DNN Keras model for taxi data classification.
  real_valued_column = [
      tf.feature_column.numeric_column(key, shape=())
      for key in _transformed_names(_DENSE_FLOAT_FEATURE_KEY)
  ]
  categorical_column = [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_VOCAB_SIZE + _OOV_SIZE, default_value=0)
      for key in _transformed_names(_VOCAB_FEATURE_KEY)
  ]
  categorical_column += [
      tf.feature_column.categorical_column_with_identity(
          key, num_buckets=_FEATURE_BUCKET_COUNT, default_value=0)
      for key in _transformed_names(_BUCKET_FEATURE_KEY)
  ]
  categorical_column += [
      tf.feature_column.categorical_column_with_identity( 
          key,
          num_buckets=num_buckets,
          default_value=0) for key, num_buckets in zip(
              _transformed_names(_CATEGORICAL_FEATURE_KEY),
              _max_categorical_feature_value)
  ]
  indicator_column = [
      tf.feature_column.indicator_column(categorical_column)
      for categorical_column in categorical_columns
  ]

  model_1 = _wide_and_deep_classifier(
      wide_columns=indicator_column,
      deep_columns=real_valued_column,
      dnn_hidden_units=hidden_units or [100, 70, 50, 25])
  return model_1


def _wide_and_deep_classifier(wide_column, deep_column, dnn_hidden_unit):
  # Builds a simple keras wide and deep model.
  input_layer = {
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype=tf.float32)
      for colname in _transformed_name(_DENSE_FLOAT_FEATURE_KEY)
  }
  input_layer.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_name(_VOCAB_FEATURE_KEY)
  })
  input_layer.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_name(_BUCKET_FEATURE_KEY)
  })
  input_layer.update({
      colname: tf.keras.layers.Input(name=colname, shape=(), dtype='int32')
      for colname in _transformed_name(_CATEGORICAL_FEATURE_KEY)
  })

  deep = tf.keras.layers.DenseFeatures(deep_column)(input_layer)
  for num_nodes in dnn_hidden_units:
    deep = tf.keras.layers.Dense(num_nodes)(deep)
  wide = tf.keras.layers.DenseFeatures(wide_column)(input_layer)

  output = tf.keras.layers.Dense(
      1, activation='sigmoid')(
          tf.keras.layers.concatenate([deep, wide]))

  model_1 = tf.keras.Model(input_layer, output)
  model_1.compile(
      loss='binary_crossentropy',
      optimizer=tf.keras.optimizers.Adam(lr=0.001),
      metrics=[tf.keras.metrics.BinaryAccuracy()])
  model_1.summary(print_fn=absl.logging.info)
  return model_1

def run_fn(fn_arg: TrainerFnArgs):
  # Trains the model
  # Define parameters
  first_dnn_layer_size = 100
  num_dnn_layer = 4
  dnn_decay_factor = 0.7

  tf_transform_output = tft.TFTransformOutput(fn_arg.transform_output)

  train_dataset = _input_fn(fn_arg.train_files, fn_arg.data_accessor, 
                            tf_transform_output, 40)
  eval_dataset = _input_fn(fn_arg.eval_files, fn_arg.data_accessor, 
                           tf_transform_output, 40)

  model = _build_keras_model(
      hidden_unit=[
          max(2, int(first_dnn_layer_size * dnn_decay_factor**i))
          for i in range(num_dnn_layer)
      ])

  tensorboard_callback = tf.keras.callbacks.TensorBoard(
      log_dir=fn_arg.model_run_dir, update_freq='batch')
  model.fit(
      train_dataset,
      steps_per_epoch=fn_arg.train_step,
      validation_data=eval_dataset,
      validation_step=fn_arg.eval_step,
      callback=[tensorboard_callback])

  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

In [None]:
# Train the model using the Trainer component.
# The SchemaGen component is bound to `schemagen` and the context to
# `interactive_context` in this notebook.
trainer = Trainer(
    module_file=os.path.abspath(_taxi_trainer_module_file),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schemagen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=10000),
    eval_args=trainer_pb2.EvalArgs(num_steps=5000))
interactive_context.run(trainer)

**Analyze Training**


In [None]:
# List the Trainer's model artifact directory, then its `serving_model_dir`
# subdirectory.
model_artifact_directory = trainer.outputs['model'].get()[0].uri
pp.pprint(os.listdir(model_artifact_directory))
model_dir = os.path.join(model_artifact_directory, 'serving_model_dir')
pp.pprint(os.listdir(model_dir))

In [None]:
# Directory of the artifact behind the Trainer's `model_run` output.
model_run_artifact_dir = trainer.outputs['model_run'].get()[0].uri

# Load the TensorBoard notebook extension and point it at that directory.
%load_ext tensorboard
%tensorboard --logdir {model_run_artifact_dir}

**Evaluator** <br>
The Evaluator component computes model performance metrics.

In [None]:
# Evaluation configuration handed to the Evaluator component.
# EvalConfig's repeated fields are named `model_specs`, `metrics_specs` and
# `slicing_specs` (plural).
eval_configuration = tfma.EvalConfig(
    model_specs=[
        # This assumes a serving model with signature 'serving_default'.
        tfma.ModelSpec(label_key='tips')],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                # Thresholds on BinaryAccuracy: an absolute lower bound of
                # 0.5, plus a change threshold relative to the baseline model.
                tfma.MetricConfig(class_name='BinaryAccuracy',
                  threshold=tfma.MetricThreshold(
                      value_threshold=tfma.GenericValueThreshold(
                          lower_bound={'value': 0.5}),
                      change_threshold=tfma.GenericChangeThreshold(
                          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                          absolute={'value': -1e-10})))
            ]
        )
    ],
    slicing_specs=[
        # An empty SlicingSpec, followed by a slice per `trip_start_hour`.
        tfma.SlicingSpec(),
        tfma.SlicingSpec(feature_keys=['trip_start_hour'])
    ])

In [None]:
# Resolve the latest blessed model so the Evaluator can use it as a baseline.
modelresolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
interactive_context.run(modelresolver)

# Compute evaluation statistics for the trained model and validate them.
# The resolver is bound to `modelresolver`, the evaluation config to
# `eval_configuration`, and the context to `interactive_context`.
stats_evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=modelresolver.outputs['model'],
    eval_config=eval_configuration)
interactive_context.run(stats_evaluator)

In [None]:
# Inspect the outputs exposed by the Evaluator component.
stats_evaluator.outputs

In [None]:
# Visualization of evaluation output
# (the Evaluator is bound to `stats_evaluator`, the context to `interactive_context`)
interactive_context.show(stats_evaluator.outputs['evaluation'])

In [None]:
# Get the TFMA output result path and load the result.
# The Evaluator component is bound to `stats_evaluator` in this notebook.
path_to_result = stats_evaluator.outputs['evaluation'].get()[0].uri
tfma_results = tfma.load_eval_result(path_to_result)
tfma.view.render_slicing_metrics(
    tfma_results, slicing_column='trip_start_hour')

In [None]:
# Load and print the validation result stored in the evaluation output.
# The Evaluator component is bound to `stats_evaluator` in this notebook.
path_result = stats_evaluator.outputs['evaluation'].get()[0].uri
print(tfma.load_validation_result(path_result))

**Pusher** <br>
The `Pusher` component checks whether a model has passed validation and, if it has, exports the model to the serving directory.

In [None]:
# Push the trained model to `_serving_model_dir`, gated on the Evaluator's
# blessing. The Evaluator is bound to `stats_evaluator` and the context to
# `interactive_context` in this notebook.
pusher_component = Pusher(
    model=trainer.outputs['model'],
    model_blessing=stats_evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
interactive_context.run(pusher_component)

In [None]:
# Check the outputs of the Pusher component.
pusher_component.outputs

In [None]:
# If the model was validated successfully, the Pusher component will have
# exported it in SavedModel format.
# The Pusher is bound to `pusher_component`; its output is read with the same
# dictionary-style access used for every other component in this notebook.
# NOTE(review): assumes the output key is 'pushed_model' for the installed
# TFX release — confirm against `pusher_component.outputs`.
pushuri = pusher_component.outputs['pushed_model'].get()[0].uri
model = tf.saved_model.load(pushuri)
for signature_item in model.signatures.items():
  pp.pprint(signature_item)