In [1]:
import gym
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

In [2]:
env = gym.make("Taxi-v3").env
env.render()
print("Action Space {}".format(env.action_space))
print("State Space {}".format(env.observation_space))
print('Blue colored letter denotes the pickup location and purple colored letter denotes the drop location.')

+---------+
|R: | : :[34;1mG[0m|
| : | : : |
| : :[43m [0m: : |
| | : | : |
|Y| : |[35mB[0m: |
+---------+

Action Space Discrete(6)
State Space Discrete(500)
Blue colored letter denotes the pickup location and purple colored letter denotes the drop location.


In [3]:
state = env.encode(3, 1, 2, 0) # (taxi row, taxi column, passenger index, destination index)
print("State:", state)
env.s=state
env.render()

State: 328
+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : : : |
| |[43m [0m: | : |
|[34;1mY[0m| : |B: |
+---------+



In [4]:
env.P[state]

{0: [(1.0, 428, -1, False)],
 1: [(1.0, 228, -1, False)],
 2: [(1.0, 348, -1, False)],
 3: [(1.0, 328, -1, False)],
 4: [(1.0, 328, -10, False)],
 5: [(1.0, 328, -10, False)]}

In [5]:
import random
from collections import defaultdict

class Agent:

    def __init__(self, algorithm='sarsamax', start_epsilon=1, epsilon_decay=0.9, epsilon_cut=0.1, alpha=0.01, gamma=1,
                 nA=6):
        """ Initialize agent.
        Params
        ======
        - nA: number of actions available to the agent
        """
        algos = {
            'sarsamax': self.step_sarsamax,
            'exp_sarsa': self.step_exp_sarsa
        }

        self.step = algos[algorithm]
        self.Q = defaultdict(lambda: np.zeros(self.nA))
        self.epsilon, self.epsilon_decay, self.epsilon_cut, self.alpha, self.gamma, self.nA = \
            start_epsilon, epsilon_decay, epsilon_cut, alpha, gamma, nA

    def select_action(self, state):
        r = random.random()
        if r > self.epsilon:   # select greedy action with probability epsilon
            return np.argmax(self.Q[state])
        else:  # otherwise, select an action randomly
            return random.randint(0, 5)

    def get_probs(self, Q_s, epsilon, nA):
        """ obtains the action probabilities corresponding to epsilon-greedy policy """
        policy_s = np.ones(nA) * epsilon / nA
        best_a = np.argmax(Q_s)
        policy_s[best_a] = 1 - epsilon + (epsilon / nA)
        return policy_s

    def step_exp_sarsa(self, state, action, reward, next_state, done):
        """ Update the agent's knowledge, using the most recently sampled tuple.
        Params
        ======
        - state: the previous state of the environment
        - action: the agent's previous choice of action
        - reward: last reward received
        - next_state: the current state of the environment
        - done: whether the episode is complete (True or False)
        """
        if not done:
            probs = self.get_probs(self.Q[next_state], self.epsilon, self.nA)

            self.Q[state][action] += self.alpha * (
                        reward + self.gamma * np.dot(probs, self.Q[next_state]) - self.Q[state][action])
        else:
            self.Q[state][action] += self.alpha * (reward - self.Q[state][action])
            self.epsilon = self.epsilon * self.epsilon_decay
            if self.epsilon_cut is not None:
                self.epsilon = max(self.epsilon, self.epsilon_cut)

    def step_sarsamax(self, state, action, reward, next_state, done):
        """ Update the agent's knowledge, using the most recently sampled tuple.
        Params
        ======
        - state: the previous state of the environment
        - action: the agent's previous choice of action
        - reward: last reward received
        - next_state: the current state of the environment
        - done: whether the episode is complete (True or False)
        """
        if not done:
            self.Q[state][action] += self.alpha * (
                        reward + self.gamma * np.max(self.Q[next_state]) - self.Q[state][action])
        else:
            self.Q[state][action] += self.alpha * (reward - self.Q[state][action])
            self.epsilon = self.epsilon * self.epsilon_decay
            if self.epsilon_cut is not None:
                self.epsilon = max(self.epsilon, self.epsilon_cut)

In [8]:
from collections import deque
import sys
import math
def interact(env, agent, num_episodes=20000, window=100, print_logs=False):
    """ Monitor agent's performance.
    
    Params
    ======
    - env: instance of OpenAI Gym's Taxi-v1 environment
    - agent: instance of class Agent (see Agent.py for details)
    - num_episodes: number of episodes of agent-environment interaction
    - window: number of episodes to consider when calculating average rewards
    Returns
    =======
    - avg_rewards: deque containing average rewards
    - best_avg_reward: largest value in the avg_rewards deque
    """
    frames = []
    # initialize average rewards
    avg_rewards = deque(maxlen=num_episodes)
    # initialize best average reward
    best_avg_reward = -math.inf
    # initialize monitor for most recent rewards
    samp_rewards = deque(maxlen=window)
    # for each episode
    for i_episode in range(1, num_episodes+1):
        #print('\ni: ', i_episode)
        # begin the episode
        state = env.reset()
        # initialize the sampled reward
        samp_reward = 0
        while True:
            # agent selects an action
            action = agent.select_action(state)
            # agent performs the selected action
            next_state, reward, done, _ = env.step(action)
            # agent performs internal updates based on sampled experience
            agent.step(state, action, reward, next_state, done)
            # update the sampled reward
            samp_reward += reward
            # update the state (s <- s') to next time step
            state = next_state
            frames.append({'frame': env.render(mode='ansi'),
                              'episode': i_episode,
                               'state': state,
                               'action': action,
                               'reward': best_avg_reward}
                             )
            if done:
                # save final sampled reward
                samp_rewards.append(samp_reward)
                break
        if (i_episode >= 100):
            # get average reward from last 100 episodes
            avg_reward = np.mean(samp_rewards)
            # append to deque
            avg_rewards.append(avg_reward)
            # update best average reward
            if avg_reward > best_avg_reward:
                best_avg_reward = avg_reward
                
        # monitor progress
        if print_logs:
            print("\rEpisode {}/{} || Best average reward {}".format(i_episode, num_episodes, best_avg_reward), end="")
        # check if task is solved (according to OpenAI Gym)
        if best_avg_reward >= 9.7:
            if print_logs:
                print('\nEnvironment solved in {} episodes.'.format(i_episode), end="")
            break
        if i_episode == num_episodes: print('\n')
    return avg_rewards, best_avg_reward, frames

In [9]:
agent = Agent(algorithm= 'sarsamax',
              alpha= 0.2512238484351891,
              epsilon_cut= 0,
              epsilon_decay= 0.8888782926665223,
              start_epsilon= 0.9957089031634627,
              gamma= 0.7749915552696941)

avg_rewards, best_avg_reward, frames = interact(env, agent)





In [6]:
from IPython.display import clear_output
from time import sleep

def print_frames(frames):
    for i, frame in enumerate(frames):
        clear_output(wait=True)
        print(frame['frame'])
        print(f"Episode: {frame['episode']}")
        print(f"Timestep: {i + 1}")
        print(f"State: {frame['state']}")
        print(f"Action: {frame['action']}")
        print(f"Reward: {frame['reward']}")
        sleep(.1)
        
#print_frames(frames)

In [10]:
print_frames(frames)

+---------+
|R: | : :G|
| : | : : |
| : : : : |
| | : | : |
|[35mY[0m| : |[42mB[0m: |
+---------+
  (West)

Episode: 13
Timestep: 13829
State: 478
Action: 3
Reward: -inf


KeyboardInterrupt: 