In [7]:
%%writefile {'modules/components.py'}

import os

import tensorflow_model_analysis as tfma
from tfx.components import (
    CsvExampleGen,
    StatisticsGen,
    SchemaGen,
    ExampleValidator,
    Transform,
    Tuner,
    Trainer,
    Evaluator,
    Pusher
)
from tfx.proto import example_gen_pb2, trainer_pb2, pusher_pb2
from tfx.types import Channel
from tfx.dsl.components.common.resolver import Resolver
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.dsl.input_resolution.strategies.latest_blessed_model_strategy import (
    LatestBlessedModelStrategy
)

def init_components(
        data_dir,
        transform_module,
        tuner_module,
        training_module,
        training_steps,
        eval_steps,
        serving_model_dir
    ):
    '''
    Build the TFX components of the pipeline and wire them together.

    Args:
        data_dir (str): Directory handed to CsvExampleGen as input base.
        transform_module (str): Path to the module file used by Transform.
        tuner_module (str): Path to the module file used by Tuner.
        training_module (str): Path to the module file used by Trainer.
        training_steps (int): Value of TrainArgs.num_steps for both Tuner
            and Trainer.
        eval_steps (int): Value of EvalArgs.num_steps for both Tuner and
            Trainer.
        serving_model_dir (str): Filesystem directory the Pusher pushes to.

    Returns:
        tuple: The ten components, in the order they are created here.
    '''

    def make_train_args():
        # A fresh TrainArgs message reading the 'train' split.
        return trainer_pb2.TrainArgs(
            splits=['train'],
            num_steps=training_steps
        )

    def make_eval_args():
        # A fresh EvalArgs message reading the 'eval' split.
        return trainer_pb2.EvalArgs(
            splits=['eval'],
            num_steps=eval_steps
        )

    # Data ingestion: 8 hash buckets for 'train', 2 for 'eval'.
    train_split = example_gen_pb2.SplitConfig.Split(
        name='train', hash_buckets=8
    )
    eval_split = example_gen_pb2.SplitConfig.Split(
        name='eval', hash_buckets=2
    )
    example_gen = CsvExampleGen(
        input_base=data_dir,
        output_config=example_gen_pb2.Output(
            split_config=example_gen_pb2.SplitConfig(
                splits=[train_split, eval_split]
            )
        )
    )
    raw_examples = example_gen.outputs['examples']

    # Statistics, schema inference and validation of the raw examples.
    statistics_gen = StatisticsGen(examples=raw_examples)
    statistics = statistics_gen.outputs['statistics']

    schema_gen = SchemaGen(statistics=statistics)
    schema = schema_gen.outputs['schema']

    example_validator = ExampleValidator(statistics=statistics, schema=schema)

    # Feature preprocessing.
    transform = Transform(
        examples=raw_examples,
        schema=schema,
        module_file=os.path.abspath(transform_module)
    )
    transformed_examples = transform.outputs['transformed_examples']
    transform_graph = transform.outputs['transform_graph']

    # Hyperparameter search followed by training with the best values found.
    tuner = Tuner(
        module_file=os.path.abspath(tuner_module),
        examples=transformed_examples,
        transform_graph=transform_graph,
        schema=schema,
        train_args=make_train_args(),
        eval_args=make_eval_args()
    )

    trainer = Trainer(
        module_file=os.path.abspath(training_module),
        examples=transformed_examples,
        transform_graph=transform_graph,
        schema=schema,
        hyperparameters=tuner.outputs['best_hyperparameters'],
        train_args=make_train_args(),
        eval_args=make_eval_args()
    )
    trained_model = trainer.outputs['model']

    # Resolves the latest blessed model, used as the evaluation baseline.
    model_resolver = Resolver(
        strategy_class=LatestBlessedModelStrategy,
        model=Channel(type=Model),
        model_blessing=Channel(type=ModelBlessing)
    ).with_id('Latest_blessed_model_resolver')

    # BinaryAccuracy is the only metric that carries thresholds: an absolute
    # lower bound plus a required change relative to the baseline.
    accuracy_threshold = tfma.MetricThreshold(
        value_threshold=tfma.GenericValueThreshold(
            lower_bound={'value': 0.5}
        ),
        change_threshold=tfma.GenericChangeThreshold(
            direction=tfma.MetricDirection.HIGHER_IS_BETTER,
            absolute={'value': 0.0001}
        )
    )
    metrics = [
        tfma.MetricConfig(class_name=metric_name)
        for metric_name in ('ExampleCount', 'AUC', 'Precision', 'Recall')
    ]
    metrics.append(
        tfma.MetricConfig(
            class_name='BinaryAccuracy',
            threshold=accuracy_threshold
        )
    )

    eval_config = tfma.EvalConfig(
        model_specs=[tfma.ModelSpec(label_key='diabetes')],
        slicing_specs=[
            tfma.SlicingSpec(),
            tfma.SlicingSpec(feature_keys=['diabetes'])
        ],
        metrics_specs=[tfma.MetricsSpec(metrics=metrics)]
    )

    evaluator = Evaluator(
        examples=raw_examples,
        model=trained_model,
        baseline_model=model_resolver.outputs['model'],
        eval_config=eval_config
    )

    # Push to the serving directory, gated by the evaluator's blessing.
    push_destination = pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=serving_model_dir
        )
    )
    pusher = Pusher(
        model=trained_model,
        model_blessing=evaluator.outputs['blessing'],
        push_destination=push_destination
    )

    return (
        example_gen,
        statistics_gen,
        schema_gen,
        example_validator,
        transform,
        tuner,
        trainer,
        model_resolver,
        evaluator,
        pusher
    )
    

Overwriting modules/components.py


In [8]:
%%writefile {'modules/transform.py'}

import tensorflow as tf
import tensorflow_transform as tft

# Raw feature keys that preprocessing_fn casts to int64.
CATEGORICAL_FEATURES = [
    'high_bp', 'high_chol', 'chol_check', 'smoker', 'stroke',
    'heart_disease_or_attack', 'phys_activity', 'fruits', 'veggies',
    'hvy_alcohol_consump', 'any_healthcare', 'no_docbc_cost', 'diff_walk',
    'sex',
]

# Raw feature keys that preprocessing_fn rescales to the [0, 1] range.
NUMERICAL_FEATURES = [
    'bmi', 'gen_hlth', 'ment_hlth', 'phys_hlth', 'age', 'education',
    'income',
]

# Raw key of the label column.
LABEL_KEY = 'diabetes'

def transformed_name(key):
    '''
    Derive the name a feature carries after transformation.

    Args:
        key (str): Name of the raw feature.

    Returns:
        str: The given name with the '_xf' suffix appended.
    '''

    suffix = '_xf'
    return key + suffix

def preprocessing_fn(inputs):
    '''
    Turn raw features into the transformed features used for training.

    Categorical features and the label are cast to int64; numerical
    features are rescaled with tft.scale_to_0_1. Every output key is the
    raw key passed through transformed_name.

    Args:
        inputs (dict): Map from feature keys to raw features.

    Returns:
        dict: Map from transformed feature keys to transformed features.
    '''

    # Categorical features: integer cast only.
    transformed = {
        transformed_name(name): tf.cast(inputs[name], tf.int64)
        for name in CATEGORICAL_FEATURES
    }

    # Numerical features: rescale to the [0, 1] range.
    transformed.update(
        (transformed_name(name), tft.scale_to_0_1(inputs[name]))
        for name in NUMERICAL_FEATURES
    )

    # The label is handled like a categorical feature.
    label_name = transformed_name(LABEL_KEY)
    transformed[label_name] = tf.cast(inputs[LABEL_KEY], tf.int64)

    return transformed
    

Overwriting modules/transform.py


In [9]:
%%writefile {'modules/tuner.py'}

from typing import NamedTuple, Dict, Text, Any, List
import tensorflow as tf
import tensorflow_transform as tft
from tfx.components.trainer.fn_args_utils import FnArgs

from keras.layers import (
    Input,
    concatenate,
    Dense,
    BatchNormalization,
    Dropout
)
from keras.callbacks import EarlyStopping
import kerastuner as kt
from keras_tuner.engine import base_tuner

from modules.transform import (
    CATEGORICAL_FEATURES,
    NUMERICAL_FEATURES,
    LABEL_KEY,
    transformed_name
)

# Result type of the tuner function: the tuner instance together with a
# dict of fit keyword arguments.
class TunerFnResult(NamedTuple):
    tuner: base_tuner.BaseTuner
    fit_kwargs: Dict[Text, Any]

# Early stopping callback: watches validation binary accuracy (higher is
# better) and stops after 3 epochs without improvement.
early_stopping = EarlyStopping(
    monitor='val_binary_accuracy',
    mode='max',
    patience=3,
    verbose=1
)

def gzip_reader_fn(filenames):
    '''Builds a TFRecordDataset that reads the given GZIP-compressed files.'''

    records = tf.data.TFRecordDataset(filenames, compression_type='GZIP')
    return records

def input_fn(
        file_pattern,
        tf_transform_output,
        batch_size=64
    ) -> tf.data.Dataset:
    '''
    Build the batched (features, label) dataset used for tuning/training.

    Args:
        file_pattern (str): Input tfrecord file pattern.
        tf_transform_output: A TFTransformOutput; its transformed feature
            spec is used to parse the records.
        batch_size (int): Number of consecutive elements of returned dataset.

    Returns:
        A dataset that provides (features, label) tuples, where the label is
        the transformed label feature.
    '''

    # Work on a copy of the spec rather than the object returned by the
    # transform output.
    feature_spec = tf_transform_output.transformed_feature_spec().copy()
    label_name = transformed_name(LABEL_KEY)

    return tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=feature_spec,
        reader=gzip_reader_fn,
        label_key=label_name
    )

def tuner_model(hp):
    '''
    Build and compile the model whose hyperparameters are searched.

    Three hyperparameters are registered on hp: 'dense_units',
    'dropout_rate' and 'learning_rate'.

    Args:
        hp: Hyperparameters object.

    Returns:
        tf.keras.Model: Compiled model.
    '''

    # One scalar input per transformed feature, categorical ones first.
    feature_names = CATEGORICAL_FEATURES + NUMERICAL_FEATURES
    input_features = [
        Input(shape=(1,), name=transformed_name(name))
        for name in feature_names
    ]
    merged = concatenate(input_features)

    # Search space, registered in the order units, dropout, learning rate.
    units = hp.Choice('dense_units', values=[128, 256, 512])
    dropout_rate = hp.Float(
        'dropout_rate', min_value=0.1, max_value=0.5, step=0.1
    )
    learning_rate = hp.Choice('learning_rate', values=[1e-2, 1e-3, 1e-4])

    # Hidden block followed by a single sigmoid output unit.
    dense_stack = tf.keras.models.Sequential(
        [
            Dense(units, activation='relu'),
            BatchNormalization(),
            Dropout(dropout_rate),
            Dense(1, activation='sigmoid')
        ],
        name='dense_stack'
    )
    prediction = dense_stack(merged)

    model = tf.keras.Model(inputs=input_features, outputs=prediction)
    model.compile(
        loss='binary_crossentropy',
        optimizer=tf.keras.optimizers.Adam(learning_rate),
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )

    return model

def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
    '''
    Tune the hyperparameters for the model.

    Args:
        fn_args: Holds args as name/value pairs. The attributes read here
            are transform_graph_path, train_files, eval_files and
            working_dir.

    Returns:
        A namedtuple contains the tuner and fit_kwargs.
    '''

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_graph_path)

    # Prepare training and validation datasets. The complete lists of file
    # patterns are passed on (not only their first entry), so no data is
    # silently dropped when more than one pattern is supplied.
    train_set = input_fn(fn_args.train_files, tf_transform_output)
    val_set = input_fn(fn_args.eval_files, tf_transform_output)

    # Initialize the tuner.
    tuner = kt.Hyperband(
        tuner_model,
        objective='val_binary_accuracy',
        max_epochs=10,
        factor=3,
        directory=fn_args.working_dir,
        project_name='diabetes_kt_hyperband'
    )

    # Return the tuner and fit arguments.
    # NOTE: the step counts are fixed here; fn_args.train_steps and
    # fn_args.eval_steps are not consulted.
    return TunerFnResult(
        tuner=tuner,
        fit_kwargs={
            'x': train_set,
            'validation_data': val_set,
            'steps_per_epoch': 1000,
            'validation_steps': 1000,
            'callbacks': [early_stopping]
        }
    )


Overwriting modules/tuner.py


In [10]:
%%writefile {'modules/trainer.py'}

import os

import tensorflow as tf
import tensorflow_transform as tft
from keras.utils.vis_utils import plot_model
from keras.layers import (
    Input,
    concatenate,
    Dense,
    BatchNormalization,
    Dropout
)
from keras.callbacks import TensorBoard, EarlyStopping, ModelCheckpoint

from modules.transform import (
    CATEGORICAL_FEATURES,
    NUMERICAL_FEATURES,
    LABEL_KEY,
    transformed_name
)
from modules.tuner import gzip_reader_fn, input_fn

def trainer_model(hp):
    '''
    Build and compile the model that is trained with fixed hyperparameters.

    Args:
        hp: Hyperparameters object, indexed by the keys 'dense_units',
            'dropout_rate' and 'learning_rate'.

    Returns:
        tf.keras.Model: Compiled model.
    '''

    # One scalar input per transformed feature, categorical ones first.
    feature_names = CATEGORICAL_FEATURES + NUMERICAL_FEATURES
    input_features = [
        Input(shape=(1,), name=transformed_name(name))
        for name in feature_names
    ]
    merged = concatenate(input_features)

    # Hidden block followed by a single sigmoid output unit.
    hidden = Dense(hp['dense_units'], activation='relu')
    dropout = Dropout(hp['dropout_rate'])
    dense_stack = tf.keras.models.Sequential(
        [
            hidden,
            BatchNormalization(),
            dropout,
            Dense(1, activation='sigmoid')
        ],
        name='dense_stack'
    )
    prediction = dense_stack(merged)

    model = tf.keras.Model(inputs=input_features, outputs=prediction)
    optimizer = tf.keras.optimizers.Adam(hp['learning_rate'])
    model.compile(
        loss='binary_crossentropy',
        optimizer=optimizer,
        metrics=[tf.keras.metrics.BinaryAccuracy()]
    )

    # Print the architecture.
    model.summary()

    return model

def get_serve_tf_examples_fn(model, tf_transform_output):
    '''
    Build the serving function that scores serialized tf.Example records.

    Args:
        model: Model called on the transformed features; the transform
            layer is stored on it as the attribute tft_layer.
        tf_transform_output: Provides the raw feature spec and the
            transform features layer.

    Returns:
        A tf.function mapping serialized examples to {'outputs': ...}.
    '''

    # Store the transform layer on the model itself.
    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        '''Parses, transforms and scores the serialized examples.'''

        # Parse with the raw feature spec, minus the label.
        raw_spec = tf_transform_output.raw_feature_spec()
        raw_spec.pop(LABEL_KEY)
        raw_features = tf.io.parse_example(serialized_tf_examples, raw_spec)

        predictions = model(model.tft_layer(raw_features))
        return {'outputs': predictions}

    return serve_tf_examples_fn

def run_fn(fn_args) -> None:
    '''
    Train the model based on given args, export it and plot it.

    The model is saved to fn_args.serving_model_dir with a
    'serving_default' signature that takes serialized tf.Example records,
    and a plot of the model is written to 'images/model_plot.png'.

    Args:
        fn_args: Holds args used to train the model as name/value pairs.
            The attributes read here are transform_output, train_files,
            eval_files, hyperparameters, serving_model_dir, train_steps
            and eval_steps.
    '''

    # Load the transform output.
    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    # Prepare training and validation datasets.
    train_set = input_fn(fn_args.train_files, tf_transform_output)
    val_set = input_fn(fn_args.eval_files, tf_transform_output)

    # Build model.
    hp = fn_args.hyperparameters['values']
    model = trainer_model(hp)

    # Define callbacks. Logs go to a 'logs' directory next to the serving
    # model directory.
    log_dir = os.path.join(os.path.dirname(fn_args.serving_model_dir), 'logs')
    tensorboard = TensorBoard(log_dir=log_dir, update_freq='batch')
    early_stopping = EarlyStopping(
        monitor='val_binary_accuracy', 
        patience=3, 
        verbose=1, 
        mode='max', 
        restore_best_weights=True
    )
    model_checkpoint = ModelCheckpoint(
        fn_args.serving_model_dir,
        monitor='val_binary_accuracy',
        verbose=1,
        save_best_only=True,
        mode='max'
    )

    # Train model.
    model.fit(
        train_set,
        steps_per_epoch=fn_args.train_steps,
        validation_data=val_set,
        validation_steps=fn_args.eval_steps,
        callbacks=[tensorboard, early_stopping, model_checkpoint],
        epochs=10
    )

    # Define and save model signatures for serving
    signatures = {
        'serving_default': get_serve_tf_examples_fn(
            model, tf_transform_output
        ).get_concrete_function(
            tf.TensorSpec(shape=[None], dtype=tf.string, name='examples')
        )
    }

    model.save(
        fn_args.serving_model_dir, save_format='tf', signatures=signatures
    )

    # Save model plot. The target directory is created first so that a
    # missing 'images' directory cannot fail the run after the model has
    # already been trained and saved.
    plot_path = 'images/model_plot.png'
    os.makedirs(os.path.dirname(plot_path), exist_ok=True)
    plot_model(
        model, 
        to_file=plot_path, 
        show_shapes=True, 
        show_layer_names=True
    )


Overwriting modules/trainer.py


In [11]:
%%writefile {'local_pipeline.py'}

import os
from typing import Text

from absl import logging
from tfx.orchestration import metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# Name of the pipeline; also used as the pipeline root's directory name.
PIPELINE_NAME = 'diabetes-pipeline'

# Inputs: the data directory and the module files of the components.
DATA_ROOT = 'data'
TRANSFORM_MODULE_FILE = 'modules/transform.py'
TUNER_MODULE_FILE = 'modules/tuner.py'
TRAINER_MODULE_FILE = 'modules/trainer.py'

# Outputs: everything is placed below OUTPUT_BASE.
OUTPUT_BASE = 'output'
pipeline_root = os.path.join(OUTPUT_BASE, PIPELINE_NAME)
metadata_path = os.path.join(pipeline_root, 'metadata.sqlite')
serving_model_dir = os.path.join(OUTPUT_BASE, 'serving_model')

def init_local_pipeline(
        components,
        pipeline_root: Text
    ) -> pipeline.Pipeline:
    '''
    Assemble the TFX pipeline for a local Beam run.

    Args:
        components: The TFX components that make up the pipeline.
        pipeline_root (str): Root directory of the pipeline. The SQLite
            metadata store is placed in it as 'metadata.sqlite'.

    Returns:
        pipeline.Pipeline: Pipeline named PIPELINE_NAME with caching enabled.
    '''

    logging.info(f'Pipeline root set to: {pipeline_root}')
    beam_args = [
        '--direct_running_mode=multi_processing',
        '--direct_num_workers=1'
    ]

    # Derive the metadata location from the root that was passed in, so the
    # metadata store follows the pipeline root instead of always using the
    # module-level default.
    metadata_db_path = os.path.join(pipeline_root, 'metadata.sqlite')

    return pipeline.Pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_db_path
        ),
        beam_pipeline_args=beam_args
    )

if __name__ == '__main__':
    # Script entry point: build the components, assemble the pipeline and
    # run it with the Beam DAG runner.
    logging.set_verbosity(logging.INFO)

    from modules.components import init_components

    components = init_components(
        data_dir=DATA_ROOT,
        transform_module=TRANSFORM_MODULE_FILE,
        tuner_module=TUNER_MODULE_FILE,
        training_module=TRAINER_MODULE_FILE,
        training_steps=10000,
        eval_steps=1000,
        serving_model_dir=serving_model_dir
    )

    # Use a name of its own for the pipeline object: assigning to `pipeline`
    # would rebind the imported tfx.orchestration.pipeline module that
    # init_local_pipeline relies on.
    local_pipeline = init_local_pipeline(components, pipeline_root)
    BeamDagRunner().run(pipeline=local_pipeline)
    

Overwriting local_pipeline.py


In [12]:
!python local_pipeline.py

2025-02-11 13:23:57.795001: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


2025-02-11 13:23:57.916983: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2025-02-11 13:23:57.917034: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2025-02-11 13:23:57.945570: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2025-02-11 13:23:58.579986: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2025-02-11 13:23:58.580082: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: ca