# AI実践演習(2)提出用
* 学籍番号：2364902
* 氏名：金 奎碩



---


## 演習(2): GymnasiumのCartPole環境に対するDQNを用いた強化学習の実装


### 1. 前準備

In [1]:
# 前準備: 必要なライブラリを読み込む
!pip install gymnasium

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


### 2. 学習させるデバイスの設定

In [2]:
import torch

if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"

print(f"Using {device} device")

Using cpu device


### 3. 学習の設定（データの準備に相当）

`09_深層学習の発展的トピック.ipynb`では`Pendulum-v1`を学習対象の環境としていたが

今回の場合は`CartPole-v1`の環境であるためtarget_envをCartPole-v1に変更している。

また、`Pendulum-v1`場合はアクションを-1と1として設定しているが今回は
* 行動
    * Cartを左右どちらかの方向に押す
        * 0: Cartを左に押す
        * 1: Cartを右に押す

として設定されているので`possible_actions`を[-1, 1]から[0, 1]に変更した。

そして、終了条件が以下であるため
* 終了条件
    * 最大ステップ数（500）に到達する
    * Poleの角度が±12°を超える
    * Cartの位置が±2.4を超える

最大ステップ数である`max_step`を500に設定した。また、Poleの角度が±12°とCartの位置が±2.4の条件は`CartPole-v1`環境の基本動作であるため、特別に指定する必要がない。

In [10]:
from datetime import datetime

max_episode = 200  # 学習において繰り返す最大エピソード数
max_step = 500  # 1エピソードの最大ステップ数
n_warmup_steps = 10000  # warmupを行うステップ数
save_interval = 1  # モデルを吐き出すステップ間隔
possible_actions = [0, 1]  # 行動(action)の取りうる値のリスト
gamma = 0.99  # 割引率
epsilon = 0.1  # ε-greedyのパラメータ
memory_size = 10000  # 取得した経験を保存するメモリバッファのサイズ
batch_size = 128  # バッチサイズ
target_update_interval = 10  # Double DQNにおいて、学習していない方のDQNのパラメータを更新する間隔
result_dir = './result/pendulum' + datetime.now().strftime('%Y%m%d%H%M')  # 結果を保存するフォルダ（実行した時刻をフォルダ名に付ける）
target_env = 'CartPole-v1'  # 学習対象の環境


### 4. 強化学習で利用するクラス・関数を定義する（学習モデルの設定と関数定義に相当）

9回の授業で授業支援システムからアップロードされた

「dqn_sample.py」ファイルをそのまま利用する。

In [5]:
from dqn_sample import Qnetwork, EpsilonGreedyPolicy

### 5. 学習の実行

#### インスタンスの作成

In [11]:
import gymnasium as gym
import os

# 学習したモデルを保存するフォルダを作成
os.makedirs(result_dir, exist_ok=True)
print('保存先：' + result_dir)

# 環境を作成
env = gym.make(target_env)
dim_state = env.env.observation_space.shape[0]

# Q学習用のインスタンスを作成
q_network = Qnetwork(dim_state, possible_actions, device=device, gamma=gamma)
policy = EpsilonGreedyPolicy(q_network, epsilon=epsilon)

保存先：./result/pendulum202501230605


#### warm up処理

`CartPole-v1`の場合基本的に終了ステップを含む、実行されたステップごとに+1として報酬が設定されているため、get_reward()のような報酬を決める関数が必要ない。

よって、memory.append()にrewardをそのまま使うことでwarm up処理が可能である。

In [12]:
import random

print('warming up {:,} steps...'.format(n_warmup_steps))
memory = []
state, info = env.reset()
for step in range(n_warmup_steps):
    action = random.choice(possible_actions)

    observation, reward, terminated, truncated, info = env.step(action)

    memory.append((state, action, reward, observation, terminated, truncated))
    state = observation

    if terminated or truncated:
        state, info = env.reset()

memory = memory[-memory_size:]
print('warming up {:,} steps... done.'.format(n_warmup_steps))

warming up 10,000 steps...
warming up 10,000 steps... done.


#### DQNの学習

In [16]:
import os
import random
import numpy as np

print('training {:,} episodes...'.format(max_episode))
for episode in range(max_episode):
    episode_rewards, losses = [], []
    state, info = env.reset()  # 環境を初期化

    for step in range(max_step):
        action, epsilon, q_values = policy.get_action(state, possible_actions)  # 次の行動を取得
        observation, reward, terminated, truncated, info = env.step(action)  # 行動して結果を取得

        memory.append((state, action, reward, observation, terminated, truncated))  # メモリバッファに行動の結果を保存
        episode_rewards.append(reward)  # 報酬をリストに追加
        exps = random.sample(memory, batch_size)  # メモリバッファからミニバッチを取得
        loss, td_error = q_network.update_on_batch(exps)  # DQNのパラメータ更新
        losses.append(loss)  # 学習時の損失をリストに追加

        if q_network.double_mode and step % target_update_interval == 0:
            q_network.sync_target_network(soft=0.01)  # 学習していない方のネットワークにパラメータを反映
        state = observation  # 次の行動を推定するための状態を保存
        memory = memory[-memory_size:]  # メモリバッファのサイズを超えた分を削除

        # 環境の終了条件に該当した場合
        if terminated or truncated:
            reward_avg = np.mean(episode_rewards)  # 報酬の平均値を計算
            loss_avg = np.mean(losses)  # 損失の平均値を計算
            print('episode[{}/{}] ... reward_avg:{} loss:{}'.format(episode, max_episode, reward_avg, loss_avg))
            if episode % save_interval == 0:
                model_path = os.path.join(result_dir, 'episode_{:03}.pth'.format(episode))
                q_network.save_model(model_path)  # 学習済みモデルを保存
            break

env.close()  # 環境のクローズ
print('training {:,} episodes... done.'.format(max_episode))


training 200 episodes...
episode[0/200] ... reward_avg:1.0 loss:2881.166248361688
episode[1/200] ... reward_avg:1.0 loss:107.62634890874227
episode[2/200] ... reward_avg:1.0 loss:103.90321548863461
episode[3/200] ... reward_avg:1.0 loss:101.06659766223943
episode[4/200] ... reward_avg:1.0 loss:102.01351712544759
episode[5/200] ... reward_avg:1.0 loss:93.76575739999835
episode[6/200] ... reward_avg:1.0 loss:101.69937851405379
episode[7/200] ... reward_avg:1.0 loss:106.38587745666504
episode[8/200] ... reward_avg:1.0 loss:89.77785102688536
episode[9/200] ... reward_avg:1.0 loss:104.91202931177048
episode[10/200] ... reward_avg:1.0 loss:101.53616375393338
episode[11/200] ... reward_avg:1.0 loss:120.24345787711765
episode[12/200] ... reward_avg:1.0 loss:101.95820495409843
episode[13/200] ... reward_avg:1.0 loss:105.32779509226481
episode[14/200] ... reward_avg:1.0 loss:114.9558895111084
episode[15/200] ... reward_avg:1.0 loss:93.80755267226905
episode[16/200] ... reward_avg:1.0 loss:93.698

### 6. 学習済みモデルを用いてPendulumの予測制御を行い、結果の動画を再生する


In [17]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

## 予測に用いる設定
weight_path = result_dir + '/episode_{:03}.pth'.format(max_episode-1)
max_step = 200  # 1エピソードの最大ステップ数
possible_actions = [0, 1]  # 行動(action)の取りうる値のリスト
episode_num = 10
predict_movie_dir = result_dir + '/movie'
videos = set()  # 保存したビデオ名を保存する集合

## 環境を準備
env = gym.make(target_env, render_mode='rgb_array')
env = RecordVideo(env, predict_movie_dir)
dim_state = env.env.observation_space.shape[0]

q_network = Qnetwork(dim_state, possible_actions)
q_network.main_network.load_state_dict(torch.load(weight_path,  map_location=device, weights_only=True))
policy = EpsilonGreedyPolicy(q_network, epsilon=0)  # εを0にすることでランダムな行動が選択されないようにする

print('start_episodes')
for episode in range(episode_num):
    state, info = env.reset(seed=episode)
    score = 0
    for step in range(max_step):
        action, epsilon, q_values = policy.get_action(state, possible_actions)
        next_state, reward, terminated, truncated, info = env.step(action)

        score += reward
        state = next_state
        if terminated or truncated:
            print('episode{}, score:{}'.format(episode, score))
            break

## 保存した動画の再生
import base64
import io
import os
from IPython.display import display, HTML

## 結果の出力先を準備
for f in os.listdir(predict_movie_dir):
    if f.endswith('.mp4'):
        video = io.open(predict_movie_dir + '/' + f, 'r+b').read()
        encoded = base64.b64encode(video)

        display(HTML(data='''
            <video controls>
            <source src="data:video/mp4;base64,{}" type="video/mp4" />
            </video>
            '''.format(encoded.decode('ascii'))))

start_episodes
episode0, score:136.0
episode1, score:116.0
episode2, score:118.0
episode3, score:116.0
episode4, score:105.0
episode5, score:111.0
episode6, score:123.0
episode7, score:109.0
episode8, score:113.0
episode9, score:113.0


### 考察
結果を見るとシミュレーション1から5までは片方向にずっと行ってしまうことがわかる。
そして、シミュレーション6が一番中心に立って均衡を保つように見える。


これを見て片方向にずっとカートが移動してしまうことを防ぐために以下のような報酬関数を追加するとどうなるか確認したいと思う。
```
def get_reward(observation, reward):
    position, velocity, angle, angular_velocity = observation
    # 中央に近い位置と角度を高く評価
    angle_penalty = abs(angle)  # 角度が大きいときにパネルティ増加
    position_penalty = abs(position)  # 中心から離れるとパネルティ増加
    return reward - (angle_penalty + position_penalty)
```

これを利用してもう一回学習すると

#### 学習の設定

In [19]:
from datetime import datetime

max_episode = 200  # 学習において繰り返す最大エピソード数
max_step = 500  # 1エピソードの最大ステップ数
n_warmup_steps = 10000  # warmupを行うステップ数
save_interval = 1  # モデルを吐き出すステップ間隔
possible_actions = [0, 1]  # 行動(action)の取りうる値のリスト
gamma = 0.99  # 割引率
epsilon = 0.1  # ε-greedyのパラメータ
memory_size = 10000  # 取得した経験を保存するメモリバッファのサイズ
batch_size = 128  # バッチサイズ
target_update_interval = 10  # Double DQNにおいて、学習していない方のDQNのパラメータを更新する間隔
result_dir = './result/pendulum' + datetime.now().strftime('%Y%m%d%H%M')  # 結果を保存するフォルダ（実行した時刻をフォルダ名に付ける）
target_env = 'CartPole-v1'  # 学習対象の環境

def get_reward(observation, reward):
    position, velocity, angle, angular_velocity = observation
    # 中央に近い位置と角度を高く評価
    angle_penalty = abs(angle)  # 角度が大きいときにパネルティ増加
    position_penalty = abs(position)  # 中心から離れるとパネルティ増加
    return reward - (angle_penalty + position_penalty)


#### インスタンスの作成

In [22]:
import gymnasium as gym
import os

# 学習したモデルを保存するフォルダを作成
os.makedirs(result_dir, exist_ok=True)
print('保存先：' + result_dir)

# 環境を作成
env = gym.make(target_env)
dim_state = env.env.observation_space.shape[0]

# Q学習用のインスタンスを作成
q_network = Qnetwork(dim_state, possible_actions, device=device, gamma=gamma)
policy = EpsilonGreedyPolicy(q_network, epsilon=epsilon)

保存先：./result/pendulum202501230636


#### warm up処理

In [20]:
import random

print('warming up {:,} steps...'.format(n_warmup_steps))
memory = []
state, info = env.reset()
for step in range(n_warmup_steps):
    action = random.choice(possible_actions)

    observation, reward, terminated, truncated, info = env.step(action)

    c_reward = get_reward(observation, reward)
    memory.append((state, action, c_reward, observation, terminated, truncated))
    state = observation

    if terminated or truncated:
        state, info = env.reset()

memory = memory[-memory_size:]
print('warming up {:,} steps... done.'.format(n_warmup_steps))

warming up 10,000 steps...
warming up 10,000 steps... done.


#### DQNの学習

In [23]:
import os
import random
import numpy as np

print('training {:,} episodes...'.format(max_episode))
for episode in range(max_episode):
    episode_rewards, losses = [], []
    state, info = env.reset()  # 環境を初期化

    for step in range(max_step):
        action, epsilon, q_values = policy.get_action(state, possible_actions)  # 次の行動を取得
        observation, reward, terminated, truncated, info = env.step(action)  # 行動して結果を取得

        c_reward = get_reward(observation, reward)  # 自身で定義した報酬関数から報酬を取得

        memory.append((state, action, c_reward, observation, terminated, truncated))  # メモリバッファに行動の結果を保存
        episode_rewards.append(c_reward)  # 報酬をリストに追加
        exps = random.sample(memory, batch_size)  # メモリバッファからミニバッチを取得
        loss, td_error = q_network.update_on_batch(exps)  # DQNのパラメータ更新
        losses.append(loss)  # 学習時の損失をリストに追加

        if q_network.double_mode and step % target_update_interval == 0:
            q_network.sync_target_network(soft=0.01)  # 学習していない方のネットワークにパラメータを反映
        state = observation  # 次の行動を推定するための状態を保存
        memory = memory[-memory_size:]  # メモリバッファのサイズを超えた分を削除

        # 環境の終了条件に該当した場合
        if terminated or truncated:
            reward_avg = np.mean(episode_rewards)  # 報酬の平均値を計算
            loss_avg = np.mean(losses)  # 損失の平均値を計算
            print('episode{} ... reward_avg:{} loss:{}'.format(episode, reward_avg, loss_avg))
            if episode % save_interval == 0:
                model_path = os.path.join(result_dir, 'episode_{:03}.pth'.format(episode))
                q_network.save_model(model_path)  # 学習済みモデルを保存
            break

env.close()  # 環境のクローズ
print('training {:,} episodes... done.'.format(max_episode))

training 200 episodes...
episode0 ... reward_avg:0.8362614777870476 loss:0.8032387420535088
episode1 ... reward_avg:0.8214510854333639 loss:0.815458458662033
episode2 ... reward_avg:0.8774507148191333 loss:0.8058135569095611
episode3 ... reward_avg:0.858284160391324 loss:0.7949362463421292
episode4 ... reward_avg:0.8429965682327747 loss:0.7789298713207244
episode5 ... reward_avg:0.8532610544934869 loss:0.7423019230365753
episode6 ... reward_avg:0.8090750715798802 loss:0.7094431122144064
episode7 ... reward_avg:0.8057234827429056 loss:0.7171853065490723
episode8 ... reward_avg:0.8222808386975279 loss:0.9922081840535005
episode9 ... reward_avg:0.8702264441177249 loss:1.6653286159038543
episode10 ... reward_avg:0.8584396287798881 loss:2.0853296279907227
episode11 ... reward_avg:0.8334620594978333 loss:2.5785897076129913
episode12 ... reward_avg:0.8259687148034572 loss:2.8055988073349
episode13 ... reward_avg:0.862198070436716 loss:3.832444167137146
episode14 ... reward_avg:0.8367403335869

####  学習済みモデルを用いてPendulumの予測制御を行い、結果の動画を再生する

In [24]:
import gymnasium as gym
from gymnasium.wrappers import RecordVideo

## 予測に用いる設定
weight_path = result_dir + '/episode_{:03}.pth'.format(max_episode-1)
max_step = 200  # 1エピソードの最大ステップ数
possible_actions = [0, 1]  # 行動(action)の取りうる値のリスト
episode_num = 10
predict_movie_dir = result_dir + '/movie'
videos = set()  # 保存したビデオ名を保存する集合

## 環境を準備
env = gym.make(target_env, render_mode='rgb_array')
env = RecordVideo(env, predict_movie_dir)
dim_state = env.env.observation_space.shape[0]

q_network = Qnetwork(dim_state, possible_actions)
q_network.main_network.load_state_dict(torch.load(weight_path,  map_location=device, weights_only=True))
policy = EpsilonGreedyPolicy(q_network, epsilon=0)  # εを0にすることでランダムな行動が選択されないようにする

print('start_episodes')
for episode in range(episode_num):
    state, info = env.reset(seed=episode)
    score = 0
    for step in range(max_step):
        action, epsilon, q_values = policy.get_action(state, possible_actions)
        next_state, reward, terminated, truncated, info = env.step(action)

        score += reward
        state = next_state
        if terminated or truncated:
            print('episode{}, score:{}'.format(episode, score))
            break

## 保存した動画の再生
import base64
import io
import os
from IPython.display import display, HTML

## 結果の出力先を準備
for f in os.listdir(predict_movie_dir):
    if f.endswith('.mp4'):
        video = io.open(predict_movie_dir + '/' + f, 'r+b').read()
        encoded = base64.b64encode(video)

        display(HTML(data='''
            <video controls>
            <source src="data:video/mp4;base64,{}" type="video/mp4" />
            </video>
            '''.format(encoded.decode('ascii'))))

start_episodes


### 結論
`CartPole-v1`環境は基本的な報酬と終了条件が定められている。
* 報酬
  * 終了ステップを含む、実行されたステップごとに+1

* 終了条件
    * 最大ステップ数（500）に到達する
    * Poleの角度が±12°を超える
    * Cartの位置が±2.4を超える

これは`env.reset()`として環境を初期化するときに定められる。

しかし、このように棒が倒れないと次のステップになることで報酬を+1もらうだけでは最初の結果のように片方向にずっと移動して終了してしまうことがよく起こる可能性がある。

よって、中心から離れることと棒の中心からの角度が大きくなると報酬にパネルティを与えることでより中心に近い位置で均衡を保つことができるように学習することができると思う。

実際に、パネルティを定義した学習のほうがより中心に近い位置で均衡を保つことが確認できるためこのようなパネルティは必要であると考えられる。