In [1]:
import numpy as np
import torch
import gymnasium as gym
from gymnasium import spaces

from agilerl.networks.evolvable_cnn import EvolvableCNN
from agilerl.networks.evolvable_composed import EvolvableComposed
from agilerl.networks.evolvable_mlp import EvolvableMLP
from agilerl.hpo.mutation import Mutations, get_return_type

  from .autonotebook import tqdm as notebook_tqdm


#### Defining an EvolvableComposed

In [4]:
from gymnasium import spaces

# Sample Dict observation space: three uint8 image entries and two float32 vector entries.
observation_space = spaces.Dict(
    {
        "image1": spaces.Box(low=0, high=255, shape=(3, 255, 255), dtype=np.uint8),
        "image2": spaces.Box(low=0, high=255, shape=(3, 255, 255), dtype=np.uint8),
        "image3": spaces.Box(low=0, high=255, shape=(3, 255, 255), dtype=np.uint8),
        "vector1": spaces.Box(low=-1, high=1, shape=(3,), dtype=np.float32),
        "vector2": spaces.Box(low=-1, high=1, shape=(3,), dtype=np.float32),
    }
)

# Convolutional settings (one entry per conv layer).
channel_size = (8, 16, 32)
kernel_size = (3, 3, 3)
stride_size = (1, 1, 1)

# Dense head settings.
hidden_size = [64, 64]
latent_dim = 10
num_outputs = 10

# Distributional (rainbow) settings: `num_atoms` evenly spaced values on [v_min, v_max].
num_atoms = 51
v_min = -10
v_max = 10
support = torch.linspace(v_min, v_max, num_atoms)

net = EvolvableComposed(
    observation_space,
    channel_size,
    kernel_size,
    stride_size,
    hidden_size,
    latent_dim,
    num_outputs,
    rainbow=True,
    num_atoms=num_atoms,
    support=support,
)
net

EvolvableComposed(
  (feature_net): ModuleDict(
    (image1): EvolvableCNN(
      (feature_net): Sequential(
        (feature_conv_layer_0): Conv2d(3, 8, kernel_size=(3, 3), stride=(1, 1))
        (feature_activation_0): ReLU()
        (feature_conv_layer_1): Conv2d(8, 16, kernel_size=(3, 3), stride=(1, 1))
        (feature_activation_1): ReLU()
        (feature_conv_layer_2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1))
        (feature_activation_2): ReLU()
        (feature_flatten): Flatten(start_dim=1, end_dim=-1)
        (feature_linear_output): Linear(in_features=1984032, out_features=10, bias=True)
        (feature_output_activation): ReLU()
      )
      (value_net): Sequential(
        (feature_head_linear_layer_0): Linear(in_features=10, out_features=64, bias=True)
        (feature_head_activation_0): ReLU()
        (feature_head_linear_layer_1): Linear(in_features=64, out_features=64, bias=True)
        (feature_head_activation_1): ReLU()
        (feature_head_linear_la

In [5]:
# Query the return type reported for the network's `add_cnn_layer` method
# (the recorded output of this cell is `dict`).
get_return_type(net.add_cnn_layer)

dict

#### Probe Environment

In [6]:
from typing import Dict, List, Optional, Tuple, Union

class SimpleMultiObsEnv(gym.Env):
    """
    GridWorld-based multi-observation probe environment (4x4 grid world by default).

    .. code-block:: text

        ____________
       | 0  1  2   3|
       | 4|¯5¯¯6¯| 7|
       | 8|_9_10_|11|
       |12 13  14 15|
       ¯¯¯¯¯¯¯¯¯¯¯¯¯¯

    start is 0 (or a random free, non-goal cell when ``random_start`` is true)
    states 5, 6, 9, and 10 are blocked
    goal is 15
    actions are = [left, down, right, up]

    The 16 cells are numbered row by row. Every cell is encoded as a dict
    observation with a vector and an image: each column is represented by a
    random vector and each row is represented by a random image, both sampled
    once at creation time.

    The transition tables are hard-coded for the default 4x4 layout; other
    ``num_col``/``num_row`` values change the observation table but not the
    set of legal moves.

    :param num_col: Number of columns in the grid
    :param num_row: Number of rows in the grid
    :param random_start: If true, agent starts in random position
    :param discrete_actions: If true the action space is ``Discrete(4)``;
        otherwise it is a 4-dimensional ``Box`` and the argmax picks the action
    :param channel_last: If true, the image will be channel last, else it will be channel first
    """

    def __init__(
        self,
        num_col: int = 4,
        num_row: int = 4,
        random_start: bool = True,
        discrete_actions: bool = True,
        channel_last: bool = True,
    ):
        super().__init__()

        self.vector_size = 5
        if channel_last:
            self.img_size = [64, 64, 1]
        else:
            self.img_size = [1, 64, 64]

        self.random_start = random_start
        self.discrete_actions = discrete_actions
        if discrete_actions:
            self.action_space = spaces.Discrete(4)
        else:
            self.action_space = spaces.Box(0, 1, (4,))

        self.observation_space = spaces.Dict(
            spaces={
                "vec": spaces.Box(0, 1, (self.vector_size,), dtype=np.float64),
                "img": spaces.Box(0, 255, self.img_size, dtype=np.uint8),
            }
        )
        self.count = 0
        # Timeout
        self.max_count = 100
        self.log = ""
        self.state = 0
        self.action2str = ["left", "down", "right", "up"]
        self.init_possible_transitions()

        self.num_col = num_col
        self.state_mapping: List[Dict[str, np.ndarray]] = []
        self.init_state_mapping(num_col, num_row)

        # The goal is the last cell of the grid.
        self.max_state = len(self.state_mapping) - 1

    def init_state_mapping(self, num_col: int, num_row: int) -> None:
        """
        Initializes the state_mapping array which holds the observation values for each state

        States are appended row by row so that the list index equals the
        state id used by ``step`` (``row * num_col + col``).

        :param num_col: Number of columns.
        :param num_row: Number of rows.
        """
        # Each column is represented by a random vector
        col_vecs = np.random.random((num_col, self.vector_size))
        # Each row is represented by a random image
        row_imgs = np.random.randint(0, 255, (num_row, 64, 64), dtype=np.uint8)

        # Row-major fill: iterating columns in the outer loop would transpose
        # the encoding relative to the state numbering (state 1 would get the
        # vector of column 0 and the image of row 1).
        for row in range(num_row):
            for col in range(num_col):
                self.state_mapping.append({"vec": col_vecs[col], "img": row_imgs[row].reshape(self.img_size)})

    def get_state_mapping(self) -> Dict[str, np.ndarray]:
        """
        Uses the state to get the observation mapping.

        The returned dict is the stored table entry itself, not a copy.

        :return: observation dict {'vec': ..., 'img': ...}
        """
        return self.state_mapping[self.state]

    def init_possible_transitions(self) -> None:
        """
        Initializes the transitions of the environment
        The environment exploits the cardinal directions of the grid by noting that
        they correspond to simple addition and subtraction from the cell id within the grid

        - up => means moving up a row => means subtracting the length of a column
        - down => means moving down a row => means adding the length of a column
        - left => means moving left by one => means subtracting 1
        - right => means moving right by one => means adding 1

        Thus one only needs to specify in which states each action is possible
        in order to define the transitions of the environment
        """
        self.left_possible = [1, 2, 3, 13, 14, 15]
        self.down_possible = [0, 4, 8, 3, 7, 11]
        self.right_possible = [0, 1, 2, 12, 13, 14]
        self.up_possible = [4, 8, 12, 7, 11, 15]

    def _valid_start_states(self) -> List[int]:
        """
        Lists the states an episode may start in when ``random_start`` is set.

        A valid start has at least one legal move (so blocked cells are
        excluded) and lies strictly below the goal state, which also keeps the
        id inside ``state_mapping``.

        :return: sorted list of candidate start states (possibly empty)
        """
        movable = (
            set(self.left_possible)
            | set(self.down_possible)
            | set(self.right_possible)
            | set(self.up_possible)
        )
        return sorted(s for s in movable if s < self.max_state)

    def step(self, action: Union[int, np.ndarray]):
        """
        Run one timestep of the environment's dynamics. When end of
        episode is reached, you are responsible for calling `reset()`
        to reset this environment's state.
        Accepts an action and returns a tuple (observation, reward, terminated, truncated, info).

        An action that is not legal in the current state leaves the state
        unchanged. The reward is -0.1 per step and 1 on reaching the goal.

        :param action: index into [left, down, right, up], or (continuous
            mode) a vector whose argmax is that index
        :return: tuple (observation, reward, terminated, truncated, info).
        """
        if not self.discrete_actions:
            action = np.argmax(action)  # type: ignore[assignment]

        self.count += 1

        prev_state = self.state

        reward = -0.1
        # define state transition
        if self.state in self.left_possible and action == 0:  # left
            self.state -= 1
        elif self.state in self.down_possible and action == 1:  # down
            self.state += self.num_col
        elif self.state in self.right_possible and action == 2:  # right
            self.state += 1
        elif self.state in self.up_possible and action == 3:  # up
            self.state -= self.num_col

        got_to_end = self.state == self.max_state
        reward = 1 if got_to_end else reward
        # Truncate once more than `max_count` steps have been taken.
        truncated = self.count > self.max_count
        terminated = got_to_end

        self.log = f"Went {self.action2str[action]} in state {prev_state}, got to state {self.state}"

        return self.get_state_mapping(), reward, terminated, truncated, {"got_to_end": got_to_end}

    def render(self, mode: str = "human") -> None:
        """
        Prints the log of the environment.

        :param mode: unused
        """
        print(self.log)

    def reset(self, *, seed: Optional[int] = None, options: Optional[Dict] = None) -> Tuple[Dict[str, np.ndarray], Dict]:
        """
        Resets the environment state and step count and returns reset observation.

        The random start state is drawn from ``self.np_random`` so that passing
        ``seed`` makes the start reproducible. Blocked cells and the goal are
        never chosen as a start.

        :param seed: optional seed for the environment's random generator
        :param options: unused
        :return: tuple (observation dict {'vec': ..., 'img': ...}, empty info dict)
        """
        # With seed=None this leaves the existing generator untouched.
        super().reset(seed=seed)
        self.count = 0
        if not self.random_start:
            self.state = 0
        else:
            candidates = self._valid_start_states()
            if candidates:
                pick = int(self.np_random.integers(len(candidates)))
                self.state = candidates[pick]
            else:
                # Degenerate grid without any free non-goal cell.
                self.state = 0
        return self.state_mapping[self.state], {}

In [None]:
# Instantiate the probe environment with its default arguments.
env = SimpleMultiObsEnv()

# reset() returns (observation, info); only the observation dict is inspected here.
obs, _ = env.reset()
obs

In [5]:
from agilerl.algorithms import MATD3

# Pick the GPU when one is visible to torch, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
accelerator = None
compile_mode = "default"

# Single-agent MATD3 instance over a (3, 32, 32) Box observation and a
# two-way discrete action space.
matd3 = MATD3(
    agent_ids=["agent_0"],
    n_agents=1,
    observation_spaces=[spaces.Box(0, 1, shape=(3, 32, 32))],
    action_spaces=[spaces.Discrete(2)],
    discrete_actions=True,
    one_hot=False,
    min_action=[(-1,)],
    max_action=[(1,)],
    device=device,
    accelerator=accelerator,
    torch_compiler=compile_mode,
)

In [None]:
from agilerl.algorithms.base import RLAlgorithm
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.networks.evolvable_mlp import EvolvableMLP
from agilerl.training.train_off_policy import train_off_policy
from agilerl.utils.utils import (
    create_population,
    make_vect_envs,
    observation_space_channels_to_first,
    print_hyperparams
)

import yaml

# Load the training configuration; the three sections below drive everything
# else in this cell.
with open("configs/training/dqn.yaml") as file:
    config = yaml.safe_load(file)

INIT_HP = config["INIT_HP"]
MUTATION_PARAMS = config["MUTATION_PARAMS"]
NET_CONFIG = config["NET_CONFIG"]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("============ AgileRL ============")
print(f"DEVICE: {device}")

env = make_vect_envs(INIT_HP["ENV_NAME"], num_envs=INIT_HP["NUM_ENVS"])

# Work with the per-environment spaces rather than the batched ones.
observation_space = env.single_observation_space
action_space = env.single_action_space
if INIT_HP["CHANNELS_LAST"]:
    observation_space = observation_space_channels_to_first(observation_space)

field_names = ["state", "action", "reward", "next_state", "done"]
memory = ReplayBuffer(
    memory_size=INIT_HP["MEMORY_SIZE"], field_names=field_names, device=device
)
tournament = TournamentSelection(
    INIT_HP["TOURN_SIZE"],
    INIT_HP["ELITISM"],
    INIT_HP["POP_SIZE"],
    INIT_HP["EVAL_LOOP"],
)
mutations = Mutations(
    algo=INIT_HP["ALGO"],
    no_mutation=MUTATION_PARAMS["NO_MUT"],
    architecture=MUTATION_PARAMS["ARCH_MUT"],
    new_layer_prob=MUTATION_PARAMS["NEW_LAYER"],
    parameters=MUTATION_PARAMS["PARAMS_MUT"],
    activation=MUTATION_PARAMS["ACT_MUT"],
    rl_hp=MUTATION_PARAMS["RL_HP_MUT"],
    rl_hp_selection=MUTATION_PARAMS["RL_HP_SELECTION"],
    mutation_sd=MUTATION_PARAMS["MUT_SD"],
    min_lr=MUTATION_PARAMS["MIN_LR"],
    max_lr=MUTATION_PARAMS["MAX_LR"],
    # The lower bound must come from MIN_BATCH_SIZE; reading MAX_BATCH_SIZE for
    # both bounds would pin the batch-size range to a single value.
    min_batch_size=MUTATION_PARAMS["MIN_BATCH_SIZE"],
    max_batch_size=MUTATION_PARAMS["MAX_BATCH_SIZE"],
    min_learn_step=MUTATION_PARAMS["MIN_LEARN_STEP"],
    max_learn_step=MUTATION_PARAMS["MAX_LEARN_STEP"],
    arch=NET_CONFIG["arch"],
    rand_seed=MUTATION_PARAMS["RAND_SEED"],
    device=device,
)

# Kept for inspection; neither value is used further down in this cell.
state_dim = RLAlgorithm.get_state_dim(observation_space)
action_dim = RLAlgorithm.get_action_dim(action_space)

agent_pop = create_population(
    algo=INIT_HP["ALGO"],
    observation_space=observation_space,
    action_space=action_space,
    net_config=NET_CONFIG,
    INIT_HP=INIT_HP,
    actor_network=None,
    critic_network=None,
    population_size=INIT_HP["POP_SIZE"],
    num_envs=INIT_HP["NUM_ENVS"],
    device=device,
)

try:
    trained_pop, pop_fitnesses = train_off_policy(
        env,
        INIT_HP["ENV_NAME"],
        INIT_HP["ALGO"],
        agent_pop,
        memory=memory,
        INIT_HP=INIT_HP,
        MUT_P=MUTATION_PARAMS,
        swap_channels=INIT_HP["CHANNELS_LAST"],
        max_steps=INIT_HP["MAX_STEPS"],
        evo_steps=INIT_HP["EVO_STEPS"],
        eval_steps=INIT_HP["EVAL_STEPS"],
        eval_loop=INIT_HP["EVAL_LOOP"],
        learning_delay=INIT_HP["LEARNING_DELAY"],
        # Epsilon schedule falls back to these defaults when the config omits it.
        eps_start=INIT_HP.get("EPS_START", 1.0),
        eps_end=INIT_HP.get("EPS_END", 0.01),
        eps_decay=INIT_HP.get("EPS_DECAY", 0.999),
        target=INIT_HP["TARGET_SCORE"],
        tournament=tournament,
        mutation=mutations,
        wb=INIT_HP["WANDB"],
    )

    print_hyperparams(trained_pop)
    # plot_population_score(trained_pop)
finally:
    # Release resources even when training raises or the cell is interrupted.
    if device.type == "cuda":
        torch.cuda.empty_cache()

    env.close()


In [3]:
import inspect

ind = agent_pop[0]

# Collect every member of the agent that is not a routine (function/method).
attributes = inspect.getmembers(ind, lambda member: not inspect.isroutine(member))

# Names reported by `evolvable_attributes()`; per the original note these are the
# EvolvableModule / Optimizer attributes, which should be left out.
exclude = list(ind.evolvable_attributes().keys())

# Keep only names with neither a leading nor a trailing underscore
# (drops private and dunder-style members).
attributes = [
    (name, value)
    for name, value in attributes
    if not name.startswith("_") and not name.endswith("_")
]

# Finally keep only members that are also parameters of the constructor and
# that are not in the exclusion list.
constructor_params = inspect.signature(ind.__init__).parameters.keys()
attributes = {
    name: value
    for name, value in attributes
    if name in constructor_params and name not in exclude
}

In [9]:
# Display the filtered attribute dict (the recorded output of this cell is an empty dict).
attributes

{}

#### Racecar Gym

In [2]:
from agilerl.networks.evolvable_cnn import EvolvableCNN
from agilerl.algorithms.ppo import PPO
from accelerate import Accelerator

# --- Environment interface ---------------------------------------------------
observation_space = spaces.Box(low=0, high=255, shape=(3, 32, 32), dtype=np.uint8)
action_space = spaces.Discrete(2)
one_hot = False

# --- Network configuration: a single conv layer followed by one hidden layer --
net_config_cnn = {
    "arch": "cnn",
    "hidden_size": [8],
    "channel_size": [3],
    "kernel_size": [3],
    "stride_size": [1],
    "normalize": False,
}
actor_network = None
critic_network = None

# --- Optimisation hyperparameters --------------------------------------------
batch_size = 64
lr = 1e-4
update_epochs = 4
max_grad_norm = 0.5

# --- PPO-specific coefficients -----------------------------------------------
gamma = 0.99
gae_lambda = 0.95
action_std_init = 0.6
clip_coef = 0.2
ent_coef = 0.01
vf_coef = 0.5
target_kl = None

# --- Evolution / distributed settings ----------------------------------------
mut = None
accelerator = Accelerator()
wrap = True

ppo = PPO(
    observation_space=observation_space,
    action_space=action_space,
    one_hot=one_hot,
    discrete_actions=True,
    net_config=net_config_cnn,
    actor_network=actor_network,
    critic_network=critic_network,
    batch_size=batch_size,
    lr=lr,
    update_epochs=update_epochs,
    max_grad_norm=max_grad_norm,
    gamma=gamma,
    gae_lambda=gae_lambda,
    action_std_init=action_std_init,
    clip_coef=clip_coef,
    ent_coef=ent_coef,
    vf_coef=vf_coef,
    target_kl=target_kl,
    mut=mut,
    accelerator=accelerator,
    wrap=wrap,
)


In [4]:
# Inspect the optimizer -> module-name mapping held by the agent
# (the recorded output maps one AcceleratedOptimizer to ['critic', 'actor']).
ppo.optim_to_modules

{AcceleratedOptimizer (
 Parameter Group 0
     amsgrad: False
     betas: (0.9, 0.999)
     capturable: False
     differentiable: False
     eps: 1e-08
     foreach: None
     fused: None
     lr: 0.0001
     maximize: False
     weight_decay: 0
 
 Parameter Group 1
     amsgrad: False
     betas: (0.9, 0.999)
     capturable: False
     differentiable: False
     eps: 1e-08
     foreach: None
     fused: None
     lr: 0.0001
     maximize: False
     weight_decay: 0
 ): ['critic', 'actor']}

In [8]:
# List the names the agent reports as evolvable attributes
# (recorded output: 'actor', 'critic', 'optimizer').
ppo.evolvable_attributes().keys()

dict_keys(['actor', 'critic', 'optimizer'])

In [4]:
# Call the agent's private helper directly for inspection.
# NOTE(review): presumably this tags each optimizer parameter with the name of
# the module it belongs to (see `_evol_module` in the next cell) — confirm
# against the AgileRL source.
ppo._identify_param_to_modules()

In [5]:
optimzer = ppo.optimizer

# Walk every parameter of every param group, in order, and show the
# `_evol_module` tag attached to it.
tagged_params = (
    param
    for param_group in optimzer.param_groups
    for param in param_group["params"]
)
for tagged in tagged_params:
    print(tagged._evol_module)

{'actor'}
{'actor'}
{'actor'}
{'actor'}
{'actor'}
{'actor'}
{'critic'}
{'critic'}
{'critic'}
{'critic'}
{'critic'}
{'critic'}


In [None]:
# On-policy: move the trailing channel axis to third-from-last position.
if swap_channels:
    state = np.moveaxis(state, [-1], [-3])

# Multi-agent: same axis move applied to each agent's observation; when the
# environment is not vectorised a leading axis of size 1 is added first.
if swap_channels:
    if is_vectorised:
        state = {
            agent_id: np.moveaxis(s, [-1], [-3])
            for agent_id, s in state.items()
        }
    else:
        state = {
            agent_id: np.moveaxis(np.expand_dims(s, 0), [-1], [-3])
            for agent_id, s in state.items()
        }

        

[2]