# Reinforcement Learning based on dp

## Reinforcement Learning 

There are two method to perform RL:  

1. Policy Iteration Learning  
2. Value Iteration Learning 

To perform RL, we must define:  

1. Environment  
2. ValueTable: maintains valuefunction form state space $S \in \mathbb R^2$ to real number space $\mathbb R$
3. Policies: Agent Policies.

## Environment 

### parameter

| math | code    | exampvalue | explanation |
| ---- | ----    | ----- | -------------------- |
| /    | `width`  | 4          | 矩阵宽度     |
| /    | `height` | 4          | 矩阵高度     |


### Set 

| math | code                | type  | explanation |
| ---- | ----               | ------ | ----------- |
| $\mathcal S$ | `env._state` | (a, b) | 状态集表示迷宫的某一位置        |
| $\mathcal A$ | `MatrixEnv._actions` | "up" "right" \.\.\. | 处于某状态下的可行动作 |

### Variable 

| math | code | explanation |
| ---- | ---- | ----------- |
| $s \in \mathcal S$  | `s` | state|
| $v \in \mathbb R$ | `v` | 价值 |
| $v_s \in \mathbb R$ | `v` | 某状态对应的价值 |
| $a \in \mathcal A$ | `act` | 可行动作 |

In [13]:
class Env:
    def __init__(self):
        self._states = set()
        self._state = None
        self._actions = []
        self._gamma = None
        
    @property
    def states(self):
        return self._states
    
    @property
    def state_space(self):
        return self._state_shape
    
    @property
    def actions(self):
        return self._actions
    
    @property
    def action_space(self):
        return len(self._actions)
    
    @property
    def gamma(self):
        return self._gamma
    
    def _world_init(self):
        raise NotImplementedError
        
    def reset(self):
        raise NotImplementedError
    
    def step(self, state, action):
        """Return distribution and next states"""
        raise NotImplementedError
        
    def set_state(self, state):
        self._state = state




In [14]:
class MatrixEnv(Env):
    def __init__(self, height=4, width=4):
        super().__init__()
        
        self._action_space = 4
        self._actions = ["up", "right", "down", "left"]
        
        self._state_shape = (2,)
        self._state_shape = (height, width)
        self._states = [(i, j) for i in range(height) for j in range(width)]
        
        self._gamma = 0.9
        self._height = height
        self._width = width

        self._world_init()
        
    @property
    def state(self):
        return self._state
    
    @property
    def gamma(self):
        return self._gamma
    
    def set_gamma(self, value):
        self._gamma = value
        
    def reset(self):
        self._state = self._start_point
        
    def _world_init(self):
        # start_point
        self._start_point = (0, 0)
        self._end_point = (self._height - 1, self._width - 1)
        
    def _state_switch(self, act):
        # 0: h - 1, 1: w + 1, 2: h + 1, 3: w - 1
        if act == "up":  # up
            self._state = (max(0, self._state[0] - 1), self._state[1])
        elif act == "right":  # right
            self._state = (self._state[0], min(self._width - 1, self._state[1] + 1))
        elif act == "down":  # down
            self._state = (min(self._height - 1, self._state[0] + 1), self._state[1])
        elif act == "left":  # left
            self._state = (self._state[0], max(0, self._state[1] - 1))

    def step(self, act):
        assert act in self._actions
        
        done = False
        reward = 0.

        self._state_switch(act)
        
        if self._state == self._end_point:
            reward = 1.
            done = True

        return None, done, [reward], [self._state]

## ValueTable

In [15]:
class ValueTable:
    def __init__(self, env):
        self._values = np.zeros(env.state_space)
        
    def update(self, s, value):
        self._values[s] = value
        
    def get(self, state):
        if type(state) == tuple:
            return self._values[state]
        if type(state) == list:
            # loop get
            res = [self._values[s] for s in state]
            return res
        elif type(state) == tuple:
            # return directly
            return self._values[state]

## Policies
对于任何一个状态$s$，都拥有一个策略$\pi$，对于每个状态的每一个策略，它无非是通过某个动作$a$选择迁移到可以迁移的任一状态$s'$，并且拥有不同的迁移概率$\pi_{sa}$。

### variable

|math| code| explanation |
| --- | --- | ---------- | 
| $\pi_s \in \vec {\mathbb R} $ | `Policies._policies[state]` | 在某个状态下的策略，包括可以转移到状态和与之对应的转移概率 |
| $\pi_{sa} \in [0, 1]$ | `Policies._policies[state].prob[... Policies._policies[state].act.index(act)]` | 状态$s$进行动作$a$的概率 |

In [16]:
from collections import namedtuple


Pi = namedtuple('Pi', ['act', 'prob'])


class Policies:
    def __init__(self, env: Env):
        self._actions = env.actions
        self._default_policy = [1 / env.action_space] * env.action_space
        self._policies = dict.fromkeys(env.states, Pi(self._actions, self._default_policy))
        # print("Policies:", self._policies)
        # print(self._policies[(1, 0)].prob)
    
    def sample(self, state):
        if self._policies.get(state, None) is None:
            self._policies[state] = Pi(self._actions, self._default_policy)

        policy = self._policies[state]
        return np.random.choice(policy.act, p=policy.prob)
    
    def retrieve(self, state):
        return self._policies[state].prob
    
    def update(self, state, policy):
        self._policies[state] = self._policies[state]._replace(prob=policy)

In [17]:
Pi = namedtuple('Pi', ['act', 'prob'])
pi = Pi(["up", "down"], [0.15, 0.25])
print(pi)
print(pi.prob[pi.act.index("up")])

Pi(act=['up', 'down'], prob=[0.15, 0.25])
0.15


## Policy Iteration Learning
### Parameter
| math | code       | explanation|
| ---- |   ----     | ---------- |
|      | `iteration`  | 循环次数    |
| /    | `upper_bound` |  /          |
### Pesudo
1. initialization
    $V(s) \in R$ and $\pi(s) \in \mathcal A(s) \quad \forall s \in \mathcal S$
    其中

In [18]:
import numpy as np

from copy import copy


def policy_eval(env, values, policies, upper_bound):
    print('\n===== Policy Evalution =====')
    delta = upper_bound
    iteration = 0

    while delta >= upper_bound:
        delta = 0.

        for s in env.states:
            v = values.get(s)
            env.set_state(s)

            action_index = policies.sample(s)
            # print(action_index)
            # print(env.actions)
            action = policies.sample(s)
            
            _, _, rewards, next_states = env.step(action)

            next_values = values.get(list(next_states))
            td_values = list(map(lambda x, y: x + env.gamma * y, rewards, next_values))

            exp_value = np.mean(td_values)
            values.update(s, exp_value)

            # update delta
            delta = max(delta, abs(v - exp_value))
            
        iteration += 1
        print('\r> iteration: {} delta: {}'.format(iteration, delta), flush=True, end="")

In [19]:
def policy_improve(env, values, policies):
    print('\n===== Policy Improve =====')
    policy_stable = True
    
    for state in env.states:
        old_act = policies.sample(state)

        # calculate new policy execution
        actions = env.actions
        value = [0] * len(env.actions)
        
        for i, action in enumerate(actions):
            env.set_state(state)
            _, _, rewards, next_states = env.step(action)
            next_values = values.get(list(next_states))
            td_values = list(map(lambda x, y: x + env.gamma * y, rewards, next_values))
            prob = [1 / len(next_states)] * len(next_states)

            value[i] = sum(map(lambda x, y: x * y, prob, td_values))

        # action selection
        new_act = env._actions[np.argmax(value)]
        # print(value)
        # print("new_act:", new_act, old_act)

        # greedy update policy
        # print(env.action_space)
        new_policy = [0.] * env.action_space
        new_policy[env._actions.index(new_act)] = 1.
        policies.update(state, new_policy)

        if old_act != new_act:
            policy_stable = False

    return policy_stable

## Solving Matrix Game via Policy Iteration Learning 


In [20]:
import time

env = MatrixEnv(width=4, height=4)  # TODO(ming): try different word size
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1

stable = False

start = time.time()
while not stable:
    policy_eval(env, values, policies, upper_bound)
    stable = policy_improve(env, values, policies)
end = time.time()

print('\n[time consumpution]: {} s'.format(end - start))

done = False
rewards = 0
env.reset()
step = 0

while not done:
    act_index = policies.sample(env.state)
    # print(act_index)
    _, done, reward, next_state = env.step(env.actions[env._actions.index(act_index)])
    rewards += sum(reward)
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))


===== Policy Evalution =====
> iteration: 2 delta: 0.8999999999999999
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 0.7290000000000001
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 0.5904900000000004
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 0.47829690000000014
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 0.38742048900000015
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 0.3138105960900006
===== Policy Improve =====

===== Policy Evalution =====
> iteration: 2 delta: 0.25418658283290085
===== Policy Improve =====

[time consumpution]: 0.05784320831298828 s
Evaluation: [reward] 1.0 [step] 6


## value Iteration


In [21]:
def value_iter(env, values, upper_bound):
    print('===== Value Iteration =====')
    delta = upper_bound + 1.
    states = copy(env.states)
    
    iteration = 0

    while delta >= upper_bound:
        delta = 0

        for s in states:
            v = values.get(s)

            # get new value
            actions = env.actions
            vs = [0] * len(actions)

            for i, action in enumerate(actions):
                env.set_state(s)
                _, _, rewards, next_states = env.step(action)
                td_values = list(map(lambda x, y: x + env.gamma * y, rewards, values.get(next_states)))

                vs[i] = np.mean(td_values)

            values.update(s, max(vs))
            delta = max(delta, abs(v - values.get(s)))
        
        iteration += 1
        print('\r> iteration: {} delta: {}'.format(iteration, delta), end="", flush=True)
        
    return

In [22]:
env = MatrixEnv(width=8, height=8)  # try different word size
policies = Policies(env)
values = ValueTable(env)
upper_bound = 1e-4

start = time.time()
value_iter(env, values, upper_bound)
_ = policy_improve(env, values, policies)
end = time.time()

print('\n[time consumption] {}s'.format(end - start))
# print("===== Render =====")
env.reset()
done = False
rewards = 0
step = 0
while not done:
    act_index = policies.sample(env.state)
    # print(act_index)
    _, done, reward, next_state = env.step(act_index)
    rewards += sum(reward)
    step += 1

print('Evaluation: [reward] {} [step] {}'.format(rewards, step))

===== Value Iteration =====
> iteration: 89 delta: 9.404610870067387e-05
===== Policy Improve =====

[time consumption] 0.5226085186004639s
Evaluation: [reward] 1.0 [step] 14
