## 这一节介绍env的基本使用
env的基本方法
step,reset,render,close,seed

In [1]:
#了解一个env的状态空间和动作空间
import gym
env = gym.make('CartPole-v0')
print("env.action_space: ", env.action_space)
#env.action_space:  Discrete(2)
print("env.observation_space: ", env.observation_space)
#env.observation_space:  Box(-3.4028234663852886e+38, 3.4028234663852886e+38, (4,), float32)
print (env.observation_space.high ) 
#[4.8000002e+00 3.4028235e+38 4.1887903e-01 3.4028235e+38]
print (env.observation_space.low ) 
#[-4.8000002e+00 -3.4028235e+38 -4.1887903e-01 -3.4028235e+38]
env.reset()
#array([ 0.01746862, -0.01032063, -0.02640114, -0.02186856])
# 这四个数字组成的状态变量（state variables）分别含义如下：
# 0.03749292： 小车在轨道上的位置（position of the cart on the track）
# -0.03226631： 杆子与竖直方向的夹角（angle of the pole with the vertical）
# 0.01609263： 小车速度（cart velocity）
# -0.04661368： 角度变化率（rate of change of the angle）

array([ 0.01746862, -0.01032063, -0.02640114, -0.02186856])

In [None]:
#自定义空间
import gym
observation_space = gym.spaces.Box(low=0, high=255, shape=(4, 84, 84))
action_space=gym.spaces.Discrete(8)

In [None]:
#gym最基本的使用
import gym
env = gym.make('CartPole-v0')
env.reset()
#执行一个向左的操作
obj, reward, done, info = env.step(1) #1 向右 0向左
print("obj", obj)
print("reward", reward)
print("done", done)
print("info", info)

for _ in range(1000):
    env.render() 
    #随机获取一个动作进行执行
    obj, reward, done, info = env.step(env.action_space.sample()) # take a random action
    if done:
        env.reset()
env.close()

# obj [-0.02268437  0.60981794 -0.04339413 -0.87715228]
# reward 1.0
# done False
# info {}
#一个动作执行后，环境会返回四个变量（obj:新的状态（对照前面环境初始化的状态理解）、reward：指定该动作获得的奖励值（在游戏中的得分）、
#                                 done:回合是否结束（你控制的小人是不是死了，对应回合结束）、info:额外信息（该游戏较简单，info为空））

In [None]:
#使用gym自带的录像功能,注意录像是以episode为单位的
import gym
env = gym.make('CartPole-v0')
from gym import wrappers
env = wrappers.Monitor(env,"./gym-outputs", force=True)
env.reset()
for _ in range(1000):
    # env.render() 
    obj, reward, done, info = env.step(env.action_space.sample()) # take a random action
    if done:
        break
env.close()

# ##在notebook中关闭环境后，可以网页播放生成的mp4文件
# import io
# import base64
# from IPython.display import HTML
# video = io.open('./gym-outputs/openaigym.video.%s.video000000.mp4' % env.file_infix, 'r+b').read()
# encoded = base64.b64encode(video)
# HTML(data='''
#     <video width="360" height="auto" alt="test" controls><source src="data:video/mp4;base64,{0}" type="video/mp4" /></video>'''
# .format(encoded.decode('ascii')))


## 这一节使用wrapper对gym环境进行处理

In [None]:
#先引入下相关包
import gym_super_mario_bros
from gym.spaces import Box
from gym import Wrapper
from nes_py.wrappers import JoypadSpace#BinarySpaceToDiscreteSpaceEnv
from gym_super_mario_bros.actions import SIMPLE_MOVEMENT, COMPLEX_MOVEMENT, RIGHT_ONLY
import cv2
import numpy as np
import subprocess as sp

In [None]:
# RGB图像转灰度图
#借助cv2即（opencv）包快速转换COLOR_RGB2GRAY
def process_frame(frame):
    if frame is not None:
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) #图像转换
        frame = cv2.resize(frame, (84, 84))[None, :, :] / 255. #裁剪合适大小，并归一化
        return frame
    else:
        return np.zeros((1, 84, 84))

In [None]:
#写一个继承Wrapper的包装类，一定注意在构造方法中调用父类的构造函数
#1.包装类copy了原环境的以下信息,在新的类初始化时要进行相应的修改,虽然这些量
#  在实现新的环境逻辑时不一定用得到;此外在包装类初始化时还要定义一些别的要
#  使用的变量
'''self.env = env
   self.action_space = self.env.action_space
   self.observation_space = self.env.observation_space
   self.reward_range = self.env.reward_range
   self.metadata = self.env.metadata'''
#2.同时新的环境逻辑通过重写step和reset方法实现,只能重写step和reset

class CustomReward(Wrapper):
  '''这个类的作用
  1.处理状态空间,将RGB转为灰度,并将图像裁剪为84x84
  2.设定新的奖励函数
  这里我们做了几个小优化如下：
      1).reward += (info["score"] - self.curr_score) / 40.
      原来的reward仅包含了对“离终点更近”的奖励和“时间消耗”、”死掉“的惩罚
      为了让游戏更好玩，我们添加了info["score"]，包含了对获得技能、金币的
      奖励，但不是重点，为了不影响整体要通关的属性，弱化他
      2).我们对回合结束时到达终点和未达到的奖励和惩罚进行放大，激励agent
      更快速的到达终点
      if done:
                  if info["flag_get"]:
                      reward += 50
                  else:
                      reward -= 50
      
      3.这里仅仅是对reward修改的一些示例，后面自己在实战时可以自己根据实际
      情况进行定义，比如当agent有时陷入一个错误的路线卡住时，可以添加一个缓
      冲区让agent学会后退等
  '''

  def __init__(self, env=None):
      super().__init__(env)
      self.observation_space= Box(low=0,high=255,shape=(1,84,84))
      self.curr_score = 0

  # 重写step方法以处理状态空间并规定新的奖励函数
  def step(self,action):
      # 走一步,拿到原有的奖励
      state,reward,done,info=self.env.step(action)
      state=process_frame(state)
      reward += (info["score"]-self.curr_score)/40.
      self.curr_score = info["score"]
      if done:
          if info["flag_get"]:
              reward += 50
          else:
              reward -= 50
      return state, reward / 10., done, info
  #reset需要初始化一些自定义变量并返回一个初始状态
  def reset(self):
    self.curr_score = 0
    return process_frame(self.env.reset())

In [None]:
## 在学习时并不需要所有帧,我们可以连续4帧给相同的输入,并将结果合并为一帧
class CustomSkipFrame(Wrapper):
    def __init__(self, env,skip=4) -> None:
        super().__init__(env)
        self.observation_space = Box(low=0, high=255, shape=(4, 84, 84))
        self.skip = skip

    def step(self, action):
        total_reward = 0
        states = []
        state, reward, done, info = self.env.step(action)
        for i in range(self.skip):
            if not done:
                state, reward, done, info = self.env.step(action)
                total_reward += reward
                states.append(state)
            else:
                states.append(state)
        states = np.concatenate(states, 0)[None, :, :, :]
        return states.astype(np.float32), reward, done, info  

    def reset(self):
        state = self.env.reset()
        states = np.concatenate([state for _ in range(self.skip)], 0)[None, :, :, :]
        return states.astype(np.float32)

In [None]:
#至此，我们完成了超级玛丽环境的自定义，封装如下：
def create_train_env(world, stage, action_type, output_path=None):
    env = gym_super_mario_bros.make("SuperMarioBros-{}-{}-v0".format(world, stage))
    if action_type == "right":
        actions = RIGHT_ONLY
    elif action_type == "simple":
        actions = SIMPLE_MOVEMENT
    else:
        actions = COMPLEX_MOVEMENT
    env = JoypadSpace(env, actions)
    env = CustomReward(env)
    env = CustomSkipFrame(env)
    return env, env.observation_space.shape[0], len(actions)

In [None]:
#测试一下
custom_env = create_train_env(1,1,'simple')
print(custom_env)
