In [2]:
### Install libraries ###

#'!pip install git+https://github.com/HumanCompatibleAI/overcooked_ai.git

In [3]:
### Imports ###

from overcooked_ai_py.mdp.overcooked_mdp import OvercookedGridworld
from overcooked_ai_py.mdp.overcooked_env import OvercookedEnv
from overcooked_ai_py.agents.benchmarking import AgentEvaluator
from overcooked_ai_py.visualization.state_visualizer import StateVisualizer
from overcooked_ai_py.agents.agent import NNPolicy, AgentFromPolicy, AgentPair
import gym
import numpy as np
import torch
from PIL import Image
import os
from IPython.display import display, Image as IPImage
# from google.colab import drive

In [4]:
from DQN_classes import DQN, DQNLearningAgent,trainDQN
from collections import deque
import pandas as pd

In [5]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [6]:
### Environment setup ###

# Exactly one layout must be active; the other four are kept for quick swapping.
# layout = "cramped_room"
# layout = "asymmetric_advantages"
layout = "coordination_ring"
# layout = "forced_coordination"
# layout = "counter_circuit_o_1order"

# Shaping parameters handed to the MDP through `rew_shaping_params` below.
# Reward shaping is disabled by default; this structure may be used for it,
# in lieu of or in addition to any custom shaping done elsewhere.
reward_shaping = {
    "PLACEMENT_IN_POT_REW": 3,
    "DISH_PICKUP_REWARD": 4,
    "SOUP_PICKUP_REWARD": 5,
    "DISH_DISP_DISTANCE_REW": 0,
    "POT_DISTANCE_REW": 0,
    "SOUP_DISTANCE_REW": 0,
}

# Per-event reward table keyed by event name.
# NOTE(review): nothing else in the visible notebook reads this dict.
reward_shaping_test = {
    "onion_pickup": 0,
    "onion_drop": 0,
    "useful_onion_pickup": 0,
    "not_useful_onion_pickup": 0,
    "useful_onion_drop": 1,
    "not_useful_onion_drop": -1,
    "onion_potting": 3,
    "useful_dish_pickup": 3,
    "not_useful_dish_pickup": -3,
    "useful_dish_drop": 3,
    "not_useful_dish_drop": -3,
    "soup_pickup": 10,
    "bad_soup_pickup": -3,
    "soup_drop": -10,
    "no_action": 0,
}

# Length of Episodes.  Do not modify for your submission!
# Modification will result in a grading penalty!
horizon = 400

# Build the environment.  Do not modify!
mdp = OvercookedGridworld.from_layout_name(layout, rew_shaping_params=reward_shaping)
base_env = OvercookedEnv.from_mdp(mdp, horizon=horizon, info_level=0)
env = gym.make(
    "Overcooked-v0",
    base_env=base_env,
    featurize_fn=base_env.featurize_state_mdp,
)

In [7]:
def dump(a):
    """Print a labelled breakdown of the first 46 entries of a feature vector.

    Args:
        a: indexable sequence (list or array) with at least 46 entries; the
            notebook passes one agent's row of ``obs['both_agent_obs']``.

    The labels are this notebook's own reading of the featurization; one line
    is printed per field, in index order.
    """
    # (label, index-or-slice) pairs, in the order they are printed.
    fields = (
        ("Agent Orientation", slice(0, 4)),
        ("Agent holdings", slice(4, 8)),
        ("Agent distance to onion,tomato,dish,soup", slice(8, 16)),
        ("N ingrediants in closest soup (Onion, tomato)", slice(16, 18)),
        ("Agent distance to ServingArea, counter", slice(18, 22)),
        ("Pot 1 Available", 22),
        ("Pot 1 Status", slice(23, 27)),
        ("N of Onion, tomato in Pot 1", slice(27, 29)),
        ("Time remaining in Pot 1", 29),
        ("Distance to the Pot 1", slice(30, 32)),
        ("Pot Available 2", 32),
        ("Pot Status 2", slice(33, 37)),
        ("N of Onion, tomato in Pot 2", slice(37, 39)),
        ("Time remaining in Pot 2", 39),
        ("Distance to the Pot 2", slice(40, 42)),
        ("Facing walls", slice(42, 46)),
    )
    for label, where in fields:
        print(f"{label}: {a[where]}")

In [8]:
def calculate_rewards(baseenv_state, step, obs, actions, obs_new=None, agentidx=None):
    """Compute hand-shaped per-agent rewards for a single environment step.

    Every event list in ``baseenv_state.game_stats`` whose most recent entry
    equals ``step`` is treated as "this agent triggered that event now" and
    contributes a fixed bonus or penalty.

    Args:
        baseenv_state: object exposing ``game_stats`` (event name -> one list of
            timesteps per agent) and ``state.objects`` (position -> object).
            The training loop passes ``base_env``.
        step: timestep compared with the last entry of each event list.
        obs: observation dict. ``obs['overcooked_state'].players`` is read for
            the potting and soup-pickup checks; ``obs['both_agent_obs']`` is
            read as the *earlier* observation by the cooking-start check.
        actions: pair of action indices, one per agent.
        obs_new: optional observation dict taken *after* ``obs``. The
            cooking-start bonus is only evaluated when this and ``agentidx``
            are both supplied.
        agentidx: optional index used to map agent ``i`` to row
            ``abs(i - agentidx)`` of ``both_agent_obs`` (presumably
            ``env.agent_idx`` -- confirm against the caller).

    Returns:
        ``[reward_agent0, reward_agent1, flag]``. ``flag`` is always 0 here;
        the training loop aborts when it sees 1.

    Note:
        The original body read ``obs_new`` and ``agentidx`` as undefined free
        names, so it only worked when stale kernel globals happened to exist
        and raised ``NameError`` otherwise. They are now explicit, optional
        parameters.
    """
    # Both agents chose action 4 (presumably "stay" -- confirm against the
    # environment's action table): flat penalty, nothing else is inspected.
    if actions == (4,4):
        return [-1,-1,0]

    SOUP_COOKING_REWARD = 5
    CATASTROPHIC_POTTING_PENALTY = -16
    SOUP_PICKUP_REWARD = 5
    USELESS_ACTION_PENALTY = -1

    # Events whose reward does not depend on any further state.
    flat_event_rewards = {
        'onion_pickup': 3,
        'useful_onion_pickup': 1,
        'onion_drop': -3,
        'useful_onion_drop': -1,
        'optimal_onion_potting': 0,
        'viable_onion_potting': 0,
        'dish_pickup': 3,
        'useful_dish_pickup': 1,
        'useful_dish_drop': -1,
        'dish_drop': -3,
        'soup_delivery': 0,  # the delivery reward itself comes from the environment
        'soup_drop': -5,
    }
    # These entries hold running totals, not per-agent timestep lists.
    non_event_keys = ('cumulative_sparse_rewards_by_agent',
                      'cumulative_shaped_rewards_by_agent')

    rewards = [0, 0, 0]
    # 1 once any event was logged for that agent at `step`.
    action_tags = [0, 0]

    state = baseenv_state.game_stats
    for action in state:
        if action in non_event_keys:
            continue
        for agent_id in range(2):
            timesteps = state[action][agent_id]
            if not timesteps or timesteps[-1] != step:
                continue
            action_tags[agent_id] = 1

            if action in flat_event_rewards:
                rewards[agent_id] += flat_event_rewards[action]
            elif action == 'potting_onion':
                # The pot is the tile the agent is facing: position + orientation.
                player = obs['overcooked_state'].players[agent_id].to_dict()
                agent_ori = player['orientation']
                agent_pos = player['position']
                pot_location = (
                    agent_ori[0]+agent_pos[0], agent_ori[1]+agent_pos[1])
                ingre = baseenv_state.state.objects[pot_location].ingredients
                # Bonus only for the onion that brings the pot to three ingredients.
                if len(ingre) == 3:
                    rewards[agent_id] += SOUP_COOKING_REWARD
            elif action == 'catastrophic_onion_potting':
                print(
                    f"----------Catastrophic Onion Potting at Step: {step}-----------")
                print(state)
                print(baseenv_state)
                rewards[agent_id] += CATASTROPHIC_POTTING_PENALTY
            elif action == 'useless_onion_potting':
                print(
                    f"----------Useless Onion Potting at Step: {step}-----------")
                print(state)
                print(baseenv_state)
                rewards[agent_id] += USELESS_ACTION_PENALTY
            elif action == 'soup_pickup':
                # Only reward picking up a soup that has finished cooking.
                held = obs['overcooked_state'].players[agent_id].held_object
                if held is not None and held.is_ready:
                    rewards[agent_id] += SOUP_PICKUP_REWARD

    # Cooking-start bonus: a per-pot feature flips 0 -> 1 between `obs` and
    # `obs_new` while the agent's |dx| + |dy| to that pot is 1.
    # NOTE(review): the original comments call indices 28/38 "start cooking",
    # but this notebook's dump() helper labels 27:29 and 37:39 as ingredient
    # counts -- confirm the indices against the featurization.
    if obs_new is not None and agentidx is not None:
        before = obs['both_agent_obs']
        after = obs_new['both_agent_obs']
        for agent_id in range(2):
            row = abs(agent_id-agentidx)
            for flag_idx, dx_idx, dy_idx in ((28, 30, 31), (38, 40, 41)):
                if after[row][flag_idx] == 1 and before[row][flag_idx] == 0:
                    if abs(after[row][dx_idx]) + abs(after[row][dy_idx]) == 1:
                        rewards[agent_id] += SOUP_COOKING_REWARD

    # Penalise action 5 (interact, per the original comment) when it logged no event.
    for agent_id in range(2):
        if action_tags[agent_id] == 0 and actions[agent_id] == 5:
            rewards[agent_id] += USELESS_ACTION_PENALTY
    return rewards

In [9]:
def save_checkpoint(agent, avgScore, filename="checkpoint.pth"):
    """Write an agent's networks and optimizer state to ``./<filename>``.

    Args:
        agent: object exposing ``gamma``, ``q_local``, ``q_target`` and
            ``optimizer``; the last three must provide ``state_dict()``.
        avgScore: score stored alongside the weights (the training loop passes
            the ``np.mean`` of recent episode scores). ``None`` is stored as-is.
        filename: file name, resolved relative to the current directory.
    """
    # Store a plain Python float: a pickled NumPy scalar is not loadable under
    # torch.load's weights-only mode, which is the default in recent PyTorch.
    average_score = None if avgScore is None else float(avgScore)
    state = {
        'average_score': average_score,
        'gamma': agent.gamma,
        'q_local_state_dict': agent.q_local.state_dict(),
        'q_target_state_dict': agent.q_target.state_dict(),
        'q_local_optimizer_state_dict': agent.optimizer.state_dict()
    }
    save_dir = "./"
    torch.save(state, save_dir+filename)

In [10]:
def load_checkpoint(agent,filename="checkpoint.pth"):
    """Restore an agent's networks and optimizer from a saved checkpoint.

    Args:
        agent: object exposing ``q_local``, ``q_target`` and ``optimizer``;
            all three are updated in place via ``load_state_dict``.
        filename: path of a checkpoint holding the keys
            ``q_local_state_dict``, ``q_target_state_dict`` and
            ``q_local_optimizer_state_dict``.

    The notebook-level ``device`` decides where everything ends up.
    """
    # map_location remaps stored tensors onto the current device while
    # unpickling, so a checkpoint written on a GPU machine still loads on a
    # CPU-only one. Without it torch.load tries to restore the tensors to the
    # device they were saved from.
    checkpoint = torch.load(filename, map_location=device)
    agent.q_local.load_state_dict(checkpoint['q_local_state_dict'])
    agent.q_target.load_state_dict(checkpoint['q_target_state_dict'])
    agent.optimizer.load_state_dict(
        checkpoint['q_local_optimizer_state_dict'])
    # Move the modules and any optimizer buffers to the target device.
    agent.q_local = agent.q_local.to(device)
    agent.q_target = agent.q_target.to(device)
    optimizer_to(agent.optimizer, device)

def optimizer_to(optim, device):
    """Move the tensors held in an optimizer's ``state`` to ``device``, in place.

    Args:
        optim: object whose ``state`` mapping holds either tensors or dicts of
            tensors (non-tensor values are left alone).
        device: target passed to ``Tensor.to``.
    """
    def _relocate(tensor):
        # Rebind the storage, and the gradient's storage if one is attached.
        tensor.data = tensor.data.to(device)
        if tensor._grad is not None:
            tensor._grad.data = tensor._grad.data.to(device)

    for entry in optim.state.values():
        if isinstance(entry, torch.Tensor):
            _relocate(entry)
        elif isinstance(entry, dict):
            for value in entry.values():
                if isinstance(value, torch.Tensor):
                    _relocate(value)

In [11]:
N_statespace = env.observation_space.shape[0] #96
N_actions = env.action_space.n #6


In [12]:
N_actions

6

In [13]:
# Hyper-parameters shared by both DQN agents; the keys match the keyword
# arguments passed to DQNLearningAgent in the agent-construction cell.
parameters_test = {
    "n_hidden": 64,  # alternative noted by the author: 128
    "lr": 1e-3,
    "gamma": 0.99,
    "batch_size": 128,
    "replay_mem": 10000,
    "update_frequency": 5,
    "tau": 1e-3,
}

In [11]:
parameters_test["lr"]

0.001

In [14]:
# Two independent learners built from the same hyper-parameter set.
# `parameters_test` holds exactly the keyword arguments the constructor is
# given (n_hidden, lr, gamma, batch_size, replay_mem, update_frequency, tau),
# so it is unpacked directly instead of spelling each keyword out twice.
agent0 = DQNLearningAgent(N_statespace, N_actions, **parameters_test)

agent1 = DQNLearningAgent(N_statespace, N_actions, **parameters_test)

# Tag embedded in checkpoint and score-history file names.
save_name_tag = layout

In [15]:
# Bookkeeping shared with the training cell.
targetN = 5          # window length for the rolling score average
e = 0                # episode counter (overwritten by the training loop)
episode_start = 0    # offset added to episode numbers in the score-history CSV
scores = []          # soups made in every episode so far
scores_last = deque(maxlen=int(targetN))  # most recent `targetN` scores only


In [37]:
# Warm-start both agents from checkpoints written by an earlier run
# (file name pattern: bestcheckpoint_episode<N>_<layout>_agent<k>_last.pth.tar).
# NOTE(review): these files are tagged "asymmetric_advantages", while the
# environment-setup cell selects layout = "coordination_ring" -- confirm that
# loading weights from a different layout is intended.
load_checkpoint(agent0, filename="bestcheckpoint_episode600_asymmetric_advantages_agent0_last.pth.tar")
load_checkpoint(
    agent1, filename="bestcheckpoint_episode600_asymmetric_advantages_agent1_last.pth.tar")

In [38]:
episode_start = 600
e

1

In [39]:
agent1.reset_lr(newlr=1e-3)  # 1e-3
agent0.reset_lr(newlr=1e-3)

In [40]:
# Exploration / convergence settings read by the training cell.
epsilon = 0.05  # exploration rate passed to choose_action(); starts at the floor here
eps_decay = 0.995  # epsilon is multiplied by this after each episode while above epsilon_min
epsilon_min = 0.05  # once epsilon is not above this, decay stops and the reset counter runs
epsilon_max = 0.6  # value epsilon is reset to when scores stay below targetScore
fine_tune_lr = False  # when True, an epsilon reset also rescales the learning rate
lr_min = 1e-4  # a rescaled learning rate below this wraps around to lr_max
lr_max = 1e-2
min_episode = 50  # convergence is only checked after this many episodes
targetScore = 7  # soups per episode that counts as "good enough"
# NOTE(review): scores_last was created earlier with its own maxlen;
# reassigning targetN here does not resize that deque.
targetN = 5

In [41]:
### Train your agent ###

# Trains the two DQN agents (agent0 / agent1) for up to `num_episodes`
# episodes. Relies on names defined in earlier cells: env, base_env, agent0,
# agent1, epsilon and the other exploration settings, scores, scores_last,
# episode_start, save_name_tag, calculate_rewards, save_checkpoint, dump.

num_episodes = 1000
verbose = False  # when True, print a detailed per-step trace
#swap = True
best_avg_last = 0  # best rolling mean score for which a "best" checkpoint was written
n_epsmin = 0  # number of episodes run with epsilon at (or below) its floor
N_reset_eps = 20  # every this many floor-episodes, a low score triggers an epsilon reset
N_converge = 0  # number of 100-episode marks at which the target score was met
N_converge_threshold=5  # stop training after this many such marks
breaktag = 1  # set to 0 to abort both loops
converge = False
for e in range(1,num_episodes+1):
    if breaktag == 0:
        break
    # Episode termination flag
    done = False

    # The number of soups the agent pair made during the episode
    num_soups_made = 0

    # Reset the environment at the start of each episode
    obs = env.reset()
    # Keep resetting until env.agent_idx is non-zero.
    # NOTE(review): the reason for forcing agent_idx != 0 is not visible here.
    while env.agent_idx ==0:
        obs = env.reset()

    steps = 0
    
    while (not done) and breaktag==1:
        # Obtain observations for each agent
        #obs0 = obs["both_agent_obs"][0]
        #obs1 = obs["both_agent_obs"][1]
        
        # Pick each agent's row of both_agent_obs according to env.agent_idx.
        if env.agent_idx == 0:
            obs0 = obs["both_agent_obs"][0]
            obs1 = obs["both_agent_obs"][1]
        else:
            obs0 = obs["both_agent_obs"][1]
            obs1 = obs["both_agent_obs"][0]
        a0 = agent0.choose_action(obs0, epsilon)
        a1 = agent1.choose_action(obs1, epsilon)
        # The action pair is ordered with the same agent_idx convention.
        if env.agent_idx ==0:
            action_merge = [a0,a1]
        else:
            action_merge = [a1, a0]
        #action_merge = [a0, a1]
        obs, R, done, info = env.step(action_merge)
        #if env.agent_idx ==0:
        #    obs, R, done, info = env.step([a0, a1])
        #else:
        #    obs, R, done, info = env.step([a1, a0])
        # `obs` is already the post-step observation when it is passed here.
        rewards = calculate_rewards(base_env, steps, obs, (a0,a1))
        #reshapedR = calculate_rewards(obs['overcooked_state'])
        #print(f"info {info}")
        # Obtain observations for each agent
        if env.agent_idx == 0:
            obs0_new = obs["both_agent_obs"][0]
            obs1_new = obs["both_agent_obs"][1]
        else:
            obs0_new = obs["both_agent_obs"][1]
            obs1_new = obs["both_agent_obs"][0]

        # Each agent learns from its own shaped reward plus the shared env reward R.
        agent_0_reward = rewards[0]+R #info["shaped_r_by_agent"][0]
        agent_1_reward = rewards[1]+R #info["shaped_r_by_agent"][1]
        # Disabled alternative that swapped the shaped rewards by agent_idx.
        if False:
            if env.agent_idx == 0:
                agent_0_reward = rewards[0]+R #info["shaped_r_by_agent"][0]
                agent_1_reward = rewards[1]+R #info["shaped_r_by_agent"][1]
            else:
                agent_0_reward = rewards[1]+R #info["shaped_r_by_agent"][1]
                agent_1_reward = rewards[0]+R #info["shaped_r_by_agent"][0]

        # Debug abort hook: a 1 in the third reward slot stops training.
        # NOTE(review): the 4-argument calculate_rewards defined in this
        # notebook only ever returns 0 in that slot.
        if rewards[2]==1:
            #By default, rewards[2]==0
            print(f"reward 3 get: agent_0_reward {agent_0_reward}")
            breaktag=0
            break

        if verbose:
            print(
                f"-----eps: {epsilon} \t step:{steps} \t rewards: {agent_0_reward},{agent_1_reward}------")
            print(f"action: {(a0,a1)}")
            print(f"env.agent_idx:{env.agent_idx}")
            print(f"agent0 status:{obs['overcooked_state'].players[0].to_dict()}")
            print(
                f"agent1 status:{obs['overcooked_state'].players[1].to_dict()}")
            print(f"agent0 from obs['both_agent_obs'][0]:")
            dump(obs['both_agent_obs'][0])
            print(base_env)
            
            print(base_env.game_stats)
            #breaktag = 0
            #break
        # Hand the transition (state, action, reward, next state, done) to each learner.
        agent0.step(state=obs0, action=a0, reward=agent_0_reward,  # info["shaped_r_by_agent"][0]+R,reshapedR[0]+R,
                    next_state=obs0_new, done=done)
        agent1.step(state=obs1, action=a1, reward=agent_1_reward,  # info["shaped_r_by_agent"][1]+R,reshapedR[1]+R,
                    next_state=obs1_new, done=done)
        #if info['shaped_r_by_agent'][0]+R !=0:
        #    print(f"current reward for agent0: {info['shaped_r_by_agent'][0]+R}")
        # Accumulate the number of soups made
        num_soups_made += int(R / 20) # Each served soup generates 20 reward
        steps = steps +1
        #if steps % 10 == 0:
            #print(f"states:{obs['both_agent_obs'][0].shape}")
    # Display status
    scores.append(num_soups_made)
    scores_last.append(num_soups_made)
    meanscore_last = np.mean(scores_last)
    print("Ep {0}".format(e), end=" ")
    print("agent_idx {0}".format(env.agent_idx), end=" ")
    print("episilon {:.3f}".format(epsilon), end=" ")
    print("number of soups made: {0}".format(num_soups_made))
    # NOTE(review): the decay is not clamped, so the last multiplication can
    # leave epsilon slightly below epsilon_min.
    if epsilon > epsilon_min:
                #update epsilon greedy after each episode
                epsilon = epsilon * eps_decay
    else:
        #update n_epsmin
        n_epsmin = n_epsmin+1
        # Every N_reset_eps floor-episodes, a below-target episode restarts exploration.
        if num_soups_made < targetScore and n_epsmin % N_reset_eps == 0:  # If converged once, then not reset
            print(f'Episode {e}\tReset epsilon:{epsilon_max}')
            #save_checkpoint(avgScore=meanscore_last,filename=f"Score{meanscore_last:.2f}_checkpoint.pth.tar")
            # reset epsilon, and reduce learning rate
            epsilon=epsilon_max
            if fine_tune_lr:
                # Shrink the learning rate by 20%; wrap to lr_max once below lr_min.
                newlr = agent0.lr*0.8
                if newlr < lr_min:
                    newlr = lr_max
                print(f'Reset learning Rate: {newlr}')
                agent0.reset_lr(newlr=newlr)
                agent1.reset_lr(newlr=newlr)
            
    # Converge if scores does not change
    if e>min_episode and meanscore_last>=targetScore:
        converge = True
        # Convergence marks and "best" checkpoints are only considered at
        # multiples of 100 episodes.
        if e % 100 == 0:
            #After first converge, get another N_converge_threshold time, to be stable
            N_converge = N_converge + 1
            #print(f'Episode {i}\tAverage Score: {meanscore_last:.2f}')
            if meanscore_last> best_avg_last:
                best_avg_last = meanscore_last
                save_checkpoint(agent=agent0, avgScore=meanscore_last,
                                filename=f'bestcheckpoint_episode{e}_{save_name_tag}_agent0_last.pth.tar')
                save_checkpoint(agent=agent1, avgScore=meanscore_last,
                                filename=f'bestcheckpoint_episode{e}_{save_name_tag}_agent1_last.pth.tar')
    # Falling back below the target clears the convergence streak.
    if converge and meanscore_last < targetScore:
        converge = False
        N_converge = 0
    # Save most recent checkpoint
    N_saveCircle=1000
    if e % N_saveCircle == 0:
        save_checkpoint(agent=agent0, avgScore=meanscore_last,
                            filename=f'checkpoint_episode{e}_{save_name_tag}_agent0_last.pth.tar')
        save_checkpoint(agent=agent1, avgScore=meanscore_last,
                        filename=f'checkpoint_episode{e}_{save_name_tag}_agent1_last.pth.tar')
        # NOTE(review): the slice indexes `scores` by this run's episode
        # number, so it only lines up when `scores` was empty before the cell
        # ran; re-running without resetting `scores` writes the wrong rows.
        df = pd.DataFrame(data={"episode": [
                          episode_start+x for x in range(e-N_saveCircle+1, e+1)], "Score": scores[(e-N_saveCircle):e]})
        # Appended without a header row.
        df.to_csv(f'score_history_{save_name_tag}.csv', mode='a', index=False, header=False)
        
    if N_converge >= N_converge_threshold:
        break

# The info flag returned by the environment contains special status info
# specifically when done == True.  This information may be useful in
# developing, debugging, and analyzing your results.  It may also be a good
# way for you to find a metric that you can use in evaluating collaboration
# between your agents.
print("\nExample info dump:\n\n", info)

Ep 1 agent_idx 1 episilon 0.050 number of soups made: 10
Ep 2 agent_idx 1 episilon 0.050 number of soups made: 8
Ep 3 agent_idx 1 episilon 0.050 number of soups made: 5
Ep 4 agent_idx 1 episilon 0.050 number of soups made: 8
Ep 5 agent_idx 1 episilon 0.050 number of soups made: 8
Ep 6 agent_idx 1 episilon 0.050 number of soups made: 7
Ep 7 agent_idx 1 episilon 0.050 number of soups made: 8
Ep 8 agent_idx 1 episilon 0.050 number of soups made: 7
Ep 9 agent_idx 1 episilon 0.050 number of soups made: 3
Ep 10 agent_idx 1 episilon 0.050 number of soups made: 6
Ep 11 agent_idx 1 episilon 0.050 number of soups made: 12
Ep 12 agent_idx 1 episilon 0.050 number of soups made: 8
Ep 13 agent_idx 1 episilon 0.050 number of soups made: 6
Ep 14 agent_idx 1 episilon 0.050 number of soups made: 2
Ep 15 agent_idx 1 episilon 0.050 number of soups made: 7
Ep 16 agent_idx 1 episilon 0.050 number of soups made: 3
Ep 17 agent_idx 1 episilon 0.050 number of soups made: 6
Ep 18 agent_idx 1 episilon 0.050 numbe

In [60]:
obs0

array([ 1.,  0.,  0.,  0.,  0.,  0.,  0.,  0., -2.,  0.,  0.,  0., -2.,
        3.,  0.,  0.,  0.,  0.,  1.,  0.,  0.,  0.,  1.,  1.,  0.,  0.,
        0.,  0.,  0.,  0., -3.,  1.,  1.,  1.,  0.,  0.,  0.,  0.,  0.,
        0., -3.,  2.,  1.,  0.,  1.,  1.,  0.,  0.,  0.,  1.,  0.,  0.,
        0.,  0., -1., -2.,  0.,  0.,  2.,  1.,  0.,  0.,  0.,  0.,  2.,
       -2.,  0.,  0.,  1.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  3.,  0.,
        1.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  3., -1.,  0.,  1.,  0.,
        1., -6.,  2.,  7.,  1.])

In [80]:
print(base_env)

X       X       P       X       X       

O       ↑1      ↑0{ø✓           O       

X                               X       

X       D       X       S       X       




In [57]:
base_env.state.objects[(2, 0)].value

20

In [62]:
base_env.state.objects[(2, 0)].ingredients

['onion']

In [314]:
base_env.game_stats

{'tomato_pickup': [[], []],
 'useful_tomato_pickup': [[], []],
 'tomato_drop': [[], []],
 'useful_tomato_drop': [[], []],
 'potting_tomato': [[], []],
 'onion_pickup': [[92, 221, 296], [24, 62, 253]],
 'useful_onion_pickup': [[92, 221, 296], [24, 62, 253]],
 'onion_drop': [[286], [38]],
 'useful_onion_drop': [[], []],
 'potting_onion': [[100, 307], [118, 277]],
 'dish_pickup': [[40, 51, 68, 87, 104], [129, 131, 175, 194, 283]],
 'useful_dish_pickup': [[], []],
 'dish_drop': [[50, 66, 69, 89, 210], [130, 149, 188, 237]],
 'useful_dish_drop': [[50, 210], [130, 149, 188]],
 'soup_pickup': [[], [302]],
 'soup_delivery': [[], [315]],
 'soup_drop': [[], []],
 'optimal_onion_potting': [[100, 307], [118, 277]],
 'optimal_tomato_potting': [[], []],
 'viable_onion_potting': [[100, 307], [118, 277]],
 'viable_tomato_potting': [[], []],
 'catastrophic_onion_potting': [[], []],
 'catastrophic_tomato_potting': [[], []],
 'useless_onion_potting': [[], []],
 'useless_tomato_potting': [[], []],
 'cumul

In [26]:
base_env

X       X       X       X       X       X       X       X       X       

O       ↑1o     Xo      S       X       O       Xd              S       

X                               P       ↑0o                     X       

X                               P                               X       

X       X       X       D       X       D       X       X       X       


In [22]:
ag1 = obs['both_agent_obs'][0]
ag2 = obs['both_agent_obs'][1]
ag1[0:46]-ag2[46:92]

array([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.])

In [23]:
obs['overcooked_state'].all_objects_by_type

defaultdict(list,
            {'dish': [dish@(6, 1)],
             'onion': [onion@(2, 1), onion@(5, 2), onion@(1, 1)]})

In [24]:
dump(ag1)

Agent Orientation: [1. 0. 0. 0.]
Agent holdings: [1. 0. 0. 0.]
Agent distance to onion,tomato,dish,soup: [0. 0. 0. 0. 2. 3. 0. 0.]
N ingrediants in closest soup (Onion, tomato): [0. 0.]
Agent distance to ServingArea, counter: [2. 0. 0. 0.]
Pot 1 Available: 1.0
Pot 1 Status: [1. 0. 0. 0.]
N of Onion, tomato in Pot 1: [0. 0.]
Time remaining in Pot 1: 0.0
Distance to the Pot 1: [3. 1.]
Pot Available 2: 1.0
Pot Status 2: [1. 0. 0. 0.]
N of Onion, tomato in Pot 2: [0. 0.]
Time remaining in Pot 2: 0.0
Distance to the Pot 2: [3. 2.]
Facing walls: [1. 0. 1. 1.]


In [25]:
dump(ag2)

Agent Orientation: [1. 0. 0. 0.]
Agent holdings: [1. 0. 0. 0.]
Agent distance to onion,tomato,dish,soup: [0. 0. 0. 0. 0. 2. 0. 0.]
N ingrediants in closest soup (Onion, tomato): [0. 0.]
Agent distance to ServingArea, counter: [ 3. -1.  0.  0.]
Pot 1 Available: 1.0
Pot 1 Status: [1. 0. 0. 0.]
N of Onion, tomato in Pot 1: [0. 0.]
Time remaining in Pot 1: 0.0
Distance to the Pot 1: [-1.  0.]
Pot Available 2: 1.0
Pot Status 2: [1. 0. 0. 0.]
N of Onion, tomato in Pot 2: [0. 0.]
Time remaining in Pot 2: 0.0
Distance to the Pot 2: [-1.  1.]
Facing walls: [1. 0. 0. 1.]


In [319]:
obs['overcooked_state'].to_dict()

{'players': [{'position': (1, 2), 'orientation': (-1, 0), 'held_object': None},
  {'position': (3, 2), 'orientation': (0, 1), 'held_object': None}],
 'objects': [{'name': 'onion', 'position': (4, 2)},
  {'name': 'dish', 'position': (1, 0)},
  {'name': 'dish', 'position': (2, 3)},
  {'name': 'dish', 'position': (0, 2)},
  {'name': 'onion', 'position': (3, 0)},
  {'name': 'soup',
   'position': (2, 0),
   '_ingredients': [{'name': 'onion', 'position': (2, 0)}],
   'cooking_tick': -1,
   'is_cooking': False,
   'is_ready': False,
   'is_idle': True,
   'cook_time': -1,
   '_cooking_tick': -1}],
 'bonus_orders': [],
 'all_orders': [{'ingredients': ('onion', 'onion', 'onion')}],
 'timestep': 316}

In [57]:
obs['overcooked_state'].players

((1, 2) facing (0, -1) holding None, (3, 1) facing (0, -1) holding None)

In [321]:
if obs['overcooked_state'].players[1].held_object:
    if obs['overcooked_state'].players[1].held_object.name == 'soup':
        obs['overcooked_state'].players[1].held_object.ingredients

# state of the mdp
mdp_state = obs['overcooked_state']
mdp_state

<overcooked_ai_py.mdp.overcooked_mdp.OvercookedState at 0x7fd6a804f820>

In [54]:
obs['overcooked_state'].players[0]

(1, 2) facing (0, -1) holding None

In [40]:
#mdp_state.get_all_pots()

AttributeError: 'OvercookedState' object has no attribute 'get_all_pots'

In [47]:
dir(obs['overcooked_state'].players[1])

['__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 'deepcopy',
 'from_dict',
 'get_object',
 'has_object',
 'held_object',
 'orientation',
 'pos_and_or',
 'position',
 'remove_object',
 'set_object',
 'to_dict',
 'update_pos_and_or']

In [None]:
### Evaluate your agent ###

# Rolls out episodes with the trained agents in a format compatible with the
# state visualizer.  Visualization is in the next cell.

class StudentPolicy(NNPolicy):
    """Greedy policy backed by the Q-networks of the two trained DQN agents."""

    def __init__(self, q_networks=None):
        """
        Args:
            q_networks: pair ``(net_for_agent_0, net_for_agent_1)``. When
                omitted, the notebook-level ``agent0.q_local`` and
                ``agent1.q_local`` are captured at construction time.
        """
        super(StudentPolicy, self).__init__()
        if q_networks is None:
            q_networks = (agent0.q_local, agent1.q_local)
        # Hold direct references so the policy keeps working even if the
        # notebook-level agent names are later rebound.
        self.q_networks = tuple(q_networks)

    def state_policy(self, state, agent_index):
        """
        Return a one-hot policy vector selecting the action with the highest
        Q value for ``agent_index`` in ``state``.

        Index 0 uses the first network; any other index uses the second.
        The networks are assumed to map a ``(1, n_features)`` float tensor to
        ``(1, n_actions)`` Q values -- confirm against DQN_classes.
        """
        featurized_state = base_env.featurize_state_mdp(state)
        q_net = self.q_networks[0 if agent_index == 0 else 1]

        # Run the forward pass on whichever device the network lives on, and
        # bring the result back to the CPU before converting to NumPy.
        net_device = next(q_net.parameters()).device
        input_state = torch.FloatTensor(
            featurized_state[agent_index]).unsqueeze(0).to(net_device)

        action_probs = np.zeros(env.action_space.n)
        with torch.no_grad():
            q_values = q_net(input_state)[0].cpu().numpy()
        action_probs[np.argmax(q_values)] = 1

        # The template's random placeholder policy used to overwrite
        # `action_probs` at this point, which discarded the trained policy.
        return action_probs

    def multi_state_policy(self, states, agent_indices):
        """ Generate a policy for a list of states and agent indices """
        return [self.state_policy(state, agent_index) for state, agent_index in zip(states, agent_indices)]


class StudentAgent(AgentFromPolicy):
    """Create an agent using the policy created by the class above"""
    def __init__(self, policy):
        super(StudentAgent, self).__init__(policy)


# Capture the trained networks once; both policies share the same pair.
q_networks = (agent0.q_local, agent1.q_local)

# Instantiate the policies for both agents
policy0 = StudentPolicy(q_networks)
policy1 = StudentPolicy(q_networks)

# Instantiate both agents.  They get their own names so the trained DQN
# agents (`agent0` / `agent1`) are not clobbered and this cell can be re-run.
student_agent0 = StudentAgent(policy0)
student_agent1 = StudentAgent(policy1)
agent_pair = AgentPair(student_agent0, student_agent1)

# Generate an episode
ae = AgentEvaluator.from_layout_name({"layout_name": layout}, {"horizon": horizon})
trajs = ae.evaluate_agent_pair(agent_pair, num_games=1)
print("\nlen(trajs):", len(trajs))

In [None]:
### Agent Visualization ###

##############################################################################
# StateVisualizer() (used in the next cell) generates images for the state of
# the environment at each time step of the episode.
#
# You have several options for how to use these images:
#
# 1) You can set img_dir to a local directory (or a directory within Google Drive
# if using Colab), and all the images will be saved to that directory for you to browse.
#
# 2) If using a notebook, you can set ipython_display=True to get a
# tool with a slider that lets you scan through all the images directly in the
# notebook.  This option does not require you to store your images.
#
# 3) You can generate a GIF of the episode. This requires you to set
# img_dir.  The code that builds the GIF is in the cells below.

# Modify as appropriate
img_dir = "img"  # directory the rendered frames are written to and read from
ipython_display = True  # forwarded to display_rendered_trajectory()
gif_path = "demo.gif"  # output file for the assembled animation




In [None]:
# Do not modify -- uncomment for GIF generation
# Render the trajectory from the evaluation cell, passing img_dir as the
# image directory.
StateVisualizer().display_rendered_trajectory(
    trajs, img_directory_path=img_dir, ipython_display=ipython_display)
# Collect the PNG files found in img_dir, ordered by modification time.
img_list = [f for f in os.listdir(img_dir) if f.endswith('.png')]
img_list.sort(key=lambda x: os.path.getmtime(os.path.join(img_dir, x)))
# Open every frame and convert it to RGBA; all frames are held in memory.
images = [Image.open(img_dir + "/" + img).convert('RGBA') for img in img_list]

In [None]:
# Write all frames to gif_path as one animation: 250 ms per frame, looping
# forever (loop=0).  Requires `images` from the previous cell to be non-empty.
images[0].save(gif_path, save_all=True, append_images=images[1:],
               optimize=False, duration=250, loop=0)


In [None]:
# Show the saved animation inline.
# NOTE(review): gif_path names a .gif file, yet format='png' is passed --
# presumably a workaround for embedding GIF bytes; confirm it is still needed.
with open(gif_path, 'rb') as f:
    display(IPImage(data=f.read(), format='png'))

In [None]:
# Preview the first rendered frame.  The original expression used `img`, a
# list-comprehension variable that does not exist at notebook scope, and
# joined the directory and file name without a path separator.
Image.open(os.path.join(img_dir, img_list[0]))

In [None]:
# Open every rendered frame; as the last expression of the cell, the resulting
# list of images is what gets displayed.
[Image.open(img_dir +"/"+ img) for img in img_list]

In [None]:
def calculate_rewards(state):
    """Score each agent by which event types it has triggered at least once.

    Args:
        state: mapping of event name -> two-element sequence (one entry per
            agent). An entry counts when it is truthy, e.g. a non-empty list
            of timesteps.

    Returns:
        ``[total_agent0, total_agent1]``: the sum of the fixed bonus or
        penalty of every recognised event the agent has an entry for.
        Unrecognised event names contribute nothing.

    NOTE(review): this one-argument function reuses the name of the
    four-argument ``calculate_rewards`` that the training loop calls; running
    this cell rebinds the name and the training cell will then fail.
    """
    # Fixed value per recognised event name.
    event_values = {
        'onion_pickup': 3,
        'useful_onion_pickup': 4,
        'onion_drop': -3,
        'useful_onion_drop': 2,
        'optimal_onion_potting': 5,
        'catastrophic_onion_potting': -6,
        'useless_onion_potting': -1,
        'dish_drop': -4,
        'useful_dish_drop': 4,
        'soup_pickup': 6,
        'soup_delivery': 20,
        'soup_drop': -15,
    }

    totals = [0, 0]
    for event in state:
        value = event_values.get(event, 0)
        for agent_id in range(2):
            if state[event][agent_id]:
                totals[agent_id] += value
    return totals