## Overview
Simple tf training template.

Data generated from pyspark
```python
df = spark.createDataFrame([('user_a','18','male','tommy','film,music',4,180.3,'shenzhen',1,3.2,),
                           ('user_b','16','female','sammy','animation,music',0,166.8,'beijing',0,0.0,),
                           ('user_c','22','','raymond','',1,0.0,'',1,5.0,)],
                           ['user_id','age','gender','nickname','cates','active_days','height','city','click','duration'])
df.write.mode('overwrite').format("tfrecords") \
    .option("recordType", "Example") \
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
    .save("hdfs://cluster/path")
```
| user_id | age | gender | nickname | cate_list        | active_days | height | city   | click | duration |
| ----  | ----  | ----   | ----     | ----             | ----        | ----   | ----   | ----  | ----     |
|user_a |18     |male    |tommy     |[film, music]     |4            |180.3   |shenzhen|1      |3.2       |
|user_b |16     |female  |sammy     |[animation, music]|0            |166.8   |beijing |0      |0.0       |
|user_c |22     | \N     |raymond   |[]                |1            |0.0     | \N     |1      |5.0       |

Under Python3.9, Tensorflow 2.8, Anaconda, Mac

## 1. import

In [1]:
import logging
import os
import sys
from datetime import datetime, timedelta
from pprint import pprint

import numpy as np
import tensorflow as tf

## 2. config setting

### 2.1 env

In [2]:
os.environ['TF_GPU_THREAD_MODE']='gpu_private'
print(sys.version)
print(tf.__version__)

cpus = tf.config.list_physical_devices('CPU')
print(cpus)
gpus = tf.config.list_physical_devices('GPU')
print(gpus)
if gpus:
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)

3.9.12 (main, Apr  5 2022, 01:53:17) 
[Clang 12.0.0 ]
2.8.0
[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU')]
[]


### 2.2 train configs

In [3]:
# feature_config = {
#     'hash_sample': {
#         'preprocessor': 'hash',  # probabilistic collision
#         'num_bins': 10_000_000,
#         'embedding_size': 8,
#         'embedding_type': 'tfra', # tensorflow recommenders addons, dynamic embedding contributed by weixin, ref: https://github.com/tensorflow/recommenders-addons/blob/master/rfcs/20200424-sparse-domain-isolation.md
#     },
#     'int_sample': {
#         'preprocessor': 'int',
#         'vocabulary': 'vocab/int_sample.vocab',
#         'max_tokens': None,
#         'num_oov_indices': 1,
#         'embedding_size': 2,
#     },
#     'norm_sample': {
#         'preprocessor': 'norm',
#         'mean': 36742.3,
#         'variance': 1.3353073e+09,
#     },
#     'discrete_sample': {
#         'preprocessor': 'dis',
#         'num_bins': 6, # eq to len(bin_boundaries)+1
#         'embedding_size': 8,
#         'bin_boundaries': [6246, 7616, 8824, 9915, 10031],
#     },
#     'discrete_sample_from_existed_float': {
#         'feature_name': 'norm_sample', # relate to raw input name, which can convert to different logic features
#         'preprocessor': 'dis',
#         'num_bins': 6, # eq to len(bin_boundaries)+1
#         'embedding_size': 8,
#         'bin_boundaries': [6246, 7616, 8824, 9915, 10031],
#     },
#     'float_sample': {
#         'preprocessor': 'float',
#     },
#     'string_sample': {
#         'preprocessor': 'string', # no collision in vocab, but vocab size limit
#         'vocabulary': 'vocab/string_sample.vocab',
#         'max_tokens': None,
#         'num_oov_indices': 1000,
#         'embedding_size': 8,
#     },
#     'shared_embedding_sample': {
#         'preprocessor' : 'shared_embedding',
#         'shared_embedding': 'hash_sample',
#         'pooling': True,
#     },
# }
feature_config = {
    'user_id': {
        'preprocessor': 'hash',  # probabilistic collision
        'num_bins': 100,
        'mask_value': None,
        'embedding_size': 8,
        # 'embedding_type': 'tfra', # tensorflow recommenders addons, dynamic embedding contributed by weixin, ref: https://github.com/tensorflow/recommenders-addons/blob/master/rfcs/20200424-sparse-domain-isolation.md
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.int64, name="user_id"),
    },
    'age': {
        'preprocessor': 'string',
        'vocabulary': '../vocab/test/age.vocab',
        'max_tokens': None,
        'num_oov_indices': 1,
        'embedding_size': 2,
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.int64, name="age"),
    },
    'height_norm': {
        'feature_name': 'height',
        'preprocessor': 'norm',
        'mean': 170.0,
        'variance': 1.0,
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.float32, name="height_norm"),
    },
    'height_dis': {
        'feature_name': 'height',
        'preprocessor': 'dis',
        'num_bins': 5, # eq to len(bin_boundaries)+1
        'embedding_size': 8,
        'bin_boundaries': [150, 160, 170, 180],
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.int64, name="height_dis"),
    },
    'active_days_dis': {
        'feature_name': 'active_days',
        'preprocessor': 'dis',
        'num_bins': 5, # eq to len(bin_boundaries)+1
        'embedding_size': 8,
        'bin_boundaries': [0, 2, 4, 6],
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.int64, name="active_days_dis"),
    },
    # 'city': {
    #     'preprocessor': 'string', # no collision in vocab, but vocab size limit
    #     'vocabulary': '../vocab/test/city.vocab',
    #     'max_tokens': None,
    #     'num_oov_indices': 1000,
    #     'embedding_size': 8,
    # },
    # 'height': {
    #     'preprocessor': 'float',
    # },
    'city': {
        'preprocessor': 'hash',  # probabilistic collision
        'num_bins': 100,
        'mask_value': None,
        'embedding_size': 8,
        # 'embedding_type': 'tfra', # tensorflow recommenders addons, dynamic embedding contributed by weixin, ref: https://github.com/tensorflow/recommenders-addons/blob/master/rfcs/20200424-sparse-domain-isolation.md
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.int64, name="city"),
    },
    'cate_list': {
        # 'feature_name': 'cate_list',
        'preprocessor': 'string', # no collision in vocab, but vocab size limit
        'vocabulary': '../vocab/test/cates.vocab',
        'max_tokens': None,
        'num_oov_indices': 1000,
        'embedding_size': 8,
        'pooling': True,
        'serving_input': tf.TensorSpec(shape=[None,None], dtype=tf.int64, name="cate_list"),
    },
    'cate_list_hash': {
        'feature_name': 'cate_list',
        'preprocessor': 'hash',  # probabilistic collision
        'num_bins': 100,
        'mask_value': None,
        'embedding_size': 8,
        'pooling': True,
        'serving_input': tf.TensorSpec(shape=[None,None], dtype=tf.int64, name="cate_list_hash"),
    },
    'gender': {
        'preprocessor': 'string', # no collision in vocab, but vocab size limit
        'vocabulary': '../vocab/test/gender.vocab',
        'max_tokens': None,
        'num_oov_indices': 1000,
        'embedding_size': 8,
        'serving_input': tf.TensorSpec(shape=[None,], dtype=tf.int64, name="gender"),
    },
    # 'shared_embedding_sample': {
    #     'preprocessor' : 'shared_embedding',
    #     'shared_embedding': 'hash_sample',
    #     'pooling': True,
    # },
    'cate_list_2': {
        'feature_name': 'cate_list',
        'preprocessor' : 'shared_embedding',
        'shared_embedding': 'user_id',
        'pooling': True,
        'serving_input': tf.TensorSpec(shape=[None,None], dtype=tf.int64, name="cate_list_2"),
    },
}

## 3. define model

In [4]:
class ToyModel(tf.keras.Model):
    # 3.1 init vars
    def __init__(self, config, preprocessoring_layer, embedding_size=None):
        super().__init__()
        self._embeddings = {}
        self._config = config
        self._average_pooling_1d = tf.keras.layers.GlobalAveragePooling1D()
        self._average_pooling_2d = tf.keras.layers.GlobalAveragePooling2D()
        # 3.1.2 define input layer, typically embedding(index2emb)
        for feature_name, feature_config in config.items():
            preprocessor_type = feature_config['preprocessor']
            layers = None
            preprocessor = preprocessoring_layer.preprocessors[feature_name] # get info from preprocess stage
            embedding_size = embedding_size if embedding_size is not None else feature_config.get('embedding_size', 8)
            if preprocessor_type == 'hash':
                layers = [tf.keras.layers.Embedding(feature_config['num_bins'], embedding_size, mask_zero=False,
                                                embeddings_initializer=tf.keras.initializers.GlorotNormal(),
                                                name=f'{feature_name}_embedding')]
                if feature_config.get('pooling'):
                    layers.append(self._average_pooling_1d)
            elif preprocessor_type == 'string':
                layers = [
                    tf.keras.layers.Embedding(preprocessor.vocabulary_size(), embedding_size, mask_zero=False, name=f'{feature_name}_embedding'),
                ]
                if feature_config.get('pooling'):
                    layers.append(self._average_pooling_1d)
            #                 layers = tf.keras.Sequential(layers, name=feature_name+'_bottom_'+preprocessor_type)
            elif preprocessor_type == 'int':
                layers = [
                    tf.keras.layers.Embedding(preprocessor.vocabulary_size(), embedding_size, mask_zero=False, name=f'{feature_name}_embedding'),
                ]
                if feature_config.get('pooling'):
                    layers.append(self._average_pooling_1d)
            #                 layers = tf.keras.Sequential(layers, name=feature_name+'_bottom_'+preprocessor_type)
            elif preprocessor_type == 'text':
                layers = [
                    tf.keras.layers.Embedding(preprocessor.vocabulary_size(), embedding_size, mask_zero=False, name=f'{feature_name}_embedding'),
                    self._average_pooling_1d,
                ]
            elif preprocessor_type == 'norm':
                layers = None
            elif preprocessor_type == 'dis':
                vocab_size = preprocessor.num_bins if preprocessor.num_bins else (len(preprocessor.bin_boundaries)+1)
                layers = [
                    tf.keras.layers.Embedding(vocab_size, embedding_size, mask_zero=False, name=f'{feature_name}_embedding'),
                ]
            elif preprocessor_type == 'shared_embedding':
                embedding_feature_name = feature_config['shared_embedding']
                embedding = self._embeddings[embedding_feature_name].layers[0]
                layers = [embedding]
                if feature_config.get('pooling'):
                    layers.append(self._average_pooling_2d if type(preprocessor) == tf.keras.layers.TextVectorization else self._average_pooling_1d)
            self._embeddings[feature_name] = layers
        # 3.1.3 define mlp layer
        self._target_classification_layer_1 = tf.keras.layers.Dense(128,activation='relu',name='target_classification_layer_1')
        self._target_classification_layer_2 = tf.keras.layers.Dense(1, activation='sigmoid', name='target_classification_layer_2')
        self._target_regression_layer_1 = tf.keras.layers.Dense(128,activation='relu',name='target_regression_layer_1')
        self._target_regression_layer_2 = tf.keras.layers.Dense(1, use_bias=True, name='target_regression_layer_2')
        # 3.1.4 define target layer, especially for complex multi-task network
        # omitted, todo
        # 3.1.5 setup metrics
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        self.mae_metric = tf.keras.metrics.MeanAbsoluteError(name='mae_regression')
        self.mse_metric = tf.keras.metrics.MeanSquaredError(name='mse_regression')
        self.mape_metric = tf.keras.metrics.MeanAbsolutePercentageError(name='mape_regression')
        self.msle_metric = tf.keras.metrics.MeanSquaredLogarithmicError(name='msle_regression')
        self.auc_metric = tf.keras.metrics.AUC(name='auc_classification')
        # or UDF metrics inherited from tf.keras.metrics.Metric

    # 3.2 overwrite call function, define mapping (input: features --> output: predict)
    # signature ref: https://stackoverflow.com/questions/60827999/use-dictionary-in-tf-function-input-signature-in-tensorflow-2-0
    @tf.function(input_signature=[{feat_name:feat_conf["serving_input"] for feat_name, feat_conf in feature_config.items()}])
    def call(self, inputs):
        # 3.2.1 from raw feature to embeddings, e.g. preprocessing, embedding lookup
        embeddings = []
        for feature_name, config in self._config.items():
            # original_feature_name = config['feature_name'] if 'feature_name' in config and config['feature_name'] else feature_name
            original_feature_name = feature_name
            embedding_fn = self._embeddings[feature_name]
            bottom = inputs[original_feature_name]
            if isinstance(embedding_fn, list):
                for layer in embedding_fn:
                    bottom = layer(bottom)
            elif isinstance(embedding_fn, tf.keras.Sequential):
                bottom = embedding_fn(bottom)
            elif config['preprocessor'] == 'norm' or not config['preprocessor']:
                bottom = tf.reshape(tf.cast(bottom, tf.float32), [-1, 1])
            bottom = tf.where(tf.math.is_nan(bottom), tf.zeros_like(bottom), bottom)
            embeddings.append(bottom)
        x = tf.concat(embeddings, axis=1)
        # 3.2.2 go through network get predict values
        target_classification = self._target_classification_layer_2(self._target_classification_layer_1(x))
        target_regression = self._target_regression_layer_2(self._target_regression_layer_1(x))
        return {"target_classification": target_classification, "target_regression": target_regression}
    # 3.3 overwrite train_step function, input train data(x and label), output metrics(for tensorboard callback)
    def train_step(self, data):
        # 3.3.1 process label, e.g. duration to 0/1
        x, y = data
        y = self.process_label(y)
        # 3.3.2 with tf.GradientTape() as tape: predict with model, then calculate losses
        with tf.GradientTape() as tape:
            y_pred = self(x)
            loss = self.compiled_loss(y,y_pred) # passed when compile
        # 3.3.3 gradients = tape.gradient(loss, trainable_vars); optimizer.apply_gradients(zip(gradients, trainable_vars))
        trainable_vars = self.trainable_variables # self.trainable_variables: inherited attribute
        gradients = tape.gradient(loss, trainable_vars)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars)) # self.optimizer: inherited attribute, assigned when compile
        # 3.3.4 update metrics
        self.loss_tracker.update_state(loss)
        self.mae_metric.update_state(y["target_regression"], y_pred["target_regression"])
        self.mse_metric.update_state(y["target_regression"], y_pred["target_regression"])
        self.msle_metric.update_state(y["target_regression"], y_pred["target_regression"])
        self.auc_metric.update_state(y["target_classification"], y_pred["target_classification"])
        return {m.name: m.result() for m in self.metrics}
    # 3.4 overwrite test_step(), same with train_step() except gradient
    def test_step(self, data):
        x, y = data
        y = self.process_label(y)
        y_pred = self(x)
        loss = self.compiled_loss(y,y_pred)
        self.loss_tracker.update_state(loss)
        self.mae_metric.update_state(y["target_regression"], y_pred["target_regression"])
        self.mse_metric.update_state(y["target_regression"], y_pred["target_regression"])
        self.msle_metric.update_state(y["target_regression"], y_pred["target_regression"])
        self.auc_metric.update_state(y["target_classification"], y_pred["target_classification"])
        return {m.name: m.result() for m in self.metrics}
    # 3.5 metrics, ref: https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit?hl=en#going_lower-level
    # def reset_metrics(self):
    #     self.loss_tracker.reset_states()
    #     self.mae_metric.reset_states()
    #     self.mse_metric.reset_states()
    #     self.msle_metric.reset_states()
    #     self.auc_metric.reset_states()
    @property
    def metrics(self):
        # We list our `Metric` objects here so that `reset_states()` can be
        # called automatically at the start of each epoch
        # or at the start of `evaluate()`.
        # If you don't implement this property, you have to call
        # `reset_states()` yourself at the time of your choosing.
        return [self.loss_tracker,
                self.mae_metric, self.mse_metric, self.msle_metric,
            self.auc_metric
        ]
    def process_label(self,y):
        y_target_classification = tf.reshape(y["target_classification"], (-1,1))
        y_target_regression_raw = y["target_regression"] / tf.ones_like(y["target_regression"])
        y_target_regression = tf.clip_by_value(y_target_regression_raw, clip_value_min=0.0, clip_value_max=3.0)
        return {"target_classification":y_target_classification,"target_regression":y_target_regression}

## 4. load dataset

### 4.1 define input description, parse raw input

In [5]:
# raw input, todo: complete and confirm
# feature_description = {
#     # features, created in form of float/int/string or array(float/int/string)
#     'hash_sample': tf.io.FixedLenFeature([], tf.string, default_value=''),
#     'int_sample': tf.io.FixedLenFeature([], tf.int64, default_value=0),
#     'norm_sample': tf.io.FixedLenFeature([], tf.float32, default_value=0),
#     'discrete_sample': tf.io.FixedLenFeature([], tf.float32, default_value=0),
#     'string_sample': tf.io.FixedLenFeature([], tf.float32, default_value=0),
#     'float_sample': tf.io.FixedLenFeature([], tf.float32, default_value=0),
#     'shared_embedding_sample': tf.io.FixedLenFeature([], tf.float32, default_value=0),
#     # 'fixed_len_seq_feature': tf.io.FixedLenSequenceFeature([], tf.string, allow_missing=True, default_value=None) # must follow by pooling/attention layer before embed
#     # 'ragged_sample': tf.io.RaggedFeature(tf.string), # must follow by pooling/attention layer before embed
# }
feature_description = {
    # features, created in form of float/int/string or array(float/int/string)
    'user_id': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'age': tf.io.FixedLenFeature([], tf.string, default_value='0'),
    'gender': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'nickname': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'height': tf.io.FixedLenFeature([], tf.float32, default_value=0),
    'active_days': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    'city': tf.io.FixedLenFeature([], tf.string, default_value=''),
    'click': tf.io.FixedLenFeature([], tf.int64, default_value=0),
    'duration': tf.io.FixedLenFeature([], tf.float32, default_value=0),
    'cate_list': tf.io.FixedLenSequenceFeature([], tf.string, allow_missing=True, default_value=None),
    # 'cate_list': tf.io.RaggedFeature(tf.string), # not compatible for model serving
}
def _parse_function(example_proto):
    return tf.io.parse_example(example_proto, feature_description)

### 4.2 get a general preprocess object

In [6]:
class PreprocessingLayer(tf.keras.Model):
    def __init__(self, config):
        super().__init__()
        self.preprocessors = {
        }
        self.config = config
        for feature_name, config in self.config.items():
            preprocessor = None
            preprocessor_type = config['preprocessor']
            if preprocessor_type == 'hash':
                preprocessor = tf.keras.layers.Hashing(num_bins=config['num_bins'], mask_value=config['mask_value'], name=f'{feature_name}_{preprocessor_type}')
            elif preprocessor_type == 'string':
                preprocessor = tf.keras.layers.StringLookup(max_tokens=config['max_tokens'], num_oov_indices=config['num_oov_indices'],
                                                            vocabulary=config.get('vocabulary'), name=f'{feature_name}_{preprocessor_type}')
                if config.get('vocabulary'):
                    print(f'{feature_name} loaded {config.get("vocabulary")}, vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'int':
                print(preprocessor_type,config['max_tokens'],config['num_oov_indices'],config.get('vocabulary'))
                preprocessor = tf.keras.layers.IntegerLookup(max_tokens=config['max_tokens'], num_oov_indices=config['num_oov_indices'],
                                                             vocabulary=config.get('vocabulary'), name=f'{feature_name}_{preprocessor_type}')
                if config.get('vocabulary'):
                    print(f'{feature_name} loaded {config.get("vocabulary")}, vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'text':
                preprocessor = tf.keras.layers.TextVectorization(max_tokens=config['max_tokens'], standardize=config['standardize'], split=config['split'],
                                                                 vocabulary=config.get('vocabulary'), name=f'{feature_name}_{preprocessor_type}')
                if config.get('vocabulary'):
                    print(f'{feature_name} loaded {config.get("vocabulary")}, vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'norm':
                preprocessor = tf.keras.layers.Normalization(axis=None, mean=config.get('mean'), variance=config.get('variance'),
                                                             name=f'{feature_name}_{preprocessor_type}')
            elif preprocessor_type == 'dis':
                if 'bin_boundaries' in config:
                    preprocessor = tf.keras.layers.Discretization(bin_boundaries=config['bin_boundaries'], name=f'{feature_name}_{preprocessor_type}')
                else:
                    preprocessor = tf.keras.layers.Discretization(num_bins=config['num_bins'], name=f'{feature_name}_{preprocessor_type}')
            elif preprocessor_type == 'shared_embedding':
                shared_preprocessor_feature_name = config['shared_embedding']
                preprocessor = self.preprocessors[shared_preprocessor_feature_name]
            self.preprocessors[feature_name] = preprocessor
        self.is_adapted=False

    def call(self, inputs):
        # if use string/text or other adapt demanding preprocessors, should assure is_adapted
        # if not self.is_adapted:
        #     raise RuntimeError('PreprocessingLayer is not adapted yet, call adapt() first.')
        embeddings = {}
        for feature_name, config in self.config.items():
            original_feature_name = config['feature_name'] if 'feature_name' in config and config['feature_name'] else feature_name
            preprocessor = self.preprocessors[feature_name]
            if not preprocessor:
                bottom = inputs[original_feature_name]
            elif type(preprocessor) == tf.keras.layers.TextVectorization:
                bottom = preprocessor(tf.expand_dims(inputs[original_feature_name], -1))
            else:
                bottom = preprocessor(inputs[original_feature_name])
            embeddings[feature_name] = bottom
        # set_trace()
        #         x = tf.concat(embeddings, axis=1)
        return embeddings

    def adapt(self, dataset): # return a dict of feature_name: preprocessing_layer
        for feature_name, config in self.config.items():
            original_feature_name = config['feature_name'] if 'feature_name' in config and config['feature_name'] else feature_name
            preprocessor_type = config['preprocessor']
            preprocessor = self.preprocessors[feature_name]
            if preprocessor_type in ('string', 'int', 'text') and config.get('vocabulary'):
                print(f'{feature_name} loaded {config.get("vocabulary")}, vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'norm' and config.get('mean') is not None and config.get('variance') is not None:
                print(f'{feature_name} loaded with mean: {config.get("mean")} variance: {config.get("variance")}')
            elif preprocessor_type == 'dis' and config.get('bin_boundaries'):
                print(f'{feature_name} loaded with num_bins: {preprocessor.num_bins} bin_boundaries: {preprocessor.bin_boundaries}')
            elif preprocessor_type == 'string':
                preprocessor.adapt(dataset.map(lambda x: x[original_feature_name]))
                print(f'finished {feature_name} vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'int':
                preprocessor.adapt(dataset.map(lambda x: x[original_feature_name]))
                print(f'finished {feature_name} vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'text':
                preprocessor.adapt(dataset.map(lambda x: x[original_feature_name]))
                print(f'finished {feature_name} vocab_size: {preprocessor.vocabulary_size()}')
            elif preprocessor_type == 'norm':
                preprocessor.adapt(dataset.map(lambda x: x[original_feature_name]))
                print(f'finished {feature_name} mean: {preprocessor.mean} variance: {preprocessor.variance}')
            elif preprocessor_type == 'dis':
                preprocessor.adapt(dataset.map(lambda x: x[original_feature_name]))
                print(f'finished {feature_name} num_bins: {preprocessor.num_bins} bin_boundaries: {preprocessor.bin_boundaries}')
            else:
                continue
        self.is_adapted=True

preprocessing_layer = PreprocessingLayer(feature_config)
raw_dataset = tf.data.TFRecordDataset('part-r-00000.gz',  compression_type='GZIP')
for raw_record in raw_dataset.take(1):
    print(tf.train.Example.FromString(raw_record.numpy()))

age loaded ../vocab/test/age.vocab, vocab_size: 4
cate_list loaded ../vocab/test/cates.vocab, vocab_size: 1003
gender loaded ../vocab/test/gender.vocab, vocab_size: 1003
features {
  feature {
    key: "active_days"
    value {
      int64_list {
        value: 4
      }
    }
  }
  feature {
    key: "age"
    value {
      bytes_list {
        value: "18"
      }
    }
  }
  feature {
    key: "cate_list"
    value {
      bytes_list {
        value: "film"
        value: "music"
      }
    }
  }
  feature {
    key: "city"
    value {
      bytes_list {
        value: "shenzhen"
      }
    }
  }
  feature {
    key: "click"
    value {
      int64_list {
        value: 1
      }
    }
  }
  feature {
    key: "duration"
    value {
      float_list {
        value: 3.200000047683716
      }
    }
  }
  feature {
    key: "gender"
    value {
      bytes_list {
        value: "male"
      }
    }
  }
  feature {
    key: "height"
    value {
      float_list {
        value: 180.30

2022-05-11 18:45:27.684898: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


### 4.3 load from hdfs/local/...

In [7]:
def get_files_by_date_range(lastest_date_str, num_hours, data_dir, data_pattern):
    DATE_FORMAT = '%Y-%m-%d/%H'
    logging.info('The latest date is {}'.format(lastest_date_str))
    lastest_date = datetime.strptime(lastest_date_str, DATE_FORMAT)
    files = []
    for hours_back in range(num_hours-1, -1, -1): # from new to old
        curr_date = lastest_date - timedelta(hours=hours_back)
        curr_date_str = curr_date.strftime(DATE_FORMAT)
        dir_path = os.path.join(data_dir, curr_date_str)
        logging.info("datadir={}".format(dir_path))
        if tf.io.gfile.exists(dir_path):
            tfiles = tf.io.gfile.glob(dir_path + '/' + data_pattern)
            logging.info("adding {} files at {}".format(len(tfiles), dir_path))
            files.extend(tfiles)
    assert files
    logging.info('Training over : %d files'%len(files))
    return files

### 4.4 prefetch, batch, map(parse with tf.io.parse_example), map(preprocessor(raw2index preprocessor))

In [8]:
def make_dataset_remote(lastest_date, num_hours, batch_size=1,
                        data_dir='hdfs://cluster/.../sample_tfrecord/', data_pattern='part-*'):
    # training_files = get_files_by_date_range(lastest_date, num_hours, data_dir, data_pattern)
    training_files = []
    if tf.io.gfile.exists("."):
        tfiles = tf.io.gfile.glob('.' + '/' + data_pattern)
        logging.info("adding {} files at {}".format(len(tfiles), '.'))
        training_files.extend(tfiles)
    assert training_files
    logging.info('Training over : %d files'%len(training_files))
    print(f'{len(training_files)} training files.')
    dataset = tf.data.Dataset.from_tensor_slices(training_files)
    dataset = dataset.interleave(lambda x: tf.data.TFRecordDataset(x, buffer_size=1, num_parallel_reads=1,
                                                                   compression_type='GZIP'),
                                 # cycle_length=4,
                                 num_parallel_calls=tf.data.AUTOTUNE,
                                 block_length=batch_size,
                                 deterministic=False)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
    # dataset = dataset.shuffle(buffer_size=10000) # optional
    dataset = dataset.batch(batch_size)
    dataset = dataset.map(_parse_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    # TODO: preprocessing_layer not integrated into serving model
    dataset = dataset.map(lambda data: (preprocessing_layer(data), {"target_classification":data["click"],
                                                                     "target_regression":data["duration"]}),
                          num_parallel_calls=tf.data.experimental.AUTOTUNE)
    return dataset

### 4.5 print(dataset.take(1)) for testing

In [9]:
dataset_train = make_dataset_remote('2022-01-20/14', num_hours=1, batch_size=1)
for row in dataset_train.take(1):
    pprint(row[0])

1 training files.
{'active_days_dis': <tf.Tensor: shape=(1,), dtype=int64, numpy=array([3])>,
 'age': <tf.Tensor: shape=(1,), dtype=int64, numpy=array([0])>,
 'cate_list': <tf.Tensor: shape=(1, 2), dtype=int64, numpy=array([[808, 601]])>,
 'cate_list_2': <tf.Tensor: shape=(1, 2), dtype=int64, numpy=array([[8, 1]])>,
 'cate_list_hash': <tf.Tensor: shape=(1, 2), dtype=int64, numpy=array([[8, 1]])>,
 'city': <tf.Tensor: shape=(1,), dtype=int64, numpy=array([77])>,
 'gender': <tf.Tensor: shape=(1,), dtype=int64, numpy=array([499])>,
 'height_dis': <tf.Tensor: shape=(1,), dtype=int64, numpy=array([4])>,
 'height_norm': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([10.300003], dtype=float32)>,
 'user_id': <tf.Tensor: shape=(1,), dtype=int64, numpy=array([81])>}


## 5. train

### 5.1 compile model

In [10]:
training_model = ToyModel(feature_config, preprocessing_layer)
training_model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss = {"target_classification":tf.losses.binary_crossentropy,
            "target_regression":tf.losses.mean_squared_error,},
    loss_weights = {"target_classification":1.5,
                   "target_regression":1.0,},
    #     run_eagerly=True
)

# check model output
for row in dataset_train.take(1):
    pprint(training_model(row[0]))

{'target_classification': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[0.17893597]], dtype=float32)>,
 'target_regression': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[-3.1409397]], dtype=float32)>}


### 5.2 callbacks

In [11]:
log_dir = './tensorboard'
tensorboard_callback_train = tf.keras.callbacks.TensorBoard(
    log_dir=log_dir,
    histogram_freq=1,
    update_freq=100,
    profile_batch=1000,
    write_steps_per_second=True,
    embeddings_freq=1
)
checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    # Path where to save the model
    # The two parameters below mean that we will overwrite
    # the current checkpoint if and only if
    # the `val_loss` score has improved.
    # The saved model name will include the current epoch.
    filepath="modelckpt",
    save_best_only=False,
    verbose=1,
    save_freq='epoch',
)
# custom callback, ref: https://www.tensorflow.org/guide/keras/custom_callback
class CustomCallback(tf.keras.callbacks.Callback):
    def on_test_batch_end(self, batch, logs=None):
        # do something
        pass

2022-05-11 18:45:29.079630: I tensorflow/core/profiler/lib/profiler_session.cc:110] Profiler session initializing.
2022-05-11 18:45:29.079650: I tensorflow/core/profiler/lib/profiler_session.cc:125] Profiler session started.
2022-05-11 18:45:29.080104: I tensorflow/core/profiler/lib/profiler_session.cc:143] Profiler session tear down.


### 5.3 model.fit()

In [12]:
training_model.fit(dataset_train, epochs=3, callbacks=[tensorboard_callback_train,checkpoint_callback])

Epoch 1/3
      1/Unknown - 3s 3s/step - loss: 40.2922 - mae_regression: 6.1409 - mse_regression: 37.7111 - msle_regression: 1.9218 - auc_classification: 0.0000e+00
Epoch 1: saving model to modelckpt
INFO:tensorflow:Assets written to: modelckpt/assets
Epoch 2/3
Epoch 2: saving model to modelckpt
INFO:tensorflow:Assets written to: modelckpt/assets
Epoch 3/3
Epoch 3: saving model to modelckpt
INFO:tensorflow:Assets written to: modelckpt/assets


2022-05-11 18:45:34.747822: W tensorflow/python/util/util.cc:368] Sets are not currently considered sequences, but this may change in the future, so consider avoiding using them.
INFO:tensorflow:Assets written to: modelckpt/assets
INFO:tensorflow:Assets written to: modelckpt/assets
INFO:tensorflow:Assets written to: modelckpt/assets


<keras.callbacks.History at 0x7fe252ababb0>

### 6. model.save(), if need call manually

In [13]:
model_name = f'model/{training_model.__class__.__name__}_{datetime.now().strftime("%Y%m%d_%H%M%S")}'
training_model.save(model_name)
# print variable scale
print(np.sum([np.prod(v.shape) for v in training_model.variables]))

INFO:tensorflow:Assets written to: model/ToyModel_20220511_184542/assets
38572.0


INFO:tensorflow:Assets written to: model/ToyModel_20220511_184542/assets


### 7. model.evaluate()

In [14]:
dataset_test = dataset_train
tensorboard_callback_test = tensorboard_callback_train
training_model.evaluate(dataset_test, callbacks=[tensorboard_callback_test])



[10.611014366149902,
 2.2530229091644287,
 9.688084602355957,
 0.6996736526489258,
 0.5]

## 8. load model
### 8.1 load entire model. may lose any custom train_step/test_step code. not recommend

In [15]:
# model_loaded = tf.keras.models.load_model(model_name)

### 8.2 alternative option: define whole model and just load weights, with custom code reserved

In [16]:
model_loaded = ToyModel(feature_config, preprocessing_layer)
model_loaded.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss = {"target_classification":tf.losses.binary_crossentropy,
            "target_regression":tf.losses.mean_squared_error,},
    loss_weights = {"target_classification":1.5,
                   "target_regression":1.0,},
    #     run_eagerly=True
)
model_loaded.load_weights(model_name)

2022-05-11 18:45:48.523984: W tensorflow/core/util/tensor_slice_reader.cc:96] Could not open model/ToyModel_20220511_184542: FAILED_PRECONDITION: model/ToyModel_20220511_184542; Is a directory: perhaps your file is in a different file format and you need to use a different restore operator?


<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7fe2360ac9a0>

## 9. continue training

In [17]:
model_loaded.fit(dataset_train, epochs=3, callbacks=[tensorboard_callback_train])
model_loaded.evaluate(dataset_test, callbacks=[tensorboard_callback_test])

Epoch 1/3
Epoch 2/3
Epoch 3/3


[8.100228309631348,
 2.060763120651245,
 7.392379283905029,
 0.7269999980926514,
 0.5]

## 10. serving
Ref: https://github.com/tensorflow/serving

### 10.1 prepare docker env
### 10.2 pull tensorflow serving image
### 10.3 prepare model
Prepare model files order by version

Ref: https://stackoverflow.com/questions/45544928/tensorflow-serving-no-versions-of-servable-model-found-under-base-path

### 10.4 check model signature

In [18]:
print(training_model.save_spec())
serving_model = tf.saved_model.load(model_name)
print(serving_model.signatures["serving_default"])

([{'user_id': TensorSpec(shape=(None,), dtype=tf.int64, name='user_id'), 'age': TensorSpec(shape=(None,), dtype=tf.int64, name='age'), 'height_norm': TensorSpec(shape=(None,), dtype=tf.float32, name='height_norm'), 'height_dis': TensorSpec(shape=(None,), dtype=tf.int64, name='height_dis'), 'active_days_dis': TensorSpec(shape=(None,), dtype=tf.int64, name='active_days_dis'), 'city': TensorSpec(shape=(None,), dtype=tf.int64, name='city'), 'cate_list': TensorSpec(shape=(None, 2), dtype=tf.int64, name='cate_list'), 'cate_list_hash': TensorSpec(shape=(None, 2), dtype=tf.int64, name='cate_list_hash'), 'gender': TensorSpec(shape=(None,), dtype=tf.int64, name='gender'), 'cate_list_2': TensorSpec(shape=(None, 2), dtype=tf.int64, name='cate_list_2')}], {})
ConcreteFunction signature_wrapper(*, height_norm, user_id, age, city, cate_list, active_days_dis, cate_list_hash, cate_list_2, height_dis, gender)
  Args:
    active_days_dis: int64 Tensor, shape=(None,)
    age: int64 Tensor, shape=(None,)
 

### 10.5 start a container

### 10.6 double check model signature
```bash
curl http://${host}:${port}/v1/models/${model_name}/metadata
```
<details>
  <summary>response, click to unfold</summary>
  <pre><code class="language-json">
{
    "metadata": {
        "signature_def": {
            "signature_def": {
                "__saved_model_init_op": {
                    "inputs": {},
                    "method_name": "",
                    "outputs": {
                        "__saved_model_init_op": {
                            "dtype": "DT_INVALID",
                            "name": "NoOp",
                            "tensor_shape": {
                                "dim": [],
                                "unknown_rank": true
                            }
                        }
                    }
                },
                "serving_default": {
                    "inputs": {
                        "active_days": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_active_days:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "age": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_age:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "cate_list": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_cate_list:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    },
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "cate_list_2": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_cate_list_2:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    },
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "cate_list_hash": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_cate_list_hash:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    },
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "city": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_city:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "gender": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_gender:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "height_dis": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_height_dis:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "height_norm": {
                            "dtype": "DT_FLOAT",
                            "name": "serving_default_height_norm:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "user_id": {
                            "dtype": "DT_INT64",
                            "name": "serving_default_user_id:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        }
                    },
                    "method_name": "tensorflow/serving/predict",
                    "outputs": {
                        "target_classification": {
                            "dtype": "DT_FLOAT",
                            "name": "StatefulPartitionedCall:0",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    },
                                    {
                                        "name": "",
                                        "size": "1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        },
                        "target_regression": {
                            "dtype": "DT_FLOAT",
                            "name": "StatefulPartitionedCall:1",
                            "tensor_shape": {
                                "dim": [
                                    {
                                        "name": "",
                                        "size": "-1"
                                    },
                                    {
                                        "name": "",
                                        "size": "1"
                                    }
                                ],
                                "unknown_rank": false
                            }
                        }
                    }
                }
            }
        }
    },
    "model_spec": {
        "name": "toy_model",
        "signature_name": "",
        "version": "1"
    }
}
  </code></pre>
</details>

<br/>
Preprocessing_layer is not integrated into saved model in our case.
Signatures are mapping to tensors before embedding layers.

### 10.7 predict

Flawed example
```bash
curl -X POST -d \
'{
 "instances": [
   {
     "user_id": 44,
     "age": 1,
     "height_norm": 140.3,
     "height_dis": 2,
     "active_days_dis": 0,
     "city": 67,
     "cate_list": [638,522],
     "cate_list_hash": [4,2],
     "gender": 2,
     "cate_list_2": [13,33]
   },
   {
     "user_id": 23,
     "age": 2,
     "height_norm": 37.4,
     "height_dis": 1,
     "active_days_dis": 3,
     "city": 98,
     "cate_list": [11,11],
     "cate_list_hash": [12,12],
     "gender": 42,
     "cate_list_2": [72] # attention!
     # instances[1] wil fail cos shape("cate_list_2") varies from instances[0]
     # solution: pad seq features to same length, e.g. [72]->[72,0]
   }
 ]
}' http://${host}:${port}/v1/models/${model_name}:predict
```

response
```json
{
    "predictions": [
        {
            "target_classification": [2.61704299e-05],
            "target_regression": [-32.5374947]
        },
        {
            "target_classification": [0.0522824526],
            "target_regression": [-8.58270836]
        }
    ]
}
```

### 10.8 confirm serving predict result same to saved model

In [19]:
predict_data = {
                   "user_id": [[44],[23]],
                   "age": [[1],[2]],
                   "height_norm": [[140.3],[37.4]],
                   "height_dis": [[2],[1]],
                   "active_days_dis": [[0],[3]],
                   "city": [[67],[98]],
                   "cate_list": [[[638,522]],[[11,11]]],
                   "cate_list_hash": [[[4,2]],[[12,12]]],
                   "gender": [[2],[42]],
                   "cate_list_2": [[[13,33]],[[72,0]]]
               }
training_model.predict(
    tf.data.Dataset.from_tensor_slices(predict_data).map(lambda x:{
        "user_id":tf.cast(x["user_id"],tf.int64),
        "age":tf.cast(x["age"],tf.int64),
        "height_norm":tf.cast(x["height_norm"],tf.float32),
        "height_dis":tf.cast(x["height_dis"],tf.int64),
        "active_days_dis":tf.cast(x["active_days_dis"],tf.int64),
        "city":tf.cast(x["city"],tf.int64),
        "cate_list":tf.cast(x["cate_list"],tf.int64),
        "cate_list_hash":tf.cast(x["cate_list_hash"],tf.int64),
        "gender":tf.cast(x["gender"],tf.int64),
        "cate_list_2":tf.cast(x["cate_list_2"],tf.int64),
    })
)


{'target_classification': array([[2.6170430e-05],
        [5.2282512e-02]], dtype=float32),
 'target_regression': array([[-32.537495],
        [ -8.582708]], dtype=float32)}