https://medium.com/distributed-computing-with-ray/anatomy-of-a-custom-environment-for-rllib-327157f269e5

In [2]:
import gym
from gym.utils import seeding

## Define Custom Class

Gym classes should inherit from the gym.Env class

Gym classes should have 6 methods:
- **__init__()** with self.action_space, self.observation_space, and self.reset() call
- **reset()** resets the state of the environment for a new episode and returns initial observation
- **step(action)** how an agent takes an action during one step in an episode
- **render()** (*optional*) visualise the state of the environment
- **seed()** (*optional*) set the seed for the env's random generators
- **close()** (*optional*) how to close an environment

In [7]:
import numpy as np

class Example_v0(gym.Env):
    """One-dimensional corridor environment with a goal cell near the middle.

    The agent holds an integer position in ``[left_min, right_max]`` and, on
    each step, moves one cell left (action ``0``) or right (action ``1``).
    An episode ends when the agent reaches ``goal`` or when ``step`` is
    called after ``max_steps`` moves have already been taken.

    Rewards:
        * ``reward_goal`` when the agent lands on the goal,
        * ``reward_step`` when the agent ends the move on the side of the
          goal it is moving towards,
        * ``reward_away`` when the agent moves away from the goal or tries
          to walk past either end of the corridor.
    """

    def __init__(self):
        """Set up spaces and constants, seed the RNG and start an episode."""
        # extra vars useful for this specific env
        self.left_min, self.right_max = 1, 10
        self.move_left, self.move_right = 0, 1
        self.max_steps = 10
        self.reward_away = -2
        self.reward_step = -1
        self.reward_goal = self.max_steps
        self.metadata = {'render.modes': ['human']}

        # vars required by gym
        self.action_space = gym.spaces.Discrete(2)  # 2 possible actions
        # observation space received by agent; positions 0..right_max
        self.observation_space = gym.spaces.Discrete(self.right_max + 1)

        # place goal in middle of observation space array (makes env simpler)
        self.goal = int((self.left_min + self.right_max - 1) / 2)
        # candidate start cells: every cell in [left_min, right_max) but the goal
        self.init_positions = list(range(self.left_min, self.right_max))
        self.init_positions.remove(self.goal)

        # the RNG must exist before reset() can draw a start position
        self.seed()
        self.reset()

    def reset(self):
        """Start a new episode and return the initial observation.

        Returns:
            The agent's starting position, drawn at random from
            ``init_positions``.
        """
        # extra vars useful for this specific env
        # Draw the position first: the observation below is derived from it.
        # int() keeps the state a plain Python int whatever the RNG returns.
        self.position = int(self.np_random.choice(self.init_positions))
        self.count = 0  # number of steps taken this episode

        # vars required by gym
        self.state = self.position
        self.reward = 0
        self.done = False
        self.info = {}

        return self.state

    def step(self, action):
        """Apply ``action`` and advance the episode by one step.

        Args:
            action: ``move_left`` (0) or ``move_right`` (1).

        Returns:
            A list ``[state, reward, done, info]`` where ``info['dist']`` is
            the signed distance ``goal - position``.

        Raises:
            AssertionError: if ``action`` is not in ``action_space`` (only
                checked while the episode is still running).
        """
        if self.done:
            # should never happen!
            print('Episode done.')
        elif self.count == self.max_steps:
            # step budget exhausted: end the episode without moving
            self.done = True
        else:
            assert self.action_space.contains(action)
            self.count += 1

            # simulation logic to handle action
            if action == self.move_left:
                if self.position == self.left_min:
                    # invalid action: already at the left wall
                    self.reward = self.reward_away
                else:
                    # update position
                    self.position -= 1

                if self.position == self.goal:
                    # agent reached goal
                    self.reward = self.reward_goal
                    self.done = True
                elif self.position < self.goal:
                    # moving away from goal
                    self.reward = self.reward_away
                else:
                    # moving towards goal
                    self.reward = self.reward_step

            elif action == self.move_right:
                if self.position == self.right_max:
                    # invalid action: already at the right wall
                    self.reward = self.reward_away
                else:
                    # update position
                    self.position += 1

                if self.position == self.goal:
                    # agent reached goal
                    self.reward = self.reward_goal
                    self.done = True
                elif self.position > self.goal:
                    # moving away from goal
                    self.reward = self.reward_away
                else:
                    # moving towards goal
                    self.reward = self.reward_step

        # update env state
        self.state = self.position
        # Explicit check rather than assert so it still runs under `python -O`.
        if not self.observation_space.contains(self.state):
            print('Invalid state', self.state)

        # (optional) define info dict (useful for diagnostic info & troubleshooting)
        self.info['dist'] = self.goal - self.position

        return [self.state, self.reward, self.done, self.info]

    def render(self, mode='human'):
        """Print the current position, last reward and info dict."""
        s = "position: {:2d}  reward: {:2d}  info: {}"
        print(s.format(self.state, self.reward, self.info))

    def seed(self, seed=None):
        """Seed the environment's random generator.

        Args:
            seed: Optional integer seed; ``None`` lets gym choose one.

        Returns:
            A single-element list holding the seed actually used.
        """
        self.np_random, seed = seeding.np_random(seed)

        return [seed]

    def close(self):
        """Release resources; this environment holds none."""
        pass