In [1]:
!pip install tfx 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting tfx
  Downloading tfx-1.9.1-py3-none-any.whl (2.5 MB)
[K     |████████████████████████████████| 2.5 MB 8.3 MB/s 
[?25hCollecting tensorflow!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<2.10,>=1.15.5
  Downloading tensorflow-2.9.2-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (511.8 MB)
[K     |████████████████████████████████| 511.8 MB 6.2 kB/s 
Collecting tensorflow-serving-api!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.*,!=2.4.*,!=2.5.*,!=2.6.*,!=2.7.*,!=2.8.*,<3,>=1.15
  Downloading tensorflow_serving_api-2.10.0-py2.py3-none-any.whl (37 kB)
Collecting apache-beam[gcp]<3,>=2.38
  Downloading apache_beam-2.41.0-cp37-cp37m-manylinux2010_x86_64.whl (10.9 MB)
[K     |████████████████████████████████| 10.9 MB 47.9 MB/s 
Collecting pyarrow<6,>=1
  Downloading pyarrow-5.0.0-cp37-cp37m-manylinux2014_x86_64.whl (23.6 MB)
[K     |██████████████████████████

In [24]:
import os

# Identity and root directory of the pipeline; every other path derives from these.
pipeline_name = 'salary_pipeline'
pipeline_dir = '/content/tfx'

# Inputs: the CSV data directory and the user module file.
module_file = os.path.join(pipeline_dir, 'components', 'module.py')
data_dir = os.path.join(pipeline_dir, 'data')

# Outputs: all artifacts live under <pipeline_dir>/output/<pipeline_name>.
output_base = os.path.join(pipeline_dir, 'output', pipeline_name)
pipeline_root = os.path.join(output_base, 'pipeline_root')
serving_model_dir = os.path.join(output_base, pipeline_name)

# The ML Metadata SQLite file is kept inside the pipeline root.
metadata_path = os.path.join(pipeline_root, 'metadata.sqlite')

In [25]:
from tfx import components
import tensorflow_model_analysis as tfma 
from tfx.components import (CsvExampleGen, Evaluator, ExampleValidator, Pusher, SchemaGen, StatisticsGen, Trainer, Transform)
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.proto import pusher_pb2, trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model, ModelBlessing
from tfx.proto import example_gen_pb2
from tfx import v1 as tfx
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
import os
import absl

In [29]:
def init_components(data_root, module_file, serving_model_dir, training_steps=2000, eval_steps=200):
  """Build the list of TFX components that make up the training pipeline.

  Args:
    data_root: Directory handed to CsvExampleGen as `input_base`.
    module_file: Path to the user module shared by Transform and Trainer.
    serving_model_dir: Base directory of the Pusher's filesystem push
      destination.
    training_steps: Value used for `TrainArgs.num_steps`.
    eval_steps: Value used for `EvalArgs.num_steps`.

  Returns:
    A list of component instances in this order: example_gen, statistics_gen,
    schema_gen, example_validator, transform, trainer, model_resolver,
    evaluator, pusher.
  """
  # Hash-bucket split of the input into train/eval/test with a 6:2:2 ratio.
  split_config = example_gen_pb2.SplitConfig(splits=[
      example_gen_pb2.SplitConfig.Split(name='train', hash_buckets=6),
      example_gen_pb2.SplitConfig.Split(name='eval', hash_buckets=2),
      example_gen_pb2.SplitConfig.Split(name='test', hash_buckets=2),
  ])
  output = example_gen_pb2.Output(split_config=split_config)

  example_gen = CsvExampleGen(input_base=data_root, output_config=output)

  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=True,
  )

  example_validator = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'],
  )

  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=module_file,
  )

  trainer = Trainer(
      module_file=module_file,
      transformed_examples=transform.outputs['transformed_examples'],
      transform_graph=transform.outputs['transform_graph'],
      schema=schema_gen.outputs['schema'],
      train_args=trainer_pb2.TrainArgs(num_steps=training_steps),
      eval_args=trainer_pb2.EvalArgs(num_steps=eval_steps),
  )

  # Resolves the latest blessed model so the Evaluator has a baseline to
  # compare the freshly trained model against.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(type=tfx.types.standard_artifacts.ModelBlessing),
  ).with_id('latest_blessed_model_resolver')

  # NOTE(review): label_key "label" and the "product" slice do not appear among
  # the feature names printed in the model summary (age, capital_gain, ...);
  # confirm they match the columns of the salary dataset.
  eval_config = tfma.EvalConfig(
      model_specs=[
          tfma.ModelSpec(
              signature_name="serving_default",
              label_key="label",
          )
      ],
      slicing_specs=[tfma.SlicingSpec(), tfma.SlicingSpec(feature_keys=["product"])],
      metrics_specs=[
          tfma.MetricsSpec(
              metrics=[
                  tfma.MetricConfig(
                      class_name="BinaryAccuracy",
                      threshold=tfma.MetricThreshold(
                          # Absolute floor the candidate model must reach.
                          value_threshold=tfma.GenericValueThreshold(
                              lower_bound={"value": 0.65}
                          ),
                          # Relative check against the baseline model; only
                          # meaningful because baseline_model is wired in below.
                          change_threshold=tfma.GenericChangeThreshold(
                              direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                              absolute={"value": -1e-10},
                          ),
                      ),
                  ),
                  tfma.MetricConfig(class_name="Precision"),
                  tfma.MetricConfig(class_name="Recall"),
                  tfma.MetricConfig(class_name="ExampleCount"),
                  tfma.MetricConfig(class_name="AUC"),
              ],
          )
      ],
  )

  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config,
  )

  pusher = Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir
          )
      ),
  )

  # Named `pipeline_components` so the local list does not shadow the
  # `components` module imported from tfx.
  pipeline_components = [
      example_gen,
      statistics_gen,
      schema_gen,
      example_validator,
      transform,
      trainer,
      model_resolver,
      evaluator,
      pusher,
  ]

  return pipeline_components

In [30]:
def init_beam_pipeline(components, pipeline_root, direct_num_workers):
  """Assemble a TFX Pipeline configured for the Beam runner.

  Reads the module-level names `pipeline_name` and `metadata_path`, so the
  configuration cell must have been executed first.

  Args:
    components: List of TFX component instances to include in the pipeline.
    pipeline_root: Root directory passed to the Pipeline as `pipeline_root`.
    direct_num_workers: Value substituted into the `--direct_num_workers`
      Beam pipeline argument.

  Returns:
    The constructed Pipeline object, with caching disabled and a SQLite
    metadata connection pointing at `metadata_path`.
  """
  # Imported locally under an alias so this function keeps working even if the
  # global name `pipeline` has been rebound (e.g. to a Pipeline object) by a
  # notebook cell; `pipeline.Pipeline` would then fail on the second call.
  from tfx.orchestration import pipeline as tfx_pipeline

  absl.logging.info("Pipeline root set to:{}".format(pipeline_root))
  beam_args = [
      "--direct_num_workers={}".format(direct_num_workers),
  ]

  return tfx_pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      enable_cache=False,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path),
      beam_pipeline_args=beam_args,
  )

In [31]:
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner

# Bind the results to distinct names: assigning to `components` / `pipeline`
# would overwrite the tfx modules imported under those names and make the
# cells above fail when re-run in the same kernel.
salary_components = init_components(
    data_dir,
    module_file,
    serving_model_dir,
    training_steps=500,
    eval_steps=400,
)
salary_pipeline = init_beam_pipeline(salary_components, pipeline_root, 2)
BeamDagRunner().run(salary_pipeline)



Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 age_xf (InputLayer)            [(None, 1)]          0           []                               
                                                                                                  
 capital_gain_xf (InputLayer)   [(None, 1)]          0           []                               
                                                                                                  
 capital_loss_xf (InputLayer)   [(None, 1)]          0           []                               
                                                                                                  
 education_num_xf (InputLayer)  [(None, 1)]          0           []                               
                                                                                            

Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`
