# Developing Custom TFX Components

[Tensorflow Extended (TFX)](https://www.tensorflow.org/tfx/) provides ready made components for typical steps in a machine learning workflow. Other courses in this specialization focus on specific components and in this lab, you will learn how to make your own. This will be useful in case your project has specific needs that fall outside the standard TFX components. It will make your pipelines more flexible while still leveraging the experiment tracking and orchestration that TFX provides. In particular, you will:
- Build a custom component using Python functions
- Build a custom component by reusing an existing TFX component
- Run a TFX pipeline locally using a built-in pipeline orchestrator

### Import

In [1]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

2022-11-10 22:41:48.249910: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-11-10 22:41:48.249965: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


TensorFlow version: 2.9.2




TFX version: 1.10.0


### Set up variables

There are some variables used to define a pipeline. You can customize these
variables as you want. By default all output from the pipeline will be
generated under the current directory.

In [2]:
import os

PIPELINE_NAME = "penguin-simple"

# Output directory to store artifacts generated from the pipeline.
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

# Path to a SQLite DB file to use as an MLMD storage.
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)  # Set default logging level.

### Dataset

We will download the [Penguin dataset](https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv). There are 4 features in this dataset: `culmen_length_mm`, `culmen_depth_mm`, `flipper_length_mm`, and `body_mass_g`.

All features were already normalized to have range [0,1]. We will build a
classification model which predicts the `species` of penguins.

In [3]:
DATA_ROOT = "data"
_data_filepath = os.path.join(DATA_ROOT, "data.csv")

In [4]:
!head {_data_filepath}

species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556


You should be able to see five values. `species` is one of 0, 1 or 2, and all
other features should have values between 0 and 1.

## Create a pipeline

TFX pipelines are defined using Python APIs. We will define a pipeline which
consists of following three components.
- CsvExampleGen: Reads in data files and convert them to TFX internal format
for further processing. There are multiple
[ExampleGen](https://www.tensorflow.org/tfx/guide/examplegen)s for various
formats. In this tutorial, we will use CsvExampleGen which takes CSV file input.
- Trainer: Trains an ML model.
[Trainer component](https://www.tensorflow.org/tfx/guide/trainer) requires a
model definition code from users. You can use TensorFlow APIs to specify how to
train a model and save it in a _saved_model_ format.
- Pusher: Copies the trained model outside of the TFX pipeline.
[Pusher component](https://www.tensorflow.org/tfx/guide/pusher) can be thought
of as a deployment process of the trained ML model.

Before actually define the pipeline, we need to write a model code for the
Trainer component first.

### Write model training code

We will create a simple DNN model for classification using TensorFlow Keras
API. This model training code will be saved to a separate `train.py` file.

In this tutorial we will use
[Generic Trainer](https://www.tensorflow.org/tfx/guide/trainer#generic_trainer)
of TFX which support Keras-based models. You need to write a Python file
containing `run_fn` function, which is the entrypoint for the `Trainer`
component.

In [5]:
_trainer_module_file = "train.py"

In [6]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2


_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'


# Define feature and label spec
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
           for feature in _FEATURE_KEYS
       },
    _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


# Define batch size
_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10


# Define data inputs
def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
    """Generates features and label for training.

    Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: number of examples in a single batch.

    Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


# Define model
def _build_keras_model() -> tf.keras.Model:
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
    d = keras.layers.concatenate(inputs)
    for _ in range(2):
        d = keras.layers.Dense(8, activation='relu')(d)
    outputs = keras.layers.Dense(3)(d)

    model = keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[keras.metrics.SparseCategoricalAccuracy()])

    model.summary(print_fn=logging.info)
    return model


# Define trainer
def run_fn(fn_args: tfx.components.FnArgs):
    """Train the model based on given args.

    Args:
    fn_args: Holds args used to train the model as name/value pairs.
    """

    # This schema is usually either an output of SchemaGen or a manually-curated
    # version provided by pipeline author. A schema can also derived from TFT
    # graph if a Transform component is used. In the case when either is missing,
    # `schema_from_feature_spec` could be used to generate schema from very simple
    # feature_spec, but the schema returned would be very primitive.
    schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
        batch_size=_TRAIN_BATCH_SIZE)
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
        batch_size=_EVAL_BATCH_SIZE)

    model = _build_keras_model()
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps)

    # Saved model directory.
    model.save(fn_args.serving_model_dir, save_format='tf')

Writing train.py


Now you have completed all preparation steps to build a TFX pipeline.

### Write a pipeline definition

We define a function to create a TFX pipeline. A `Pipeline` object
represents a TFX pipeline which can be run using one of the pipeline
orchestration systems that TFX supports.


In [7]:
def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
    """Creates a three component penguin pipeline with TFX."""
    # Brings data into the pipeline.
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    # Uses user-provided Python function that trains a model.
    trainer = tfx.components.Trainer(
        module_file=module_file,
        examples=example_gen.outputs['examples'],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5))

    # Pushes the model to a filesystem destination.
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir)))

    # Following three components will be included in the pipeline.
    components = [
      example_gen,
      trainer,
      pusher,
    ]

    return tfx.dsl.Pipeline(
          pipeline_name=pipeline_name,
          pipeline_root=pipeline_root,
          metadata_connection_config=tfx.orchestration.metadata
          .sqlite_metadata_connection_config(metadata_path),
          components=components)

## Run the pipeline

TFX supports multiple orchestrators to run pipelines.
In this tutorial we will use `LocalDagRunner` which is included in the TFX
Python package and runs pipelines on local environment.
We often call TFX pipelines "DAGs" which stands for directed acyclic graph.

`LocalDagRunner` provides fast iterations for development and debugging.
TFX also supports other orchestrators including Kubeflow Pipelines and Apache
Airflow which are suitable for production use cases. See
[TFX on Cloud AI Platform Pipelines](https://www.tensorflow.org/tfx/tutorials/tfx/cloud-ai-platform-pipelines)
or
[TFX Airflow Tutorial](https://www.tensorflow.org/tfx/tutorials/tfx/airflow_workshop)
to learn more about other orchestration systems.

In [8]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Generating ephemeral wheel package for '/home/hoang/Documents/mlops-labs/tensorflow-extended-components/custom-component/train.py' (including modules: ['train']).
INFO:absl:User module package has hash fingerprint version b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691.
INFO:absl:Executing: ['/usr/bin/python3.7', '/tmp/tmp79_nfz91/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpt70nzps8', '--dist-dir', '/tmp/tmpocirfu3_']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying train.py -> build/lib
installing to /tmp/tmpt70nzps8
running install
running install_lib
copying build/lib/train.py -> /tmp/tmpt70nzps8
running install_egg_info


INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl'; target user module is 'train'.
INFO:absl:Full user module path is 'train@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl'
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "Pusher"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.pusher.executor.Executor"
    }
  }
}
executor_specs {
  key: "Trainer"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.trainer.executor.GenericExecutor"
    }
  }
}
custom_driver_specs {
  ke

running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmpt70nzps8/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3.7.egg-info
running install_scripts
creating /tmp/tmpt70nzps8/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691.dist-info/WHEEL
creating '/tmp/tmpocirfu3_/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl' and adding '/tmp/tmpt70nzps8' to it
adding 'train.py'
adding 'tfx_user_code_Trainer-

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:select span and version = (0, None)
INFO:absl:latest span and version = (0, None)
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Going to run a new execution 1
INFO:absl:Going to run a new execution: ExecutionInfo(execution_id=1, input_dict={}, output_dict=defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1665904369,sum_checksum:1665904369"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-11-10T22:42:39.874072:CsvExampleGen:1:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
name: "penguin-simple:2022-11-10T22:42:39.874072:CsvExampleGen:1:examples:0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
pro

INFO:absl:Processing input csv data data/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1665904369,sum_checksum:1665904369"
  }
}
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-11-10T22:42:39.874072:CsvExampleGen:1:examples:0"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
custom_properties

INFO:absl:Train on the 'train' split when train_args.splits is not set.
INFO:absl:Evaluate on the 'eval' split when eval_args.splits is not set.
INFO:absl:udf_utils.get_fn {'train_args': '{\n  "num_steps": 100\n}', 'module_path': 'train@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl', 'custom_config': 'null', 'eval_args': '{\n  "num_steps": 5\n}'} 'run_fn'
INFO:absl:Installing 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl' to a temporary directory.
INFO:absl:Executing: ['/usr/bin/python3.7', '-m', 'pip', 'install', '--target', '/tmp/tmp0ofmkcvv', 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl']


Processing ./pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl
Installing collected packages: tfx-user-code-Trainer
Successfully installed tfx-user-code-Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691



[notice] A new release of pip available: 22.3 -> 22.3.1
[notice] To update, run: python3.7 -m pip install --upgrade pip
INFO:absl:Successfully installed 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+b229a6e2bc03c2463f2b827910ea514c8ff9a4546130850c7ee2967d293ab691-py3-none-any.whl'.
INFO:absl:Training model.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
2022-11-10 22:42:48.048144: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-11-10 22:4

INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/penguin-simple/Trainer/model/2/Format-Serving. ModelRun written to pipelines/penguin-simple/Trainer/model_run/2
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/2"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-11-10T22:42:39.874072:Trainer:2:model_run:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.10.0"
  }
}
name: "penguin-simple:2022-11-10T22:42:39.874072:Trainer:2:model_run:0"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/2"
custom_properties {
  key: "name"
  value {
    s

INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Pusher is finished.


The pusher component pushes the trained model to the `SERVING_MODEL_DIR` which
is the `serving_model/penguin-simple` directory.

In [9]:
!find {SERVING_MODEL_DIR}

serving_model/penguin-simple
serving_model/penguin-simple/1668094972
serving_model/penguin-simple/1668094972/keras_metadata.pb
serving_model/penguin-simple/1668094972/variables
serving_model/penguin-simple/1668094972/variables/variables.data-00000-of-00001
serving_model/penguin-simple/1668094972/variables/variables.index
serving_model/penguin-simple/1668094972/assets
serving_model/penguin-simple/1668094972/saved_model.pb


## Building Custom Components
Let's say you want to modify the pipeline above to filter the data first before running the trainer. Without using a TFX component, you would run something like the code below. This will just get the rows where the `culmen_length_mm` feature is greater than `0.3`.

In [10]:
import pandas as pd
import glob

# search directory for one or more CSVs
files = glob.glob(f'{DATA_ROOT}/*.csv')

# filter the dataset
for file in files:
    df = pd.read_csv(file, index_col=False)
    filtered_df = df[df['culmen_length_mm'] > 0.3].reset_index(drop=True)

# print latest modified file
filtered_df

Unnamed: 0,species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0,0.327273,0.535714,0.169492,0.138889
1,0,0.378182,0.904762,0.423729,0.500000
2,0,0.505455,1.000000,0.372881,0.416667
3,0,0.309091,0.654762,0.186441,0.236111
4,0,0.305455,0.571429,0.254237,0.138889
...,...,...,...,...,...
227,2,0.549091,0.071429,0.711864,0.618056
228,2,0.534545,0.142857,0.728814,0.597222
229,2,0.665455,0.309524,0.847458,0.847222
230,2,0.476364,0.202381,0.677966,0.694444


You can save the dataset above to a different directory and point the TFX pipeline to it. That definitely works but you can include this code in a custom component as well so it's part of the TFX pipeline.

## Custom components through Python functions
TFX provides a way to define an executor by just using a component decorator on your function. If you've done the Kubeflow Pipelines, this will look very familiar. It uses the same concepts such as defining the inputs and outputs using type annotations then using those parameters in the function body.

In [11]:
#import artifact type that you will use
from tfx.types.standard_artifacts import String

# import the decorator
from tfx.dsl.component.experimental.decorators import component

# import type annotations
from tfx.dsl.component.experimental.annotations import InputArtifact, OutputArtifact, Parameter, OutputDict

@component
def CustomFilterComponent(input_base: Parameter[str], 
                    output_base: Parameter[str],
                    ) -> OutputDict(output_path=str):
    '''
    Args:
    input_base - location of the raw CSV
    output_base - location where you want to save the filtered CSV

    Returns:
    OutputDict:
      output_path - String artifact that just holds the `output_base` value
    '''
    import pandas as pd
    import glob
    import os

    # create the output base if it does not exist yet
    if not os.path.exists(output_base):
        os.mkdir(output_base)

    # search for CSVs in the input base
    files = glob.glob(f'{input_base}/*.csv')

    # loop through CSVs
    for file in files:

        # read the CSV
        df = pd.read_csv(file, index_col=False)

        # filter the data
        filtered_df = df[df['culmen_length_mm'] > 0.3].reset_index(drop=True)

        # compose output filename
        filename = os.path.basename(file).replace('.csv','')
        filtered_filename = f'{filename}_filtered.csv'

        # save filtered CSV to output base
        filtered_df.to_csv(f'{output_base}/{filtered_filename}', index=False)

    # define the output artifact
    return {'output_path': output_base}

You can now run your newly built component. You will just run a single node pipeline to prove that it works.

In [12]:
# define a filter task
filter_task = CustomFilterComponent(input_base=DATA_ROOT, 
                                    output_base=f'{DATA_ROOT}_filtered')

# include the task
components = [filter_task]

# define a pipeline with only the single component
pipeline = tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(METADATA_PATH),
      components=components)

# run the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)

INFO:absl:Using deployment config:
 executor_specs {
  key: "CustomFilterComponent"
  value {
    python_class_executable_spec {
      class_path: "__main__.CustomFilterComponent_Executor"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metadata/penguin-simple/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "metadata/penguin-simple/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CustomFilterComponent is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "__main__.CustomFilterComponent"
  }
  id: "CustomFilterComponent"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        string_value: "penguin-simple"
      }
    }
  }
  contexts {
    type {
      name: "pipeline_run"
    }
    name {
      field_value {
        string_value: "2022-11-10T22:49:49.2

You should now see the filtered CSV in the data_filtered folder in the file explorer. As expected, you can also see that it has less lines than the original because of the filtering.

In [13]:
# number of rows in original csv
!cat data/data.csv | wc -l

# number of rows in filtered csv
!cat data_filtered/data_filtered.csv | wc -l

335
233


## Building a custom component using standard components
If we try to use our new component with CsvExampleGen, you will encounter an error as shown below:

In [14]:
# Define filter task
filter_task = CustomFilterComponent(input_base=DATA_ROOT, 
                                    output_base=f'{DATA_ROOT}_filtered')

# Try using the custom component with CsvExampleGen. This code will expectedly throw an error.
try:
    example_gen = tfx.components.CsvExampleGen(input_base=filter_task.outputs['output_path'])
except Exception as e:
    print(e)

Expected type <class 'str'> for parameter 'input_base' but got OutputChannel(artifact_type=String, producer_component_id=CustomFilterComponent, output_key=output_path, additional_properties={}, additional_custom_properties={}) instead.


As you can see, the output of our custom component cannot be accepted into `CsvExampleGen` because it is configured to accept a primitive string. TFX components generate and consume [`Channel`](https://www.tensorflow.org/tfx/api_docs/python/tfx/v1/dsl/Channel) objects and that's what our custom component outputs. For that, we need to build a custom data ingestion component that reuses the `CsvExampleGen` code but accepts a `Channel`. You will do that in the following sections.

## Standard ExampleGen code

TFX is open source so the code for standard components can easily be found the public [repo](https://github.com/tensorflow/tfx/tree/89d3cb6c59acd0d487916bff703711815f1506b5/tfx). We placed links in the following sections of the actual files that you'll be modifying/overriding in case you want to compare what was changed for your custom ExampleGen. 

The class heirarchy for these components is pretty deep but in summary, you will only need to modify three:

* Component Spec - this describes the inputs, outputs, and parameters passed on to the component
* Executor - code for processing the inputs, outputs, and parameters
* Component Class - puts everything together so your code can be run by the TFX runner.

## Modify the Component Spec

First, you will need to modify the Component Spec. This file describes the parameters, inputs, and outputs of the standard components. Parameters are values supplied at runtime while inputs and outputs are values read from the metadata store. The original for ExampleGen is found [here](https://github.com/tensorflow/tfx/blob/89d3cb6c59acd0d487916bff703711815f1506b5/tfx/types/standard_component_specs.py#L187).

You will need to implement the same for our custom ExampleGen but it should accept a `Channel` parameter The revised version is shown below. Notice that we revised the `INPUTS` dictionary to have a `ChannelParameter` whereas in the original, all are just in the `PARAMETERS` dictionary.

In [15]:
from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter
from tfx.types.component_spec import ComponentSpec
from tfx.types import standard_artifacts
from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2


# Key for example_gen input that we want to use
INPUT_BASE_KEY = 'input_base'

# Other keys
INPUT_CONFIG_KEY = 'input_config'
OUTPUT_CONFIG_KEY = 'output_config'
OUTPUT_DATA_FORMAT_KEY = 'output_data_format'
RANGE_CONFIG_KEY = 'range_config'
CUSTOM_CONFIG_KEY = 'custom_config'
EXAMPLES_KEY = 'examples'

class MyCustomExampleGenSpec(ComponentSpec):
    """File-based ExampleGen component spec."""

    PARAMETERS = {
      INPUT_CONFIG_KEY:
          ExecutionParameter(type=example_gen_pb2.Input),
      OUTPUT_CONFIG_KEY:
          ExecutionParameter(type=example_gen_pb2.Output),
      OUTPUT_DATA_FORMAT_KEY:
          ExecutionParameter(type=int),
      CUSTOM_CONFIG_KEY:
          ExecutionParameter(type=example_gen_pb2.CustomConfig, optional=True),
      RANGE_CONFIG_KEY:
          ExecutionParameter(type=range_config_pb2.RangeConfig, optional=True),
    }

    # Now accepts a channel
    INPUTS = {
        INPUT_BASE_KEY:
              ChannelParameter(type=standard_artifacts.String),
    }
  
    OUTPUTS = {
        EXAMPLES_KEY: ChannelParameter(type=standard_artifacts.Examples),
  }

### Customize the Executor

With that, you should now modify the executor code to take note of this change of input types. Instead of looking at just the parameters, it should also look into Channel inputs passed onto the component.

Executor classes are executed by TFX starting with the `Do` function so you will need to modify that. The original Executor for `CsvExampleGen` can be found [here](https://github.com/tensorflow/tfx/blob/89d3cb6c59acd0d487916bff703711815f1506b5/tfx/components/example_gen/csv_example_gen/executor.py) and it inherits the base class [here](https://github.com/tensorflow/tfx/blob/89d3cb6c59acd0d487916bff703711815f1506b5/tfx/components/example_gen/base_example_gen_executor.py#L132). The base class includes the `Do()` function and that's what you'll be overriding in your new custom executor below. 

Basically, you're using all the functions defined in the standard component but you're modifying it so it can find the `input_base` value from the Channel inputs.

*Take note that this tutorial prioritizes code brevity. In your projects, you may take a different approach such as modifying the [`_CsvToExample`](https://github.com/tensorflow/tfx/blob/89d3cb6c59acd0d487916bff703711815f1506b5/tfx/components/example_gen/csv_example_gen/executor.py#L124) code to look for the string value in the `input_dict` instead of `exec_properties`. Doing that here in this Colab will result in very long code blocks so the shorter approach is taken.*

In [16]:
from typing import Any, Dict, Iterable, List, Text, Union

from tfx.components.example_gen.csv_example_gen.executor import Executor as CsvExampleGenExecutor
from tfx.types import standard_component_specs
from tfx.types import artifact_utils
from tfx import types

class MyCustomExecutor(CsvExampleGenExecutor):
  """Generic TFX CSV example gen executor."""

  def Do(
      self,
      input_dict: Dict[Text, List[types.Artifact]],
      output_dict: Dict[Text, List[types.Artifact]],
      exec_properties: Dict[Text, Any],
  ) -> None:
    """Take input data source and generates serialized data splits.
    The output is intended to be serialized tf.train.Examples or
    tf.train.SequenceExamples protocol buffer in gzipped TFRecord format,
    but subclasses can choose to override to write to any serialized records
    payload into gzipped TFRecord as specified, so long as downstream
    component can consume it. The format of payload is added to
    `payload_format` custom property of the output Example artifact.
    Args:
      input_dict: Input dict from input key to a list of Artifacts. Depends on
        detailed example gen implementation.
        - input_base: an external directory containing the data files.
      output_dict: Output dict from output key to a list of Artifacts.
        - examples: splits of serialized records.
      exec_properties: A dict of execution properties. Depends on detailed
        example gen implementation.
        - input_config: JSON string of example_gen_pb2.Input instance,
          providing input configuration.
        - output_config: JSON string of example_gen_pb2.Output instance,
          providing output configuration.
        - output_data_format: Payload format of generated data in output
          artifact, one of example_gen_pb2.PayloadFormat enum.
    Returns:
      None
    """
    self._log_startup(input_dict, output_dict, exec_properties)

    # Get the artifact from the Channel input
    filter_component_artifact = artifact_utils.get_single_instance(
        input_dict[standard_component_specs.INPUT_BASE_KEY])
    
    # Put the input string value into the exec_properties fictionary
    exec_properties[standard_component_specs.INPUT_BASE_KEY] = filter_component_artifact.value
    
    # execute superclass
    super(MyCustomExecutor, self).Do(input_dict=input_dict, output_dict=output_dict, exec_properties=exec_properties)

## Define the Component class

Lastly, you will need to put everything together in a class. This will be the one you instantiate so you can run the component later. For comparison, the original `CsvExampleGen` component class is found [here](https://github.com/tensorflow/tfx/blob/89d3cb6c59acd0d487916bff703711815f1506b5/tfx/components/example_gen/csv_example_gen/component.py) and it inherits the `FileBasedExampleGen` from [here](https://github.com/tensorflow/tfx/blob/89d3cb6c59acd0d487916bff703711815f1506b5/tfx/components/example_gen/component.py#L115). The revised version is shown below:

In [18]:
from typing import Any, Dict, Optional, Text, Union

from tfx.dsl.components.base import base_beam_component
from tfx.dsl.components.base import executor_spec

from tfx.proto import example_gen_pb2
from tfx.proto import range_config_pb2

from tfx import types
from tfx.components.example_gen import utils

class MyCustomExampleGen(base_beam_component.BaseBeamComponent):

  # Define the Spec class and executor spec using the functions and
  # classes you defined earlier.
  SPEC_CLASS = MyCustomExampleGenSpec
  EXECUTOR_SPEC = executor_spec.BeamExecutorSpec(MyCustomExecutor)

  # Define init function. Notice that `input_base` now accepts a Channel.
  def __init__(self,
               input_base: types.Channel = None,
               input_config: Optional[Union[example_gen_pb2.Input,
                                            Dict[Text, Any]]] = None,
               output_config: Optional[Union[example_gen_pb2.Output,
                                             Dict[Text, Any]]] = None,
               range_config: Optional[Union[range_config_pb2.RangeConfig,
                                            Dict[Text, Any]]] = None,
               output_data_format: Optional[int] = example_gen_pb2.FORMAT_TF_EXAMPLE):
    """Customized ExampleGen component.
    Args:
      input_base: an external directory containing the CSV files. Accepts a Channel
        from a previous TFX component.
      input_config: An example_gen_pb2.Input instance, providing input
        configuration. If unset, the files under input_base will be treated as a
        single split. If any field is provided as a RuntimeParameter,
        input_config should be constructed as a dict with the same field names
        as Input proto message.
      output_config: An example_gen_pb2.Output instance, providing output
        configuration. If unset, default splits will be 'train' and 'eval' with
        size 2:1. If any field is provided as a RuntimeParameter, output_config
        should be constructed as a dict with the same field names as Output
        proto message.
      range_config: An optional range_config_pb2.RangeConfig instance,
        specifying the range of span values to consider. If unset, driver will
        default to searching for latest span with no restrictions.
    """
    
    # Configure inputs and outputs.
    input_config = input_config or utils.make_default_input_config()
    output_config = output_config or utils.make_default_output_config(
        input_config)
    
    # Define output type.
    example_artifacts = types.Channel(type=standard_artifacts.Examples)
    
    # Pass input arguments to your custom ExampleGen spec.
    spec = MyCustomExampleGenSpec(
        input_base=input_base,
        input_config=input_config,
        output_config=output_config,
        range_config=range_config,
        output_data_format=output_data_format,
        examples=example_artifacts)
    
    # This will check if the values passed are the correct type else
    # it will throw the error you saw earlier.
    super(MyCustomExampleGen, self).__init__(
        spec=spec)

You can now use the custom component (`MyCustomExampleGen`) in your code as shown below. It will no longer get an error because you reconfigured the `input_base` to accept a channel.

In [19]:
# Filter the dataset
filter_task = CustomFilterComponent(input_base=DATA_ROOT, 
                                    output_base=f'{DATA_ROOT}_filtered')

# Use the output of filter_task to know the input_base for this custom ExampleGen
custom_example_gen_task = MyCustomExampleGen(input_base=filter_task.outputs['output_path'])

# Define components to include
components = [filter_task,
              custom_example_gen_task]

# Create the pipeline
pipeline = tfx.dsl.Pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(METADATA_PATH),
      components=components)

# Run the pipeline
tfx.orchestration.LocalDagRunner().run(pipeline)

INFO:absl:Using deployment config:
 executor_specs {
  key: "CustomFilterComponent"
  value {
    python_class_executable_spec {
      class_path: "__main__.CustomFilterComponent_Executor"
    }
  }
}
executor_specs {
  key: "MyCustomExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "__main__.MyCustomExecutor"
      }
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metadata/penguin-simple/metadata.db"
      connection_mode: READWRITE_OPENCREATE
    }
  }
}

INFO:absl:Using connection config:
 sqlite {
  filename_uri: "metadata/penguin-simple/metadata.db"
  connection_mode: READWRITE_OPENCREATE
}

INFO:absl:Component CustomFilterComponent is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "__main__.CustomFilterComponent"
  }
  id: "CustomFilterComponent"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
      field_value {
        st

INFO:absl:Generating examples.
INFO:absl:Processing input csv data data_filtered/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 6 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-simple/MyCustomExampleGen/examples/6"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2022-11-11T08:00:30.402509:MyCustomExampleGen:6:examples:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "1.10.0"
  }
}
name: "penguin-simple:2022-11-11T08:00:30.402509:MyCustomExampleGen:6:examples:0"
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
properties {
  key: "version"

As a sanity check, you can compute the number of examples for both the training and eval splits. It should equal the number of examples found in your filtered CSV. You can use the code below by replacing the `EXECUTION_ID` with the ID shown in your latest run. You can see it in the last three lines of the output cell above. For example:

```
)]}) for execution 6
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component MyCustomExampleGen is finished.
```

In [20]:
EXECUTION_ID = 6 # PLACE THE EXECUTION ID HERE

# Create a `TFRecordDataset` to read these files
train_dataset = tf.data.TFRecordDataset(f'{PIPELINE_ROOT}/MyCustomExampleGen/examples/{EXECUTION_ID}/Split-train/data_tfrecord-00000-of-00001.gz', compression_type="GZIP")
eval_dataset = tf.data.TFRecordDataset(f'{PIPELINE_ROOT}/MyCustomExampleGen/examples/{EXECUTION_ID}/Split-eval/data_tfrecord-00000-of-00001.gz', compression_type="GZIP")

# Get number of records for each dataset (only use for small datasets to avoid memory issues)
num_train_data = len(list(train_dataset))
num_eval_data = len(list(eval_dataset))

# Get the total
total_examples = num_train_data + num_eval_data

print(f'total number of examples: {total_examples}')

total number of examples: 232


## Wrap Up

In this lab, you were able to use custom components to create a pipeline. This shows that you can go outside the standard TFX components if your project calls for it. To know more about custom components, you can read more [here](https://www.tensorflow.org/tfx/guide/understanding_custom_components) and see the examples [here](https://github.com/tensorflow/tfx/tree/2e41786328f5b69720e90ec4d9ecae500f5c157a/tfx/examples/custom_components).