##### Copyright 2021 The TensorFlow Authors.

In [None]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Feature Engineering with TFX Pipeline Tutorial using Penguin dataset

***Transform input data and train a model with a TFX pipeline.***

Note: We recommend running this tutorial in a Colab notebook, with no setup required!  Just click "Run in Google Colab".

<div class="devsite-table-wrapper"><table class="tfo-notebook-buttons" align="left">
<td><a target="_blank" href="https://www.tensorflow.org/tfx/tutorials/tfx/penguin_transform">
<img src="https://www.tensorflow.org/images/tf_logo_32px.png"/>View on TensorFlow.org</a></td>
<td><a target="_blank" href="https://colab.research.google.com/github/tensorflow/tfx/blob/master/docs/tutorials/tfx/penguin_transform.ipynb">
<img src="https://www.tensorflow.org/images/colab_logo_32px.png">Run in Google Colab</a></td>
<td><a target="_blank" href="https://github.com/tensorflow/tfx/tree/master/docs/tutorials/tfx/penguin_transform.ipynb">
<img width=32px src="https://www.tensorflow.org/images/GitHub-Mark-32px.png">View source on GitHub</a></td>
<td><a href="https://storage.googleapis.com/tensorflow_docs/tfx/docs/tutorials/tfx/penguin_transform.ipynb"><img src="https://www.tensorflow.org/images/download_logo_32px.png" />Download notebook</a></td>
</table></div>

In this notebook-based tutorial, we will create and run a TFX pipeline
to ingest raw input data and preprocess it appropriately for ML training.
This notebook is based on the TFX pipeline we built in
[Data validation using TFX Pipeline and TFDV Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_tfdv).
If you have not read that tutorial yet, please read it first.

You can increase the predictive quality of your data and/or reduce
dimensionality with feature engineering. One of the benefits of using TFX is
that you will write your transformation code once, and the resulting transforms
will be consistent between training and serving.

We will add a `Transform` component to the pipeline. The Transform component is
implemented using the
[TensorFlow Transform](https://www.tensorflow.org/tfx/transform/get_started) library.

Please see the
[Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)
guide to learn more about various concepts in TFX.

## Set Up
We first need to install the TFX Python package and download
the dataset for our ML model.

### Upgrade Pip

To avoid upgrading Pip in a system when running locally,
check to make sure that we are running in Colab.
Local systems can of course be upgraded separately.

In [None]:
try:
  import colab
  !pip install --upgrade pip
except:
  pass

### Install TFX


In [None]:
# TODO(b/178712706): Stop using legacy resolver after PIP issue is resolved.
!pip install -U --use-deprecated=legacy-resolver tfx==0.26.1

### Did you restart the runtime?

If you are using Google Colab, the first time that you run
the cell above, you must restart the runtime by clicking
the "RESTART RUNTIME" button above or using the "Runtime > Restart
runtime ..." menu. This is because of the way that Colab
loads packages.

Check the TensorFlow and TFX versions.

In [None]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
import tfx
print('TFX version: {}'.format(tfx.__version__))

### Set up variables

There are some variables used to define a pipeline. You can customize these
variables as you want. By default all output from the pipeline will be
generated under the current directory.

In [None]:
import os

PIPELINE_NAME = "penguin-transform"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)
# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')
# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

### Prepare example data
We download the example dataset for use in our TFX pipeline. The dataset we
are using is the
[Palmer Penguins dataset](https://allisonhorst.github.io/palmerpenguins/articles/intro.html).

However, unlike previous tutorials, which used an already preprocessed dataset, we will use the **raw** Palmer Penguins dataset.


Because TFX ExampleGen reads inputs from a directory, we need to create a
directory and copy the dataset to it.

In [None]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_path = 'https://storage.googleapis.com/download.tensorflow.org/data/palmer_penguins/penguins_size.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_path, _data_filepath)

Take a quick look at what the raw data looks like.

In [None]:
!head {_data_filepath}

There are some entries with missing values, which are represented as `NA`.
We will just delete those entries in this tutorial. This means that we expect
all feature values to be present at inference time.

In [None]:
!sed -i '/\bNA\b/d' {_data_filepath}
!head {_data_filepath}

You should be able to see seven features. We will use the same features as in the
previous tutorials. The only difference is that the input data is not preprocessed.
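
If `sed` is not available in your environment, the same filtering can be sketched in plain Python. This is only an illustration of the rule used above (drop every line that contains `NA` as a whole word); the helper name `drop_rows_with_na` and the sample rows are made up for this example and are not part of the tutorial code.

```python
import re

# Matches "NA" only as a whole word, mirroring the \bNA\b pattern given to sed.
_NA_PATTERN = re.compile(r'\bNA\b')


def drop_rows_with_na(lines):
  """Returns only the lines that do not contain a stand-alone NA token."""
  return [line for line in lines if not _NA_PATTERN.search(line)]


# Hypothetical sample rows, made up for illustration.
sample = [
    'species,island,culmen_length_mm,sex',
    'Adelie,Torgersen,39.1,MALE',
    'Adelie,Torgersen,NA,NA',
    'Gentoo,Biscoe,46.1,FEMALE',
]
cleaned = drop_rows_with_na(sample)
print(cleaned)
```

Because the pattern uses word boundaries, a value that merely contains the letters "NA" inside a longer word is kept.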

## Create a pipeline

TFX pipelines are defined using Python APIs. We will add a `Transform` component to the pipeline we created in the [Data Validation tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/penguin_tfdv).

![Transform Component](https://www.tensorflow.org/tfx/tutorials/tfx/images/airflow_workshop/transform.png)

The Transform component requires input data from ExampleGen and a schema from SchemaGen, and produces a "transform graph" and "transformed data". Both outputs will be used in the `Trainer` component.

One thing to note is that we need to define a Python function to describe how input data should be transformed. This is similar to the Trainer component, which also requires user code for the model definition.

### Write preprocessing and training code

We need to define two Python functions: one for Transform and one for Trainer.

#### preprocessing_fn
The Transform component will look for a function named `preprocessing_fn` in the given module file. You can also specify a particular function using the [`preprocessing_fn` parameter](https://github.com/tensorflow/tfx/blob/142de6e887f26f4101ded7925f60d7d4fe9d42ed/tfx/components/transform/component.py#L113) of the Transform component.

In this example, we will do two kinds of transformation. For continuous numeric features like `culmen_length_mm` or `body_mass_g`, we will normalize values using the [tft.scale_to_z_score](https://www.tensorflow.org/tfx/transform/api_docs/python/tft/scale_to_z_score) function. For the label feature, we will convert string labels into numeric index values. We will use [`tf.lookup.StaticHashTable`](https://www.tensorflow.org/api_docs/python/tf/lookup/StaticHashTable) for the conversion.

To identify transformed fields easily, we append the `_xf` suffix to the transformed
feature names.
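
To make the two transformations concrete, here is a minimal plain-Python sketch of what they compute. It does not use TensorFlow Transform: the input values are made up, the helper names `to_z_score` and `to_label_index` are hypothetical, and the use of the population standard deviation is an assumption about the normalization rather than a statement about the library's internals.

```python
import statistics

# Hypothetical raw values for one numeric feature (made up for illustration).
train_values = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

# Analysis phase: statistics are computed once over the training data.
# Assumption: the population standard deviation is used for normalization.
mean = statistics.mean(train_values)
stddev = statistics.pstdev(train_values)


def to_z_score(value):
  """Applies the fixed training-time statistics to a single value."""
  return (value - mean) / stddev


# Label conversion: a dict plays the role of the static lookup table, and
# unknown labels fall back to -1 as a default value.
label_to_index = {
    name: i for i, name in enumerate(['Adelie', 'Chinstrap', 'Gentoo'])
}


def to_label_index(label):
  """Maps a species name to its integer index, or -1 if it is unknown."""
  return label_to_index.get(label, -1)


train_z = [to_z_score(v) for v in train_values]
print(train_z)
print(to_z_score(6.0))  # A value seen at serving time reuses mean and stddev.
print(to_label_index('Gentoo'), to_label_index('Emperor'))
```

The point of the sketch is that the mean and standard deviation are fixed after the pass over the training data, so a value arriving at serving time is scaled with exactly the same constants.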

#### run_fn

The model itself is the same as in the previous tutorials, but this time we can
get the schema from the output of the Transform component.

One more important difference is that we now export a model for serving which includes not only the computation graph of the model, but also the computation graph
for preprocessing, which is generated by the Transform component.

In [None]:
_module_file = 'penguin_utils.py'

In [None]:
%%writefile {_module_file}

from typing import List, Text
import absl
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import schema_utils

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options


# Specify features that we will use.
_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# NEW: Transformed features will have '_xf' suffix.
def _transformed_name(key):
  return key + '_xf'


# NEW: TFX Transform will call this function.
def preprocessing_fn(inputs):
  """tf.transform's callback function for preprocessing inputs.

  Args:
    inputs: map from feature keys to raw not-yet-transformed features.

  Returns:
    Map from string feature key to transformed feature operations.
  """
  outputs = {}

  # Uses features defined in _FEATURE_KEYS only.
  for key in _FEATURE_KEYS:
    outputs[_transformed_name(key)] = tft.scale_to_z_score(inputs[key])

  # For the label column we provide the mapping from string to index.
  table_keys = ['Adelie', 'Chinstrap', 'Gentoo']
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=table_keys,
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  outputs[_transformed_name(_LABEL_KEY)] = table.lookup(inputs[_LABEL_KEY])

  return outputs


def _input_fn(file_pattern: List[Text],
              data_accessor: DataAccessor,
              tf_transform_output: tft.TFTransformOutput,
              batch_size: int = 200) -> tf.data.Dataset:
  """Generates features and label for tuning/training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    tf_transform_output: A TFTransformOutput.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      dataset_options.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_transformed_name(_LABEL_KEY)),
      schema=tf_transform_output.transformed_metadata.schema).repeat()


def _build_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=_transformed_name(f))
                for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  outputs = keras.layers.Dense(3)(d)
 
  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=absl.logging.info)
  return model

# NEW: This function will create a handler function which gets a serialized
#      tf.example, preprocesses it and runs inference on it.
def _get_serve_tf_examples_fn(model, tf_transform_output):
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    # Expected input is a string which is serialized tf.Example format.
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)
    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)

    # Preprocess parsed input with transform operation defined in 
    # preprocessing_fn().
    transformed_features = model.tft_layer(parsed_features)

    # Run inference with ML model.
    return model(transformed_features)

  return serve_tf_examples_fn


# TFX Trainer will call this function.
def run_fn(fn_args: TrainerFnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      tf_transform_output,
      batch_size=_EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # NEW: Save a computation graph including transform layer.
  # We need to specify the serving signature, and only a concrete function of
  # @tf.function can be used as a signature.
  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples')),
  }
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Now you have completed all preparation steps to build a TFX pipeline.

### Write a pipeline definition

We define a function to create a TFX pipeline. A `Pipeline` object
represents a TFX pipeline, which can be run using one of the pipeline
orchestration systems that TFX supports.


In [None]:
from typing import List, Optional

from tfx.components import CsvExampleGen
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> pipeline.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = CsvExampleGen(input_base=data_root)

  # Computes statistics over data for visualization and example validation.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  # Generates schema based on statistics files.
  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  # NEW: Transforms input data using preprocessing_fn in the module file.
  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file)
  
  # Uses user-provided Python function that trains a model.
  trainer = Trainer(
      module_file=module_file,
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      train_args=trainer_pb2.TrainArgs(num_steps=100),
      eval_args=trainer_pb2.EvalArgs(num_steps=5))

  # Pushes the model to a filesystem destination.
  pusher = Pusher(
      model=trainer.outputs['model'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # The following six components will be included in the pipeline.
  components = [
      example_gen,
      statistics_gen,
      schema_gen,
      transform,
      trainer,
      pusher,
  ]

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      components=components)
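
The order in which these six components can run follows from how each component's inputs are wired to the outputs of others in `_create_pipeline()`. The sketch below is not TFX code and says nothing about how an orchestrator is implemented; it only encodes those input/output dependencies by hand in a dictionary (the name `upstream` is made up here) and uses Python's standard `graphlib` module (Python 3.9+) to derive an order in which every component runs after the components it depends on.

```python
from graphlib import TopologicalSorter

# Each key lists the components whose outputs it consumes in
# _create_pipeline(). This mapping is written by hand for illustration.
upstream = {
    'example_gen': [],
    'statistics_gen': ['example_gen'],
    'schema_gen': ['statistics_gen'],
    'transform': ['example_gen', 'schema_gen'],
    'trainer': ['transform'],
    'pusher': ['trainer'],
}

# static_order() yields every node after all of its predecessors.
execution_order = list(TopologicalSorter(upstream).static_order())
print(execution_order)
```

Note that `transform` has to wait for `schema_gen` even though it also reads directly from `example_gen`, which is why it comes fourth.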

## Run the pipeline

We will use `LocalDagRunner` as in the previous tutorial.

In [None]:
import os
from tfx.orchestration.local import local_dag_runner

local_dag_runner.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

You should see "INFO:absl:Component Pusher is finished." if the pipeline
finished successfully, because the `Pusher` component is the last component of
our pipeline.

The pusher component pushes the trained model to `SERVING_MODEL_DIR`, which is the
`serving_model/penguin-transform` directory if you didn't change the variables in
the previous steps. You can see the result in the file browser in the left-side
panel in Colab or by using the following command:

In [None]:
# List files in created model directory.
!find {SERVING_MODEL_DIR}

You can also check the signature of the generated model using
[`saved_model_cli` tool](https://www.tensorflow.org/guide/saved_model#show_command).

In [None]:
!saved_model_cli show --dir {SERVING_MODEL_DIR}/$(ls -1 {SERVING_MODEL_DIR} | sort -nr | head -1) --tag_set serve --signature_def serving_default
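
The shell expression `$(ls -1 ... | sort -nr | head -1)` in the command above picks the subdirectory with the numerically largest name. A rough Python equivalent is sketched below, assuming (as that command implies) that model versions are stored in subdirectories with integer names. The helper name `latest_model_version_dir` and the demo directory names are made up for this example.

```python
import os
import tempfile


def latest_model_version_dir(serving_model_dir):
  """Returns the path of the subdirectory with the largest integer name."""
  versions = [
      name for name in os.listdir(serving_model_dir)
      if name.isdigit() and
      os.path.isdir(os.path.join(serving_model_dir, name))
  ]
  if not versions:
    raise FileNotFoundError(
        'No numeric version directory in ' + serving_model_dir)
  # Compare as integers, like `sort -n`, not as strings.
  return os.path.join(serving_model_dir, max(versions, key=int))


# Demo on a temporary directory with hypothetical version names.
demo_root = tempfile.mkdtemp()
for version in ('1611111111', '1622222222', '999'):
  os.makedirs(os.path.join(demo_root, version))
latest = latest_model_version_dir(demo_root)
print(latest)
```

In this notebook you could call `latest_model_version_dir(SERVING_MODEL_DIR)` after the pipeline run to get the path that is passed to `saved_model_cli`.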

## Next steps

- The following tutorials will be added soon:
  - Model analysis with TFX pipeline and TFMA
  - Defining Custom Component

TFX provides many useful components, and you can also create your own easily.
Please see the [TFX components tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/components_keras).


Please read the
[Understanding TFX Pipelines](https://www.tensorflow.org/tfx/guide/understanding_tfx_pipelines)
guide to learn more about various concepts in TFX.

You can find more resources on https://www.tensorflow.org/tfx/tutorials.
