In [1]:
import os
import pprint
import numpy as np
import tempfile
import urllib

import absl
import pandas as pd
import tensorflow as tf
import tensorflow_model_analysis as tfma
tf.get_logger().propagate = False
pp = pprint.PrettyPrinter()

import tfx
from tfx.components import CsvExampleGen
from typing import Dict, List, Text
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
#from tfx.components import ResolverNode
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.components.base import executor_spec
from tfx.components.trainer.executor import GenericExecutor
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing
#from tfx.utils.dsl_utils import external_input


%load_ext tfx.orchestration.experimental.interactive.notebook_extensions.skip



In [2]:
# Report the library versions this notebook was executed with.
print(f'TensorFlow version: {tf.__version__}')
print(f'TFX version: {tfx.__version__}')

TensorFlow version: 2.4.1
TFX version: 0.30.0


In [3]:
# Create the working directories used by the notebook.
# `-p` creates missing parents and does not fail when a directory already
# exists, so this cell can be re-run (Restart & Run All) without errors.
! mkdir -p tfx/pipelines tfx/metadata tfx/logs tfx/data tfx/serving_model
! mkdir -p eval_data/

# Download the Chicago Taxi sample data.
# `-nc` (no-clobber) skips the download when data.csv is already present,
# instead of saving a second copy as data.csv.1 that later cells never read.
!wget -nc https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv

--2021-06-17 14:06:58--  https://raw.githubusercontent.com/tensorflow/tfx/master/tfx/examples/chicago_taxi_pipeline/data/simple/data.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.111.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1922812 (1,8M) [text/plain]
Saving to: ‘data.csv’


2021-06-17 14:07:00 (1,60 MB/s) - ‘data.csv’ saved [1922812/1922812]



In [4]:
# Columns that are not used by the model and are removed right after loading.
unused_columns = [
    'trip_start_timestamp', 'trip_miles', 'pickup_census_tract',
    'dropoff_census_tract', 'trip_seconds', 'payment_type', 'tips',
    'company', 'dropoff_community_area', 'pickup_community_area',
]

# Load the raw CSV, drop the unused columns, then drop every row with a missing value.
df = (
    pd.read_csv('data.csv')
    .drop(columns=unused_columns)
    .dropna()
)

# Hold out roughly 10% of the rows as a final test set
# (TFX internally splits the remaining data into train and validation).
np.random.seed(seed=2)
msk = np.random.rand(len(df)) < 0.9
traindf, evaldf = df[msk], df[~msk]

# Show the size of each split.
print(len(traindf), len(evaldf), sep='\n')

# The training split keeps its header row; the held-out split is written without one.
traindf.to_csv("tfx/data/data_trans.csv", index=False, header=True)
evaldf.to_csv("eval.csv", index=False, header=False)

13077
1442


In [5]:
## Shared locations and module-file names used by the cells below.
# Everything lives under <current working directory>/tfx.
_tfx_root = os.path.join(os.getcwd(), 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines')        # <cwd>/tfx/pipelines
_metadata_db_root = os.path.join(_tfx_root, 'metadata.db')   # <cwd>/tfx/metadata.db
_log_root = os.path.join(_tfx_root, 'logs')
_model_root = os.path.join(_tfx_root, 'model')
_data_root = os.path.join(_tfx_root, 'data')
_serving_model_dir = os.path.join(_tfx_root, 'Format-Serving')
# CSV written by the data-preparation cell.
_data_filepath = os.path.join(_data_root, "data_trans.csv")

# User-code module files (paths relative to the notebook's working directory).
_input_fn_module_file = 'inputfn_trainer.py'
_constants_module_file = 'constants_trainer.py'
_model_trainer_module_file = 'model_trainer.py'

In [6]:
def create_final_pipeline(
    pipeline_name: Text,
    root_path: Text,
    data_path: Text,
    training_params: Dict[Text, Text],
    module_file: Text = None,
    beam_pipeline_args: List[Text] = None,
) -> pipeline.Pipeline:
  """Builds the CSV -> statistics -> schema -> validation -> train -> push pipeline.

  The returned object only describes the pipeline; it carries no
  runner-specific information and must be handed to a runner to execute.

  Args:
    pipeline_name: Name given to the TFX pipeline.
    root_path: Directory used as the pipeline root. The ML Metadata SQLite
      file (`metadata.db`) and the serving directory (`Format-Serving`) are
      placed directly under it.
    data_path: Directory passed to `CsvExampleGen` as `input_base`.
    training_params: Passed unchanged to the Trainer as `custom_config`.
      NOTE(review): the annotation says Text values, but the run cell in this
      notebook passes an int (`{"epochs": 50}`) - confirm the intended type.
    module_file: Path to the trainer module file. When None, the notebook
      global `_model_trainer_module_file` is used, as before.
    beam_pipeline_args: Beam arguments for the pipeline. When None, the
      previous hard-coded value `['--direct_num_workers=0']` is used.

  Returns:
    The assembled `pipeline.Pipeline`, with caching enabled.
  """
  # Fall back to the values this function used before the parameters existed.
  if module_file is None:
    module_file = _model_trainer_module_file
  if beam_pipeline_args is None:
    beam_pipeline_args = ['--direct_num_workers=0']

  metadata_db_path = os.path.join(root_path, 'metadata.db')
  serving_model_dir = os.path.join(root_path, 'Format-Serving')

  # Ingest the CSV files found under data_path.
  example_gen = CsvExampleGen(input_base=data_path)

  # Statistics, inferred schema and validation of the ingested examples.
  # Their outputs are not consumed by the Trainer below.
  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])

  infer_schema = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)

  validate_stats = ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=infer_schema.outputs['schema'])

  # Train on the raw examples with the generic executor; the module file is
  # resolved to an absolute path before being handed to the component.
  trainer = Trainer(
      module_file=os.path.abspath(module_file),
      custom_executor_spec=executor_spec.ExecutorClassSpec(GenericExecutor),
      examples=example_gen.outputs['examples'],
      train_args=trainer_pb2.TrainArgs(),
      eval_args=trainer_pb2.EvalArgs(),
      custom_config=training_params)

  # Push the trained model to the filesystem serving directory.
  pusher = Pusher(
      model=trainer.outputs['model'],
      push_destination=pusher_pb2.PushDestination(
          filesystem=pusher_pb2.PushDestination.Filesystem(
              base_directory=serving_model_dir)))

  # NOTE(review): pipeline_root is root_path itself, so component artifacts
  # are written directly under it (e.g. <root_path>/Trainer/...), not under a
  # `pipelines` subdirectory.
  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=root_path,
      components=[
          example_gen, statistics_gen, infer_schema, validate_stats,
          trainer, pusher
      ],
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_db_path),
      enable_cache=True,
      beam_pipeline_args=beam_pipeline_args,
  )

In [7]:
# Execute the pipeline on this machine with the local DAG runner.
from tfx.orchestration.local.local_dag_runner import LocalDagRunner

# Root directory for everything the pipeline writes: <cwd>/tfx
_tfx_root = os.path.join(os.getcwd(), 'tfx')

# Parameters handed to the trainer module through custom_config.
training_params = dict(epochs=50)

# Assemble the pipeline definition, then run it.
p_ = create_final_pipeline(
    pipeline_name="local_pipeline",
    root_path=_tfx_root,
    data_path=_data_root,
    training_params=training_params,
)

LocalDagRunner().run(p_)



ERROR:absl:udf_utils.get_fn {'eval_args': '{}', 'custom_config': '{"epochs": 50}', 'module_path': 'model_trainer@/home/limin/my_CICD/tfx/_wheels/tfx_user_code_Trainer-0.0+a8d50ea8986471a27ac7e631700544b1297a7385fa28e725c3812bb59b916c9b-py3-none-any.whl', 'train_args': '{}'} 'run_fn'


2.4.1
INFO:tensorflow:Using MirroredStrategy with devices ('/device:CPU:0',)
INFO:tensorflow:Single-worker MultiWorkerMirroredStrategy with local_devices = ('/device:CPU:0',), communication = CommunicationImplementation.AUTO
Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
pickup_latitude (InputLayer)    [(None, 1)]          0                                            
__________________________________________________________________________________________________
pickup_longitude (InputLayer)   [(None, 1)]          0                                            
__________________________________________________________________________________________________
dropoff_latitude (InputLayer)   [(None, 1)]          0                                            
___________________________________________________________________

__________________________________________________________________________________________________
concatenate (Concatenate)       (None, 16)           0           pickup_latitude[0][0]            
                                                                 pickup_longitude[0][0]           
                                                                 dropoff_latitude[0][0]           
                                                                 dropoff_longitude[0][0]          
                                                                 distance[0][0]                   
                                                                 tf.math.reduce_sum[0][0]         
                                                                 tf.math.reduce_sum_1[0][0]       
                                                                 tf.strings.to_number[0][0]       
                                                                 tf.strings.to_number_1[0][0]     
          



Epoch 2/50


Epoch 3/50


Epoch 4/50


Epoch 5/50


Epoch 6/50


Epoch 7/50
Epoch 8/50
Epoch 9/50


Epoch 10/50
Epoch 11/50



Epoch 00011: ReduceLROnPlateau reducing learning rate to 0.00020000000949949026.
Epoch 12/50


Epoch 13/50
Epoch 14/50


Epoch 15/50


Epoch 16/50



Epoch 00016: ReduceLROnPlateau reducing learning rate to 4.0000001899898055e-05.
Epoch 17/50
Epoch 18/50


Epoch 19/50


Epoch 20/50
Epoch 21/50
Epoch 22/50


Epoch 23/50
Epoch 24/50


Epoch 25/50
Epoch 26/50


Epoch 27/50


Epoch 28/50
Epoch 29/50
Epoch 30/50


Epoch 31/50
Epoch 32/50


Epoch 33/50


Epoch 34/50


Epoch 35/50



Epoch 00035: ReduceLROnPlateau reducing learning rate to 1e-05.
Epoch 36/50
Epoch 37/50


Epoch 38/50


Epoch 39/50
Epoch 40/50


Epoch 41/50


Epoch 42/50
Epoch 43/50


Epoch 44/50


Epoch 45/50


Epoch 46/50


Epoch 47/50
Epoch 48/50


Epoch 49/50
Epoch 50/50


INFO:tensorflow:Assets written to: /home/limin/my_CICD/tfx/Trainer/model/3/Format-Serving/assets




2021-06-17 14:17:32.008540: W tensorflow/stream_executor/platform/default/dso_loader.cc:60] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-06-17 14:17:32.008571: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
Traceback (most recent call last):
  File "/usr/local/bin/saved_model_cli", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/tools/saved_model_cli.py", line 1192, in main
    args.func(args)
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/tools/saved_model_cli.py", line 719, in show
    _show_all(args.dir)
  File "/usr/local/lib/python3.8/dist-packages/tensorflow/python/tools/saved_model_cli.py", line 296, in _show_all
    tag_sets = saved_model_utils.get_saved_model_tag_sets(saved_model_dir)
  File "/usr/local/lib/python3.8/dist-pa

In [10]:
#LOCAL: Predict using the serving signature of the pushed SavedModel

# Locate the newest Pusher output instead of hard-coding a directory such as
# ".../pushed_model/4": the numeric directory name belongs to one particular
# pipeline execution, so a fixed value only works for that single run.
pushed_model_root = os.path.join("tfx", "Pusher", "pushed_model")
pushed_ids = [d for d in os.listdir(pushed_model_root) if d.isdigit()]
if not pushed_ids:
    raise FileNotFoundError("No pushed model found under " + pushed_model_root)
# Compare numerically so that e.g. "10" ranks above "9".
saved_mod = tf.saved_model.load(
    os.path.join(pushed_model_root, max(pushed_ids, key=int)))

#Get prediction function from serving
f = saved_mod.signatures['serving_default']

#Run prediction function from serving
# Coordinates are passed as float tensors, the trip_start_* features as string tensors.
f(dropoff_latitude=tf.convert_to_tensor([41.920452]),
  dropoff_longitude=tf.convert_to_tensor([-87.679955]),
  pickup_latitude=tf.convert_to_tensor([41.952823]),
  pickup_longitude=tf.convert_to_tensor([-87.653244]),
  trip_start_day=tf.convert_to_tensor(["1"]),
  trip_start_hour=tf.convert_to_tensor(["5"]),
  trip_start_month=tf.convert_to_tensor(["6"]))



{'output_0': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[10.463501]], dtype=float32)>}