# Source
https://cloud.google.com/solutions/machine-learning/data-preprocessing-for-ml-with-tf-transform-pt2#introduction  
https://github.com/GoogleCloudPlatform/tf-estimator-tutorials/blob/master/00_Miscellaneous/tf_transform/tft-02%20-%20Babyweight%20Estimation%20with%20Transformed%20Data.ipynb

## Code adapted to TF2 partially based on  
https://www.tensorflow.org/tfx/tutorials/transform/census  
https://github.com/tensorflow/tfx/blob/master/docs/tutorials/transform/census.ipynb

# Babyweight Estimation with Transformed Data

### Set global flags

In [1]:
# Notebook-wide settings; edit these before running.
PROJECT ='mlteam-ml-specialization-2021' # change to your project_Id
BUCKET = 'mlteam-ml-specialization-2021-taxi' # change to your bucket name
REGION = 'europe-west1' # change to your region
ROOT_DIR = 'babyweight_tft' # directory where the output is stored locally or on GCS

# When False, the output root is gs://<BUCKET>/<ROOT_DIR>; when True it is the local ROOT_DIR.
RUN_LOCAL = False

In [2]:
import os

# Publish the notebook settings as environment variables so that shell cells
# (e.g. %%bash) can read them; RUN_LOCAL is serialised as the string 'true'/'false'.
os.environ.update({
    'PROJECT': PROJECT,
    'BUCKET': BUCKET,
    'REGION': REGION,
    'ROOT_DIR': ROOT_DIR,
    'RUN_LOCAL': 'true' if RUN_LOCAL else 'false',
})

## Import required packages and modules

In [3]:
import os

import tensorflow as tf
from tensorflow import data

import tensorflow_transform as tft
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import dataset_schema

from tensorflow_transform.tf_metadata import metadata_io
from tensorflow_transform.beam.tft_beam_io import transform_fn_io

In [4]:
# List the installed packages whose names contain 'tensorflow', 'beam' or 'cloud-dataflow'.
!pip list | grep 'tensorflow'
!pip list | grep 'beam'
!pip list | grep 'cloud-dataflow'

tensorflow                     2.4.0
tensorflow-cloud               0.1.13
tensorflow-data-validation     0.28.0
tensorflow-datasets            3.0.0
tensorflow-estimator           2.4.0
tensorflow-hub                 0.9.0
tensorflow-io                  0.15.0
tensorflow-metadata            0.28.0
tensorflow-model-analysis      0.28.0
tensorflow-probability         0.11.0
tensorflow-serving-api         2.4.0
tensorflow-transform           0.28.0
apache-beam                    2.28.0


In [5]:
# Root location of every artefact: the local ROOT_DIR when RUN_LOCAL is set,
# otherwise the same directory name inside the GCS bucket.
# Plain truthiness is used so this agrees with how RUN_LOCAL is exported to the environment.
if RUN_LOCAL:
    OUTPUT_DIR = ROOT_DIR
else:
    OUTPUT_DIR = "gs://{}/{}".format(BUCKET, ROOT_DIR)

# tf.Transform artefacts (the transformed metadata is read from here).
TRANSFORM_ARTEFACTS_DIR = os.path.join(OUTPUT_DIR, 'transform')
# Transformed train-*/eval-* TFRecord files.
TRANSFORMED_DATA_DIR = os.path.join(OUTPUT_DIR, 'transformed')
# Scratch directory.
TEMP_DIR = os.path.join(OUTPUT_DIR, 'tmp')
# Parent directory of the estimator model directories.
MODELS_DIR = os.path.join(OUTPUT_DIR, 'models')

## Transform Metadata

In [6]:
# Load the metadata stored under <TRANSFORM_ARTEFACTS_DIR>/transformed_metadata.
transformed_metadata = metadata_io.read_metadata(
        os.path.join(TRANSFORM_ARTEFACTS_DIR,"transformed_metadata"))

# Name of the label column; it is popped from the features in the input function
# and skipped when building feature columns.
TARGET_FEATURE_NAME = 'weight_pounds'

# Display the schema of the transformed features.
transformed_metadata.schema

feature {
  name: "is_male_index"
  type: INT
  int_domain {
    min: -1
    max: 1
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "is_multiple_index"
  type: INT
  int_domain {
    min: -1
    max: 1
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "mother_age_bucketized"
  type: INT
  int_domain {
    min: 0
    max: 4
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "mother_age_log"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "mother_age_normalized"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "mother_race_index"
  type: INT
  int_domain {
    min: -1
    max: 10
    is_categorical: true
  }
  presence {
    min_fraction: 1.0
  }
  shape {
  }
}
feature {
  name: "weight_pounds"
  type: FLOAT
  presence {
    min_fraction: 1.0
  }
  shape {


## Input Function

In [7]:
def tfrecords_input_fn(files_name_pattern, transformed_metadata,
                       mode=tf.estimator.ModeKeys.EVAL,
                       num_epochs=1,
                       batch_size=500):
    """Read batches of transformed examples from TFRecord files.

    Args:
        files_name_pattern: File pattern of the TFRecord files to read.
        transformed_metadata: Not referenced in the body; the feature spec is
            taken from the artefacts under TRANSFORM_ARTEFACTS_DIR instead.
        mode: Estimator mode key; records are shuffled only for TRAIN.
        num_epochs: Number of passes over the data.
        batch_size: Number of examples per batch.

    Returns:
        A ``(features, target)`` pair, where ``target`` is the
        TARGET_FEATURE_NAME entry removed from the features dict.
    """
    is_training = mode == tf.estimator.ModeKeys.TRAIN
    feature_spec = tft.TFTransformOutput(TRANSFORM_ARTEFACTS_DIR).transformed_feature_spec()

    batched_dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=files_name_pattern,
        batch_size=batch_size,
        features=feature_spec,
        reader=tf.data.TFRecordDataset,
        num_epochs=num_epochs,
        shuffle=is_training,
        shuffle_buffer_size=2 * batch_size + 1,
        prefetch_buffer_size=1,
    )

    # Pull the next batch as a dict of tensors and split off the label.
    batch = tf.compat.v1.data.make_one_shot_iterator(batched_dataset).get_next()
    label = batch.pop(TARGET_FEATURE_NAME)
    return batch, label

## Feature columns

In [8]:
def create_wide_and_deep_feature_columns(transformed_metadata, hparams):
    """Derive wide (linear) and deep (DNN) feature columns from the transformed schema.

    Every schema feature except TARGET_FEATURE_NAME is mapped as follows:
      * FLOAT                  -> numeric column (deep)
      * INT marked categorical -> identity categorical column (wide)
      * INT not categorical    -> numeric column (deep)
    Features of any other type are skipped.

    Args:
        transformed_metadata: Object whose ``schema.feature`` entries are iterated;
            ``name``, ``type`` and ``int_domain`` are read from each entry.
        hparams: Object exposing ``extend_feature_columns`` and, when that flag
            is set, ``embed_dimension``.

    Returns:
        Tuple ``(wide_feature_columns, deep_feature_columns)`` of lists.
    """
    # Numeric codes that `feature.type` is compared against.
    # NOTE(review): presumably the schema proto's enum values for INT and FLOAT -- confirm.
    int_type, float_type = 2, 3
    # Hash bucket count of the age-bucket x race cross.
    # NOTE(review): presumably 5 age buckets * 11 race indices -- confirm against the schema.
    cross_hash_bucket_size = 55

    deep_feature_columns = []
    wide_feature_columns = []

    for feature in transformed_metadata.schema.feature:
        if feature.name == TARGET_FEATURE_NAME:
            continue

        if feature.type == float_type:
            # Numerical feature.
            deep_feature_columns.append(tf.feature_column.numeric_column(feature.name))
        elif feature.type == int_type:
            if feature.int_domain.is_categorical:
                # Categorical feature with identity: bucket count is the domain max + 1.
                # NOTE(review): the schema printed in this notebook lists min: -1 for the
                # *_index features; such ids would fall outside these buckets -- confirm.
                wide_feature_columns.append(
                    tf.feature_column.categorical_column_with_identity(
                        feature.name,
                        num_buckets=feature.int_domain.max + 1)
                )
            else:
                deep_feature_columns.append(tf.feature_column.numeric_column(feature.name))

    if hparams.extend_feature_columns:
        # Optional extension: cross of age bucket and race goes to the wide part,
        # and its embedding goes to the deep part.
        mother_race_X_mother_age_bucketized = tf.feature_column.crossed_column(
            ['mother_age_bucketized', 'mother_race_index'], cross_hash_bucket_size)
        wide_feature_columns.append(mother_race_X_mother_age_bucketized)

        mother_race_X_mother_age_bucketized_embedded = tf.feature_column.embedding_column(
            mother_race_X_mother_age_bucketized, hparams.embed_dimension)
        deep_feature_columns.append(mother_race_X_mother_age_bucketized_embedded)

    print("Wide columns:")
    print(wide_feature_columns)
    print("")
    print("Deep columns:")
    print(deep_feature_columns)
    print("")

    return wide_feature_columns, deep_feature_columns

## Estimator

In [9]:
def create_estimator(run_config, hparams):
    """Build the wide-and-deep regressor.

    Feature columns are derived from the notebook-level ``transformed_metadata``.

    Args:
        run_config: Estimator run configuration; its ``model_dir`` is printed
            and also passed explicitly to the estimator.
        hparams: Object exposing ``hidden_units`` plus whatever
            ``create_wide_and_deep_feature_columns`` reads from it.

    Returns:
        A ``tf.estimator.DNNLinearCombinedRegressor``.
    """
    wide_columns, deep_columns = create_wide_and_deep_feature_columns(
        transformed_metadata, hparams)

    target_dir = run_config.model_dir
    print(f"model will be saved to {target_dir}")

    return tf.estimator.DNNLinearCombinedRegressor(
        linear_feature_columns=wide_columns,
        dnn_feature_columns=deep_columns,
        dnn_hidden_units=hparams.hidden_units,
        config=run_config,
        model_dir=target_dir,
    )

## Experiment

In [10]:
# replacing the old tf.contrib.training.HParams with EasyDict as a workaround
class EasyDict(dict):
    """A dict whose keys can also be read, written and deleted as attributes.

    ``d.key`` is equivalent to ``d["key"]``. A missing key raises
    ``AttributeError`` on attribute access or deletion, so ``hasattr(d, name)``
    and ``getattr(d, name, default)`` work as they do for ordinary objects.
    """

    def __getattr__(self, name):
        # Only invoked when regular attribute lookup fails, so dict methods
        # such as .items() keep working.
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name) from None

    def __setattr__(self, name, value):
        self[name] = value

    def __delattr__(self, name):
        try:
            del self[name]
        except KeyError:
            raise AttributeError(name) from None

In [11]:
# Hyperparameters of the experiment.
#   num_epochs / batch_size  -> passed to the training input function
#   hidden_units             -> DNN hidden layer sizes of the estimator
#   max_steps                -> TrainSpec.max_steps
#   embed_dimension          -> embedding size of the crossed column (only used when
#                               extend_feature_columns is set)
#   extend_feature_columns   -> whether the crossed/embedded columns are added
#   evaluate_after_sec       -> EvalSpec.throttle_secs
hparams = EasyDict(  # stand-in for the former tf.contrib.training.HParams
    num_epochs=10,
    batch_size=500,
    hidden_units=[32, 16],
    max_steps=100,
    embed_dimension=5,
    extend_feature_columns=False,
    evaluate_after_sec=10
)

# Checkpoints and exports of this run live under <MODELS_DIR>/dnn_estimator.
model_dir = os.path.join(MODELS_DIR,"dnn_estimator")
run_config = tf.estimator.RunConfig(
    tf_random_seed=19830610,
    model_dir=model_dir
)

In [12]:
# File patterns of the transformed training and evaluation TFRecords.
train_data_files = os.path.join(TRANSFORMED_DATA_DIR, "train-*.tfrecords")
eval_data_files = os.path.join(TRANSFORMED_DATA_DIR, "eval-*.tfrecords")

# TrainSpec: shuffled training input, stopping at hparams.max_steps.
train_spec = tf.estimator.TrainSpec(
  input_fn = lambda: tfrecords_input_fn(train_data_files,transformed_metadata,
    mode=tf.estimator.ModeKeys.TRAIN,
    num_epochs= hparams.num_epochs,
    batch_size = hparams.batch_size
  ),
  max_steps=hparams.max_steps,
)

# EvalSpec: the eval input function runs with its defaults (EVAL mode, num_epochs=1, batch_size=500).
# NOTE(review): steps=None presumably means "evaluate until the input is exhausted" -- confirm.
eval_spec = tf.estimator.EvalSpec(
  input_fn =lambda: tfrecords_input_fn(eval_data_files,transformed_metadata),
  steps = None,
  throttle_secs = hparams.evaluate_after_sec # evaluate_after_sec is 10 in hparams
)

In [13]:
from datetime import datetime

# Start from a clean model directory: any existing contents of model_dir are deleted.
if tf.io.gfile.exists(model_dir):
    tf.io.gfile.rmtree(model_dir)

estimator = create_estimator(run_config, hparams)

# Record the UTC start time so the elapsed time can be reported afterwards.
time_start = datetime.utcnow() 
print("")
print("Experiment started at {}".format(time_start.strftime("%H:%M:%S")))
print(".......................................") 


# Train and evaluate according to train_spec and eval_spec.
tf.estimator.train_and_evaluate(
  estimator,
  train_spec,
  eval_spec
)


time_end = datetime.utcnow() 
print(".......................................")
print("Experiment finished at {}".format(time_end.strftime("%H:%M:%S")))
print("")
time_elapsed = time_end - time_start
print("Experiment elapsed time: {} seconds".format(time_elapsed.total_seconds()))

Wide columns:
[IdentityCategoricalColumn(key='is_male_index', number_buckets=2, default_value=None), IdentityCategoricalColumn(key='is_multiple_index', number_buckets=2, default_value=None), IdentityCategoricalColumn(key='mother_age_bucketized', number_buckets=5, default_value=None), IdentityCategoricalColumn(key='mother_race_index', number_buckets=11, default_value=None)]

Deep columns:
[NumericColumn(key='mother_age_log', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None), NumericColumn(key='mother_age_normalized', shape=(1,), default_value=None, dtype=tf.float32, normalizer_fn=None)]

model will be saved to gs://mlteam-ml-specialization-2021-taxi/babyweight_tft/models/dnn_estimator
INFO:tensorflow:Using config: {'_model_dir': 'gs://mlteam-ml-specialization-2021-taxi/babyweight_tft/models/dnn_estimator', '_tf_random_seed': 19830610, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: t



Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into gs://mlteam-ml-specialization-2021-taxi/babyweight_tft/models/dnn_estimator/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 58.640434, step = 0
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 100...
INFO:tensorflow:Saving checkpoints for 100 into gs://mlteam-ml-specialization-2021-taxi/babyweight_tft/models/dnn_estimator/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 100...
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Done calling m

In [20]:
# Display the individual feature entries of the transformed schema.
transformed_metadata.schema.feature

[name: "is_male_index"
type: INT
int_domain {
  min: -1
  max: 1
  is_categorical: true
}
presence {
  min_fraction: 1.0
}
shape {
}
, name: "is_multiple_index"
type: INT
int_domain {
  min: -1
  max: 1
  is_categorical: true
}
presence {
  min_fraction: 1.0
}
shape {
}
, name: "mother_age_bucketized"
type: INT
int_domain {
  min: 0
  max: 4
  is_categorical: true
}
presence {
  min_fraction: 1.0
}
shape {
}
, name: "mother_age_log"
type: FLOAT
presence {
  min_fraction: 1.0
}
shape {
}
, name: "mother_age_normalized"
type: FLOAT
presence {
  min_fraction: 1.0
}
shape {
}
, name: "mother_race_index"
type: INT
int_domain {
  min: -1
  max: 10
  is_categorical: true
}
presence {
  min_fraction: 1.0
}
shape {
}
, name: "weight_pounds"
type: FLOAT
presence {
  min_fraction: 1.0
}
shape {
}
]

## Raw data metadata

In [14]:
# Raw (pre-transform) input features, grouped by the dtype of the serving input
# created for them: string for categorical, float32 for numeric.
CATEGORICAL_FEATURE_NAMES = ['is_male', 'mother_race']
NUMERIC_FEATURE_NAMES = ['mother_age', 'plurality', 'gestation_weeks']
# Label column name.
TARGET_FEATURE_NAME = 'weight_pounds'
# Name of the key column.
KEY_COLUMN = 'key'

def create_placeholders():
    """Create one scalar-per-example Keras input for every raw feature.

    Returns:
        Dict mapping feature name to a ``tf.keras.Input`` with shape ``()``:
        ``tf.string`` for CATEGORICAL_FEATURE_NAMES, ``tf.float32`` for
        NUMERIC_FEATURE_NAMES.
    """
    placeholders = {}
    for feature_name in CATEGORICAL_FEATURE_NAMES:
        placeholders[feature_name] = tf.keras.Input((), dtype=tf.string, name=feature_name)
    for feature_name in NUMERIC_FEATURE_NAMES:
        placeholders[feature_name] = tf.keras.Input((), dtype=tf.float32, name=feature_name)
    return placeholders

## Export Estimator to SavedModel

In [15]:
def serving_input_receiver_fn():
    """Return a zero-argument function that builds the serving input receiver.

    The returned function creates the raw-feature inputs, applies the
    tf.Transform graph loaded from TRANSFORM_ARTEFACTS_DIR (dropping unused
    features), and wraps both in a ``ServingInputReceiver``: transformed
    features feed the model, raw inputs are the receiver tensors.
    """
    transform_output = tft.TFTransformOutput(TRANSFORM_ARTEFACTS_DIR)

    def build_receiver():
        raw_inputs = create_placeholders()
        model_inputs = transform_output.transform_raw_features(
            raw_inputs, drop_unused_features=True)
        return tf.estimator.export.ServingInputReceiver(model_inputs, raw_inputs)

    return build_receiver

    
# Exports are written below <model_dir>/export.
export_dir = os.path.join(model_dir, 'export')

# Remove any previous export so the directory only holds the one created below.
if tf.io.gfile.exists(export_dir):
    tf.io.gfile.rmtree(export_dir)
        
# serving_input_receiver_fn() is called here: it returns the function the estimator will invoke.
estimator.export_saved_model(
    export_dir_base=export_dir,
    serving_input_receiver_fn=serving_input_receiver_fn()
)

# Expose the export location to shell cells.
os.environ['export_dir'] = export_dir

value: "\n\013\n\tConst_2:0\022\013mother_race"

value: "\n\013\n\tConst_4:0\022\007is_male"

value: "\n\013\n\tConst_6:0\022\013is_multiple"

INFO:tensorflow:Saver not created because there are no variables in the graph to restore
Instructions for updating:
Use ref() instead.
INFO:tensorflow:Calling model_fn.


  run_metadata_ptr)


INFO:tensorflow:Done calling model_fn.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
INFO:tensorflow:Signatures INCLUDED in export for Classify: None
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['predict']
INFO:tensorflow:Signatures INCLUDED in export for Train: None
INFO:tensorflow:Signatures INCLUDED in export for Eval: None
INFO:tensorflow:Signatures EXCLUDED from export because they cannot be be served via TensorFlow Serving APIs:
INFO:tensorflow:'serving_default' : Regression input must be a single string Tensor; got {'is_male': <tf.Tensor 'is_male:0' shape=(None,) dtype=string>, 'mother_race': <tf.Tensor 'mother_race:0' shape=(None,) dtype=string>, 'mother_age': <tf.Tensor 'mother_age:0' shape=(None,) dtype=float32>, 'plurality': <tf.Tensor 'plurality

## Inspect the Exported Model

In [16]:
%%bash
# Resolve the most recent export directory and print its signatures.
# RUN_LOCAL is exported by the notebook as the string 'true' or 'false', so it
# must be compared explicitly: a bare [ "false" ] is true because the string is non-empty.
if [ "${RUN_LOCAL}" = "true" ]
then
# Local run: export_dir is a filesystem path.
saved_model_dir="${export_dir}/$(ls "${export_dir}" | tail -n 1)"
else
# GCS run: export_dir is a gs:// URI, so list it with gsutil (prints full URIs).
saved_model_dir=$(gsutil ls "${export_dir}" | tail -n 1)
fi

echo "${saved_model_dir}"
saved_model_cli show --dir="${saved_model_dir}" --all

gs://mlteam-ml-specialization-2021-taxi/babyweight_tft/models/dnn_estimator/export/1617891749/

MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['predict']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['gestation_weeks'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: gestation_weeks:0
    inputs['is_male'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: is_male:0
    inputs['mother_age'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: mother_age:0
    inputs['mother_race'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: mother_race:0
    inputs['plurality'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: plurality:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['predictions'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: add:0
  Metho

2021-04-08 14:22:37.354645: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0


## Use Exported Model for Prediction

In [18]:
# Load the most recent export. Export sub-directories are named by a numeric
# timestamp, so the largest all-digit name is the latest one; anything else in
# the directory is ignored (listdir entries may carry a trailing '/').
saved_model_dir = os.path.join(
    export_dir,
    max(entry for entry in tf.io.gfile.listdir(export_dir)
        if entry.rstrip('/').isdigit())
)
model = tf.saved_model.load(saved_model_dir)

# The export exposes a single 'predict' signature taking the raw features as keyword arguments.
predictor_fn = model.signatures["predict"]
predictor_fn(
    is_male=tf.constant(['True']),
    mother_age=tf.constant([26.0]),
    mother_race=tf.constant(['Asian Indian']),
    plurality=tf.constant([1.0]),
    gestation_weeks=tf.constant([39.0]),
)

{'predictions': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.5084222]], dtype=float32)>}

In [19]:
# A single raw example expressed as plain Python values.
instance = {
    'is_male': 'True',
    'mother_age': 26.0,
    'mother_race': 'Asian Indian',
    'plurality': 1.0,
    'gestation_weeks': 39.0,
}

# Wrap every value in a one-element constant tensor and pass them as keyword arguments.
feed = {key: tf.constant([value]) for key, value in instance.items()}
predictor_fn(**feed)

{'predictions': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.5084222]], dtype=float32)>}