<a href="https://colab.research.google.com/github/Satwikram/Tensorflow-For-Production/blob/main/Basics/Simple%20TFX%20Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Author: Satwik Ram K

### Upgrade pip (Colab only)

In [1]:
try:
  import colab
  !pip install -q --upgrade pip
except:
  pass

[?25l[K     |▏                               | 10kB 18.7MB/s eta 0:00:01[K     |▍                               | 20kB 23.6MB/s eta 0:00:01[K     |▋                               | 30kB 11.6MB/s eta 0:00:01[K     |▉                               | 40kB 8.8MB/s eta 0:00:01[K     |█                               | 51kB 5.7MB/s eta 0:00:01[K     |█▎                              | 61kB 6.6MB/s eta 0:00:01[K     |█▌                              | 71kB 6.6MB/s eta 0:00:01[K     |█▊                              | 81kB 6.3MB/s eta 0:00:01[K     |██                              | 92kB 6.2MB/s eta 0:00:01[K     |██▏                             | 102kB 5.3MB/s eta 0:00:01[K     |██▎                             | 112kB 5.3MB/s eta 0:00:01[K     |██▌                             | 122kB 5.3MB/s eta 0:00:01[K     |██▊                             | 133kB 5.3MB/s eta 0:00:01[K     |███                             | 143kB 5.3MB/s eta 0:00:01[K     |███▏                    

### Install TFX

In [None]:
!pip install -q -U --use-deprecated=legacy-resolver tfx

### Imports

In [4]:
import tensorflow as tf
import tfx
from absl import logging
import urllib.request
import tempfile
import os

### Set up variables

In [5]:
PIPELINE_NAME = "penguin-simple"  # namespaces the artifact, metadata and serving dirs below

In [6]:
PIPELINE_ROOT = os.path.join("pipelines", PIPELINE_NAME)  # root directory for component output artifacts

In [7]:
METADATA_PATH = os.path.join("metadata", PIPELINE_NAME, "metadata.db")  # SQLite ML Metadata store file

In [8]:
SERVING_MODEL_DIR = os.path.join("serving_model", PIPELINE_NAME)  # where the Pusher exports the trained model

In [9]:
# Surface absl INFO logs (TFX component progress) in the cell output.
_LOG_LEVEL = logging.INFO
logging.set_verbosity(_LOG_LEVEL)

### Create a temporary data directory and download the dataset

In [10]:
# Fresh temporary directory that CsvExampleGen later reads as its input_base.
DATA_ROOT = tempfile.mkdtemp(prefix="tfx-data")

In [11]:
# Preprocessed Palmer Penguins CSV from the TFX repository's examples.
data_url = ("https://raw.githubusercontent.com/tensorflow/tfx/master/"
            "tfx/examples/penguin/data/penguins_processed.csv")

In [12]:
data_filepath = os.path.join(DATA_ROOT, 'data.csv')  # local destination of the downloaded CSV

In [13]:
# Download the CSV into DATA_ROOT; the cell displays the returned (path, headers) tuple.
urllib.request.urlretrieve(url=data_url, filename=data_filepath)

('/tmp/tfx-datanr9842i4/data.csv', <http.client.HTTPMessage at 0x7f55309481d0>)

In [14]:
!head {data_filepath}

species,culmen_length_mm,culmen_depth_mm,flipper_length_mm,body_mass_g
0,0.2545454545454545,0.6666666666666666,0.15254237288135594,0.2916666666666667
0,0.26909090909090905,0.5119047619047618,0.23728813559322035,0.3055555555555556
0,0.29818181818181805,0.5833333333333334,0.3898305084745763,0.1527777777777778
0,0.16727272727272732,0.7380952380952381,0.3559322033898305,0.20833333333333334
0,0.26181818181818167,0.892857142857143,0.3050847457627119,0.2638888888888889
0,0.24727272727272717,0.5595238095238096,0.15254237288135594,0.2569444444444444
0,0.25818181818181823,0.773809523809524,0.3898305084745763,0.5486111111111112
0,0.32727272727272727,0.5357142857142859,0.1694915254237288,0.1388888888888889
0,0.23636363636363636,0.9642857142857142,0.3220338983050847,0.3055555555555556


### Create a pipeline: write the Trainer module file

In [26]:
# Opt-in cleanup: set to True to delete this pipeline's artifacts and its
# ML Metadata store from previous runs before re-running the pipeline.
# Paths come from the config constants instead of hard-coded Colab paths.
RESET_PIPELINE_STATE = False

if RESET_PIPELINE_STATE:
  import shutil
  for stale_dir in (os.path.dirname(METADATA_PATH), PIPELINE_ROOT):
    shutil.rmtree(stale_dir, ignore_errors=True)

In [27]:
_trainer_module_file = 'penguin_trainer.py'

In [28]:
%%writefile {_trainer_module_file}


from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_transform.tf_metadata import schema_utils

from tfx.components.trainer.executor import TrainerFnArgs
from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx_bsl.tfxio import dataset_options
from tensorflow_metadata.proto.v0 import schema_pb2

_FEATURE_KEYS = [
    'culmen_length_mm', 'culmen_depth_mm', 'flipper_length_mm', 'body_mass_g'
]
_LABEL_KEY = 'species'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Parsing spec that run_fn turns into a Schema: each feature column is a single
# float32 value and the label is a single int64 value (features first, label last).
_FEATURE_SPEC = {
    key: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
    for key in _FEATURE_KEYS
}
_FEATURE_SPEC[_LABEL_KEY] = tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)

def _input_fn(file_pattern: List[str],
              data_accessor: DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int = 200) -> tf.data.Dataset:
  """Builds a batched dataset over the examples matching `file_pattern`.

  Args:
    file_pattern: Paths/patterns of the example files to read.
    data_accessor: TFX DataAccessor whose `tf_dataset_factory` builds the dataset.
    schema: Schema describing the features to parse.
    batch_size: Number of examples per batch.

  Returns:
    A dataset with `_LABEL_KEY` designated as the label, repeated indefinitely;
    iteration is bounded by the caller (steps_per_epoch / validation_steps).
  """
  options = dataset_options.TensorFlowDatasetOptions(
      batch_size=batch_size, label_key=_LABEL_KEY)
  dataset = data_accessor.tf_dataset_factory(file_pattern, options, schema=schema)
  return dataset.repeat()


def _build_keras_model(num_hidden_layers: int = 2,
                       hidden_units: int = 8,
                       num_classes: int = 3,
                       learning_rate: float = 1e-2) -> tf.keras.Model:
  """Builds and compiles a small fully connected classifier.

  One scalar Input per entry of `_FEATURE_KEYS` is created, the inputs are
  concatenated, passed through `num_hidden_layers` ReLU Dense layers, and a
  final linear Dense layer emits one logit per class. The defaults reproduce
  the original fixed architecture.

  Args:
    num_hidden_layers: Number of hidden Dense layers.
    hidden_units: Units in each hidden layer.
    num_classes: Size of the output layer (number of label classes).
    learning_rate: Adam learning rate.

  Returns:
    A compiled `tf.keras.Model` whose architecture summary has been logged.
  """
  # Inputs are named after the feature keys; Keras matches dict-structured
  # inputs to Input layers by name.
  inputs = [tf.keras.layers.Input(shape=(1,), name=key) for key in _FEATURE_KEYS]

  x = tf.keras.layers.concatenate(inputs)
  for _ in range(num_hidden_layers):
    x = tf.keras.layers.Dense(units=hidden_units, activation='relu')(x)

  # No softmax: the loss below is configured with from_logits=True.
  outputs = tf.keras.layers.Dense(num_classes)(x)

  model = tf.keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=tf.keras.optimizers.Adam(learning_rate),
      loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      metrics=[tf.keras.metrics.SparseCategoricalAccuracy()])

  model.summary(print_fn=logging.info)
  return model

def run_fn(fn_args: TrainerFnArgs):
  """Trainer entry point: trains the model and saves it for serving.

  Builds train/eval datasets from `fn_args.train_files` / `fn_args.eval_files`,
  fits for `fn_args.train_steps` steps (validating on `fn_args.eval_steps`
  batches), then writes a TF SavedModel to `fn_args.serving_model_dir`.

  Args:
    fn_args: Arguments supplied by the TFX Trainer component.
  """
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  def _make_dataset(files, batch_size):
    # Shared reader configuration; only the files and batch size differ.
    return _input_fn(files, fn_args.data_accessor, schema, batch_size=batch_size)

  train_dataset = _make_dataset(fn_args.train_files, _TRAIN_BATCH_SIZE)
  eval_dataset = _make_dataset(fn_args.eval_files, _EVAL_BATCH_SIZE)

  model = _build_keras_model()
  model.fit(train_dataset,
            steps_per_epoch=fn_args.train_steps,
            validation_data=eval_dataset,
            validation_steps=fn_args.eval_steps)

  model.save(fn_args.serving_model_dir, save_format='tf')


Writing penguin_trainer.py


### Pipeline definition

In [29]:
from tfx.components import CsvExampleGen
from tfx.components import Pusher
from tfx.components import Trainer
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.components.base import executor_spec
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2

def _create_pipeline(pipeline_name: str, pipeline_root: str, data_root: str,
                     module_file: str, serving_model_dir: str,
                     metadata_path: str, train_steps: int = 100,
                     eval_steps: int = 5) -> pipeline.Pipeline:
  """Assembles a CsvExampleGen -> Trainer -> Pusher TFX pipeline.

  Args:
    pipeline_name: Name of the pipeline.
    pipeline_root: Directory under which component output artifacts are written.
    data_root: Directory containing the CSV input read by CsvExampleGen.
    module_file: Path to the user trainer module (defines `run_fn`).
    serving_model_dir: Filesystem directory the Pusher exports the model to.
    metadata_path: Path of the SQLite ML Metadata database.
    train_steps: Training steps passed to the Trainer (default 100).
    eval_steps: Evaluation steps passed to the Trainer (default 5).

  Returns:
    A `pipeline.Pipeline` ready to be run by an orchestrator.
  """
  example_gen = CsvExampleGen(input_base=data_root)

  trainer = Trainer(
      module_file=module_file,
      # GenericExecutor drives the module's `run_fn`.
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=example_gen.outputs['examples'],
      train_args=trainer_pb2.TrainArgs(num_steps=train_steps),
      eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps))

  # No model_blessing input is given, so pushing is not gated on an evaluation.
  pusher = Pusher(
      model=trainer.outputs['model'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      components=[example_gen, trainer, pusher])

### Run the pipeline

In [30]:
import os
from tfx.orchestration.local import local_dag_runner


# Build the pipeline from the config constants, then execute it in-process.
penguin_pipeline = _create_pipeline(
    pipeline_name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
    data_root=DATA_ROOT,
    module_file=_trainer_module_file,
    serving_model_dir=SERVING_MODEL_DIR,
    metadata_path=METADATA_PATH)
local_dag_runner.LocalDagRunner().run(penguin_pipeline)

INFO:absl:Generating ephemeral wheel package for '/content/penguin_trainer.py' (including modules: ['penguin_trainer']).
INFO:absl:User module package has hash fingerprint version 7b935587638635e410ab89a02ad7df911267f263107929df65c985d919bc81b1.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmpr2jvptrz/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpjbrw1p21', '--dist-dir', '/tmp/tmpp5hlrz6f']
INFO:absl:Successfully built user code wheel distribution at 'pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7b935587638635e410ab89a02ad7df911267f263107929df65c985d919bc81b1-py3-none-any.whl'; target user module is 'penguin_trainer'.
INFO:absl:Full user module path is 'penguin_trainer@pipelines/penguin-simple/_wheels/tfx_user_code_Trainer-0.0+7b935587638635e410ab89a02ad7df911267f263107929df65c985d919bc81b1-py3-none-any.whl'
INFO:absl:Running pipeline:
 pipeline_info {
  id: "penguin-simple"
}
nodes {
  pipeline_node {
    node_info {
      type {
        name: "tfx.

INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets


INFO:tensorflow:Assets written to: pipelines/penguin-simple/Trainer/model/2/Format-Serving/assets
INFO:absl:Training complete. Model written to pipelines/penguin-simple/Trainer/model/2/Format-Serving. ModelRun written to pipelines/penguin-simple/Trainer/model_run/2
INFO:absl:Cleaning up stateless execution info.
INFO:absl:Execution 2 succeeded.
INFO:absl:Cleaning up stateful execution info.
INFO:absl:Publishing output artifacts defaultdict(<class 'list'>, {'model': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model/2"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2021-05-18T06:29:59.961661:Trainer:model:0"
  }
}
custom_properties {
  key: "tfx_version"
  value {
    string_value: "0.30.0"
  }
}
, artifact_type: name: "Model"
)], 'model_run': [Artifact(artifact: uri: "pipelines/penguin-simple/Trainer/model_run/2"
custom_properties {
  key: "name"
  value {
    string_value: "penguin-simple:2021-05-18T06:29:59.961661:Trainer:model_run:0"
  }


In [31]:
!find {SERVING_MODEL_DIR}


serving_model/penguin-simple
serving_model/penguin-simple/1621319410
serving_model/penguin-simple/1621319410/variables
serving_model/penguin-simple/1621319410/variables/variables.index
serving_model/penguin-simple/1621319410/variables/variables.data-00000-of-00001
serving_model/penguin-simple/1621319410/saved_model.pb
serving_model/penguin-simple/1621319410/assets
