# Mujoco Gymnasium template
MuJoCoとGymnasiumを用いた強化学習用のカスタム環境を簡単に作るためのひな型

##  参考
- [Gymnasium Documentation](https://gymnasium.farama.org)
- [test_mujoco_custom_env.py](https://github.com/Farama-Foundation/Gymnasium/blob/main/tests/envs/mujoco/test_mujoco_custom_env.py)
- [GitHub mujoco_py_env.py](https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/mujoco/mujoco_py_env.py#L108)
- [GitHub inverted_pendulum.py](https://github.com/Farama-Foundation/Gymnasium/blob/main/gymnasium/envs/mujoco/inverted_pendulum.py)
- [【Python】物理エンジンMuJoCoの紹介＆MJCFドキュメント【MuJoCoチュートリアル①】](https://qiita.com/Yayoi-Habami/items/1bf5a3e05b1516a90381)
- [mujoco_pyの使用例](https://qiita.com/mk_yng/items/3160b48f32144f352b05#site%E3%82%BF%E3%82%B0%E3%81%AE%E3%81%A4%E3%81%84%E3%81%9F%E3%83%91%E3%83%BC%E3%83%84%E3%81%AE%E3%82%BF%E3%82%B9%E3%82%AF%E7%A9%BA%E9%96%93%E4%B8%AD%E3%81%AE%E4%BD%8D%E7%BD%AE%E3%81%A8%E9%80%9F%E5%BA%A6%E3%82%92%E5%8F%96%E5%BE%97%E3%81%99%E3%82%8B)

# Settings

In [None]:
# --- Experiment settings shared by the cells below ---

# Id under which the environment is registered and created.
ENV_NAME = "TemplateEnv-v0"

# Maximum number of training episodes.
NUM_EPISODES = 5_000

# Number of steps in a single episode.
MAX_STEPS = 1_000

# Where the trained model is saved / loaded.
MODEL_PATH = "your/model/path/and/name.th"

# MJCF model of the robot and its surroundings. Either give a path that
# resolves from the working directory, or place the file directly under the
# gymnasium/envs/mujoco/assets/ directory of your own installation.
XML_FILE = "your_robot_and_env.xml"

# Directory for the TensorBoard logs.
TENSORBOARD_LOG = "your/log/path"

# Number of simulation steps executed per step() call. A larger frame skip
# improves computational efficiency but lowers accuracy.
FRAME_SKIP = 5

学習の進捗や完了をSlack / LINEに通知するbot（任意。使うと少し幸せになれるかもしれない）

In [None]:
# import json
# import time
# import requests

# class SlackNotifyBot(object):
#     def __init__(self, access_token):
#         self.__headers = {'Authorization': 'Bearer ' + access_token}

#     def send_to_slack(self, message,slack_ch):
#         API_URL = "https://slack.com/api/chat.postMessage"
#         headers = self.__headers
#         data = {
#             'channel': slack_ch,
#             'text': message
#         }
#         r = requests.post(API_URL, headers=headers, data=data)

# class LINENotifyBot(object):
#     API_URL = 'https://notify-api.line.me/api/notify'

#     def __init__(self, access_token):
#         self.__headers = {'Authorization': 'Bearer ' + access_token}

#     def send_to_line(self, message, image=None, sticker_package_id=None, sticker_id=None):
#         payload = {
#             'message': message,
#             'stickerPackageId': sticker_package_id,
#             'stickerId': sticker_id,
#         }
#         files = {'imageFile': open(image, 'rb')} if image else {}
#         requests.post(
#             LINENotifyBot.API_URL,
#             headers=self.__headers,
#             data=payload,
#             files=files,
#         )

# with open("./bot_setting.json", "r", encoding="utf-8") as f:
#     bot_settings = json.load(f)
# line_token = bot_settings["LINE_token"]["my_line_token"]
# slack_token = bot_settings["slack_token"]["my_slack_token"]
# my_slack_channel = bot_settings["slack_ch"]["my_slack_channel"]

# slack_bot = SlackNotifyBot(access_token=slack_token)
# line_bot = LINENotifyBot(access_token=line_token)

# Environment

In [105]:
import os
import warnings

import numpy as np
import pytest
import mujoco

from gymnasium import utils
from gymnasium.envs.mujoco import MujocoEnv
from gymnasium.error import Error
from gymnasium.spaces import Box
from gymnasium.utils.env_checker import check_env


class MyEnv(MujocoEnv, utils.EzPickle):
    """Template for a custom MuJoCo environment built on Gymnasium's `MujocoEnv`.

    The observation is the vector of sensor readings declared in the MJCF
    model, and the example reward is the forward progress of the first
    generalized coordinate. Replace `step`, `_get_obs` and `reset_model`
    with task-specific logic.
    """

    metadata = {
        "render_modes": [
            "human",
            "rgb_array",
            "depth_array",
        ],
    }

    def __init__(self, xml_file=XML_FILE, frame_skip=FRAME_SKIP, **kwargs):
        """Load the MJCF model and define the observation space.

        Args:
            xml_file: Path of the MJCF model to load.
            frame_skip: Number of simulation steps run per `step()` call.
            **kwargs: Forwarded unchanged to `MujocoEnv.__init__`
                (for example `render_mode`).
        """
        utils.EzPickle.__init__(self, xml_file, frame_skip, **kwargs)

        MujocoEnv.__init__(
            self,
            xml_file,
            frame_skip=frame_skip,
            observation_space=None,  # defined below, once the model is loaded
            default_camera_config={},
            **kwargs,
        )

        # `render_fps` depends on the loaded model's time step, so the
        # metadata is completed only after the base class has been set up.
        self.metadata = {
            "render_modes": [
                "human",
                "rgb_array",
                "depth_array",
            ],
            "render_fps": int(np.round(1.0 / self.dt)),
        }

        # One observation entry per sensor value declared in the model.
        obs_size = len(self.data.sensordata)
        self.observation_space = Box(
            low=-np.inf, high=np.inf, shape=(obs_size,), dtype=np.float64
        )

        # The continuous action space needs no code here: `_set_action_space`
        # in envs/mujoco/mujoco_env.py derives it automatically from the
        # actuators defined in the XML.

    def step(self, action):
        """Advance the simulation by `frame_skip` steps.

        Args:
            action: Control input handed to `do_simulation`.

        Returns:
            The `(obs, reward, terminated, truncated, info)` tuple.
        """
        x_position_before = self.data.qpos[0]
        self.do_simulation(action, self.frame_skip)
        x_position_after = self.data.qpos[0]

        obs = self._get_obs()

        # Example reward: the distance moved forward during this step.
        reward = x_position_after - x_position_before

        # Both flags must be real booleans. An `...` placeholder is truthy
        # and would end every episode after its first step.
        # TODO: replace with the task's own termination condition.
        terminated = False
        # Truncation is left to a wrapper such as TimeLimit, which keeps the
        # episode length configurable.
        truncated = False
        info = self._get_reset_info()

        if self.render_mode == "human":
            self.render()

        return obs, reward, terminated, truncated, info

    def _get_obs(self):
        """Return the current observation as a copy of the sensor readings.

        The result has the same length the observation space was sized with
        in `__init__`. Other quantities that can be used when customising
        the observation:

            # a single sensor value defined in the MJCF
            self.data.sensordata[n].copy()

            # position of a body
            self.data.body('body_name').xpos

            # orientation (quaternion) of a body
            self.data.body('body_name').xquat

            # joint velocities
            self.data.qvel

            # joint positions
            self.data.qpos

        If the observation is changed, update `observation_space` in
        `__init__` to match.
        """
        # np.array copies, so the caller never holds a view of simulator memory.
        return np.array(self.data.sensordata, dtype=np.float64)

    def reset_model(self):
        """Reset the simulation state and return the first observation.

        As an example the model is put back into its initial state. To start
        from a random position and velocity instead, pass perturbed values
        to `set_state`:

            self.set_state(
                self.init_qpos
                + self.np_random.uniform(low=-0.1, high=0.1, size=self.model.nq),
                self.init_qvel + self.np_random.standard_normal(self.model.nv) * 0.1,
            )
        """
        self.set_state(self.init_qpos, self.init_qvel)
        return self._get_obs()

    def _get_reset_info(self):
        """Return the info dictionary used by this template."""
        return {"works": True}

    def viewer_setup(self):
        """Configure the tracking camera of the viewer.

        Steps:
            1. Check that a viewer has been created.
            2. Track the first body of the model (index 0), so the camera
               stays centred on the root of the robot.
            3. Place the camera at half of the model extent
               (`self.model.stat.extent`) from the tracked body.
            4. Set the height of the look-at point to a fixed value.

        NOTE(review): this relies on `self.viewer` being set elsewhere;
        nothing in this class assigns it -- confirm against the installed
        `MujocoEnv` version.
        """
        assert self.viewer is not None  # a viewer must already exist
        v = self.viewer
        v.cam.trackbodyid = 0  # follow the first body of the model
        v.cam.distance = self.model.stat.extent * 0.5  # distance to the target
        v.cam.lookat[2] = 0.12250000000000005  # hard-coded look-at height


# Warnings that `check_env` is allowed to emit for this environment. Each
# message is prefixed with "WARN: " and wrapped in the ANSI yellow / reset
# escape codes -- presumably the format produced by Gymnasium's logger.
CHECK_ENV_IGNORE_WARNINGS = list(
    map(
        "\x1b[33mWARN: {}\x1b[0m".format,
        (
            "A Box observation space minimum value is -infinity. This is probably too low.",
            "A Box observation space maximum value is infinity. This is probably too high.",
            "For Box action spaces, we recommend using a symmetric and normalized space (range=[-1, 1] or [0, 1]). See https://stable-baselines3.readthedocs.io/en/master/guide/rl_tips.html for more information.",
        ),
    )
)


@pytest.mark.parametrize("frame_skip", [1, 2, 3, 4, 5])
def test_frame_skip(frame_skip):
    """Check that the custom env passes `check_env` for several `frame_skip` values."""
    env = MyEnv(frame_skip=frame_skip)

    # Record everything warned while the env is checked and closed, so that
    # the warnings can be inspected afterwards instead of being printed.
    with warnings.catch_warnings(record=True) as recorded:
        check_env(env.unwrapped, skip_render_check=True)
        env.close()

    # Only the known, harmless warnings are tolerated; the first other one
    # fails the test.
    unexpected = next(
        (
            item
            for item in recorded
            if item.message.args[0] not in CHECK_ENV_IGNORE_WARNINGS
        ),
        None,
    )
    if unexpected is not None:
        raise Error(f"Unexpected warning: {unexpected.message}")


def test_xml_file():
    """Check that a custom XML file loads from a relative and from an absolute path."""

    def load_and_check(path):
        # The walker model used here is expected to have 9 generalized coordinates.
        loaded = MyEnv(xml_file=path).unwrapped
        assert isinstance(loaded, MujocoEnv)
        assert loaded.data.qpos.size == 9

    load_and_check("./tests/envs/mujoco/assets/walker2d_v5_uneven_feet.xml")
    load_and_check(
        os.getcwd() + "/tests/envs/mujoco/assets/walker2d_v5_uneven_feet.xml"
    )

    # A user-home path (with '~') cannot be tested because GitHub CI has no
    # home folder.


def test_reset_info():
    """Check that `reset()` hands back the environment's info dictionary."""
    reset_result = MyEnv().reset()

    # reset() yields an (observation, info) pair; only the info is checked.
    _observation, reset_info = reset_result
    assert reset_info["works"] is True

In [106]:
import gymnasium as gym

# Make the environment available to `gym.make(ENV_NAME)`.
# `max_episode_steps` lets Gymnasium's TimeLimit wrapper truncate episodes
# after MAX_STEPS steps, so the environment's own step() does not have to
# track the episode length.
gym.envs.registration.register(
    id=ENV_NAME,
    entry_point=MyEnv,
    max_episode_steps=MAX_STEPS,
)

  logger.warn(f"Overriding environment {new_spec.id} already in registry.")


# Learning

ランダムな行動

In [None]:
import gymnasium as gym
import numpy as np
from tqdm import trange

# Roll out uniformly random actions as a smoke test of the environment.
env = gym.make(ENV_NAME, render_mode="human")

print("action_space:", env.action_space.shape)
print("observation_space:", env.observation_space.shape[0])

max_number_of_steps = 1000
num_consecutive_iterations = 100
num_episodes = 100
# Buffer for the optional running mean shown in the commented snippet below.
last_time_steps = np.zeros(num_consecutive_iterations)

try:
    for episode in trange(num_episodes):
        # reset() returns an (observation, info) pair.
        observation, info = env.reset()
        episode_reward = 0

        for step in range(max_number_of_steps):
            action = env.action_space.sample()

            # Apply the action and collect the feedback. An episode is over
            # when it either terminates or is truncated.
            observation, reward, terminated, truncated, info = env.step(action)
            episode_reward += reward

            done = terminated or truncated
            if done or step == max_number_of_steps - 1:
                print("sum_reward:", episode_reward)
                # Optional running statistics over the most recent episodes:
                # print('%d Episode finished after %d time steps / mean %f' % (episode, step + 1,
                #     last_time_steps.mean()))
                # last_time_steps = np.hstack((last_time_steps[1:], [episode_reward]))
                break
finally:
    # Release the viewer window even when the rollout is interrupted.
    env.close()