In [None]:
# Environment setup: install the pinned gym release (with Atari extras) plus
# the packages used for rendering/recording.
!pip install gym[atari,accept-rom-license]==0.21.0
# NOTE(review): imageio is pinned to an old release — presumably for
# compatibility with the video recorder; confirm before bumping.
!pip install imageio==2.4.1
# System packages; stdout and stderr are redirected to /dev/null, so install
# failures are silent.
!apt-get install -y xvfb python-opengl ffmpeg > /dev/null 2>&1
# NOTE(review): unlike the packages above, colabgymrender is not pinned
# (-U upgrades to the latest release), so re-runs may pick up a different version.
!pip install -U colabgymrender

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
import gym
import numpy as np
from colabgymrender.recorder import Recorder

pygame 2.1.0 (SDL 2.0.16, Python 3.7.14)
Hello from the pygame community. https://www.pygame.org/contribute.html


## 0. Load the Environment

In [None]:
def load_frozen_lake(desc=None, map_name=None, is_slippery=False):
    """
    loads the pre-made FrozenLakeEnv environment from OpenAI's gym:

    desc - None or a list of lists containing a custom description of the map to load for the environment
    map_name - None or a string containing the pre-made map to load

    **If both desc and map_name are None, the environment will load a randomly generated 8x8 map

    is_slippery - boolean to determine if the ice is slippery

    Returns: the environment
    """
    # Identity checks instead of `== None`: if `desc` is a numpy array, `==`
    # compares element-wise and the result cannot be used as a single boolean.
    if desc is None and map_name is None:
        # generate_random_map() defaults to an 8x8 grid.
        desc = generate_random_map()
    return gym.make('FrozenLake-v1', desc=desc, map_name=map_name, is_slippery=is_slippery)

def generate_random_map(size=8, p=0.8):
    """Generates a random valid map (one that has a path from start to goal)
    Args:
        size: size of each side of the grid; must be at least 2
        p: probability that a tile is frozen (values above 1 are clamped to 1)
    Returns:
        A random valid map, as a list of `size` strings of `size` characters
    Raises:
        ValueError: if size is smaller than 2
    """
    # With a single tile the goal overwrites the start and the validity check
    # can never succeed, so the sampling loop below would never terminate.
    if size < 2:
        raise ValueError("size must be at least 2, got {}".format(size))

    # The clamp does not depend on the loop, so do it once up front.
    p = min(1, p)

    while True:
        # Sample every tile as frozen ("F") or hole ("H"), then pin the start
        # in the top-left corner and the goal in the bottom-right corner.
        board = np.random.choice(["F", "H"], (size, size), p=[p, 1 - p])
        board[0][0] = "S"
        board[-1][-1] = "G"
        # Keep resampling until the goal is reachable from the start.
        if is_valid(board, size):
            return ["".join(row) for row in board]

def is_valid(board, max_size):
    """
    checks whether the goal of a generated board can be reached:

    board - square grid of tiles, indexable as board[row][col]
    max_size - number of rows (and of columns) in the grid

    The search starts at (0, 0) and never steps onto "H" tiles.

    Returns: True as soon as a "G" tile is found next to a visited cell, otherwise False
    """
    moves = ((1, 0), (0, 1), (-1, 0), (0, -1))
    seen = set()
    stack = [(0, 0)]
    while stack:
        cell = stack.pop()
        if cell in seen:
            continue
        seen.add(cell)
        row, col = cell
        for d_row, d_col in moves:
            nxt_row, nxt_col = row + d_row, col + d_col
            inside = 0 <= nxt_row < max_size and 0 <= nxt_col < max_size
            if not inside:
                continue
            tile = board[nxt_row][nxt_col]
            if tile == "G":
                return True
            if tile != "H":
                # Anything that is not a hole can be walked on.
                stack.append((nxt_row, nxt_col))
    return False

In [None]:
# Demo: build the environment four different ways and print each tile map.
# The seed is fixed first because the two default calls below draw random maps
# from numpy's global RNG; the order of the calls therefore matters.
np.random.seed(0)
# Random map, non-slippery.
env = load_frozen_lake()
print(env.desc)
# NOTE(review): presumably the transition list for state 0 / action 0 — confirm against gym docs.
print(env.P[0][0])
# Random map, slippery.
env = load_frozen_lake(is_slippery=True)
print(env.desc)
print(env.P[0][0])
# Custom 3x3 map.
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
print(env.desc)
# Pre-made map selected by name.
env = load_frozen_lake(map_name='4x4')
print(env.desc)

[[b'S' b'F' b'F' b'F' b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'F' b'F' b'H' b'F' b'F']
 [b'F' b'H' b'F' b'H' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'H' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'H' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'G']]
[(1.0, 0, 0.0, False)]
[[b'S' b'F' b'H' b'F' b'H' b'F' b'H' b'F']
 [b'H' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'H' b'F' b'F' b'F' b'F' b'H']
 [b'F' b'F' b'F' b'F' b'F' b'H' b'F' b'H']
 [b'F' b'F' b'H' b'F' b'H' b'F' b'H' b'F']
 [b'F' b'F' b'H' b'F' b'F' b'F' b'F' b'G']]
[(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 8, 0.0, True)]
[[b'S' b'F' b'F']
 [b'F' b'H' b'H']
 [b'F' b'F' b'G']]
[[b'S' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'H']
 [b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'G']]


In [None]:
'''
[[b'S' b'F' b'F' b'F' b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'F' b'F' b'H' b'F' b'F']
 [b'F' b'H' b'F' b'H' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'H' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'H' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'H' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'G']]
[(1.0, 0, 0.0, False)]
[[b'S' b'F' b'H' b'F' b'H' b'F' b'H' b'F']
 [b'H' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'F' b'F' b'F' b'F' b'F']
 [b'F' b'F' b'H' b'F' b'F' b'F' b'F' b'H']
 [b'F' b'F' b'F' b'F' b'F' b'H' b'F' b'H']
 [b'F' b'F' b'H' b'F' b'H' b'F' b'H' b'F']
 [b'F' b'F' b'H' b'F' b'F' b'F' b'F' b'G']]
[(0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 0, 0.0, False), (0.3333333333333333, 8, 0.0, True)]
[[b'S' b'F' b'F']
 [b'F' b'H' b'H']
 [b'F' b'F' b'G']]
[[b'S' b'F' b'F' b'F']
 [b'F' b'H' b'F' b'H']
 [b'F' b'F' b'F' b'H']
 [b'H' b'F' b'F' b'G']]
'''

## 1. Initialize Q-table

In [None]:
def q_init(env):
    """
    builds an all-zero Q-table sized for the environment:

    env is the FrozenLakeEnv instance; env.observation_space.n and
    env.action_space.n give the table dimensions

    Returns: numpy.ndarray of zeros with one row per state and one column per action
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    return np.zeros(shape=(n_states, n_actions))

In [None]:
# Demo: the Q-table shape follows the environment (states x actions).
# Random 8x8 map, non-slippery.
env = load_frozen_lake()
Q = q_init(env)
print(Q.shape)
# Random 8x8 map, slippery.
env = load_frozen_lake(is_slippery=True)
Q = q_init(env)
print(Q.shape)
# Custom 3x3 map.
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)
print(Q.shape)
# Pre-made 4x4 map.
env = load_frozen_lake(map_name='4x4')
Q = q_init(env)
print(Q.shape)

# Expected output, kept as a bare string for reference (it is also echoed as
# the cell's result because it is the last expression).
'''
(64, 4)
(64, 4)
(9, 4)
(16, 4)
'''


(64, 4)
(64, 4)
(9, 4)
(16, 4)


'\n(64, 4)\n(64, 4)\n(9, 4)\n(16, 4)\n'

## 2. Epsilon Greedy

In [None]:
def epsilon_greedy(Q, state, epsilon):
    """
    uses epsilon-greedy to determine the next action:

    Q - numpy.ndarray containing the q-table (one row per state, one column per action)
    state - the current state, used as a row index into Q (wrapped modulo the number of rows)
    epsilon - the epsilon to use for the calculation

    samples p with numpy.random.rand to determine if agent should explore or exploit
    (exploits when p > epsilon, explores otherwise)
    If exploring, picks the next action with numpy.random.randint from all possible actions

    Returns: the next action index
    """
    if np.random.rand() > epsilon:
        # Exploit: only the current state's row is needed, so take the argmax
        # of that row instead of computing the argmax of every row in the table.
        return np.argmax(Q[state % Q.shape[0]])
    # Explore: uniform choice over the action columns.
    return np.random.randint(0, Q.shape[1])


In [None]:
# Demo: with epsilon=0.5 the two seeds below exercise both branches of
# epsilon_greedy (one call prints the row's best action, the other a random one).
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)
# Give state 7 distinct action values so the greedy choice (index 2) is unambiguous.
Q[7] = np.array([0.5, 0.7, 1, -1])
np.random.seed(0)
print(epsilon_greedy(Q, 7, 0.5))
np.random.seed(1)
print(epsilon_greedy(Q, 7, 0.5))

# Expected output:
# 2
# 0

2
0


## 3. Q-learning

In [None]:
def train(env, Q, episodes=5000, max_steps=100, alpha=0.1, gamma=0.99, epsilon=1, min_epsilon=0.1, epsilon_decay=0.05):
    """
    performs Q-learning:

    env is the FrozenLakeEnv instance (its tile map is read from env.unwrapped.desc)
    Q is a numpy.ndarray containing the Q-table; it is updated in place
    episodes is the total number of episodes to train over
    max_steps is the maximum number of steps per episode
    alpha is the learning rate
    gamma is the discount rate
    epsilon is the initial threshold for epsilon greedy
    min_epsilon is the minimum value that epsilon should decay to
    epsilon_decay is the decay rate for updating epsilon between episodes
    When the agent falls in a hole, the reward is updated to be -1
    Returns: Q, total_rewards
        Q is the updated Q-table
        total_rewards is a list containing the rewards per episode
    """
    # The decay schedule interpolates between the initial epsilon and
    # min_epsilon, so remember the starting value before it is overwritten.
    initial_epsilon = epsilon

    # Flatten the tile map so that a state index can be looked up directly.
    # Checking the tile (rather than "done with zero reward") avoids treating
    # an episode that merely ended without reaching the goal as a hole.
    tiles = np.asarray(env.unwrapped.desc).reshape(-1)

    total_rewards = []
    for episode in range(episodes):
        state = env.reset()
        rewards_current_episode = 0

        for _ in range(max_steps):
            action = epsilon_greedy(Q, state, epsilon)
            new_state, reward, done, _info = env.step(action)

            # Penalise falling into a hole, as stated in the docstring.
            if tiles[new_state] == b"H":
                reward = -1

            # Update Q(s, a) using the Bellman equation.
            Q[state, action] = Q[state, action] * (1 - alpha) + alpha * (reward + gamma * np.max(Q[new_state, :]))

            state = new_state
            rewards_current_episode += reward

            if done:
                break

        # Exponential decay from the initial epsilon towards min_epsilon.
        epsilon = min_epsilon + (initial_epsilon - min_epsilon) * np.exp(-epsilon_decay * episode)
        total_rewards.append(rewards_current_episode)
    return Q, total_rewards
        
        
        

In [None]:
# Demo: train on the custom 3x3 map with a fixed seed and report progress.
np.random.seed(0)
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)

Q, total_rewards  = train(env, Q)
print(Q)
# Average reward over 10 equal chunks of the run; the 500-episode labels
# match train()'s default of 5000 episodes.
split_rewards = np.split(np.array(total_rewards), 10)
for i, rewards in enumerate(split_rewards):
    print((i+1) * 500, ':', np.mean(rewards))

[[0.96059593 0.970299   0.95098488 0.96059396]
 [0.96059557 0.         0.0094072  0.37627228]
 [0.18061285 0.         0.         0.        ]
 [0.97029877 0.9801     0.         0.96059583]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.98009763 0.98009933 0.99       0.9702983 ]
 [0.98009922 0.98999782 1.         0.        ]
 [0.         0.         0.         0.        ]]
500 : 0.906
1000 : 0.94
1500 : 0.95
2000 : 0.95
2500 : 0.94
3000 : 0.922
3500 : 0.946
4000 : 0.948
4500 : 0.926
5000 : 0.964


In [None]:
'''
[[ 0.96059593  0.970299    0.95098488  0.96059396]
 [ 0.96059557 -0.77123208  0.0094072   0.37627228]
 [ 0.18061285 -0.1         0.          0.        ]
 [ 0.97029877  0.9801     -0.99999988  0.96059583]
 [ 0.          0.          0.          0.        ]
 [ 0.          0.          0.          0.        ]
 [ 0.98009763  0.98009933  0.99        0.9702983 ]
 [ 0.98009922  0.98999782  1.         -0.99999952]
 [ 0.          0.          0.          0.        ]]
500 : 0.812
1000 : 0.88
1500 : 0.9
2000 : 0.9
2500 : 0.88
3000 : 0.844
3500 : 0.892
4000 : 0.896
4500 : 0.852
5000 : 0.928
'''

## 4. Play

In [None]:
def play(env, Q, max_steps=100):
    """
    runs one episode with a trained agent, always taking the greedy action:

    env is the FrozenLakeEnv instance
    Q is a numpy.ndarray containing the Q-table
    max_steps is the maximum number of steps in the episode
    The board is rendered after the reset and after every step
    Returns: the total rewards for the episode
    """
    score = 0
    current = env.reset()
    env.render()

    steps_taken = 0
    finished = False
    while steps_taken < max_steps and not finished:
        # Always exploit: pick the highest-valued action for the current state.
        best_action = np.argmax(Q[current])
        current, gained, done, _ = env.step(best_action)
        score += gained
        env.render()
        finished = done == True
        steps_taken += 1
    return score


Train on the 3x3 map, then play one greedy episode and print each board state:

In [None]:
# Demo: train on the custom 3x3 map with a fixed seed, then let the trained
# agent play one episode and print the total reward it collected.
np.random.seed(0)
desc = [['S', 'F', 'F'], ['F', 'H', 'H'], ['F', 'F', 'G']]
env = load_frozen_lake(desc=desc)
Q = q_init(env)

Q, total_rewards  = train(env, Q)
print(play(env, Q))


[41mS[0mFF
FHH
FFG
  (Down)
SFF
[41mF[0mHH
FFG
  (Down)
SFF
FHH
[41mF[0mFG
  (Right)
SFF
FHH
F[41mF[0mG
  (Right)
SFF
FHH
FF[41mG[0m
1.0


## playground

In [None]:
# Scratch cell: inspect the attributes of whatever `env` is left over from the
# cells above, along with its observation and action spaces.
print(dir(env))
#print(env.action_space)
print(dir(env.observation_space))
print(env.observation_space.n)
print(dir(env.action_space))
print(env.action_space.n)
print(env.action_space.np_random)
# Remaining attributes from dir(env), left commented out (several of these
# lines are missing their closing parenthesis and would not run as-is).
#print(env.class_name)
#print(env.close)
#print(env.compute_reward)
#print(env.env)
#print(env.observation_space
#print(env.render
#print(env.reset
#print(env.metadata)
#print(env.spec)
#print(env.unwrapped)
#print(env.step())
      
# 'reward_range', '', 'step'
# Reset the environment and take one randomly sampled action.
print(env.reset())
print(env.step(env.action_space.sample()))

['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_action_space', '_elapsed_steps', '_max_episode_steps', '_metadata', '_observation_space', '_reward_range', 'action_space', 'class_name', 'close', 'compute_reward', 'env', 'metadata', 'observation_space', 'render', 'reset', 'reward_range', 'seed', 'spec', 'step', 'unwrapped']
['__class__', '__contains__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__str

In [None]:
# Scratch cell: drive LunarLander with random actions for 1000 steps,
# resetting whenever an episode ends.
# NOTE(review): `render_mode=`, `reset(seed=...)` returning (observation, info)
# and the 5-value result of `step()` look like the newer gym API, whereas the
# install cell pins gym==0.21.0 and every other cell unpacks 4 values from
# `step()`. Confirm which gym version this cell is meant to run against.
env = gym.make("LunarLander-v2", render_mode="human")
env.action_space.seed(42)

observation, info = env.reset(seed=42)

for _ in range(1000):
    observation, reward, terminated, truncated, info = env.step(env.action_space.sample())

    if terminated or truncated:
        observation, info = env.reset()

env.close()

In [None]:
# Scratch cell: record one Breakout episode played with random actions.
env = gym.make("ALE/Breakout-v5")
# Wrap the env in the colabgymrender Recorder, pointing it at ./video.
env = Recorder(env, './video')

observation = env.reset()
terminal = False

# Step with random actions until the environment reports the episode is over.
while not terminal:
  action = env.action_space.sample()
  observation, reward, terminal, info = env.step(action)

# NOTE(review): presumably plays back the recorded video inline — confirm against colabgymrender docs.
env.play()





In [None]:
# Scratch cell: same random-action recording loop as the Breakout cell, on CartPole.
# NOTE(review): the saved output of this cell is an ImportError; the missing
# module is not shown, so the cause needs to be confirmed by re-running.
env = gym.make("CartPole-v0")
env = Recorder(env, './video')

observation = env.reset()
terminal = False

# Step with random actions until the environment reports the episode is over.
while not terminal:
  action = env.action_space.sample()
  observation, reward, terminal, info = env.step(action)

env.play()


ImportError: ignored

In [None]:
# Scratch cell: same random-action recording loop, on the default FrozenLake map.
# NOTE(review): the saved output of this cell is a NameError, and the metadata
# printed further down lists only 'human' and 'ansi' render modes — presumably
# the Recorder needs image frames this env cannot provide; confirm.
env = gym.make('FrozenLake-v1')
env = Recorder(env, './video')

observation = env.reset()
terminal = False

# Step with random actions until the environment reports the episode is over.
while not terminal:
  action = env.action_space.sample()
  observation, reward, terminal, info = env.step(action)

env.play()


NameError: ignored

In [None]:
# Show the metadata (including supported render modes) of whatever `env` the
# previously executed cell left behind.
print(env.metadata)

{'render.modes': ['human', 'ansi']}


In [None]:
# Scratch cell: create the default FrozenLake env and render its initial state as text.
env = gym.make('FrozenLake-v1')
env.reset()
# The trailing comment is a leftover experiment with an 'rgb_array' render.
env.render(mode = 'human') #(mode = 'rgb_array').shape[:2][::-1]


[41mS[0mFFF
FHFH
FFFH
HFFG
