<a href="https://colab.research.google.com/github/TrungKhoaLe/ml-system-zone/blob/main/google-tfx/tfx_pipeline_and_tfdv.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

## Upgrade pip

In [1]:
try:
    import colab
    !pip install --upgrade pip
except:
    pass

## Install TFX

In [2]:
!pip install -U tfx &> /dev/null

## Uninstall shapely

In [3]:
!pip uninstall shapely -y &> /dev/null

In [None]:
# restart the runtime after the above steps

import os
os.kill(os.getpid(), 9)

In [None]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))

## Set up variables

In [2]:
import os
# two pipelines will be created, one for schema generation and one for training
SCHEMA_PIPELINE_NAME = "penguin-tfdv-schema"
PIPELINE_NAME = "penguin-tfdv"

SCHEMA_PIPELINE_ROOT = os.path.join("pipelines", SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join("pipelines", PIPELINE_NAME)

SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME,
                                    'metadata.db')

METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

from absl import logging
logging.set_verbosity(logging.INFO)

## Prepare example data

In [3]:
import urllib.request
import tempfile

DATA_ROOT = tempfile.mkdtemp(prefix='tfx-data')  # Create a temporary directory.
_data_url = 'https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/penguin/data/labelled/penguins_processed.csv'
_data_filepath = os.path.join(DATA_ROOT, "data.csv")
urllib.request.urlretrieve(_data_url, _data_filepath)

('/tmp/tfx-datai0erkdwf/data.csv', <http.client.HTTPMessage at 0x7f2477f648b0>)

# Generate a preliminary schema

## Write a pipeline definition

In [4]:
def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
    """Creates a pipeline for schema generation"""
    # bring data into the pipeline
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    # computes statistics over data for visualization and schema generation
    statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])
    # generates schema based on the generated statistics
    schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)
    
    components = [
      example_gen,
      statistics_gen,
      schema_gen,
  ]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=tfx.orchestration.metadata
        .sqlite_metadata_connection_config(metadata_path),
        components=components)

## Run the pipeline

Use a local orchestrator

In [5]:
tfx.orchestration.LocalDagRunner().run(
  _create_schema_pipeline(
      pipeline_name=SCHEMA_PIPELINE_NAME,
      pipeline_root=SCHEMA_PIPELINE_ROOT,
      data_root=DATA_ROOT,
      metadata_path=SCHEMA_METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Using deployment config:
 executor_specs {
  key: "CsvExampleGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.example_gen.csv_example_gen.executor.Executor"
      }
    }
  }
}
executor_specs {
  key: "SchemaGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.schema_gen.executor.Executor"
    }
  }
}
executor_specs {
  key: "StatisticsGen"
  value {
    beam_executable_spec {
      python_executor_spec {
        class_path: "tfx.components.statistics_gen.executor.Executor"
      }
    }
  }
}
custom_driver_specs {
  key: "CsvExampleGen"
  value {
    python_class_executable_spec {
      class_path: "tfx.components.example_gen.driver.FileBasedDriver"
    }
  }
}
metadata_connection_config {
  database_connection_config {
    sqlite {
      filename_uri: "metada

INFO:absl:Processing input csv data /tmp/tfx-datai0erkdwf/* to TFExample.
INFO:absl:Examples generated.
INFO:absl:Value type <class 'NoneType'> of key version in exec_properties is not supported, going to drop it
INFO:absl:Value type <class 'list'> of key _beam_pipeline_args in exec_properties is not supported, going to drop it
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 1 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'examples': [Artifact(artifact: uri: "pipelines/penguin-tfdv-schema/CsvExampleGen/examples/1"
custom_properties {
  key: "input_fingerprint"
  value {
    string_value: "split:single_split,num_files:1,total_bytes:25648,xor_checksum:1681398139,sum_checksum:1681398139"
  }
}
custom_properties {
  key: "span"
  value {
    int_value: 0
  }
}
, artifact_type: name: "Examples"
properties {
  key: "span"
  value: INT
}
properties {
  key: "split_names"
  value: STRING
}
proper

## Review outputs of the pipeline

A TFX pipeline produces two kinds of outputs, artifacts and a metadata DB (MLMD) which contains metadata of artifacts and pipeline executions.

In [6]:
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.portable.mlmd import execution_lib

def get_latest_artifacts(metadata, pipeline_name, component_id):
    context = metadata.store.get_context_by_type_and_name(
        "node", f"{pipeline_name}.{component_id}"
    )
    executions = metadata.store.get_executions_by_context(context.id)
    latest_execution = max(executions,
                           key=lambda e: e.last_update_time_since_epoch)
    return execution_lib.get_output_artifacts(metadata, latest_execution.id)


from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
  """Visualizes artifacts using standard visualization modules."""
  for artifact in artifacts:
    visualization = visualizations.get_registry().get_visualization(
        artifact.type_name)
    if visualization:
      visualization.display(artifact)

from tfx.orchestration.experimental.interactive import standard_visualizations
standard_visualizations.register_standard_visualizations()

In [7]:
from tfx.orchestration.metadata import Metadata
from tfx.types import standard_component_specs

metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  # Find output artifacts from MLMD.
  stat_gen_output = get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME,
                                         'StatisticsGen')
  stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]

  schema_gen_output = get_latest_artifacts(metadata_handler,
                                           SCHEMA_PIPELINE_NAME, 'SchemaGen')
  schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [8]:
visualize_artifacts(stats_artifacts)

In [9]:
visualize_artifacts(schema_artifacts)

Unnamed: 0_level_0,Type,Presence,Valency,Domain
Feature name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
'body_mass_g',FLOAT,required,,-
'culmen_depth_mm',FLOAT,required,,-
'culmen_length_mm',FLOAT,required,,-
'flipper_length_mm',FLOAT,required,,-
'species',INT,required,,-


## Export the schema for future use

In [10]:
import shutil

_schema_filename = 'schema.pbtxt'
SCHEMA_PATH = 'schema'

os.makedirs(SCHEMA_PATH, exist_ok=True)
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)

# Copy the 'schema.pbtxt' file from the artifact uri to a predefined path.
shutil.copy(_generated_path, SCHEMA_PATH)

'schema/schema.pbtxt'

In [11]:
print(f"Schema at {SCHEMA_PATH} -----")
!cat {SCHEMA_PATH}/*

Schema at schema -----
feature {
  name: "body_mass_g"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_depth_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "culmen_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "flipper_length_mm"
  type: FLOAT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}
feature {
  name: "species"
  type: INT
  presence {
    min_fraction: 1.0
    min_count: 1
  }
  shape {
    dim {
      size: 1
    }
  }
}


# Validate input examples and train an ML model

## Write model training code

In [12]:
_trainer_module_file = 'penguin_trainer.py'

In [13]:
%%writefile {_trainer_module_file}
from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200
) -> tf.data.Dataset:
    return data_accessor.tf_dataset_factory(
        file_pattern,
        tfxio.TensorFlowDatasetOptions(
            batch_size=batch_size,
            label_key=_LABEL_KEY,
        ),
        schema=schema
    ).repeat()

def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
    feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
    inputs = [keras.layers.Input(shape=(1, ), name=f) for f in feature_keys]
    d = keras.layers.concatenate(inputs)
    for _ in range(2):
        d = keras.layers.Dense(8, activation="relu")(d)
    outputs = keras.layers.Dense(3)(d)
    model = keras.Model(inputs=inputs, outputs=outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(1e-2),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[keras.metrics.SparseCategoricalAccuracy()]
    )
    model.summary(print_fn=logging.info)
    return model

# TFX's entry point
def run_fn(fn_args: tfx.components.FnArgs):
    schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())
    print(f"[INFO] {fn_args.train_files}")
    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
        batch_size=_TRAIN_BATCH_SIZE
    )
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
        batch_size=_EVAL_BATCH_SIZE
    )
    model = _build_keras_model(schema)
    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps
    )
    model.save(fn_args.serving_model_dir, save_format="tf")

Writing penguin_trainer.py


## Write a pipeline definition

In [14]:
def _create_pipeline(pipeline_name:str, pipeline_root: str, data_root: str,
                     schema_path: str, module_file: str, serving_model_dir: str,
                     metadata_path: str) -> tfx.dsl.Pipeline:
    # bring data into the pipeline
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)
    # computes statistics over data for visualization and example validation
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs["examples"]
    )
    # import the schema
    schema_importer = tfx.dsl.Importer(
        source_uri=schema_path,
        artifact_type=tfx.types.standard_artifacts.Schema
    ).with_id("schema_importer")
    # perform anomaly detection based on statistics and data schema
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs["statistics"],
        schema=schema_importer.outputs["result"]
    )
    # use my own model code to fit into the data
    trainer = tfx.components.Trainer(
        module_file=module_file,
        examples=example_gen.outputs["examples"],
        schema=schema_importer.outputs["result"],
        train_args=tfx.proto.TrainArgs(num_steps=100),
        eval_args=tfx.proto.EvalArgs(num_steps=5)
    )
    # push the model to a filesystem destination
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
              base_directory=serving_model_dir))
    )
    
    components = [
        example_gen,
        statistics_gen,
        schema_importer,
        example_validator,
        trainer,
        pusher,
    ]

    return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=tfx.orchestration.metadata
      .sqlite_metadata_connection_config(metadata_path),
      components=components)

## Run the pipeline

In [15]:
tfx.orchestration.LocalDagRunner().run(
  _create_pipeline(
      pipeline_name=PIPELINE_NAME,
      pipeline_root=PIPELINE_ROOT,
      data_root=DATA_ROOT,
      schema_path=SCHEMA_PATH,
      module_file=_trainer_module_file,
      serving_model_dir=SERVING_MODEL_DIR,
      metadata_path=METADATA_PATH))

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/content/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version 404b8ae592f99ec8995e193f9a60cb34097a6c4ffee3090c55b3cd49d71ea655.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmp8u1cl5y0/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpnqq2ucen', '--dist-dir', '/tmp/tmpwl601cqt']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-tfdv/_wheels/tfx_user_code_Trainer-0.0+404b8ae592f99ec8995e193f9a60cb34097a6c4ffee3090c55b3cd49d71ea655-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-tfdv/_wheels/tfx_user_code_Trainer-0.0+404b8ae592f99ec8995e193f9a60cb34097a6c4ffee3090c55b3cd49d71ea655-py3-none-any.whl'
INFO:absl:Using de

[INFO] ['pipelines/penguin-tfdv/CsvExampleGen/examples/1/Split-train/*']


INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature flipper_length_mm has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature species has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature body_mass_g has a shape dim {
  size: 1
}
. Setting to DenseTensor.
INFO:absl:Feature culmen_depth_m



INFO:absl:Training complete. Model written to pipelines/penguin-tfdv/Trainer/model/4/Format-Serving. ModelRun written to pipelines/penguin-tfdv/Trainer/model_run/4
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 4 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model_run': [Artifact(artifact: uri: "pipelines/penguin-tfdv/Trainer/model_run/4"
, artifact_type: name: "ModelRun"
)], 'model': [Artifact(artifact: uri: "pipelines/penguin-tfdv/Trainer/model/4"
, artifact_type: name: "Model"
base_type: MODEL
)]}) for execution 4
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Component Trainer is finished.
INFO:absl:Component ExampleValidator is running.
INFO:absl:Running launcher for node_info {
  type {
    name: "tfx.components.example_validator.component.ExampleValidator"
  }
  id: "ExampleValidator"
}
contexts {
  contexts {
    type {
      name: "pipeline"
    }
    name {
 

## Examine outputs of the pipeline

In [16]:
metadata_connection_config = tfx.orchestration.metadata.sqlite_metadata_connection_config(
    METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
  ev_output = get_latest_artifacts(metadata_handler, PIPELINE_NAME,
                                   'ExampleValidator')
  anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

INFO:absl:MetadataStore with DB connection initialized


In [17]:
visualize_artifacts(anomalies_artifacts)