In [None]:
import gym
from gym.utils import seeding

In [None]:
# Custom Gym environment (subclass of gym.Env); its constants and methods
# are shown piece by piece in the cells that follow.
class Example_v0 (gym.Env):

In [None]:
# boundaries of the track: the smallest (leftmost) and largest (rightmost)
# position the agent may occupy
LF_MIN = 1
RT_MAX = 10

In [None]:
# action codes accepted by step(): move one position left / right
MOVE_LF = 0
MOVE_RT = 1

In [None]:
# step budget: step() marks the episode done once `count` reaches this value
MAX_STEPS = 10

# reward for an invalid move or a move away from the goal
REWARD_AWAY = -2
# reward for a move toward the goal that does not reach it yet
REWARD_STEP = -1
# reward for landing on the goal (numerically equal to the step budget)
REWARD_GOAL = MAX_STEPS

In [None]:
# render modes offered by this environment; render() defaults to "human"
metadata = {"render.modes": ["human"]}

In [None]:
def __init__ (self):
    """Build the action/observation spaces, pick the goal, seed the RNG and reset.

    After construction the environment already holds a freshly reset episode.
    """
    lo, hi = self.LF_MIN, self.RT_MAX

    # two discrete actions; observation space is Discrete of size RT_MAX + 1
    self.action_space = gym.spaces.Discrete(2)
    self.observation_space = gym.spaces.Discrete(hi + 1)

    # the goal position, and every other position in [LF_MIN, RT_MAX)
    # that `reset()` may choose as a starting point
    self.goal = int((lo + hi - 1) / 2)
    starts = list(range(lo, hi))
    starts.remove(self.goal)
    self.init_positions = starts

    # pass a fixed value to seed() here to get a repeatable sequence of
    # pseudorandom numbers (e.g., for debugging)
    self.seed()

    self.reset()

In [None]:
def reset (self):
    """Start a new episode and return its initial observation.

    A start position is drawn from `self.init_positions` using
    `self.np_random`; the step counter, reward, done flag and info dict
    are all returned to their initial values.
    """
    start = self.np_random.choice(self.init_positions)

    # position and observed state are the same value in this environment
    self.position = self.state = start

    self.count, self.reward = 0, 0
    self.done, self.info = False, {}

    return start

In [None]:
def step (self, action):
    """Advance the episode by one action and report the outcome.

    Args:
        action: a member of `self.action_space`.

    Returns:
        The list `[state, reward, done, info]` read from the instance
        attributes after the step.

    Raises:
        AssertionError: if the episode is still running, the step budget
            is not exhausted, and `action` is not in `self.action_space`.
    """
    if self.done:
        # should never reach this point
        print("EPISODE DONE!!!")
    elif self.count == self.MAX_STEPS:
        # step budget used up: end the episode without simulating anything
        self.done = True
    else:
        assert self.action_space.contains(action)
        self.count += 1

        # insert simulation logic to handle an action ...

    # report (but do not raise on) a state outside the observation space;
    # an explicit test keeps the check alive when asserts are stripped (-O)
    if not self.observation_space.contains(self.state):
        print("INVALID STATE", self.state)

    return [self.state, self.reward, self.done, self.info]

In [None]:
if action == self.MOVE_LF:
    # try to move one position to the left
    if self.position == self.LF_MIN:
        # invalid: already at the left boundary, so the position is unchanged
        # (the check below assigns the final reward for this step)
        self.reward = self.REWARD_AWAY
    else:
        self.position -= 1

    # score the resulting position relative to the goal
    if self.position == self.goal:
        # on goal now
        self.reward = self.REWARD_GOAL
        # boolean, consistent with the False/True used by reset() and step()
        self.done = True
    elif self.position < self.goal:
        # moving away from goal
        self.reward = self.REWARD_AWAY
    else:
        # moving toward goal
        self.reward = self.REWARD_STEP

In [None]:
elif action == self.MOVE_RT:
    if self.position == self.RT_MAX:
        # invalid
        self.reward = self.REWARD_AWAY
    else:
        self.position += 1

    if self.position == self.goal:
        # on goal now
        self.reward = self.REWARD_GOAL
        self.done = 1
    elif self.position > self.goal:
        # moving away from goal
        self.reward = self.REWARD_AWAY
    else:
        # moving toward goal
        self.reward = self.REWARD_STEP

In [None]:
# the observation is simply the current position
self.state = self.position
# signed distance to the goal: positive while the goal lies to the right
# of the current position, negative once the agent is past it
self.info["dist"] = self.goal - self.position

In [None]:
def render (self, mode="human"):
    """Print the current state, the latest reward and the info dict on one line.

    `mode` is accepted but not consulted; output always goes to stdout.
    """
    line = "position: {:2d}  reward: {:2d}  info: {}".format(
        self.state, self.reward, self.info
    )
    print(line)

In [None]:
def seed (self, seed=None):
    """Create this environment's random number generator.

    Stores the generator returned by `seeding.np_random(seed)` on
    `self.np_random` and returns the seed it reports, wrapped in a
    one-element list.
    """
    rng, used_seed = seeding.np_random(seed)
    self.np_random = rng
    return [used_seed]

In [None]:
def close (self):
    """Nothing to clean up for this environment; intentionally a no-op."""
    return None

In [None]:
from setuptools import setup

# packaging metadata for the `gym_example` distribution (version 1.0.0);
# `gym` is its only declared install requirement
setup(name="gym_example",
      version="1.0.0",
      install_requires=["gym"]
)

In [None]:
from gym.envs.registration import register

# register the environment with Gym under the id "example-v0" (the id later
# passed to gym.make); entry_point names the module path and class to load
register(
    id="example-v0",
    entry_point="gym_example.envs:Example_v0",
)

In [None]:
from gym_example.envs.example_env import Example_v0

In [None]:
import gym
import gym_example

In [None]:
def run_one_episode (env):
    """Play one episode with randomly sampled actions and return the total reward.

    The environment is reset first; at most `env.MAX_STEPS` actions are
    taken, stopping early as soon as a step reports the episode as done.
    """
    env.reset()
    total = 0

    for _ in range(env.MAX_STEPS):
        # only the reward and the done flag of each transition matter here
        _, reward, done, _ = env.step(env.action_space.sample())
        total += reward

        if done:
            return total

    return total

In [None]:
# build the environment from its registered id and play a single
# random-action episode as a smoke test
env = gym.make("example-v0")
sum_reward = run_one_episode(env)

In [None]:
# number of random-action episodes averaged to form the baseline
n_episodes = 10000

# cumulative reward of each random-action episode
history = [run_one_episode(env) for _ in range(n_episodes)]

avg_sum_reward = sum(history) / len(history)
# fixed-point with two decimals; a bare ".2" would mean two significant
# digits and can fall back to exponent notation
print("\nbaseline cumulative reward: {:6.2f}".format(avg_sum_reward))

In [None]:
import os
import shutil

# start from a clean slate: remove checkpoints left by a previous run
chkpt_root = "tmp/exa"
shutil.rmtree(chkpt_root, ignore_errors=True)

# ... and the previous results directory under the user's home directory.
# expanduser() resolves the home directory even when HOME is unset, where
# os.getenv("HOME") would have produced the literal path "None/ray_results/"
ray_results = os.path.expanduser("~/ray_results/")
shutil.rmtree(ray_results, ignore_errors=True)

In [None]:
import ray

# start Ray, passing ignore_reinit_error=True and local_mode=True
# NOTE(review): presumably so the cell can be re-run in the same kernel and
# everything runs in a single local process -- confirm against the Ray docs
ray.init(ignore_reinit_error=True, local_mode=True)

In [None]:
from ray.tune.registry import register_env
from gym_example.envs.example_env import Example_v0

# name under which the environment is registered with Ray Tune
select_env = "example-v0"
# the factory ignores the `config` argument it is given and always builds
# a default Example_v0 instance
register_env(select_env, lambda config: Example_v0())

In [None]:
import ray.rllib.agents.ppo as ppo

# start from a copy of the PPO defaults so the shared default dict is not
# modified, and set the log level to WARN
config = ppo.DEFAULT_CONFIG.copy()
config["log_level"] = "WARN"
# trainer bound to the environment registered under `select_env`
agent = ppo.PPOTrainer(config, env=select_env)

In [None]:
# one line per training iteration: index, min/mean/max episode reward,
# mean episode length and the checkpoint that was written
status = "{:2d} reward {:6.2f}/{:6.2f}/{:6.2f} len {:4.2f} saved {}"
n_iter = 5

for n in range(n_iter):
    result = agent.train()

    # checkpoint after every iteration; the last path is kept in `chkpt_file`
    chkpt_file = agent.save(chkpt_root)

    report = status.format(
        n + 1,
        result["episode_reward_min"],
        result["episode_reward_mean"],
        result["episode_reward_max"],
        result["episode_len_mean"],
        chkpt_file,
    )
    print(report)

In [None]:
# shell command (not Python): serve TensorBoard from $HOME/ray_results;
# run it from a terminal
tensorboard --logdir=$HOME/ray_results

In [None]:
import gym

# reload the agent from the last checkpoint path stored in `chkpt_file`
agent.restore(chkpt_file)
# build an environment from the id held in `select_env` and take the
# initial observation for the rollout
env = gym.make(select_env)
state = env.reset()

In [None]:
# roll out the restored policy for a fixed number of steps, rendering each
# step and reporting the cumulative reward whenever an episode finishes
sum_reward = 0
n_step = 20

for step in range(n_step):
    action = agent.compute_action(state)
    state, reward, done, info = env.step(action)

    env.render()
    sum_reward += reward

    # `done` is a flag: test its truth value rather than comparing it to 1
    if done:
        print("cumulative reward", sum_reward)
        # begin the next episode and restart the running total
        state = env.reset()
        sum_reward = 0

In [None]:
# shell commands (not Python): install the packages listed in
# requirements.txt, then install the local gym-example directory in
# editable mode (-e)
pip install -r requirements.txt
pip install -e gym-example