# Chapter 02 - OpenAI Gym

We will define an environment that will give the agent random rewards for a limited number of steps, regardless of the agent's actions. This scenario is not very useful, but it will allow us to focus on specific methods in both the environment and agent classes.

In [1]:
import random 
from typing import List

class Environment:
    def __init__(self):
        self.steps_left = 10
    
    def get_observation(self) -> List[float]:
        return [0.0, 0.0, 0.0]
    
    def get_actions(self) -> List[int]:
        return [0, 1]
    
    def is_done(self) -> bool:
        return self.steps_left == 0
    
    def action(self, action) -> float:
        if self.is_done():
            raise Exception('Game is over.')
        self.steps_left -= 1
        return random.random()

The `get_observation()` method returns the environment's current observation to the agent. It is usually implemented as some function of the internal state of the environment. In this example, it always returns the same vector of zeros.

The `get_actions()` method allows the agent to query the set of actions it can execute. Normally, the set of actions that the agent can execute does not change over time, but some actions can become impossible in different states. In this simplistic example, there are only two actions that the agent can carry out, which are encoded as 0 and 1.

The `action()` method is the central piece of the environment's functionality. It does two things: it handles the agent's action and returns the reward for that action. In our example, the reward is random and the action is discarded. The method also decrements the step counter and raises an exception if it is called after the episode has ended.

In [2]:
class Agent:
    def __init__(self):
        self.total_reward = 0.0
        
    def step(self, env):
        current_obs = env.get_observation()
        actions = env.get_actions()
        reward = env.action(random.choice(actions))
        self.total_reward += reward

The `step` function accepts the environment instance as an argument and allows the agent to perform the following actions:

* Observe the environment.
* Make a decision about the action to take based on the observations. 
* Submit the action to the environment.
* Get the reward for the current step.

Our agent is dull: it ignores the observation when deciding which action to take. Instead, every action is selected randomly.

In [3]:
if __name__ == '__main__':
    env = Environment()
    agent = Agent()
    
    while not env.is_done():
        agent.step(env)
        
    print(f'Total Reward: {agent.total_reward:.4f}')

Total Reward: 4.7782
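
Because the rewards come from `random.random()`, the total changes on every run. If a repeatable result is needed, for example in a test, one option is to seed Python's random number generator before the episode starts. The sketch below restates the two classes compactly so that it runs on its own; `run_episode` and its `seed` parameter are illustrative names, not part of the code above.

```python
import random
from typing import List


class Environment:
    """Same toy environment as above: 10 steps, random rewards."""

    def __init__(self) -> None:
        self.steps_left = 10

    def get_observation(self) -> List[float]:
        return [0.0, 0.0, 0.0]

    def get_actions(self) -> List[int]:
        return [0, 1]

    def is_done(self) -> bool:
        return self.steps_left == 0

    def action(self, action: int) -> float:
        if self.is_done():
            raise Exception('Game is over.')
        self.steps_left -= 1
        return random.random()


class Agent:
    def __init__(self) -> None:
        self.total_reward = 0.0

    def step(self, env: Environment) -> None:
        env.get_observation()  # observed but ignored, as in the original agent
        reward = env.action(random.choice(env.get_actions()))
        self.total_reward += reward


def run_episode(seed: int) -> float:
    """Hypothetical helper: play one full episode with a fixed seed."""
    random.seed(seed)
    env, agent = Environment(), Agent()
    while not env.is_done():
        agent.step(env)
    return agent.total_reward


first = run_episode(seed=42)
second = run_episode(seed=42)
print(first == second)  # the same seed replays the same actions and rewards
```

Seeding affects both the agent's action choice and the environment's rewards here, because both draw from the same module-level generator.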


## OpenAI Gym

In [4]:
import gym

In [5]:
# Create the CartPole environment.
e = gym.make('CartPole-v0')

The observation of this environment is four floating-point numbers containing information about the x coordinate of the stick's center of mass, its speed, its angle to the platform, and its angular speed. Of course, with some math and physics knowledge, it would not be complicated to convert these numbers into actions that balance the stick, but our problem is different: how do we learn to balance this system without knowing the exact meaning of the observed numbers, using only the reward? The reward in this environment is 1, and it is given on every time step. The episode continues until the stick falls, so to accumulate more reward, we need to move the platform in a way that keeps the stick from falling.
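
To make the "math and physics" remark concrete, here is a naive hand-written policy that pushes the platform toward the side the stick is leaning. It assumes that the third number of the observation is the stick's angle, that a positive angle means a lean to the right, and that actions are encoded as 0 = left and 1 = right; `lean_following_action` is a hypothetical name. Treat it as a baseline sketch under those assumptions, not as a learned policy.

```python
from typing import Sequence

LEFT, RIGHT = 0, 1  # assumed CartPole action encoding


def lean_following_action(obs: Sequence[float]) -> int:
    """Push the platform toward the side the stick is leaning.

    Assumes obs = (position, velocity, angle, angular velocity)
    and that a positive angle means the stick leans to the right.
    """
    stick_angle = obs[2]
    return RIGHT if stick_angle > 0 else LEFT


print(lean_following_action([0.0, 0.0, 0.05, 0.0]))   # stick leans right
print(lean_following_action([0.0, 0.0, -0.05, 0.0]))  # stick leans left
```

A rule like this only works because we know what each number means. The learning problem is to find a comparable policy from the reward signal alone.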

In [6]:
obs = e.reset()
obs

array([ 0.0042278 , -0.00807371,  0.03678896, -0.02680858])

We reset the environment and obtained the first observation. 

In [7]:
e.action_space

Discrete(2)

In [8]:
e.observation_space

Box(4,)

The action space is of the `Discrete` type, so our actions will be just 0 or 1, where 0 means pushing the platform to the left and 1 means pushing it to the right. The observation space is `Box(4,)`, which means a vector of size 4 whose components each lie within their own `[low, high]` interval. For the two speed components, this interval is effectively `[-inf, inf]`.

In [9]:
e.step(0)

(array([ 0.00406632, -0.2037034 ,  0.03625279,  0.27725092]), 1.0, False, {})

We pushed the platform to the left by executing the action 0 and got a tuple of four elements:

* A new observation which is a new vector of four numbers.
* A reward of 1.0.
* The `done` flag with value False, which means the episode is not over yet.
* Extra information about the environment, which is an empty dictionary.
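
The four-element tuple is what the Gym version used in this notebook returns. Gym 0.26 and later, as well as its successor Gymnasium, return five elements from `step()`: the `done` flag is split into `terminated` and `truncated`. If the same code has to run under either version, a small adapter can normalize the result. `unpack_step` below is a hypothetical helper written for illustration, not a Gym function.

```python
from typing import Any, Dict, Tuple


def unpack_step(result: tuple) -> Tuple[Any, float, bool, Dict]:
    """Normalize a step() result to (obs, reward, done, info).

    Accepts the classic 4-tuple or the newer 5-tuple
    (obs, reward, terminated, truncated, info).
    """
    if len(result) == 5:
        obs, reward, terminated, truncated, info = result
        return obs, reward, terminated or truncated, info
    obs, reward, done, info = result
    return obs, reward, done, info


# Classic API: the tuple passes through unchanged.
print(unpack_step(([0.0, -0.2, 0.03, 0.27], 1.0, False, {})))
# Newer API: the episode is over if it terminated or was truncated.
print(unpack_step(([0.0, -0.2, 0.03, 0.27], 1.0, False, True, {})))
```

In the newer API, `reset()` also returns an `(observation, info)` pair instead of the observation alone, so it needs similar handling.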

Next we will use the `sample()` method of the `Space` class on the `action_space` and `observation_space`.

In [10]:
e.action_space.sample()

1

In [11]:
e.action_space.sample()

1

In [12]:
e.observation_space.sample()

array([-1.6325544e+00, -2.9759798e+38, -2.2532640e-01,  3.1070276e+38],
      dtype=float32)

In [13]:
e.observation_space.sample()

array([-7.8199971e-01,  1.4081275e+38,  4.0631613e-01, -7.2184949e+37],
      dtype=float32)

Sampling from the action space is useful when you don't know which action to take, for example when you want the agent to behave randomly.
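
To make the behavior of `sample()` concrete, the sketch below mimics the two space types in plain Python. `ToyDiscrete` and `ToyBox` are simplified stand-ins written for illustration, not Gym's classes: the real `Box` works on NumPy arrays and handles unbounded dimensions, which this version does not. The bounds used at the bottom are illustrative values only.

```python
import random
from typing import List


class ToyDiscrete:
    """Integers 0 .. n-1, like an action space with n choices."""

    def __init__(self, n: int) -> None:
        self.n = n

    def sample(self) -> int:
        return random.randrange(self.n)

    def contains(self, x: int) -> bool:
        return isinstance(x, int) and 0 <= x < self.n


class ToyBox:
    """A vector of floats, each within its own [low, high] bounds."""

    def __init__(self, low: List[float], high: List[float]) -> None:
        assert len(low) == len(high)
        self.low, self.high = low, high

    def sample(self) -> List[float]:
        # Draw each component uniformly from its own interval.
        return [random.uniform(lo, hi) for lo, hi in zip(self.low, self.high)]

    def contains(self, x: List[float]) -> bool:
        return len(x) == len(self.low) and all(
            lo <= v <= hi for lo, v, hi in zip(self.low, x, self.high)
        )


actions = ToyDiscrete(2)
observations = ToyBox(low=[-4.8, -0.42], high=[4.8, 0.42])  # illustrative bounds
print(actions.contains(actions.sample()))
print(observations.contains(observations.sample()))
```

A sample is always a valid member of its space, which is why a sampled action can be passed straight to `step()`.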

## The Random CartPole Agent

In [14]:
import gym

if __name__ == '__main__':
    env = gym.make('CartPole-v0')
    total_reward = 0.0
    total_steps = 0
    obs = env.reset()
    
    while True:
        action = env.action_space.sample()
        obs, reward, done, _ = env.step(action)
        total_reward += reward
        total_steps += 1
        if done:
            break
            
    print(f'Episode ended in {total_steps} steps, total reward {total_reward:.2f}.')

Episode ended in 38 steps, total reward 38.00.


## Extra Gym Functionality - Wrappers and Monitors

### Wrappers

There are many situations where you want to "wrap" an existing environment and add some extra logic to it, such as transforming observations, rescaling rewards, or modifying actions. Gym provides a convenient framework for these situations: the `Wrapper` class.

Gym also provides subclasses of `Wrapper` that intercept only one part of the interaction between the agent and the environment:

* `ObservationWrapper`
* `RewardWrapper`
* `ActionWrapper`

Let's imagine a situation where we want to intervene in the stream of actions sent by the agent and, with a probability of 10%, replace the current action with a random one. By issuing random actions, we make our agent explore the environment and from time to time drift away from the beaten track of its policy.

In [15]:
import gym
from typing import TypeVar
import random

Action = TypeVar('Action')

class RandomActionWrapper(gym.ActionWrapper):
    """Replace the agent's action with a random one with probability epsilon."""

    def __init__(self, env, epsilon=0.1):
        super().__init__(env)
        self.epsilon = epsilon

    def action(self, action: Action) -> Action:
        # Called on every step before the action reaches the wrapped environment.
        if random.random() < self.epsilon:
            print('Random!')
            return self.env.action_space.sample()
        return action

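Here, we initialized our wrapper by calling the parent's `__init__` method and saving epsilon, the probability of a random action. We also overrode the `action()` method, which receives the agent's action and returns the action that is actually passed to the wrapped environment.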

Now it's time to apply our wrapper. We will create a normal CartPole environment and pass it to our `Wrapper` constructor. From here on, we will use our wrapper as a normal `Env` instance, instead of the original CartPole. As the `Wrapper` class inherits the `Env` class and exposes the same interface, we can nest our wrappers in any combination we want. 

In [16]:
if __name__ == '__main__':
    env = RandomActionWrapper(gym.make('CartPole-v0'))
    
    obs = env.reset()
    total_reward = 0.0
    
    while True:
        obs, reward, done, _ = env.step(0)
        total_reward += reward
        if done:
            break
            
    print(f'Reward: {total_reward:.2f}')

Reward: 10.00
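
Because a wrapper exposes the same interface as the environment it wraps, wrappers can be stacked. The sketch below models that mechanism in plain Python so that it runs without Gym installed. `CountdownEnv`, `ToyActionWrapper`, `RandomAction`, and `LogAction` are hypothetical stand-ins written for illustration; they are a simplified model of how an action wrapper delegates to the wrapped environment, not Gym's implementation.

```python
import random


class CountdownEnv:
    """Hypothetical stand-in for an Env: ends after a fixed number of steps."""

    def __init__(self, steps: int = 5) -> None:
        self.steps = steps
        self.steps_left = steps

    def reset(self):
        self.steps_left = self.steps
        return 0.0

    def step(self, action):
        self.steps_left -= 1
        return 0.0, 1.0, self.steps_left == 0, {}


class ToyActionWrapper:
    """Simplified model of an action wrapper: same interface, altered action."""

    def __init__(self, env) -> None:
        self.env = env

    def reset(self):
        return self.env.reset()

    def step(self, action):
        # Transform the action, then delegate to the wrapped object.
        return self.env.step(self.action(action))

    def action(self, action):
        raise NotImplementedError


class RandomAction(ToyActionWrapper):
    def __init__(self, env, epsilon: float = 0.1) -> None:
        super().__init__(env)
        self.epsilon = epsilon

    def action(self, action):
        # With probability epsilon, replace the agent's action.
        return random.choice([0, 1]) if random.random() < self.epsilon else action


class LogAction(ToyActionWrapper):
    def __init__(self, env) -> None:
        super().__init__(env)
        self.history = []

    def action(self, action):
        self.history.append(action)  # record what reaches the next layer
        return action


# Nesting: the agent's action passes through RandomAction, then LogAction.
logger = LogAction(CountdownEnv())
env = RandomAction(logger, epsilon=0.5)
env.reset()
done = False
while not done:
    _, _, done, _ = env.step(0)
print(logger.history)  # five actions; any 1 was injected by RandomAction
```

The order of nesting matters: here the logger sits inside the random wrapper, so it records the actions after they have been randomized.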
