# Advanced TFX

> I HAVE NOT FULLY TESTED THE CODE SEGMENTS IN THIS NOTEBOOK, AND I AM SURE THAT THE CURRENT TFX DOCUMENTATION PROVIDES BETTER APPROACHES TO DO THE SAME THINGS!

> This notebook only points out the topics that we should be aware of. It does not contain actual working implementations.

As we saw, most simpler ML tasks can be handled with the generic pipeline components provided by TFX. In some cases, however, we need to go beyond the generic component flows and build more complex components. Here we focus on techniques we can use in such scenarios.


### Training multiple models simultaneously

In many production systems, it is often required to train more than one model, and TFX supports training several models in a single pipeline. In such cases all the data preparation, validation, and transformation steps remain the same, but from there onwards the data may flow into different types of models. We can do this by defining several trainer components in the TFX pipeline.

In [None]:
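from tfx.components import Trainer
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.proto import trainer_pb2

# `transform` and `schema_gen` are the Transform and SchemaGen components
# defined earlier in the pipeline.
def set_trainer(module_file, instance_name,
                train_steps=5000, eval_steps=100):
    return Trainer(
        module_file=module_file,
        custom_executor_spec=executor_spec.ExecutorClassSpec(
            GenericExecutor),
        examples=transform.outputs['transformed_examples'],
        transform_graph=transform.outputs['transform_graph'],
        schema=schema_gen.outputs['schema'],
        train_args=trainer_pb2.TrainArgs(num_steps=train_steps),
        eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
        instance_name=instance_name)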

Using a rough code segment like the one above, we can define several trainer components with different module files inside the pipeline. This lets us branch out and build pipelines that are more complex than a single-path pipeline.

<center><image src="imgs/6.jpg" width="500"/></center>

### TFLite model Exporting

Mobile application development is one of the major areas of software engineering today. When deploying ML models to such applications we face constraints such as low computational power, limited storage, and restricted power usage. TFLite models are designed for such cases, and we can convert our existing models to TFLite as part of the pipeline. Note, however, that not all TF operations are supported by TFLite, so we should expect conversion problems if the model uses complicated operations.

### Warm Start Model Training

This should not be confused with transfer learning. In transfer learning we repurpose the learned weights for a different task. In warm-start training we instead resume training of the same model from a previous checkpoint. This is extremely useful when training large models and when complying with regulations such as GDPR.
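
To make the warm-start idea concrete, here is a minimal, framework-free sketch (it is not TFX or TensorFlow code). It fits a one-parameter linear model with gradient descent, saves a checkpoint to a JSON file, and resumes from that checkpoint on the next call. The function name `train` and the checkpoint layout are assumptions made for illustration only.

```python
import json
import os
import tempfile


def train(data, steps, lr=0.01, checkpoint_path=None):
    """Fit y = w * x by gradient descent, warm-starting from a checkpoint if one exists."""
    state = {"w": 0.0, "step": 0}
    if checkpoint_path and os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)  # warm start: reuse the previously learned weight

    for _ in range(steps):
        # Gradient of the mean squared error with respect to w.
        grad = sum(2 * (state["w"] * x - y) * x for x, y in data) / len(data)
        state["w"] -= lr * grad
        state["step"] += 1

    if checkpoint_path:
        with open(checkpoint_path, "w") as f:
            json.dump(state, f)
    return state


data = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]  # underlying relationship: y = 2x
ckpt = os.path.join(tempfile.mkdtemp(), "model.json")

first = train(data, steps=20, checkpoint_path=ckpt)
resumed = train(data, steps=20, checkpoint_path=ckpt)  # continues from step 20
```

The second call does not start from `w = 0.0`. It picks up the saved weight and step counter, which is the behavior that distinguishes warm-start training from training from scratch.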

### Human in the Loop

In some ML use cases, the model must be checked by a human in the middle of the pipeline. Once this review is done, the model can be sent on to the next parts of the ML cycle. TFX provides experimental support for such requirements.

For example, TFX provides a Slack component that sends a message to a Slack channel asking a data scientist to review the model. The data scientist can then check the model with the What-If Tool, validate edge cases, and so on, and give the model its blessing if it is okay.

More generally, we can define our own custom components that connect to whatever channel we need in order to provide a human-in-the-loop setup.
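
The core of such a custom component is a gate that blocks until a reviewer responds. The following is a minimal sketch in plain Python, not the TFX Slack component. `wait_for_blessing` and the `fetch_decision` callback are hypothetical names, and a real component would replace the callback with a call to the chosen messaging channel.

```python
import time
from typing import Callable, Optional


def wait_for_blessing(fetch_decision: Callable[[], Optional[bool]],
                      timeout_s: float = 60.0,
                      poll_interval_s: float = 1.0) -> bool:
    """Poll a reviewer channel until a decision arrives or the timeout expires.

    `fetch_decision` returns True (approved), False (rejected), or None
    (no answer yet). A timeout is treated as "not blessed".
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        decision = fetch_decision()
        if decision is not None:
            return decision
        time.sleep(poll_interval_s)
    return False


# Stand-in for a reviewer who answers on the third poll.
answers = iter([None, None, True])
blessed = wait_for_blessing(lambda: next(answers),
                            timeout_s=5, poll_interval_s=0.01)
```

Treating a timeout as a rejection is a design choice in this sketch: a model that nobody reviewed should not move further down the pipeline.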


### Custom TFX components

IMO custom TFX components are really important for getting functionality that would otherwise be hard to achieve with the provided components. Some example uses of custom components are:

- ingesting data from a custom database
- notifying responsible teams after a step in the pipeline (DevOps, data science teams)
- triggering custom model deployment pipelines
- tracking custom information about the ML pipeline

If we want to write a custom component, we need to implement a few pieces: the spec, the executor, and optionally the driver. First we define the inputs and outputs of our component as a component spec. Then we define the component executor, which contains the logic for how the input data is processed into the required outputs. If we need additional inputs that are not available in the metadata store, we may need to write a custom component driver as well.

For the demonstration we will consider a component that ingests JPEG images and their labels into the pipeline.

Before processing the data we need to define the component specification: the inputs we expect, the outputs we will produce, and any parameters. In TFX these inputs and outputs are called channels, and pipeline components communicate through them. Below is a sample code segment for our example.

In [1]:
from tfx.types.component_spec import ChannelParameter
from tfx.types.component_spec import ExecutionParameter
from tfx.types import standard_artifacts
from tfx import types
from typing import Any, Dict, List, Text



class ImageIngestComponentSpec(types.ComponentSpec):

    PARAMETERS = {
        'name': ExecutionParameter(type=Text)
    }

    INPUTS = {
        'input': ChannelParameter(type=standard_artifacts.ExternalArtifact)
    }

    OUTPUTS = {
        'examples': ChannelParameter(type=standard_artifacts.Examples)
    }

Here we have defined two channel parameters: one for receiving the input as a path and the other for storing the processed data (TFRecords). We also provide an additional execution parameter, `name`.

In the above example we used two types of `Channels` (the TFX term for defined inputs and outputs). Depending on the use case we might need to change the channel types of our inputs and outputs. For example, for a custom trainer component we might need an `Examples` channel as an input and an `ExampleStatistics` channel as an output. Below are some of the available channel types we can use.

- ExampleStatistics
- Model
- ModelBlessing
- Bytes
- String
- Integer
- Float
- HyperParameters

Once we have the spec we can define the executor of our custom component. Here we need to define a `Do` method, which TFX calls to run the component. Below is a sample implementation.



In [4]:
from tfx.components.base import base_executor

class ImageIngestComponentExecutor(base_executor.BaseExecutor):

    def Do(self, input_dict: Dict[Text, List[types.Artifact]],
                 output_dict: Dict[Text, List[types.Artifact]],
                 exec_properties: Dict[Text, Any]) -> None:

        ...

The `Do` method expects three arguments: `input_dict`, `output_dict`, and `exec_properties`. These are Python dictionaries that contain the artifact references and the execution properties we pass along.
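
As a rough illustration of the shapes involved, the sketch below uses a plain stand-in class instead of real TFX artifacts. `FakeArtifact`, `describe_call`, and the example values are hypothetical. The point is that each key maps to a list of artifacts, so the URI lives on an element of that list and not on the dictionary value itself.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class FakeArtifact:
    """Hypothetical stand-in for a TFX artifact; only the `uri` field is modelled."""
    uri: str


def describe_call(input_dict: Dict[str, List[FakeArtifact]],
                  output_dict: Dict[str, List[FakeArtifact]],
                  exec_properties: Dict[str, Any]) -> Dict[str, str]:
    # Each dictionary value is a list, so index into it before reading `.uri`.
    return {
        "read_from": input_dict["input"][0].uri,
        "write_to": output_dict["examples"][0].uri,
        "name": exec_properties["name"],
    }


summary = describe_call(
    input_dict={"input": [FakeArtifact(uri="/data/images")]},
    output_dict={"examples": [FakeArtifact(uri="/pipeline/examples/1")]},
    exec_properties={"name": "ImageIngestComponent"},
)
```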

Before implementing the complete function, we will first write a dummy function to mimic reading images from disk.

In [5]:
import os
import tensorflow as tf

def _bytes_features(value):
    if isinstance(value, str):
        value = value.encode('utf8')
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def _int64_features(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def img_to_tfrecord(file_name, tf_writer, input_base_uri):
    '''Dummy function that reads one image file and writes it as a tf.train.Example.'''

    image_path = os.path.join(input_base_uri, file_name)

    # We assume that the last part of the image name contains its label as an
    # integer, e.g. `img_001_3.jpg` has label 3 (the naming scheme is an assumption).
    stem = os.path.splitext(file_name)[0]
    label = int(stem.split('_')[-1])

    raw_file = tf.io.read_file(image_path)
    example = tf.train.Example(features=tf.train.Features(feature={
        'image_raw': _bytes_features(raw_file.numpy()),
        'label': _int64_features(label)
    }))
    tf_writer.write(example.SerializeToString())
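
The label-extraction step depends entirely on the file naming scheme, which the text leaves open. As one possible convention, the sketch below assumes names such as `cat_0042_3.jpg`, where the last underscore-separated token before the extension is an integer class label. The helper name `label_from_filename` and the naming scheme are both assumptions.

```python
import os


def label_from_filename(file_name: str) -> int:
    """Return the integer label encoded as the last `_`-separated token of the name."""
    stem, _ext = os.path.splitext(os.path.basename(file_name))
    token = stem.rsplit("_", 1)[-1]
    if not token.isdigit():
        raise ValueError(f"No integer label found in file name: {file_name!r}")
    return int(token)


label = label_from_filename("/data/images/cat_0042_3.jpg")
```

Raising an error for names without a numeric suffix makes a mislabeled file fail loudly at ingestion time instead of silently ending up in the training data.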


Once we have the reading function in place, we can work on the actual implementation of the data ingestion component. Note that reading the files in batches would need to be handled here. For simplicity the code below does not include that, but ideally the batch size should be an execution parameter of the ComponentSpec.

In [6]:
from tfx.types import artifact_utils

class ImageIngestComponentExecutor(base_executor.BaseExecutor):

    def Do(self, input_dict: Dict[Text, List[types.Artifact]],
                 output_dict: Dict[Text, List[types.Artifact]],
                 exec_properties: Dict[Text, Any]) -> None:

        # Log the start of the execution using the parent class helper
        self._log_startup(input_dict, output_dict, exec_properties)

        # Read the `input` we defined in the ComponentSpec. The dictionary value
        # is a list of artifacts, so we resolve it to a single URI.
        input_base_uri = artifact_utils.get_single_uri(input_dict['input'])
        image_files = tf.io.gfile.listdir(input_base_uri)

        # Split the data. Ideally this should be parameterized.
        train_images, eval_images = image_files[100:], image_files[:100]
        splits = [('train', train_images), ('eval', eval_images)]

        for split_name, images in splits:

            # Get the desired output dir details from the arguments.
            output_dir = artifact_utils.get_split_uri(
                output_dict['examples'], split_name)

            tf.io.gfile.makedirs(output_dir)
            tfrecords_filename = os.path.join(output_dir, 'images.tfrecords')

            options = tf.io.TFRecordOptions(compression_type=None)
            # The context manager closes (and flushes) the writer for each split.
            with tf.io.TFRecordWriter(tfrecords_filename, options=options) as tf_writer:
                for image_filename in images:
                    # img_to_tfrecord writes the serialized example itself.
                    img_to_tfrecord(image_filename, tf_writer, input_base_uri)
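
The executor above handles every file in a single pass. If batching were added, with the batch size exposed as an execution parameter as suggested earlier, the grouping logic might look like this sketch. The helper `batched` and the `batch_size` parameter name are hypothetical.

```python
from typing import Iterator, List, Sequence


def batched(items: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Yield consecutive slices of `items` with at most `batch_size` elements each."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])


files = [f"img_{i}.jpg" for i in range(5)]
batches = list(batched(files, batch_size=2))
```

Inside the executor, the inner loop would then iterate over `batched(images, batch_size)` and process one group of files at a time.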


The TFX `artifact_utils` module provides various helper functions for getting data out of the artifact dictionaries.

Once we have the executor, we need to make sure it complies with TFX. To do that we register our component inputs with the metadata store so that later components can identify the custom inputs. This is where custom drivers come into play.

> Note that custom drivers are not very common. Usually we can reuse an existing TFX component's input/output architecture, and if the inputs are already registered with the metadata store a custom driver is not needed.

We can use the `BaseDriver` class provided by TFX to write a custom driver. There we need to override the `resolve_input_artifacts` method and use the `publish_artifacts` function to register each of our inputs with the metadata store. Below is a sample implementation of such a method.

In [7]:
# I HAVE NOT TESTED THIS CODE TO FUNCTION

from tfx.components.base import base_driver
from tfx.types import channel_utils

class ImageIngestDriver(base_driver.BaseDriver):

  """Custom driver for registering the image inputs."""

  def resolve_input_artifacts(
                    self,
                    input_channels: Dict[Text, types.Channel],
                    exec_properties: Dict[Text, Any],
                    driver_args,
                    pipeline_info) -> Dict[Text, List[types.Artifact]]:

    """Overrides BaseDriver.resolve_input_artifacts()."""
    
    # Deleting unused values
    del driver_args 
    del pipeline_info

    input_dict = channel_utils.unwrap_channel_dict(input_channels) 
    for input_list in input_dict.values():
        for single_input in input_list:

            # Publish the artifacts to the metadata store
            self._metadata_handler.publish_artifacts([single_input]) 
    return input_dict


Note, however, that the current TFX documentation describes different and easier approaches. Also, I have not tested this code, so it is better to refer to the documentation.

[TensorFlow Extended Custom Components](https://www.tensorflow.org/tfx/tutorials/tfx/python_function_component#custom_python_function_components)

[Understanding Custom Components](https://www.tensorflow.org/tfx/guide/understanding_custom_components)


Anyhow, once we have all the required pieces (spec, executor, and driver) we can assemble the custom component.

In [None]:
from tfx.components.base import base_component, executor_spec
from tfx.types import artifact_utils, channel_utils, standard_artifacts

class ImageIngestComponent(base_component.BaseComponent):

    SPEC_CLASS = ImageIngestComponentSpec
    EXECUTOR_SPEC = executor_spec.ExecutorClassSpec(ImageIngestComponentExecutor)
    DRIVER_CLASS = ImageIngestDriver

    def __init__(self, input, output_data=None, name=None):

        if not output_data:
            examples_artifact = standard_artifacts.Examples()

            # This is not best practice. For demonstration only.
            examples_artifact.split_names = artifact_utils.encode_split_names(['train', 'eval'])

            output_data = channel_utils.as_channel([examples_artifact])

        spec = ImageIngestComponentSpec(input=input,
                                        examples=output_data,
                                        name=name)
        super(ImageIngestComponent, self).__init__(spec=spec)


Once we define our custom component we can use it like below.

In [None]:
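import os
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.utils.dsl_utils import external_input

context = InteractiveContext()
image_file_path = "/path/to/files"
# Wrap the path in a channel, since the spec declares `input` as a ChannelParameter.
# (`external_input` is the helper from older TFX releases; check your version.)
examples = external_input(image_file_path)
example_gen = ImageIngestComponent(input=examples,
                                   name=u'ImageIngestComponent')
context.run(example_gen)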

The code segments above outline the basics of custom components, but the result is not usable in production systems: the splitting is hardcoded rather than dynamic, there is a lot of boilerplate code, and it is not scalable. Therefore it is much better to reuse existing components and build upon them to perform our custom tasks.

This way we can use the functionality provided by Apache Beam pipelines and build components with a minimal amount of code.
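
As one illustration of what non-hardcoded splitting could mean, the sketch below assigns each file to a split by hashing its name, so the assignment is stable across runs and the eval share is a parameter. This is a plain-Python sketch, not a description of how the built-in TFX example generators are implemented. `assign_split`, `split_files`, and `eval_percent` are hypothetical names.

```python
import hashlib
from typing import Dict, Iterable, List


def assign_split(file_name: str, eval_percent: int = 20) -> str:
    """Deterministically map a file name to 'train' or 'eval' via a hash bucket."""
    digest = hashlib.sha256(file_name.encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100
    return "eval" if bucket < eval_percent else "train"


def split_files(file_names: Iterable[str],
                eval_percent: int = 20) -> Dict[str, List[str]]:
    """Group file names into train and eval lists using `assign_split`."""
    splits: Dict[str, List[str]] = {"train": [], "eval": []}
    for name in file_names:
        splits[assign_split(name, eval_percent)].append(name)
    return splits


files = [f"img_{i}.jpg" for i in range(1000)]
splits = split_files(files, eval_percent=20)
```

Because the bucket depends only on the file name, adding new images later does not move existing images between splits, which a positional slice such as `image_files[:100]` cannot guarantee.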