# Dynamic Programming Based Reinforcement Learning Methods

## Reinforcement Learning

In reinforcement learning, the agent aims to maximize its cumulative discounted reward:

$$
\max_{\pi} \sum_{t=1}^T \mathbb{E}_{a_t \sim \pi(s_t),\, s_{t+1} \sim p(s_{t+1} \mid s_t, a_t),\, s_t \sim p(s)}[ \gamma^{t-1} r(s_t,a_t)].
$$
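
As a small illustration of the objective above, the discounted return of a single trajectory can be computed directly. This is only a sketch: the function name `discounted_return` and the sample reward list are made up for illustration and are not part of the notebook.

```python
def discounted_return(rewards, gamma):
    """Sum of gamma**(t-1) * r_t for t = 1..T along one trajectory."""
    total = 0.0
    for t, r in enumerate(rewards, start=1):
        total += gamma ** (t - 1) * r
    return total


# Hypothetical trajectory: the only reward (1.0) arrives on the third step,
# so it is discounted twice.
sample_return = discounted_return([0.0, 0.0, 1.0], gamma=0.9)
print(sample_return)
```

The objective in the equation is the expectation of this quantity over trajectories generated by the policy and the transition model.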

From the perspective of the Bellman equation, the calculation of the cumulative reward can also be formulated recursively:

$$
V(s_t) = \mathbb{E}_{a_t \sim \pi(s_t),\, s_{t+1} \sim p(s_{t+1} \mid s_t, a_t)}[r(s_t,a_t) + \gamma V(s_{t+1})].
$$
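
One way to see where this recursion comes from is to split the first reward off the discounted sum. The derivation below is a sketch under two assumptions that are not stated in the text: an infinite horizon (or an absorbing terminal state with zero reward), and the notation $V^{\pi}$ for the value of following $\pi$, where the superscript is added here only for clarity.

$$
\begin{aligned}
V^{\pi}(s_t) &= \mathbb{E}\Big[\sum_{k=0}^{\infty} \gamma^{k} r(s_{t+k}, a_{t+k}) \,\Big|\, s_t\Big] \\
&= \mathbb{E}\Big[r(s_t, a_t) + \gamma \sum_{k=0}^{\infty} \gamma^{k} r(s_{t+1+k}, a_{t+1+k}) \,\Big|\, s_t\Big] \\
&= \mathbb{E}_{a_t \sim \pi(s_t),\, s_{t+1} \sim p(s_{t+1} \mid s_t, a_t)}\big[r(s_t, a_t) + \gamma V^{\pi}(s_{t+1})\big].
\end{aligned}
$$

The last step uses the fact that the inner sum, conditioned on $s_{t+1}$, is exactly the definition of $V^{\pi}(s_{t+1})$.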

## Policy Iteration Learning

Once a policy, $\pi$, has been improved using $v_{\pi}$ to yield a better policy, $\pi'$, we can then compute $v_{\pi'}$ and improve it again to yield an even better $\pi''$. We can thus obtain a sequence of monotonically improving policies and value functions:

$$
\pi_0 \stackrel{\text{E}}{\longrightarrow} v_{\pi_0} \stackrel{\text{I}}{\longrightarrow} \pi_1 \stackrel{\text{E}}{\longrightarrow} \dots \stackrel{\text{I}}{\longrightarrow}\pi_{\star}\stackrel{\text{E}}{\longrightarrow}v_{\star},
$$

where $\stackrel{\text{E}}{\longrightarrow}$ denotes a policy *evaluation* step and $\stackrel{\text{I}}{\longrightarrow}$ denotes a policy *improvement* step. This way of finding an optimal policy is called *policy iteration*.

### Pseudocode of Policy Iteration Learning

1. **Initialization**

    $V(s) \in \mathbb{R}$ and $\pi(s) \in \mathcal{A}(s)$ arbitrarily for all $s \in \mathcal{S}$

2. **Policy Evaluation**

    Loop:

    > $\Delta \leftarrow 0$  
    > Loop for each $s \in \mathcal{S}$:  
    >> $v \leftarrow V(s)$  
    >> $V(s) \leftarrow \sum_{s',r}p(s', r | s, \pi(s))[r + \gamma V(s')]$  
    >> $\Delta \leftarrow \max(\Delta, |v-V(s)|)$
    
    until $\Delta < \theta$ (a small positive number determining the accuracy of estimation)

3. **Policy Improvement**  

    policy-stable $\leftarrow$ true  
    For each $s \in \mathcal{S}$:
    
    > *old-action* $\leftarrow \pi(s)$  
    > $\pi(s) \leftarrow \arg \max_a\sum_{s', r}p(s', r | s, a)[r + \gamma V(s')]$  
    > If *old-action* $\ne \pi(s)$, then *policy-stable* $\leftarrow$ *false*  
    
    If *policy-stable*, then stop and return $V \approx v_{\star}$ and $\pi \approx \pi_{\star}$; else go to 2
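
The three steps above can be sketched on an explicit tabular model. This is a minimal illustration, not the notebook's implementation: the arrays `P[s, a, s2]` (transition probabilities) and `R[s, a]` (expected immediate reward), the function name `policy_iteration`, and the 3-state chain are all assumptions introduced here.

```python
import numpy as np


def policy_iteration(P, R, gamma=0.9, theta=1e-8):
    """Tabular policy iteration on an assumed model.

    P[s, a, s2]: probability of moving from s to s2 under action a.
    R[s, a]: expected immediate reward for taking a in s.
    """
    n_states = P.shape[0]
    V = np.zeros(n_states)
    pi = np.zeros(n_states, dtype=int)  # arbitrary initial deterministic policy

    while True:
        # Step 2, policy evaluation: in-place sweeps until the largest
        # change in a sweep drops below theta.
        while True:
            delta = 0.0
            for s in range(n_states):
                v = V[s]
                V[s] = R[s, pi[s]] + gamma * (P[s, pi[s]] @ V)
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                break

        # Step 3, policy improvement: act greedily with respect to V.
        q = R + gamma * (P @ V)  # shape (n_states, n_actions)
        new_pi = q.argmax(axis=1)
        if np.array_equal(new_pi, pi):  # policy-stable
            return V, pi
        pi = new_pi


# Hypothetical 3-state chain: action 0 moves left, action 1 moves right.
# State 2 is absorbing; entering it from state 1 yields reward 1.
P = np.zeros((3, 2, 3))
P[0, 0, 0] = P[0, 1, 1] = 1.0
P[1, 0, 0] = P[1, 1, 2] = 1.0
P[2, :, 2] = 1.0
R = np.zeros((3, 2))
R[1, 1] = 1.0

V_opt, pi_opt = policy_iteration(P, R)
print(V_opt, pi_opt)
```

Using the expected reward `R[s, a]` instead of summing over $(s', r)$ pairs is a simplification; the two forms agree whenever `R[s, a]` equals $\sum_{s', r} p(s', r \mid s, a)\, r$.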

**✨ Policy Evaluation Implementation**

In [5]:
import numpy as np

from copy import copy


def policy_eval(env, values, policies, upper_bound):
    print('\n===== Policy Evalution =====')
    delta = upper_bound
    iteration = 0

    while delta >= upper_bound:
        delta = 0.

        for s in env.states:
            v = values.get(s)
            env.set_state(s)

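            # Sample one action from the current policy (a single draw, even
            # while the policy is still stochastic).
            action_index = policies.sample(s)
            action = env.actions[action_index]
            _, _, rewards, next_states = env.step(action)

            # One-step backup r + gamma * V(s') for every returned successor.
            next_values = values.get(list(next_states))
            td_values = [r + env.gamma * v_next for r, v_next in zip(rewards, next_values)]

            # Successors are treated as equally likely, so the expectation is a plain mean.
            exp_value = np.mean(td_values)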
            values.update(s, exp_value)

            # update delta
            delta = max(delta, abs(v - exp_value))
            
        iteration += 1
        print('\r> iteration: {} delta: {}'.format(iteration, delta), flush=True, end="")
        


**✨ Policy Improvement Implementation**

In [6]:
def policy_improve(env, values, policies):
    print('\n===== Policy Improve =====')
    policy_stable = True
    
    for state in env.states:
        old_act = policies.sample(state)

        # calculate new policy execution
        actions = env.actions
        value = [0] * len(env.actions)
        
        for i, action in enumerate(actions):
            env.set_state(state)
            _, _, rewards, next_states = env.step(action)
            next_values = values.get(list(next_states))
            td_values = list(map(lambda x, y: x + env.gamma * y, rewards, next_values))
            prob = [1 / len(next_states)] * len(next_states)

            value[i] = sum(map(lambda x, y: x * y, prob, td_values))

        # action selection
        new_act = actions[np.argmax(value)]

        # greedy update policy
        new_policy = [0.] * env.action_space
        new_policy[new_act] = 1.
        policies.update(state, new_policy)

        if old_act != new_act:
            policy_stable = False

    return policy_stable

## Value Iteration

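One drawback of policy iteration is that each of its iterations involves policy evaluation, which may itself be a protracted iterative computation requiring multiple sweeps through the state set. Must we wait for exact convergence, or can we stop short of that? Value iteration takes the latter route: it truncates policy evaluation to a single sweep and folds the improvement step into the same update by taking a maximum over actions.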

### Pseudocode of Value Iteration Learning

Algorithm parameter: a small threshold $\theta > 0$ determining accuracy of estimation

Initialize $V(s)$, for all $s \in \mathcal{S}^+$, arbitrarily except that $V(\textit{terminal})=0$

Loop:

>  $\Delta \leftarrow 0$  
>  Loop for each $s \in \mathcal{S}$:  
>> $v \leftarrow V(s)$  
>> $V(s) \leftarrow \max_{a}\sum_{s',r}p(s', r | s, a)\Big[ r + \gamma V(s') \Big]$  
>> $\Delta \leftarrow \max(\Delta, |v-V(s)|)$  

> until $\Delta < \theta$  

Output a deterministic policy, $\pi \approx \pi_{\star}$, such that $\pi(s) = \arg \max_{a}\sum_{s',r}p(s', r | s, a)\Big[ r + \gamma V(s') \Big]$
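
The update above weights each successor by its transition probability, which a small stochastic model makes visible. The following is a sketch under assumptions: the arrays `P[s, a, s2]` and `R[s, a]`, the function name `value_iteration`, and the "slippery" 3-state chain are hypothetical and not taken from the notebook.

```python
import numpy as np


def value_iteration(P, R, gamma=0.9, theta=1e-8):
    """Tabular value iteration on an assumed model (P[s, a, s2], R[s, a])."""
    n_states = P.shape[0]
    V = np.zeros(n_states)

    while True:
        delta = 0.0
        for s in range(n_states):
            v = V[s]
            # Max over actions of the probability-weighted one-step backup.
            V[s] = np.max(R[s] + gamma * (P[s] @ V))
            delta = max(delta, abs(v - V[s]))
        if delta < theta:
            break

    # Read off a deterministic greedy policy from the converged values.
    pi = (R + gamma * (P @ V)).argmax(axis=1)
    return V, pi


# Hypothetical chain: action 0 moves left deterministically; action 1 moves
# right with probability 0.8 and stays put with probability 0.2.
# State 2 is absorbing; entering it yields reward 1.
P = np.zeros((3, 2, 3))
P[0, 0, 0] = 1.0
P[1, 0, 0] = 1.0
P[0, 1, 1], P[0, 1, 0] = 0.8, 0.2
P[1, 1, 2], P[1, 1, 1] = 0.8, 0.2
P[2, :, 2] = 1.0
R = np.zeros((3, 2))
R[1, 1] = 0.8  # expected reward: 0.8 chance of reaching the goal

V_star, pi_star = value_iteration(P, R)
print(V_star, pi_star)
```

For this model the fixed point can be checked by hand: $V(1) = 0.8 + 0.9 \cdot 0.2\, V(1)$ gives $V(1) = 0.8 / 0.82$, and $V(0) = 0.9\,(0.8\, V(1) + 0.2\, V(0))$ gives $V(0) = 0.72\, V(1) / 0.82$.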

**✨ Value Iteration Implementation**

In [7]:
def value_iter(env, values, upper_bound):
    print('===== Value Iteration =====')
    delta = upper_bound + 1.
    states = copy(env.states)
    
    iteration = 0

    while delta >= upper_bound:
        delta = 0

        for s in states:
            v = values.get(s)

            # get new value
            actions = env.actions
            vs = [0] * len(actions)

            for i, action in enumerate(actions):
                env.set_state(s)
                _, _, rewards, next_states = env.step(action)
                td_values = list(map(lambda x, y: x + env.gamma * y, rewards, values.get(next_states)))

                vs[i] = np.mean(td_values)

            values.update(s, max(vs))
            delta = max(delta, abs(v - values.get(s)))
        
        iteration += 1
        print('\r> iteration: {} delta: {}'.format(iteration, delta), end="", flush=True)
        
    return

## A Simple Environment

### MatrixEnv

A simple maze game: the agent needs to learn to walk from the start point to the destination (goal).

![](https://storage.googleapis.com/kagglesdsdata/datasets/374051/727215/matrix_game.png?GoogleAccessId=web-data@kaggle-161607.iam.gserviceaccount.com&Expires=1589524158&Signature=cpaCiVm8OS6yJx2KDrOWULEJ%2BBVCq9LqlSDu4nOFetTkvlY0jitrx6YPsAeDme60O0JcHELm2ASuOh1r%2BZeu%2BYIHUb8gAFhtR5AeGfFiImgHT4WuxEMJu6ddQ1Iggs0mbyXKAFzYldsAFPrrqfOvcsHZB%2BjXS0uodDhkYrQradvKYIShhI2w0SsCgEN%2BoFSuCFypdXHSejnj2h51vCL8Mcu3ELE%2BWQyrmpClh1WT%2BJTclZKVMMka0elXIju7f9IEvVJRv0dLdMzLfN4EcPMcFVqSl9okMQXCMVqDGUe32uClpRCArQux4XHtFv6idx1VUQKrvBVLp0g%2BUGlJS6ueaA%3D%3D)

```python
env = MatrixEnv()

# ...

# before starting a new episode, be sure to call `env.reset()`
env.reset()

# act_index: obtained by sampling from the policy
# retrieve the action from the action space with `act_index`
act = env.actions[act_index]

# `step` accepts an action, updates the inner state, and returns
# (None, done, rewards: list, next_states: list)
_, done, rewards, next_states = env.step(act)

# ...
```

In [8]:
class Env:
    def __init__(self):
        self._states = set()
        self._state = None
        self._state_shape = None  # set by subclasses; read by `state_space`
        self._actions = []
        self._gamma = None
        
    @property
    def states(self):
        return self._states
    
    @property
    def state_space(self):
        return self._state_shape
    
    @property
    def actions(self):
        return self._actions
    
    @property
    def action_space(self):
        return len(self._actions)
    
    @property
    def gamma(self):
        return self._gamma
    
    def _world_init(self):
        raise NotImplementedError
        
    def reset(self):
        raise NotImplementedError
    
    def step(self, action):
        """Apply `action` to the current state; subclasses return (None, done, rewards, next_states)."""
        raise NotImplementedError
        
    def set_state(self, state):
        self._state = state


class MatrixEnv(Env):
    def __init__(self, height=4, width=4):
        super().__init__()
        
        self._actions = list(range(4))  # 0: up, 1: right, 2: down, 3: left
        
        # States are (row, col) cells of a height x width grid.
        self._state_shape = (height, width)
        self._states = [(i, j) for i in range(height) for j in range(width)]
        
        self._gamma = 0.9
        self._height = height
        self._width = width

        self._world_init()
        
    @property
    def state(self):
        return self._state
    
    @property
    def gamma(self):
        return self._gamma
    
    def set_gamma(self, value):
        self._gamma = value
        
    def reset(self):
        self._state = self._start_point
        
    def _world_init(self):
        # start_point
        self._start_point = (0, 0)
        self._end_point = (self._height - 1, self._width - 1)
        
    def _state_switch(self, act):
        # 0: h - 1, 1: w + 1, 2: h + 1, 3: w - 1
        if act == 0:  # up
            self._state = (max(0, self._state[0] - 1), self._state[1])
        elif act == 1:  # right
            self._state = (self._state[0], min(self._width - 1, self._state[1] + 1))
        elif act == 2:  # down
            self._state = (min(self._height - 1, self._state[0] + 1), self._state[1])
        elif act == 3:  # left
            self._state = (self._state[0], max(0, self._state[1] - 1))

    def step(self, act):
        assert 0 <= act <= 3
        
        done = False
        reward = 0.

        self._state_switch(act)
        
        if self._state == self._end_point:
            reward = 1.
            done = True

        return None, done, [reward], [self._state]

## Basic Data Structure for Learning

### ValueTable

Class `ValueTable` maintains the state-value function, which maps the state space $\mathcal{S} \subset \mathbb{R}^2$ to the real numbers $\mathbb{R}$.

**Methods**:

- `update(state, value)`: update state value with given value
- `get(state)`: return state value with given state or states


In [9]:
class ValueTable:
    def __init__(self, env):
        self._values = np.zeros(env.state_space)
        
    def update(self, s, value):
        self._values[s] = value
        
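    def get(self, state):
        if isinstance(state, list):
            # A list of states: look each one up individually.
            return [self._values[s] for s in state]
        if isinstance(state, tuple):
            # A single (row, col) state: index the array directly.
            return self._values[state]
        raise TypeError('state must be a tuple or a list of tuples')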
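
The reason `get` distinguishes tuples from lists is that NumPy treats the two differently when indexing. The short sketch below illustrates this; the array `table` and the sample states are stand-ins chosen here, not values from the notebook.

```python
import numpy as np

# Stand-in for the value table of a 4x4 grid.
table = np.arange(16).reshape(4, 4)

# A tuple indexes one element: row 1, column 2.
single = table[(1, 2)]

# A list triggers fancy indexing and selects whole rows 1 and 2.
rows = table[[1, 2]]

# Looking up several (row, col) states therefore needs one lookup per state.
batch = [table[s] for s in [(1, 2), (3, 0)]]

print(single, rows.shape, batch)
```

Passing a list of state tuples straight into the array would not return one value per state, which is why the list branch loops over the states.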

### Policies

Class `Policies` maintains the policy of each state.

**Methods**:

- `sample(state)`: sample action, return action index
- `retrieve(state)`: retrieve policy of given state
- `update(state, policy)`: update policy of state with given policy

In [10]:
from collections import namedtuple


Pi = namedtuple('Pi', 'act, prob')


class Policies:
    def __init__(self, env: Env):
        self._actions = env.actions
        self._default_policy = [1 / env.action_space] * env.action_space
        self._policies = dict.fromkeys(env.states, Pi(self._actions, self._default_policy))
    
    def sample(self, state):
        if self._policies.get(state, None) is None:
            self._policies[state] = Pi(self._actions, self._default_policy)

        policy = self._policies[state]
        return np.random.choice(policy.act, p=policy.prob)
    
    def retrieve(self, state):
        return self._policies[state].prob
    
    def update(self, state, policy):
        self._policies[state] = self._policies[state]._replace(prob=policy)
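
Once `update` stores a one-hot distribution, as the greedy improvement step does, `sample` effectively becomes deterministic. A minimal check of that behaviour, using a made-up action list and distribution:

```python
import numpy as np

actions = [0, 1, 2, 3]
greedy = [0.0, 0.0, 1.0, 0.0]  # one-hot distribution, as a greedy update would store

# Draw repeatedly and collect the distinct actions that come back.
draws = {int(np.random.choice(actions, p=greedy)) for _ in range(100)}
print(draws)
```

Under the initial uniform distribution, by contrast, each call may return any of the four actions.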

## Solving Matrix Game via Policy Iteration Learning

In [11]:
import time

env = MatrixEnv(width=8, height=8)  # TODO(ming): try different world sizes
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4

stable = False

start = time.time()
while not stable:
    policy_eval(env, values, policies, upper_bound)
    stable = policy_improve(env, values, policies)
end = time.time()

print('\n[time consumption]: {} s'.format(end - start))

done = False
rewards = 0
env.reset()
step = 0

while not done:
    act_index = policies.sample(env.state)
    _, done, reward, next_state = env.step(env.actions[act_index])
    rewards += sum(reward)
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))


===== Policy Evalution =====
> iteration: 251 delta: 2.5096933587908954e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 89 delta: 9.404610869090391e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 3 delta: 6.855961323637416e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 3 delta: 4.99799580495619e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 4.048376602039383e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 3.2791850476776574e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 2.656139888657094e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 2.1514733098193517e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 1.742693380979432e-05
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 1.4115816385995572e-05
===== Poli

## Solving Matrix Game via Value Iteration Learning

In [12]:
env = MatrixEnv(width=8, height=8)  # try different world sizes
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4

start = time.time()
value_iter(env, values, upper_bound)
_ = policy_improve(env, values, policies)
end = time.time()

print('\n[time consumption] {}s'.format(end - start))
# print("===== Render =====")
env.reset()
done = False
rewards = 0
step = 0
while not done:
    act_index = policies.sample(env.state)
    _, done, reward, next_state = env.step(env.actions[act_index])
    rewards += sum(reward)
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))

===== Value Iteration =====
> iteration: 89 delta: 9.404610870067387e-05
===== Policy Improve =====

[time consumption] 1.0287024974822998s
Evaluation: [reward] 1.0 [step] 14
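
The reported step count can be sanity-checked against the grid geometry. The sketch below assumes the 8x8 world, the start at `(0, 0)`, the goal at the opposite corner, and `gamma = 0.9` used above; the variable names are illustrative only.

```python
height, width, gamma = 8, 8, 0.9

# Moves are axis-aligned, so the shortest path from (0, 0) to the opposite
# corner has Manhattan length (height - 1) + (width - 1).
shortest_path = (height - 1) + (width - 1)

# Only the final step of such an episode is rewarded (+1), so its discounted
# return is gamma raised to the number of preceding steps.
episode_return = gamma ** (shortest_path - 1)

print(shortest_path, episode_return)
```

A 14-step rollout therefore matches the shortest possible path. Note that `episode_return` describes the rollout only; it is not necessarily the entry stored in the notebook's `ValueTable`, since the planning loops above do not fix the goal state's value at zero.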
