# A3C

A3C是Asynchronous Advantage Actor-Critic Model的简称，即异步优势演员-评论家模型，A3C并不是一种像Policy Gradient或DQN这样具体的算法，而是一种解决问题的思想，它的核心精神是，在强化学习的训练过程中，我们可以并行地训练多个Agent，在训练的过程中，各个Agent是参数共享的。更具体一些，我们可能有N个Agent并行地在环境采样1回合后计算1次梯度，我们还有1个或多个Agent在这个过程中什么都不做，当N个Agent中的1个或者M个或者N个采样完成，并反向传播或者基于时间的反向传播计算完1回合的梯度后，这些Agent会将梯度异步地分发给那1个或者多个什么都不做的Agent，然后这些什么都不做的Agent执行一次参数更新，再将更新后的参数分发给这个分发梯度的Agent，然后一直重复这个过程。当然分发梯度和分发参数这个过程是否是异步或者同步，也是可以大做文章的。

# A3C with TensorFlow

那么具体要怎么操作呢？这里以TensorFlow为框架实现了一个DPPO，即Distributed Proximal Policy Optimization，分布式近端策略优化模型。这个和A3C有什么关系呢？在上文提到，A3C并不是一个具体的算法，它的核心精神是一套异步训练模型、同步或者异步更新参数的思想。或者换句话说，不管是DQN、Policy Gradient、PPO、ACER，还是基于它们的一系列改进，我们都可以用A3C的思想去改进它们。好在TensorFlow已经为我们做了大部分的底层工作，我们只需要几十行代码，就可以把一个单进程的训练过程改进为分布式训练过程。

# Distributed TensorFlow

为了避免让文章沦为文档翻译，所以这里仅仅对分布式TensorFlow做非常简短的说明，详细的文档可以参考：
> [Distributed TensorFlow](https://www.tensorflow.org/deploy/distributed)     

首先，需要构造集群描述对象让集群待命，可以通过如下方法构造集群描述对象：
```
cluster = tf.train.ClusterSpec({
    'worker': [
        'localhost:8001',
        'localhost:8002',
        'localhost:8003',
    ],
    'ps': [
        'localhost:8000'
    ]
})
```
可以看出，集群描述对象是一个键为job_name（任务名），值为ip:port的字典，至于 job_name 的定义稍后会做解释。然后通过如下语句启动集群中的一个节点，并让节点待命：
```
server = tf.train.Server(cluster, job_name=role, task_index=task_index)
if role == 'ps':
    logging.warning('Parameter server started.')
    server.join()
else:
    pass
    # do some sth later.
```
至此，一个节点就被启动并待命了，可以看到一个节点会被抽象为一个server对象，其中job_name对应了节点的任务名，ps是Parameter Server，即参数服务器，worker即计算服务器，它们的用途会在下文提到。根据集群中的每个节点是否会完整地构建自己的计算图，TensorFlow提供了两种方案，分别是 In-graph replication 和 Between-graph replication，每个节点是否会构建自己的计算图，也决定了每个节点的工作方式。

### In-graph replication

在这种方案中，集群中的每一个节点不会完整地构建自己的计算图，每一个节点仅仅是单纯地利用自己的算力通过以下的语句执行任务：
```
with tf.device("/job:ps/task:0"):
    # Define vars.
    
with tf.device("/job:worker/task:0"):
    # Do computations.
```
通常，只需要提前启动集群，然后构造一个Session，然后根据节点分配计算图中的各个结点，然后进行训练就可以了，非常地直觉。这样做有一个缺点是数据会在各个结点分发，如果数据非常大，这样是得不偿失的。在下文实现的DPPO中，我们将不会采用这套方案。

### Between-graph replication

与In-graph replication不同的是，集群中的每一个节点会完整地构建自己的计算图，可以说这种方案就是为了A3C而设计的，在这种方案中，我们会有一个或者多个参数节点（Parameters Server），多个计算节点（Worker Server），每个计算节点完成梯度计算后，会异步地将梯度分发到参数节点，然后参数节点会同步或者异步地用梯度更新参数，然后分发最新的参数到一个或者多个计算节点。

### Parameters Server & Worker Server

ps，即参数节点，在Between-graph replication的方案中，它通常什么都不做，节点启动后即调用`join()`待命，worker，即计算节点，在Between-graph replication方案中，这些节点定义了完整的计算图并执行这些计算，在计算节点完成一次梯度计算后，梯度会被异步分发给参数节点，参数节点更新参数后，分发参数给计算节点。这个过程可以既可以是异步的也可以是同步的，在Between-graph replication方案中，默认是异步的。

# PPO

在前一篇文章中已经实现了一个PPO，学习笔记：
> [PPO Note](https://github.com/Ceruleanacg/Learning-Notes/blob/master/note/PPO.ipynb)    

源码：
> [PPO Code](https://github.com/Ceruleanacg/Learning-Notes/blob/master/playground/PPO.py)


# DPPO in Action

首先实现一个方法，它用来启动集群的各个节点，并根据节点类型待命或者定义并执行计算图：

In [1]:
# coding=utf-8

import sys
sys.path.append('..')

import multiprocessing as mp
import tensorflow as tf
import logging
import gym

from base.model import *
from playground import PPO
from utility.launcher import start_game


def start_a3c(cluster, role, task_index):
    # 根据集群描述对象启动节点
    server = tf.train.Server(cluster, job_name=role, task_index=task_index)
    if role == 'ps':
        # 如果是参数节点，则join待命
        logging.warning('Parameter server started.')
        server.join()
    else:
        # 如果是计算节点，定义计算图，计算梯度
        worker_device = "/job:worker/task:{}".format(task_index)
        logging.warning('Worker: {},  server stated.'.format(worker_device))
        # 根据集群描述对象分配节点
        with tf.device(tf.train.replica_device_setter(cluster=cluster)):
            # Make env.
            env = gym.make('CartPole-v0')
            env.seed(1)
            env = env.unwrapped
            # Init session.
            session = tf.Session(server.target)
            # session = tf.Session()
            # Init agent.
            agent = PPO.Agent(env.action_space.n, env.observation_space.shape[0], **{
                KEY_SESSION: session,
                KEY_MODEL_NAME: 'PPO',
                KEY_TRAIN_EPISODE: 1000
            })
            start_game(env, agent, task_index)

  return f(*args, **kwds)


然后定义集群描述对象：

In [2]:
cluster = tf.train.ClusterSpec({
        'worker': [
            'localhost:8001',
            'localhost:8002',
            'localhost:8003',
        ],
        'ps': [
            'localhost:8000'
        ]
    })

role_task_index_map = [
    ('ps', 0),
    ('worker', 0),
    ('worker', 1),
    ('worker', 2),
]

启动A3C并训练：

In [None]:
pool = mp.Pool(processes=4)

for role, task_index in role_task_index_map:
    pool.apply_async(start_a3c, args=(cluster, role, task_index, ))
pool.close()
pool.join()