<a href="https://colab.research.google.com/github/argonism/TsukurinagaraRL/blob/master/Zerokara_chap3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# colabでgymを使うための設定

In [None]:
# Install the packages this notebook uses.
# NOTE(review): the versions listed below were recorded by the author, presumably
# the ones the notebook was last run with; the install commands themselves are unpinned.
# gym==0.17.2 pyvirtualdisplay==1.3.2
# xvfb=2:1.19.6-1ubuntu4.4 python-opengl=3.1.0+dfsg-1 ffmpeg=7:3.4.8-0ubuntu0.2
# JSAnimation==0.1
# stdout and stderr are redirected to /dev/null, so a failed install is silent here.
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
!pip install JSAnimation > /dev/null 2>&1

わからないところ
- Q関数とQテーブルの違い
  - ほぼ同じものを指す。Q関数は状態と行動の組から行動価値 Q(s, a) を返す関数で、Qテーブルは状態と行動が離散的な場合にそのQ関数を表（配列）として実装したもの。

In [None]:
# 使用するパッケージの宣言
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
import gym
from gym.wrappers import Monitor

ENV = 'CartPole-v0'  # environment id passed to gym.make
NUM_DIZITIZED = 6  # number of buckets each observation variable is discretized into


In [None]:
# 動画の描画関数の宣言
import glob
import io
import os
import base64
from JSAnimation.IPython_display import display_animation
from matplotlib import animation
from IPython import display as ipythondisplay
from IPython.display import HTML
from pyvirtualdisplay import Display

# Start a pyvirtualdisplay virtual display (640x400, not visible).
# NOTE(review): presumably required so env.render() has a display on a headless
# host such as Colab — confirm.
display = Display(visible=0, size=(640, 400))
display.start()

def show_video():
  """Embed the first MP4 found under ``video/`` in the cell output.

  The file is read in full, base64-encoded and inlined into an HTML
  ``<video>`` tag as a data URI. When no ``video/*.mp4`` file exists,
  the message "Could not find video" is printed instead.

  Returns:
    None.
  """
  mp4list = glob.glob('video/*.mp4')
  if not mp4list:
    print("Could not find video")
    return

  # Open read-only inside a context manager so the handle is always closed;
  # the recording is only read, never written, so write access is not needed.
  with open(mp4list[0], 'rb') as video_file:
    encoded = base64.b64encode(video_file.read())

  ipythondisplay.display(HTML(data='''<video alt="test" autoplay 
                loop controls style="height: 400px;">
                <source src="data:video/mp4;base64,{0}" type="video/mp4" />
             </video>'''.format(encoded.decode('ascii'))))
    
def reset_video():
  """Delete every ``video/*.mp4`` file left over from earlier recordings.

  Files with other extensions in ``video/`` are left in place; a missing
  ``video/`` directory simply yields nothing to delete.
  """
  for stale_recording in glob.glob('video/*.mp4'):
    os.remove(stale_recording)

def wrap_env(env):
  """Wrap ``env`` in a gym ``Monitor`` that records into ``./video``.

  Recording is requested for every episode whose index is a multiple of
  100. After the monitor is created, any ``video/*.mp4`` files still
  present are removed via ``reset_video``.

  Args:
    env: the gym environment to wrap.

  Returns:
    The ``Monitor``-wrapped environment.
  """
  def record_every_100th(ep):
    return ep % 100 == 0

  monitored = Monitor(env, './video', force=True, video_callable=record_every_100th)
  reset_video()
  return monitored

In [None]:
# Drive CartPole with uniformly random actions for at most 200 steps.
env = wrap_env(gym.make(ENV))
observation = env.reset()  # the environment must be reset before the first step
for step in range(0, 200):
    action = np.random.choice(2)  # randomly returns 0 (push the cart left) or 1 (push the cart right)
    env.render()
    observation, reward, done, info = env.step(action)  # execute the action
    if done:
      break
env.close()

In [None]:
# Embed the first MP4 found under video/ (the directory wrap_env's Monitor records into).
show_video()

## 離散化の閾値を求める

bins(-2.4, 2.4, 6) のとき、

[-1.6, -0.8,  0. ,  0.8,  1.6]

となる。

こうすると、値がどの区間に入るかを表すインデックスを用いて、(0, 1, 2, 3, 4, 5) という離散値を割り当てられる。



In [None]:
def bins(clip_min, clip_max, num):
  """Return the interior edges that split a range into ``num`` equal buckets.

  Args:
    clip_min: lower end of the range.
    clip_max: upper end of the range.
    num: number of buckets.

  Returns:
    A NumPy array of ``num - 1`` evenly spaced thresholds; the two end
    points ``clip_min`` and ``clip_max`` themselves are dropped.
  """
  edges = np.linspace(clip_min, clip_max, num + 1)
  return edges[1:-1]

bins(-2.4, 2.4, 6)

array([-1.6, -0.8,  0. ,  0.8,  1.6])

# 離散値に変換

$\mathrm{cartPos} \times 6^0 + \mathrm{cartV} \times 6^1 + \mathrm{poleAngle} \times 6^2 + \mathrm{poleV} \times 6^3$

In [None]:
def digitize_state(observation):
  """Convert a 4-value observation into a single integer state index.

  Each value is bucketed with ``np.digitize`` against the thresholds from
  ``bins`` (NUM_DIZITIZED buckets over a fixed range per variable). The four
  bucket indices are then combined as the digits of a base-NUM_DIZITIZED
  number, with cart position as the least-significant digit.

  Args:
    observation: iterable unpacking to
      (cart_pos, cart_v, pole_angle, pole_v).

  Returns:
    The integer state index.
  """
  cart_pos, cart_v, pole_angle, pole_v = observation
  # (value, range minimum, range maximum) in digit order, lowest digit first.
  variables = (
      (cart_pos, -2.4, 2.4),
      (cart_v, -3.0, 3.0),
      (pole_angle, -0.5, 0.5),
      (pole_v, -2.0, 2.0),
  )
  state = 0
  for power, (value, low, high) in enumerate(variables):
    bucket = np.digitize(value, bins=bins(low, high, NUM_DIZITIZED))
    state = state + bucket * (NUM_DIZITIZED**power)
  return state

In [None]:
# Q-learning hyperparameters and loop limits.
GAMMA = 0.99  # discount factor applied to the next state's maximum Q-value
ETA = 0.5  # learning rate (step size) of the Q-table update
MAX_STEPS = 200  # upper bound on the number of steps in one episode
NUM_EPISODES = 1000  # upper bound on the number of episodes to run

# Brain

In [None]:
class Brain:
  """Tabular Q-learning core: owns the Q-table, its update and the policy.

  The Q-table has one row per discretized state and one column per action.
  The class reads the module-level constants NUM_DIZITIZED, GAMMA and ETA.
  """

  def __init__(self, num_states, num_actions):
    """Create a Q-table filled with uniform random values in [0, 1).

    Args:
      num_states: number of observation variables; the table gets
        NUM_DIZITIZED ** num_states rows. ``digitize_state`` unpacks exactly
        four variables.
      num_actions: number of discrete actions (table columns).
    """
    self.num_actions = num_actions
    self.q_table = np.random.uniform(low=0, high=1, size=(NUM_DIZITIZED**num_states, num_actions))

  def bins(self, clip_min, clip_max, num):
    """Return the ``num - 1`` interior thresholds splitting the range into ``num`` buckets."""
    return np.linspace(clip_min, clip_max, num+1)[1:-1]

  def digitize_state(self, observation):
    """Convert a 4-value observation into a single integer state index.

    Each value is bucketed with ``np.digitize``; the bucket indices become
    the digits of a base-NUM_DIZITIZED number (cart position is the lowest
    digit).

    Args:
      observation: iterable unpacking to
        (cart_pos, cart_v, pole_angle, pole_v).

    Returns:
      The integer row index into ``self.q_table``.
    """
    cart_pos, cart_v, pole_angle, pole_v = observation
    # Use this class's own bins() so the method does not depend on a
    # same-named function happening to exist in the notebook's global scope.
    digitized = [
        np.digitize(cart_pos, bins=self.bins(-2.4, 2.4, NUM_DIZITIZED)),
        np.digitize(cart_v, bins=self.bins(-3.0, 3.0, NUM_DIZITIZED)),
        np.digitize(pole_angle, bins=self.bins(-0.5, 0.5, NUM_DIZITIZED)),
        np.digitize(pole_v, bins=self.bins(-2.0, 2.0, NUM_DIZITIZED)),
    ]
    return sum(x * (NUM_DIZITIZED**i) for i, x in enumerate(digitized))

  def update_Q_table(self, observation, action, reward, observation_next):
    """Apply one Q-learning update to the entry for (observation, action).

    Q(s, a) <- Q(s, a) + ETA * (reward + GAMMA * max_a' Q(s', a') - Q(s, a))

    Args:
      observation: observation before the action was taken.
      action: index of the action that was taken.
      reward: reward assigned to the transition.
      observation_next: observation after the action was taken.
    """
    state = self.digitize_state(observation)
    state_next = self.digitize_state(observation_next)
    max_q_next = np.max(self.q_table[state_next])
    td_error = reward + GAMMA * max_q_next - self.q_table[state, action]
    self.q_table[state, action] = self.q_table[state, action] + ETA * td_error

  def decide_action(self, observation, episode):
    """Choose an action with an epsilon-greedy policy.

    Epsilon is 0.5 / (episode + 1), so random exploration shrinks as the
    episode number grows.

    Args:
      observation: current observation.
      episode: zero-based episode number used to decay epsilon.

    Returns:
      The index of the chosen action.
    """
    state = self.digitize_state(observation)
    epsilon = 0.5 * (1 / (episode + 1))

    if epsilon <= np.random.uniform(0, 1):
      # Exploit: take the action with the highest Q-value in this state.
      action = np.argmax(self.q_table[state])
    else:
      # Explore: pick any action uniformly at random.
      action = np.random.choice(self.num_actions)
    return action


# Agent

In [None]:
class Agent:
  """Thin facade that forwards learning and action selection to a ``Brain``."""

  def __init__(self, num_states, num_actions):
    """Create the underlying ``Brain`` with the given state/action counts."""
    self.brain = Brain(num_states, num_actions)

  def update_Q_function(self, observation, action, reward, observation_next):
    """Forward one observed transition to ``Brain.update_Q_table``."""
    self.brain.update_Q_table(observation, action, reward, observation_next)

  def get_action(self, observation, step):
    """Return the action chosen by ``Brain.decide_action``.

    ``step`` is handed to ``decide_action`` as its second argument, which
    that method names ``episode``.
    """
    return self.brain.decide_action(observation, step)


In [None]:
class Environment:
  """Owns the monitored gym environment and the agent, and runs training."""

  def __init__(self):
    """Create the wrapped environment and an agent sized to its spaces."""
    self.env = wrap_env(gym.make(ENV))
    observation_dims = self.env.observation_space.shape[0]
    action_count = self.env.action_space.n
    self.agent = Agent(observation_dims, action_count)

  def run(self):
    """Run up to NUM_EPISODES episodes of Q-learning.

    The environment's own reward is discarded. Instead, a step that ends the
    episode is rewarded -1 when it happens before step index 195 and +1
    otherwise; every other step is rewarded 0. Once 10 episodes in a row have
    ended with +1, one further episode is run, the video is shown, and the
    loop stops.
    """
    success_streak = 0
    final_episode_pending = False

    for episode in range(NUM_EPISODES):
      observation = self.env.reset()

      for step in range(MAX_STEPS):
        action = self.agent.get_action(observation, episode)
        observation_next, _, done, _ = self.env.step(action)

        if not done:
          reward = 0
        elif step < 195:
          # Ended early: penalize and restart the streak.
          reward = -1
          success_streak = 0
        else:
          # Lasted to step index 195 or beyond: count as a success.
          reward = 1
          success_streak += 1

        self.agent.update_Q_function(observation, action, reward, observation_next)
        observation = observation_next

        if done:
          print(f'{episode} Episode: Finished after {step + 1} time steps')
          break

      # The flag was raised after the previous episode, so this was the extra one.
      if final_episode_pending:
        show_video()
        break

      if success_streak >= 10:
        print('10回連続成功')
        final_episode_pending = True

In [None]:
# Build the environment/agent pair and run the Q-learning training loop.
cartpole_env = Environment()
cartpole_env.run()

0 Episode: Finished after 17 time steps
1 Episode: Finished after 13 time steps
2 Episode: Finished after 12 time steps
3 Episode: Finished after 33 time steps
4 Episode: Finished after 16 time steps
5 Episode: Finished after 50 time steps
6 Episode: Finished after 17 time steps
7 Episode: Finished after 10 time steps
8 Episode: Finished after 33 time steps
9 Episode: Finished after 32 time steps
10 Episode: Finished after 95 time steps
11 Episode: Finished after 140 time steps
12 Episode: Finished after 46 time steps
13 Episode: Finished after 74 time steps
14 Episode: Finished after 14 time steps
15 Episode: Finished after 102 time steps
16 Episode: Finished after 68 time steps
17 Episode: Finished after 35 time steps
18 Episode: Finished after 90 time steps
19 Episode: Finished after 48 time steps
20 Episode: Finished after 114 time steps
21 Episode: Finished after 19 time steps
22 Episode: Finished after 9 time steps
23 Episode: Finished after 11 time steps
24 Episode: Finished aft