Monitored Session
----------------

https://www.tensorflow.org/api_guides/python/train#Distributed_execution

https://www.tensorflow.org/api_guides/python/train#Training_Hooks

相比简单的tf.Session对象，MonitoredSession更方便使用。

它封装了checkpoint的save和restore，summary的定期保存，初始化变量，启动queue runners，还提供了很多的Hooks来监控训练的过程。另外它还实现了主从模式，适合分布式环境运行。


# tf.train.MonitoredTrainingSession

这个函数是tf.train.MonitoredSession的工厂方法。包含一系列的构造参数。先不管分布式环境相关的参数。

* checkpoint_dir：指定一个目标，它会自动的进行checkpoint的保存或者恢复
* scaffold：不明白是啥？
* hooks: SessionRunHook列表，每个hook都会被触发
* save_checkpoint_secs：每多少秒自动保存一次checkpoint
* save_summaries_steps：每多少步自动保存一次summary
* save_summaries_secs：每多少秒自动保存一次summary
* stop_grace_period_secs：Coordinator优雅退出的秒数

# tf.train.MonitoredSession

这是一个包含固定动作模板的Session运行过程，分成三个方面：

初始化过程：Initialization依次做以下的事情

1. 调用每个hook.begin()
1. 调用scaffold.finalize()
1. 创建session
1. 使用Scaffold初始化模型
1. 如果checkpoint存在，就从checkpoint恢复模型
1. 启动queue runners

执行过程：Run，当run()被调用的时候，依次执行以下过程：

1. 调用每个hook.before_run()
1. 调用被monitored的session.run()
1. 调用hook.after_run()
1. 返回session.run()的结果
1. 如果发生了AbortedError 或者 UnavailableError 两个异常，会重新创建和初始化session

关闭过程：Close

1. 调用hook.end()
1. 关闭queue runners和session
1. 忽略掉OutOfRange错误，这个代表输入队列的样本消耗完毕

In [1]:
import tensorflow as tf

class DebugHook(object):

    def begin(self):
        print '[begin]'
        
    def after_create_session(self, session, coord):
        print '[after_create_session]\tsession=', session, 'coord=', coord
        
    def end(self, session):
        print '[end]\tsession=', session
        
    def before_run(self, run_context):
        print '[before_run]\trun_context=', run_context
        return tf.train.SessionRunArgs(next10)
        
    def after_run(self, run_context, run_values):
        print '[after_run]\trun_context=', run_context, 'run_values=', run_values
        
global_step_tensor = tf.train.get_or_create_global_step()
next10 = tf.assign_add(global_step_tensor, 10)

with tf.train.MonitoredTrainingSession(
        hooks=[DebugHook()],
        ) as sess:
    sess.run(global_step_tensor)


[begin]
[after_create_session]	session= <tensorflow.python.client.session.Session object at 0x1167a4790> coord= <tensorflow.python.training.coordinator.Coordinator object at 0x11677f4d0>
[before_run]	run_context= <tensorflow.python.training.session_run_hook.SessionRunContext object at 0x11681e8d0>
[after_run]	run_context= <tensorflow.python.training.session_run_hook.SessionRunContext object at 0x11681e8d0> run_values= SessionRunValues(results=10, options=, run_metadata=)
[end]	session= <tensorflow.python.client.session.Session object at 0x1167a4790>


# Run Args, Context and Values

对于hook的before_run()和after_run()方法，需要先理解这三个类型的作用。

* tf.train.SessionRunArgs：用来表示传递给session.run()的参数
* tf.train.SessionRunContext：表示跟session.run()被调用的相关的上下文信息
* tf.train.SessionRunValues：包含session.run()的返回值

## tf.train.SessionRunArgs(fetches, feed_dict, options)

这三个参数都对应于session.run()的参数。源码里实际上就是一个namedtuple包含参数中的几个属性。

在before_run()函数里可以返回SessionRunArgs对象，表示新添加的args也需要在即将要执行的run()函数里被求值。

## tf.train.SessionRunContext

几个属性：

* original_args：SessionRunArgs对象
* session：session对象
* stop_requested：是否已经被停止。调用context.request_stop()可以请求停止这个session，对应于coordinator.request_stop().

## tf.train.SessionRunValues(results, options, run_metadata)

也是一个namedtuple。results属性包含了run()的结果。results的shape和SessionRunArgs的fetches一样。

在hook.after_run(run_context, run_values)里的run_values包含的就是before_run()里返回的args对应求值的结果。

# Training Hooks

tf.train已经提供了很多基本功能的hooks，在不用定义自己的hooks的情况下可以完成大部分通用的工作。

* tf.train.SessionRunHook：这个类就是个基类，定义了5个空方法，等待子类实现（begin/end/after_create_session/before_run/after_run）
* tf.train.LoggingTensorHook：以指定的周期（时间/步数）通过logging.info输出跟定tensors的值
* tf.train.StopAtStepHook：达到指定的步数时调用context.request_stop()
* tf.train.CheckpointSaverHook：以指定的周期（时间/步数）保存checkpoint
* tf.train.StepCounterHook：以指定的周期（时间/步数）更新summary，显示steps_per_sec(每秒训练的步数，表示速度)
* tf.train.NanTensorHook：如果loss函数输出NaN了，抛出异常NaNLossDuringTrainingError或者调用context.request_stop()退出session
* tf.train.SummarySaverHook：以指定的周期（时间/步数）保存summary. scafold是干什么用的？
* tf.train.GlobalStepWaiterHook：这个hook的before_run函数会没.5秒查询一次global_step，知道大于某个指定的步数，before_run才会退出。这是个while循环，那么当前的线程就在before_run上一直轮询，除非别的线程能够增加global_step的值，要不然就死循环了
* tf.train.FinalOpsHook：在最后hook.end被调用的时候执行指定的tensor ops
* tf.train.FeedFnHook：把给定的feed_fn在before_run的时候调用并传递给feed_dict

下面的代码拿一个简单的例子来试一下。


In [1]:
import tensorflow as tf

global_step_tensor = tf.train.get_or_create_global_step()

x = tf.constant(1.)
y_ = tf.constant(1.)

w = tf.Variable(.1)
y = tf.multiply(x, w)

loss = tf.pow((y - y_), 2)
train_step = tf.train.GradientDescentOptimizer(.1).minimize(loss, global_step=global_step_tensor)

final_hook = tf.train.FinalOpsHook([y, loss, global_step_tensor])
with tf.train.MonitoredTrainingSession(
        hooks=[
            tf.train.StopAtStepHook(last_step=10),
            tf.train.LoggingTensorHook([global_step_tensor, y, loss], every_n_iter=4),
            final_hook],
        ) as sess:
    while not sess.should_stop():
        sess.run(train_step)

print 'finally:', final_hook.final_ops_values

INFO:tensorflow:<tf.Variable 'global_step:0' shape=() dtype=int64_ref> = 1, Tensor("Mul:0", shape=(), dtype=float32) = 0.1, Tensor("Pow:0", shape=(), dtype=float32) = 0.81
INFO:tensorflow:<tf.Variable 'global_step:0' shape=() dtype=int64_ref> = 5, Tensor("Mul:0", shape=(), dtype=float32) = 0.63136, Tensor("Pow:0", shape=(), dtype=float32) = 0.135895 (0.011 sec)
INFO:tensorflow:<tf.Variable 'global_step:0' shape=() dtype=int64_ref> = 9, Tensor("Mul:0", shape=(), dtype=float32) = 0.849005, Tensor("Pow:0", shape=(), dtype=float32) = 0.0227995 (0.006 sec)
finally: [0.90336323, 0.0093386658, 10]
