In [2]:
# import dependencies

import gymnasium as gym
import numpy as np
import random
from random import randint
from IPython.display import clear_output
from time import sleep


In [3]:
# create env

env=gym.make("Taxi-v3", render_mode="ansi") #"human", "ansi", "rgb_array"
env.reset()
print(env.render())

+---------+
|R: | : :[35mG[0m|
| : | : : |
| : : : : |
|[43m [0m| : | : |
|[34;1mY[0m| : |B: |
+---------+




In [4]:
# create Q table

action_size=env.action_space.n
print("Action size", action_size)

state_size=env.observation_space.n
print("State size", state_size)

Action size 6
State size 500


In [5]:
qtable=np.zeros ((state_size, action_size))
print(qtable)

[[0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 ...
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0.]]


In [6]:
# Episodes

total_episodes = 100000
total_test_episodes = 1000
max_steps = 99



In [7]:
learning_rate = 0.7     #learning rate (step size on map)
gamma = 0.618           #Discounting rate

#exploration parameters

epsilon = 1.0           #Exploration rate
max_epsilon = 1.0       #Exploration probability at start
min_epsilon = 0.01      #Minimum exploration probability
decay_rate = 0.01       #Exponential decay rate for exploration prob

In [8]:
for episode in range(total_episodes):
    # Reset env for start
    state = env.reset()[0]
    step = 0
    done = False
    total_rewards = 0  
    for step in range(max_steps):
            # 3. Choose an action a in the current world state (s)
            ## First we randomize a number
            exp_exp_tradeoff = random.uniform(0,1)
            
            ## If this number > greater than epsilon --> exploitation (taking the biggest Q value for this state)
            if exp_exp_tradeoff > epsilon:
                action = np.argmax(qtable[state,:])
            
            # Else doing a random choice --> exploration
            else:
                action = env.action_space.sample()
            
            # Take the action (a) and observe the outcome state(s') and reward (r)
            new_state, reward, done, info, _ = env.step(action)
            # Update Q(s,a):= Q(s,a) + lr [R(s,a) + gamma * max Q(s',a') - Q(s,a)]
            qtable[state, action] = qtable[state, action] + learning_rate * (reward + gamma * np.max(qtable[new_state,:]) - qtable[state, action])
            
            # Our new state is state
            state = new_state
            
            # If done : finish episode
            if done == True: 
                    break
        
            # Reduce epsilon (because we need less and less exploration)
            epsilon = min_epsilon + (max_epsilon - min_epsilon)*np.exp(-decay_rate*episode)
    if episode % 100 == 0:
        clear_output(wait=True)
        print(f"Episode: {episode}")

print("Finished")           

Episode: 99900
Finished


In [15]:
qtable

array([[  0.        ,   0.        ,   0.        ,   0.        ,
          0.        ,   0.        ],
       [ -2.50421541,  -2.43400544,  -2.50421536,  -2.43400544,
         -2.32039715, -11.43400312],
       [ -1.83910191,  -1.35777508,  -1.83910436,  -1.35778444,
         -0.57891593, -10.35777005],
       ...,
       [ -2.1430876 ,   0.68134715,  -2.12586921,  -2.2253835 ,
         -7.        , -10.58404736],
       [ -2.36863532,  -2.32211961,  -2.36351949,  -2.13656497,
        -10.75480486, -10.95144718],
       [ -1.21282   ,  -0.91      ,  -1.21282   ,  11.36      ,
         -7.        ,  -7.        ]])

In [16]:
env.reset()[0]
rewards = []

for episode in range(total_test_episodes):
    state = env.reset()[0]
    step = 0
    done = False
    total_rewards = 0
    
    
    for step in range(max_steps):
        env.render()
        # Take the action (index) that have the maximum expected future reward given that state
        action = np.argmax(qtable[state,:])
        
        new_state, reward, done, info, _ = env.step(action)
        
        total_rewards += reward
        
        if done:
            rewards.append(total_rewards)
            break
        state = new_state
    if episode % 100 == 0:
        clear_output(wait=True)
        print(f"Episode: {episode}")

print("Finished")

Episode: 900
Finished


In [30]:
def print_frames(frames, sleep_time, episodes, random):
    if random:
        random_number_list = []
        for e in range(episodes):
            random_number = randint(1, episodes)
            random_number_list.append(random_number)
        
    for i, frame in enumerate(frames, start=1):
        if random and frame["episode"] in random_number_list:
            clear_output(wait=True)
            
            print(f"Episode: {frame['episode']}")
            
            print(frame['frame'])
            
            print(f"Timestep: {i}")
            print(f"State: {frame['state']}")
            print(f"Action: {frame['action']}")
            print(f"Reward: {frame['reward']}")
            
            sleep(sleep_time)
        elif not random and frame["episode"] < episodes: 
            clear_output(wait=True)
            
            print(f"Episode: {frame['episode']}")
            
            print(frame['frame'])
            
            print(f"Timestep: {i}")
            print(f"State: {frame['state']}")
            print(f"Action: {frame['action']}")
            print(f"Reward: {frame['reward']}")
            
            sleep(sleep_time)

In [31]:
frames = []
total_penalties = 0
total_epochs = 0

for i in range(episode):
    # Reset the environment
    state = env.reset()[0]
    
    # Initialize fields
    epochs = penalties = reward = 0
    done = False
    
    # Start the episode process
    while not done:
        # Only Exploitation during the evaluation phase
        action = np.argmax(qtable[state])
        
        # Performing action inside the environment
        state, reward, done, info, _ = env.step(action)
        
        # Put each rendered frame into a dict for animation
        frames.append({
            "episode": i,
            "frame": env.render(),
            "state": state,
            "action": action,
            "reward": reward
        })
        
        # Getting stats when the agent performed illegal action (pickup or dropoff)
        penalties += 1 if reward == -10 else 0
        
        epochs += 1
    
    total_penalties += penalties
    total_epochs += epochs

In [32]:
print(f"Results ({episode} episodes)")
print(f"Average timesteps: {total_epochs / episode}")
print(f"Average penalties: {total_penalties / episode}")

Results (999 episodes)
Average timesteps: 13.037037037037036
Average penalties: 0.0


In [33]:
print_frames(frames, sleep_time=0.1, episodes=1000, random=True)

Episode: 2
+---------+
|R: | : :[35mG[0m|
| : | :[42m_[0m: |
| : : : : |
| | : | : |
|Y| : |B: |
+---------+
  (North)

Timestep: 37
State: 177
Action: 1
Reward: -1


KeyboardInterrupt: 