In [7]:
# A sample environment and agent class
from typing import List
import random
class Environment :
  """Toy environment with constant observations, two actions and random rewards.

  One episode lasts a fixed number of steps. Every call to ``action`` consumes
  one step; once none are left the episode is over and further calls raise.
  """

  def __init__(self, max_steps : int = 10):
    """Start a fresh episode.

    Args:
      max_steps: number of steps the episode lasts (defaults to 10). A value
        of zero or less produces an episode that is already finished.
    """
    self.steps_left = max_steps # steps (not episodes) remaining before the game is over

  def get_observations(self) -> List[float]:
    """Return the current observation; this toy environment always reports zeros."""
    return [0.0,0.0,0.0]

  def get_actions(self) -> List[int]:
    """Return the list of actions the agent may choose from."""
    return [0,1]

  def is_done(self) -> bool :
    """Return True once the episode has no steps left."""
    # `<=` rather than `==` so a non-positive step budget can never yield an endless episode.
    return self.steps_left <= 0

  def action(self, action : int) -> float:
    """Apply ``action``, consume one step and return the reward.

    The action value is ignored; the reward is drawn uniformly from [0.0, 1.0).

    Raises:
      RuntimeError: if the episode is already over.
    """
    if self.is_done():
      # RuntimeError is still an Exception, so existing `except Exception` handlers keep working.
      raise RuntimeError("Game is over!")
    self.steps_left -= 1
    return random.random()




In [10]:
class Agent :
  """Agent with a purely random policy that keeps a running total of its rewards."""

  def __init__(self):
    # Sum of every reward received from the environment so far.
    self.total_rewards = 0.0

  def step(self, env : Environment):
    """Play one step in ``env``: observe, pick a random action, bank the reward."""
    # The observation is fetched to mirror the usual agent loop, but the random policy ignores it.
    _current_obs = env.get_observations()
    candidate_actions = env.get_actions()
    picked = random.choice(candidate_actions)
    self.total_rewards += env.action(picked)


In [14]:
# Play one complete episode of the toy environment with the random agent,
# then report the reward it collected.
env, agent = Environment(), Agent()
while True:
  if env.is_done():
    break
  agent.step(env)
print("total reward is %.2f" % agent.total_rewards)


total reward is 4.28


There is a framework available for simulating RL environments: Gymnasium. Let us start with a simple one, the CartPole environment.

In [2]:
import gymnasium as gym

In [3]:
e = gym.make("CartPole-v1") # create the environment registered under the id "CartPole-v1"

Observation: four floating-point numbers - [x-position of the cart, velocity of the cart, angle of the stick (measured from the vertical), angular velocity of the stick] \
With some knowledge of mathematics and physics, the problem of moving the platform left or right to keep the stick from falling can be solved directly. The idea here, however, is to let the agent learn from trial and error and solve the problem on its own.

In [4]:
# reset() starts a new episode and hands back the first observation plus an info dict.
obs, info = e.reset()
print(obs, info)

[ 0.00040496  0.03197915  0.02731298 -0.01524302] {}


In [5]:
# Inspect the observation space; the output below shows per-component low/high bounds, shape (4,) and dtype float32.
e.observation_space

Box([-4.8               -inf -0.41887903        -inf], [4.8               inf 0.41887903        inf], (4,), float32)

In [6]:
# Inspect the action space; the output below shows Discrete(2).
e.action_space

Discrete(2)

In [7]:
# Take action 0; step() returns (observation, reward, terminated, truncated, info) in that order.
e.step(0)


(array([ 0.00104454, -0.16352364,  0.02700813,  0.2859308 ], dtype=float32),
 1.0,
 False,
 False,
 {})

In [11]:
# Draw one action from the action space; this is what the random agent later in the notebook uses as its policy.
e.action_space.sample()

1

Let us now build an agent that behaves completely randomly inside the CartPole environment and check how much reward it accumulates.


In [12]:
# Roll out one CartPole episode with a uniformly random policy and report its length and return.
env = gym.make("CartPole-v1")
total_reward = 0.0
total_steps = 0
obs,info = env.reset()
while True:
  action = env.action_space.sample()
  obs, reward, terminated, truncated, info = env.step(action)
  total_reward += reward
  total_steps += 1
  # An episode is over when it terminates OR when it is truncated; checking only
  # `terminated` would keep stepping an environment that has to be reset first.
  if terminated or truncated:
    break
env.close() # release the environment's resources once the episode is finished
print("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))

Episode done in 17 steps, total reward 17.00


After 17 steps the pole loses its balance and falls. Most Gym environments have a reward boundary - the average reward an agent should obtain over 100 consecutive episodes for the problem to be considered solved.

To add functionality to an environment without modifying the environment itself, Gym provides wrappers, which let you customize the action, observation or reward logic. A wrapper can cover the entire environment class (Wrapper) or a single aspect of it (ActionWrapper, ObservationWrapper or RewardWrapper).

To make this concrete, the action is now wrapped with a random-action wrapper: it follows the current policy but replaces the chosen action with a random one 10 percent of the time. This trick helps with the exploration-exploitation problem by forcing the agent to try new directions rather than blindly following the policy.

In [17]:
import random
class RandomActionWrapper(gym.ActionWrapper):
  """Action wrapper that, with probability ``epsilon``, swaps the requested action for a sampled one."""

  def __init__(self, env : gym.Env, epsilon : float = 0.1) :
    """Wrap ``env``; ``epsilon`` is the probability of overriding an action (default 0.1)."""
    super().__init__(env)
    self.epsilon = epsilon

  def action(self, action : gym.core.WrapperActType) -> gym.core.WrapperActType :
    """Return ``action`` unchanged, or a sample from the wrapped env's action space when the random draw falls below ``epsilon``."""
    if not random.random() < self.epsilon:
      # Common case: pass the caller's action straight through.
      return action
    sampled = self.env.action_space.sample()
    print("Random action : ", sampled)
    return sampled



In [18]:
# Wrap a fresh CartPole-v1 environment in RandomActionWrapper, keeping its default epsilon of 0.1.
env = RandomActionWrapper(gym.make('CartPole-v1'))


In [20]:
# Run one episode that always requests action 0; the wrapper occasionally overrides it.
# reset() returns (observation, info), so unpack it instead of binding the whole tuple to `obs`.
obs, info = env.reset()
total_reward = 0.0
total_steps = 0
while True:
  obs,reward,is_done,trunc,info = env.step(0)
  total_reward += reward
  total_steps += 1
  # Stop on truncation as well as termination; either one ends the episode.
  if is_done or trunc:
    break
print("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))

Random action :  0
Episode done in 10 steps, total reward 10.00


The episode ended after 10 steps, and the random-action wrapper overrode the action once, which happens to match the 10 percent probability (a single short episode will not always line up this neatly).

Another interesting kind of wrapper is the rendering wrapper, which can be used to watch or record the agent acting in the environment. There are two of them: the HumanRendering wrapper and the RecordVideo wrapper.

In [27]:
!apt-get install x11-utils > /dev/null 2>&1
!pip install pyglet > /dev/null 2>&1
!apt-get install -y xvfb python-opengl > /dev/null 2>&1

In [28]:
!pip install gym pyvirtualdisplay > /dev/null 2>&1

In [31]:
# NOTE(review): x11-utils is already installed by the first setup cell, so this cell looks redundant.
# All output, errors included, is discarded by the redirect.
!apt-get install x11-utils > /dev/null 2>&1

In [32]:
# Pin pyglet to 1.3.2; per the install log below this replaces the preinstalled 2.1.2.
# NOTE(review): confirm the rendering cells actually need this older pyglet.
!pip install pyglet==v1.3.2

Collecting pyglet==v1.3.2
  Downloading pyglet-1.3.2-py2.py3-none-any.whl.metadata (1.4 kB)
Downloading pyglet-1.3.2-py2.py3-none-any.whl (1.0 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.0 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.2/1.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:01[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m1.0/1.0 MB[0m [31m16.3 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m13.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyglet
  Attempting uninstall: pyglet
    Found existing installation: pyglet 2.1.2
    Uninstalling pyglet-2.1.2:
      Successfully uninstalled pyglet-2.1.2
Successfully installed pyglet-1.3.2


In [29]:
import matplotlib.pyplot as plt
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display

In [33]:
#some hacks to get the render working on colab notebook
display = Display(visible=0, size=(400, 300))
display.start()
env = gym.make("CartPole-v0")
env.reset()
prev_screen = env.render(mode='rgb_array')
plt.imshow(prev_screen)
obs = env.reset()
total_reward = 0.0
total_steps = 0
while True:
  obs,reward,is_done,trunc,info = env.step(0)
  screen = env.render(mode='rgb_array')
  plt.imshow(screen)
  ipythondisplay.clear_output(wait=True)
  ipythondisplay.display(plt.gcf())
  total_reward += reward
  total_steps += 1
  if is_done:
    break
print("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))
ipythondisplay.clear_output(wait=True)
env.close()

FileNotFoundError: [Errno 2] No such file or directory: 'Xvfb'

In [39]:
# to record video
env = gym.make("CartPole-v1", render_mode = 'rgb_array')
env = gym.wrappers.RecordVideo(env, 'video')
obs = env.reset()
total_reward = 0.0
total_steps = 0
while True:
  obs,reward,is_done,trunc,info = env.step(0)

  total_reward += reward
  total_steps += 1
  if is_done:
    break
print("Episode done in %d steps, total reward %.2f" % (total_steps, total_reward))


Episode done in 10 steps, total reward 10.00
