In [None]:
import torch
import dataclasses
import mediapy
from huggingface_hub import PyTorchModelHubMixin
from huggingface_hub import ModelCard
from gpudrive.networks.late_fusion import NeuralNet
from gpudrive.agents.core import merge_actions
from gpudrive.env.config import EnvConfig
from gpudrive.env.env_torch import GPUDriveTorchEnv
from gpudrive.visualize.utils import img_from_fig
from gpudrive.env.dataset import SceneDataLoader
from gpudrive.utils.config import load_config 
import sys
import imageio
import numpy as np
import os

# Repository root, resolved as two directories above the current working directory.
project_root = os.path.abspath(os.path.join(os.getcwd(), "../.."))

# Make the helpers in examples/experimental importable (eval_utils is imported
# from there right below). The membership check keeps the cell idempotent:
# re-running it no longer appends a duplicate entry to sys.path each time.
_experimental_dir = project_root+'/examples/experimental'
if _experimental_dir not in sys.path:
    sys.path.append(_experimental_dir)

from eval_utils import multi_policy_rollout,compute_metrics  








In [None]:
def create_policy_masks(env, num_sim_agents=2, num_worlds=10):
    """Split the controlled agents of ``env`` round-robin between policies.

    Controlled agents are the non-zero entries of ``env.cont_agent_mask``.
    They are visited in row-major order (the order ``nonzero`` reports them)
    and assigned to policies ``1, 2, ..., num_sim_agents, 1, 2, ...`` in turn.

    Args:
        env: Object exposing a ``cont_agent_mask`` tensor.
            Presumably shaped (num_worlds, max_agents) — TODO confirm.
        num_sim_agents: Number of policies to distribute agents over (>= 1).
        num_worlds: Leading dimension the returned masks are reshaped to.

    Returns:
        dict mapping ``'pi_1'`` .. ``'pi_<num_sim_agents>'`` to boolean tensors
        of shape ``(num_worlds, -1)``, located on the same device as
        ``env.cont_agent_mask``. A policy that received no agent gets an
        all-False mask.

    Raises:
        ValueError: If ``num_sim_agents`` is smaller than 1.
    """
    if num_sim_agents < 1:
        raise ValueError(f"num_sim_agents must be >= 1, got {num_sim_agents}")

    # 0 marks "not controlled"; controlled agents get a policy id in 1..num_sim_agents.
    policy_ids = torch.zeros_like(env.cont_agent_mask, dtype=torch.int)
    controlled_idx = env.cont_agent_mask.nonzero(as_tuple=True)
    num_controlled = controlled_idx[0].numel()

    # Vectorised round-robin assignment. The arange uses the destination dtype
    # and device because indexed assignment requires source and target to match.
    round_robin = torch.arange(
        num_controlled, dtype=policy_ids.dtype, device=policy_ids.device
    )
    policy_ids[controlled_idx] = round_robin % num_sim_agents + 1

    # One boolean mask per policy; no notebook-level globals are needed here.
    return {
        f'pi_{p}': (policy_ids == p).reshape(num_worlds, -1)
        for p in range(1, num_sim_agents + 1)
    }


In [None]:

# Evaluation configuration loaded from the experimental examples directory.
config = load_config(project_root+"/examples/experimental/config/eval_config")

# Notebook-level settings. Each one is defined exactly once; the cell
# previously repeated this whole section, which loaded both pretrained
# policies a second time for no effect.
VIDEO_PATH = "videos"

max_agents = config.max_controlled_agents
NUM_WORLDS = 2
device = "cpu" # cpu just because we're in a notebook
NUM_SIM_AGENTS = 2
FPS = 5

# Pretrained policies loaded by repository id via NeuralNet.from_pretrained.
sim_agent1 = NeuralNet.from_pretrained("daphne-cornelisse/policy_S10_000_02_27")
sim_agent2 = NeuralNet.from_pretrained("daphne-cornelisse/policy_S1000_02_27")

# Some other info
card = ModelCard.load("daphne-cornelisse/policy_S10_000_02_27")


In [None]:

# Scene loader: batches of NUM_WORLDS scenes taken from the example dataset
train_loader = SceneDataLoader(
    root=f"{project_root}/data/processed/examples",
    sample_with_replacement=False,
    dataset_size=100,
    batch_size=NUM_WORLDS,
)

# Environment parameters: start from the EnvConfig defaults and override them
# with the values from the loaded evaluation config.
env_config = dataclasses.replace(
    EnvConfig(),
    # Observation settings
    ego_state=config.ego_state,
    partner_obs=config.partner_obs,
    road_map_obs=config.road_map_obs,
    norm_obs=config.norm_obs,
    obs_radius=config.obs_radius,
    lidar_obs=config.lidar_obs,
    # Driven by the same config flag as lidar_obs
    disable_classic_obs=config.lidar_obs,
    # Scene processing
    remove_non_vehicles=config.remove_non_vehicles,
    polyline_reduction_threshold=config.polyline_reduction_threshold,
    # Reward, goal and dynamics
    reward_type=config.reward_type,
    dist_to_goal_threshold=config.dist_to_goal_threshold,
    collision_behavior=config.collision_behavior,
    dynamics_model=config.dynamics_model,
    # Discrete action grids, rounded to three decimals
    steer_actions=torch.linspace(
        -torch.pi, torch.pi, config.action_space_steer_disc
    ).round(decimals=3),
    accel_actions=torch.linspace(
        -4.0, 4.0, config.action_space_accel_disc
    ).round(decimals=3),
)

# Build the simulator on the selected device
env = GPUDriveTorchEnv(
    config=env_config,
    data_loader=train_loader,
    max_cont_agents=config.max_controlled_agents,
    device=device,
)

In [None]:
# Run one multi-policy rollout. Everything that touches the environment sits
# inside try/finally so env.close() also runs when reset or rollout raises.
try:
    next_obs = env.reset()

    control_mask = env.cont_agent_mask

    # Split the controlled agents between the policies. The count comes from
    # NUM_SIM_AGENTS rather than a literal; policies_set below must provide
    # one entry per policy.
    policy_mask = create_policy_masks(env, NUM_SIM_AGENTS, NUM_WORLDS)

    # Pair every policy with the mask of the agents it controls.
    policies_set = {
        'pi_1': (sim_agent1, policy_mask['pi_1']),
        'pi_2': (sim_agent2, policy_mask['pi_2']),
    }

    metrics, frames = multi_policy_rollout(
        env,
        policies_set,
        device,
        deterministic=False,
        render_sim_state=True,
        render_every_n_steps=5,
    )
finally:
    env.close()


In [None]:

# Display the frames collected during the rollout as GIFs in a two-column grid.
mediapy.show_videos(
    frames,
    fps=15,
    width=500,
    height=500,
    columns=2,
    codec='gif',
)