<a href="https://colab.research.google.com/github/Muzhi1920/awesome-models/blob/main/09-Estimator/00_Wide%26Deep.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import math

import tensorflow as tf

os.environ['TF_CONFIG'] = '{"task":{"type":"worker","index":0}}' #单机演示

## Datasets读取与解析pipeline

### 1. 定义特征proto

In [2]:
feature_proto = {
    'sparse_000': tf.io.VarLenFeature(tf.int64),
    'sparse_001': tf.io.VarLenFeature(tf.int64),
    'sparse_002': tf.io.VarLenFeature(tf.int64),
    'dense_000': tf.io.FixedLenFeature((), tf.int64, 0),
    'dense_001': tf.io.FixedLenFeature((), tf.float32, 0.0), #label ,need pop
    'dense_002': tf.io.FixedLenFeature((), tf.float32, 0.0),
    'dense_003': tf.io.FixedLenFeature((), tf.float32, 0.0)
}

### 2. 实现datasets消费与解析

In [3]:
def parse_example_batch(example, feature_proto):
    '''
     - serialized: A vector (1-D Tensor) of strings, a batch of binary serialized `Example` protos.
     - features: A `dict` mapping feature keys to `FixedLenFeature`, `VarLenFeature`, `SparseFeature`, and `RaggedFeature` values.
    '''
    features = tf.io.parse_example(example, feature_proto) #dict
    label_shape = tf.shape(features['dense_001'])

    label_dict = dict()
    pos_label = tf.fill(dims=label_shape, value=tf.constant(1.0, tf.float32))
    neg_label = tf.fill(dims=label_shape, value=tf.constant(0.0, tf.float32))
    label_dict['ctr'] = tf.where(tf.greater(features['dense_001'], 0.0), pos_label, neg_label)
    return features, label_dict

def _tfrecord_input_fn(data_path, batch_size, epochs, feature_proto, num_shards, index):
    tf_files = tf.io.match_filenames_once(data_path)
    dataset = tf.data.Dataset.from_tensor_slices(tf_files).shard(num_shards, index).shuffle(10, reshuffle_each_iteration=True).interleave(
        lambda _file: tf.data.TFRecordDataset(_file),
        cycle_length=2,
        block_length=8,
        num_parallel_calls=tf.data.AUTOTUNE).shuffle(batch_size * 20, reshuffle_each_iteration=True).repeat(epochs).batch(batch_size)
    prefetch_dataset = dataset.map(lambda example: parse_example_batch(example, feature_proto), num_parallel_calls=tf.data.AUTOTUNE).prefetch(tf.data.AUTOTUNE)
    return prefetch_dataset

def _input_fn(data_path, feature_proto):
    return lambda: _tfrecord_input_fn(data_path=data_path, batch_size=4, epochs=20, feature_proto=feature_proto, num_shards=1, index=0)

In [4]:
ds = _tfrecord_input_fn('./data/simple.tfrecord', 4, 20, feature_proto, 1, 0)
ds

<PrefetchDataset element_spec=({'sparse_000': SparseTensorSpec(TensorShape([None, None]), tf.int64), 'sparse_001': SparseTensorSpec(TensorShape([None, None]), tf.int64), 'sparse_002': SparseTensorSpec(TensorShape([None, None]), tf.int64), 'dense_000': TensorSpec(shape=(None,), dtype=tf.int64, name=None), 'dense_001': TensorSpec(shape=(None,), dtype=tf.float32, name=None), 'dense_002': TensorSpec(shape=(None,), dtype=tf.float32, name=None), 'dense_003': TensorSpec(shape=(None,), dtype=tf.float32, name=None)}, {'ctr': TensorSpec(shape=(None,), dtype=tf.float32, name=None)})>

### 查看dataset数据

In [5]:
iter = tf.compat.v1.data.make_one_shot_iterator(ds)
features, labels = iter.get_next()
features['sparse_000'], tf.sparse.to_dense(features['sparse_000']), labels,

(<tensorflow.python.framework.sparse_tensor.SparseTensor at 0x7f6c9c97dcd0>,
 <tf.Tensor: shape=(4, 25), dtype=int64, numpy=
 array([[                  0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0],
        [ 168392281970674538, 1044814902578433622, 1388551246485349775,
                           0,                   0,                   0,
                           0,                   0,                   0,
                           0,                   0,  

## Feature Column实现特征处理

In [6]:
from tensorflow import feature_column as fc
def build_columns(feature_proto):
    wide_columns = []
    deep_columns = []
    for feature, _ in feature_proto.items():
        if 'sparse' in feature:
            hash_bkt_input = fc.categorical_column_with_hash_bucket(feature, 64, tf.int64)
            emb_input = fc.embedding_column(hash_bkt_input, 8, 'sqrtn', tf.keras.initializers.VarianceScaling(distribution='uniform'))
            wide_columns.append(hash_bkt_input)
            deep_columns.append(emb_input)
        elif 'dense' in feature:
            dense_input = fc.numeric_column(feature, default_value=0.0)
            deep_columns.append(dense_input)
            wide_columns.append(dense_input)
        else:
            print('exception feature is : ',feature)
    return wide_columns, deep_columns

In [7]:
'''serving_input_receiver_fn that expects all features to be fed directly.'''
serving_feature = feature_proto.copy()
serving_feature.pop('dense_001') # feed features into model, except labes
wide_columns, deep_columns = build_columns(serving_feature)
wide_columns, deep_columns

([HashedCategoricalColumn(key='sparse_000', hash_bucket_size=64, dtype=tf.int64),
  HashedCategoricalColumn(key='sparse_001', hash_bucket_size=64, dtype=tf.int64),
  HashedCategoricalColumn(key='sparse_002', hash_bucket_size=64, dtype=tf.int64),
  NumericColumn(key='dense_000', shape=(1,), default_value=(0.0,), dtype=tf.float32, normalizer_fn=None),
  NumericColumn(key='dense_002', shape=(1,), default_value=(0.0,), dtype=tf.float32, normalizer_fn=None),
  NumericColumn(key='dense_003', shape=(1,), default_value=(0.0,), dtype=tf.float32, normalizer_fn=None)],
 [EmbeddingColumn(categorical_column=HashedCategoricalColumn(key='sparse_000', hash_bucket_size=64, dtype=tf.int64), dimension=8, combiner='sqrtn', initializer=<keras.initializers.initializers_v2.VarianceScaling object at 0x7f6c9c9ee490>, ckpt_to_load_from=None, tensor_name_in_ckpt=None, max_norm=None, trainable=True, use_safe_embedding_lookup=True),
  EmbeddingColumn(categorical_column=HashedCategoricalColumn(key='sparse_001', has

## 样本维度Tensor输入model

### Wide部分

In [8]:
from tensorflow_estimator.python.estimator.canned.linear import _linear_model_fn_builder_v2 as wide_model
wide_output, wide_weights = wide_model(units=1, feature_columns=wide_columns, sparse_combiner='sum', features=features)
wide_output #, wide_weights

  getter=tf.compat.v1.get_variable)


<tf.Tensor: shape=(4, 1), dtype=float32, numpy=
array([[0.],
       [0.],
       [0.],
       [0.]], dtype=float32)>

### Deep部分

In [9]:
input_layer = tf.keras.layers.DenseFeatures(feature_columns=deep_columns, name='deep_input')
input_layer

<keras.feature_column.dense_features_v2.DenseFeatures at 0x7f6c956ecc90>

In [10]:
input_layer(features)

<tf.Tensor: shape=(4, 27), dtype=float32, numpy=
array([[ 0.0000000e+00,  0.0000000e+00, -8.3323002e-02,  0.0000000e+00,
         0.0000000e+00,  0.0000000e+00,  0.0000000e+00,  0.0000000e+00,
         0.0000000e+00,  0.0000000e+00,  0.0000000e+00, -3.7095070e-02,
         2.3599686e-01,  3.6472061e-01,  1.1396408e-01, -2.8706622e-01,
        -3.0907178e-01,  1.2102008e-02, -1.8952183e-01,  0.0000000e+00,
         0.0000000e+00,  0.0000000e+00,  0.0000000e+00,  0.0000000e+00,
         0.0000000e+00,  0.0000000e+00,  0.0000000e+00],
       [ 0.0000000e+00,  0.0000000e+00, -1.5305100e-01, -3.5761565e-02,
        -1.4589924e-01,  1.9014908e-01, -4.0890642e-02,  1.3951302e-02,
         5.0776634e-02,  4.8859835e-02,  6.7970626e-02, -8.5156314e-02,
         1.2871352e-01,  2.6085344e-01, -3.9815154e-02, -2.5092021e-01,
        -3.8565766e-02,  1.4258026e-01,  1.9833410e-01, -3.5256825e-02,
         1.0200072e-01,  1.1975102e-02, -3.8204312e-02, -1.0419901e-01,
        -1.6082984e-01,  9.240

## 基于Estimator封装Wide&Deep模型

- 重写model_fn函数，实现自定义estimator；
- 通过各model模块得到目标输出和优化op；
- 配置优化器得到优化loss的op

In [11]:
from tensorflow.keras.layers import Dense
from tensorflow.python.keras.engine import training
class DNN(training.Model):
  """A DNN Model."""
  def __init__(self,
               deep_columns = None,
               dnn_dims = [16, 8, 1],
               name=None,
               **kwargs):
    super(DNN, self).__init__(name=name, **kwargs)
    self._input_layer = tf.keras.layers.DenseFeatures(feature_columns=deep_columns, name='deep')
    self._dnn = [Dense(dims, 'relu') for dims in dnn_dims]
  
  def call(self, features, mode):
    inputs = self._input_layer(features)
    for nn in self._dnn:
        inputs = nn(inputs)
    dnn_output = inputs
    return dnn_output

In [12]:
class WideDeepModel(tf.estimator.Estimator):

    def __init__(self, model_dir=None, config=None, warm_start_from=None, wide_columns=None, linear_optimizer=None,
                 deep_columns=None, dnn_optimizer=None):

        def _model_fn(features, labels, mode, config):
            
            # wide & deep
            linear_logits, linear_trainable_variables = wide_model(units=1, feature_columns=wide_columns, sparse_combiner='sum', features=features)
            
            deep_model = DNN(deep_columns=deep_columns, name='dnn')
            dnn_logits = deep_model(features, mode)
            dnn_trainable_variables = deep_model.trainable_variables
            dnn_update_ops = deep_model.updates

            logistic_output = dict()
            logistic_output['ctr'] = tf.sigmoid(dnn_logits + linear_logits)

            obj_head = []
            ctr_head = tf.estimator.BinaryClassHead(weight_column=None, name='ctr')
            obj_head.append(ctr_head)
            multi_head = tf.estimator.MultiHead(obj_head)


            def _train_op_fn(loss):
                """Returns the op to optimize the loss."""
                train_ops = []
                if dnn_logits is not None:
                    train_ops.extend(dnn_optimizer.get_updates(loss, dnn_trainable_variables))
                if dnn_update_ops is not None:
                    train_ops.extend(dnn_update_ops)
                if linear_logits is not None:
                    train_ops.extend(linear_optimizer.get_updates(loss, linear_trainable_variables))
                train_op = tf.group(*train_ops)
                return train_op

            if mode == tf.estimator.ModeKeys.TRAIN:
              # In TRAIN mode, asssign global_step variable to optimizer.iterations to
              # make global_step increased correctly, as Hooks relies on global step as
              # step counter. Note that, Only one model's optimizer needs this assignment.
                if dnn_logits is not None:
                    dnn_optimizer.iterations = tf.compat.v1.train.get_or_create_global_step()
                else:
                    linear_optimizer.iterations = tf.compat.v1.train.get_or_create_global_step()
    
            return multi_head.create_estimator_spec(features=features, mode=mode, labels=labels, train_op_fn=_train_op_fn, logits=logistic_output)

        super(WideDeepModel, self).__init__(model_fn=_model_fn, model_dir=model_dir, config=config, warm_start_from=warm_start_from)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

### Estimator模型的生命周期

In [13]:
tf.compat.v1.disable_eager_execution()
batch_size = 8
epochs = 6
sample_nums = 3060
max_steps = int(sample_nums * epochs / batch_size)

In [14]:
def _serving_input_receiver_fn(serving_feature):
    '''expects all features to be fed directly.'''
    features = {}
    for feature_name, _ in serving_feature.items():
        if 'dense' in feature_name:
            features[feature_name] = tf.compat.v1.placeholder(dtype=tf.float32, shape=[None, None])
        elif 'sparse' in feature_name:
            features[feature_name] = tf.compat.v1.placeholder(dtype=tf.int64, shape=[None, None])
        else:
            print('exception feature type')
    return tf.estimator.export.build_raw_serving_input_receiver_fn(features)

def auc_compare_fn(best_eval_result, current_eval_result ):
    return best_eval_result['auc/ctr'] < current_eval_result['act/ctr']


In [15]:
train_model_spec = tf.estimator.TrainSpec(input_fn=_input_fn('./data/simple.tfrecord', feature_proto), max_steps=max_steps)

model_exporter = tf.estimator.BestExporter(
    serving_input_receiver_fn=_serving_input_receiver_fn(serving_feature),
    compare_fn=auc_compare_fn,
    exports_to_keep=4)

eval_model_spec = tf.estimator.EvalSpec(input_fn=_input_fn('./data/simple.tfrecord', feature_proto),
                                  steps=10,
                                  throttle_secs=3,
                                  exporters=model_exporter)

wd_model = WideDeepModel(model_dir='./model',
                         wide_columns=wide_columns,
                         linear_optimizer=tf.keras.optimizers.Ftrl(learning_rate=0.001,l1_regularization_strength=0.05,l2_regularization_strength=0.2),
                         deep_columns=deep_columns,
                         dnn_optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.008, epsilon=1e-8))

INFO:tensorflow:TF_CONFIG environment variable: {'task': {'type': 'worker', 'index': 0}}
INFO:tensorflow:Using default config.
INFO:tensorflow:Using config: {'_model_dir': './model', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': None, '_save_checkpoints_secs': 600, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_checkpoint_save_graph_def': True, '_service': None, '_cluster_spec': ClusterSpec({}), '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


train、eval、exporter各参数配置的具体含义。

In [16]:
!rm -rf ./model
tf.estimator.train_and_evaluate(wd_model, train_model_spec, eval_model_spec)
wd_model.export_saved_model("./model/saved_model", _serving_input_receiver_fn(serving_feature))

INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps None or save_checkpoints_secs 600.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
INFO:tensorflow:Calling model_fn.


  getter=tf.compat.v1.get_variable)


Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Calling checkpoint listeners before saving checkpoint 0...
INFO:tensorflow:Saving checkpoints for 0 into ./model/model.ckpt.
INFO:tensorflow:Calling checkpoint listeners after saving checkpoint 0...
INFO:tensorflow:loss = 0.97846556, step = 0
INFO:tensorflow:global_step/sec: 237.074
INFO:tensorflow:loss = 0.508775, step = 100 (0.428 sec)
INFO:tensorflow:global_step/sec: 532.508
INFO:tensorflow:loss = 0.680966, step = 200 (0.185 sec)
INFO:tensorflow:global_step/sec: 542.162
INFO:tensorflow:loss = 0.7166904, step = 300 (0.183 sec)
INFO:tensorflow:global_step/sec: 514.63
INFO:tensorflow:loss = 0.77224016, step = 400 (0.198 sec)
INFO:tensorflow:global_step/se

b'./model/saved_model/1651752786'

In [17]:
# 基于estimator自带Wide-n-Deep model
# from tensorflow.estimator import DNNLinearCombinedClassifier as WideDeepModelV2
# wd_model_v2 = WideDeepModelV2(model_dir='./model',
#                               linear_feature_columns=wide_columns,
#                               dnn_feature_columns=deep_columns,
#                               dnn_hidden_units = [16, 8])
# tf.estimator.train_and_evaluate(wd_model_v2, train_model_spec, eval_model_spec)
# wd_model_v2.export_saved_model("./model/saved_model", _serving_input_receiver_fn(serving_feature))

In [18]:
!saved_model_cli show --dir ./model/saved_model/*/ --tag_set serve --signature_def predict

The given SavedModel SignatureDef contains the following input(s):
  inputs['dense_000'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, -1)
      name: Placeholder_9:0
  inputs['dense_002'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, -1)
      name: Placeholder_10:0
  inputs['dense_003'] tensor_info:
      dtype: DT_FLOAT
      shape: (-1, -1)
      name: Placeholder_11:0
  inputs['sparse_000'] tensor_info:
      dtype: DT_INT64
      shape: (-1, -1)
      name: Placeholder_6:0
  inputs['sparse_001'] tensor_info:
      dtype: DT_INT64
      shape: (-1, -1)
      name: Placeholder_7:0
  inputs['sparse_002'] tensor_info:
      dtype: DT_INT64
      shape: (-1, -1)
      name: Placeholder_8:0
The given SavedModel SignatureDef contains the following output(s):
  outputs['ctr/all_class_ids'] tensor_info:
      dtype: DT_INT32
      shape: (-1, 2)
      name: ctr/ctr/predictions/Tile:0
  outputs['ctr/all_classes'] tensor_info:
      dtype: DT_STRING
      shape: (-1, 2)
      