## Copy your notebook version

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/googlecolab/colabtools/blob/master/notebooks/colab-github-demo.ipynb)

# Workshop - Developing TensorFlow Extended Components

TLDR: TensorFlow Extended (TFX) allows data scientists to assemble production pipelines for model updates and then run the pipelines on a variety of orchestration tools.

TFX provides basic components to ingest, validate and transform data, as well as for model training, tuning, validation and deployment.

![TFX Pipeline](https://drive.google.com/uc?export=view&id=1yOPZTcIgF6arI7CLeWR1gTxrL1_MfQpL)

Figure taken from "Building Machine Learning Pipelines", O'Reilly July 2020, Hapke, Nelson

One of the strengths of TFX is the extensibility of the framework by building custom components.

### Applications for custom components:

* Ingestion of user-specific data (e.g. images or custom database tables)
* Compiling specific pipeline reports
* Communicating pipeline results (e.g. via Slack or MS Teams)
* Generating additional pipeline artifacts, e.g. model and data cards

## Workshop Outline

In this workshop, we'll introduce two ways of building your own TFX components for your ML pipelines. In particular, we'll cover:

* A brief overview of TFX and pipelines
* A presentation on how to build a component from scratch
* A hands-on workshop on how to extend existing components

In this workshop, we implement a TFX component that ingests images directly into the ML pipeline and generates a label for each image, instead of converting the images to TFRecord files outside of the pipeline.

What are the benefits of the implementation?

* Conversion is tracked in the ML Metadata store
* Component output can be cached
* No "glue code" required to connect the images to the pipeline
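
The caching benefit can be illustrated without TFX. The sketch below is a simplified stand-in, not TFX's actual caching logic: it fingerprints a component's input URIs and parameters and skips the execution when the same fingerprint has been seen before. The names `fingerprint`, `run_cached`, `convert` and the in-memory `cache` dictionary are hypothetical.

```python
import hashlib
import json


def fingerprint(input_uris, parameters):
    """Return a stable hash of a component's inputs and parameters."""
    payload = json.dumps(
        {"inputs": sorted(input_uris), "parameters": parameters},
        sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def run_cached(cache, input_uris, parameters, execute):
    """Run `execute` only if this input/parameter combination is new.

    Returns a tuple (output, was_cached).
    """
    key = fingerprint(input_uris, parameters)
    if key in cache:
        return cache[key], True
    output = execute(input_uris, parameters)
    cache[key] = output
    return output, False


# Hypothetical stand-in for an expensive image-to-TFRecord conversion.
calls = []


def convert(input_uris, parameters):
    calls.append(input_uris)
    return "/tmp/examples/" + parameters["name"]


cache = {}
first = run_cached(cache, ["/content/PetImages/Dog"], {"name": "ingest"}, convert)
second = run_cached(cache, ["/content/PetImages/Dog"], {"name": "ingest"}, convert)
```

The second call returns the stored output location without running `convert` again, which is the behaviour a cached pipeline component aims for.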

![TFX **Component**](https://drive.google.com/uc?export=view&id=1zyPDzXH7V-wY-AHcmOsUGpgkyPYYm2iG)

Figure taken from "Building Machine Learning Pipelines", O'Reilly July 2020, Hapke, Nelson

## TFX - Quick Intro

TFX provides a variety of stand-alone tools and pipeline components.


![TFX Components](https://drive.google.com/uc?export=view&id=15ftktZN2o1MBim4sN29GTRW3hoUWzwfR)
Figure taken from "Building Machine Learning Pipelines", O'Reilly July 2020, Hapke, Nelson



## TFX Components

TFX components consist of three parts:

*   Component driver
*   Component executor
*   Component publisher

The driver and publisher communicate with the ML Metadata store to retrieve and register ML artifacts. Components pass references to the data from component to component, not the actual data!

The actual work happens in the component executor. More on that later ...
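
To make the three roles concrete, here is a minimal sketch in plain Python. It is not the TFX API: `Artifact`, `MetadataStore`, `driver`, `executor` and `publisher` are hypothetical stand-ins that only show how references (URIs), rather than data, travel between the parts.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Artifact:
    """A reference to data: a type name plus a URI, never the data itself."""
    type_name: str
    uri: str


@dataclass
class MetadataStore:
    """Hypothetical in-memory stand-in for the ML Metadata store."""
    artifacts: List[Artifact] = field(default_factory=list)

    def latest(self, type_name: str) -> Artifact:
        matches = [a for a in self.artifacts if a.type_name == type_name]
        return matches[-1]

    def publish(self, artifact: Artifact) -> None:
        self.artifacts.append(artifact)


def driver(store: MetadataStore, input_type: str) -> Dict[str, Artifact]:
    """Resolve the input reference from the metadata store."""
    return {"input": store.latest(input_type)}


def executor(inputs: Dict[str, Artifact]) -> Dict[str, Artifact]:
    """Do the actual work; this sketch only derives an output location."""
    output_uri = inputs["input"].uri.rstrip("/") + "_examples"
    return {"examples": Artifact("Examples", output_uri)}


def publisher(store: MetadataStore, outputs: Dict[str, Artifact]) -> None:
    """Record the output references so downstream components can find them."""
    for artifact in outputs.values():
        store.publish(artifact)


store = MetadataStore([Artifact("ExternalPath", "/content/PetImages")])
outputs = executor(driver(store, "ExternalPath"))
publisher(store, outputs)
```

After the run, the store holds two references: the original external path and the newly published `Examples` location.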

![TFX **Component**](https://drive.google.com/uc?export=view&id=1TopCi6XjouwJyVho0PEDIZl9fvEOA1UB)

## How to implement a component?

![TFX **Component** Implementation Details](https://drive.google.com/uc?export=view&id=1nDYwHpNFyY3r2GrylD8qiuPE2xM60RHI)

Figures taken from "Building Machine Learning Pipelines", O'Reilly July 2020, Hapke, Nelson


## Extending existing TFX components

![TFX **Component**](https://drive.google.com/uc?export=view&id=1Hg-iUp8UF5Jh3dpdL-htG-Cw5g7GKqF3)

### Benefits:

* Less boilerplate code
* Reuse of existing component drivers and publishers
* Faster implementation

## Where to find more details?

If you are interested in a detailed introduction to TensorFlow Extended and other TensorFlow libraries, check out the recent O'Reilly publication on machine learning pipelines.

<img src="https://drive.google.com/uc?export=view&id=17Rtpso9UrE6HmhxCmtyd0aETr3WKSZ0e" width="450">

* [Amazon.com](https://www.amazon.com/dp/1492053198/)
* [Powells.com](https://www.powells.com/book/building-machine-learning-pipelines-9781492053194)

## Code Outline

* Download example dataset
* Install required Python packages
* Restart notebook kernel
* Import required packages & modules
* Define helper functions
* Walk through a component implementation from scratch
* Implement a component by overriding the component executor
* Create a pipeline with the component

## Download example dataset

For this workshop we'll be using the public cats & dogs dataset created by Microsoft. The data set contains two folders: "Dog" and "Cat". 

In [None]:
!rm -rf /content/PetImages/
!rm -f *.zip

!wget https://download.microsoft.com/download/3/E/1/3E1C3F21-ECDB-4869-8368-6DEBA77B919F/kagglecatsanddogs_3367a.zip
!unzip -q -d /content/ /content/kagglecatsanddogs_3367a.zip

# number of images before reduction
!echo "Count images"
!ls -U /content/PetImages/Cat | wc -l
!ls -U /content/PetImages/Dog | wc -l

# remove lines below to test the pipeline with the entire dataset
!echo "Reduce images for demo purposes"
!cd /content/PetImages/Cat && ls -U | head -12000 | xargs rm 
!cd /content/PetImages/Dog && ls -U | head -12000 | xargs rm 

!echo "Count images after removal"
!ls -U /content/PetImages/Cat | wc -l
!ls -U /content/PetImages/Dog | wc -l

## Install required Python packages

In [None]:
%tensorflow_version 2.x

!pip install -qU tfx

## Restart notebook kernel

In [None]:
import IPython
IPython.Application.instance().kernel.do_shutdown(True)

## Import required packages & modules


In [None]:
import base64
import logging
import os
import random
import re
import sys
from typing import Any, Dict, Iterable, List, Text

import absl
import apache_beam as beam
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
from google.protobuf import json_format
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.saved import saved_transform_io
from tensorflow_transform.tf_metadata import (dataset_metadata, dataset_schema,
                                              metadata_io, schema_utils)
from tfx import types
from tfx.components import (Evaluator, Pusher, ResolverNode, StatisticsGen,
                            Trainer)
from tfx.components.base import (base_component, base_driver, base_executor,
                                 executor_spec)
from tfx.components.example_gen import driver
from tfx.components.example_gen.base_example_gen_executor import (
    INPUT_KEY, BaseExampleGenExecutor)
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.import_example_gen.component import \
    ImportExampleGen
from tfx.components.example_gen.utils import dict_to_example
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.executor import GenericExecutor
from tfx.components.transform.component import Transform
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import data_types, metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.orchestration.experimental.interactive.interactive_context import \
    InteractiveContext
from tfx.proto import evaluator_pb2, example_gen_pb2, pusher_pb2, trainer_pb2
from tfx.types import (Channel, artifact_utils, channel_utils,
                       standard_artifacts)
from tfx.types.component_spec import ChannelParameter, ExecutionParameter
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.utils import io_utils
from tfx.utils.dsl_utils import external_input

In [None]:
logger = logging.getLogger()
logger.setLevel(logging.CRITICAL)

## Define helper functions


In [None]:
def _int64_feature(value):
    """Wrapper for inserting int64 features into Example proto."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _bytes_feature(value):
    """Wrapper for inserting bytes features into Example proto."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def get_label_from_filename(filename):
    """Set the label for each image.

    In our case, we use the file path as the label indicator. Adjust this
    function to match the layout of your own data.

    Args:
      filename: string, full file path
    Returns:
      int - label
    Raises:
      NotImplementedError if no label category was detected
    """

    lowered_filename = filename.lower()
    if "dog" in lowered_filename:
        label = 0
    elif "cat" in lowered_filename:
        label = 1
    else:
        raise NotImplementedError("Found unknown image")
    return label
    

def _convert_to_example(image_buffer, label):
    """Function to convert image byte strings and labels into tf.Example structures
      Args:
        image_buffer: byte string representing the image
        label: int
      Returns:
        TFExample data structure containing the image (byte string) and the label (int encoded)
    """

    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                'image/raw': _bytes_feature(image_buffer),
                'label': _int64_feature(label)
            }))
    return example


def get_image_data(filename):
    """Process a single image file.
    Args:
      filename: string, path to an image file e.g., '/path/to/example.JPG'.
    Returns:
      TFExample data structure containing the image (byte string) and the label (int encoded)
    """
    label = get_label_from_filename(filename)
    byte_content = tf.io.read_file(filename)
    rs = _convert_to_example(byte_content.numpy(), label)
    return rs
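
Note that the substring check in `get_label_from_filename` inspects the whole path, so a path such as `/data/dogs_vs_cats/Cat/1.jpg` would be labelled as a dog. A minimal sketch of a stricter variant follows, assuming every image sits directly inside a folder named after its class. `LABELS` and `label_from_parent_dir` are hypothetical names and not part of the workshop code.

```python
from pathlib import PurePosixPath

# Same label encoding as the helper above: dog -> 0, cat -> 1.
LABELS = {"dog": 0, "cat": 1}


def label_from_parent_dir(filename: str) -> int:
    """Derive the label from the immediate parent folder of the image."""
    parent = PurePosixPath(filename).parent.name.lower()
    if parent not in LABELS:
        raise NotImplementedError(
            "Found unknown image folder: {}".format(parent))
    return LABELS[parent]
```

For the dataset layout used in this workshop (`/content/PetImages/Cat` and `/content/PetImages/Dog`), both approaches yield the same labels.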


## Walk through a component implementation from scratch


### Custom Component Specifications

https://github.com/tensorflow/tfx/blob/master/tfx/types/standard_artifacts.py

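Difference between `ChannelParameter` and `ExecutionParameter`: a `ChannelParameter` declares an input or output channel that carries artifacts tracked in the ML Metadata store, whereas an `ExecutionParameter` declares a plain configuration value (here, the component `name`) that is passed to the executor.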

In [None]:
class CustomIngestionComponentSpec(types.ComponentSpec):
    """ComponentSpec for Custom Ingestion Component."""
    
    PARAMETERS = {
        'name': ExecutionParameter(type=Text),
    }
    INPUTS = {
        'input': ChannelParameter(type=standard_artifacts.ExternalArtifact),
    }
    OUTPUTS = {
        'examples': ChannelParameter(type=standard_artifacts.Examples),
    }

### Custom Component Executor

In [None]:
class CustomIngestionExecutor(base_executor.BaseExecutor):
    """Executor for CustomIngestionComponent."""

    def Do(self, input_dict: Dict[Text, List[types.Artifact]],
          output_dict: Dict[Text, List[types.Artifact]],
          exec_properties: Dict[Text, Any]) -> None:

        input_base_uri = artifact_utils.get_single_uri(input_dict['input'])
        image_files = tf.io.gfile.listdir(input_base_uri)
        random.shuffle(image_files)

        train_images, eval_images = image_files[10000:], image_files[:10000]
        splits = [('train', train_images), ('eval', eval_images)]

        for split_name, images in splits:
            output_dir = artifact_utils.get_split_uri(
                output_dict['examples'], split_name)
            
            tfrecords_filename = os.path.join(output_dir, 'images.tfrecords')
            
            options = tf.io.TFRecordOptions(compression_type=None)
            # The context manager closes the writer, which flushes all records to disk.
            with tf.io.TFRecordWriter(tfrecords_filename, options=options) as writer:
                for image_filename in images:
                    image_path = os.path.join(input_base_uri, image_filename)
                    example = get_image_data(image_path)
                    writer.write(example.SerializeToString())
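
The executor above sends the first 10,000 shuffled files to the `eval` split, which leaves the `train` split empty whenever the input folder holds 10,000 files or fewer (for example after reducing the dataset for the demo). A minimal sketch of a fraction-based alternative follows. `split_files`, `eval_fraction` and `seed` are assumptions made for illustration and not part of the workshop code.

```python
import random
from typing import List, Tuple


def split_files(files: List[str],
                eval_fraction: float = 0.2,
                seed: int = 42) -> Tuple[List[str], List[str]]:
    """Shuffle a copy of `files` and split it into (train, eval) lists."""
    if not 0.0 < eval_fraction < 1.0:
        raise ValueError("eval_fraction must be between 0 and 1")
    shuffled = list(files)
    random.Random(seed).shuffle(shuffled)  # seeded for reproducible splits
    eval_size = int(len(shuffled) * eval_fraction)
    return shuffled[eval_size:], shuffled[:eval_size]


# Stand-in for the listing returned by tf.io.gfile.listdir.
files = ["{}.jpg".format(i) for i in range(500)]
train_images, eval_images = split_files(files)
```

Seeding the shuffle also makes the split reproducible between runs, which the unseeded `random.shuffle` call does not guarantee.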

### Custom Component Driver

In [None]:
class CustomIngestionDriver(base_driver.BaseDriver):
    """Custom driver for CustomIngestion component.

    This driver supports file based ExampleGen, it registers external file path as
    an artifact, similar to the use cases CsvExampleGen and ImportExampleGen.
    """

    def resolve_input_artifacts(
        self,
        input_channels: Dict[Text, types.Channel],
        exec_properties: Dict[Text, Any],
        driver_args: data_types.DriverArgs,
        pipeline_info: data_types.PipelineInfo,
    ) -> Dict[Text, List[types.Artifact]]:
        """Overrides BaseDriver.resolve_input_artifacts()."""
        del driver_args  # unused
        del pipeline_info  # unused

        input_config = example_gen_pb2.Input()
        input_dict = channel_utils.unwrap_channel_dict(input_channels)
        for input_list in input_dict.values():
            for single_input in input_list:
                self._metadata_handler.publish_artifacts([single_input])
                
        return input_dict

### Custom Component

Putting all pieces together.

In [None]:
class CustomIngestionComponent(base_component.BaseComponent):
    """CustomIngestion Component."""
    SPEC_CLASS = CustomIngestionComponentSpec
    EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(CustomIngestionExecutor)
    DRIVER_CLASS = CustomIngestionDriver

    def __init__(self,
                input: types.Channel = None,
                output_data: types.Channel = None,
                name: Text = None):
      if not output_data:
          examples_artifact = standard_artifacts.Examples()
          examples_artifact.split_names = artifact_utils.encode_split_names(['train', 'eval'])
          output_data = channel_utils.as_channel([examples_artifact])
      spec = CustomIngestionComponentSpec(input=input,
                                          examples=output_data, 
                                          name=name)
      super(CustomIngestionComponent, self).__init__(spec=spec)

## Basic Pipeline

In [None]:
test_context = InteractiveContext()

data_root = os.path.join("/content/", 'PetImages', 'Dog')
examples = external_input(data_root)

ingest_images = CustomIngestionComponent(
    input=examples, name='ImageIngestionComponent')
test_context.run(ingest_images)

In [None]:
statistics_gen = StatisticsGen(
    examples=ingest_images.outputs['examples'])
test_context.run(statistics_gen)

test_context.show(statistics_gen.outputs['statistics'])

## Implement a component by overriding the component executor

In [None]:
@beam.ptransform_fn 
def ImageToExample(
      pipeline: beam.Pipeline,
      input_dict: Dict[Text, List[types.Artifact]],
      exec_properties: Dict[Text, Any],
      split_pattern: Text) -> beam.pvalue.PCollection:
    """Read jpeg files and transform to TF examples.

    Note that each input split will be transformed by this function separately.

    Args:
        pipeline: beam pipeline.
        input_dict: Input dict from input key to a list of Artifacts.
          - input_base: input dir that contains the image data.
        exec_properties: A dict of execution properties.
        split_pattern: Split.pattern in Input config, glob relative file pattern
          that maps to input files with root directory given by input_base.

    Returns:
        PCollection of TF examples.
    """

    input_base_uri = artifact_utils.get_single_uri(input_dict['input'])
    image_pattern = os.path.join(input_base_uri, split_pattern)
    absl.logging.info(
        'Processing input image data {} to TFExample.'.format(image_pattern))

    image_files = tf.io.gfile.glob(image_pattern)
    if not image_files:
        raise RuntimeError(
            'Split pattern {} does not match any files.'.format(image_pattern))

    return (
        pipeline
        | beam.Create(image_files)
        | 'ConvertImagesToExamples' >> beam.Map(lambda file: get_image_data(file))
    )
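
The split pattern is joined with the input base directory and then expanded as a glob. The sketch below uses the standard library `glob` module as a stand-in for `tf.io.gfile.glob` (an assumption that should hold for local paths) and a throwaway directory tree to show which files a pattern such as `*/*.jpg` picks up. `match_split` is a hypothetical helper.

```python
import glob
import os
import tempfile


def match_split(input_base: str, split_pattern: str):
    """Return the sorted paths under `input_base` matching `split_pattern`."""
    return sorted(glob.glob(os.path.join(input_base, split_pattern)))


# Build a tiny, throwaway directory tree that mimics PetImages.
base = tempfile.mkdtemp()
for relative_path in ["Cat/1.jpg", "Dog/2.jpg", "Dog/Thumbs.db", "3.jpg"]:
    path = os.path.join(base, relative_path)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    open(path, "w").close()

matched = [os.path.relpath(p, base) for p in match_split(base, "*/*.jpg")]
```

Only the `.jpg` files exactly one folder below the base directory match; the top-level `3.jpg` and the non-image `Thumbs.db` are skipped.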


In [None]:
class ImageExampleGenExecutor(BaseExampleGenExecutor):
    """TFX example gen executor for processing jpeg format.

    Example usage:

      from tfx.components.base import executor_spec
      from tfx.components.example_gen.component import FileBasedExampleGen
      from tfx.utils.dsl_utils import external_input

      example_gen = FileBasedExampleGen(
          input=external_input("/content/PetImages/"),
          input_config=input_config,
          output_config=output,
          custom_executor_spec=executor_spec.ExecutorClassSpec(
              ImageExampleGenExecutor))
    """

    def GetInputSourceToExamplePTransform(self) -> beam.PTransform:
        """Returns PTransform for image to TF examples."""
        return ImageToExample

## Building your ML Pipeline

In [None]:
context = InteractiveContext()

output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))

input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='images', pattern='*/*.jpg'),
])

example_gen = FileBasedExampleGen(
    input=external_input("/content/PetImages/"),
    input_config=input_config,
    output_config=output,
    custom_executor_spec=executor_spec.ExecutorClassSpec(ImageExampleGenExecutor))

context.run(example_gen)
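
The `hash_buckets=4` and `hash_buckets=1` settings request a train/eval ratio of roughly 4:1. The idea can be sketched as follows: every record is hashed into one of five buckets, and the first four buckets belong to `train`. This is a simplified illustration only; it uses MD5 as a stand-in hash, and TFX's actual hash function and partitioning details differ. `assign_split` is a hypothetical helper.

```python
import hashlib
from typing import List, Tuple


def assign_split(record: bytes, splits: List[Tuple[str, int]]) -> str:
    """Map a record to a split name based on cumulative hash buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    bucket = int(hashlib.md5(record).hexdigest(), 16) % total_buckets
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always below total_buckets")


splits = [("train", 4), ("eval", 1)]
records = ["image-{}".format(i).encode("utf-8") for i in range(1000)]
counts = {"train": 0, "eval": 0}
for record in records:
    counts[assign_split(record, splits)] += 1
```

Because the assignment depends only on the record content, the same record lands in the same split on every run, and the ratio is approximate rather than exact.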

In [None]:
statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])
context.run(statistics_gen)

In [None]:
context.show(statistics_gen.outputs['statistics'])

In [None]:
schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)

context.run(schema_gen)

In [None]:
context.show(schema_gen.outputs['schema'])

In [None]:
example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])

context.run(example_validator)

In [None]:
%%writefile constants.py

from typing import Text

def transformed_name(key: Text) -> Text:
  """Generate the name of the transformed feature from original name."""
  return key + '_xf'

# Keys
LABEL_KEY = 'label'
INPUT_KEY = 'image/raw'

# Feature keys
RAW_FEATURE_KEYS = [INPUT_KEY]

# Constants
IMG_SIZE = 160

In [None]:
%%writefile transform.py

import tensorflow as tf
import tensorflow_transform as tft
import logging

from typing import Union, Dict

import constants
import numpy as np


def convert_image(raw_image: tf.Tensor) -> tf.Tensor:

    if tf.io.is_jpeg(raw_image):
        image = tf.io.decode_jpeg(raw_image, channels=3)
        image = tf.cast(image, tf.float32)
        image = (image / 127.5) - 1 
        image = tf.image.resize(image, [constants.IMG_SIZE, constants.IMG_SIZE])
        return image
    return tf.constant(np.zeros((constants.IMG_SIZE, constants.IMG_SIZE, 3)), tf.float32)

def fill_in_missing(x: Union[tf.Tensor, tf.SparseTensor]) -> tf.Tensor:
    """Replace missing values in a SparseTensor.

    Fills in missing values of `x` with '' or 0, and converts to a dense tensor.

    Args:
      x: A `SparseTensor` of rank 2.  Its dense shape should have size at most 1
        in the second dimension.

    Returns:
      A rank 1 tensor where missing values of `x` have been filled in.
    """
    if isinstance(x, tf.sparse.SparseTensor):
        default_value = "" if x.dtype == tf.string else 0
        x = tf.sparse.to_dense(
            tf.SparseTensor(x.indices, x.values, [x.dense_shape[0], 1]),
            default_value,
        )
    return tf.squeeze(x, axis=1)


def preprocessing_fn(inputs: Dict[str, Union[tf.Tensor, tf.SparseTensor]]) -> Dict[str, tf.Tensor]:
    """tf.transform's callback function for preprocessing inputs.
    """
    outputs = {}

    for key in constants.RAW_FEATURE_KEYS:
        image = fill_in_missing(inputs[key])
        outputs[constants.transformed_name(key)] = convert_image(image)
    
    outputs[constants.transformed_name(constants.LABEL_KEY)] = inputs[constants.LABEL_KEY]

    return outputs
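
The expression `(image / 127.5) - 1` in `convert_image` rescales 8-bit pixel values from the range [0, 255] to [-1, 1]. The arithmetic can be checked with a few lines of plain Python; `scale_pixel` and `unscale_pixel` are hypothetical helper names used only for this illustration.

```python
def scale_pixel(value: float) -> float:
    """Rescale an 8-bit pixel value from [0, 255] to [-1, 1]."""
    return (value / 127.5) - 1


def unscale_pixel(value: float) -> float:
    """Inverse of scale_pixel, handy for visualising transformed images."""
    return (value + 1) * 127.5


# Darkest, middle and brightest pixel values.
scaled = [scale_pixel(v) for v in (0, 127.5, 255)]
```

The darkest pixel maps to -1, the mid-point to 0 and the brightest pixel to 1.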

In [None]:
transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath("transform.py"))

context.run(transform)

In [None]:
%%writefile trainer.py

from typing import List, Text, Dict

import os
import absl
import tensorflow as tf
import tensorflow_transform as tft
from datetime import datetime

from tfx.components.trainer.executor import TrainerFnArgs

import constants

TRAIN_BATCH_SIZE = 32
EVAL_BATCH_SIZE = 32


def _gzip_reader_fn(filenames):
    """Small utility returning a record reader that can read gzip'ed files."""

    return tf.data.TFRecordDataset(
        filenames,
        compression_type='GZIP')
  

def _get_label_for_image(model, tf_transform_output):
    """Returns a function that parses a raw byte image and applies TFT."""

    model.tft_layer = tf_transform_output.transform_features_layer()
      
    @tf.function
    def serve_images_fn(image_raw):
        """Returns the output to be used in the serving signature."""

        image_raw = tf.reshape(image_raw, [-1, 1])
        parsed_features = {constants.INPUT_KEY: image_raw}
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_images_fn


def _get_serve_tf_examples_fn(model, tf_transform_output):
    """Returns a function that parses a serialized tf.Example and applies TFT."""

    model.tft_layer = tf_transform_output.transform_features_layer()

    @tf.function
    def serve_tf_examples_fn(serialized_tf_examples):
        """Returns the output to be used in the serving signature."""
        
        feature_spec = tf_transform_output.raw_feature_spec()
        feature_spec.pop(constants.LABEL_KEY)

        parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
        transformed_features = model.tft_layer(parsed_features)
        return model(transformed_features)

    return serve_tf_examples_fn


def _input_fn(file_pattern: List[Text], 
              tf_transform_output: tft.TFTransformOutput, 
              batch_size: int = 32, 
              is_train: bool = False) -> tf.data.Dataset:
    """Generates features and label for tuning/training.

    Args:
      file_pattern: input tfrecord file pattern.
      tf_transform_output: A TFTransformOutput.
      batch_size: representing the number of consecutive elements of returned
        dataset to combine in a single batch
      is_train: whether the dataset is used for training (currently unused).

    Returns:
      A dataset that contains (features, indices) tuple where features is a
        dictionary of Tensors, and indices is a single Tensor of label indices.
    """
    transformed_feature_spec = (
        tf_transform_output.transformed_feature_spec().copy())

    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=file_pattern,
        batch_size=batch_size,
        features=transformed_feature_spec,
        reader=_gzip_reader_fn,
        label_key=constants.transformed_name(constants.LABEL_KEY))

    return dataset


def get_model() -> tf.keras.Model:
    """Creates a CNN Keras model based on transfer learning for classifying image data.

    Returns:
      A keras Model.
    """
    img_shape = (constants.IMG_SIZE, constants.IMG_SIZE, 3)

    # Create the base model from the pre-trained model MobileNet V2
    base_model = tf.keras.applications.MobileNetV2(input_shape=img_shape,
                                                    include_top=False,
                                                    weights='imagenet')
    base_model.trainable = False
    base_model.summary()
    global_average_layer = tf.keras.layers.GlobalAveragePooling2D()
      
    output = tf.keras.layers.Dense(1)
      
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=img_shape, name=constants.transformed_name(constants.INPUT_KEY)),
        base_model,
        global_average_layer,
        tf.keras.layers.Dropout(0.2),
        output
    ])

    model.compile(optimizer=tf.optimizers.RMSprop(learning_rate=0.01),
        loss=tf.losses.BinaryCrossentropy(from_logits=True),
        metrics=[tf.metrics.BinaryAccuracy(name='accuracy')])
    model.summary()
      
    return model


def run_fn(fn_args: TrainerFnArgs):
    """Train the model based on given args.

    Args:
      fn_args: Holds args used to train the model as name/value pairs.
    """

    tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

    train_dataset = _input_fn(fn_args.train_files, tf_transform_output,
                              TRAIN_BATCH_SIZE, is_train = True)
    eval_dataset = _input_fn(fn_args.eval_files, tf_transform_output,
                             EVAL_BATCH_SIZE)

    # check for available TPU and GPU units
    try:
        tpu = tf.distribute.cluster_resolver.TPUClusterResolver()
        tf.config.experimental_connect_to_cluster(tpu)
        tf.tpu.experimental.initialize_tpu_system(tpu)
        strategy = tf.distribute.experimental.TPUStrategy(tpu)
    except ValueError:
        strategy = tf.distribute.MirroredStrategy()

    with strategy.scope():
        model = get_model()

    model.fit(
        train_dataset,
        steps_per_epoch=fn_args.train_steps,
        validation_data=eval_dataset,
        validation_steps=fn_args.eval_steps,
    )

    signatures = {
        'serving_default':
            _get_serve_tf_examples_fn(model,
                                      tf_transform_output).get_concrete_function(
                                          tf.TensorSpec(
                                              shape=[None],
                                              dtype=tf.string,
                                              name='examples')),

    }
    model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)
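
The model ends in `Dense(1)` without an activation, so it outputs a raw logit, which is why the loss is configured with `from_logits=True`. The following plain-Python sketch shows how a logit relates to a probability and to the binary cross-entropy. It illustrates the formula only and is not the Keras implementation; `sigmoid`, `bce_from_logit` and `predicted_label` are hypothetical helpers.

```python
import math


def sigmoid(logit: float) -> float:
    """Convert a logit into a probability."""
    return 1.0 / (1.0 + math.exp(-logit))


def bce_from_logit(label: int, logit: float) -> float:
    """Binary cross-entropy computed from a raw logit.

    Uses the numerically stable form max(z, 0) - z * y + log(1 + exp(-|z|)).
    """
    return max(logit, 0.0) - logit * label + math.log1p(math.exp(-abs(logit)))


def predicted_label(logit: float) -> int:
    """Threshold the probability at 0.5, which equals a logit of 0."""
    return int(sigmoid(logit) >= 0.5)
```

With the label encoding used in this workshop (dog is 0, cat is 1), a positive logit corresponds to a cat prediction and a negative logit to a dog prediction.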


In [None]:
trainer = Trainer(
    module_file=os.path.abspath("trainer.py"),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=1600),
    eval_args=trainer_pb2.EvalArgs(num_steps=200))

context.run(trainer)

In [None]:
eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='label')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                tfma.MetricConfig(class_name='AUC'),
                tfma.MetricConfig(class_name='BinaryAccuracy',
                  threshold=tfma.MetricThreshold(
                      value_threshold=tfma.GenericValueThreshold(
                          lower_bound={'value': 0.65}),
                      change_threshold=tfma.GenericChangeThreshold(
                          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                          absolute={'value': 0.01})))
            ]
        )
    ],
    slicing_specs=[
        tfma.SlicingSpec()
    ])

model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

context.run(model_resolver)
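
The `eval_config` above combines two thresholds on `BinaryAccuracy`: an absolute lower bound of 0.65 and a required improvement of 0.01 over the baseline model. The decision logic can be sketched roughly as follows. This is an illustration under assumptions, not TFMA's validation code: the inclusive comparisons and the skipping of the change threshold when no baseline exists are assumptions of this sketch, and `is_blessed` is a hypothetical function.

```python
from typing import Optional


def is_blessed(candidate_accuracy: float,
               baseline_accuracy: Optional[float],
               lower_bound: float = 0.65,
               min_improvement: float = 0.01) -> bool:
    """Return True if the candidate passes both thresholds.

    The change threshold is skipped when no baseline model exists yet
    (assumption of this sketch).
    """
    if candidate_accuracy < lower_bound:
        return False
    if baseline_accuracy is None:
        return True
    return candidate_accuracy - baseline_accuracy >= min_improvement
```

A first model with an accuracy of 0.70 would pass, while a later model that improves on a 0.70 baseline by only 0.005 would not.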

In [None]:
evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)

context.run(evaluator)

In [None]:
context.show(evaluator.outputs['evaluation'])

In [None]:
_serving_model_dir = "/content/exported_model"

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))

context.run(pusher)



---
## Entire End-to-End Pipeline with Apache Beam


In [None]:
import IPython
IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
import base64
import logging
import os
import random
import re
import sys
from typing import Any, Dict, Iterable, List, Text

import absl
import apache_beam as beam
import tensorflow as tf
import tensorflow_model_analysis as tfma
import tfx
from google.protobuf import json_format
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.saved import saved_transform_io
from tensorflow_transform.tf_metadata import (dataset_metadata, dataset_schema,
                                              metadata_io, schema_utils)
from tfx import types
from tfx.components import (Evaluator, Pusher, ResolverNode, StatisticsGen,
                            Trainer)
from tfx.components.base import (base_component, base_driver, base_executor,
                                 executor_spec)
from tfx.components.example_gen import driver
from tfx.components.example_gen.base_example_gen_executor import (
    INPUT_KEY, BaseExampleGenExecutor)
from tfx.components.example_gen.component import FileBasedExampleGen
from tfx.components.example_gen.import_example_gen.component import \
    ImportExampleGen
from tfx.components.example_gen.utils import dict_to_example
from tfx.components.example_validator.component import ExampleValidator
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.executor import GenericExecutor
from tfx.components.transform.component import Transform
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import data_types, metadata, pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.orchestration.experimental.interactive.interactive_context import \
    InteractiveContext
from tfx.proto import evaluator_pb2, example_gen_pb2, pusher_pb2, trainer_pb2
from tfx.types import (Channel, artifact_utils, channel_utils,
                       standard_artifacts)
from tfx.types.component_spec import ChannelParameter, ExecutionParameter
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.utils import io_utils
from tfx.utils.dsl_utils import external_input


In [None]:
%%writefile component_helper.py

import tensorflow as tf

def _int64_feature(value):
    """Wrapper for inserting int64 features into Example proto."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _bytes_feature(value):
    """Wrapper for inserting bytes features into Example proto."""
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def get_label_from_filename(filename):
    """Set the label for each image.

    In our case, we use the file path as the label indicator. Adjust this
    function to match the layout of your own data.

    Args:
      filename: string, full file path
    Returns:
      int - label
    Raises:
      NotImplementedError if no label category was detected
    """

    lowered_filename = filename.lower()
    if "dog" in lowered_filename:
        label = 0
    elif "cat" in lowered_filename:
        label = 1
    else:
        raise NotImplementedError("Found unknown image")
    return label
    

def _convert_to_example(image_buffer, label):
    """Function to convert image byte strings and labels into tf.Example structures
      Args:
        image_buffer: byte string representing the image
        label: int
      Returns:
        TFExample data structure containing the image (byte string) and the label (int encoded)
    """

    example = tf.train.Example(
        features=tf.train.Features(
            feature={
                'image/raw': _bytes_feature(image_buffer),
                'label': _int64_feature(label)
            }))
    return example


def get_image_data(filename):
    """Process a single image file.
    Args:
      filename: string, path to an image file e.g., '/path/to/example.JPG'.
    Returns:
      TFExample data structure containing the image (byte string) and the label (int encoded)
    """
    label = get_label_from_filename(filename)
    byte_content = tf.io.read_file(filename)
    rs = _convert_to_example(byte_content.numpy(), label)
    return rs


In [None]:
from component_helper import get_image_data

@beam.ptransform_fn 
def ImageToExample(
      pipeline: beam.Pipeline,
      input_dict: Dict[Text, List[types.Artifact]],
      exec_properties: Dict[Text, Any],
      split_pattern: Text) -> beam.pvalue.PCollection:
    """Read jpeg files and transform to TF examples.

    Note that each input split will be transformed by this function separately.

    Args:
        pipeline: beam pipeline.
        input_dict: Input dict from input key to a list of Artifacts.
          - input: input dir that contains the image data.
        exec_properties: A dict of execution properties.
        split_pattern: Split.pattern in Input config, glob relative file pattern
          that maps to input files with root directory given by input_base.

    Returns:
        PCollection of TF examples.
    """

    input_base_uri = artifact_utils.get_single_uri(input_dict['input'])
    image_pattern = os.path.join(input_base_uri, split_pattern)
    absl.logging.info(
        'Processing input image data {} to TFExample.'.format(image_pattern))

    image_files = tf.io.gfile.glob(image_pattern)
    if not image_files:
        raise RuntimeError(
            'Split pattern {} does not match any files.'.format(image_pattern))

    return (
        pipeline
        | beam.Create(image_files)
        | 'ConvertImagesToExamples' >> beam.Map(get_image_data)
    )

class ImageExampleGenExecutor(BaseExampleGenExecutor):
    """TFX example gen executor for processing JPEG images.

    Example usage:

      from tfx.components.example_gen.component import FileBasedExampleGen
      from tfx.utils.dsl_utils import external_input

      example_gen = FileBasedExampleGen(
          input=external_input("/content/PetImages/"),
          input_config=input_config,
          output_config=output,
          custom_executor_spec=executor_spec.ExecutorClassSpec(
              ImageExampleGenExecutor))
    """

    def GetInputSourceToExamplePTransform(self) -> beam.PTransform:
        """Returns PTransform for image to TF examples."""
        return ImageToExample
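
`ImageToExample` joins the input base URI with the split pattern and raises an error if the glob matches nothing. It can help to check a pattern against a small sample layout before running the pipeline. The sketch below uses the standard library `glob` module rather than `tf.io.gfile.glob`, and both the helper `match_split_pattern` and the sample files are made up; treat it as an approximation of the matching rules, not a guarantee of identical behavior.

```python
import glob
import os
import tempfile


def match_split_pattern(input_base, split_pattern):
    """Return sorted paths, relative to input_base, matched by split_pattern."""
    full_pattern = os.path.join(input_base, split_pattern)
    return sorted(
        os.path.relpath(path, input_base) for path in glob.glob(full_pattern)
    )


with tempfile.TemporaryDirectory() as input_base:
    # Made-up sample layout: one directory per class plus a few stray files.
    sample_files = [
        "Cat/1.jpg", "Dog/2.jpg", "Dog/3.JPG", "Cat/notes.txt", "top.jpg"
    ]
    for rel_path in sample_files:
        path = os.path.join(input_base, rel_path)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        open(path, "wb").close()

    matches = match_split_pattern(input_base, "*/*.jpg")
    print(matches)
```

On a case-sensitive file system, only `Cat/1.jpg` and `Dog/2.jpg` are expected to match: `top.jpg` does not sit inside a class directory, and `3.JPG` differs in case from the pattern.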

In [None]:
output = example_gen_pb2.Output(
             split_config=example_gen_pb2.SplitConfig(splits=[
                 example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=4),
                 example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))

input_config = example_gen_pb2.Input(splits=[
    example_gen_pb2.Input.Split(name='images', pattern='*/*.jpg'),
])

eval_config = tfma.EvalConfig(
    model_specs=[
        tfma.ModelSpec(label_key='label')
    ],
    metrics_specs=[
        tfma.MetricsSpec(
            metrics=[
                tfma.MetricConfig(class_name='ExampleCount'),
                tfma.MetricConfig(class_name='AUC'),
                tfma.MetricConfig(class_name='BinaryAccuracy',
                  threshold=tfma.MetricThreshold(
                      value_threshold=tfma.GenericValueThreshold(
                          lower_bound={'value': 0.65}),
                      change_threshold=tfma.GenericChangeThreshold(
                          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                          absolute={'value': 0.01})))
            ]
        )
    ],
    slicing_specs=[
        tfma.SlicingSpec()
    ])

_serving_model_dir = "/content/exported_model"
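
The `hash_buckets` values in the output config act as ratios: with 4 buckets for `train` and 1 for `eval`, roughly 80% of the examples land in `train` and 20% in `eval`. The sketch below illustrates the idea with `hashlib`. It is not the hashing scheme TFX uses internally, and the `assign_split` helper and the record contents are made up for illustration.

```python
import hashlib
from collections import Counter

# (split name, number of hash buckets), mirroring the output config.
SPLITS = [("train", 4), ("eval", 1)]


def assign_split(record: bytes, splits=SPLITS) -> str:
    """Map a record to a split by hashing it into one of the buckets."""
    total_buckets = sum(buckets for _, buckets in splits)
    bucket = int(hashlib.md5(record).hexdigest(), 16) % total_buckets
    upper = 0
    for name, buckets in splits:
        upper += buckets
        if bucket < upper:
            return name
    raise AssertionError("unreachable: bucket is always below total_buckets")


counts = Counter(assign_split(f"image-{i}".encode()) for i in range(10_000))
print(counts)  # roughly 8000 train / 2000 eval
```

Because the assignment depends only on the record content, the same record always ends up in the same split across runs.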

In [None]:
example_gen = FileBasedExampleGen(
    input=external_input("/content/PetImages/"),
    input_config=input_config,
    output_config=output,
    custom_executor_spec=executor_spec.ExecutorClassSpec(ImageExampleGenExecutor))

statistics_gen = StatisticsGen(
    examples=example_gen.outputs['examples'])

schema_gen = SchemaGen(
    statistics=statistics_gen.outputs['statistics'],
    infer_feature_shape=True)

example_validator = ExampleValidator(
    statistics=statistics_gen.outputs['statistics'],
    schema=schema_gen.outputs['schema'])

transform = Transform(
    examples=example_gen.outputs['examples'],
    schema=schema_gen.outputs['schema'],
    module_file=os.path.abspath("transform.py"))

trainer = Trainer(
    module_file=os.path.abspath("trainer.py"),
    custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    train_args=trainer_pb2.TrainArgs(num_steps=500),
    eval_args=trainer_pb2.EvalArgs(num_steps=200))

model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))

evaluator = Evaluator(
    examples=example_gen.outputs['examples'],
    model=trainer.outputs['model'],
    baseline_model=model_resolver.outputs['model'],
    eval_config=eval_config)

pusher = Pusher(
    model=trainer.outputs['model'],
    model_blessing=evaluator.outputs['blessing'],
    push_destination=pusher_pb2.PushDestination(
        filesystem=pusher_pb2.PushDestination.Filesystem(
            base_directory=_serving_model_dir)))
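
The `Pusher` only exports the model when the `Evaluator` blesses it. With the `eval_config` defined earlier, that decision depends on two thresholds for `BinaryAccuracy`: an absolute lower bound of 0.65 and an improvement of 0.01 over the baseline model. The plain-Python sketch below approximates that decision. The function `is_blessed` is hypothetical, the inclusive comparisons are an assumption, and so is the handling of a missing baseline (the change threshold is skipped, which is the typical situation on a first pipeline run). Consult the TFMA documentation for the exact semantics.

```python
from typing import Optional

LOWER_BOUND = 0.65      # value_threshold lower bound from eval_config
MIN_IMPROVEMENT = 0.01  # change_threshold absolute value from eval_config


def is_blessed(candidate_accuracy: float,
               baseline_accuracy: Optional[float] = None) -> bool:
    """Approximate the blessing decision for the BinaryAccuracy thresholds."""
    # Value threshold: the candidate must reach the absolute lower bound.
    if candidate_accuracy < LOWER_BOUND:
        return False
    # Assumption: without a baseline model the change threshold is skipped.
    if baseline_accuracy is None:
        return True
    # Change threshold (HIGHER_IS_BETTER): require a minimum improvement.
    return candidate_accuracy - baseline_accuracy >= MIN_IMPROVEMENT


print(is_blessed(0.60))                           # False
print(is_blessed(0.70))                           # True
print(is_blessed(0.80, baseline_accuracy=0.78))   # True
print(is_blessed(0.80, baseline_accuracy=0.795))  # False
```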


In [None]:
pipeline_name = "dogs_cats_pipeline"

# pipeline inputs
base_dir = os.getcwd()
pipeline_dir = os.path.join(base_dir, "pipeline")

# pipeline outputs
output_base = os.path.join(pipeline_dir, "output", pipeline_name)
pipeline_root = os.path.join(output_base, "pipeline_root")
metadata_path = os.path.join(pipeline_root, "metadata.sqlite")


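def init_beam_pipeline(
    components, pipeline_root: Text, direct_num_workers: int
) -> pipeline.Pipeline:
    """Assemble the TFX pipeline for execution with Apache Beam.

    Uses the module-level pipeline_name and metadata_path defined above.
    """

    absl.logging.info(f"Pipeline root set to: {pipeline_root}")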
    beam_arg = [
        f"--direct_num_workers={direct_num_workers}",
    ]

    p = pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        enable_cache=True,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(
            metadata_path
        ),
        beam_pipeline_args=beam_arg,
    )
    return p

In [None]:
logger = logging.getLogger()
logger.setLevel(logging.INFO)

components = [
    example_gen,
    statistics_gen,
    schema_gen,
    example_validator,
    transform,
    trainer,
    model_resolver,
    evaluator,
    pusher,
]

p = init_beam_pipeline(components, pipeline_root, direct_num_workers=1)

In [None]:
BeamDagRunner().run(p)