In [11]:
import gym
import numpy as np
import imageio
import matplotlib.pyplot as plt
from collections import defaultdict

In [12]:
# Build the Taxi-v3 environment (rendering to RGB arrays) and report the
# sizes of its discrete observation and action spaces.
env = gym.make("Taxi-v3", render_mode="rgb_array")
num_states, num_actions = env.observation_space.n, env.action_space.n

for label, size in (("State space:", num_states), ("Action space:", num_actions)):
    print(label, size)

State space: 500
Action space: 6


In [13]:
# Monte Carlo Control (on-policy, first-visit, epsilon-greedy)
def monte_carlo_control(env, num_episodes=50, gamma=0.99, epsilon=0.1):
    """Learn a greedy policy with on-policy first-visit Monte Carlo control.

    Episodes are generated with an epsilon-greedy behaviour policy derived
    from the current action-value estimates (there are no exploring starts:
    every episode begins wherever ``env.reset()`` puts the agent).

    Args:
        env: Environment exposing ``reset() -> (state, info)``,
            ``step(action) -> (state, reward, terminated, truncated, info)``
            and discrete ``observation_space.n`` / ``action_space.n``.
        num_episodes: Number of training episodes to sample.
        gamma: Discount factor applied when accumulating returns.
        epsilon: Probability of taking a uniformly random action.

    Returns:
        dict mapping every state index in ``range(env.observation_space.n)``
        to the index of its greedy action. States that were never visited
        keep all-zero estimates and therefore map to action 0.
    """
    n_actions = env.action_space.n
    n_states = env.observation_space.n

    Q = defaultdict(lambda: np.zeros(n_actions))  # Action-value estimates
    visit_counts = defaultdict(int)  # Number of returns averaged per (state, action)

    for _ in range(num_episodes):
        state, _ = env.reset()
        done = False
        episode_data = []

        # Generate an episode
        while not done:
            if np.random.rand() < epsilon:
                action = np.random.choice(n_actions)
            else:
                action = np.argmax(Q[state])
            next_state, reward, terminated, truncated, _ = env.step(action)
            # The episode is over on a terminal state OR when the step limit
            # truncates it; ignoring truncation can make this loop run forever.
            done = terminated or truncated
            episode_data.append((state, action, reward))
            state = next_state

        # Time step of the first occurrence of each (state, action) pair.
        first_visit = {}
        for t, (s, a, _) in enumerate(episode_data):
            first_visit.setdefault((s, a), t)

        # Walk the episode backwards accumulating the discounted return and
        # update a pair only at its first occurrence (first-visit MC).
        G = 0
        for t in range(len(episode_data) - 1, -1, -1):
            s, a, r = episode_data[t]
            G = gamma * G + r
            if first_visit[(s, a)] == t:
                visit_counts[(s, a)] += 1
                # Incremental mean: same average as storing every return,
                # without keeping the full history in memory.
                Q[s][a] += (G - Q[s][a]) / visit_counts[(s, a)]

    # Extract the greedy policy for every state index
    return {s: np.argmax(Q[s]) for s in range(n_states)}


In [14]:
# Dynamic Programming - Policy Iteration
def policy_iteration(env, gamma=0.99, theta=1e-6):
    """Compute a policy for a tabular environment with policy iteration.

    Alternates in-place iterative policy evaluation with greedy policy
    improvement until an improvement sweep changes no action.

    Args:
        env: Environment with discrete ``observation_space.n`` /
            ``action_space.n`` and a transition table ``P`` where
            ``P[s][a]`` is an iterable of ``(prob, next_state, reward, done)``.
        gamma: Discount factor.
        theta: Evaluation stops once the largest value change in a sweep
            drops below this threshold.

    Returns:
        numpy integer array of length ``observation_space.n`` holding the
        chosen action index for every state.
    """
    n_states = env.observation_space.n
    n_actions = env.action_space.n
    # Read the table from the innermost environment: wrappers added by
    # gym.make do not reliably forward attribute access to ``P``.
    transitions = getattr(env, "unwrapped", env).P

    policy = np.random.choice(n_actions, size=n_states)
    V = np.zeros(n_states)

    def expected_return(s, a):
        """One-step lookahead value of taking action ``a`` in state ``s``."""
        # A transition flagged ``done`` ends the episode, so nothing is
        # bootstrapped from the successor state's value.
        return sum(
            prob * (reward + (0.0 if done else gamma * V[next_s]))
            for prob, next_s, reward, done in transitions[s][a]
        )

    while True:
        # Policy Evaluation (in-place sweeps until values settle)
        while True:
            delta = 0
            for s in range(n_states):
                v = V[s]
                V[s] = expected_return(s, policy[s])
                delta = max(delta, abs(v - V[s]))
            if delta < theta:
                break

        # Policy Improvement (act greedily with respect to V)
        policy_stable = True
        for s in range(n_states):
            old_action = policy[s]
            action_values = [expected_return(s, a) for a in range(n_actions)]
            policy[s] = np.argmax(action_values)
            if old_action != policy[s]:
                policy_stable = False

        if policy_stable:
            break

    return policy


In [None]:
# Train the Monte Carlo policy (this cell trains only the MC policy), using
# the default hyperparameters of monte_carlo_control.
print("\nTraining Monte Carlo Policy...")
mc_policy = monte_carlo_control(env)
print("Monte Carlo Training Complete.")


Training Monte Carlo Policy...


In [None]:
import gym
import numpy as np
import random
import time
import pickle
from collections import defaultdict

def monte_carlo_control(env, num_episodes=5, gamma=0.99, epsilon=0.1):
    """Learn a greedy policy with on-policy first-visit Monte Carlo control.

    Episodes are generated with an epsilon-greedy behaviour policy derived
    from the current action-value estimates (there are no exploring starts:
    every episode begins wherever ``env.reset()`` puts the agent).

    Args:
        env: Environment exposing ``reset() -> (state, info)``,
            ``step(action) -> (state, reward, terminated, truncated, info)``
            and discrete ``observation_space.n`` / ``action_space.n``.
        num_episodes: Number of training episodes to sample.
        gamma: Discount factor applied when accumulating returns.
        epsilon: Probability of taking a uniformly random action.

    Returns:
        dict mapping every state index in ``range(env.observation_space.n)``
        to the index of its greedy action. States that were never visited
        keep all-zero estimates and therefore map to action 0.
    """
    n_actions = env.action_space.n
    n_states = env.observation_space.n

    Q = defaultdict(lambda: np.zeros(n_actions))  # Action-value estimates
    visit_counts = defaultdict(int)  # Number of returns averaged per (state, action)

    for _ in range(num_episodes):
        state, _ = env.reset()
        done = False
        episode_data = []

        # Generate an episode
        while not done:
            if np.random.rand() < epsilon:
                action = np.random.choice(n_actions)
            else:
                action = np.argmax(Q[state])
            next_state, reward, terminated, truncated, _ = env.step(action)
            # Stop on a terminal state or when the step limit truncates the episode.
            done = terminated or truncated
            episode_data.append((state, action, reward))
            state = next_state

        # Time step of the first occurrence of each (state, action) pair.
        first_visit = {}
        for t, (s, a, _) in enumerate(episode_data):
            first_visit.setdefault((s, a), t)

        # Walk the episode backwards accumulating the discounted return and
        # update a pair only at its first occurrence (first-visit MC).
        G = 0
        for t in range(len(episode_data) - 1, -1, -1):
            s, a, r = episode_data[t]
            G = gamma * G + r
            if first_visit[(s, a)] == t:
                visit_counts[(s, a)] += 1
                # Incremental mean: same average as storing every return,
                # without keeping the full history in memory.
                Q[s][a] += (G - Q[s][a]) / visit_counts[(s, a)]

    # Extract the greedy policy for every state index
    return {s: np.argmax(Q[s]) for s in range(n_states)}

# Initialize Environment
# Keep render_mode="rgb_array", as in the environment created at the top of
# the notebook: the GIF and final-frame cells further down call env.render()
# and need it to return an RGB frame.
env = gym.make("Taxi-v3", render_mode="rgb_array")
print("\nTraining Monte Carlo Policy...")
mc_policy = monte_carlo_control(env)
print("Monte Carlo Training Complete.")


In [None]:
# Compute the dynamic-programming policy with policy_iteration's default
# gamma and theta; the result is kept in dp_policy for the cells below.
print("\nTraining Dynamic Programming Policy...")
dp_policy = policy_iteration(env)
print("Dynamic Programming Training Complete.")

In [None]:
# Test the Policies
def test_policy(env, policy, num_episodes=5):
    """Runs a given policy on the environment"""
    for episode in range(num_episodes):
        state, _ = env.reset()
        done = False
        total_reward = 0

        while not done:
            action = policy[state]  # Select action based on learned policy
            state, reward, done, _, _ = env.step(action)
            total_reward += reward

        print(f"Episode {episode + 1}: Total Reward = {total_reward}")




In [None]:
# Run both learned policies for test_policy's default number of episodes,
# printing the total reward of every episode.
print("\nTesting Monte Carlo Policy...")
test_policy(env, mc_policy)

print("\nTesting Dynamic Programming Policy...")
test_policy(env, dp_policy)


In [None]:

# Generate Navigation GIF
def generate_navigation_gif(env, policy, filename="navigation.gif"):
    """Roll out one episode under ``policy`` and save the rendered frames as a GIF.

    Args:
        env: Environment exposing ``reset()``, ``step(action)`` (returning
            ``(state, reward, terminated, truncated, info)``) and a
            ``render()`` that returns an image frame.
        policy: Mapping or array indexed by state that yields the action to take.
        filename: Path of the GIF file to write.

    Raises:
        ValueError: If ``env.render()`` returns ``None`` (for example when the
            environment was not created with ``render_mode="rgb_array"``).
    """
    frames = []
    state, _ = env.reset()
    done = False

    while not done:
        action = policy[state]  # Select action
        state, _, terminated, truncated, _ = env.step(action)
        # Stop on truncation too, otherwise a policy that never reaches the
        # goal would keep collecting frames forever.
        done = terminated or truncated
        frame = env.render()
        if frame is None:
            raise ValueError(
                "env.render() returned no frame; create the environment with "
                "render_mode=\"rgb_array\" to record a GIF"
            )
        frames.append(frame)

    # NOTE(review): the unit of `duration` (seconds vs. milliseconds) depends
    # on the installed imageio version -- confirm the playback speed.
    imageio.mimsave(filename, frames, duration=0.5)
    print(f"\nNavigation GIF saved as {filename}")


# Generate GIFs for both policies
# Each call rolls out one episode on the shared env and writes its own file.
generate_navigation_gif(env, mc_policy, filename="mc_navigation.gif")
generate_navigation_gif(env, dp_policy, filename="dp_navigation.gif")

In [None]:
# Repeat of the earlier policy test run, executed again after GIF generation:
# prints per-episode total rewards for both policies.
print("\nTesting Monte Carlo Policy...")
test_policy(env, mc_policy)

print("\nTesting Dynamic Programming Policy...")
test_policy(env, dp_policy)

In [None]:
#  Display Final Frame
# Renders the environment in whatever state the previous cells left it and
# shows that frame without axes.
# NOTE(review): plt.imshow needs an image array, so this presumably relies on
# env having been created with render_mode="rgb_array" -- confirm.
last_frame = env.render()
plt.imshow(last_frame)
plt.axis("off")
plt.show()

In [None]:
# Instrumented, self-contained copy of the Monte Carlo training loop: it logs
# the undiscounted return of every episode so a learning curve can be plotted.
# The hyperparameters and Q-table are defined here because the ones used by
# monte_carlo_control are local to that function.
num_episodes = 1000  # should exceed the 100-episode rolling window used for the plot
gamma = 0.99
epsilon = 0.1
Q = defaultdict(lambda: np.zeros(env.action_space.n))  # Action-value estimates
visit_counts = defaultdict(int)  # Number of returns averaged per (state, action)

episode_returns = []
for episode in range(num_episodes):
    state, _ = env.reset()
    done = False
    episode_data = []
    ep_return = 0  # Log reward for this episode

    while not done:
        if np.random.rand() < epsilon:
            action = np.random.choice(env.action_space.n)
        else:
            action = np.argmax(Q[state])
        next_state, reward, terminated, truncated, _ = env.step(action)
        # End the episode on termination or on the step-limit truncation.
        done = terminated or truncated
        episode_data.append((state, action, reward))
        ep_return += reward
        state = next_state
    episode_returns.append(ep_return)

    # First-visit Monte Carlo update of the action values.
    first_visit = {}
    for t, (s, a, _) in enumerate(episode_data):
        first_visit.setdefault((s, a), t)

    G = 0
    for t in range(len(episode_data) - 1, -1, -1):
        s, a, r = episode_data[t]
        G = gamma * G + r
        if first_visit[(s, a)] == t:
            visit_counts[(s, a)] += 1
            Q[s][a] += (G - Q[s][a]) / visit_counts[(s, a)]


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Smooth the per-episode returns with a 100-episode moving average and draw
# the resulting learning curve.
rolling_avg = pd.Series(episode_returns).rolling(window=100).mean()

fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(rolling_avg)
ax.set_title("Monte Carlo Training: Rolling Average Return")
ax.set_xlabel("Episode")
ax.set_ylabel("Average Total Return")
plt.show()


In [None]:
# Re-run iterative policy evaluation at notebook level, recording the largest
# value change of every sweep so its convergence can be plotted.
# The inputs are defined here because the ones used inside policy_iteration
# are local to that function: the discount and threshold mirror its defaults,
# the policy under evaluation is the policy-iteration result, and the values
# start from zero.
gamma = 0.99
theta = 1e-6
policy = dp_policy
V = np.zeros(num_states)
transitions = env.unwrapped.P  # transition table of the innermost environment

deltas = []
while True:
    delta = 0
    for s in range(num_states):
        v = V[s]
        a = policy[s]
        # Transitions flagged `done` end the episode, so nothing is
        # bootstrapped from the successor state's value.
        V[s] = sum(prob * (reward + (0.0 if done else gamma * V[next_s]))
                   for prob, next_s, reward, done in transitions[s][a])
        delta = max(delta, abs(v - V[s]))
    deltas.append(delta)
    if delta < theta:
        break

# Plot the delta convergence
plt.figure(figsize=(10, 5))
plt.plot(deltas)
plt.title("Policy Evaluation Convergence in Policy Iteration")
plt.xlabel("Iteration")
plt.ylabel("Max Change in Value (Delta)")
plt.show()


In [None]:
def evaluate_policy(env, policy, num_episodes=100):
    """Run a policy for several episodes and collect each episode's total reward.

    Args:
        env: Environment exposing ``reset() -> (state, info)`` and
            ``step(action) -> (state, reward, terminated, truncated, info)``.
        policy: Mapping or array indexed by state that yields the action to take.
        num_episodes: Number of evaluation episodes.

    Returns:
        list with one undiscounted total reward per episode, in run order.
    """
    rewards = []
    for _ in range(num_episodes):
        state, _ = env.reset()
        done = False
        ep_return = 0
        while not done:
            action = policy[state]
            state, reward, terminated, truncated, _ = env.step(action)
            # Stop on truncation as well as termination so that a policy that
            # never reaches a terminal state cannot hang the evaluation.
            done = terminated or truncated
            ep_return += reward
        rewards.append(ep_return)
    return rewards

# Evaluate both learned policies over 100 test episodes each (Monte Carlo
# first, then dynamic programming) and overlay their per-episode returns.
mc_test_rewards, dp_test_rewards = (
    evaluate_policy(env, learned_policy, num_episodes=100)
    for learned_policy in (mc_policy, dp_policy)
)

fig, ax = plt.subplots(figsize=(10, 5))
for test_rewards, curve_label in (
    (mc_test_rewards, "Monte Carlo"),
    (dp_test_rewards, "Dynamic Programming (Policy Iteration)"),
):
    ax.plot(test_rewards, label=curve_label)
ax.set_title("Test Episode Returns Comparison")
ax.set_xlabel("Test Episode")
ax.set_ylabel("Total Return")
ax.legend()
plt.show()
