In [17]:
from typing import *
import os
import random
from enum import Enum
from glob import glob
from datetime import datetime

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from nptyping import NDArray
from IPython.display import display


# Plot styling: seaborn grid style plus colour / marker lists for figures.
sns.set_style('whitegrid')
colors = ['#de3838', '#007bc3', '#ffd12a']
markers = ['o', 'x', ',']
# IPython magic: ask the inline backend for SVG figure output.
%config InlineBackend.figure_formats = ['svg']

# pandas display limits (rows, columns, and line width of printed frames).
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 100)

cmap = sns.diverging_palette(255, 0, as_cmap=True)  # define the colour palette

# Bellman Equation

In [18]:
# Number of moves in one game: transit_func ends the game once the state's
# move history reaches this length.
LIMIT_GAME_COUNT = 5
# Minimum number of 'up' moves in a finished game for it to end in 'happy_end'
# (otherwise it ends in 'bad_end').
HAPPY_END_BORDER = 4
# Probability that the chosen action is the one applied; with probability
# 1 - MOVE_PROB the opposite action is applied instead.
MOVE_PROB = 0.9


def V(s: str, gamma: float = 0.99) -> float:
    """Return the value of state ``s`` according to the Bellman equation.

    The value is the immediate reward ``R(s)`` plus ``gamma`` times the best
    expected value reachable from ``s`` (``max_V_on_next_state``).

    Parameters
    ----------
    s : str
        State to evaluate.
    gamma : float
        Discount factor applied to the value of the successor states.
    """
    immediate_reward = R(s)
    discounted_future = gamma * max_V_on_next_state(s, gamma)
    return immediate_reward + discounted_future


def R(s: str) -> float:
    """Return the reward of state ``s``.

    Only the two terminal states carry a reward: ``'happy_end'`` gives 1 and
    ``'bad_end'`` gives -1. Every other state gives 0.
    """
    if s == 'happy_end':
        return 1
    if s == 'bad_end':
        return -1
    return 0


def max_V_on_next_state(s: str, gamma: float) -> float:
    """Return the largest expected successor value over all actions from ``s``.

    For each action ('up' and 'down') the expected value is the
    probability-weighted sum of ``V(next_state, gamma)`` over the transitions
    returned by ``transit_func``; the maximum over the actions is returned.
    Terminal states ('happy_end' / 'bad_end') have no future, so they yield 0.
    """
    # If the game has ended, the expected future value is 0.
    if s in ['happy_end', 'bad_end']:
        return 0

    def expected_value(action: str) -> float:
        # Accumulate term by term, in the order transit_func lists the states.
        total = 0
        for successor, prob in transit_func(s, action).items():
            total += prob * V(successor, gamma)
        return total

    return max([expected_value(action) for action in ['up', 'down']])


def transit_func(s: str, a: str) -> Dict[str, float]:
    """Return the distribution over next states for action ``a`` in state ``s``.

    A state is a ``_``-joined string whose first token is a prefix (e.g.
    ``'state'``) and whose remaining tokens are the moves made so far, such as
    ``'state_up_down'``.

    Parameters
    ----------
    s : str
        Current state.
    a : str
        Chosen action, either ``'up'`` or ``'down'``.

    Returns
    -------
    Dict[str, float]
        Mapping from next state to transition probability. Once the history
        holds ``LIMIT_GAME_COUNT`` moves the game is over and the single next
        state is ``'happy_end'`` (at least ``HAPPY_END_BORDER`` 'up' moves) or
        ``'bad_end'``, with probability 1.0. Otherwise the chosen action is
        appended with probability ``MOVE_PROB`` and the opposite action with
        probability ``1 - MOVE_PROB``.

    Raises
    ------
    ValueError
        If ``a`` is neither ``'up'`` nor ``'down'``.

    Examples
    --------
    >>> sorted(transit_func(s='state', a='up'))
    ['state_down', 'state_up']
    >>> transit_func(s='state', a='up')['state_up'] == MOVE_PROB
    True
    >>> transit_func(s='state_up_up_up_up_up', a='up')
    {'happy_end': 1.0}
    """
    # Reject unknown actions instead of silently appending them to the state.
    if a not in ('up', 'down'):
        raise ValueError(f"action must be 'up' or 'down', got {a!r}")

    # Everything after the leading prefix token is the move history.
    history = s.split('_')[1:]

    if len(history) == LIMIT_GAME_COUNT:
        # Game over: the outcome depends only on how many moves were 'up'.
        outcome = 'happy_end' if history.count('up') >= HAPPY_END_BORDER else 'bad_end'
        return {outcome: 1.0}

    opposite = 'up' if a == 'down' else 'down'
    return {
        '_'.join([s, a]): MOVE_PROB,
        '_'.join([s, opposite]): 1 - MOVE_PROB}

    

# Values of the initial state and of two intermediate states.
print(V(s='state'))
print(V(s='state_up_up'))
print(V(s='state_down_down'))

print('--------------')
# Terminal states have no future value, so V equals the terminal reward.
print(f'value of happy end: {V(s="happy_end")}')
print(f'value of bad end: {V(s="bad_end")}')

0.7880942034605892
0.9068026334400001
-0.96059601
--------------
value of happy end: 1.0
value of happy end: -1.0


# 動的計画法による学習

In [10]:
from abc import ABCMeta, abstractmethod

from ch1 import Environment, State, Action


class Planner(metaclass=ABCMeta):
    """Abstract base class for planners working on an ``Environment``.

    ``ABCMeta`` is used as the *metaclass* (not as a base class) so that
    subclasses can be instantiated normally and ``plan`` is enforced as an
    abstract method. Subclasses must implement :meth:`plan`.
    """

    def __init__(self, env: Environment) -> None:
        """Store the environment and start with an empty log."""
        self.env : Environment = env
        # Planners append one value grid (see dict_to_grid) per recorded step.
        self.log : List[List[List[float]]] = []

    def initialize(self) -> None:
        """Reset the environment and clear the log before a planning run."""
        self.env.reset()
        self.log = []

    @abstractmethod
    def plan(self, gamma: float = 0.9, threshold: float = 0.0001):
        """Run the planning algorithm; must be implemented by subclasses.

        Parameters
        ----------
        gamma : float
            Discount factor.
        threshold : float
            Convergence threshold used by the concrete algorithm.
        """
        raise NotImplementedError('Planner have to implements plan method.')

    def transitions_at(self, state, action) -> Iterator[Tuple[float, State, float]]:
        """Yield ``(probability, next_state, reward)`` for each possible transition.

        The probabilities come from ``env.transit_func(state, action)`` and the
        reward is the first element returned by ``env.reward_func(next_state)``
        (the second element is ignored here).
        """
        transition_probs = self.env.transit_func(state=state, action=action)
        for next_state, prob in transition_probs.items():
            reward, _ = self.env.reward_func(state=next_state)
            yield prob, next_state, reward

    def dict_to_grid(self, state_reward_dict: Dict[State, float]) -> List[List[float]]:
        """Lay out per-state values on a ``row_length`` x ``col_length`` grid.

        Each key must expose ``row`` and ``col`` attributes; cells without an
        entry in ``state_reward_dict`` stay 0.
        """
        grid = [[0] * self.env.col_length for _ in range(self.env.row_length)]
        for s, value in state_reward_dict.items():
            grid[s.row][s.col] = value
        return grid

## Value Iteration

In [13]:
class ValueIterationPlaner(Planner):
    """Planner that computes state values by value iteration."""

    def __init__(self, env: Environment) -> None:
        super().__init__(env)

    def plan(self, gamma: float = 0.9, threshold: float = 0.0001) -> List[List[float]]:
        """Run value iteration and return the state values laid out as a grid.

        Every sweep replaces V(s), for each state where an action is possible,
        with the best expected return over all actions. Sweeps repeat until the
        largest change in a sweep is below ``threshold``. A grid snapshot of
        the values is appended to ``self.log`` at the start of every sweep.
        """
        self.initialize()
        available_actions = self.env.actions
        values = {state: 0 for state in self.env.states}  # every state starts at 0

        def action_value(state, action):
            # Expected return of taking `action` in `state` under current values.
            expected = 0
            for prob, next_state, reward in self.transitions_at(state=state, action=action):
                expected += prob * (reward + gamma * values[next_state])
            return expected

        converged = False
        while not converged:
            self.log.append(self.dict_to_grid(state_reward_dict=values))
            largest_change = 0
            for state in values:
                if self.env.can_action_at(state=state):
                    # V(s) = max over actions of the expected return.
                    best = max([action_value(state, action) for action in available_actions])
                    largest_change = max(largest_change, abs(best - values[state]))
                    values[state] = best
            # Stop once a full sweep changed no value by `threshold` or more.
            converged = largest_change < threshold

        return self.dict_to_grid(state_reward_dict=values)

## Policy Iteration

In [16]:
class PolicyIterationPlanner(Planner):
    """Planner that alternates policy evaluation and greedy policy improvement."""

    def __init__(self, env: Environment) -> None:
        super().__init__(env)
        self.policy = {}

    def initialize(self) -> None:
        """Reset through the base class, then give every state a uniform policy."""
        super().initialize()
        self.policy = {}
        action_list = self.env.actions
        for state in self.env.states:
            # At first, each action is taken with the same probability.
            self.policy[state] = {action: 1 / len(action_list) for action in action_list}

    def estimate_by_policy(self, gamma: float, threshold: float) -> Dict[State, float]:
        """Policy evaluation: compute state values under the current policy.

        Values start at 0 and are swept repeatedly until the largest change in
        a sweep is below ``threshold``.
        """
        values = {state: 0 for state in self.env.states}

        while True:
            largest_change = 0
            for state in values:
                weighted_returns = []
                for action, action_prob in self.policy[state].items():
                    # Expected return of `action`, weighted by its policy probability.
                    contribution = 0
                    for prob, next_state, reward in self.transitions_at(state=state, action=action):
                        contribution += action_prob * prob * (reward + gamma * values[next_state])
                    weighted_returns.append(contribution)
                new_value = sum(weighted_returns)
                largest_change = max(largest_change, abs(new_value - values[state]))
                values[state] = new_value
            if largest_change < threshold:
                return values

    def plan(self, gamma: float = 0.9, threshold: float = 0.0001) -> List[List[float]]:
        """Run policy iteration and return the final state values as a grid.

        Each round evaluates the current policy, logs the value grid, and makes
        the policy greedy with respect to those values. Iteration stops when no
        state's preferred action changed during a round.
        """
        self.initialize()
        all_states = self.env.states
        all_actions = self.env.actions

        def argmax_key(action_value_dict):
            """Return the key whose value is largest.

            Example: ``{'UP': 0.5, 'DOWN': 0.7}`` gives ``'DOWN'``.
            """
            return max(action_value_dict, key=action_value_dict.get)

        while True:
            # Evaluate the current policy and record the resulting values.
            values = self.estimate_by_policy(gamma=gamma, threshold=threshold)
            self.log.append(self.dict_to_grid(state_reward_dict=values))

            policy_changed = False
            for state in all_states:
                # Action the current policy prefers in this state.
                current_action = argmax_key(self.policy[state])

                # Expected return of every action under the evaluated values.
                action_values = {}
                for action in all_actions:
                    expected = 0
                    for prob, next_state, reward in self.transitions_at(state=state, action=action):
                        expected += prob * (reward + gamma * values[next_state])
                    action_values[action] = expected
                greedy_action = argmax_key(action_values)

                if current_action != greedy_action:
                    policy_changed = True

                # Greedy update: probability 1 for the best action, 0 otherwise.
                for action in self.policy[state]:
                    self.policy[state][action] = 1 if action == greedy_action else 0

            if not policy_changed:
                # The policy is stable, so stop iterating.
                break

        # Turn the value dictionary into a grid.
        return self.dict_to_grid(state_reward_dict=values)