## TD学習 (Temporal Difference Learning)

source : Richard S. Sutton and Andrew G.Barto, 「強化学習」

---

### 強化学習の枠組み

強化学習の枠組みは、学習と意思決定を行う「エージェント」と  
それ以外のすべてから構成される「環境」の相互作用として表される。  
  
離散的な時間ステップ　$t=0,1,...$のあるステップtにおいて、  
エージェントは環境から状態$s_{t}$を受け取り、行動$a_{t}$を選択する。  
  
このとき、状態$s_{t}$から行動$a_{t}$への写像はエージェントの方策（policy）と呼ばれ、$\pi(s,a)$で表される。  

また、エージェントは、最終的に受け取る報酬を最大化することを目標として学習する。  
一般的には期待収益（expected return）を最大化するように設定される。  
  
各時間ステップtに受け取る報酬を$r_{t}$とするとき、  
最も単純な場合には、収益$R_{t}$は  
　$R(t) = r_{t+1} + r_{t+2} + ... + r_{T}  $  
として表される。ここでTは最終時間ステップである。（相互作用が離散的な場合）  
  
連続タスクにおいてはT=∞となりR(t)が発散しうるため、割引収益  
$$
\begin{align}
R_{t} &= r_{t+1} + \gamma r_{t+2} + \gamma^{2} r_{t+3} + ... \\  
　　　&= \sum_{k=0}^{\infty}\gamma^{k} r_{t+k+1} 
\end{align}
$$


を最大化するようにa(t)を選択する。  
ただし、γは割引率と呼ばれるパラメータで(0<=γ<=1)である。  

---

一般的な強化学習アルゴリズムは価値関数に基づく評価を行っている。  
方策πのもとでの状態sの価値Vπ(s)は、MDP（マルコフ決定過程）では  

$$
\begin{align}
V^{\pi}(s) = E_{\pi}\{R_{t} | s_{t}=s\} = E_{\pi}\{ \sum_{k=0}^{\infty}\gamma^{k}r_{t+k+1}|s_{t}=s\}
\end{align}
$$

と表される。関数 $ V^{\pi} $を方策πに対する状態価値観数と呼ぶ。
  
同様に、方策πのもとで状態sにおいて行動aを取ることの価値を $Q^{\pi}(s,a)$で表し、  
状態sで行動aを取り、その後に方策πに従った期待報酬として次のように定義する。  
$$
\begin{align}
Q^{\pi}(s,a)=E_{\pi}\{R_{t}|s_{t}=s, a_{t}=a\} = E_{\pi}\{\sum_{k=0}^{\infty}\gamma^{k}R_{t+k+1}|s_{t}=s, a_{t}=a\}
\end{align}
$$

---

任意のs,aが与えられたときの次に可能な各状態s'の確率を遷移確率と呼ぶ。  
$$
\begin{align}
P^{a}_{ss'} = Pr\{s_{t+1}=s'| s_{t}=s, a_{t}=a\}
\end{align}
$$
  
同様にして、次の報酬の期待値を次のように表す。
$$
\begin{align}
R^{a}_{ss'}=E\{r_{t+1}| s_{t}=s, a_{t}=a, s_{t+1}=s'\}
\end{align}
$$
  
    
強化学習と動的計画法で使われている価値観数は、  
任意の方策πと状態sに対して、sの価値と可能な後続状態群の価値との間に  
以下の整合性条件（consistency condition）がなりたち、これを$V^{\pi}$に対するBellman方程式という。  
  
$$
\begin{align}
V^{\pi}(s) &= E_{\pi}\{R_{t} | s_{t}=s\} \\
&= E_{\pi}\{ \sum_{k=0}^{\infty}\gamma^{k}r_{t+k+1}|s_{t}=s\} \\
&= E_{\pi}\{r_{t+1}+ \gamma\sum_{k=0}^{\infty}\gamma^{k}r_{t+k+2}|s_{t}=s\} \\
&= \sum_{a}\pi(s,a)\sum_{s'}P^{a}_{ss'} [ R^{a}_{ss'} + \gamma E_{\pi} \{ \sum_{k=0}^{\infty}\gamma^{k}r_{t+k+2}|s_{t+1}=s'\} ] \\
&= \sum_{a}\pi(s,a)\sum_{s'}P^{a}_{ss'}[R^{a}_{ss'}+\gamma V^{\pi}(s')]
\end{align}
$$


---

いま、すべての状態に対して、方策πの期待収益がπ'よりも良いか同じであるなら、  
πはπ'よりも良いか、同じであると定義する。  
つまり、すべての $ s \in S $ に対して、$ V^{\pi}(s) \leqq V^{\pi'}(s) $ であるなら、その時に限り $\pi \leqq \pi'$である。
  
これが１つの最適方策であり、すべての最適方策を$\pi^{*}$と記す。  
最適方策群は最適状態価値関数 $V^{*}(s) = \max_{\pi}V^{\pi}(s)$ を共有する。    


同様に、最適方策群は最適行動価値関数 $Q^{*}(s,a)=\max_{\pi}Q^{\pi}(s,a)$ を共有する。  
$V^{*}$を用いて$Q^{*}$を次のように書くことができる。  

$$
\begin{align}
Q^{*}(s,a)=E\{r_{t+1}+\gamma V^{*}(s_{t+1})|s_{t}=s,a_{t}=a\}
\end{align}
$$

$V^{*}$に対するBellman方程式を、Bellman最適方程式という。  
  
$$
\begin{align}
V^{*}(s) &= \max_{a \in A(s)}Q^{\pi^{*}}(s,a) \\
&= \max_{a}E_{\pi^{*}}\{R_{t} | s_{t}=s\} \\
&= \max_{a}E_{\pi^{*}}\{ \sum_{k=0}^{\infty}\gamma^{k}r_{t+k+1}|s_{t}=s\} \\
&= \max_{a}E_{\pi^{*}}\{r_{t+1}+ \gamma\sum_{k=0}^{\infty}\gamma^{k}r_{t+k+2}|s_{t}=s\} \\
&= \max_{a}E\{r_{t+1}+\gamma V^{*}(s_{t+1})|s_{t}=s,a_{t}=a\} \\
&= \max_{a}\sum_{s'}P^{a}_{ss'}[R^{a}_{ss'}+\gamma V^{*}(s')]
\end{align}
$$

$Q^{*}$に対するBellman最適方程式は次の通り。
$$
\begin{align}
Q^{*}(s) &= E\{r_{t+1}+\gamma \max_{a'}Q^{*}(s_{t+1},a')|s_{t}=s,a_{t}=a\} \\
&= \sum_{s'}P^{a}_{ss'}[R^{a}_{ss'}+\gamma \max_{a'} Q^{*}(s',a')]
\end{align}
$$

---

### TD学習

TD法は経験を用いて予測問題を解決し、方策πに従って経験をいくつか得ることで  
$V^{\pi}$ の推定値$V$を更新する手法の一つである。  
  
最も単純なTD法はTD(0)と呼ばれ、以下のようになる。
$$
\begin{align}
V(s_{t}) \leftarrow V(s_{t}) + \alpha [ r_{t+1}+\gamma V(s_{t+1}) - V(s_{t})]
\end{align}
$$
ここで$\alpha$はステップサイズパラメータである。

$V^{\pi}$に対するBellman方程式より
$$
\begin{align}
V^{\pi}(s) &= E_{\pi}\{R_{t} | s_{t}=s\} \\
&= E_{\pi}\{r_{t+1}+ \gamma\sum_{k=0}^{\infty}\gamma^{k}r_{t+k+2}|s_{t}=s\} \\
\therefore V^{\pi}(s) &= E_{\pi}\{r_{t+1}+ \gamma V^{\pi}(s_{t+1})|s_{t}=s \}
\end{align}
$$

としたとき、モンテカルロ法は前者の推定値を、動的計画法は後者の推定値を目標とする。  
TD法は両者を融合させたものである。  

### テーブル型 TD(0) アルゴリズム
Algorithm
>$V(s)$を任意に初期化し、$\pi$を評価対象の方策に初期化する  
>各エピソードに対して繰り返し：  
>　　$s$を初期化  
>　　エピソードの各ステップに対して繰り返し：  
>　　　　$ a \leftarrow s $に対して$\pi$で与えられる行動  
>　　　　行動$a$を取り、報酬$r$と次状態$s'$を観測する  
>　　　　$V(s) \leftarrow V(s) + \alpha[r+\gamma V(s')-V(s)]$  
>　　　　$s \leftarrow s'$  
>　　$s$が終端状態ならば繰り返しを終了  

このTD予測法を制御問題に適用する方法について考える。

### Sarsa : 方策オン型TD制御
行動価値関数を学習するためにTD法を用いる。

Algorithm
>$Q(s,a)$を任意に初期化  
>各エピソードに対して繰り返し：  
>　　$s$を初期化  
>　　$Q$から導かれる方策（εグリーディ方策など）を用いて、$s$で取る行動$a$を選択する  
>　　エピソードの各ステップに対して繰り返し：  
>　　　　行動$a$を取り、報酬$r$と次状態$s'$を観測する  
>　　　　$Q$から導かれる方策を用いて、$s'$での行動$a'$を選択する  
>　　　　$Q(s,a) \leftarrow Q(s,a) + \alpha[r+\gamma Q(s',a')-Q(s,a)]$  
>　　　　$s \leftarrow s'; a \leftarrow a';$  
>　　$s$が終端状態ならば繰り返しを終了 

＊エージェントと環境との相互作用は離散的であり、エピソード的タスク群に分解されること、  
　および行動の集合Aと状態の集合Sは有限の要素しか持たず、  
　その数は学習開始時に既知であることを仮定する。  
＊また、テーブル型TD(0)アルゴリズムとして実装しており、  
　state(i) | i=0~nが一次元的に並べられることを前提としている。  
　policyとしてはe-greedyを用いる。  

In [1]:
class Q_table_function(object):
    def __init__(self, state_space_size, action_space_size,
                 learning_rate=0.01, discount_rate=0.95, initial_value=1,random_initial_value=True,
                 decay_learning_rate=1):
        self.state_space_size = state_space_size
        self.action_space_size = action_space_size
        self.learning_rate = learning_rate
        self.discount_rate = discount_rate
        self.initial_value = initial_value
        self.random_initial_value = random_initial_value
        self.decay_learning_rate = decay_learning_rate

        if self.random_initial_value:
            self.q_table = np.random.rand(self.state_space_size, self.action_space_size) * self.initial_value
        else:
            self.q_table = np.ones((self.state_space_size, self.action_space_size), dtype=float64) * self.initial_value

        self.last_state = None
        self.last_action = None
    
    def estimate_q_value(self, state, action=None):
        self.last_state, self.last_action = state, action
        if action is None:
            return self.q_table[state]
        else:
            return self.q_table[state][action]
    
    def update_q_table(self, reward, next_state, next_action,
                       last_state=None, last_action=None):
        last_state = self.last_state if last_state is None else last_state
        last_action = self.last_action if last_action is None else last_action

        delta = reward + self.discount_rate * self.estimate_q_value(next_state, next_action) \
                - self.estimate_q_value(last_state, last_action)
        self.q_table[state][action] = self.q_table[state][action] + self.learning_rate * delta

    def decay_learning_rate_value(self, decay_rate=None):
        decay_rate = self.decay_learning_rate if decay_rate is None else decay_rate
        if 0<=decay_rate<=1:
            self.learning_rate = self.learning_rate * decay_rate

    def save_q_table(self):
        return self.q_table

    def load_q_table(self, q_table=None):
        if q_table is not None:
            self.q_table = q_table

    def reset(self, reset_q_table=True, learning_rate=None, discount_rate=None, decay_learning_rate=None):
        if reset_q_table:
            if self.random_initial_value:
                self.q_table = np.random.rand(self.state_space_size, self.action_space_size) * initial_value
            else:
                self.q_table = np.ones((self.state_space_size, self.action_space_size), dtype=float64) * initial_value
        self.learning_rate = learning_rate if learning_rate is not None else self.learning_rate
        self.discount_rate = discount_rate if discount_rate is not None else self.discount_rate
        self.decay_learning_rate = decay_learning_rate if decay_learning_rate is not None else self.decay_learning_rate


In [2]:
import numpy as np
class Policy_e_greedy(object):
    def __init__(self, state_space_size, action_space_size,
                 action_count_list=None,
                 initial_play_count=None, epsilon=0.1, min_choose=1):
        self.state_space_size = state_space_size
        self.action_space_size = action_space_size
        self.total_play_count = 0
        self.min_choose = min_choose
        self.epsilon = epsilon

        self.last_state = None
        self.last_action = None

        # self.action_count_list[action] = number of [action] is choosed
        if action_count_list is None:
            self.action_count_list = np.zeros(self.action_space_size, dtype=int)
        else:
            self.action_count_list = action_count_list
        if initial_play_count is not None:
            self.total_play_count = initial_play_count

    def choose_act_greedy(self, state, value_table):
        index_of_less_selected = np.where(self.action_count_list)
        if index_of_less_selected[0].size == 0:
            max_index = np.where(value_table == value_table.max())
            action = np.random.choice(max_index[0], 1)
        else:
            action = int(np.random.choice(index_of_less_selected, 1))

    def choose_act(self, state, update_flag=True, epsilon=None):
        epsilon = self.epsilon if epsilon is None else epsilon
        if np.random.choice([1, 0], p=[epsilon, 1-epsilon]):            
            action = int(np.random.choice(range(action_space_size)))
        else:
            action = self.choose_act_greedy(state)

        self.last_state, self.last_action = state, action
        if update_flag:
            self.total_play_count += 1
            self.action_count_list[action] += 1
        
        return action

    def save_record(self):
        return self.action_count_list, self.total_play_count
    
    def load_record(self, action_count_list=None, total_play_count=None):
        self.action_count_list = action_count_list if action_count_list is not None else self.action_count_list
        self.total_play_count = total_play_count if total_play_count is not None else self.total_play_count
    
    def reset_record(self, reset_count=True, reset_last=True):
        if reset_count:
            self.total_play_count = 0
            self.action_count_list = np.zeros(self.action_space_size, dtype=int)
        if reset_last:
            self.last_state = None
            self.last_action = None

In [3]:
class Agent_SARSA(object):
    def __init__(self, state_space_size, action_space_size,
                 state_function, q_function=None, policy_function=None,
                 learning_rate=0.01, discount_rate=0.95, 
                 initial_value=1, random_initial_value=True, decay_learning_rate=1,
                 action_count_list=None, initial_play_count=None, epsilon=0.1, min_choose=1):
        self.state_space_size = state_space_size
        self.action_space_size = action_space_size
        self.learning_rate = learning_rate
        self.discount_rate = discount_rate
        self.initial_value = initial_value
        self.random_initial_value = random_initial_value
        self.decay_learning_rate = decay_learning_rate
        self.action_count_list = action_count_list
        self.initial_play_count = initial_play_count
        self.epsilon = epsilon
        self.min_choose = min_choose

        self.total_play_count = 0
        self.last_state = None
        self.last_action = None

        self.state_function = self.state_function

        if q_function is None:
            self.q_function = Q_table_function(self.state_space_size, self.action_space_size,
                                               self.learning_rate, self.discount_rate, self.initial_value,
                                               self.random_initial_value, self.decay_learning_rate)
        else:
            self.q_function = q_function

        if policy_function is None:
            self.policy_function = Policy_e_greedy(self.state_space_size, self.action_space_size,
                                                   self.action_count_list, self.initial_play_count,
                                                   self.epsilon, self.min_choose)
        else:
            self.policy_function = policy_function


    def act(self, state, update_flag=True):
        action = self.policy_function.choose_act(state, update_flag=update_flag)
        self.last_state, self.last_action = state, action
        return action

    def observe_state(action=None):
        # state_function must return (reward, next_state, episode_end_flag)
        return self.state_function.return_next(action)


    def learning_step(self, action, update_flag=True):
        reward, next_state, episode_end_flag = self.observe_state(action)
        if episode_end_flag:
            next_action = -1
        else:
            next_action = self.act(next_state, update_flag)
        if update_flag:
            self.q_function.update_q_table(reward, next_state, next_action,
                                           self.last_state, self.last_action)
        return reward, next_state, next_action, episode_end_flag

    def learn(self, episode_num, maximum_trial_per_episode=1000, 
              save_flag=True, update_flag=True, reset_when_finished=False):
        reward = 0
        episode_end_flag = False
        trial_count = 0

        # episode loop
        for episode in range(episode_num):
            # choose initial action
            self.state_function.reset_state()
            state = self.observe_state()
            action = self.act(state, update_flag)
            
            while episode_end_flag==False and (trial_count < maximum_trial_per_episode):
                reward, state, action, episode_end_flag = self.learning_step(action)
                trial_count += 1
            else:
                reward, episode_end_flag, trial_count = 0, False, 0

        save_data = self.save() if save_flag else None
        if reset_when_finished:
            self.reset()

        return save_data

    def demo_play(self, episode_num=1, maximum_trial_per_episode=1000):
        self.learn(episode_num, maximum_trial_per_episode, safe_flag=False, update_flag=False)


    def save(self):
        # returns (q_table, (action_count_list, total_play_count))
        q_save_data = self.q_function.save_q_table()
        policy_save_data = self.policy_function.save_record()
        return q_save_data, policy_save_data

    def load(self, q_save_data=None, policy_save_data=None):
        if q_save_data is not None:
            self.q_function.load_q_table(q_table)
        if policy_save_data is not None:
            self.policy_function.load_record(self, action_count_list=policy_save_data[0], 
                                             total_play_count=policy_save_data[1])

    def reset(self, reset_q_table=True, reset_count=True, reset_last=True,
              learning_rate=None, discount_rate=None,decay_learning_rate=None):
        self.q_function.reset(reset_q_table, learning_rate, discount_rate, decay_learning_rate)
        self.policy_function.reset_record(reset_count, reset_last)

### Q学習 : 方策オフ型TD制御
SARSAでは次状態$s'$を観測した後に$Q$から導かれる方策により行動$a'$を選択したが、  
Q学習では$\max_{a'}Q(s',a')$を与える$a'$を用いる。  

Algorithm
>$Q(s,a)$を任意に初期化  
>各エピソードに対して繰り返し：  
>　　$s$を初期化    
>　　エピソードの各ステップに対して繰り返し：  
>　　　　$Q$から導かれる方策（εグリーディ方策など）を用いて、$s$で取る行動$a$を選択する  
>　　　　行動$a$を取り、報酬$r$と次状態$s'$を観測する  
>　　　　$Q(s,a) \leftarrow Q(s,a) + \alpha[r+\gamma \max_{a'} Q(s',a')-Q(s,a)]$  
>　　　　$s \leftarrow s';$  
>　　$s$が終端状態ならば繰り返しを終了 

In [4]:
import numpy as np
#from SARSA import Agent_SARSA, Q_table_function, Policy_e_greedy

class Agent_Q_learning(Agent_SARSA):
    def learning_step(self, action, update_flag=True):
        reward, next_state, episode_end_flag = self.observe_state(action)
        if episode_end_flag:
            next_action = -1
        else:
            # choose a' which gives max Q(s',a')
            next_action = self.act(next_state, update_flag, epsilon=0)
        if update_flag:
            self.q_function.update_q_table(reward, next_state, next_action,
                                           self.last_state, self.last_action)
        return reward, next_state, next_action, episode_end_flag
