[toc]

# TensorFlow Estimator

In [27]:
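import warnings
warnings.filterwarnings('ignore')
import tensorflow as tf  # TF 1.x API (tf.layers, tf.feature_column.input_layer, tf.contrib)

TRAIN_URL = "http://download.tensorflow.org/data/iris_training.csv"
TEST_URL = "http://download.tensorflow.org/data/iris_test.csv"

def downloadfiles():
    # Use a distinct fname per file: get_file skips the download when fname already
    # exists in the cache. The run below used fname='./data' for both files, so
    # test_path pointed to the cached training file (see the identical paths).
    train_path = tf.keras.utils.get_file(fname='iris_training.csv', origin=TRAIN_URL)
    test_path = tf.keras.utils.get_file(fname='iris_test.csv', origin=TEST_URL)
    return train_path, test_path

train_path, test_path = downloadfiles()
print("train_path: {}\ntest_path: {}".format(train_path, test_path))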

BATCH_SIZE = 16
EPOCHS = 400
STEPS = 40
LR = 0.0001

train_path: /Users/bytedance/.keras/datasets/./data
test_path: /Users/bytedance/.keras/datasets/./data


## Define the model

The core of the model:

In [29]:
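def create_model(features, feature_columns, hiddens, output_dim):
    # Turn the feature dict into one dense input tensor according to feature_columns.
    inputs = tf.feature_column.input_layer(features=features, feature_columns=feature_columns)

    # Stack of fully connected hidden layers with ReLU activations.
    for hidden_unit in hiddens:
        inputs = tf.layers.dense(inputs=inputs, units=hidden_unit, activation=tf.nn.relu)
    # Output layer: unnormalized logits, one per class.
    logits = tf.layers.dense(inputs=inputs, units=output_dim)
    return logits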
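For readers new to `tf.layers.dense`, the following plain-Python sketch (no TensorFlow) shows what `create_model` computes for a single example: each hidden layer is `relu(x @ W + b)`, and the last layer returns raw logits with no activation. `dense`, `relu`, and `create_model_sketch` are hypothetical helpers, the weights are random for illustration only, and the plain feature list stands in for the dense tensor that `tf.feature_column.input_layer` builds.

```python
import random


def relu(z):
    return max(0.0, z)


def dense(inputs, weights, bias, activation=None):
    """One fully connected layer for one example: inputs @ weights + bias.

    inputs: list of in_dim floats; weights: in_dim rows of out_dim floats;
    bias: list of out_dim floats. activation is applied element-wise if given.
    """
    out = []
    for j in range(len(bias)):
        z = sum(x * weights[i][j] for i, x in enumerate(inputs)) + bias[j]
        out.append(activation(z) if activation else z)
    return out


def create_model_sketch(features, hiddens, output_dim, seed=0):
    """Plain-Python analogue of create_model: ReLU hidden layers, linear logits."""
    rng = random.Random(seed)
    x = list(features)
    sizes = list(hiddens) + [output_dim]
    for k, units in enumerate(sizes):
        # Random weights and zero biases, for illustration only.
        w = [[rng.uniform(-0.1, 0.1) for _ in range(units)] for _ in range(len(x))]
        b = [0.0] * units
        is_last = k == len(sizes) - 1
        # Hidden layers use ReLU; the final layer stays linear (raw logits).
        x = dense(x, w, b, activation=None if is_last else relu)
    return x


logits = create_model_sketch([5.1, 3.3, 1.7, 0.5], hiddens=[8, 8], output_dim=3)
```

The output has one logit per class, matching `output_dim`.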

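## Define the model layer (model_fn)

The core of the model alone is not enough: we usually want the model to produce different outputs in different situations. For example:

- During training, the model should output `loss` and `train_op`.
- During evaluation, the model should output metrics such as accuracy, precision, and recall.
- During prediction, the model should output the predictions directly.

This is what the model layer does: it changes the model's outputs according to `mode`.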


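The model layer (`model_fn`) is a callback function that takes a set of arguments and returns a `tf.estimator.EstimatorSpec`.

The model switches its outputs by checking `mode` and returning a different `tf.estimator.EstimatorSpec` for each mode.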
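A minimal plain-Python sketch of this dispatch, under stated assumptions: `Spec` is a hypothetical stand-in for `tf.estimator.EstimatorSpec` with only a few fields, the mode strings are the values of `tf.estimator.ModeKeys` (`'train'`, `'eval'`, `'infer'`), and the loss is a toy 0/1 value rather than a real cross-entropy.

```python
from collections import namedtuple

# Values of tf.estimator.ModeKeys.TRAIN / EVAL / PREDICT.
TRAIN, EVAL, PREDICT = 'train', 'eval', 'infer'

# Hypothetical stand-in for tf.estimator.EstimatorSpec (only a few fields).
Spec = namedtuple('Spec', ['mode', 'predictions', 'loss', 'train_op', 'eval_metric_ops'],
                  defaults=(None, None, None, None))


def model_fn_sketch(mode, logits, label=None):
    """Return different outputs depending on mode, like a real model_fn."""
    predicted = max(range(len(logits)), key=lambda i: logits[i])
    if mode == PREDICT:
        # No label is available at prediction time; only predictions are returned.
        return Spec(mode, predictions={'predict_cls': predicted})
    # Toy loss for illustration only: 0 if the prediction is right, 1 otherwise.
    loss = 0.0 if predicted == label else 1.0
    if mode == EVAL:
        return Spec(mode, loss=loss, eval_metric_ops={'accuracy': 1.0 - loss})
    if mode == TRAIN:
        return Spec(mode, loss=loss, train_op='<train_op placeholder>')
    raise ValueError('unknown mode: %r' % mode)
```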

### tf.estimator.EstimatorSpec

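#### Prototype

`EstimatorSpec` is a class. `model_fn` constructs and returns an instance of it, and the Estimator uses the ops in that instance (`loss`, `train_op`, `predictions`, ...) to run training, evaluation, or prediction. Its constructor in the source code is:

```python
class EstimatorSpec():
  def __new__(cls,
              mode,
              predictions=None,
              loss=None,
              train_op=None,
              eval_metric_ops=None,
              export_outputs=None,
              training_chief_hooks=None,
              training_hooks=None,
              scaffold=None,
              evaluation_hooks=None,
              prediction_hooks=None):
    ...  # body omitted
```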

Important parameters:

*   mode: a `ModeKeys` value specifying training, evaluation, or prediction.
*   predictions: predictions `Tensor` or dict of `Tensor`.
*   loss: training loss `Tensor`. Must be either a scalar or have shape `[1]`.
*   train\_op: the op that runs one training step.
*   eval\_metric\_ops: dict of metric results keyed by name.
    The values of the dict can be one of the following:
    *   (1) an instance of the `Metric` class.
    *   (2) the result of calling a metric function, namely a `(metric_tensor, update_op)` tuple. `metric_tensor` should be evaluated without any impact on state (typically it is a pure computation based on variables). For example, it should not trigger the `update_op` or require any input fetching.
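The `(metric_tensor, update_op)` contract can be illustrated without TensorFlow. The sketch below assumes a hypothetical `StreamingAccuracy` class that mirrors how `tf.metrics.accuracy` behaves conceptually: `update_op` accumulates a batch into running totals (state change), while reading the value is a pure computation on those totals.

```python
class StreamingAccuracy:
    """Plain-Python analogue of the (metric_tensor, update_op) pair.

    Two running totals play the role of the metric's local variables.
    """

    def __init__(self):
        self.total = 0.0  # correct predictions seen so far
        self.count = 0.0  # examples seen so far

    def update_op(self, labels, predictions):
        """Accumulate one batch (mutates state) and return the updated value."""
        self.total += sum(1.0 for l, p in zip(labels, predictions) if l == p)
        self.count += len(labels)
        return self.value()

    def value(self):
        """Read the current value without touching state, like metric_tensor."""
        return self.total / self.count if self.count else 0.0
```

Evaluation calls the update once per batch and reads the value at the end, so the reported metric covers every evaluated batch, not just the last one.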

See the [source code](https://github.com/tensorflow/estimator/tree/master/tensorflow_estimator/python/estimator/model_fn.py) for the other parameters.

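#### Different modes require different arguments

Depending on the value of `mode`, different arguments are required:

*   For mode == ModeKeys.TRAIN: the required fields are `loss` and `train_op`.
*   For mode == ModeKeys.EVAL: the required field is `loss`.
*   For mode == ModeKeys.PREDICT: the required field is `predictions`.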
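As a sketch of the rule above, this hypothetical helper lists the required fields that are missing for a given mode. The real `EstimatorSpec` does its own validation and raises a `ValueError` when a required field is missing; `missing_fields` only mirrors the list above and is not part of the TensorFlow API.

```python
# Required EstimatorSpec fields per mode, keyed by the tf.estimator.ModeKeys values.
REQUIRED = {
    'train': ('loss', 'train_op'),
    'eval': ('loss',),
    'infer': ('predictions',),
}


def missing_fields(mode, **fields):
    """Return the required fields that are absent or None for this mode."""
    return [name for name in REQUIRED[mode] if fields.get(name) is None]
```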

The parameter descriptions above may still be hard to follow, so here are some examples:

##### The simplest case: predict

Only `mode` and `predictions` are needed:

```
# Compute predictions.
predicted_classes = tf.argmax(logits, 1)
if mode == tf.estimator.ModeKeys.PREDICT:
    predictions = {
        'class_ids': predicted_classes[:, tf.newaxis],
        'probabilities': tf.nn.softmax(logits),
        'logits': logits,
    }
    return tf.estimator.EstimatorSpec(mode, predictions=predictions)
```
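To see what the `predictions` dict contains, here is a plain-Python version of `tf.argmax` and `tf.nn.softmax` for a single example. The logit values are made up, and `softmax` and `argmax` are illustrative helpers rather than TensorFlow functions.

```python
import math


def softmax(logits):
    """Numerically stable softmax: subtract the max before exponentiating."""
    m = max(logits)
    exps = [math.exp(z - m) for z in logits]
    s = sum(exps)
    return [e / s for e in exps]


def argmax(values):
    """Index of the largest value (first one on ties)."""
    return max(range(len(values)), key=values.__getitem__)


logits = [2.0, 1.0, 0.1]  # made-up logits for one example
predictions = {
    'class_ids': [argmax(logits)],  # like predicted_classes[:, tf.newaxis] for one row
    'probabilities': softmax(logits),
    'logits': logits,
}
```

Softmax preserves the ordering of the logits, so the class with the largest logit is also the class with the highest probability.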

##### Evaluation mode: eval

Pass `mode`, `loss`, and (optionally) `eval_metric_ops`.

If you call the Estimator's `evaluate` method, `model_fn` receives mode = ModeKeys.EVAL. In this case, the model function must return a `tf.estimator.EstimatorSpec` containing the model's loss and, optionally, one or more metrics.

It is returned as follows:

```
# Compute loss.
loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

# Compute evaluation metrics (accuracy must be defined before it is put in the dict).
accuracy = tf.metrics.accuracy(labels=labels,
                               predictions=predicted_classes,
                               name='acc_op')
metrics = {'accuracy': accuracy}

if mode == tf.estimator.ModeKeys.EVAL:
    return tf.estimator.EstimatorSpec(
        mode, loss=loss, eval_metric_ops=metrics)
```
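The loss above, `tf.losses.sparse_softmax_cross_entropy`, takes integer class ids and unscaled logits. With its default weights of 1.0 it averages `-log(softmax(logits)[label])` over the batch. The plain-Python sketch below computes that quantity; the helper reuses the TF name for readability but is not the TF function.

```python
import math


def sparse_softmax_cross_entropy(labels, logits):
    """Mean over the batch of -log(softmax(row)[label]).

    labels: list of int class ids; logits: list of per-example logit lists.
    Uses log-sum-exp (shifted by the row max) for numerical stability.
    """
    losses = []
    for label, row in zip(labels, logits):
        m = max(row)
        log_sum_exp = m + math.log(sum(math.exp(z - m) for z in row))
        losses.append(log_sum_exp - row[label])
    return sum(losses) / len(losses)
```

With all-zero logits over three classes every class gets probability 1/3, so the loss is `log(3)` regardless of the label.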

##### Training mode: train

Pass `mode`, `loss`, and `train_op`.

The loss is computed the same way as in eval mode:

```
if mode == tf.estimator.ModeKeys.TRAIN:
    loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)
    optimizer = tf.train.AdagradOptimizer(learning_rate=0.1)
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)
```
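The `train_op` returned by `optimizer.minimize(loss, global_step=...)` applies one parameter update and increments `global_step` each time it runs. The sketch below mimics that on a one-dimensional toy objective `(w - 3)^2` with the Adagrad rule (accumulate squared gradients, scale each step by their inverse square root, starting from TF's default `initial_accumulator_value=0.1`). The objective and `adagrad_minimize` are hypothetical; note that the full example later in this notebook uses `tf.train.AdamOptimizer` instead.

```python
import math


def adagrad_minimize(grad_fn, w, learning_rate, steps, initial_accumulator_value=0.1):
    """Minimize a 1-D function with the Adagrad update rule.

        accum += g**2
        w -= learning_rate * g / sqrt(accum)

    global_step counts updates, like minimize(..., global_step=...).
    """
    accum = initial_accumulator_value
    global_step = 0
    for _ in range(steps):
        g = grad_fn(w)
        accum += g * g
        w -= learning_rate * g / math.sqrt(accum)
        global_step += 1
    return w, global_step


# Toy objective (w - 3)^2, whose gradient is 2 * (w - 3).
w, step = adagrad_minimize(lambda w: 2.0 * (w - 3.0), w=0.0, learning_rate=0.1, steps=100)
```

Because the accumulator only grows, Adagrad's effective step size shrinks over time, so `w` moves toward 3 without overshooting in this example.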

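##### Mode-independent form

`model_fn` can also populate every argument regardless of mode. In that case the Estimator ignores the arguments it does not need; for example, `train_op` is ignored in eval and infer modes: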

```
def my_model_fn(mode, features, labels):
  predictions = ...
  loss = ...
  train_op = ...
  return tf.estimator.EstimatorSpec(
      mode=mode,
      predictions=predictions,
      loss=loss,
      train_op=train_op)
```

### Code example

In [26]:
def model_fn_builder(lr):  # this wrapper only passes lr in; the real work happens in model_fn

    def model_fn(features, labels, mode, params, config):
        logits = create_model(features, params['feature_columns'], params['hiddens'], params['output_dim'])
        predict_pro = tf.nn.softmax(logits)
        predict_cls = tf.argmax(logits, axis=1)
        # labels is None in PREDICT mode, so the loss is only built for TRAIN/EVAL.
        if mode != tf.estimator.ModeKeys.PREDICT:
            loss = tf.losses.sparse_softmax_cross_entropy(labels=labels, logits=logits)

        def get_metric(labels, predictions):
            '''
            Define metrics; each value is a (metric_tensor, update_op) tuple.
            Note: tf.metrics.recall and tf.metrics.precision are binary metrics that
            cast labels and predictions to bool, so with 3 classes they only measure
            "class 0 vs. any non-zero class".
            '''
            accuracy = tf.metrics.accuracy(labels=labels,
                                           predictions=predictions,
                                           name='iris_accuracy')
            recall = tf.metrics.recall(labels=labels,
                                       predictions=predictions,
                                       name='iris_recall')
            precision = tf.metrics.precision(labels=labels,
                                             predictions=predictions,
                                             name='iris_precision')

            return {
                'accuracy': accuracy,
                'recall': recall,
                'precision': precision,
            }

        if mode == tf.estimator.ModeKeys.TRAIN:
            train_op = tf.train.AdamOptimizer(lr).minimize(loss=loss, global_step=tf.train.get_global_step())
            # eval_metric_ops is optional here; training only needs loss and train_op.
            return tf.estimator.EstimatorSpec(mode=mode,
                                              loss=loss,
                                              train_op=train_op,
                                              eval_metric_ops=get_metric(labels, predict_cls))

        elif mode == tf.estimator.ModeKeys.EVAL:
            return tf.estimator.EstimatorSpec(mode=mode,
                                              loss=loss,
                                              eval_metric_ops=get_metric(labels, predict_cls))

        elif mode == tf.estimator.ModeKeys.PREDICT:
            predictions = {'predict_cls': predict_cls,
                           'predict_pro': predict_pro}
            return tf.estimator.EstimatorSpec(mode=mode,
                                              predictions=predictions)
    return model_fn
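The evaluation output further down reports `precision = 1.0` and `recall = 1.0` next to `accuracy = 0.7859375`. A likely explanation is that `tf.metrics.precision` and `tf.metrics.recall` cast `labels` and `predictions` to bool, so with three classes they only separate class 0 from the other two. The plain-Python sketch below reproduces that casting on made-up labels; `binary_precision_recall` is a hypothetical helper.

```python
def binary_precision_recall(labels, predictions):
    """Precision and recall after casting class ids to bool (non-zero = positive)."""
    labels = [bool(l) for l in labels]
    predictions = [bool(p) for p in predictions]
    tp = sum(1 for l, p in zip(labels, predictions) if l and p)
    fp = sum(1 for l, p in zip(labels, predictions) if not l and p)
    fn = sum(1 for l, p in zip(labels, predictions) if l and not p)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


labels = [0, 1, 2, 2]
predictions = [0, 2, 1, 2]  # confuses classes 1 and 2, never confuses class 0
accuracy = sum(l == p for l, p in zip(labels, predictions)) / len(labels)
precision, recall = binary_precision_recall(labels, predictions)
```

Here accuracy is only 0.5, yet precision and recall are both 1.0, because mixing up classes 1 and 2 is invisible once both become `True`.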

## Define the input layer (input_fn)

The input layer is a function that returns a dataset.

In [6]:
CSV_TYPES=[[0.0], [0.0], [0.0], [0.0], [0]]
CSV_COLUMN_NAMES = ['SepalLength', 'SepalWidth',
                    'PetalLength', 'PetalWidth', 'label']
label = ['Setosa', 'Versicolor', 'Virginica']

def input_fn_builder(file_path, epochs, batch_size, istrain=False):

    def parse_line(line):  # used by dataset.map to parse each line
        '''
        Parse a CSV line into (features, label).
        '''
        fields = tf.decode_csv(line, record_defaults=CSV_TYPES)
        features = dict(zip(CSV_COLUMN_NAMES, fields))
        label = features.pop('label')
        return features, label

    def input_fn():
        dataset = tf.data.TextLineDataset(file_path).skip(1)  # skip the header row
        dataset = dataset.map(parse_line)
        if istrain:
            dataset = dataset.shuffle(1000)
        dataset = dataset.repeat(epochs).batch(batch_size)
        # The returned order must match model_fn's (features, labels) arguments;
        # a dataset whose elements are (features, label) tuples also works.
        return dataset

    return input_fn
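The order of operations in `input_fn` matters: `shuffle` comes before `repeat`, so each epoch is shuffled separately, and `batch` comes last, so one batch may span two epochs. The sketch below reproduces that pipeline in plain Python on a made-up three-row CSV with a header row. `input_fn_sketch` is hypothetical, uses the `csv` module instead of `tf.decode_csv`, and shuffles a whole epoch at once, which matches `shuffle(1000)` only because the iris files are smaller than the buffer.

```python
import csv
import random

CSV_COLUMN_NAMES = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth', 'label']


def parse_line(line):
    """Parse one CSV line into (features dict, int label)."""
    values = next(csv.reader([line]))
    features = {name: float(v) for name, v in zip(CSV_COLUMN_NAMES[:-1], values[:-1])}
    return features, int(values[-1])


def input_fn_sketch(text, epochs, batch_size, istrain=False, seed=0):
    """Yield batches in input_fn's order: skip(1) -> map -> shuffle -> repeat -> batch."""
    lines = text.strip().splitlines()[1:]      # skip(1): drop the header row
    examples = [parse_line(l) for l in lines]  # map(parse_line)
    rng = random.Random(seed)
    stream = []
    for _ in range(epochs):                    # repeat(epochs)
        epoch = list(examples)
        if istrain:
            rng.shuffle(epoch)                 # shuffle, done separately per epoch
        stream.extend(epoch)
    for i in range(0, len(stream), batch_size):  # batch; the last batch may be smaller
        yield stream[i:i + batch_size]
```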

In [7]:
model_dir = r'./model'
params = {}
feature_columns = []
for i in range(len(CSV_COLUMN_NAMES)-1):
    feature_columns.append(
        tf.feature_column.numeric_column(CSV_COLUMN_NAMES[i])
    )
params['feature_columns'] = feature_columns
params['hiddens'] = [128, 256, 256]
params['output_dim'] = len(label)

In [8]:
config = tf.estimator.RunConfig(save_checkpoints_steps=100)
estimator = tf.estimator.Estimator(
    model_fn=model_fn_builder(LR),  # a function is expected here
    model_dir=model_dir, 
    params=params,
    config=config)

INFO:tensorflow:Using config: {'_model_dir': './model', '_tf_random_seed': None, '_save_summary_steps': 100, '_save_checkpoints_steps': 100, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 5, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': None, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f8bf1040510>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [9]:
train = estimator.train(input_fn=input_fn_builder(file_path=train_path,
                                                    batch_size=BATCH_SIZE,
                                                    epochs=EPOCHS,
                                                    istrain=True),  # a function is expected here too
                        steps=STEPS)

Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.

INFO:tensorflow:Calling model_fn.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Instructions for updating:
The old _FeatureColumn APIs are being deprecated. Please use the new FeatureColumn APIs instead.
Instructions for updating:
Use keras.layers.Dense instead.
Instructions for updating:
Please use `layer.__call__` method instead.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where
train
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Graph was finalized.
INFO:tensorflo

### evaluate


In [10]:
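# evaluate(input_fn, steps=None, hooks=None, checkpoint_path=None, name=None)
# Note: with epochs=EPOCHS and steps=STEPS, evaluation reads STEPS batches from a test set
# repeated EPOCHS times; epochs=1 with steps=None evaluates each example exactly once.
estimator.evaluate(input_fn=input_fn_builder(file_path=test_path,
                                            batch_size=BATCH_SIZE,
                                            epochs=EPOCHS), steps=STEPS)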

INFO:tensorflow:Calling model_fn.
eval
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Starting evaluation at 2021-02-03T15:55:22Z
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Restoring parameters from ./model/model.ckpt-120
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:Evaluation [4/40]
INFO:tensorflow:Evaluation [8/40]
INFO:tensorflow:Evaluation [12/40]
INFO:tensorflow:Evaluation [16/40]
INFO:tensorflow:Evaluation [20/40]
INFO:tensorflow:Evaluation [24/40]
INFO:tensorflow:Evaluation [28/40]
INFO:tensorflow:Evaluation [32/40]
INFO:tensorflow:Evaluation [36/40]
INFO:tensorflow:Evaluation [40/40]
INFO:tensorflow:Finished evaluation at 2021-02-03-15:55:23
INFO:tensorflow:Saving dict for global step 120: accuracy = 0.7859375, global_step = 120, loss = 0.48074174, precision = 1.0, recall = 1.0
INFO:tensorflow:Saving 'checkpoint_path' summary for global step 120: ./model/model.ckpt-120


{'accuracy': 0.7859375,
 'loss': 0.48074174,
 'precision': 1.0,
 'recall': 1.0,
 'global_step': 120}

### predict

In [11]:
# predict(input_fn, predict_keys=None, hooks=None, checkpoint_path=None, yield_single_examples=True)
estimator.predict(...)

<generator object Estimator.predict at 0x7f8bf102ea50>
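`estimator.predict` returns a generator, as the output shows, so nothing runs until you iterate over it. With `yield_single_examples=True` (the default), each item is a dict for one example, keyed like the `predictions` dict in `model_fn`. The plain-Python sketch below mimics that splitting of batched outputs into per-example dicts; `iter_single_examples` is hypothetical and the batched values are made up.

```python
def iter_single_examples(batched_predictions):
    """Split batched prediction dicts into one dict per example (lazily)."""
    for batch in batched_predictions:
        n = len(next(iter(batch.values())))
        for i in range(n):
            yield {key: values[i] for key, values in batch.items()}


# Made-up batched outputs with the keys returned by model_fn in PREDICT mode.
batches = [
    {'predict_cls': [0, 2], 'predict_pro': [[0.7, 0.2, 0.1], [0.1, 0.3, 0.6]]},
    {'predict_cls': [1], 'predict_pro': [[0.2, 0.5, 0.3]]},
]
results = iter_single_examples(batches)  # a generator, like estimator.predict(...)
for r in results:
    print(r['predict_cls'], r['predict_pro'])
```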

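## serving

TensorFlow serves models in the pb (SavedModel) format, while training and evaluation produce checkpoints, so the trained checkpoint must be exported to pb format.

`tf.estimator.Estimator` provides the `export_savedmodel` method for this (renamed `export_saved_model` in later versions, as the deprecation message below shows). It takes a `serving_input_receiver_fn`, which does the following:

1. Adds placeholders to the graph; the serving system feeds them when it receives an inference request.

2. Adds any extra ops needed to convert data from the raw input format into the feature tensors the model expects.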

### Define the serving layer

#### tf.estimator.export.ServingInputReceiver

In [12]:
def serving_input_receiver_fn():
    # One comma-separated string per example, e.g. "5.1,3.3,1.7,0.5".
    input_str = tf.placeholder(tf.string, shape=[None], name='inputs')

    # How to process the input depends on its format; this is only a demo.
    # Because .values flattens all split results, it handles one example per request.
    line = tf.string_split(input_str, ',').values
    features = {
        'SepalLength': tf.string_to_number([line[0]], tf.float32),
        'SepalWidth': tf.string_to_number([line[1]], tf.float32),
        'PetalLength': tf.string_to_number([line[2]], tf.float32),
        'PetalWidth': tf.string_to_number([line[3]], tf.float32)
    }

    receiver_tensors = {'inputs': input_str}

    return tf.estimator.export.ServingInputReceiver(features, receiver_tensors)

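The example above has both `receiver_tensors` and `features`: `receiver_tensors` is what the client sends, while `features` is what the model receives. The second job of `serving_input_receiver_fn` is to implement the logic that turns `receiver_tensors` into `features`.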
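The conversion from `receiver_tensors` to `features` can be sketched in plain Python: split the comma-separated request string and convert each part to a float, keyed by feature name. `receiver_to_features` is hypothetical; unlike `tf.string_split`, Python's `str.split` does not skip empty tokens, so the length check is an added safeguard rather than part of the original function.

```python
FEATURE_NAMES = ['SepalLength', 'SepalWidth', 'PetalLength', 'PetalWidth']


def receiver_to_features(input_str):
    """Turn one raw request string into the model's feature dict.

    Each value is a one-element list, like the shape-[1] tensors built above.
    """
    parts = input_str.split(',')
    if len(parts) != len(FEATURE_NAMES):
        raise ValueError('expected %d comma-separated values, got %d'
                         % (len(FEATURE_NAMES), len(parts)))
    return {name: [float(p)] for name, p in zip(FEATURE_NAMES, parts)}
```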

#### tf.estimator.export.build_raw_serving_input_receiver_fn

If the input needs no processing, you can simply use `tf.estimator.export.build_raw_serving_input_receiver_fn`:

In [13]:
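def raw_serving_input_fn():
    # These placeholders only describe dtype and shape: build_raw_serving_input_receiver_fn
    # creates new placeholders named after them, which is why the exported inputs are
    # named e.g. PetalLength_1:0 in the saved_model_cli output below.
    SepalLength = tf.placeholder(tf.float32, [None], name='SepalLength')
    SepalWidth = tf.placeholder(tf.float32, [None], name='SepalWidth')
    PetalLength = tf.placeholder(tf.float32, [None], name='PetalLength')
    PetalWidth = tf.placeholder(tf.float32, [None], name='PetalWidth')
    # build_raw_serving_input_receiver_fn returns a function; calling it yields a ServingInputReceiver.
    input_fn = tf.estimator.export.build_raw_serving_input_receiver_fn({
        'SepalLength': SepalLength,
        'SepalWidth': SepalWidth,
        'PetalLength': PetalLength,
        'PetalWidth': PetalWidth,
    })()
    return input_fn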

### Export the model

In [14]:
estimator.export_savedmodel('export_base/iris', serving_input_receiver_fn=raw_serving_input_fn)  # export_saved_model in newer TF

Instructions for updating:
This function has been renamed, use `export_saved_model` instead.
INFO:tensorflow:Calling model_fn.
infer
INFO:tensorflow:Done calling model_fn.
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.utils.build_tensor_info or tf.compat.v1.saved_model.build_tensor_info.
INFO:tensorflow:Signatures INCLUDED in export for Classify: None
INFO:tensorflow:Signatures INCLUDED in export for Regress: None
INFO:tensorflow:Signatures INCLUDED in export for Predict: ['serving_default']
INFO:tensorflow:Signatures INCLUDED in export for Train: None
INFO:tensorflow:Signatures INCLUDED in export for Eval: None
INFO:tensorflow:Restoring parameters from ./model/model.ckpt-120
INFO:tensorflow:Assets added to graph.
INFO:tensorflow:No assets to write.
INFO:tensorflow:SavedModel written to: export_base/iris/temp-b'1612338923'/saved_model.pb


b'export_base/iris/1612338923'

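The export returns the new directory, `export_base/iris/1612338923`. Each export adds such a timestamped directory under export_base/iris (the tree below also shows 1611673453 from another export), and each directory holds the pb file and the variables files.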

In [15]:
!tree export_base/iris

[01;34mexport_base/iris[00m
├── [01;34m1611673453[00m
│   ├── saved_model.pb
│   └── [01;34mvariables[00m
│       ├── variables.data-00000-of-00001
│       └── variables.index
└── [01;34m1612338923[00m
    ├── saved_model.pb
    └── [01;34mvariables[00m
        ├── variables.data-00000-of-00001
        └── variables.index

4 directories, 6 files
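Each call to `export_savedmodel` writes a new sub-directory named by a Unix timestamp, and loaders such as `tf.contrib.predictor.from_saved_model` need a concrete version directory. A small hypothetical helper can pick the newest one, assuming every numeric sub-directory name is an export timestamp:

```python
import os


def latest_export(export_base):
    """Return the path of the newest export under export_base.

    Numerically largest timestamp = most recent export. Non-numeric entries
    (for example temporary temp-* directories) and plain files are ignored.
    """
    versions = [d for d in os.listdir(export_base)
                if d.isdigit() and os.path.isdir(os.path.join(export_base, d))]
    if not versions:
        raise FileNotFoundError('no exported versions under %s' % export_base)
    return os.path.join(export_base, max(versions, key=int))
```

For the tree above, `latest_export('export_base/iris')` would return `export_base/iris/1612338923`.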


### Using saved_model_cli

In [16]:
!saved_model_cli show --dir export_base/iris/1611673453 --all


MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['PetalLength'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: PetalLength_1:0
    inputs['PetalWidth'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: PetalWidth_1:0
    inputs['SepalLength'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: SepalLength_1:0
    inputs['SepalWidth'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1)
        name: SepalWidth_1:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['predict_cls'] tensor_info:
        dtype: DT_INT64
        shape: (-1)
        name: ArgMax:0
    outputs['predict_pro'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 3)
        name: Softmax:0
  Method name is: tensorflow/serving/predict


saved_model_cli can also run the model on a set of inputs for testing:

In [17]:
!saved_model_cli run --dir export_base/iris/1611673453 \
    --tag_set serve \
    --signature_def "serving_default" \
    --input_exprs 'SepalLength=[5.1,5.9,6.9];SepalWidth=[3.3,3.0,3.1];PetalLength=[1.7,4.2,5.4];PetalWidth=[0.5,1.5,2.1]'

2021-02-03 15:55:27.571696: I tensorflow/core/platform/cpu_feature_guard.cc:142] Your CPU supports instructions that this TensorFlow binary was not compiled to use: AVX2 AVX512F FMA
2021-02-03 15:55:27.584064: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x7f98f6138b40 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
2021-02-03 15:55:27.584112: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
Instructions for updating:
This function will only be available through the v1 compatibility library as tf.compat.v1.saved_model.loader.load or tf.compat.v1.saved_model.load. There will be a new function for importing SavedModels in Tensorflow 2.0.
Result for output key predict_cls:
[0 2 2]
Result for output key predict_pro:
[[0.66284543 0.18443526 0.15271927]
 [0.17048366 0.37887084 0.45064554]
 [0.08769966 0.36253315 0.54976714]]
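The input-expression string in the `key=[...];key=[...]` format used above can also be turned into a Python dict, for example to feed the same test inputs to a predictor in code. The sketch below is a hypothetical parser that only accepts literal values via `ast.literal_eval`; it is not saved_model_cli's own parser, which accepts a wider range of Python expressions.

```python
import ast


def parse_input_expr(input_expr):
    """Parse 'k1=[...];k2=[...]' into {'k1': [...], 'k2': [...]} (literals only)."""
    feeds = {}
    for item in input_expr.split(';'):
        if not item.strip():
            continue  # tolerate a trailing ';'
        key, _, expr = item.partition('=')
        feeds[key.strip()] = ast.literal_eval(expr.strip())
    return feeds


feeds = parse_input_expr('SepalLength=[5.1,5.9,6.9];SepalWidth=[3.3,3.0,3.1];'
                         'PetalLength=[1.7,4.2,5.4];PetalWidth=[0.5,1.5,2.1]')
```

Since the keys match the signature inputs shown by `saved_model_cli show`, a dict like this should be usable as the `input_params` of the predictor in the serving example.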



In [None]:
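# 2. Serve with tornado/flask. Steps:
import json

import tensorflow as tf
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options

define('port', default=8888, type=int, help='port to listen on')  # hypothetical default port

# 1. Load the model. model_path must point to a concrete version directory.
model_path = 'export_base/iris/1608121703'
predictor = tf.contrib.predictor.from_saved_model(model_path)

# 2. Predict: input_params must match the receiver tensors of the serving_input_receiver_fn
#    used at export time; predict_result has the same keys as the predictions in model_fn.
#    predict_result = predictor(input_params)

# 3. Wrap the predictor in a tornado handler.
class b_vxHandler(tornado.web.RequestHandler):

    def post(self, version):
        try:
            # Assumed request body (hypothetical): JSON such as
            # {"SepalLength": [5.1], "SepalWidth": [3.3], "PetalLength": [1.7], "PetalWidth": [0.5]}
            input_params = json.loads(self.request.body)
            predict_result = predictor(input_params)
            # numpy arrays are not JSON-serializable, so convert them to lists.
            self.finish({k: v.tolist() for k, v in predict_result.items()})
        except Exception as err:
            # Hypothetical error response format.
            self.set_status(500)
            self.finish({'error': str(err)})


application = tornado.web.Application([
    (r"/b/(?P<version>v\d+)", b_vxHandler),
])


if __name__ == "__main__":
    tornado.options.parse_command_line()
    application.listen(options.port)
    tornado.ioloop.IOLoop.current().start()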
