<h1>estimator APIにおけるtrain_and_evaluateとは</h1>
<p>前回、現実世界にMLを利用するために次のような問題をあげた。
<ol>
    <li>データがメモリに納まらない</li>
    <li>分散処理をしたい</li>
    <li>訓練中に評価する必要がある</li>
    <li>デプロイメント用にスケールアップしたい</li>
</ol>
</p>

<p>train_and_evaluateはML engineと組み合わせることで１以降の全てを助ける。どのようにそれらを解決していくかを説明する。</p>

<h2>分散処理が簡単に可能</h2>

<p>低レベルAPIのtensorflowでグラフを分散処理をしようとした場合次のようなことを手動で設定する必要がある。
<ul>
    <li>graphを分配する</li>
    <li>変数をシェアする</li>
    <li>ワーカーの管理</li>
    <li>例外や失敗の対処</li>
    <li>チェックポイントを作成</li>
</ul>
train_and_evaluateはすでにもうこういったことが書かれている。MLEngineと組み合わせるだけで簡単に分散処理を行うことができる。
</p>



<h3>訓練中の評価（およびモニタリング）</h3>

<p>train_and_evaluateはその名の通り、自分で設定したタイミングで訓練中に評価を行う。さらに、（これはestimatoraAPIに限ったことではないが）TensorBoardを組み合わせることで、訓練中のリアルタイミング分析が可能となる。</p>

<h4>train_and_evaluateの書き方</h4>

<p>train_and_evaluateはデプロイすることを極めて簡単にすることを考えて作られている、それも主にMLEngineを使ってである。プロダクトとして、訓練済みモデルを使って予測するとき、データをモデルのグラフに提供するまでのグラフが訓練時と変わることが多々ある。具体的に言えば、訓練時のデータがcsvであるのに、MLEngineでデータを渡すときはjsonである。ならば、訓練時と同じグラフを使うわけには行かない、train_input_fnではなくserving_input_fnを定義しなくてはいけない。ここがおそらく今までともっとも大きな違いである。</p>

<p>estimatorAPIでtrain_and_evaluateを使うとき次のように書く。
<ol>
    <li>今までと同じようにfeature_columnを定義</li>
    <li>(Runconfigを定義（保存するディレクトリ、チェックポイントの保存タイミング、TensorBoard用のデータ（summary)の保存タイミングを決める）)</li>    
    <li>estimatorを定義（今まではモデルといっていたが広義的にはモデルはアルゴリズム、訓練したパラメタなど全てをひっくるめたものを指すことが多いためestimatorとする）</li>
    <li>TrainSpecを定義（もちろん今までと同じようにtrain_input_fnを事前に定義しておく）</li>
    <li>exporterの作成（ここで、Serving_input_fnを使う、事前に定義しておく）</li>
    <li>EvalSpecを定義</li>
    <li>train_and_evaluateにestimator、TrainSpec、EvalSpecを渡す</li>
    Runconfigで指定せずに今まで通り、モデルのディレクトリを指定する方法でも動く。
</ol>
</p>

<h4>実例</h4>
以前と同じもの、コードも途中まで全く同じである。

In [1]:
# import libraryimport datalab.bigquery as bq
import tensorflow as tf
import numpy as np
import shutil
print(tf.__version__)

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


1.14.0


In [2]:
# define input_fn with out-of-memory

CSV_COLUMNS = ['fare_amount', 'pickuplon','pickuplat','dropofflon','dropofflat','passengers', 'key']
LABEL_COLUMN = 'fare_amount'
DEFAULTS = [[0.0], [-74.0], [40.0], [-74.0], [40.7], [1.0], ['nokey']]

def read_dataset(filename, mode, batch_size=528):
  def _input_fn():
    def decode_csv(value_column):
      columns = tf.decode_csv(value_column, record_defaults=DEFAULTS)
      features = dict(zip(CSV_COLUMNS, columns))
      label = features.pop(LABEL_COLUMN)
      return features, label
    
    filename_dataset = tf.data.Dataset.list_files(filename)
    textline_dataset = filename_dataset.flat_map(tf.data.TextLineDataset)
    dataset = textline_dataset.map(decode_csv)
    
    if mode == tf.estimator.ModeKeys.TRAIN:
      num_epochs = None
      dataset = dataset.shuffle(buffer_size = 10*batch_size)
    else:
      num_epochs = 1
    
    dataset = dataset.repeat(num_epochs).batch(batch_size)
    return dataset.make_one_shot_iterator().get_next()
  return _input_fn


In [3]:
# define feature_column

INPUT_COLUMNS = [
    tf.feature_column.numeric_column('pickuplon'),
    tf.feature_column.numeric_column('pickuplat'),
    tf.feature_column.numeric_column('dropofflat'),
    tf.feature_column.numeric_column('dropofflon'),
    tf.feature_column.numeric_column('passengers'),
]

def add_more_features(feats):
    # feature engeneeringをするとき便利なので作っておく
    return feats

feature_cols = add_more_features(INPUT_COLUMNS)

In [4]:
# define serving_input_fn
def serving_input_fn():
  # まず渡されるデータに対してそれぞれの名前と対応するtensorを作成し、辞書形式でまとめる。
  feature_placeholders = {
    'pickuplon' : tf.placeholder(tf.float32, [None]),
    'pickuplat' : tf.placeholder(tf.float32, [None]),
    'dropofflon' : tf.placeholder(tf.float32, [None]),
    'dropofflat' : tf.placeholder(tf.float32, [None]),
    'passengers' : tf.placeholder(tf.float32, [None]),
  }
  # 次にこのデータに対して、グラフに渡す際、何かしらの変換が必要なときは変換を行う。（名前を変えたり、特徴量を変化させたり）
  # 今回は全く同じため変化させる必要がない
  features = feature_placeholders
  return tf.estimator.export.ServingInputReceiver(features, feature_placeholders)


In [5]:
# define train_and_evaluate

def train_and_evaluate(output_dir, num_train_steps):
  # 今回はRunconfigを定義しない
  # runconfig = tf.estimator.RunConfig(model_dir=, save_summary_steps=, save_checkpoints_step=)のように作る
  estimator = tf.estimator.LinearRegressor(feature_cols, model_dir=output_dir)
  train_spec = tf.estimator.TrainSpec(input_fn = read_dataset('./data/taxi-train.csv', mode = tf.estimator.ModeKeys.TRAIN),
                                       max_steps = num_train_steps)
  exporter = tf.estimator.LatestExporter('exporter', serving_input_fn) # model_dirにexporterの名前で結果が保存される、それぞれがデプロイ用のモデルを保存している。
  eval_spec = tf.estimator.EvalSpec(
                       input_fn = read_dataset('./data/taxi-valid.csv', mode = tf.estimator.ModeKeys.EVAL),
                       steps = None, # 何バッチで評価するか決める
                       start_delay_secs = 1, # start evaluating after N seconds
                       throttle_secs = 10,  # ここに書かれている秒数以下の時間に一回評価する
                       exporters = exporter)
  return tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

In [7]:
# Run training    
shutil.rmtree('./output_dir', ignore_errors = True) # start fresh each time
train_and_evaluate('./output_dir', num_train_steps = 2000)

W0805 11:16:19.561046 4610495936 deprecation.py:323] From /Users/baito/Works/TFX/.venv/lib/python3.7/site-packages/tensorflow/python/training/training_util.py:236: Variable.initialized_value (from tensorflow.python.ops.variables) is deprecated and will be removed in a future version.
Instructions for updating:
Use Variable.read_value. Variables in 2.X are initialized automatically both in eager and graph (inside tf.defun) contexts.
W0805 11:16:19.715753 4610495936 deprecation.py:323] From <ipython-input-2-79e19eca4b37>:26: DatasetV1.make_one_shot_iterator (from tensorflow.python.data.ops.dataset_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use `for ... in dataset:` to iterate over a dataset. If using `tf.estimator`, return the `Dataset` object directly from your input function. As a last resort, you can use `tf.compat.v1.data.make_one_shot_iterator(dataset)`.
W0805 11:16:20.580346 4610495936 deprecation.py:323] From /Users/baito/Works/TFX/.venv

({'average_loss': 108.85366,
  'label/mean': 11.666427,
  'loss': 45310.336,
  'prediction/mean': 11.49476,
  'global_step': 2000},
 [b'./output_dir/export/exporter/1564971451'])