In [27]:
import numpy as np
from rl_glue import RLGlue
import agent
from environment import BaseEnvironment
from tqdm import tqdm

# Environment

In [28]:
class Environment(BaseEnvironment):
    """Four-state episodic environment with two actions for an RLGlue experiment.

    Non-terminal states are the integers 0-3; an episode ends as soon as the
    state index exceeds 3. Observations are 2x2 arrays holding one feature row
    per action (see ``observation``).

    Note:
        env_init, env_start, env_step, env_cleanup, and env_message are required
        methods.
    """

    def env_init(self, env_info=None):
        """Setup for the environment called when the experiment first starts.

        Args:
            env_info (dict, optional): environment configuration. Only the
                "seed" key is read; it is handed to ``np.random.RandomState``.
                A missing key (or ``env_info=None``) passes ``None`` as the
                seed, leaving the choice of seed to NumPy.
        """
        # A None sentinel avoids sharing one mutable default dict across calls.
        if env_info is None:
            env_info = {}

        self.start = 0
        self.current_state = None
        self.rand_generator = np.random.RandomState(env_info.get("seed"))

    def env_start(self):
        """The first method called when the episode starts, called before the
        agent starts.

        Returns:
            The first state observation from the environment.
        """
        self.current_state = self.start

        self.reward_obs_term = (0.0, self.observation(self.current_state), False)

        return self.reward_obs_term[1]

    def env_step(self, action):
        """A step taken by the environment.

        Dynamics (any action other than 0 takes the second branch):
            * action 0 in state 0: with probability 0.1 jump to state 3 with
              reward 0, otherwise terminate with reward 0.3.
            * action 0 in any other state: terminate with reward 0.
            * any other action: move to the next state; the reward is 2 when
              leaving state 3 and 0 otherwise.

        Args:
            action: The action taken by the agent

        Returns:
            (float, state, Boolean): a tuple of the reward, state observation,
                and boolean indicating if it's terminal.
        """
        if action == 0:
            if self.current_state == 0:
                # The only stochastic transition; a random number is drawn
                # for this (state, action) pair and nowhere else.
                if self.rand_generator.rand() < 0.1:
                    new_state = 3
                    reward = 0.0
                else:
                    new_state = 4
                    reward = 0.3
            else:
                new_state = 4
                reward = 0.0
        else:
            reward = 2.0 if self.current_state == 3 else 0.0
            new_state = self.current_state + 1

        self.current_state = new_state
        # Every state index above 3 is terminal.
        is_terminal = (new_state > 3)

        self.reward_obs_term = (reward, self.observation(self.current_state), is_terminal)

        return self.reward_obs_term

    def observation(self, state):
        """Return the 2x2 float feature array observed in ``state``.

        The observation is two-dimensional in this environment: each row is
        the feature vector for one action (row 0 for action 0, row 1 for
        action 1). The agents in this notebook work directly with these
        two-dimensional observations.

        Args:
            state: the state index. Any value other than 0, 1, 2 or 3
                (e.g. a terminal state) yields an all-zero observation.

        Returns:
            Numpy array of shape (2, 2) and float dtype; a new array on
            every call.
        """
        # dtype=float keeps every observation the same dtype, including the
        # rows that are written with integer literals only.
        if state == 0:
            obs = np.array([[0, 1], [.8, 0]], dtype=float)
        elif state == 1:
            obs = np.array([[0, 0], [.8, 0]], dtype=float)
        elif state == 2:
            obs = np.array([[0, 0], [-1, 0]], dtype=float)
        elif state == 3:
            obs = np.array([[0, 1], [-1, 0]], dtype=float)
        else:
            obs = np.zeros((2, 2))
        return obs

    def env_cleanup(self):
        """Cleanup done after the environment ends"""
        pass

    def env_message(self, message):
        """A message asking the environment for information

        Args:
            message (string): the message passed to the environment

        Returns:
            string: the response (or answer) to the message; always the empty
                string in this environment.
        """
        return ''


# Agents

## Q-Learning

In [29]:
# Q-Learning agent
class QLearningAgent(agent.BaseAgent):
    """Epsilon-greedy Q-learning agent with linear action values.

    Action values are computed as ``observation.dot(theta)``, so row ``a`` of
    the two-dimensional observation acts as the feature vector of action ``a``.
    """

    def agent_init(self, agent_init_info):
        """Setup for the agent called when the experiment first starts.

        Args:
        agent_init_info (dict), the parameters used to initialize the agent. The dictionary contains:
        {
            num_params (int): The length of the weight vector theta,
            num_actions (int): The number of actions,
            epsilon (float): The epsilon parameter for exploration,
            step_size (float): The step-size,
            discount (float): The discount factor,
            seed (int, optional): Seed for the agent's random generator,
        }

        """
        self.num_actions = agent_init_info["num_actions"]
        self.num_params = agent_init_info["num_params"]
        self.epsilon = agent_init_info["epsilon"]
        self.step_size = agent_init_info["step_size"]
        self.discount = agent_init_info["discount"]
        self.theta = np.zeros(self.num_params)

        # Read the seed from the dict that was passed in, not from a
        # notebook-level global, so the agent does not rely on hidden state.
        self.rand_generator = np.random.RandomState(agent_init_info.get("seed"))

    def agent_start(self, observation):
        """The first method called when the episode starts, called after
        the environment starts.
        Args:
            observation (Numpy array): the state observation from the
                environment's env_start function.
        Returns:
            action (int): the first action the agent takes.
        """
        action = self._select_action(observation.dot(self.theta))
        self.prev_state = observation
        self.prev_action = action
        return action

    def agent_step(self, reward, observation):
        """A step taken by the agent.
        Args:
            reward (float): the reward received for taking the last action taken
            observation (Numpy array): the state observation from the
                environment's step based, where the agent ended up after the
                last step.
        Returns:
            action (int): the action the agent is taking.
        """
        # Values of the new state are computed with the weights as they were
        # before this step's update.
        current_q = observation.dot(self.theta)
        action = self._select_action(current_q)

        # Q-learning bootstraps from the largest action value in the new
        # state, regardless of which action was actually selected.
        features = self.prev_state[self.prev_action]
        td_error = reward + self.discount * current_q.max() - self.theta.dot(features)
        self.theta += self.step_size * td_error * features

        self.prev_state = observation
        self.prev_action = action
        return action

    def agent_end(self, reward):
        """Run when the agent terminates.
        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        # No bootstrap term: the target is the final reward alone.
        features = self.prev_state[self.prev_action]
        td_error = reward - self.theta.dot(features)
        self.theta += self.step_size * td_error * features

    def argmax(self, q_values):
        """argmax with random tie-breaking
        Args:
            q_values (Numpy array): the array of action-values
        Returns:
            action (int): the index of a maximal entry, chosen at random
                among ties.
        """
        best_actions = np.flatnonzero(q_values == q_values.max())
        return self.rand_generator.choice(best_actions)

    def _select_action(self, q_values):
        """Pick an action epsilon-greedily with respect to ``q_values``.

        One random number decides between exploring and exploiting; the
        exploring branch then draws an action index with ``randint``, the
        exploiting branch defers to ``argmax``.
        """
        if self.rand_generator.rand() < self.epsilon:
            return self.rand_generator.randint(self.num_actions)
        return self.argmax(q_values)

## Sarsa

In [30]:
# Sarsa agent
class SarsaAgent(agent.BaseAgent):
    """Epsilon-greedy Sarsa agent with linear action values.

    Action values are computed as ``observation.dot(theta)``, so row ``a`` of
    the two-dimensional observation acts as the feature vector of action ``a``.
    """

    def agent_init(self, agent_init_info):
        """Setup for the agent called when the experiment first starts.

        Args:
        agent_init_info (dict), the parameters used to initialize the agent. The dictionary contains:
        {
            num_params (int): The length of the weight vector theta,
            num_actions (int): The number of actions,
            epsilon (float): The epsilon parameter for exploration,
            step_size (float): The step-size,
            discount (float): The discount factor,
            seed (int, optional): Seed for the agent's random generator,
        }

        """
        self.num_actions = agent_init_info["num_actions"]
        self.num_params = agent_init_info["num_params"]
        self.epsilon = agent_init_info["epsilon"]
        self.step_size = agent_init_info["step_size"]
        self.discount = agent_init_info["discount"]
        self.theta = np.zeros(self.num_params)

        # Read the seed from the dict that was passed in, not from a
        # notebook-level global, so the agent does not rely on hidden state.
        self.rand_generator = np.random.RandomState(agent_init_info.get("seed"))

    def agent_start(self, observation):
        """The first method called when the episode starts, called after
        the environment starts.
        Args:
            observation (Numpy array): the state observation from the
                environment's env_start function.
        Returns:
            action (int): the first action the agent takes.
        """
        action = self._select_action(observation.dot(self.theta))
        self.prev_state = observation
        self.prev_action = action
        return action

    def agent_step(self, reward, observation):
        """A step taken by the agent.
        Args:
            reward (float): the reward received for taking the last action taken
            observation (Numpy array): the state observation from the
                environment's step based, where the agent ended up after the
                last step.
        Returns:
            action (int): the action the agent is taking.
        """
        # Values of the new state are computed with the weights as they were
        # before this step's update.
        current_q = observation.dot(self.theta)
        action = self._select_action(current_q)

        # Sarsa bootstraps from the value of the action that was just
        # selected and will be taken next.
        features = self.prev_state[self.prev_action]
        td_error = reward + self.discount * current_q[action] - self.theta.dot(features)
        self.theta += self.step_size * td_error * features

        self.prev_state = observation
        self.prev_action = action
        return action

    def agent_end(self, reward):
        """Run when the agent terminates.
        Args:
            reward (float): the reward the agent received for entering the
                terminal state.
        """
        # No bootstrap term: the target is the final reward alone.
        features = self.prev_state[self.prev_action]
        td_error = reward - self.theta.dot(features)
        self.theta += self.step_size * td_error * features

    def argmax(self, q_values):
        """argmax with random tie-breaking
        Args:
            q_values (Numpy array): the array of action-values
        Returns:
            action (int): the index of a maximal entry, chosen at random
                among ties.
        """
        best_actions = np.flatnonzero(q_values == q_values.max())
        return self.rand_generator.choice(best_actions)

    def _select_action(self, q_values):
        """Pick an action epsilon-greedily with respect to ``q_values``.

        One random number decides between exploring and exploiting; the
        exploring branch then draws an action index with ``randint``, the
        exploiting branch defers to ``argmax``.
        """
        if self.rand_generator.rand() < self.epsilon:
            return self.rand_generator.randint(self.num_actions)
        return self.argmax(q_values)

# Experiment

In [31]:
%%time

# Algorithms to compare, keyed by the label used in the printed report.
agents = {"Q-learning": QLearningAgent, "Sarsa": SarsaAgent}
env = Environment

# Settings shared by every run; the "seed" entries are filled in per run.
agent_info = {
    "num_actions": 2,
    "num_params": 2,
    "epsilon": 0.5,
    "step_size": 0.001,
    "discount": 1.0,
}
env_info = {}
num_runs = 1  # The number of runs
num_episodes = 100000  # The number of episodes in each run

# Per-algorithm results; one entry is appended for each run.
all_thetas = {name: [] for name in agents}
all_qs = {name: [] for name in agents}

for algorithm in agents:
    for run in tqdm(range(num_runs)):
        # The run index seeds both the agent and the environment.
        agent_info["seed"] = env_info["seed"] = run

        rl_glue = RLGlue(env, agents[algorithm])
        rl_glue.rl_init(agent_info, env_info)
        for episode in range(num_episodes):
            # NOTE(review): the 0 argument is presumably "no step limit" --
            # confirm against rl_glue.
            rl_glue.rl_episode(0)

        all_thetas[algorithm].append(rl_glue.agent.theta)

        # Action values of states 0-3 under the learned weights:
        # one row per state, one column per action.
        q = np.array([
            rl_glue.environment.observation(s).dot(rl_glue.agent.theta.T)
            for s in range(4)
        ])
        all_qs[algorithm].append(q)

# Report the results averaged over runs.
for algorithm in agents:
    print(algorithm)
    print('Q(s,a):')
    print(np.mean(all_qs[algorithm], axis=0))
    print('Theta:')
    print(np.mean(all_thetas[algorithm], axis=0))
    print('---')

100%|██████████| 1/1 [00:03<00:00,  3.05s/it]
100%|██████████| 1/1 [00:02<00:00,  2.86s/it]

Q-learning
Q(s,a):
[[ 0.26584002 -0.17848127]
 [ 0.         -0.17848127]
 [ 0.          0.22310159]
 [ 0.26584002  0.22310159]]
Theta:
[-0.22310159  0.26584002]
---
Sarsa
Q(s,a):
[[ 0.31008856 -0.4971869 ]
 [ 0.         -0.4971869 ]
 [ 0.          0.62148362]
 [ 0.31008856  0.62148362]]
Theta:
[-0.62148362  0.31008856]
---
CPU times: user 5.92 s, sys: 0 ns, total: 5.92 s
Wall time: 5.92 s





Note that for Q-learning, Q(S4,A1) > Q(S4,A2) (the last row of its Q(s,a) table above: 0.266 vs. 0.223), so its greedy policy will choose A1 in S4.

The best achievable policy chooses A2 in S4; this is consistent with the action values found by Sarsa, for which Q(S4,A2) > Q(S4,A1) (0.621 vs. 0.310).