In [11]:
import gym
import numpy as np
import time

from bokeh.plotting import figure, show
from bokeh.io import output_notebook

output_notebook()

env_name = 'FrozenLake-v1'



In [142]:
def render_single(env, policy, max_steps=100):
    """
    Renders policy for an environment.

    Parameters
    ----------
    env:    gym.core.Environment, open gym environment object
    policy: np.array of shape [env.nS], the action to take at a given state
    """
    episode_reward = 0
    ob = env.reset()
    for t in range(max_steps):
        env.render()
        time.sleep(0.25)
        a = policy[ob]
        ob, reward, done, _ = env.step(a)
        episode_reward += reward
        if done:
            break
    env.render();
    if not done:
        print("The agent didn't reach a terminal state in {} steps.".format(max_steps))
    else:
        print("Episode reward: %f" % episode_reward)
        

def set_random_seeds(env):
    np.random.seed(1)
    random.seed(1)
    env.seed(1)

In [181]:
class QLearning:
    '''Implements Off-policy control with Q Learning.'''
    def __init__(self, env, num_states, num_actions, alpha, epsilon, gamma):
        '''Parameters
        ----------
        env:         gym.core.Environment, open gym environment object
        num_states:  integer, number of states in the environment
        num_actions: integer, number of possible actions
        alpha:       float, step size, (0, 1]
        epsilon:     float, the epsilon parameter used for exploration
        gamma:       float, discount factor, small > 0
        '''
        self.env = env
        self.num_states = num_states
        self.num_actions = num_actions
        self.alpha = alpha
        self.epsilon = epsilon
        self.gamma = gamma
        
        self.Q = np.zeros((self.num_states, self.num_actions))
        
        self.env.reset()
        
        
    def run_q_learning(self, num_episodes, verbose=True):
        '''Runs Q learning
        
        Parameters
        ----------
        num_episodes: integer, number of episodes to run to train RL agent
        
        Returns
        ----------
        self.policy:         list of integers of length self.num_states, final policy
        '''
        terminated = False
        
        for i in range(num_episodes):
            print(f'episode number: {i}')
            state = np.random.choice(self.num_states, 1)[0]
            terminated = False
            while not terminated:
                current_action = self.draw_action(state)
                action, reward, next_state, terminated = self.generate_next_step(current_action)
                self.evaluate_policy(state, action, reward, next_state)

                state = next_state

        
        # Once training is finished, calculate and return policy using argmax approach
        final_policy = np.argmax(self.Q, axis=1)
        
        return final_policy
    
    
    def generate_next_step(self, action):
        '''Generates episode given policy. Calculates r_t and s_t+1
        
        Parameters
        ----------
        policy: list of integers of length self.num_states, the action to take at a given state
        
        Returns
        ----------
        state_action_reward: list of tuple (state, action, reward)
        '''
        observation, reward, terminated, _ = self.env.step(action)
        if terminated:
            self.env.reset()

        return (action, reward, observation, terminated)
    
    
    def evaluate_policy(self, state, action, reward, next_state):
        '''Updates action value function self.Q.

        Parameters
        ----------
        state:      int
        action:     int
        reward:     float
        next_state: int
        '''

        
        est_reward = reward + self.gamma * np.max(self.Q[next_state])
        self.Q[state][action] = self.Q[state][action] + self.alpha * (est_reward - self.Q[state][action])
        
                
    def draw_action(self, state):
        '''Improves and updates current policy self.policy using epsilon greedy approach.'''
        greedy_action = self.argmax(state)
        return self.get_epsilon_greedy_action(greedy_action)
                

    def argmax(self, state):
        """
        Finds and returns greedy action.

        Parameters
        ----------
        state: int, state for which greedy action should be selected
        
        Returns
        ----------
        action: int, corresponds to the index of the greedy action

        """
        return np.argmax(self.Q[state])

    
    def get_epsilon_greedy_action(self, greedy_action):
        '''Returns next action using epsilon greedy approach.
        
        Parameters
        ----------
        greedy_action: integer, greedy action (action with a maximum Q value)
        
        Returns
        ----------
        next_action: integer, either greedy or random action
        '''   
        prob = np.random.random()

        if prob < 1 - self.epsilon:
            return greedy_action
        
        return np.random.randint(0, self.num_actions)


# Tests

In [91]:
import ipytest
import pytest
ipytest.autoconfig()

import random

random.seed(10)
np.random.seed(0)

In [92]:
%%ipytest -qq

class Env():
    num_states = 4
    num_actions = 4
 
@pytest.fixture
def mock_env():
    return Env()

@pytest.fixture
def q_learning_instance(mock_env):
    alpha = 1
    epsilon = 0.8
    gamma = 1
    yield QLearning(mock_env, mock_env.num_states, mock_env.num_actions, alpha, epsilon, gamma)
    

def test_init_agent(mock_env, q_learning_instance):
    q_learning_instance.init_agent()
    
    
def test_get_epsilon_greedy_action(mock_env, q_learning_instance):
    q_learning_instance.init_agent()
    action = q_learning_instance.get_epsilon_greedy_action(3)
    assert action < mock_env.num_actions
    assert action >= 0

    
def test_argmax(mock_env, q_learning_instance):
    q_learning_instance.init_agent()
    action = q_learning_instance.argmax(1)
    assert action < mock_env.num_actions
    assert action >= 0
    

[32m.[0m[32m.[0m[32m.[0m[32m                                                                                          [100%][0m


In [186]:
np.random.seed(1)

alpha = 1
epsilon = 0.8
gamma = 1.0
n_episodes = 2

env = gym.make(env_name, is_slippery=False)
env.seed(0)
num_states = env.observation_space.n
num_actions = env.action_space.n
print(num_states)
print(num_actions)

model = QLearning(env, num_states, num_actions, alpha, epsilon, gamma)

policy = model.run_q_learning(n_episodes)

16
4
episode number: 0
episode number: 1


In [187]:
policy

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])

In [188]:
model.Q

array([[0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.],
       [0., 0., 0., 0.]])