In [1]:
import os
import tensorflow as tf
from tfx import v1 as tfx

2023-10-29 11:24:31.261501: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-10-29 11:24:31.775265: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-10-29 11:24:31.778865: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


Using TensorFlow backend


# Set up variables

In [3]:
SCHEMA_PIPELINE_NAME = 'doc_to_sleep_schema'
PIPELINE_NAME = 'doc_to_sleep'

# Output directory to store artifacts generated from the pipelines.
SCHEMA_PIPELINE_ROOT = os.path.join('pipelines', SCHEMA_PIPELINE_NAME)
PIPELINE_ROOT = os.path.join('pipelines', PIPELINE_NAME)

# Path to a SQLite database file to use as an MLMD storage.
SCHEMA_METADATA_PATH = os.path.join('metadata', SCHEMA_PIPELINE_NAME, 'metadata.db')
METADATA_PATH = os.path.join('metadata', PIPELINE_NAME, 'metadata.db')

# Output directory where created models from the pipeline will be exported.
SERVING_MODEL_DIR = os.path.join('serving_model', PIPELINE_NAME)

# Python module containing the Trainer's `run_fn`. The `%%writefile` cell
# writes it and the training-pipeline cell passes it to the Trainer, so it must
# be defined here, before either of those cells runs. The lowercase name is the
# one those cells reference.
TRAINER_MODULE_FILE = f'{PIPELINE_NAME}_trainer.py'
_trainer_module_file = TRAINER_MODULE_FILE

# Show INFO-level messages so TFX component progress is visible while the
# pipelines run.
from absl import logging
logging.set_verbosity(logging.INFO)

In [14]:
import shutil

# CsvExampleGen treats `input_base` as a *directory* and ingests the files in it
# that match the split pattern (default '*'). Passing the path of poly3.csv
# itself matches no files. Stage the CSV into a directory of its own so that
# exactly this file is ingested. Re-running the cell simply overwrites the copy.
DATA_FILE = os.path.join('converted_data', 'poly3.csv')
DATA_ROOT = os.path.join('pipeline_data', 'poly3')

os.makedirs(DATA_ROOT, exist_ok=True)
shutil.copy(DATA_FILE, DATA_ROOT);

# Generate a preliminary schema

In [8]:
from tfx.orchestration.metadata import Metadata, sqlite_metadata_connection_config

def _create_schema_pipeline(pipeline_name: str,
                            pipeline_root: str,
                            data_root: str,
                            metadata_path: str) -> tfx.dsl.Pipeline:
    '''Builds a three-step pipeline that infers a schema from CSV data.

    CsvExampleGen ingests the data under `data_root`, StatisticsGen computes
    statistics over the ingested examples, and SchemaGen infers a schema from
    those statistics.

    Args:
        pipeline_name: Name of the pipeline.
        pipeline_root: Directory where component artifacts are written.
        data_root: Value passed to CsvExampleGen as `input_base`.
        metadata_path: Path of the SQLite file used as the MLMD store.

    Returns:
        A `tfx.dsl.Pipeline` containing the three components.
    '''
    csv_ingest = tfx.components.CsvExampleGen(input_base=data_root)
    stats = tfx.components.StatisticsGen(
        examples=csv_ingest.outputs['examples'])
    # Let SchemaGen infer feature shapes from the statistics.
    inferred_schema = tfx.components.SchemaGen(
        statistics=stats.outputs['statistics'],
        infer_feature_shape=True,
    )

    store_config = sqlite_metadata_connection_config(metadata_path)
    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=store_config,
        components=[csv_ingest, stats, inferred_schema],
    )

Run the schema-generation pipeline.

In [None]:
_schema_pipeline = _create_schema_pipeline(
    pipeline_name=SCHEMA_PIPELINE_NAME,
    pipeline_root=SCHEMA_PIPELINE_ROOT,
    data_root=DATA_ROOT,
    metadata_path=SCHEMA_METADATA_PATH,
)
# Execute every component of the schema pipeline locally, in dependency order.
tfx.orchestration.LocalDagRunner().run(_schema_pipeline)

# Review outputs of the schema pipeline

In [None]:
from ml_metadata.proto import metadata_store_pb2
from tfx.orchestration.portable.mlmd import execution_lib

def get_latest_artifacts(metadata, pipeline_name, component_id):
    '''Returns the output artifacts of the most recent run of a component.

    Looks up the MLMD 'node' context named '<pipeline_name>.<component_id>',
    picks the execution in it with the largest `last_update_time_since_epoch`,
    and returns that execution's output artifacts.

    Args:
        metadata: An open `Metadata` handle (as produced by `with Metadata(...)`).
        pipeline_name: Name of the pipeline the component belongs to.
        component_id: Component id within the pipeline, e.g. 'SchemaGen'.

    Returns:
        The result of `execution_lib.get_output_artifacts` for that execution,
        which is indexed by output key.

    Raises:
        ValueError: If no such context exists in the store (for example, the
            pipeline has not run against this metadata DB), or if the context
            has no executions.
    '''
    context_name = f'{pipeline_name}.{component_id}'
    context = metadata.store.get_context_by_type_and_name('node', context_name)
    # A missing context comes back as None. Without this check, the failure
    # would surface as an opaque AttributeError on `context.id`.
    if context is None:
        raise ValueError(
            f'No MLMD context named {context_name!r}; has the pipeline been '
            'run against this metadata store?')

    executions = metadata.store.get_executions_by_context(context.id)
    if not executions:
        raise ValueError(f'No executions recorded for {context_name!r}.')

    latest_execution = max(executions,
                           key=lambda e: e.last_update_time_since_epoch)
    return execution_lib.get_output_artifacts(metadata, latest_execution.id)

from tfx.orchestration.experimental.interactive import visualizations

def visualize_artifacts(artifacts):
    '''Renders each artifact with the visualization registered for its type.

    Artifacts whose `type_name` has no registered visualization are skipped.

    Args:
        artifacts: Iterable of artifacts, e.g. a StatisticsGen or SchemaGen
            output list.
    '''
    for item in artifacts:
        renderer = visualizations.get_registry().get_visualization(
            item.type_name)
        if not renderer:
            continue
        renderer.display(item)

from tfx.orchestration.experimental.interactive import standard_visualizations

# Register TFX's standard artifact visualizations so `visualize_artifacts` can
# look up a renderer by artifact type name.
standard_visualizations.register_standard_visualizations()

Load the latest StatisticsGen and SchemaGen outputs of the schema pipeline from ML Metadata.

In [None]:
from tfx.orchestration.metadata import Metadata, sqlite_metadata_connection_config
from tfx.types import standard_component_specs

metadata_connection_config = sqlite_metadata_connection_config(SCHEMA_METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
    # Pull the newest StatisticsGen and SchemaGen outputs of the schema
    # pipeline from MLMD, then keep the artifact lists we want to visualize.
    stat_gen_output, schema_gen_output = (
        get_latest_artifacts(metadata_handler, SCHEMA_PIPELINE_NAME, component)
        for component in ('StatisticsGen', 'SchemaGen')
    )
    stats_artifacts = stat_gen_output[standard_component_specs.STATISTICS_KEY]
    schema_artifacts = schema_gen_output[standard_component_specs.SCHEMA_KEY]

Output from StatisticsGen

In [None]:
# Interactive view of the statistics computed by StatisticsGen.
visualize_artifacts(stats_artifacts)

Output from SchemaGen

In [None]:
# Table view of the schema inferred by SchemaGen.
visualize_artifacts(schema_artifacts)

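Export the schema for future use: copy the generated `schema.pbtxt` into the `schema/` directory, which the training pipeline imports it from.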

In [None]:
import shutil

SCHEMA_PATH = 'schema'
_schema_filename = 'schema.pbtxt'

os.makedirs(SCHEMA_PATH, exist_ok=True)

# Copy `schema.pbtxt` out of the SchemaGen artifact directory into SCHEMA_PATH,
# which the training pipeline's schema Importer reads from.
_generated_path = os.path.join(schema_artifacts[0].uri, _schema_filename)
shutil.copy(_generated_path, SCHEMA_PATH)

Run the cell below to view the exported schema.

In [None]:
# Print the exported schema. Pure Python instead of `!cat` so the cell also
# works without a POSIX shell and does no shell interpolation of SCHEMA_PATH.
from pathlib import Path

for _schema_file in sorted(Path(SCHEMA_PATH).iterdir()):
    if _schema_file.is_file():
        print(_schema_file.read_text(), end='')

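# Write model training code

The cell below writes the Trainer module to the file named by `_trainer_module_file`. Set `_LABEL_KEY`, `_TRAIN_BATCH_SIZE` and `_EVAL_BATCH_SIZE`, and complete `_build_keras_model`, before running the training pipeline.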

In [None]:
%%writefile {_trainer_module_file}

from typing import List
from absl import logging
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from tensorflow_metadata.proto.v0 import schema_pb2

# Insert the target label below
_LABEL_KEY = None

_TRAIN_BATCH_SIZE = None
_EVAL_BATCH_SIZE = None

def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int=200) -> tf.data.Dataset:
    '''Builds a batched, endlessly repeating dataset for the Trainer.

    Args:
        file_pattern: List of paths or patterns of the input files, e.g.
            `fn_args.train_files`.
        data_accessor: DataAccessor used to turn the input into a dataset.
        schema: Schema of the input data.
        batch_size: Number of consecutive elements combined into one batch.

    Returns:
        A repeated dataset from `data_accessor.tf_dataset_factory`. When
        `_LABEL_KEY` is set, its elements are (features, label) pairs, where
        `features` is a dict of tensors.
    '''
    options = tfxio.TensorFlowDatasetOptions(batch_size=batch_size,
                                             label_key=_LABEL_KEY)
    dataset = data_accessor.tf_dataset_factory(file_pattern, options,
                                               schema=schema)
    # Repeat forever; run_fn bounds each epoch via steps_per_epoch and
    # validation_steps.
    return dataset.repeat()

def _build_keras_model(schema: schema_pb2.Schema) -> tf.keras.Model:
    '''Creates a DNN Keras model for the data.

    One Keras Input of shape (1,) is created per schema feature other than
    `_LABEL_KEY`, named after the feature.

    Args:
        schema: Schema of the input data.

    Returns:
        A compiled Keras model.

    Raises:
        NotImplementedError: Until the model body below is filled in.
    '''
    # Extract features from the schema except for the target label.
    feature_keys = [f.name for f in schema.feature if f.name != _LABEL_KEY]
    inputs = [keras.layers.Input(shape=(1,), name=f) for f in feature_keys]

    # TODO: build and compile the model from `inputs`, then return it, e.g.
    #   x = keras.layers.concatenate(inputs)
    #   ...hidden Dense layers...
    #   outputs = keras.layers.Dense(...)(x)
    #   model = keras.Model(inputs=inputs, outputs=outputs)
    #   model.compile(optimizer=..., loss=..., metrics=[...])
    #   return model
    # Raising here replaces the implicit `return None`. That None previously
    # surfaced in run_fn as an opaque AttributeError on `model.fit`.
    raise NotImplementedError(
        '_build_keras_model: build, compile and return a keras.Model from '
        f'the {len(inputs)} feature inputs.')

# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
    '''Train the model based on given args.

    Called by the TFX Trainer. Parses the schema, builds the train and eval
    datasets, fits the model, and saves it (save_format='tf') to
    `fn_args.serving_model_dir`.

    Args:
        fn_args: Holds args used to train the model as name/value pairs.

    Raises:
        ValueError: If `_LABEL_KEY`, `_TRAIN_BATCH_SIZE` or `_EVAL_BATCH_SIZE`
            is still unset (None).
    '''
    # Fail fast with a clear message instead of deep inside tf.data / Keras.
    if _LABEL_KEY is None:
        raise ValueError('Set _LABEL_KEY to the name of the target column.')
    if _TRAIN_BATCH_SIZE is None or _EVAL_BATCH_SIZE is None:
        raise ValueError('Set _TRAIN_BATCH_SIZE and _EVAL_BATCH_SIZE.')

    # Reads in schema file passed to the Trainer component.
    schema = tfx.utils.parse_pbtxt_file(fn_args.schema_path, schema_pb2.Schema())

    train_dataset = _input_fn(
        fn_args.train_files,
        fn_args.data_accessor,
        schema,
        batch_size=_TRAIN_BATCH_SIZE
    )
    eval_dataset = _input_fn(
        fn_args.eval_files,
        fn_args.data_accessor,
        schema,
        batch_size=_EVAL_BATCH_SIZE
    )

    model = _build_keras_model(schema)
    # The datasets repeat forever, so the step counts bound each epoch.
    model.fit(train_dataset,
              steps_per_epoch=fn_args.train_steps,
              validation_data=eval_dataset,
              validation_steps=fn_args.eval_steps)

    # The result of the training should be saved in 'fn_args.serving_model_dir' directory
    model.save(fn_args.serving_model_dir, save_format='tf')

Define the training pipeline. It imports the exported schema with an `Importer` instead of regenerating it with SchemaGen.

In [None]:
def _create_pipeline(pipeline_name: str,
                     pipeline_root: str,
                     data_root: str,
                     schema_path: str,
                     module_file: str,
                     serving_model_dir: str,
                     metadata_path: str,
                     train_steps: int = 100,
                     eval_steps: int = 5) -> tfx.dsl.Pipeline:
    '''Creates a training pipeline that uses a predefined (imported) schema.

    Components: CsvExampleGen -> StatisticsGen -> ExampleValidator (against the
    imported schema), Trainer (user module), and Pusher.

    Args:
        pipeline_name: Name of the pipeline.
        pipeline_root: Directory where component artifacts are written.
        data_root: Value passed to CsvExampleGen as `input_base`.
        schema_path: URI of the directory holding the exported schema; it is
            imported as a Schema artifact.
        module_file: Python module passed to the Trainer.
        serving_model_dir: Filesystem directory the Pusher pushes models to.
        metadata_path: Path of the SQLite file used as the MLMD store.
        train_steps: Number of training steps passed to the Trainer.
        eval_steps: Number of evaluation steps passed to the Trainer.

    Returns:
        A `tfx.dsl.Pipeline` containing the components above.
    '''
    # Brings data into the pipeline.
    example_gen = tfx.components.CsvExampleGen(input_base=data_root)

    # Computes statistics over data for visualization and example validation.
    statistics_gen = tfx.components.StatisticsGen(
        examples=example_gen.outputs['examples']
    )

    # Import the schema.
    schema_importer = tfx.dsl.Importer(
        source_uri=schema_path,
        artifact_type=tfx.types.standard_artifacts.Schema
    ).with_id('schema_importer')

    # Performs anomaly detection based on statistics and data schema. Its
    # output is not an input of the Trainer, so anomalies are reported but do
    # not block training.
    example_validator = tfx.components.ExampleValidator(
        statistics=statistics_gen.outputs['statistics'],
        schema=schema_importer.outputs['result']
    )

    # Uses user-provided Python function that trains a model.
    trainer = tfx.components.Trainer(
        module_file=module_file,
        examples=example_gen.outputs['examples'],
        schema=schema_importer.outputs['result'],
        train_args=tfx.proto.TrainArgs(num_steps=train_steps),
        eval_args=tfx.proto.EvalArgs(num_steps=eval_steps)
    )

    # Pushes the model to a filesystem destination. No `model_blessing` is
    # wired in, so there is no evaluation gate before pushing.
    pusher = tfx.components.Pusher(
        model=trainer.outputs['model'],
        push_destination=tfx.proto.PushDestination(
            filesystem=tfx.proto.PushDestination.Filesystem(
                base_directory=serving_model_dir
            )
        )
    )

    components = [example_gen,
                  statistics_gen,
                  schema_importer,
                  example_validator,
                  trainer,
                  pusher]

    return tfx.dsl.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        metadata_connection_config=sqlite_metadata_connection_config(metadata_path),
        components=components
    )

Run the training pipeline.

In [None]:
_training_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    schema_path=SCHEMA_PATH,
    module_file=_trainer_module_file,
    serving_model_dir=SERVING_MODEL_DIR,
    metadata_path=METADATA_PATH,
)
# Execute every component of the training pipeline locally, in dependency order.
tfx.orchestration.LocalDagRunner().run(_training_pipeline)

Run the cell below to load the latest ExampleValidator outputs (the detected anomalies) of the training pipeline from ML Metadata.

In [None]:
metadata_connection_config = sqlite_metadata_connection_config(METADATA_PATH)

with Metadata(metadata_connection_config) as metadata_handler:
    # Newest ExampleValidator outputs of the training pipeline.
    ev_output = get_latest_artifacts(
        metadata_handler, PIPELINE_NAME, 'ExampleValidator'
    )
    anomalies_artifacts = ev_output[standard_component_specs.ANOMALIES_KEY]

Run the cell below to visualize the anomalies from ExampleValidator.

In [None]:
# Render the anomalies ExampleValidator found against the imported schema.
visualize_artifacts(anomalies_artifacts)