# Continuous Deep Reinforcement Learning

In this notebook we provide a quick introduction to the "Deep Deterministic Gradient Policy" algorithm presented in the 2016 paper "Continuous Control with Deep Reinforcement Learning" by Lillicrap et al. We demonstrate the algorithm in a Slot Car Racing (also known by the brand-name Carrera) environment.

The following consists of multiple parts:
  
**TODO:** Fix introduction.

## Environment: Lunar Lander

> Landing pad is always at coordinates $(0, 0)$. Coordinates are the first two numbers in state vector. Reward for moving from the top of the screen to landing pad and zero speed is about $100..140$ points. If lander moves away from landing pad it loses reward back. Episode finishes if the lander crashes or comes to rest, receiving additional -100 or +100 points. Each leg ground contact is $+10$. Firing main engine is $-0.3$ points each frame. Solved is 200 points. Landing outside landing pad is possible. Fuel is infinite, so an agent can learn to fly and then land on its first attempt. Action is two real values vector from $-1$ to $+1$. First controls main engine, $-1..0$ off, $0..+1$ throttle from $50\%$ to $100\%$ power. Engine can't work with less than $50\%$ power. Second value $-1.0..-0.5$ fire left engine, $+0.5..+1.0$ fire right engine, $-0.5..0.5$ off.

Note: Rendering does not work when running on a remote machine or similar. We actually had [rendering directly into notebooks](https://github.com/ahoereth/ddpg/blob/f2fcad4/Lander.ipynb) implemented, but were not able to get it to work when using docker for training using a GPU.

In [None]:
import gym
env = gym.make('LunarLanderContinuous-v2')
state = env.reset()
terminal = False
while not terminal:
    action = env.action_space.sample()
    _, _, terminal, _ = env.step(action)
    env.render()

## Memory
While the paper takes note of prioritized replay methods, it only makes use of sampling experiences uniformly from a limited sized buffer. The straight forward approach for implementing this would be to use `collections.deque`, but sampling from such a queue (as the name maybe already shows...) is [expensive](https://wiki.python.org/moin/TimeComplexity). Therefore we implement a custom memory class which makes use of a basic list and implements the element limit through a pointer which dictates which element is to be overwritten on insert.

In [None]:
import random


class Memory:
    """Uniform replay memory with maximum size."""

    def __init__(self, max_size):
        self.max_size = max_size
        self._buffer = []
        self._pointer = 0

    def __len__(self):
        return len(self._buffer)

    def add(self, experience):
        if len(self) < self.max_size:
            self._buffer.append(experience)
        else:
            self._buffer[self._pointer] = experience
            self._pointer = (self._pointer + 1) % self.max_size

    def sample(self, n):
        return random.sample(self._buffer, n)

## Agent

Rather abstract implementation of a reinforcement learning agent. The actual RL model is plug and play, as long as it is implemented consistently.

In [None]:
from itertools import count
from typing import Tuple
from threading import Thread
import numpy as np


class Agent:
    """A reinforcement learning agent."""
    theta = 0.15
    sigma = 0.2
    batchsize = 64
    eps = 1
    eps_min = .1
    feed_threads = 1

    def __init__(self, env, model, render=False, memory_size=100000):
        """Create a new reinforcement learning agent."""
        self.memory = Memory(memory_size)
        self.env = env
        self.model = model
        self.render = render
        self.feeding = False
        self.feeders = [Thread(target=self.feed, daemon=True)
                        for _ in range(self.feed_threads)]

    def feed(self):
        """Feed samples from memory to the model for training.

        The model's feed() function blocks when it does not need more data,
        so we can aggressivley loop here.
        """
        while self.feeding:
            self.model.feed(self.memory.sample(self.batchsize))

    def train(self, episodes: int):
        """Play many episodes with many steps.

        Stores observations in memory, starts the feed threads and calls
        the model's training function when memory is big enough.
        """
        eps_rate = 1 / (episodes / 2)
        stats = []
        total_steps = 0
        for episode in range(1, episodes + 1):
            state = self.env.reset()
            episode_reward = 0
            for step in count():
                # Perform action, store new experience, train model.
                action = self._get_action(state)
                state_, reward, terminal, _ = self.env.step(action)
                episode_reward += reward
                self.memory.add((state, action, reward, state_, terminal))
                state = state_  # Next state becomes current state.
                if self.render:
                    self.env.render()
                if len(self.memory) >= self.batchsize:
                    if not self.feeding:
                        self.feeding = True
                        for feeder in self.feeders:
                            feeder.start()
                    self.model.train()
                if terminal:  # Start new episode if in terminal state.
                    stats.append((episode_reward, step))
                    break
            total_steps += step
            if self.eps > self.eps_min:
                self.eps -= eps_rate
            if episode % 50 == 0 or episode == episodes:
                self.model.save(episode)
            if episode % 10 == 0:
                stats = np.asarray(stats)
                rargmax, _ = stats.argmax(0)
                print('Episode {}, max reward/steps {:.2f}/{:.2f}, '
                      'average reward/steps {:.2f}/{:.2f}'
                      .format(episode, *stats[rargmax], *stats.mean(0)))
                print('total_steps', total_steps)
                stats = []

    def _get_action(self, state) -> Tuple[float]:
        """Get a action vector from the model."""
        return self.model.get_action(state, self.eps)

## DDPG Model

While we again will create a class for the Deep Deterministic Gradient Policy model, we will implement some of the parts as functions outside of the class in order to better walk through them in this notebook. When implementing this as a script one would want to integrate them all into the class.

In [None]:
import tensorflow as tf
from tensorflow.contrib.framework import get_variables

### Actor and Critic Networks
Our model consists of a total of two different networks -- an actor and a critic network. The problem with that approach is that during training we not only optimize those networks, we also use them to dirigate our agent. Manipulating the online policy leads to a feedback loop which leads to instability. While we already use a big memory buffer to mitigate this problem, the authors propose to additionally use two sets of parameters for each network.

In the implementation this leads to theoretically four networks, two actor and two critic networks, an online and a target version for each. While the online networks will be used for online predictions and will be updated at every timestep, the target networks will be used for determining the directions in which the online networks should be updated. From time to time the target networks will be updated using the online networks weights -- more on that below.

#### Abstract Networks
In order to easily model all the networks which we need to create, we use the `namedtuple` collection. Similar to class instances named tuples allow dot-access to their members. Each network has some output `y` and some variables `var`. The advantage of such an approach is that members of the graph are more consistently accessible and additional features (for example `operations` for updating batch normalization running averages) can be added later on.

In [None]:
from collections import namedtuple
Network = namedtuple('Network', ['y', 'vars'])

#### Dense Layers
Lillicrap et al 2016 describes very specific dense layers:

- Weights and biases are initialized from a bounded uniform distribution where the bounds depend on the previous layer's size (or are fixed for output layers).
- All critic layers use l2 weight decay.

In order to not repeat all the initialization code we predefine a wrapper function here.

In [None]:
def dense(x, units, activation=tf.identity, decay=None, minmax=None):
    """Build a dense layer with uniform initialization and optional loss."""
    if minmax is None:
        minmax = float(x.shape[1].value) ** -.5
    return tf.layers.dense(
        x,
        units,
        activation=activation,
        kernel_initializer=tf.random_uniform_initializer(-minmax, minmax),
        bias_initializer=tf.random_uniform_initializer(-minmax, minmax),
        kernel_regularizer=decay and tf.contrib.layers.l2_regularizer(1e-3)
    )

#### Critic
The critic is the value function, you can also think about this as the Bellman equation approximator. It returns Q values which describe the expected reward for an action. Those values would normally be determined through dynamic programming. The critic maps a state/action pair to a single scalar value. This stands in contrast to Deep Q Networks (Mnih et al 2015), where the Q network maps the environment's state to a vector of Q values, one for each action. This is because in our case the Q network is not used to determine which action to take (DQN uses a deterministic `argmax` policy on the Q value vector), but only to *criticize* whatever action the actor network decides on taking.

We strictly stick to the network structure described in the paper:

  - Two hidden layers with ReLu activation and 400 and 300 neurons respectivley.
  - <s>Batch normalization applied to the input and first hidden layer.</s><br>*[Removed](https://github.com/ahoereth/ddpg/blob/https://github.com/ahoereth/ddpg/blob/ea02a94/Lander.ipynb) batch normalization  because it let to massive instability.*
  - Actions enter the network after the first hidden layer.
 
As common in Deep Q-Networks the single output neuron uses a linear activation. The `critic` function below is designed such that we can reuse it for both the online and target network.

In [None]:
def critic(x: tf.Tensor, actions: tf.Tensor, name='online', reuse=False):
    """Build a critic network q, the value function approximator."""
    with tf.variable_scope(name, reuse=reuse) as scope:
        net = dense(x, 400, tf.nn.relu, decay=True)
        net = tf.concat([net, actions], axis=1)
        net = dense(net, 300, tf.nn.relu, decay=True)
        y = dense(net, 1, decay=True, minmax=3e-4)
        return Network(tf.squeeze(y), get_variables(scope))

The critic is simply optimized by minimizing the mean squared error between the Q target values which are computed using the Bellman approximation on the experiences we make in the environment (we will do so below) and the actual output from the network. In the following cell we also bind the operations of the online critic (to update the running averages) to the minimize operation -- this way they will be called everytime we call the optimizer.

In [None]:
def train_critic(critic: Network, critic_: tf.Tensor, terminals: tf.Tensor, 
                 rewards: tf.Tensor, gamma: float):
    """Build critic network optimizer minimizing MSE."""
    with tf.variable_scope('critic'):
        targets = tf.where(terminals, rewards, rewards + gamma * critic_.y)
        mse = tf.reduce_mean(tf.squared_difference(targets, critic.y))
        tf.summary.scalar('loss', mse)
        optimizer = tf.train.AdamOptimizer(1e-3)
        return optimizer.minimize(mse, tf.train.get_global_step())

#### Actor
Actor networks describe the policy the agent should follow in any given state. Given some input the policy deterministically provides an action. Actions here are vectors of real values -- in a racing environment such a vector would for example contain a steering angle and a acceleration value.

The actor networks have a simillar structure as the critic, but do not receive the actions as input at any point. Further more there is no weight decay applied and the output uses a hyperbolic tangent activation function to bound the actions between $-1$ and $1$ -- which needs to be tweaked for different environments.

In [None]:
def actor(x: tf.Tensor, dout: int, max_out: float, name='online'):
    """Build an actor network mu, the policy function approximator."""
    with tf.variable_scope(name) as scope:
        net = dense(x, 400, tf.nn.relu)
        net = dense(net, 300, tf.nn.relu)
        y = dense(net, dout, tf.nn.tanh, minmax=3e-4)
        scaled = y * max_out
        return Network(scaled, get_variables(scope))

##### Policy Gradients
**TODO**: Explain actor optimization. Discuss (action/policy) gradient computation/application thoroughly.

Our goal is to tweak the critic network's parameters such that it outputs action values, which, as input to the critic network, result in good results (aka high Q values aka profit). 

The actor is trained by ascending the gradients of the critic with respect to the actor's actions.

To do so, we need to perform gradient **ascent** on the critic's gradient with respect to the actor's action output (which is an input to the critic, a necessary condition for computing such gradient). That gradient, lets call it *value gradient*, depicts the direction in which to adjust the action (originally the output of the actor network) to minimize the Q values -- we ascent it, because we want to maximize the Q values.
$$value\_gradient = \Delta_a Q(s,a|\theta^Q)|_{s=s_i, a=\mu(s_i)}$$

The next step is to take those gradients to the **actor** network, although they originally came from the **critic** network. Normally TensorFlow would worry about optimizing the network for us by minimizing some scalar loss we provide. The magic in this is, that one does not actually pass a scalar to the optimizer's `minimize` function, but a Tensor node, which depends on a complete graph of computations -- those are just commonly abstracted away. When aiming to minimize such a loss, TensorFlow visits all the nodes on which that final loss node depends and internally computes the gradients to modulate them a little bit such that the loss decays.

Our problem is: We do not have such a loss node, but a complete set of gradients from the critic network. We now need to manually modulate the actor network's gradients, such that they move a little bit into the uphill direction of the critic network's gradients.

$$policy\_gradient = value\_gradient \Delta_{\theta, \mu} \mu(s|\theta^\mu)|_{s_i}$$


$$\Delta_{\theta, \mu}J \approx \frac{1}{N} \sum_i \Delta_a Q(s,a|\theta^Q)|_{s=s_i, a=\mu(s_i)} \Delta_{\theta^\mu} \mu(s|\theta^\mu)|_{s_i}$$

In [None]:
def train_actor(actor: Network, critique: tf.Tensor):
    """Build actor network optimizier performing action gradient ascent."""
    with tf.variable_scope('actor'):
        optimizer = tf.train.AdamOptimizer(1e-4)

        # What is `actor.y`'s influence on the critic network's output?
        value_gradient, = tf.gradients(critique, actor.y)

        # Use `value_gradient` as initial value for the `actor.y` gradients --
        # normally this is set to 1s by TF.
        policy_gradients = tf.gradients(actor.y, actor.vars, -value_gradient)
        gradient_pairs = zip(policy_gradients, actor.vars)
        return optimizer.apply_gradients(gradient_pairs)

#### Target Network Updates
While the online networks are trained directly (thus the *OptimizableNetwork* name), the target networks are only updated irregularily using the online network's parameters. For this paper describes a process named *soft updates*, which only slowly moves the target network's parameters into the direction of the online network. The original Deep Q- and also the Double Deep Q-Network approach instead just directly copies the parameters over.

##### Initial Hard Update
In order to ensure the online and target networks initial equallity, we first implement the hard parameter copying. This function will only be used after initial variable initialization to make sure the online and target network start off from the same foundation.

In [None]:
def hard_updates(src: Network, dst: Network):
    """Overwrite target with online network parameters."""
    with tf.variable_scope('hardupdates'):
        return [target.assign(online)
                for online, target in zip(src.vars, dst.vars)]

##### Soft Update
The soft update also consists of the same assign operation as above, but not directly overwrites the target network's parameters but mashes the online and target parameters together. `tau` herein describes how strongly the new values influence the old values.

NOTE: *This could also be implemented using moving averages over the online networks. Might be more efficient?*

In [None]:
def soft_updates(src: Network, dst: Network, tau=1e-3):
    """Soft update the dst net's parameters using those of the src net."""
    with tf.variable_scope('softupdates'):
        return [target.assign(tau * online + (1 - tau) * target)
                for online, target in zip(src.vars, dst.vars)]

### Noise

**TODO:** Explain Ornstein-Uhlenbeck process noise and RL exploration strategies.

Quote from Lillicrap et al:

> For the exploration noise process we used temporally correlated noise in order to explore well in physical environments that have momentum. We used an Ornstein-Uhlenbeck process (Uhlenbeck & Ornstein, 1930) with θ = 0.15 and σ = 0.2. The Ornstein-Uhlenbeck process models the velocity of a Brownian particle with friction, which results in temporally correlated values centered around 0.

In [None]:
def noise(n, theta=.15, sigma=.2):
    with tf.variable_scope('OUNoise'):
        state = tf.Variable(tf.zeros((n,)))
        noise = -theta * state + sigma * tf.random_normal((n,))
        reset = state.assign(tf.zeros((n,)))
        return state.assign_add(noise), reset

### Bringing it all together

  - Initializes networks and session.
  - Resets TensorFlow graph because notebooks.
  - Copies the initial parameters to the target networks.
  - Provides `train` function which counts SGD steps.
  - Target networks are updated every n SGD steps.
  - Provides `get_action`.
  
**TODO:** The fancy input pipeline using a queue is great, but can the code for it be simplified?

In [None]:
from itertools import chain
import tensorflow as tf


class DDPG:
    """Deep Deterministic Policy Gradient RL Model."""
    gamma = 0.99  # Discount factor
    theta = 0.15  # Ornstein-Uhlenbeck process theta
    mu = 0.2  # Ornstein-Uhlenbeck process mu

    def __init__(self, din=2, dout=1, action_bounds=1, checkpoint=None):
        """Create a new DDPG model."""
        self.bounds = action_bounds

        # Reset graph and recreate it.
        tf.reset_default_graph()
        tf.train.create_global_step()
        self.global_step = tf.train.get_global_step()
        self.session = tf.Session()

        # The queue is fed samples from the replay memory in an independent
        # thread. Massivley speeds up training because the data is already
        # available in the tensorflow graph when the training ops are called.
        self.queue = tf.FIFOQueue(
            capacity=64 * 3,
            dtypes=[tf.float32, tf.float32, tf.float32, tf.float32, tf.bool],
            shapes=[[din], [din], [dout], [], []]
        )

        # By default we take samples from the queue, but it is also possible
        # to directly feed them using a `feed_dict`. The latter is for example
        # required when activley using the policy to move in the environment.
        x, x_, a, r, t = self.queue.dequeue_many(64)
        self.states = tf.placeholder_with_default(x, (None, din), 'states')
        self.states_ = tf.placeholder_with_default(x_, (None, din), 'states_')
        self.actions = tf.placeholder_with_default(a, (None, dout), 'actions')
        self.rewards = tf.placeholder_with_default(r, (None,), 'rewards')
        self.terminals = tf.placeholder_with_default(t, (None,), 'terminals')

        # This operator will be called in its own thread using the normal
        # feed_dict approach to fill the queue with training samples.
        self.enqueue_op = self.queue.enqueue_many([
            self.states, self.states_, self.actions,
            self.rewards, self.terminals,
        ])

        # Create the online and target actor networks and the noise provider.
        with tf.variable_scope('actor'):
            self.actor = actor(self.states, dout, self.bounds[1])
            self.actor_ = actor(self.states_, dout, self.bounds[1], 'target')
            self.noise, self.noise_reset = noise(dout, self.theta, self.mu)

        # Create the online and target critic networks. This has a small
        # speciallity: The online critic is created twice, once using the
        # fed states and fed actions as input and once using the fed states
        # and online actor's output as input. The later is required to compute
        # the (so called above) `value_gradient`, which directly depends on
        # the on-policy instead of the off-policy actions. The important part
        # here is that those two critics actually are the same network, just
        # with different inputs, but shared (!) parameters.
        with tf.variable_scope('critic'):
            self.critic = critic(self.states, self.actions)
            critique, _ = critic(self.states, self.actor.y, reuse=True)
            self.critic_ = critic(self.states_, self.actor_.y, 'target')

        with tf.variable_scope('training'):
            self.critic_op = train_critic(self.critic, self.critic_,
                                          self.terminals, self.rewards,
                                          self.gamma)
            self.actor_op = train_actor(self.actor, critique)
            self.targets_op = (soft_updates(self.critic, self.critic_) +
                               soft_updates(self.actor, self.actor_))

        self.summaries = tf.summary.merge_all()
        self.writer = tf.summary.FileWriter('logs/lander', self.session.graph)
        self.saver = tf.train.Saver(max_to_keep=1)
        if checkpoint:
            self.saver.restore(self.session, checkpoint)
        else:
            self.session.run(tf.global_variables_initializer())
            self.session.run(hard_updates(self.critic, self.critic_) +
                             hard_updates(self.actor, self.actor_))

    def feed(self, batch):
        """Feed the training queue with data."""
        states, actions, rewards, states_, terminals = zip(*batch)
        self.session.run(self.enqueue_op, {
            self.states: states, self.actions: actions, self.rewards: rewards,
            self.states_: states_, self.terminals: terminals,
        })

    def reset_noise(self):
        """Reset the OU process exploration noise.
        
        Note: Currently unused, unsure if it a fresh OU process would provide
        any advantage.
        """
        self.session.run(self.noise_reset)

    def save(self, step):
        """Save current graph paramter state."""
        self.saver.save(self.session, 'logs/lander', global_step=step)

    def train(self):
        """Train the online and update the target networks. Save summaries."""
        summary, _, _, _, i = self.session.run([self.summaries, self.critic_op,
                                                self.actor_op, self.targets_op,
                                                self.global_step])
        self.writer.add_summary(summary, i)

    def get_action(self, state, exploration=0):
        """Map a state to an action according to the current policy.
        
        TODO: Consider moving noise application and action bounding to TF?
        """
        actions, noise = self.session.run([self.actor.y, self.noise],
                                          {self.states: [state]})
        action = actions[0] + exploration * noise
        action = np.min([action, self.bounds[1]], axis=0)
        action = np.max([action, self.bounds[0]], axis=0)
        return action

## Training

In [None]:
env = gym.make('LunarLanderContinuous-v2')
# env = gym.make('Pendulum-v0')
model = DDPG(din=env.observation_space.shape[0], 
             dout=env.action_space.shape[0], 
             action_bounds=[env.action_space.low, env.action_space.high])
agent = Agent(env, model, render=False)
agent.train(1000)

## Replay from saved checkpoint

In [None]:
CHECKPOINT = 'logs/lander-950'

# New environment with video recording.
env = gym.make('LunarLanderContinuous-v2')
env = gym.wrappers.Monitor(env, 'logs/lander')

# Model and agent from saved state
model = DDPG(din=env.observation_space.shape[0], 
             dout=env.action_space.shape[0], 
             action_bounds=[env.action_space.low, env.action_space.high],
             checkpoint=CHECKPOINT)
agent = Agent(env, model, render=False)

# Play using learned policy!
terminal = False
reward = 0
state = env.reset()
for steps in  count():
    action = model.get_action(state, 0)
    state, r, terminal, _ = env.step(action)
    reward += r
    env.render()
    if terminal:
        break

env.close()  # Saves video.

print('Epsiode ended after {} steps with a reward of {}'.format(steps, reward))