In [None]:
# !pip install mujoco
# !pip install pip install gymnasium[mujoco]

### 概要
Mujoco環境下でのカートポールのQ学習による学習とその推論が行えるサンプル

### 注意事項
pkl形式で保存されるQTableは，パラメータによっては1GB以上の大きさになるので注意！

# Q学習

#### 構成
1. ライブラリのインポート
1. ハイパーパラメータの定義
1. 環境の用意
1. 離散化処理
1. Q値の初期化
2. メインループ
   1. ε-greedy
   2. 状態と報酬の取得
   3. Q値の更新
   4. 状態の更新

In [2]:
import gymnasium as gym
import numpy as np
import mujoco
import time
import pickle
import matplotlib.pyplot as plt
from tqdm import trange
from collections import defaultdict

# グラフ描画用の配列
record_episode = []
record_reward = []
record_step = []
model = defaultdict(lambda: dict())

'''
ハイパーパラメータ
'''
ALPHA = 0.05
GAMMA = 0.95 # 割引率
EPSILON = 0.1 # ε-greedy法のε
max_number_of_steps = 1000  # 1試行のstep数
num_consecutive_iterations = 5  # 学習完了評価に使用する平均試行回数
num_episodes = 80000  # 総試行回数
total_reward_vec = np.zeros(num_consecutive_iterations)  # 各試行の報酬を格納
num_planning_steps = 50 #プランニングステップ数

start_time = time.time()

rng = np.random.RandomState()

'''
環境の用意
'''
env = gym.make('InvertedPendulum-v4')#, render_mode="human" CartPole-v1
print(env.action_space.shape)
observation, info = env.reset()

'''
離散化
'''
num_dizitized = 5  # 状態の分割数
def bins(clip_min, clip_max, num):
    return np.linspace(clip_min, clip_max, num + 1)[1:-1]

# 各値を離散値に変換
def digitize_state(observation):
    cart_pos, cart_v, pole_angle, pole_v = observation
    digitized = [
        np.digitize(cart_pos, bins=bins(-2.4, 2.4, num_dizitized)),
        np.digitize(cart_v, bins=bins(-3.0, 3.0, num_dizitized)),
        np.digitize(pole_angle, bins=bins(-0.5, 0.5, num_dizitized)),
        np.digitize(pole_v, bins=bins(-2.0, 2.0, num_dizitized))
    ]
    return sum([x * (num_dizitized**i) for i, x in enumerate(digitized)])


q_table = np.random.uniform(low=-1, high=1, size=(num_dizitized**4, 2))


'''
学習の実行
'''
# ---epsilon-グリーディ
def get_action_q(next_state, episode, epsilon):
    if epsilon <= np.random.uniform(0, 1):
        next_action = np.argmax(q_table[next_state])
    else:
        next_action = np.random.choice([0, 1])
    return next_action

# ---Q学習関連
def update_Qtable(q_table, state, action, reward, next_state):
    next_Max_Q=max(q_table[next_state][0],q_table[next_state][1] )
    q_table[state, action] = (1 - ALPHA) * q_table[state, action] + ALPHA * (reward + GAMMA * next_Max_Q)
    return q_table

# ---Dyna
def add_experience_to_model(state, action, next_state, reward):
    model[state][action] = tuple([next_state, reward])

def sample_from_model():
    state = rng.choice(list(model.keys()))
    action = rng.choice(list(model[state].keys()))
    next_state, reward = model[state][action]
    return state, action, next_state, reward


ts = time.time()
for episode in trange(num_episodes, leave=False):  # 試行数分繰り返す
    # 環境の初期化
    state,info = env.reset()
    observation = state[0],state[1],state[2],state[3],
    state = digitize_state(observation)
    action = np.argmax(q_table[state])
    episode_reward = 0


    for t in trange(max_number_of_steps, leave=False):  # 1試行のループ
        # print(action,state)
        observation, reward, done, _, info = env.step((action,))
        # 状態を保存
        # state_data.append((observation, action, reward)) # 現在
        # render_data.append(env.render()) # 次
        # episode_data.append(episode)
        step_num = 0

        # 報酬設計
        if done:
            if t < 195: # step = 195　で終了にしている．
                reward = 0  # 倒れたら罰則
            else:
                reward = 1  # 立ったまま終了時は罰則はなし
        else:
            reward = 1  # 各ステップで立ってたら報酬追加

        episode_reward += reward  # 報酬を追加

        # Q-tableを更新する
        next_state = digitize_state(observation)  # 観測状態を離散値に変換
        q_table = update_Qtable(q_table, state, action, reward, next_state)
        #Dyna-Q
        add_experience_to_model(state, action, next_state, reward)
        for i in range(num_planning_steps):
            state_in_model, action_in_model, next_state_in_model, reward_in_model = sample_from_model()
            update_Qtable(q_table, state_in_model, action_in_model, next_state_in_model, reward_in_model)
            # state, action, next_state, reward

        #  行動を選択
        action = get_action_q(next_state, episode, EPSILON)

        print(action)

        state = next_state
        step_num += 1
        # 終了時の処理
        if done:
            # if (episode+1)%10==0:
            #   print('Episode {0}: {1} steps, reward {2}, mean reward {3:.3f}'.format(episode+1, t+1, episode_reward, total_reward_vec.mean()))
            total_reward_vec = np.hstack((total_reward_vec[1:], episode_reward))  # 報酬を記録
            record_reward.append(total_reward_vec.mean()) # グラフ描画用に記録
            record_episode.append(episode+1)
            step_num = t
            record_step.append(step_num+1)
            break
env.close()

# Qテーブルを保存する
with open('q_table.pkl', 'wb') as f:
    pickle.dump(q_table, f)

# プロット
fig = plt.figure()
plt.subplot(1, 2, 1)
plt.plot(record_episode, record_reward, color="red")
plt.grid()
plt.xlabel("episode")
plt.ylabel("mean reward")

plt.subplot(1, 2, 2)
plt.plot(record_episode, record_step, color="blue")
plt.grid()
plt.xlabel("episode")
plt.ylabel("step")

plt.savefig("./q-learning_episode_per_reward.png")
plt.show()

()


                                       

AssertionError: (0,) (<class 'tuple'>) invalid

# 推論

#### 構成
1. ライブラリのインポート
2. 環境の用意
3. 離散化処理
4. QTableの読み込み
5. メインループ
   1. 環境の初期化
   2. Qtableから行動選択

In [None]:
import gymnasium as gym
import numpy as np
import mujoco
import time
import pickle
import matplotlib.pyplot as plt
from tqdm import trange
import random
import keyboard


#試験回数
test_num = 50
time_memo = []



'''
環境の用意
'''
env = gym.make('InvertedPendulum-v4')
possible_actions = [0,1]


# 離散化
num_dizitized = 100  # 状態の分割数
def bins(clip_min, clip_max, num):
    return np.linspace(clip_min, clip_max, num + 1)[1:-1]

# 各値を離散値に変換
def digitize_state(observation):
    cart_pos, cart_v, pole_angle, pole_v = observation
    digitized = [
        np.digitize(cart_pos, bins=bins(-2.4, 2.4, num_dizitized)),
        np.digitize(cart_v, bins=bins(-3.0, 3.0, num_dizitized)),
        np.digitize(pole_angle, bins=bins(-0.5, 0.5, num_dizitized)),
        np.digitize(pole_v, bins=bins(-2.0, 2.0, num_dizitized))
    ]
    return sum([x * (num_dizitized**i) for i, x in enumerate(digitized)])

# 保存されたQテーブルを読み込む
with open('q_table.pkl', 'rb') as f:
    q_table = pickle.load(f)

def select_action(state):
    # 現在の状態に対する最適な行動をQテーブルから選択する
    if state in q_table:
        action = max(q_table[state], key=q_table[state].get)
    else:
        # 状態が未知の場合はランダムな行動を選択するなどの戦略を考えることもあります
        action = random.choice(possible_actions)

    return action

print("qキー長押しで強制終了")

for Test in trange(test_num):
    # 環境の初期化
    state,info = env.reset()
    observation = state[0],state[1],state[2],state[3],
    state = digitize_state(observation)
    action = select_action(state)
    start_time = time.time()
    if keyboard.is_pressed('q'):
        break

    while True:  # 1試行のループ

        action = select_action(state)
        observation, reward, done, _, info = env.step((action,))
        next_state = digitize_state(observation)
        state = next_state

        ts = time.time()

        if done:
            time_memo.append(ts-start_time)
            # print('{:.2f}'.format(ts-start_time),"sec")
            break

env.close()

# Output result
print("カートポールが立っていた平均時間を出力：")
print("Time_memn:",'{:.2f}'.format(sum(time_memo)/len(time_memo)),"sec")