In [2]:
import os
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt


from IPython.display import Image
from matplotlib import animation
from tqdm.notebook import tqdm

# Problem statement

Given the Taxi-v3 environment, train an RL agent to solve it using tabular Q-learning.

## Environment description

+ Environment type: **Deterministic** environment. 
+ Action Space: Discrete(6). 
+ State Space: Discrete(500). 
+ Game objective: pick up the passenger and drop them off at the correct destination.
+ Terminal state: the taxi drops the passenger off at the correct destination, or the episode reaches its step limit.
+ Reward function:
    + +20 points for a correct drop-off.
    + -1 point for each step (unless another reward applies).
    + -10 points for an illegal pick-up or drop-off.

In [4]:
env_id = 'Taxi-v3'
# render_mode='rgb_array' asks the environment to render frames as image arrays.
env = gym.make(env_id, render_mode='rgb_array')

# Sizes of the discrete observation and action spaces; these become the
# Q-table dimensions (rows = states, columns = actions).
state_space, action_space = env.observation_space.n, env.action_space.n

print('Observation Space: ', state_space)
print('Action Space: ', action_space)

Observation Space:  500
Action Space:  6


In [5]:
def init_q_table(state_space, action_space):
    """Build the initial Q-table with every state-action value set to zero.

    Args:
        state_space: Number of discrete states (rows of the table).
        action_space: Number of discrete actions (columns of the table).

    Returns:
        A NumPy array of zeros with shape ``(state_space, action_space)``.
    """
    return np.zeros(shape=(state_space, action_space))

def greedy_policy(q_table, state):
    """Pick the action with the highest Q-value for ``state``.

    Args:
        q_table: 2-D array of Q-values indexed as ``[state, action]``.
        state: Row index of the current state.

    Returns:
        Index of the best action; on ties the lowest index wins (argmax semantics).
    """
    action_values = q_table[state, :]
    return action_values.argmax()

def epsilon_policy(q_table, state, epsilon):
    """
        Epsilon-greedy action selection, balancing exploration and exploitation:
            + Exploration: when the random draw is <= epsilon, take a uniformly random action
              to gather new knowledge about the environment.
            + Exploitation: when the random draw is > epsilon, take the action the Q-table
              currently rates highest for this state.
    """
    draw = float(np.random.uniform(0, 1))

    # Exploit only when the draw clears epsilon; otherwise fall through to explore.
    if draw > epsilon:
        return greedy_policy(q_table, state)

    # One column per action, so shape[1] is the number of available actions.
    return np.random.choice(q_table.shape[1])

In [8]:
# --- Episode budget ---
n_training_episodes = 300            # episodes used to fit the Q-table
n_eval_episodes = 10                 # episodes used to score the greedy policy
max_steps = 99                       # hard cap on steps per episode
eval_seed = range(n_eval_episodes)   # one reset seed per evaluation episode

# --- Q-learning update ---
lr = 0.7                             # learning rate (step size of each Q-value update)
gamma = 0.95                         # discount factor applied to future value

# --- Exploration schedule (epsilon decays exponentially per episode in train) ---
max_epsilon = 1.0
min_epsilon = 0.05
decay_rate = 0.0005
# NOTE(review): with decay_rate = 0.0005 and 300 episodes, epsilon only falls from
# 1.0 to about 0.87, so training is almost entirely random exploration. If the
# evaluated agent fails to finish episodes, revisit the episode count / decay rate.

In [10]:
def train(env,
          q_table,
          max_steps=max_steps,
          n_training_episodes=n_training_episodes,
          min_epsilon=min_epsilon,
          max_epsilon=max_epsilon,
          decay_rate=decay_rate,
          lr=lr,
          gamma=gamma):
    """Fit ``q_table`` with tabular Q-learning and return it.

    The table is updated in place, so the caller's array holds the learned
    values after the call; the same object is also returned for convenience.

    Args:
        env: Environment exposing ``reset()`` -> ``(state, info)`` and
            ``step(action)`` -> ``(state, reward, terminated, truncated, info)``.
        q_table: 2-D array of Q-values indexed as ``[state, action]``.
        max_steps: Maximum number of steps taken in a single episode.
        n_training_episodes: Number of episodes to train for.
        min_epsilon: Floor of the exploration rate.
        max_epsilon: Exploration rate used for the first episode.
        decay_rate: Exponential decay rate of epsilon per episode.
        lr: Learning rate of the Q-value update.
        gamma: Discount factor applied to the next state's best Q-value.

    Returns:
        The trained ``q_table`` (the same object that was passed in).

    Note:
        The keyword defaults are taken from the module-level hyperparameters
        at the moment this function is defined.
    """
    for episode in tqdm(range(n_training_episodes), desc='Training model ...'):
        # Exponentially decay exploration from max_epsilon towards min_epsilon.
        epsilon = min_epsilon + (max_epsilon - min_epsilon) * np.exp(-decay_rate * episode)

        state, info = env.reset()

        for _ in range(max_steps):
            action = epsilon_policy(q_table, state, epsilon)
            new_state, reward, terminated, truncated, info = env.step(action)

            # A terminal state has no future return, so bootstrap only when the
            # episode has not terminated. Truncation (time limit) is not a true
            # terminal state, so it still bootstraps from new_state.
            future_value = 0.0 if terminated else np.max(q_table[new_state])
            td_target = reward + gamma * future_value
            q_table[state, action] += lr * (td_target - q_table[state, action])

            if terminated or truncated:
                break

            state = new_state

    return q_table

In [11]:
# Start from an all-zero table. train() updates this array in place, which is
# why the following cells read `q_table` directly.
q_table = init_q_table(state_space, action_space)
train_q_table = train(env, q_table)

Training model ...:   0%|          | 0/300 [00:00<?, ?it/s]

In [15]:
# Shape first, then the (NumPy-abbreviated) table contents, one per line.
print(q_table.shape, q_table, sep='\n')

(500, 6)
[[  0.           0.           0.           0.           0.
    0.        ]
 [ -3.94067474  -3.05795711  -4.03170362  -3.20909937  -2.9850844
  -12.54565266]
 [ -2.80919142  -1.9447495   -2.8316826   -1.7178      -1.46543439
  -10.4748    ]
 ...
 [ -3.10865425  -2.59361503  -3.19659993  -2.8024386  -11.49013232
  -12.10897621]
 [ -3.99024184  -3.70766748  -4.01472188  -4.17976238 -13.1644032
  -13.15448383]
 [  0.           0.           0.           0.           0.
    0.        ]]


In [16]:
def evaluate_agent(env, q_table, max_steps, seed, n_episodes=None):
    """Run the greedy policy for several episodes and summarise the returns.

    Args:
        env: Environment exposing ``reset(seed=...)`` -> ``(state, info)`` and
            ``step(action)`` -> ``(state, reward, terminated, truncated, info)``.
        q_table: 2-D array of Q-values indexed as ``[state, action]``.
        max_steps: Maximum number of steps taken in a single episode.
        seed: Indexable collection of reset seeds, one per episode
            (``seed[i]`` seeds episode ``i``). A falsy value (``None`` or an
            empty sequence) resets the environment without a seed.
        n_episodes: Number of evaluation episodes. Defaults to the
            notebook-level ``n_eval_episodes`` when omitted.

    Returns:
        Tuple ``(mean_reward, std_reward)`` over the per-episode total rewards.

    Raises:
        IndexError: If ``seed`` is provided but has fewer entries than the
            number of episodes.
    """
    if n_episodes is None:
        # Preserve the historical behaviour of reading the notebook-level setting.
        n_episodes = n_eval_episodes

    episode_rewards = []

    for episode in tqdm(range(n_episodes)):
        if seed:
            state, info = env.reset(seed=seed[episode])
        else:
            state, info = env.reset()

        total_rewards_ep = 0

        for _ in range(max_steps):
            # Evaluation is purely greedy: no exploration.
            action = greedy_policy(q_table, state)
            new_state, reward, terminated, truncated, info = env.step(action)
            total_rewards_ep += reward

            if terminated or truncated:
                break

            state = new_state

        episode_rewards.append(total_rewards_ep)

    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward

In [17]:
# Score the greedy policy on the fixed evaluation seeds.
mean_reward, std_reward = evaluate_agent(env, q_table, max_steps, eval_seed)

  0%|          | 0/10 [00:00<?, ?it/s]

In [18]:
# Mean and standard deviation of the per-episode returns, one per line.
print(mean_reward, std_reward, sep='\n')

-99.0
0.0
