In [73]:
# Import the necessary libraries
import numpy as np
import gymnasium as gym
from IPython.display import clear_output
import time

# Create the Taxi-v3 environment
env = gym.make('Taxi-v3', render_mode = 'human')

'''
0. Move South
1. Move North
2. Move East
3. Move West
4. Pickup a passenger
5. Drop off a passenger
'''


'\n0. Move South\n1. Move North\n2. Move East\n3. Move West\n4. Pickup a passenger\n5. Drop off a passenger\n'

In [74]:
def value_iteration(env, gamma=0.9, threshold=0.001):
    """
    Perform value iteration for the given environment.

    Args:
        env: The Gym environment.
        gamma: Discount factor for future rewards.
        threshold: Convergence threshold for value iteration.

    Returns:
        A tuple of (value_table, policy), where:
        - value_table is a numpy array containing the value of each state.
        - policy is a numpy array containing the best action for each state.
    """
    
    # Initialize value table with zeros, one for each environment state
    value_table = np.zeros(env.observation_space.n)
    
    # Keep iterating until value changes are below the threshold
    while True:
        # Keep a copy of the value table to check for convergence
        updated_value_table = np.copy(value_table)
        
        # Update the value for each state
        for state in range(env.observation_space.n):
            # Compute the value of each action
            Q_values = [sum([prob * (reward + gamma * updated_value_table[next_state]) 
                             for prob, next_state, reward, _ in env.P[state][action]])
                        for action in range(env.action_space.n)]
            # Update the value for this state to the maximum Q value
            value_table[state] = max(Q_values)
        
        # Check if values have converged (change is less than threshold)
        if np.sum(np.fabs(updated_value_table - value_table)) <= threshold:
            break
    
    # Extract the policy from the value table by choosing the action with the highest value
    policy = np.zeros(env.observation_space.n, dtype=int)
    for state in range(env.observation_space.n):
        Q_values = [sum([prob * (reward + gamma * value_table[next_state]) 
                         for prob, next_state, reward, _ in env.P[state][action]])
                    for action in range(env.action_space.n)]
        policy[state] = np.argmax(Q_values)
    
    return value_table, policy

print("Value iteration function defined.")


Value iteration function defined.


In [75]:
# Apply value iteration to the environment to find the optimal value function and policy
optimal_value_function, optimal_policy = value_iteration(env, gamma=0.9, threshold=1e-10)

print("Optimal value function and policy computed.")


Optimal value function and policy computed.


In [76]:
def run_with_optimal_policy(env, policy, max_steps=100):
    """
    Runs and renders an episode using the optimal policy.

    Args:
        env: The Gym environment.
        policy: The optimal policy to follow.
        max_steps: The maximum number of steps to run the simulation for.

    Returns:
        total_reward: The total reward accumulated in the episode.
    """
    current_state = env.reset()
    total_reward = 0
    for step in range(max_steps):
        # Clear the output cell to make the animation
        clear_output(wait=True)
        
        # Render the environment's current state
        env.render()
        
        # If the state is a tuple (which it should not normally be), we handle it
        if isinstance(current_state, tuple):
            current_state = current_state[0]
            
        # Select the action based on the optimal policy
        action = policy[current_state]
        
        # Step the environment with the selected action
        step_result = env.step(action)
        
        # Update the current state, reward, and done flag from the step result
        current_state = step_result[0]
        reward = step_result[1]
        done = step_result[2]
        
        # Accumulate the reward
        total_reward += reward
        
        # Print current step, action taken, and the reward received
        action_names = ['South', 'North', 'East', 'West', 'Pickup', 'Dropoff']
        print(f"Step: {step}, Action: {action_names[action]}, Reward: {reward}, Total Reward: {total_reward}")
        
        # If the episode is finished, break the loop
        if done:
            print("The agent has successfully completed its task!")
            break
        
        # Pause for a short time to watch the animation
        time.sleep(0.5)

    # Close the environment to free resources
    env.close()
    return total_reward

# Run the visualization function to see the taxi agent in action
total_reward = run_with_optimal_policy(env, optimal_policy)
print(f"Total reward from the episode: {total_reward}")


Step: 14, Action: Dropoff, Reward: 20, Total Reward: 6
The agent has successfully completed its task!
Total reward from the episode: 6


In [71]:
''''def create_env():
    return gym.make('Taxi-v3')
def run_multiple_episodes_with_optimal_policy(env_creator, policy, num_episodes=10, max_steps=100):
    episode_rewards = []
    for episode in range(num_episodes):
        env = env_creator()  # Create a new environment for each episode
        total_reward = run_with_optimal_policy(env, policy, max_steps)
        episode_rewards.append(total_reward)
        print(f"Episode {episode}: Total Reward: {total_reward}")
    return episode_rewards
episode_rewards = run_multiple_episodes_with_optimal_policy(create_env, optimal_policy, num_episodes=10)
print(f"Average reward over 10 episodes: {np.mean(episode_rewards)}")'''


'\'def create_env():\n    return gym.make(\'Taxi-v3\')\ndef run_multiple_episodes_with_optimal_policy(env_creator, policy, num_episodes=10, max_steps=100):\n    episode_rewards = []\n    for episode in range(num_episodes):\n        env = env_creator()  # Create a new environment for each episode\n        total_reward = run_with_optimal_policy(env, policy, max_steps)\n        episode_rewards.append(total_reward)\n        print(f"Episode {episode}: Total Reward: {total_reward}")\n    return episode_rewards\nepisode_rewards = run_multiple_episodes_with_optimal_policy(create_env, optimal_policy, num_episodes=10)\nprint(f"Average reward over 10 episodes: {np.mean(episode_rewards)}")'