In [1]:
from collections import defaultdict
import gymnasium as gym
import numpy as np
import math
import itertools

from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv, Overcooked
from overcooked_ai_py.mdp.overcooked_mdp import OvercookedGridworld
from overcooked_ai_py.visualization.state_visualizer import StateVisualizer
from tqdm import tqdm


In [2]:




class Overcooked_Qlearn:
    """Q-learning agent that keeps one table of joint-action values for an Overcooked layout.

    A single table row is stored per observed state key; each row holds one
    value for every pair ``(a0, a1)`` with ``a0, a1 in range(6)``, i.e. the two
    players are controlled jointly by this one agent.
    """

    def __init__(
        self,
        layout_name: str,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        n_episodes: int,
        discount_factor: float = 0.95,
        horizon: int = 400,
    ):
        """Build the environment for ``layout_name`` and an empty Q-table.

        Args:
            layout_name: Name of the Overcooked layout passed to
                ``OvercookedGridworld.from_layout_name``.
            learning_rate: Step size applied to each temporal-difference error.
            initial_epsilon: Starting probability of taking a random action.
            epsilon_decay: Amount subtracted from epsilon by ``decay_epsilon``.
            final_epsilon: Lower bound that epsilon is clamped to.
            n_episodes: Accepted so existing call sites keep working; the
                agent itself does not read it.
            discount_factor: Weight of the best next-state value in the target.
            horizon: Episode length handed to ``OvercookedEnv.from_mdp``.
        """
        base_mdp = OvercookedGridworld.from_layout_name(layout_name)
        base_env = OvercookedEnv.from_mdp(base_mdp, info_level=0, horizon=horizon)
        self.env = Overcooked(base_env=base_env, featurize_fn=base_env.featurize_state_mdp)

        # Every joint action as a pair of per-player indices; a row of the
        # Q-table is indexed by position in this list.
        # NOTE(review): 6 is assumed to match env.action_space - confirm.
        self.possible_action = [*itertools.product(range(6), repeat=2)]

        # Q-table: state key -> array with one value per joint action.
        # Looking up an unseen key inserts a fresh all-zero row.
        self.q_values = defaultdict(lambda: np.zeros(len(self.possible_action)))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        # Exploration schedule
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        # Temporal-difference error of every update, in call order
        self.training_error = []

    def get_action(self, obs):
        """Pick a joint action for the hashable state key ``obs`` (epsilon-greedy).

        Returns:
            A pair of per-player action indices. With probability ``epsilon``
            both entries are sampled independently from ``env.action_space``;
            otherwise the pair with the highest stored value for ``obs`` is
            returned (first such pair on ties).
        """
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample(), self.env.action_space.sample()

        return self.possible_action[np.argmax(self.q_values[obs])]

    def update(
        self,
        obs,
        action,
        reward,
        terminated,
        next_obs,
    ):
        """Apply one Q-learning update for the transition (obs, action, reward, next_obs).

        Args:
            obs: Hashable key of the state the action was taken in.
            action: Joint action pair; must be an element of ``possible_action``.
            reward: Reward received for the transition.
            terminated: True when the episode ended with this transition.
            next_obs: Hashable key of the resulting state.

        Raises:
            ValueError: If ``action`` is not one of ``possible_action``.
        """
        action_idx = self.possible_action.index(action)

        # A finished episode has no future value. Skipping the lookup also
        # keeps the defaultdict from gaining a row for the terminal successor.
        if terminated:
            future_q_value = 0.0
        else:
            future_q_value = np.max(self.q_values[next_obs])

        # Bellman target and the error of the current estimate against it
        target = reward + self.discount_factor * future_q_value
        q_row = self.q_values[obs]
        temporal_difference = target - q_row[action_idx]

        # Move the estimate a fraction ``lr`` of the way towards the target
        q_row[action_idx] = q_row[action_idx] + self.lr * temporal_difference

        self.training_error.append(temporal_difference)

    def decay_epsilon(self):
        """Subtract ``epsilon_decay`` from epsilon, never going below ``final_epsilon``."""
        self.epsilon = max(self.final_epsilon, self.epsilon - self.epsilon_decay)

In [3]:
# --- Q-learning configuration ---

# Step size applied to every temporal-difference correction.
learning_rate = 0.02

# Episode count used to scale the epsilon schedule below.
n_episodes = 5_000

# Exploration: begin fully random, subtract a fixed amount per
# decay_epsilon() call, and never go below the floor.
start_epsilon = 1.0
final_epsilon = 0.2
epsilon_decay = start_epsilon / (4 * n_episodes)

agent = Overcooked_Qlearn(
    layout_name="cramped_room",
    n_episodes=n_episodes,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    final_epsilon=final_epsilon,
    epsilon_decay=epsilon_decay,
)

In [13]:
  # Progress bar
# NOTE(review): this rebinds n_episodes; the agent's epsilon_decay was derived
# from the value in the hyperparameter cell, not from this one.
n_episodes = 100

# Rendering is off by default: the recorded run of this cell, which drew a
# window on every step, finished 0 of 100 episodes in 49 s before it was
# interrupted. Set to True to watch individual steps.
render_training = False
visualizer = StateVisualizer() if render_training else None

for episode in tqdm(range(n_episodes)):
    obs = agent.env.reset()
    # Hashable Q-table key for the current state.
    # NOTE(review): if the per-agent observations are NumPy arrays, `+` adds
    # them element-wise rather than concatenating - confirm this is the
    # intended encoding. test_agent builds its keys the same way, so any
    # change has to be made in both places.
    state_key = tuple(obs['both_agent_obs'][0] + obs['both_agent_obs'][1])
    done = False

    # Play one complete episode
    while not done:
        # Epsilon-greedy joint action for both players
        action = agent.get_action(state_key)
        if render_training:
            visualizer.display_rendered_state(
                obs['overcooked_state'],
                window_display=True,
                grid=agent.env.mdp.terrain_mtx,
            )

        # Take action and observe result
        next_obs, reward, terminated, event_info = agent.env.step(action)
        next_state_key = tuple(next_obs['both_agent_obs'][0] + next_obs['both_agent_obs'][1])

        if terminated:
            # On the final step, add the episode's accumulated shaped reward
            # minus an integer part that shrinks as `episode` grows.
            subgoal_reward = event_info['episode']["ep_shaped_r"]
            reward = reward + (subgoal_reward - int(subgoal_reward / ((episode + 10) * 10)))

        # Learn from this experience
        agent.update(state_key, action, reward, terminated, next_state_key)

        # Move to next state, reusing the key that was just computed
        done = terminated
        obs = next_obs
        state_key = next_state_key

    # Reduce exploration rate once per episode
    agent.decay_epsilon()

  0%|          | 0/100 [00:49<?, ?it/s]


KeyboardInterrupt: 

In [12]:
# Test the trained agent
def test_agent(agent, env, num_episodes=1000):
    """Test agent performance without learning or exploration."""
    total_rewards = []

    # Temporarily disable exploration for testing
    old_epsilon = agent.epsilon
    agent.epsilon = 0.0  # Pure exploitation

    for episode in tqdm(range(num_episodes)):
        obs = env.reset()
        episode_reward = 0
        done = False

        while not done:
            StateVisualizer().display_rendered_state(obs['overcooked_state'],window_display=True,grid=agent.env.mdp.terrain_mtx)
            action = agent.get_action(tuple(obs['both_agent_obs'][0] + obs['both_agent_obs'][1]))
            next_obs, reward, terminated, event_info = env.step(action)
            if reward > 0:
                print(reward)
                print("soup delivered")
            episode_reward += reward
            done = terminated
            obs = next_obs

        total_rewards.append(episode_reward)

    # Restore original epsilon
    agent.epsilon = old_epsilon

    win_rate = np.mean(np.array(total_rewards) > 0)
    average_reward = np.mean(total_rewards)

    print(f"Test Results over {num_episodes} episodes:")
    print(f"Win Rate: {win_rate:.1%}")
    print(f"Average Reward: {average_reward:.3f}")
    print(f"Standard Deviation: {np.std(total_rewards):.3f}")

# Evaluate the trained agent's greedy policy on the environment it owns.
test_agent(agent=agent, env=agent.env)

  0%|          | 0/1000 [00:28<?, ?it/s]


KeyboardInterrupt: 