<a href="https://colab.research.google.com/github/chonholee/tutorial/blob/main/rl/example_gym_CartPole.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Example CartPole

In [None]:
import gym

# Roll out ten short CartPole episodes with uniformly random actions and
# print every transition together with the running reward.
env = gym.make("CartPole-v0")

for episode in range(10):
    env.reset()
    total_reward = 0

    for t in range(50):
        # Sample a random action and advance the simulation by one step.
        action = env.action_space.sample()
        state, reward, done, info = env.step(action)
        total_reward = total_reward + reward

        print(episode, t, state, reward, total_reward, done)

        if not done:
            continue

        # The environment reported the end of the episode before the
        # 50-step budget of this loop was used up.
        print('Failed')
        break

アニメーション動画の保存

In [None]:
# Record random CartPole rollouts as RGB frames and save them as a video.
# matplotlib is imported here so that this cell runs on its own; otherwise
# `plt` and `animation` are undefined until a later cell imports them.
import gym
import matplotlib.pyplot as plt
from matplotlib import animation

env = gym.make("CartPole-v0")

frames = []

for episode in range(10):

    env.reset()

    for t in range(50):

        action = env.action_space.sample()
        state, reward, done, info = env.step(action)
        frames.append(env.render(mode='rgb_array'))

        # Stepping an environment after it reported `done` is undefined
        # behaviour in gym, so start the next episode instead.
        if done:
            break

# Release the renderer once all frames have been captured.
env.close()

plt.figure()
patch = plt.imshow(frames[0])
plt.axis('off')


def animate(i):
    """Show the i-th recorded frame in the image artist."""
    patch.set_data(frames[i])


anim = animation.FuncAnimation(
    plt.gcf(), animate, frames=len(frames), interval=50)

anim.save('example.mp4', "ffmpeg")

# Q学習（クラス化）

In [94]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation

import gym

# Number of buckets each observation component is split into
NUM_DIZITIZED = 6

# Learning parameters
GAMMA = 0.99  # discount factor
ETA = 0.5  # learning rate


class State:
    """Tabular Q-function over a discretized four-component observation."""

    def __init__(self, num_states, num_actions):
        """Create a randomly initialised Q-table.

        Args:
            num_states: number of observation components.
            num_actions: number of discrete actions.
        """
        self.num_actions = num_actions

        # One row per discretized state (NUM_DIZITIZED ** num_states rows)
        # and one column per action, filled uniformly from [-1, 1).
        table_shape = (NUM_DIZITIZED**num_states, num_actions)
        self.q_table = np.random.uniform(low=-1, high=1, size=table_shape)

    def bins(self, clip_min, clip_max, num):
        """Return the num - 1 interior thresholds splitting the range evenly."""
        edges = np.linspace(clip_min, clip_max, num + 1)
        return edges[1:-1]

    def analog2digitize(self, observation):
        """Map a continuous observation to a single Q-table row index.

        Each component is bucketed into NUM_DIZITIZED bins and the bucket
        numbers are combined as digits of a base-NUM_DIZITIZED number, with
        the first component as the least significant digit.
        """
        cart_pos, cart_v, pole_angle, pole_v = observation
        # (value, lower clip, upper clip) for every component
        components = (
            (cart_pos, -2.4, 2.4),
            (cart_v, -3.0, 3.0),
            (pole_angle, -0.5, 0.5),
            (pole_v, -2.0, 2.0),
        )

        index = 0
        weight = 1
        for value, lower, upper in components:
            thresholds = self.bins(lower, upper, NUM_DIZITIZED)
            index += np.digitize(value, bins=thresholds) * weight
            weight *= NUM_DIZITIZED
        return index

    def update_Q_table(self, observation, action, reward, observation_next):
        """Apply one Q-learning update for the observed transition."""
        row = self.analog2digitize(observation)
        row_next = self.analog2digitize(observation_next)
        best_next = self.q_table[row_next].max()

        # Q <- Q + ETA * (r + GAMMA * max_a' Q(s', a') - Q)
        current = self.q_table[row, action]
        td_error = reward + GAMMA * best_next - current
        self.q_table[row, action] = current + ETA * td_error

    def decide_action(self, observation, episode):
        """Choose an action epsilon-greedily; epsilon shrinks with episode."""
        row = self.analog2digitize(observation)
        epsilon = 0.5 * (1 / (episode + 1))

        if np.random.uniform(0, 1) < epsilon:
            # Explore: pick any action uniformly at random.
            return np.random.choice(self.num_actions)
        # Exploit: pick the action with the highest Q-value.
        return np.argmax(self.q_table[row][:])


In [95]:
class Agent:
    """Thin facade that forwards learning and action selection to a State."""

    def __init__(self, num_states, num_actions):
        """Build the underlying State, which owns the Q-table."""
        self.state = State(num_states, num_actions)

    def update_Q_function(self, observation, action, reward, observation_next):
        """Forward one observed transition to the State's Q-table update."""
        q_holder = self.state
        q_holder.update_Q_table(observation, action, reward, observation_next)

    def get_action(self, observation, step):
        """Return the action the State picks for the given observation.

        `step` is passed through unchanged as the State's `episode` argument.
        """
        return self.state.decide_action(observation, step)

In [96]:
# Upper bound on the number of steps in a single episode
MAX_STEPS = 100
# Upper bound on the number of training episodes
NUM_EPISODES = 500
# Step index an episode must reach before ending to count as a success
SUCESS_STEPS = 50
# Number of consecutive successful episodes required to stop training
SUCESS_CONSECUTIVE_EISODES = 5

class Environment():
    """Trains an Agent with Q-learning on a gym environment and records it."""

    def __init__(self, toy_env):
        """Create the gym environment and an Agent sized to match it.

        Args:
            toy_env: gym environment id passed to ``gym.make``; also used as
                the base name of the saved video file.
        """
        self.env_name = toy_env
        self.env = gym.make(toy_env)
        # Number of observation components and of discrete actions
        num_states = self.env.observation_space.shape[0]
        num_actions = self.env.action_space.n
        self.agent = Agent(num_states, num_actions)

    def run(self):
        """Train until success or until the episode budget is exhausted.

        An episode is a success when it ends at a step index of at least
        SUCESS_STEPS. After SUCESS_CONSECUTIVE_EISODES successes in a row
        (or when only one episode of the NUM_EPISODES budget is left) one
        more episode is run with rendering enabled, then the learning curve
        and a video of that episode are written to disk and the loop stops.
        """
        complete_episodes = 0  # current streak of successful episodes
        step_list = []  # length of every finished episode
        is_episode_final = False  # next episode is the recorded success run
        is_failed = False  # next episode is the recorded failure run
        frames = []  # rendered frames of the recorded episode

        for episode in range(NUM_EPISODES):

            observation = self.env.reset()

            for step in range(MAX_STEPS):

                # Only the last (recorded) episode is rendered.
                if is_episode_final or is_failed:
                    frames.append(self.env.render(mode='rgb_array'))

                action = self.agent.get_action(observation, episode)
                # The environment's own reward is ignored; a shaped reward
                # is computed below instead.
                observation_next, _, done, _ = self.env.step(action)

                # The environment's own time limit may be larger than
                # MAX_STEPS, so reaching the last allowed step must also end
                # the episode. Otherwise an episode that survives every step
                # would never be counted or logged.
                if step == MAX_STEPS - 1:
                    done = True

                if done:
                    if step < SUCESS_STEPS:
                        reward = -1  # ended too early: penalise
                        complete_episodes = 0  # streak is broken
                    else:
                        reward = 1  # lasted long enough: reward
                        complete_episodes += 1
                else:
                    reward = 0

                self.agent.update_Q_function(
                    observation, action, reward, observation_next)

                observation = observation_next

                if done:
                    # Report the number of steps taken (step is 0-based),
                    # matching the value stored in step_list.
                    print('%d Episode finished after %d time steps / num_success %d' %
                          (episode, step + 1, complete_episodes))

                    step_list.append(step + 1)
                    break

            if is_episode_final or is_failed:
                self._save_results(step_list, frames, is_episode_final)
                break

            # After enough consecutive successes, run one final recorded
            # episode. The failure flag is only considered when training has
            # not just succeeded, so the two outcomes are mutually exclusive.
            if complete_episodes >= SUCESS_CONSECUTIVE_EISODES:
                print(f'--- {SUCESS_CONSECUTIVE_EISODES}回連続成功 ---')
                is_episode_final = True
            elif episode == NUM_EPISODES-2:
                print('--- 失敗：学習不足 ---')
                is_failed = True

    def _save_results(self, step_list, frames, succeeded):
        """Save the learning curve and a video of the recorded episode."""
        es = np.arange(0, len(step_list))
        plt.plot(es, step_list)
        plt.savefig("cartpole.png")

        plt.figure()
        patch = plt.imshow(frames[0])
        plt.axis('off')

        def animate(i):
            patch.set_data(frames[i])

        anim = animation.FuncAnimation(plt.gcf(), animate,
                                       frames=len(frames), interval=50)

        # The file name tells a successful run apart from a failed one.
        suffix = '.mp4' if succeeded else '_failed.mp4'
        anim.save(self.env_name + suffix, "ffmpeg")

In [None]:
# gym environment id used for training
TOY = "CartPole-v1"


def main():
    """Build the training environment for TOY and run it to completion."""
    Environment(TOY).run()


if __name__ == "__main__":
    main()