# Бейзлайн для задачи AIJ Multi-Agent AI

Данный ноутбук содержит реализацию [VDN](https://arxiv.org/abs/1706.05296) — кооперативного
мультиагентного алгоритма обучения с подкреплением. VDN основан на предпосылке
о линейном разложении совместной функции ценности агентов: общая Q-функция команды
представляется в виде суммы индивидуальных Q-функций агентов (такое разложение
естественно возникает, например, когда общая награда всех агентов равна сумме
индивидуальных наград).
Несмотря на то, что данная предпосылка ограничивает класс представимых функций ценности
и подходит только для кооперативных задач, VDN всё же является хорошим бейзлайном для
многих задач мультиагентного обучения с подкреплением.

Данный бейзлайн позволяет получить значение целевой метрики (Mean Focal Score) около
42 при отправке решения в тестирующую систему (для сравнения, случайная политика
получает около 4).

Главным результатом работы ноутбука будет директория `submission_vdn`, которую
необходимо упаковать в .zip-архив и отправить в тестирующую систему.

Мы рекомендуем запускать тесты на своих решениях, прежде чем отправлять их в систему.

In [1]:
# engine

from typing import Any, Dict, Optional, Tuple
# !pip3 install scipy
!pip3 install scikit-image
import numpy as np
from PIL import Image, ImageDraw
from PIL.Image import Image as ImageType
from scipy.ndimage import uniform_filter
from skimage import draw
from skimage.measure import block_reduce

# RGB palette. The first eight entries are handed out to agents in
# insertion order by create_icons (zip of agent names with these keys);
# 'domestic' / 'foreign' colour own vs. other agents' assets in the
# per-agent (subjective) views.
# NOTE(review): channels use 1 instead of 0, presumably so that no channel
# of a coloured pixel is exactly zero — confirm.
COLOR_MAPS = dict(
    red=np.array([255, 1, 1]),
    blue=np.array([1, 1, 255]),
    purple=np.array([128, 1, 128]),
    pink=np.array([255, 200, 255]),
    yellow=np.array([255, 255, 100]),
    orange=np.array([235, 155, 1]),
    gray=np.array([128, 128, 128]),
    turquoise=np.array([1, 219, 255]),
    domestic=np.array([255, 1, 1]),
    foreign=np.array([51, 51, 51]),
)


def _draw_border_path(
    grid_size: int,
    start: int,
    low: int,
    high: int,
    rng: np.random.Generator
) -> np.ndarray:
    """Draw one distorted border line as an integer random walk.

    Args:
        grid_size: number of points along the border (field side length)
        start: nominal (undistorted) position of the border
        low: lowest possible step of the walk (inclusive)
        high: upper bound of a step (exclusive, as in ``rng.integers``)
        rng: numpy random number generator

    Returns:
        np.ndarray: border position at each of the ``grid_size`` points
    """
    steps = rng.integers(low=low, high=high, size=(grid_size,))
    return np.cumsum(steps) + start


def _border_path_ok(
    path: np.ndarray,
    start: int,
    grid_size: int,
    max_edge_dev: float
) -> bool:
    """Check that a border path stays on the field and does not drift too far.

    Args:
        path: border positions produced by ``_draw_border_path``
        start: nominal border position the path started from
        grid_size: 2D square game field size
        max_edge_dev: maximum allowed relative shift of the path's last point

    Returns:
        bool: True if every position lies in ``[0, grid_size)`` and the
            end-point drift does not exceed ``max_edge_dev``
    """
    on_field = bool(np.all((path >= 0) & (path < grid_size)))
    drift = abs(int(path[-1]) - int(start)) / grid_size
    return on_field and not (drift > max_edge_dev)


def create_borders_candidate(
    grid_size: int,
    border_distort_range: Tuple[int, int],
    max_edge_dev: float,
    rng: np.random.Generator
) -> Tuple[np.ndarray, np.ndarray, bool]:
    """Create candidate border split

    Create a candidate randomised split of the field into agents' segments.
    Two horizontal and two vertical borders are drawn as random walks
    starting at ~1/3 and ~2/3 of the field. The candidate is invalid if a
    walk leaves the field, its end drifts further than ``max_edge_dev``
    (relative to ``grid_size``), or the two borders of one axis cross.

    Args:
        grid_size: 2D square game field size
        border_distort_range: ``(low, high)`` range of a single walk step
            (``high`` exclusive)
        max_edge_dev: maximum segment border shift
        rng: numpy random number generator

    Returns:
        Tuple[np.ndarray, np.ndarray, bool]: tuple of
            (horizontal borders, vertical borders, validity flag).
            Border maps are ``grid_size x grid_size`` float arrays with 1
            on border pixels; off-field points of a rejected walk are
            simply not drawn.
    """
    h_borders = np.zeros((grid_size, grid_size))
    v_borders = np.zeros((grid_size, grid_size))
    # two nominal borders per axis
    border_steps = np.linspace(0, grid_size, 4)[1:-1].astype(int)
    low, high = border_distort_range
    positions = np.arange(grid_size)
    valid = True

    # horizontal borders: path[col] is the border row in that column
    h_inds = [_draw_border_path(grid_size, bs, low, high, rng)
              for bs in border_steps]
    for bs, path in zip(border_steps, h_inds):
        on_field = (path >= 0) & (path < grid_size)
        # Only mark on-field points: off-field indices would raise
        # IndexError (too large) or silently wrap around (negative).
        h_borders[path[on_field], positions[on_field]] = 1
        if not _border_path_ok(path, bs, grid_size, max_edge_dev):
            valid = False
    # border overlap: the first border must stay above the second one
    if h_inds[0].max() > h_inds[1].min():
        valid = False

    # vertical borders: path[row] is the border column in that row
    v_inds = [_draw_border_path(grid_size, bs, low, high, rng)
              for bs in border_steps]
    for bs, path in zip(border_steps, v_inds):
        on_field = (path >= 0) & (path < grid_size)
        v_borders[positions[on_field], path[on_field]] = 1
        if not _border_path_ok(path, bs, grid_size, max_edge_dev):
            valid = False
    # border overlap: the first border must stay left of the second one
    if v_inds[0].max() > v_inds[1].min():
        valid = False

    return h_borders, v_borders, valid


def create_borders(
    grid_size: int,
    border_distort_range: Tuple[int, int],
    max_edge_dev: float,
    rng: np.random.Generator,
    max_tries: Optional[int] = 10
) -> Tuple[np.ndarray, np.ndarray]:
    """Create valid borders

    Draw border candidates with ``create_borders_candidate`` until one is
    valid, making at most ``max_tries`` attempts.

    Args:
        grid_size: 2D square game field size
        border_distort_range: noise range for borders distortion
        max_edge_dev: maximum segment border shift
        rng: numpy random number generator
        max_tries: maximum attempts for border generation; ``None`` means
            retry until a valid candidate is found

    Returns:
        Tuple[np.ndarray, np.ndarray]: tuple of
            (horizontal borders, vertical borders)

    Raises:
        AssertionError: if no valid candidate was produced within
            ``max_tries`` attempts
    """
    n_tries = 0
    while max_tries is None or n_tries < max_tries:
        h_borders, v_borders, valid = create_borders_candidate(
            grid_size=grid_size,
            border_distort_range=border_distort_range,
            max_edge_dev=max_edge_dev,
            rng=rng
        )
        n_tries += 1
        # return as soon as a candidate is valid; never discard a good one
        if valid:
            return h_borders, v_borders
    raise AssertionError(
        f'Exceeded max number of generation attempts: {max_tries}'
    )


def get_segment_map(
    h_borders: np.ndarray,
    v_borders: np.ndarray
) -> Dict[str, np.ndarray]:
    """Split the field into labelled binary segment masks.

    Every pixel gets the label ``row_band + 10 * (col_band + 1)``, where
    ``row_band`` / ``col_band`` count the horizontal / vertical border
    pixels met so far along the column / row. Label 21 is the water
    segment (with two borders per axis this is the central cell); all
    other labels become ``agent_0``, ``agent_1``, ... in ascending label
    order.

    Args:
        h_borders: binary map of horizontal borders
        v_borders: binary map of vertical borders

    Returns:
        Dict[str, np.ndarray]: integer 0/1 mask for every segment, keyed by
            ``'agent_<i>'`` or ``'water'``
    """
    row_band = np.cumsum(h_borders, axis=0)
    col_band = 10 * (np.cumsum(v_borders, axis=1) + 1)
    labels = (row_band + col_band).astype(int)

    water_label = 21
    masks = {}
    n_agents = 0
    # np.unique already returns labels in ascending order
    for label in np.unique(labels):
        mask = (labels == label).astype(int)
        if label == water_label:
            masks['water'] = mask
        else:
            masks[f'agent_{n_agents}'] = mask
            n_agents += 1
    return masks


def get_machines_candidate(
    s_map: np.ndarray,
    distance: int,
    machine_size: int,
    rng: np.random.Generator
) -> Tuple[np.ndarray, ...]:
    """Propose a fabricator/recycler placement inside one agent segment.

    The two machines are put symmetrically around the segment centroid,
    ``distance`` pixels away from it along a random direction. Their
    footprints are squares of side ``2 * (machine_size // 2) + 1`` drawn
    on maps padded by ``machine_size`` on every side, so that footprints
    sticking out of the field can still be represented and rejected later.

    Args:
        s_map: binary segment map for a given agent
        distance: distance from the segment center to each machine
        machine_size: machine icon size in pixels
        rng: numpy random number generator

    Returns:
        Tuple[np.ndarray, ...]: tuple of numpy arrays which has
            - fabricator center location
            - recycler center location
            - fabricator binary map (padded)
            - recycler binary map (padded)
            - segment center location
    """
    seg_center = np.argwhere(s_map == 1).mean(axis=0).round(0).astype(int)

    angle = rng.uniform(0, 2 * np.pi)
    offset = distance * np.array([np.cos(angle), np.sin(angle)])
    machine_loc = (seg_center + offset).round(0).astype(int)
    recycler_loc = (seg_center - offset).round(0).astype(int)

    half = machine_size // 2
    pad = machine_size
    padded_shape = tuple(side + 2 * pad for side in s_map.shape)

    def _footprint(loc: np.ndarray) -> np.ndarray:
        # square centred on loc, expressed in padded coordinates
        canvas = np.zeros(padded_shape, dtype=int)
        top, left = loc - half + pad
        bottom, right = loc + half + pad + 1
        canvas[top:bottom, left:right] = 1
        return canvas

    return (machine_loc, recycler_loc,
            _footprint(machine_loc), _footprint(recycler_loc), seg_center)


def validate_locs(
    s_map: np.ndarray,
    machine_size: int,
    machine_map: np.ndarray,
    recycler_map: np.ndarray
) -> bool:
    """Check a proposed machines placement.

    A placement is valid when neither footprint covers a pixel outside the
    agent's segment, and the two footprints do not overlap. The footprint
    maps are expected in the padded coordinates used by
    ``get_machines_candidate`` (padding of ``machine_size``).

    Args:
        s_map: binary segment map for a given agent
        machine_size: machine icon size in pixels (padding width)
        machine_map: fabricator binary map (padded)
        recycler_map: recycler binary map (padded)

    Returns:
        bool: machines placement validity
    """
    outside = np.logical_not(
        np.pad(s_map, machine_size, constant_values=0).astype(int)
    )
    occupied = np.logical_or(machine_map, recycler_map)
    crosses_border = np.any(np.logical_and(outside, occupied))
    machines_collide = np.any(np.logical_and(machine_map, recycler_map))
    return not (crosses_border or machines_collide)


def get_machines_loc(
    s_map: np.ndarray,
    machine_size: int,
    rng: np.random.Generator
) -> Tuple[np.ndarray, ...]:
    """Create placement for machines

    Create placement for machines for a given individual agent segment.
    Candidates are drawn with ``get_machines_candidate`` starting at a
    distance of ``s_map.shape[0] // 6`` from the segment center; the
    distance shrinks by one after every rejected candidate.

    Args:
        s_map: binary segment map for a given agent
        machine_size: machine icon size in pixels
        rng:  numpy random number generator

    Returns:
        Tuple[np.ndarray, ...]: tuple of numpy arrays which has
            - fabricator center location
            - recycler center location
            - fabricator binary map (same shape as ``s_map``)
            - recycler binary map (same shape as ``s_map``)
            - segment center location

    Raises:
        AssertionError: if no valid placement is found before the distance
            reaches zero, or if cropping the padding would lose part of
            a machine footprint
    """
    distance = s_map.shape[0] // 6
    valid = False
    while not valid and distance > 0:
        locs = get_machines_candidate(
            s_map=s_map, machine_size=machine_size,
            distance=distance, rng=rng
        )
        machine_loc, recycler_loc, machine_map, recycler_map, center = locs
        valid = validate_locs(
            s_map=s_map, machine_size=machine_size,
            machine_map=machine_map, recycler_map=recycler_map
        )
        distance -= 1
    if not valid:
        raise AssertionError(
            'Valid machines locations not found'
        )
    # Get rid of the padding. Explicit end indices (instead of ``-p``)
    # keep the crop correct when machine_size == 0.
    p = machine_size
    n_rows, n_cols = s_map.shape[0], s_map.shape[1]
    sum_pad = np.logical_or(machine_map, recycler_map).sum()
    machine_map = machine_map[p:p + n_rows, p:p + n_cols]
    recycler_map = recycler_map[p:p + n_rows, p:p + n_cols]
    sum_crop = np.logical_or(machine_map, recycler_map).sum()
    # explicit raise: a bare ``assert`` is stripped under ``python -O``
    if sum_pad != sum_crop:
        raise AssertionError('crop is invalid')
    return machine_loc, recycler_loc, machine_map, recycler_map, center


def get_all_machines_loc(
    segment_map: Dict[str, np.ndarray],
    machine_size: int,
    rng: np.random.Generator
) -> Tuple[Dict[str, Any], ...]:
    """Place a fabricator and a recycler in every agent segment.

    The water segment is skipped.

    Args:
        segment_map: dictionary with binary segment maps for each agent
        machine_size: machine icon size in pixels
        rng:  numpy random number generator

    Returns:
        Tuple[Dict[str, Any], ...]: ``(machines, recyclers, centers)``;
            ``machines[agent]`` and ``recyclers[agent]`` hold ``'center'``
            (location) and ``'map'`` (binary footprint), ``centers[agent]``
            is the segment center location
    """
    machines, recyclers, centers = {}, {}, {}
    for seg, s_map in segment_map.items():
        if seg == 'water':
            continue
        (machine_loc, recycler_loc,
         machine_map, recycler_map, seg_center) = get_machines_loc(
            s_map=s_map, machine_size=machine_size,
            rng=rng
        )
        machines[seg] = {'center': machine_loc, 'map': machine_map}
        recyclers[seg] = {'center': recycler_loc, 'map': recycler_map}
        centers[seg] = seg_center
    return machines, recyclers, centers


def get_non_resource_regions(
    segment_map: Dict[str, np.ndarray],
    machine_size: int,
    machines: Dict[str, Dict[str, np.ndarray]],
    recyclers: Dict[str, Dict[str, np.ndarray]],
) -> np.ndarray:
    """Mark the pixels where resources must not be spawned.

    Water and all machine footprints are blocked, then the blocked area is
    grown by a ``machine_size`` box filter (any pixel touched by the
    filter counts as blocked).

    Args:
        segment_map: dictionary with binary segment maps for each agent
        machine_size: machine icon size in pixels
        machines: placement information for fabricators
        recyclers: placement information for recyclers

    Returns:
        np.ndarray: uint8 map with 1 where resources can not be located
    """
    blocked = segment_map['water'].copy()
    for agent, machine in machines.items():
        footprint = np.logical_or(machine['map'], recyclers[agent]['map'])
        blocked = np.logical_or(blocked, footprint)
    spread = uniform_filter(
        blocked.astype(float),
        size=int(machine_size),
        mode='constant'
    )
    # tolerance instead of > 0 to ignore floating-point noise of the filter
    return (spread > 1e-5).astype(np.uint8)


def get_region_grid(
    region: np.ndarray,
    cell_size: int,
    agg_fn: Optional = np.max
) -> np.ndarray:
    """Downsample a 2D array by aggregating square windows.

    Args:
        region: 2D numerical array to aggregate
        cell_size: side of the square aggregation window
        agg_fn: reduction applied to each window (``np.max`` by default)

    Returns:
        np.ndarray: array with one aggregated value per
            ``cell_size`` x ``cell_size`` window of ``region``
    """
    window = (cell_size,) * 2
    return block_reduce(region, window, agg_fn)


def get_segment_map_grid(
    segment_map: Dict[str, np.ndarray],
    cell_size: int,
) -> Dict[str, np.ndarray]:
    """Downsample every segment mask to the coarse grid.

    A grid cell belongs to a segment when more than half of its pixels do.

    Args:
        segment_map: dictionary with binary segment maps for each agent
        cell_size: 2D aggregation window size

    Returns:
        Dict[str, np.ndarray]: uint8 coarse binary mask for every segment
            (same keys as ``segment_map``)
    """
    return {
        name: (
            get_region_grid(region=mask, cell_size=cell_size, agg_fn=np.mean)
            > 0.5
        ).astype(np.uint8)
        for name, mask in segment_map.items()
    }


def get_unreachable_regions(
    segment_map: Dict[str, np.ndarray],
    machines: Dict[str, Dict[str, np.ndarray]],
    recyclers: Dict[str, Dict[str, np.ndarray]],
    include_water: Optional[bool] = True
) -> np.ndarray:
    """Mark the pixels agents cannot step on.

    All fabricator and recycler footprints are unreachable; water is added
    when ``include_water`` is set.

    Args:
        segment_map: dictionary with binary segment maps for each agent
        machines: placement information for fabricators
        recyclers: placement information for recyclers
        include_water: whether to include water segment

    Returns:
        np.ndarray: uint8 binary map with unreachable regions
    """
    occupied = np.zeros_like(segment_map['water'])
    for name, machine in machines.items():
        footprint = np.logical_or(machine['map'], recyclers[name]['map'])
        occupied = np.logical_or(occupied, footprint)
    blocked = (np.logical_or(occupied, segment_map['water'])
               if include_water else occupied)
    return blocked.astype(np.uint8)


def spawn_resources_grid(
    non_resource_grid: np.ndarray,
    resource_prob: float,
    rng: np.random.Generator
) -> np.ndarray:
    """Randomly place initial resources on the coarse grid.

    Each free cell receives a resource with probability ``resource_prob``.
    Blocked cells have their uniform draw shifted by 1, so for
    ``resource_prob <= 1`` they never receive one.

    Args:
        non_resource_grid: binary map for regions where resources can
            not be located
        resource_prob: probability to spawn resource at a given point
        rng: numpy random numbers generator

    Returns:
        np.ndarray: uint8 binary grid with spawned resources
    """
    draws = rng.uniform(size=non_resource_grid.shape) + non_resource_grid
    return (draws < resource_prob).astype(np.uint8)


def grid_to_center_mappings(
    grid: np.ndarray,
    cell_size: int
) -> Tuple[Dict[tuple, np.ndarray], ...]:
    """Map occupied coarse-grid cells to field pixels and back.

    Only cells with a positive value are mapped. A cell ``(i, j)`` maps to
    the pixel ``(i, j) * cell_size + cell_size // 2``.

    Args:
        grid: low dimensional grid
        cell_size: grid cell size on actual map

    Returns:
        Tuple[Dict[tuple, np.ndarray], ...]: ``(grid_to_center,
            center_to_grid)``; keys are coordinate tuples, values are
            coordinate arrays
    """
    grid_to_center, center_to_grid = {}, {}
    for cell in np.argwhere(grid > 0):
        pixel = cell * cell_size + cell_size // 2
        grid_to_center[tuple(cell)] = pixel
        center_to_grid[tuple(pixel)] = cell
    return grid_to_center, center_to_grid


def get_template_texture(
    cmap: np.ndarray,
    template: np.ndarray,
    color_eps: float,
    rng: np.random.Generator
) -> np.ndarray:
    """Paint a noisy single-colour texture over a mask.

    One uniform noise value in ``[-color_eps, color_eps) * 255`` (rounded)
    is drawn per pixel and added to all channels of ``cmap``. The result is
    clipped to ``[0, 255]`` and zeroed wherever ``template`` is zero.

    Args:
        cmap: base RGB colour
        template: 2D mask selecting where the texture is drawn
        color_eps: relative amplitude of the colour noise
        rng: numpy random numbers generator

    Returns:
        np.ndarray: integer array of shape ``template.shape + (3,)``
    """
    noise = rng.uniform(
        low=-color_eps, high=color_eps,
        size=template.shape
    ) * 255.
    noise = np.round(noise, 0)
    coloured = cmap[np.newaxis, np.newaxis, :] + noise[:, :, np.newaxis]
    coloured = np.clip(coloured, a_min=0, a_max=255)
    coloured = coloured * template[:, :, np.newaxis]
    return coloured.astype(int)


def get_thick_border(
    borders: np.ndarray,
    border_display_width: int
) -> np.ndarray:
    """Widen one-pixel segment borders for display.

    A box filter of side ``border_display_width`` is run over the border
    map; every pixel it touches becomes part of the thick border.

    Args:
        borders: array with segments borders binary map
        border_display_width: segments borders thickness

    Returns:
        np.ndarray: integer 0/1 array with thick segment borders
    """
    smoothed = uniform_filter(
        np.asarray(borders, dtype=float),
        size=border_display_width,
        mode='constant'
    )
    touched = smoothed > 1e-5
    return touched.astype(int)


def get_machine_icon(
    cmap: np.ndarray,
    asset_size: int
) -> ImageType:
    """Render the fabricator icon.

    The icon is drawn on a 51x51 canvas: pixels are kept where the
    normalised product of the distances to the canvas edges (per axis) is
    below 0.5, a 5-pixel frame is blanked, and the result is resized to
    ``asset_size``.

    Args:
        cmap: asset's RGB colour
        asset_size: asset's size in pixels

    Returns:
        ImageType: asset PIL image
    """
    canvas_px = 51
    frame_px = 5

    # non-positive ramp: 0 at the canvas edges, most negative in the middle
    ramp = np.abs(np.arange(0, canvas_px) - canvas_px // 2)
    ramp -= ramp.max()
    product = np.outer(ramp, ramp)
    weight = np.abs(product / product.max())
    stencil = (weight < 0.5).astype(int)[:, :, np.newaxis]

    pixels = np.zeros(shape=(canvas_px, canvas_px, 3)) + cmap[np.newaxis, np.newaxis, :]
    pixels = pixels * stencil
    pixels[:frame_px] = 0
    pixels[-frame_px:] = 0
    pixels[:, :frame_px] = 0
    pixels[:, -frame_px:] = 0

    return Image.fromarray(pixels.astype(np.uint8)).resize(
        size=(asset_size, asset_size)
    )


def create_circular_mask(
    h: int, w: int,
    center: Optional = None, radius: Optional = None
):
    """Boolean disk mask.

    Args:
        h: mask height
        w: mask width
        center: ``(x, y)`` disk center; defaults to the middle of the mask
        radius: disk radius; defaults to the largest radius that fits
            inside the mask around ``center``

    Returns:
        np.ndarray: ``(h, w)`` boolean array, True inside the disk
    """
    if center is None:
        center = (int(w / 2), int(h / 2))
    cx, cy = center[0], center[1]
    if radius is None:
        radius = min(cx, cy, w - cx, h - cy)

    rows, cols = np.ogrid[:h, :w]
    distance = np.sqrt((cols - cx) ** 2 + (rows - cy) ** 2)
    return distance <= radius


def get_recycler_icon(
    cmap: np.ndarray,
    asset_size: int
) -> ImageType:
    """Render the recycler icon: a filled disk of colour ``cmap``.

    Args:
        cmap: asset's RGB colour
        asset_size: asset's size in pixels

    Returns:
        ImageType: asset PIL image
    """
    canvas_px = 51
    disk = create_circular_mask(canvas_px, canvas_px)[:, :, np.newaxis]
    rgb = (cmap[np.newaxis, np.newaxis, :] * disk).astype(np.uint8)
    return Image.fromarray(rgb).resize(size=(asset_size, asset_size))


def get_resource_icon(
    asset_size: int,
    cmap: np.ndarray = np.array([220, 255, 255]),
    tolerance: int = 20
) -> ImageType:
    """Render the resource icon from two filled polygons.

    The shape is drawn on a black 51x51 canvas and resized. Afterwards,
    pixels whose mean channel value is below ``tolerance`` are set to black
    to clean up resampling fringes.

    Args:
        asset_size: asset's size in pixels
        cmap: asset's RGB colour
        tolerance: minimum mean pixel value to keep

    Returns:
        ImageType: asset PIL image
    """
    sketch = np.array(Image.new('RGB', (51, 51)))
    shapes = (
        draw.polygon((1, 50, 35, 50), (25, 10, 25, 40)),
        draw.polygon((20, 20, 34), (0, 50, 25)),
    )
    for rows, cols in shapes:
        sketch[rows, cols, :] = cmap
    scaled = np.array(
        Image.fromarray(sketch).resize(size=(asset_size, asset_size))
    )
    scaled[scaled.mean(axis=2) < tolerance] = 0
    return Image.fromarray(scaled)


def get_trash_icon(
    asset_size: int,
    cmap: np.ndarray = np.array([94, 45, 1])
) -> ImageType:
    """Render the trash icon: a solid square of colour ``cmap``.

    Args:
        asset_size: asset's size in pixels
        cmap: asset's RGB colour

    Returns:
        ImageType: asset PIL image
    """
    solid = np.full((asset_size, asset_size, 3), cmap, dtype=float)
    return Image.fromarray(solid.astype(np.uint8))


def get_agent_icon(
    asset_size: int,
    cmap: np.ndarray,
    tolerance: int = 20
) -> ImageType:
    """Render the agent icon from a single filled polygon.

    The shape is drawn on a black 51x51 canvas and resized. Afterwards,
    pixels whose mean channel value is below ``tolerance`` are set to black.

    Args:
        asset_size: asset's size in pixels
        cmap: asset's RGB colour
        tolerance: minimum mean pixel value to keep

    Returns:
        ImageType: asset PIL image
    """
    sketch = np.array(Image.new('RGB', (51, 51)))
    rows, cols = draw.polygon((1, 50, 50, 40, 40, 50, 50),
                              (25, 1, 18, 18, 32, 32, 50))
    sketch[rows, cols, :] = cmap
    scaled = np.array(
        Image.fromarray(sketch).resize(size=(asset_size, asset_size))
    )
    scaled[scaled.mean(axis=2) < tolerance] = 0
    return Image.fromarray(scaled)


def get_arrow_icon(
    asset_size: int,
    cmap: np.ndarray,
    tolerance: int = 10
) -> ImageType:
    """Render the arrow icon from a single filled polygon.

    The shape is drawn on a black 51x51 canvas and resized. Afterwards,
    pixels whose mean channel value is below ``tolerance`` are set to black.

    Args:
        asset_size: asset's size in pixels
        cmap: asset's RGB colour
        tolerance: minimum mean pixel value to keep

    Returns:
        ImageType: asset PIL image
    """
    sketch = np.array(Image.new('RGB', (51, 51)))
    rows, cols = draw.polygon((0, 50, 35, 50), (25, 14, 25, 36))
    sketch[rows, cols, :] = cmap
    scaled = np.array(
        Image.fromarray(sketch).resize(size=(asset_size, asset_size))
    )
    scaled[scaled.mean(axis=2) < tolerance] = 0
    return Image.fromarray(scaled)


def create_icons(
    segment_map: Dict[str, np.ndarray],
    machine_icon_size: int,
    agent_icon_size: int,
    arrow_icon_size: int,
    resource_icon_size: int,
    trash_icon_size: int,
    rng: np.random.Generator,
    color_maps: Dict[str, np.ndarray] = COLOR_MAPS,
    resource_cmap: np.ndarray = np.array([220, 255, 255]),
    trash_cmap: np.ndarray = np.array([94, 45, 1]),
    north_arrow_cmap: np.ndarray = np.array([255, 1, 1]),
    home_arrow_cmap: np.ndarray = np.array([1, 255, 1]),
    non_home_arrow_cmap: np.ndarray = np.array([1, 1, 255]),
) -> Dict[str, Any]:
    """Build every icon needed for visualisation.

    Agents (all non-water segments) are paired in order with the keys of
    ``color_maps``. Two extra palettes, ``'domestic'`` and ``'foreign'``,
    are always added. Each of these owners gets a machine, a recycler and
    an agent icon plus its colour. Shared resource, trash and arrow icons
    are added on top.

    Args:
        segment_map: dictionary with binary segment maps for each agent
        machine_icon_size: machine icon size in pixels
        agent_icon_size: agent icon size in pixels
        arrow_icon_size: arrow icon size in pixels
        resource_icon_size: resource icon size in pixels
        trash_icon_size: trash icon size in pixels
        rng: numpy random number generator (currently unused)
        color_maps: color maps for each agent
        resource_cmap: resource color map
        trash_cmap: trash color map
        north_arrow_cmap: north arrow color map
        home_arrow_cmap: home arrow color map
        non_home_arrow_cmap: non home arrow color map

    Returns:
        Dict[str, Any]: dictionary with icons for each asset
    """
    agent_names = [name for name in segment_map if name != 'water']
    # NOTE(review): agents beyond len(color_maps) get no palette entry
    # (zip truncates) — confirm the agent count never exceeds it.
    palette_for = dict(zip(agent_names, color_maps))
    palette_for.update(domestic='domestic', foreign='foreign')

    icons = {}
    for owner, palette_key in palette_for.items():
        colour = color_maps[palette_key]
        icons[owner] = {
            'machine': get_machine_icon(
                cmap=colour, asset_size=machine_icon_size),
            'recycler': get_recycler_icon(
                cmap=colour, asset_size=machine_icon_size),
            'agent': get_agent_icon(
                cmap=colour, asset_size=agent_icon_size),
            'cmap': colour,
        }

    icons['resource'] = get_resource_icon(
        cmap=resource_cmap, asset_size=resource_icon_size)
    icons['trash'] = get_trash_icon(
        cmap=trash_cmap, asset_size=trash_icon_size)
    icons['north_arrow'] = get_arrow_icon(
        cmap=north_arrow_cmap, asset_size=arrow_icon_size)
    icons['home_arrow'] = get_arrow_icon(
        cmap=home_arrow_cmap, asset_size=arrow_icon_size)
    icons['non_home_arrow'] = get_arrow_icon(
        cmap=non_home_arrow_cmap, asset_size=arrow_icon_size)
    return icons


def create_erasers(
    agent_size: int,
    arrow_size: int,
    resource_size: int,
    trash_size: int
) -> Dict[str, ImageType]:
    """Build all-zero square images used to wipe icons from a layer.

    Args:
        agent_size: agent icon size in pixels
        arrow_size: arrow icon size in pixels
        resource_size: resource icon size in pixels
        trash_size: trash icon size in pixels

    Returns:
        Dict[str, ImageType]: zero-valued image per asset kind
            (``'agent'``, ``'arrow'``, ``'resource'``, ``'trash'``)
    """
    sizes = {
        'agent': agent_size,
        'arrow': arrow_size,
        'resource': resource_size,
        'trash': trash_size,
    }
    return {
        kind: Image.fromarray(np.zeros(shape=(side, side)))
        for kind, side in sizes.items()
    }


def create_substrate_texture(
    segment_map: Dict[str, np.ndarray],
    borders: np.ndarray,
    border_display_width: int,
    machines: Dict[str, np.ndarray],
    recyclers: Dict[str, np.ndarray],
    icons: Dict[str, Any],
    rng: np.random.Generator
) -> np.ndarray:
    """Render the static background of the game field.

    Noisy grass and water textures are combined, thick grey borders are
    laid over them, and every agent's fabricator and recycler icons are
    pasted at their centers. The noise textures are drawn in the order
    grass, water, borders, so the RNG stream is consumed in that order.

    Args:
        segment_map: dictionary with binary segment maps for each agent
        borders: array with segments borders binary map
        border_display_width: segments borders thickness
        machines: placement information for fabricators
        recyclers: placement information for recyclers
        icons: dictionary with icons for each asset
        rng: numpy random number generator

    Returns:
        np.ndarray: uint8 RGB substrate texture image array
    """
    water = segment_map['water']
    grass = get_template_texture(
        cmap=np.array([1, 128, 1]),
        template=np.logical_not(water),
        color_eps=0.075,
        rng=rng
    )
    lake = get_template_texture(
        cmap=np.array([120, 140, 250]),
        template=water,
        color_eps=0.04,
        rng=rng
    )
    border_mask = get_thick_border(
        borders=borders,
        border_display_width=border_display_width
    )
    stone = get_template_texture(
        cmap=np.array([150, 150, 150]),
        template=border_mask,
        color_eps=0.075,
        rng=rng
    )

    # borders replace whatever grass/water lies underneath them
    base = (grass + lake) * np.logical_not(border_mask)[:, :, np.newaxis]
    base = base + stone
    canvas = Image.fromarray(base.astype(np.uint8))

    for name in segment_map:
        if name == 'water':
            continue
        placements = (
            (icons[name]['machine'], machines[name]['center']),
            (icons[name]['recycler'], recyclers[name]['center']),
        )
        for icon, loc in placements:
            top_left = loc - icon.size[0] // 2
            # (row, col) -> PIL's (x, y)
            canvas.paste(icon, tuple(top_left)[::-1])
    return np.array(canvas).astype(np.uint8)


def get_agents_perspective(
    grid_size: int,
    segment_map: Dict[str, np.ndarray],
    icons: Dict[str, Any],
    machines: Dict[str, Any],
    recyclers: Dict[str, Any],
    obs_dim: int
) -> Dict[str, np.ndarray]:
    """Render every agent's subjective view of all machines.

    For each agent, its own fabricator and recycler are drawn with the
    ``'domestic'`` icons, and every other agent's machines with the
    ``'foreign'`` icons. The image is then zero-padded by ``obs_dim`` on
    both spatial axes.

    Args:
        grid_size: 2D square game field size
        segment_map: dictionary with binary segment maps for each agent
        icons: dictionary with icons for each asset
        machines: placement information for fabricators
        recyclers: placement information for recyclers
        obs_dim: local visual observation size (padding width)

    Returns:
        Dict[str, np.ndarray]: per agent, an RGB array of shape
            ``(grid_size + 2 * obs_dim, grid_size + 2 * obs_dim, 3)``
    """
    pad = obs_dim
    agents = [name for name in segment_map if name != 'water']
    views = {}
    for viewer in agents:
        canvas = Image.new('RGB', (grid_size, grid_size))
        for owner in agents:
            palette = icons['domestic' if owner == viewer else 'foreign']
            m_icon, r_icon = palette['machine'], palette['recycler']
            m_top_left = machines[owner]['center'] - m_icon.size[0] // 2
            r_top_left = recyclers[owner]['center'] - r_icon.size[0] // 2
            # (row, col) -> PIL's (x, y)
            canvas.paste(m_icon, tuple(m_top_left)[::-1])
            canvas.paste(r_icon, tuple(r_top_left)[::-1])
        views[viewer] = np.pad(
            np.array(canvas), pad_width=((pad, pad), (pad, pad), (0, 0)),
            constant_values=0)
    return views


def render_init_resources(
    resource_grid_to_center: Dict[tuple, np.ndarray],
    icons: Dict[str, Any],
    grid_size: int
) -> ImageType:
    """Draw the initial resources onto a black RGB layer.

    The resource icon is pasted centred on every location in
    ``resource_grid_to_center``. Its non-black pixels serve as the paste
    mask, so the icon's black background stays transparent.

    Args:
        resource_grid_to_center: mapping from low dimension
            resource grid to full size game field
        icons: dictionary with icons for each asset
        grid_size: 2D square game field size

    Returns:
        ImageType: PIL image with initial resources
    """
    canvas = Image.new('RGB', (grid_size, grid_size))
    icon_pixels = np.array(icons['resource'])
    alpha = Image.fromarray(
        np.where(icon_pixels.sum(axis=-1) > 0, 255, 0).astype(np.uint8)
    )
    stamp = Image.fromarray(icon_pixels.astype(np.uint8))
    half = stamp.size[0] // 2
    for loc in np.array(list(resource_grid_to_center.values())):
        # (row, col) -> PIL's (x, y)
        canvas.paste(stamp, tuple(loc - half)[::-1], mask=alpha)
    return canvas


def render_wealth(
    money: int,
    block_size: int
) -> ImageType:
    """Render an agent's wealth as white text (deprecated).

    Args:
        money: current agent's wealth
        block_size: menu block size; the image is
            ``2 * block_size`` wide and ``block_size`` high

    Returns:
        ImageType: PIL RGB image with the wealth value
    """
    canvas = Image.new('RGB', (block_size * 2, block_size))
    ImageDraw.Draw(canvas).text((1, 1), str(money), fill=(255, 255, 255))
    return canvas


def init_agents(
    centers: Dict[str, np.ndarray],
    block_size: int,
    rng: np.random.Generator
) -> Dict[str, Any]:
    """Init agents state.

    Build the initial per-agent state. Each agent starts at its segment
    centre, faces a heading drawn by ``rng.choice`` from
    {0, 0.5, 1, 1.5} (multiples of pi), carries nothing and has zero wealth.

    Args:
        centers: segment centers locations keyed by agent ID
        block_size: menu block size (used for the wealth tile)
        rng: numpy random number generator; one draw per agent, in the
            iteration order of ``centers``

    Returns:
        Dict[str, Any]: agents initial state keyed by agent ID
    """
    headings = [0, 0.5, 1, 1.5]
    states = {}
    for agent_id, center in centers.items():
        states[agent_id] = {
            'loc': center,
            'dir_pi': rng.choice(headings, 1).item(),
            'inventory': {
                'resource': False,
                'trash': False,
                'trash_source': None,
            },
            'wealth': 0,
            'last_render_wealth': 0,
            'last_wealth_image': render_wealth(
                money=0, block_size=block_size
            ),
        }
    return states


def pi_to_rad(n_pi: float) -> int:
    """Convert a heading in multiples of pi to a rotation angle in DEGREES.

    Despite its name, this function returns degrees, not radians. The
    result is passed to ``PIL.Image.Image.rotate``, which takes degrees
    (counter-clockwise). The heading is offset by -0.5, so a heading of
    0.5 * pi maps to 0 degrees (icon left unrotated):

        0.0 -> -90, 0.5 -> 0, 1.0 -> 90, 1.5 -> 180

    Args:
        n_pi: heading expressed as a multiple of pi (e.g. 0.5 means pi/2)

    Returns:
        int: rotation angle in degrees (truncated towards zero)
    """
    return int((n_pi - 0.5) * 180)


def render_agents(
    agents_state: Dict[str, Any],
    icons: Dict[str, Any],
    grid_size: int,
    local_mode: Optional[bool] = False
) -> np.ndarray:
    """Render agents.

    Paste every agent's icon onto a black ``grid_size`` x ``grid_size`` RGB
    layer. Each icon is rotated to the agent's heading and centred on the
    agent's location.

    Args:
        agents_state: agents state dictionary (``loc`` and ``dir_pi`` are
            read)
        icons: dictionary with icons for each asset
        grid_size: 2D square game field size
        local_mode: if truthy, every agent is drawn with the shared
            ``icons['foreign']['agent']`` icon (subjective view)

    Returns:
        np.ndarray: array with agents image layer
    """
    layer = Image.new('RGB', (grid_size, grid_size))
    for agent_id, state in agents_state.items():
        source = icons['foreign' if local_mode else agent_id]['agent']
        rotated = source.rotate(pi_to_rad(state['dir_pi']))
        row, col = tuple(state['loc'])
        width, height = rotated.size
        # PIL paste expects the upper-left corner as (x, y) = (col, row)
        layer.paste(rotated, (col - width // 2, row - height // 2))
    return np.array(layer)


def loc_to_coord(
    loc: np.ndarray,
    grid_size: int
) -> np.ndarray:
    """Numpy loc to coordinates.

    Convert a numpy ``(row, col)`` location into Cartesian ``(x, y)``
    coordinates. The column becomes x, and the row is flipped so that y
    grows upwards: ``y = grid_size - row``.

    Args:
        loc: array with numpy loc ``(row, col)``
        grid_size: game field grid size

    Returns:
        np.ndarray: array ``[x, y]`` with Cartesian coordinates
    """
    row, col = tuple(loc)
    return np.array([col, grid_size - row])


def cart2pol(x: int, y: int) -> Tuple[float, float]:
    """Cartesian to polar.

    Convert Cartesian coordinates into polar form.

    Args:
        x: x-coordinate
        y: y-coordinate

    Returns:
        Tuple[float, float]: ``(rho, phi)``. ``rho`` is the Euclidean
            distance from the origin and ``phi`` is ``arctan2(y, x)`` in
            radians, within ``[-pi, pi]``.
    """
    angle = np.arctan2(y, x)
    radius = np.sqrt(x ** 2 + y ** 2)
    return radius, angle


def dir_to_shift(dir: float) -> np.ndarray:
    """Direction to shift.

    Map a heading (a multiple of pi) to the unit ``(row, col)`` step on the
    numpy grid: 0 -> +col, 0.5 -> -row, 1.0 -> -col, 1.5 -> +row.

    Args:
        dir: direction in arc notation; one of {0.0, 0.5, 1.0, 1.5}

    Returns:
        np.ndarray: array with coordinates shift (a new array per call)

    Raises:
        ValueError: if ``dir`` is not one of the four supported headings
    """
    unit_steps = (
        (0.0, (0, 1)),
        (0.5, (-1, 0)),
        (1.0, (0, -1)),
        (1.5, (1, 0)),
    )
    for heading, step in unit_steps:
        if dir == heading:
            return np.array(step)
    raise ValueError(
        f'Invalid direction for movement: {dir} * pi'
    )


class GameEngine:

    def __init__(
        self,
        seed: int,
        grid_size: int = 210,  # should be div by resource_size and trash_size
        obs_dim: int = 60,  # should be divisible by 5
        move_step: int = 7,
        resource_price: int = 10,
        recycle_cost: int = 4,
        border_distort_range: Tuple[int, int] = (-1, 2),
        max_edge_dev: float = 0.1,
        max_tries: int = 25,
        machine_size: int = 9,
        machine_reach: int = 9,
        agent_size: int = 9,
        agent_reach: int = 9,
        resource_size: int = 7,
        trash_size: int = 5,
        resource_prob: float = 0.1,
        border_display_width: int = 2,
        blocked_vanish_alpha: float = 0.25
    ):
        """Game Engine

        Game engine for the AIJ Multi-Agent AI competition. The constructor
        builds the map layout (segment borders, machines, recyclers,
        initial resources), the rendering assets and the initial agents
        state. All randomness comes from one numpy Generator seeded with
        ``seed``.

        Valid action space (discrete):
            # 0: move forward by up to `move_step` pixels if possible
            # 1: turn left, then move by up to `move_step` pixels if possible
            # 2: turn right, then move by up to `move_step` pixels if possible
            # 3: turn around, then move by up to `move_step` pixels if possible
            # 4: pickup resource (if closer than `agent_reach` pixels)
            # 5: pickup trash (if closer than `agent_reach` pixels)
            # 6: throw resource (put into machine if within `machine_reach`)
            # 7: throw trash (put into recycler if within `machine_reach`)
            # 8: noop

        Parameters:
            seed: random seed for engine
            grid_size: 2D square game field size
            obs_dim: local visual observation size
            move_step: movement step size in pixels
            resource_price: reward given for resource processing
            recycle_cost: cost of recycling trash
            border_distort_range: noise range for borders distortion
            max_edge_dev: maximum segment border shift
            max_tries: maximum attempts for border generation
            machine_size: machine icon size in pixels
            machine_reach: size of machine interaction region
            agent_size: agent icon size in pixels
            agent_reach: agent reach when picking up items
            resource_size: resource icon size in pixels
            trash_size: trash icon size in pixels
            resource_prob: probability to spawn resource at a given point
            border_display_width: segments borders thickness
            blocked_vanish_alpha: blocked segment fogging degree

        Raises:
            AssertionError: if ``grid_size`` is not divisible by both
                ``resource_size`` and ``trash_size``, if ``obs_dim`` is not
                divisible by 5, or if ``obs_dim // 5`` is smaller than
                ``resource_size + trash_size``. These are ``assert``
                statements, so they are skipped under ``python -O``.
        """
        assert grid_size % resource_size == 0 and grid_size % trash_size == 0
        assert obs_dim % 5 == 0
        assert obs_dim // 5 >= resource_size + trash_size

        # NOTE: self.rng is passed to several layout/asset helpers below
        # (borders, machines, resources, icons, texture, agent headings).
        # Presumably each one draws from it, so reordering these calls
        # would change the map generated for a given seed.
        self.rng = np.random.default_rng(seed)
        self.grid_size = grid_size
        # Length of the field diagonal: sqrt(2) * grid_size
        self.diag = np.sqrt(2 * grid_size ** 2).item()
        self.move_step = move_step
        self.obs_dim = obs_dim
        # The local menu is split into 5 blocks along obs_dim
        self.block_size = obs_dim // 5
        self.resource_price = resource_price
        self.recycle_cost = recycle_cost
        self.border_distort_range = border_distort_range
        self.max_edge_dev = max_edge_dev
        self.max_tries = max_tries
        self.machine_size = machine_size
        self.machine_reach = machine_reach
        self.agent_size = agent_size
        self.agent_reach = agent_reach
        # Same value as block_size
        self.arrow_size = obs_dim // 5
        self.resource_size = resource_size
        self.trash_size = trash_size
        self.resource_prob = resource_prob
        self.border_display_width = border_display_width
        self.blocked_vanish_alpha = blocked_vanish_alpha

        # Construct main layout
        self.h_borders, self.v_borders = create_borders(
            grid_size=grid_size,
            border_distort_range=border_distort_range,
            max_edge_dev=max_edge_dev,
            max_tries=max_tries,
            rng=self.rng
        )
        # Union of horizontal and vertical border masks
        self.borders = np.logical_or(self.h_borders, self.v_borders)
        # Per-agent segment masks, indexed as segment_map[agent_id][row, col]
        self.segment_map = get_segment_map(
            h_borders=self.h_borders, v_borders=self.v_borders
        )
        self.machines, self.recyclers, self.centers = get_all_machines_loc(
            segment_map=self.segment_map,
            machine_size=machine_size,
            rng=self.rng
        )
        # Segment maps for resources and trash, downsampled to their grids
        self.resource_segment_map = get_segment_map_grid(
            segment_map=self.segment_map,
            cell_size=resource_size
        )
        self.trash_segment_map = get_segment_map_grid(
            segment_map=self.segment_map,
            cell_size=trash_size
        )

        # Distribute initial resources
        non_resource = get_non_resource_regions(
            segment_map=self.segment_map,
            machine_size=machine_size,
            machines=self.machines,
            recyclers=self.recyclers,
        )
        self.non_resource_grid = get_region_grid(
            region=non_resource,
            cell_size=resource_size
        )
        self.resource_grid = spawn_resources_grid(
            non_resource_grid=self.non_resource_grid,
            resource_prob=resource_prob,
            rng=self.rng
        )
        mappings = grid_to_center_mappings(
            grid=self.resource_grid,
            cell_size=resource_size
        )
        # Two-way lookup between resource grid cells and pixel centres
        self.resource_grid_to_center, self.resource_center_to_grid = mappings

        # Create grid for trash (starts empty; trash is only added at runtime)
        self.non_trash_grid = get_region_grid(
            region=non_resource,
            cell_size=trash_size
        )
        self.trash_grid = np.zeros_like(
            self.non_trash_grid
        ).astype(np.uint8)
        self.trash_grid_to_center = {}
        self.trash_center_to_grid = {}
        # Pixel centre -> ID of the agent the trash is attributed to
        self.trash_center_to_source = {}

        # Get unreachable regions (used by move() to reject target pixels)
        self.non_reachable = get_unreachable_regions(
            segment_map=self.segment_map,
            machines=self.machines,
            recyclers=self.recyclers,
        )
        # Machines/recyclers-only mask (include_water=False), given a channel
        # axis and padded by obs_dim so it aligns with pad_state() output in
        # local_view()
        self.all_machines = get_unreachable_regions(
            segment_map=self.segment_map,
            machines=self.machines,
            recyclers=self.recyclers,
            include_water=False
        )[:, :, np.newaxis]
        self.all_machines = self.pad_state(self.all_machines)

        # Generate assets for rendering
        self.icons = create_icons(
            segment_map=self.segment_map,
            machine_icon_size=self.machine_size,
            agent_icon_size=self.agent_size,
            arrow_icon_size=self.arrow_size,
            resource_icon_size=self.resource_size,
            trash_icon_size=self.trash_size,
            rng=self.rng
        )

        # Generate erasers for assets
        self.erasers = create_erasers(
            agent_size=self.agent_size,
            arrow_size=self.arrow_size,
            resource_size=self.resource_size,
            trash_size=self.trash_size
        )

        # Generate main substrate texture
        self.substrate_texture = create_substrate_texture(
            segment_map=self.segment_map,
            borders=self.borders,
            border_display_width=border_display_width,
            machines=self.machines,
            recyclers=self.recyclers,
            icons=self.icons,
            rng=self.rng
        )
        # Per-agent machine overlays added in local_view()
        self.agents_perspectives = get_agents_perspective(
            grid_size=self.grid_size,
            segment_map=self.segment_map,
            icons=self.icons,
            machines=self.machines,
            recyclers=self.recyclers,
            obs_dim=self.obs_dim
        )

        # Generate resources layer
        self.resource_map = render_init_resources(
            resource_grid_to_center=self.resource_grid_to_center,
            icons=self.icons,
            grid_size=grid_size
        )

        # Generate trash layer (empty black RGB image)
        self.trash_map = Image.new('RGB', (grid_size, grid_size))

        # Create agents state
        self.agents_state = init_agents(
            centers=self.centers,
            block_size=self.block_size,
            rng=self.rng
        )

        # Create blocked dict; add_block() replaces blocked_map with the
        # boolean OR of the blocked agents' segments
        self.blocked = {a: False for a in self.agents_state.keys()}
        self.blocked_map = np.zeros(
            shape=(grid_size, grid_size)).astype(np.uint8)

    def agents_map(self, local_mode: Optional[bool] = False) -> np.ndarray:
        """Render the agents layer.

        Thin wrapper around ``render_agents`` that uses this engine's
        agents state, icons and grid size.

        Args:
            local_mode: if truthy, draw every agent with the shared
                "foreign" icon (subjective view)

        Returns:
            np.ndarray: RGB array with the agents image layer
        """
        return render_agents(
            self.agents_state,
            self.icons,
            self.grid_size,
            local_mode=local_mode,
        )

    def get_state(self) -> np.ndarray:
        """Get state

        Get current game engine state

        Returns:
            np.ndarray: image array with current visual state
        """
        resource_map = np.array(self.resource_map)
        trash_map = np.array(self.trash_map)
        # add resources layer
        state = self.substrate_texture * np.logical_not(
            resource_map > 0)
        state = state + resource_map
        # add trash layer
        state = state * np.logical_not(
            trash_map > 0)
        state = state + trash_map
        # add agents layer
        agents_map = self.agents_map()
        state = state * np.logical_not(
            agents_map > 0)
        state = state + agents_map
        return state

    def trash_by_segment(self, agent_id: str) -> int:
        """Calculate trash by segment

        Calculate number of trash items on a given
        segment

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            int: number of trash items on a given segment
        """
        segment = self.trash_segment_map[agent_id]
        trash_by_segment = np.logical_and(
            segment, self.trash_grid
        )
        return trash_by_segment.sum()

    def resource_by_segment(self, agent_id: str) -> int:
        """Calculate resource by segment

        Calculate number of resource items on a given
        segment

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            int: number of resource items on a given segment
        """
        segment = self.resource_segment_map[agent_id]
        resource_by_segment = np.logical_and(
            segment, self.resource_grid
        )
        return resource_by_segment.sum()

    def is_home_segment(self, agent_id) -> bool:
        """Check if home segment

        Check if given agent is on home segment

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if currently on home segment
        """
        loc = self.agents_state[agent_id]['loc']
        home = self.segment_map[agent_id][loc[0], loc[1]]
        return bool(home.item())

    def add_block(self, agent_id: str) -> None:
        """Add block

        Impose block on agent actions (resource and trash
        recycling)

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            None
        """
        if not self.blocked[agent_id]:
            self.blocked[agent_id] = True
            self.blocked_map = np.logical_or(
                self.blocked_map, self.segment_map[agent_id])
            blocked_segment = np.logical_not(self.segment_map[agent_id])
            blocked_segment = blocked_segment + self.blocked_vanish_alpha
            blocked_segment = np.clip(blocked_segment, a_min=0, a_max=1)
            blocked_segment = np.expand_dims(blocked_segment, axis=-1)
            new_st = self.substrate_texture * blocked_segment
            new_st = np.round(new_st, 0).astype(np.uint8)
            self.substrate_texture = new_st

    def pad_state(self, state: np.ndarray) -> np.ndarray:
        """Pad state

        Pad image state for building local observations

        Args:
            state: raw engine visual state

        Returns:
            np.ndarray: padded engine visual state
        """
        p = self.obs_dim
        state_pad = np.pad(
            state, pad_width=((p, p), (p, p), (0, 0)),
            constant_values=0)
        return state_pad

    def delete_resource(self, center_loc: tuple) -> None:
        """Delete resource

        Delete resource from game grid given its raw
        numpy loc

        Args:
            center_loc: numpy loc on the 2D game field

        Returns:
            None
        """
        # delete from the references
        grid_loc = tuple(self.resource_center_to_grid[center_loc])
        del self.resource_center_to_grid[center_loc]
        del self.resource_grid_to_center[grid_loc]
        # delete from the grid
        self.resource_grid[grid_loc[0], grid_loc[1]] = 0
        # delete from the resource map
        y, x = center_loc
        c = (self.resource_size // 2)
        self.resource_map.paste(
            self.erasers['resource'], (x - c, y - c)
        )

    def delete_trash(self, center_loc: tuple) -> str:
        """Delete trash

        Delete trash from game grid given its raw
        numpy loc

        Args:
            center_loc: numpy loc on the 2D game field

        Returns:
            None
        """
        # delete from the references
        grid_loc = tuple(self.trash_center_to_grid[center_loc])
        del self.trash_center_to_grid[center_loc]
        del self.trash_grid_to_center[grid_loc]
        trash_source = self.trash_center_to_source[center_loc]
        del self.trash_center_to_source[center_loc]
        # delete from the grid
        self.trash_grid[grid_loc[0], grid_loc[1]] = 0
        # delete from the trash map
        y, x = center_loc
        c = (self.trash_size // 2)
        self.trash_map.paste(
            self.erasers['trash'], (x - c, y - c)
        )
        return trash_source

    def add_resource(self, grid_loc: np.ndarray) -> None:
        """Add resource

        Add resource to the game field and resource grid
        given its resource grid location

        Args:
            grid_loc: numpy loc resource grid

        Returns:
            None
        """
        center_loc = grid_loc * self.resource_size + self.resource_size // 2
        # add to the references
        self.resource_grid_to_center[tuple(grid_loc)] = center_loc
        self.resource_center_to_grid[tuple(center_loc)] = grid_loc
        # add to the resource grid
        self.resource_grid[grid_loc[0], grid_loc[1]] = 1
        # add to the resource map
        y, x = tuple(center_loc)
        c = (self.resource_size // 2)
        self.resource_map.paste(
            self.icons['resource'], (x - c, y - c)
        )

    def add_trash(self, grid_loc: np.ndarray, agent_id: str) -> None:
        """Add trash

        Add trash to the game field and trash grid
        given its trash grid location

        Args:
            grid_loc: numpy loc trash grid
            agent_id: agent ID in form `agent_{i}`

        Returns:
            None
        """
        center_loc = grid_loc * self.trash_size + self.trash_size // 2
        # add to the references
        self.trash_grid_to_center[tuple(grid_loc)] = center_loc
        self.trash_center_to_grid[tuple(center_loc)] = grid_loc
        self.trash_center_to_source[tuple(center_loc)] = agent_id
        # add to the trash grid
        self.trash_grid[grid_loc[0], grid_loc[1]] = 1
        # add to the trash map
        y, x = tuple(center_loc)
        c = (self.trash_size // 2)
        self.trash_map.paste(
            self.icons['trash'], (x - c, y - c)
        )

    def sample_trash(self, agent_id: str) -> bool:
        """Sample trash

        Sample trash randomly at a given agents' segment

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if sampled successfully and False otherwise
        """
        a_reg = np.logical_and(
            self.trash_segment_map[agent_id],
            np.logical_and(
                np.logical_not(self.non_trash_grid),
                np.logical_not(self.trash_grid)
            )
        ).astype(np.uint8)
        a_loc = np.argwhere(a_reg > 0)
        if a_loc.shape[0] > 0:
            idx = self.rng.integers(low=0, high=a_loc.shape[0])
            grid_loc = a_loc[idx]
            self.add_trash(grid_loc=grid_loc, agent_id=agent_id)
            done = True
        else:
            done = False
        return done

    def sample_resource(self, agent_id: str) -> bool:
        """Sample resource

        Sample resource randomly at a given agents' segment

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if sampled successfully and False otherwise
        """
        a_reg = np.logical_and(
            self.resource_segment_map[agent_id],
            np.logical_and(
                np.logical_not(self.non_resource_grid),
                np.logical_not(self.resource_grid)
            )
        ).astype(np.uint8)
        a_loc = np.argwhere(a_reg > 0)
        if a_loc.shape[0] > 0:
            idx = self.rng.integers(low=0, high=a_loc.shape[0])
            grid_loc = a_loc[idx]
            self.add_resource(grid_loc=grid_loc)
            done = True
        else:
            done = False
        return done

    def move_candidate(
        self, action_id: int,
        agent_id: str, step_size: int
    ) -> Tuple[np.ndarray, np.ndarray, float]:
        """Movement candidate.

        Turn according to ``action_id`` (0: keep heading, 1: +0.5 pi,
        2: -0.5 pi, 3: turn around), then step ``step_size`` pixels along
        the new heading. The target is clipped so the agent icon stays on
        the field. Unreachable regions are not taken into account here.

        Args:
            action_id: movement action integer id (0-3)
            agent_id: agent ID in form `agent_{i}`
            step_size: agent step size in pixels

        Returns:
            Tuple[np.ndarray, np.ndarray, float]: tuple which contains:
                - initial location
                - new candidate location
                - new direction

        Raises:
            ValueError: if ``action_id`` is not a movement action
        """
        position = self.agents_state[agent_id]['loc']
        heading = self.agents_state[agent_id]['dir_pi']
        if action_id == 0:
            target_heading = heading
        elif action_id == 1:
            target_heading = (heading + 0.5) % 2.
        elif action_id == 2:
            target_heading = (heading - 0.5) % 2.
        elif action_id == 3:
            target_heading = (heading - 1.) % 2.
        else:
            raise ValueError(
                f'Invalid action_id for movement: {action_id}'
            )
        half_agent = self.agent_size // 2
        candidate = position + dir_to_shift(dir=target_heading) * step_size
        candidate = np.clip(
            candidate,
            a_min=half_agent,
            a_max=(self.grid_size - 1 - half_agent)
        ).astype(int)
        return position, candidate, target_heading

    def move(self, action_id: int, agent_id: str) -> None:
        """Make movement

        Make agent movement with respect to unreachable regions

        Args:
            action_id: movement action integer id
            agent_id: agent ID in form `agent_{i}`

        Returns:
            None
        """
        valid = False
        new_loc, new_dir = None, None
        step = self.move_step
        while not valid and step > -1:
            init_loc, new_loc, new_dir = self.move_candidate(
                action_id=action_id, agent_id=agent_id,
                step_size=step
            )
            valid = not bool(self.non_reachable[new_loc[0], new_loc[1]])
            step -= 1
        self.agents_state[agent_id]['loc'] = new_loc
        self.agents_state[agent_id]['dir_pi'] = new_dir

    def pickup(
        self, agent_id: str, type: str
    ) -> Tuple[np.ndarray, bool]:
        """Pickup item

        pickup item from game field

        Args:
            agent_id: agent ID in form `agent_{i}`
            type: one of {'resource', 'trash'}

        Returns:
            Tuple[np.ndarray, bool]: picked item loc and pickup
                status boolean
        """
        status, item_loc = False, None
        loc = self.agents_state[agent_id]['loc']
        reach = self.agent_reach
        if type == 'resource':
            ridxs = np.array(list(self.resource_grid_to_center.values()))
        elif type == 'trash':
            ridxs = np.array(list(self.trash_grid_to_center.values()))
        else:
            raise ValueError(
                f'Invalid item to pickup: {type}'
            )
        if ridxs.shape[0] > 0:
            deltas = np.abs(ridxs - loc)
            max_dist = deltas.max(axis=1)
            valid = np.argwhere(max_dist < reach).squeeze(axis=-1)
            if len(valid) > 0:
                if len(valid) > 1:
                    mean_dist = deltas[valid].mean(axis=-1)
                    min_dist = np.argmin(mean_dist)
                    valid = valid[min_dist]
                else:
                    valid = valid[0]
                item_loc = ridxs[valid]
                if not self.agents_state[agent_id]['inventory'][type]:
                    if type == 'resource':
                        self.delete_resource(center_loc=tuple(item_loc))
                    else:
                        trash_source = self.delete_trash(center_loc=tuple(item_loc))
                        self.agents_state[agent_id]['inventory']['trash_source'] = trash_source
                    self.agents_state[agent_id]['inventory'][type] = True
                    status = True
        return item_loc, status

    def throw_resource(self, agent_id: str) -> bool:
        """Throw resource

        Throw resource onto the game field at the current
        agent location if possible

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if succeeded, False otherwise
        """
        ac_loc = self.agents_state[agent_id]['loc']
        ag_loc = ac_loc // self.resource_size
        done = False
        if self.agents_state[agent_id]['inventory']['resource']:
            # check if we can throw it here
            invalid = np.logical_or(
                self.non_resource_grid,
                self.resource_grid
            )
            valid = not bool(invalid[ag_loc[0], ag_loc[1]])
            if valid:
                self.add_resource(grid_loc=ag_loc)
                # remove from inventory
                self.agents_state[agent_id]['inventory']['resource'] = False
                done = True
        return done

    def throw_trash(self, agent_id: str) -> bool:
        """Throw trash

        Throw trash onto the game field at the current
        agent location if possible

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if succeeded, False otherwise
        """
        ac_loc = self.agents_state[agent_id]['loc']
        ag_loc = ac_loc // self.trash_size
        done = False
        if self.agents_state[agent_id]['inventory']['trash']:
            # check if we can throw it here
            invalid = np.logical_or(
                self.non_trash_grid,
                self.trash_grid
            )
            valid = not bool(invalid[ag_loc[0], ag_loc[1]])
            if valid:
                trash_source = self.agents_state[agent_id]['inventory']['trash_source']
                self.add_trash(grid_loc=ag_loc, agent_id=trash_source)
                # remove from inventory
                self.agents_state[agent_id]['inventory']['trash'] = False
                self.agents_state[agent_id]['inventory']['trash_source'] = None
                done = True
        return done

    def recycle_resource(self, agent_id: str) -> bool:
        """Recycle resource

        Recycle resource at the fabricator if:
            1) It is in the inventory
            2) Fabricator is close enough
            3) Agent is not blocked

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if succeeded, False otherwise
        """
        done = False
        if not self.blocked[agent_id]:
            if self.agents_state[agent_id]['inventory']['resource']:
                ac_loc = self.agents_state[agent_id]['loc']
                machine_loc = self.machines[agent_id]['center']
                delta = np.abs(machine_loc - ac_loc).max()
                if delta <= self.machine_reach:
                    self.agents_state[agent_id]['inventory']['resource'] = False
                    self.agents_state[agent_id]['wealth'] += self.resource_price
                    self.sample_trash(agent_id=agent_id)
                    done = True
        return done

    def recycle_trash(self, agent_id: str) -> bool:
        """Recycle trash

        Recycle trash at the recycler if:
            1) It is in the inventory
            2) Recycler is close enough
            3) Agent is not blocked
            4) Agent has enough money for recycling

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            bool: True if succeeded, False otherwise
        """
        done = False
        if not self.blocked[agent_id]:
            has_trash = self.agents_state[agent_id]['inventory']['trash']
            has_money = self.agents_state[agent_id]['wealth'] >= self.recycle_cost
            if has_trash and has_money:
                ac_loc = self.agents_state[agent_id]['loc']
                recycler_loc = self.recyclers[agent_id]['center']
                delta = np.abs(recycler_loc - ac_loc).max()
                if delta <= self.machine_reach:
                    self.agents_state[agent_id]['inventory']['trash'] = False
                    self.agents_state[agent_id]['inventory']['trash_source'] = None
                    self.agents_state[agent_id]['wealth'] -= self.recycle_cost
                    done = True
        return done

    def drop_resource(self, agent_id: str) -> str:
        """Drop resource

        Throw or recycle resource. If recycle is possible,
        recycle it at the fabricator, otherwise drop onto
        the game field

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            str: drop status string. One in
                {'dropped', 'drop_failed', 'resource_recycled'}
        """
        recycle_done = self.recycle_resource(agent_id=agent_id)
        if not recycle_done:
            throw_done = self.throw_resource(agent_id=agent_id)
            if throw_done:
                status = 'dropped'
            else:
                status = 'drop_failed'
        else:
            status = 'resource_recycled'
        return status

    def drop_trash(self, agent_id: str) -> str:
        """Drop trash

        Throw or recycle trash. If recycle is possible,
        recycle it at the recycler, otherwise drop onto
        the game field

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            str: drop status string. One in
                {'dropped', 'drop_failed', 'resource_recycled'}
        """
        recycle_done = self.recycle_trash(agent_id=agent_id)
        if not recycle_done:
            throw_done = self.throw_trash(agent_id=agent_id)
            if throw_done:
                status = 'dropped'
            else:
                status = 'drop_failed'
        else:
            status = 'trash_recycled'
        return status

    def local_view(self, state_pad: np.ndarray, agent_id: str) -> np.ndarray:
        """Render local view

        Render the egocentric view for a given agent ID. An
        ``obs_dim`` x ``obs_dim`` window is cut from the padded state. It is
        placed so the agent sits near the back edge of the window and
        centred across it. Machine pixels are replaced with the agent's own
        perspective layer. The window is then rotated so that the agent's
        facing direction points up (towards row 0).

        Args:
            state_pad: padded engine state image; assumed to come from
                ``pad_state`` (padding of ``obs_dim`` on both spatial axes)
            agent_id: agent ID in form `agent_{i}`

        Returns:
            np.ndarray: local view image array of shape
                (obs_dim, obs_dim, C); a copy, not a view into ``state_pad``

        Raises:
            ValueError: if the agent's heading is not one of
                {0.0, 0.5, 1.0, 1.5}
        """
        agent_loc = self.agents_state[agent_id]['loc']
        agent_dir = self.agents_state[agent_id]['dir_pi']
        p = self.obs_dim
        # Window bounds in unpadded (row=x, col=y) coordinates. Along the
        # facing axis the window starts agent_size // 2 behind the agent;
        # across it the window spans p // 2 on each side.
        if agent_dir == 0.0:
            x_high = agent_loc[0] + p // 2  # right
            x_low = x_high - p  # left
            y_low = agent_loc[1] - self.agent_size // 2 + 1  # back
            y_high = y_low + p  # forward
        elif agent_dir == 0.5:
            x_high = agent_loc[0] + self.agent_size // 2  # back
            x_low = x_high - p  # forward
            y_high = agent_loc[1] + p // 2  # right
            y_low = y_high - p  # left
        elif agent_dir == 1.0:
            x_low = agent_loc[0] - p // 2 + 1  # right
            x_high = x_low + p  # left
            y_high = agent_loc[1] + self.agent_size // 2  # back
            y_low = y_high - p  # forward
        elif agent_dir == 1.5:
            x_low = agent_loc[0] - self.agent_size // 2 + 1  # back
            x_high = x_low + p  # forward
            y_low = agent_loc[1] - p // 2 + 1  # right
            y_high = y_low + p  # left
        else:
            raise ValueError(
                f'Invalid agent direction: {agent_dir}'
            )
        # Shift into padded coordinates (state_pad, all_machines and the
        # perspectives are all offset by p).
        xl, xh, yl, yh = x_low + p, x_high + p, y_low + p, y_high + p
        obs = state_pad[xl:xh, yl:yh, :]
        all_machines = self.all_machines[xl:xh, yl:yh, :]
        perspective = self.agents_perspectives[agent_id][xl:xh, yl:yh, :]
        # Blank out machine pixels, then draw this agent's perspective
        # layer in their place.
        obs = obs * np.logical_not(all_machines)
        obs = obs + perspective
        # Rotate so the facing direction points up; heading 0.5 (forward
        # = decreasing row) already does.
        if agent_dir == 0.0:
            obs = np.rot90(obs, k=1)
        elif agent_dir == 0.5:
            pass
        elif agent_dir == 1.0:
            obs = np.rot90(obs, k=-1)
        elif agent_dir == 1.5:
            obs = np.rot90(obs, k=2)
        else:
            pass
        # np.rot90 returns a view; copy so callers get an independent array
        return obs.copy()

    def local_proprio(self, agent_id: str) -> np.ndarray:
        """Get local proprioceptive obs

        Get local proprioceptive obs for a given agent ID. The returned
        vector contains, in order:
            - agent wealth divided by `resource_price`
            - 1.0 if a resource is in the inventory, else 0.0
            - 1.0 if trash is in the inventory, else 0.0
            - polar radius (from `cart2pol`) of the vector from the agent
              to its segment center, divided by `grid_size`
            - polar angle of that vector, divided by pi
            - polar angle of the agent's heading unit vector, divided by pi
            - 1.0 if the agent is on its home segment, else 0.0

        Args:
            agent_id: agent ID in form `agent_{i}`

        Returns:
            np.ndarray: subjective proprioceptive obs, shape (7,), float32
        """
        agent_state = self.agents_state[agent_id]
        money = agent_state['wealth'] / self.resource_price
        # Heading in units of pi; named `agent_dir` (as in `local_view`)
        # rather than `dir` to avoid shadowing the builtin
        agent_dir = agent_state['dir_pi']
        loc = agent_state['loc']
        center = self.centers[agent_id]

        # Unit vector of the agent heading for the four allowed directions
        if agent_dir == 0.0:
            x, y = 1, 0
        elif agent_dir == 0.5:
            x, y = 0, 1
        elif agent_dir == 1.0:
            x, y = -1, 0
        elif agent_dir == 1.5:
            x, y = 0, -1
        else:
            # Unlike `local_view`, an unknown direction is not an error here:
            # it degrades to the zero vector
            x, y = 0, 0

        # Vector from the agent's location to its segment center
        center_vec = loc_to_coord(center, self.grid_size) - \
            loc_to_coord(loc, self.grid_size)
        rho, phi = cart2pol(center_vec[0].item(), center_vec[1].item())
        rho, phi = rho / self.grid_size, phi / np.pi
        is_home = float(self.is_home_segment(agent_id=agent_id))
        inventory = agent_state['inventory']
        has_resource = float(inventory['resource'])
        has_trash = float(inventory['trash'])
        _, north_dir = cart2pol(x, y)
        north_dir = north_dir / np.pi
        proprio = np.array([
            money, has_resource, has_trash,
            rho, phi, north_dir, is_home
        ])
        return proprio.astype(np.float32)

    def local_obs(
        self, state_pad: np.ndarray, agent_id: str
    ) -> Dict[str, np.ndarray]:
        """Build the composite observation of one agent.

        The visual part is rendered first, the proprioceptive vector second.

        Args:
            state_pad: padded engine state image
            agent_id: agent ID in form `agent_{i}`

        Returns:
            Dict[str, np.ndarray]: mapping with two entries:
                - 'image': output of `local_view` for this agent
                - 'proprio': output of `local_proprio` for this agent
        """
        image = self.local_view(state_pad=state_pad, agent_id=agent_id)
        proprioception = self.local_proprio(agent_id=agent_id)
        return dict(image=image, proprio=proprioception)


Defaulting to user installation because normal site-packages is not writeable
Collecting scikit-image
  Downloading scikit_image-0.24.0-cp39-cp39-macosx_12_0_arm64.whl (13.4 MB)
[K     |████████████████████████████████| 13.4 MB 2.7 MB/s eta 0:00:01
Collecting imageio>=2.33
  Downloading imageio-2.37.0-py3-none-any.whl (315 kB)
[K     |████████████████████████████████| 315 kB 52.4 MB/s eta 0:00:01
[?25hCollecting tifffile>=2022.8.12
  Downloading tifffile-2024.8.30-py3-none-any.whl (227 kB)
[K     |████████████████████████████████| 227 kB 50.3 MB/s eta 0:00:01
[?25hCollecting pillow>=9.1
  Downloading pillow-11.2.1-cp39-cp39-macosx_11_0_arm64.whl (3.0 MB)
[K     |████████████████████████████████| 3.0 MB 65.9 MB/s eta 0:00:01
[?25hCollecting lazy-loader>=0.4
  Downloading lazy_loader-0.4-py3-none-any.whl (12 kB)
Collecting numpy>=1.23
  Downloading numpy-2.0.2-cp39-cp39-macosx_14_0_arm64.whl (5.3 MB)
[K     |████████████████████████████████| 5.3 MB 20.6 MB/s eta 0:00:01
[?25hCol

In [2]:
# agents

import abc
from typing import Dict, Optional

import numpy as np


class BaseAgent(metaclass=abc.ABCMeta):
    """Abstract agent interface for the AIJ Multi-agent AI Contest.

    A concrete agent must implement three hooks: loading its artifacts,
    choosing an action from an observation, and clearing any state it
    accumulates during an episode.
    """

    @abc.abstractmethod
    def load(self, ckpt_dir: str) -> None:
        """Load the agent from a directory with artifacts.

        Args:
            ckpt_dir: path to an individual directory with artifacts
                and the `agent_config.yaml` file for this agent
        """

    @abc.abstractmethod
    def get_action(self, observation: Dict[str, np.ndarray]) -> int:
        """Choose an action for the given composite observation.

        Args:
            observation: dictionary with the following keys and values
                "image": numpy array with an image of a local field of
                    view with dimensions (60, 60, 3) and np.uint8 data type
                "proprio": numpy array with proprioceptive information
                    about the position of the agent on the general map
                    and the condition of its inventory with dimensions (7,)
                    and np.float32 data type
        Returns:
            int: index of the selected action
                {0, 1, 2, 3, 4, 5, 6, 7, 8}
        """

    @abc.abstractmethod
    def reset_state(self) -> None:
        """Clear the per-episode internal state.

        Called before every new simulation so that context accumulated
        during the previous episode does not leak into the next one.
        Agents without internal context can leave this as a no-op.
        """


class RandomAgent(BaseAgent):
    """Agent that picks actions uniformly at random.

    Random agent for AIJ Multi-agent AI Contest.

    Attributes:
        action_dim: size of the discrete action set (9 in this contest)
        rng: numpy random number generator for reproducibility
    """
    def __init__(
        self,
        action_dim: int = 9,
        seed: Optional[int] = None
    ):
        """Create a random agent.

        Args:
            action_dim: size of the discrete action set (9 in this contest)
            seed: seed for the agent's generator; when omitted, one is drawn
                from the global NumPy random state
        """
        self.action_dim = action_dim
        chosen_seed = (
            np.random.randint(0, int(1e6), 1).item() if seed is None else seed
        )
        self.rng = np.random.default_rng(chosen_seed)

    def load(self, ckpt_dir: str) -> None:
        """Load artifacts; the random agent has none, so nothing happens.

        Args:
            ckpt_dir: path to an individual directory with artifacts
                and the `agent_config.yaml` file for this agent
        """

    def get_action(self, observation: Dict[str, np.ndarray]) -> int:
        """Sample an action, ignoring the observation.

        Args:
            observation: dictionary with the following keys and values
                "image": numpy array with an image of a local field of
                    view with dimensions (60, 60, 3) and np.uint8 data type
                "proprio": numpy array with proprioceptive information
                    about the position of the agent on the general map
                    and the condition of its inventory with dimensions (7,)
                    and np.float32 data type
        Returns:
            int: index of the selected action, drawn uniformly from
                range(action_dim)
        """
        draw = self.rng.integers(0, self.action_dim, 1)
        return draw.item()

    def reset_state(self) -> None:
        """Reset per-episode state; the random agent keeps none."""


In [6]:
# environment

import functools
from typing import Any, Dict, Optional, Tuple
# !pip3 install pettingzoo
import numpy as np
from gymnasium.spaces import Box
from gymnasium.spaces import Dict as DictSpace
from gymnasium.spaces import Discrete
from pettingzoo import ParallelEnv
from skimage.transform import resize

# Return type of `AijMultiagentEnv.step`, each element keyed by agent ID
TimestampType = Tuple[
    Dict[str, Dict[str, np.ndarray]],  # composite observations
    Dict[str, float],                  # rewards
    Dict[str, bool],                   # terminations
    Dict[str, bool],                   # truncations
    Dict[str, dict],                   # infos
]

# Human-readable action names; the list index is the action ID
MOVES = [
    "FORWARD",
    "LEFT",
    "RIGHT",
    "BACKWARD",
    "PICKUP_RESOURCE",
    "PICKUP_TRASH",
    "DROP_RESOURCE",
    "DROP_TRASH",
    "NOOP",
]


class AijMultiagentEnv(ParallelEnv):
    """Parallel multi-agent environment for AIJ
    Multi-Agent RL contest

    Environment at the testing system will have the same
    hyperparameter setting as below, so it is not recommended
    to change it

    Attributes:
        grid_size: 2D square game field size
        obs_dim: local visual observation size
        move_step: movement step size in pixels
        resource_price: reward given for resource processing
        recycle_cost: cost of recycling trash
        border_distort_range: noise range for borders distortion
        max_edge_dev: maximum segment border shift
        max_tries: maximum attempts for border generation
        machine_size: machine icon size in pixels
        machine_reach: size of machine interaction region
        agent_size: agent icon size in pixels
        agent_reach: agent reach when picking up items
        resource_size: resource icon size in pixels
        trash_size: trash icon size in pixels
        resource_prob: probability to spawn resource at a given point
        border_display_width: segments borders thickness
        ecology_penalty: decrease in agent's ecology score caused by 1 trash item
        neighbour_ecology_weight: neighbour ecology effect at the resource
            respawn rate
        global_ecology_weight: global ecology effect at the resource
            respawn rate
        init_respawn_prob: initial probability to spawn resource at a given point
        blocked_vanish_alpha: blocked segment fogging degree
        max_dead_segments: max number of blocked segments before global
            termination
    """
    # Hardcoded hyperparameters
    grid_size: int = 210  # should be div by resource_size and trash_size
    obs_dim: int = 60  # should be divisible by 5
    move_step: int = 7
    resource_price: int = 10
    recycle_cost: int = 4
    border_distort_range: Tuple[int, int] = (-1, 2)
    max_edge_dev: float = 0.1
    max_tries: int = 25
    machine_size: int = 9
    machine_reach: int = 9
    agent_size: int = 9
    agent_reach: int = 9
    resource_size: int = 7
    trash_size: int = 5
    resource_prob: float = 0.075
    border_display_width: int = 2
    ecology_penalty: int = 20
    neighbour_ecology_weight: float = 0.2
    global_ecology_weight: float = 0.3
    init_respawn_prob: float = 0.015
    blocked_vanish_alpha: float = 0.25
    max_dead_segments: int = 4

    # To enable rendering
    metadata = {"render_modes": ['rgb_array'],
                "name": "aij_multiagent_env"}

    def __init__(
        self,
        max_cycles: Optional[int] = 1000,
        state_size: Optional[int] = 110,
        render_mode: Optional[str] = 'rgb_array',
    ):
        """Multi-agent RL Environment

        Multi-agent RL Environment for AIJ Contest 2024

        Args:
            max_cycles: maximum simulation length in time steps; `None`
                disables truncation by episode length
            state_size: display state size for rendering
            render_mode: render mode

        Valid Action Space:
            0: move forward by `move_step` pixels if possible
            1: move left by `move_step` pixels if possible
            2: move right by `move_step` pixels if possible
            3: move backward by `move_step` pixels if possible
            4: pickup resource (if closer than `agent_reach` pixels)
            5: pickup trash (if closer than `agent_reach` pixels)
            6: throw resource (put into machine if closer than `machine_reach`)
            7: throw trash (put into recycler if closer than `machine_reach`)
            8: noop
        """
        self.engine = None
        self.rng = None
        self.ecology_scores = None
        self.num_moves = None
        self.current_state = None
        self.seed = None
        self.render_mode = render_mode
        self.max_cycles = max_cycles
        self.state_size = state_size
        self.action_meanings = MOVES
        self.possible_agents = [f'agent_{i}' for i in range(8)]
        # Each segment's two neighbours (symmetric; segments form a ring)
        self.neighbours_mapping = {
            'agent_0': ['agent_1', 'agent_3'],
            'agent_1': ['agent_0', 'agent_2'],
            'agent_2': ['agent_1', 'agent_4'],
            'agent_3': ['agent_0', 'agent_5'],
            'agent_4': ['agent_2', 'agent_7'],
            'agent_5': ['agent_3', 'agent_6'],
            'agent_6': ['agent_5', 'agent_7'],
            'agent_7': ['agent_4', 'agent_6'],
        }
        self.agents = self.possible_agents.copy()
        self.agent_name_mapping = dict(
            zip(self.possible_agents, list(range(len(self.possible_agents))))
        )

    @classmethod
    def _get_engine(
        cls, seed: Optional[int] = None
    ) -> Tuple[GameEngine, np.random.Generator, int]:
        """Get engine

        Get game engine for simulation round

        Args:
            seed: random seed for simulation

        Returns:
            Tuple[GameEngine, np.random.Generator, int]: tuple of:
                - GameEngine instance
                - numpy random number generator
                - seed for logging
        """
        if seed is None:
            seed = np.random.randint(0, int(1e6), 1).item()
        engine = GameEngine(
            seed=seed,
            grid_size=cls.grid_size,
            obs_dim=cls.obs_dim,
            move_step=cls.move_step,
            resource_price=cls.resource_price,
            recycle_cost=cls.recycle_cost,
            border_distort_range=cls.border_distort_range,
            max_edge_dev=cls.max_edge_dev,
            max_tries=cls.max_tries,
            machine_size=cls.machine_size,
            machine_reach=cls.machine_reach,
            agent_size=cls.agent_size,
            agent_reach=cls.agent_reach,
            resource_size=cls.resource_size,
            trash_size=cls.trash_size,
            resource_prob=cls.resource_prob,
            border_display_width=cls.border_display_width,
            blocked_vanish_alpha=cls.blocked_vanish_alpha
        )
        # Environment RNG is derived from, but distinct from, the engine seed
        rng = np.random.default_rng(seed + 1)
        return engine, rng, seed

    # NOTE(review): lru_cache on a method keys on `self` (pattern from the
    # PettingZoo custom-env tutorial). Spaces are built on first call and
    # keep the rng/engine of that moment; requires `reset()` first, since
    # `self.engine` is None until then.
    @functools.lru_cache(maxsize=None)
    def observation_space(self, agent) -> DictSpace:
        """Get observation space

        Get environment observation space

        Args:
            agent: agent to return observation space for

        Returns:
            DictSpace: specification of composite observation
        """
        d = self.engine.obs_dim
        gs, diag = self.engine.grid_size, self.engine.diag
        image_space = Box(0, 255, (d, d, 3), np.uint8, seed=self.rng)
        low = np.array([0., 0., 0., 0., -1., -1., 0.])
        high = np.array([np.inf, 1., 1., diag / gs, 1., 1., 1.])
        proprio_space = Box(
            low=low, high=high, shape=(7,), dtype=np.float32, seed=self.rng)
        return DictSpace({'image': image_space, 'proprio': proprio_space})

    @functools.lru_cache(maxsize=None)
    def action_space(self, agent) -> Discrete:
        """Get action space

        Get environment action space

        Args:
            agent: agent to return action space for

        Returns:
            Discrete: specification of discrete action space
        """
        return Discrete(len(self.action_meanings), seed=self.rng)

    def reset(
        self, seed: Optional[int] = None, options: Optional[Any] = None
    ) -> Tuple[Dict[str, Dict[str, np.ndarray]], Dict[str, Dict]]:
        """Reset environment

        Reset environment to its initial state

        Args:
            seed: numpy random seed
            options: any other additional options (currently unused)

        Returns:
            Tuple[Dict[str, Dict[str, np.ndarray]], Dict[str, Dict]]:
                tuple which contains:
                    - composite observations for each agent
                    - infos for each agent
        """
        self.num_moves = 0
        self.agents = self.possible_agents.copy()
        self.engine, self.rng, self.seed = self._get_engine(seed=seed)
        self.ecology_scores = {a: 100 for a in self.agents}
        obs = self._render_observations()
        infos = self._build_infos()
        return obs, infos

    def _render_observations(self) -> Dict[str, Dict[str, np.ndarray]]:
        """Render local composite observations for all active agents.

        Also caches the freshly rendered global state in
        `self.current_state`, which `state()` / `render()` read.

        Returns:
            Dict[str, Dict[str, np.ndarray]]: composite observation per
                agent in `self.agents`
        """
        state = self.engine.get_state()
        self.current_state = state
        # Local perspective state: pixels covered by the local-mode agents
        # map are replaced by that map
        local_agents_map = self.engine.agents_map(local_mode=True)
        local_state = state * np.logical_not(
            local_agents_map > 0)
        local_state = local_state + local_agents_map
        state_pad = self.engine.pad_state(local_state)
        return {a: self.engine.local_obs(
            state_pad=state_pad, agent_id=a) for a in self.agents}

    def _build_infos(
        self, trash_by_agent: Optional[Dict[str, int]] = None
    ) -> Dict[str, dict]:
        """Collect per-agent logging information.

        Args:
            trash_by_agent: precomputed trash counts per segment; when
                None they are queried from the engine

        Returns:
            Dict[str, dict]: info dict per agent in `self.agents`
        """
        infos = {}
        for a in self.agents:
            if trash_by_agent is None:
                n_trash = self.engine.trash_by_segment(agent_id=a)
            else:
                n_trash = trash_by_agent[a]
            infos[a] = {
                'ecology_score': self.ecology_scores[a],
                'num_trash': n_trash,
                'num_resource': self.engine.resource_by_segment(agent_id=a),
                'dead_ecology': self.engine.blocked[a]
            }
        return infos

    def _make_action(self, agent_id: str, action_id: int) -> None:
        """Make action

        Execute action in the environment for a given agent

        Args:
             agent_id: agent ID in form `agent_{i}`
             action_id: action integer id from valid actions set

        Returns:
            None

        Raises:
            ValueError: if action ID is outside valid action set
        """
        if 4 > action_id >= 0:
            self.engine.move(agent_id=agent_id, action_id=action_id)
        elif action_id == 4:
            self.engine.pickup(agent_id=agent_id, type='resource')
        elif action_id == 5:
            self.engine.pickup(agent_id=agent_id, type='trash')
        elif action_id == 6:
            self.engine.drop_resource(agent_id=agent_id)
        elif action_id == 7:
            self.engine.drop_trash(agent_id=agent_id)
        elif action_id == 8:
            pass
        else:
            raise ValueError(
                f'Invalid action: {action_id} for agent: {agent_id}')

    def _update_ecology_scores(self) -> Dict[str, int]:
        """Update ecology scores

        Update ecology scores given the most recent trash
        distribution information. Each trash item costs
        `ecology_penalty` points (floored at 0); blocked segments score 0.

        Returns:
            Dict[str, int]: current number of trash per segment
        """
        new_ec_scores = {}
        trash_by_agent = {}
        for a in self.ecology_scores:
            n_trash = self.engine.trash_by_segment(agent_id=a)
            trash_by_agent[a] = n_trash
            ec_score = max(0, 100 - n_trash * self.ecology_penalty)
            new_ec_scores[a] = ec_score * int(not self.engine.blocked[a])
        self.ecology_scores = new_ec_scores
        return trash_by_agent

    def _get_resource_respawn_probs(self) -> Dict[str, float]:
        """Get resource respawn probs

        Get resource respawn probability given current trash
        distribution. The effective ecology of a segment is a weighted
        mix of its own, its neighbours' mean and the global mean score;
        blocked segments get probability 0.

        Returns:
            Dict[str, float]: respawn probs by agent ID
        """
        nw = self.neighbour_ecology_weight
        gw = self.global_ecology_weight
        p = self.init_respawn_prob
        # Local weight gets whatever the neighbour/global weights leave
        lw = max(0., 1 - nw - gw)
        gs = np.mean(list(self.ecology_scores.values())).item()
        resp_probs = {}
        for a, s in self.ecology_scores.items():
            n1, n2 = self.neighbours_mapping[a]
            mean_ns = (self.ecology_scores[n1] + self.ecology_scores[n2]) / 2
            r = (lw * s + gw * gs + nw * mean_ns) / 100
            resp_probs[a] = (r ** 2.15) * p * int(not self.engine.blocked[a])
        return resp_probs

    def _get_terminations(self) -> Dict[str, bool]:
        """Get terminations

        Get global termination and impose blocks according to
        local ecology scores

        Returns:
            Dict[str, bool]: global termination by agent ID
                (all True or all False)
        """
        global_termination = False
        for a, s in self.ecology_scores.items():
            # NOTE(review): blocked segments keep score 0, so `add_block`
            # is called for them again every step — presumably idempotent
            terminated = s == 0
            if terminated:
                self.engine.add_block(agent_id=a)
        n_blocked = sum(list(self.engine.blocked.values()))
        if n_blocked > self.max_dead_segments:
            global_termination = True
        return {a: global_termination for a in self.possible_agents}

    def state(self) -> Dict[str, np.ndarray]:
        """Get global state

        Get global state for CTDE multi-agent RL paradigm.
        Note! That method won't be called at testing system
        and may be used only for training agents.

        Returns:
            Dict[str, np.ndarray]: global state with following key-value
                pairs:
                    - 'image': global visual state, shape:
                        (self.state_size, self.state_size, 3)
                    - 'wealth': array with agents wealth, shape: (8,)
                    - 'has_resource': array with binary flag, indicating that
                        resource is in inventory, shape: (8,)
                    - 'has_trash': array with binary flag, indicating that
                        trash is in inventory, shape: (8,)
        """
        state = {}
        image = self.current_state.copy()
        image = resize(image, (self.state_size, self.state_size))
        state['image'] = np.round(image * 255, 0).astype(np.uint8)
        state['wealth'] = np.array(
            [self.engine.agents_state[a]['wealth']
             for a in self.possible_agents])
        state['has_resource'] = np.array(
            [self.engine.agents_state[a]['inventory']['resource']
             for a in self.possible_agents]).astype(int)
        state['has_trash'] = np.array(
            [self.engine.agents_state[a]['inventory']['trash']
             for a in self.possible_agents]).astype(int)
        return state

    def render(self) -> np.ndarray:
        """Render global state

        Render global state as numpy image array

        Returns:
            np.ndarray: global visual state, shape:
                (self.state_size, self.state_size, 3)
        """
        return self.state()['image']

    def close(self) -> None:
        """Close all rendering windows"""
        pass

    def step(self, actions: Dict[str, int]) -> TimestampType:
        """Perform simulation step

        Perform simulation step in parallel multi-agent RL style.
        In case of conflicting actions (for example two agents
        picking up the same resource), priorities are assigned
        randomly. Note: both terminations and truncations occur
        simultaneously for all agents participating in the
        simulation.

        Args:
            actions: dictionary with action IDs for each agent
        Returns:
            TimestampType: environment time stamp as a tuple of:
                - Dict[str, Dict[str, np.ndarray]]: composite observations
                    for each agent
                - Dict[str, float]: rewards for each agent
                - Dict[str, bool]: terminations for each agent
                - Dict[str, bool]: truncations for each agent
                - Dict[str, dict]: infos for each agent"""
        self.num_moves += 1
        # Cache money to calculate rewards
        old_w = {a: self.engine.agents_state[a]['wealth'] for a in self.agents}
        # Apply actions in random order (to reconcile possible conflicts)
        agents = self.agents.copy()
        self.rng.shuffle(agents)
        for agent in agents:
            self._make_action(agent_id=agent, action_id=actions[agent])
        # Get rewards from updated wealth
        rewards = {a: self.engine.agents_state[a]['wealth'] - old_w[a]
                   for a in self.agents}
        # Apply rules to update ecology scores
        trash_by_agent = self._update_ecology_scores()
        # Terminate or truncate for those needed
        terminations = self._get_terminations()
        # `max_cycles=None` means the episode is never truncated by length
        truncation = (self.max_cycles is not None
                      and self.num_moves >= self.max_cycles)
        truncations = {a: truncation for a in self.agents}
        # Respawn resources according to respawn probabilities
        for a, p in self._get_resource_respawn_probs().items():
            u = self.rng.uniform(low=0, high=1)
            if u < p:
                self.engine.sample_resource(agent_id=a)
        observations = self._render_observations()
        infos = self._build_infos(trash_by_agent=trash_by_agent)
        # Delete truncated and terminated Agents
        if truncation or any(terminations.values()):
            self.agents = []
        return observations, rewards, terminations, truncations, infos


In [None]:
from omegaconf import DictConfig
submission_dir = 'submission_vdn'

# Training hyperparameters of the VDN baseline; agent artifacts are
# written under `<submission_dir>/agents`
config = DictConfig(dict(
    warmup_steps=1000,
    eps_start=0.2,
    eps_decay=0.996,
    eps_decay_every=1000,
    acs_dim=9,
    batch_size=32,
    update_every=4,
    buffer_size=100000,
    initial_batch_episodes=10,
    learning_rate=0.0007,
    gamma=0.99,
    target_updates_freq=15,
    episodes_per_iter=2,
    iter_per_save=20,
    n_iters=200,
    tau=0.005,
    output_dir=f'{submission_dir}/agents',
    in_channels=3,
))

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m


In [15]:
import os
from typing import Any, Dict, List, Optional, Union
!pip3 install tqdm
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn.functional as F

import yaml
from math import ceil
from omegaconf import DictConfig
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm

Defaulting to user installation because normal site-packages is not writeable
You should consider upgrading via the '/Library/Developer/CommandLineTools/usr/bin/python3 -m pip install --upgrade pip' command.[0m


# Создаем корректную структуру сабмишена


Правильная структура директории решения представлена ниже:

```
.
├── utils                          # Директория с модулями, необходимыми для класса агента (опционально)
├── model.py                       # Скрипт с реализацией класса агента(ов) и фабричного метода
└── agents                         # Директория с артефактами агентов (название фиксировано)
    ├── agent_0                    # Артефакты отдельного агента (название произвольное)
        ├── agent_config.yaml      # Конфиг агента для фабричного метода (название фиксировано)
        └── weights_agent_0.pth    # Пример артефакта (веса модели или любые произвольные файлы)
    ...                            # Агентов может быть и больше 8
    └── agent_n
        ├── agent_config.yaml    
        └── weights_agent_n.pth
```

In [16]:
required_dirs = [
    f'{submission_dir}/agents',
    f'{submission_dir}/utils',
]
# exist_ok=True replaces the racy "check with os.path.exists, then create"
# pattern; it still raises if a path exists but is not a directory
for d in required_dirs:
    os.makedirs(d, exist_ok=True)

# Вспомогательные функции для Torch

Данные вспомогательные функции будут записаны в файл `submission_vdn/utils/utils.py`.

In [17]:
%%writefile submission_vdn/utils/utils.py
import torch
import numpy as np


def from_numpy(device, array, dtype=np.float32):
    """Convert an array-like into a torch tensor placed on `device`.

    The data is always copied into a new NumPy array of `dtype` first, so
    the resulting tensor never shares memory with the input.

    Args:
        device: target torch device (e.g. 'cpu', 'cuda' or torch.device)
        array: np.ndarray, NumPy scalar or any array-like (list, tuple)
        dtype: NumPy dtype to cast to before the conversion

    Returns:
        torch.Tensor: tensor holding the data of `array` on `device`
    """
    # np.asarray lets lists and NumPy scalars through (`.astype` alone fails
    # on lists, and torch.from_numpy rejects NumPy scalars); astype copies
    # by default, preserving the original no-aliasing behaviour.
    array = np.asarray(array).astype(dtype)
    tensor = torch.from_numpy(array)
    return tensor.to(device)


def to_numpy(tensor):
    """Convert a torch tensor into a NumPy array.

    The tensor is detached from the autograd graph and moved to the CPU,
    since NumPy can only wrap CPU tensors that do not require grad.
    """
    detached = tensor.detach()
    host_tensor = detached.to('cpu')
    return host_tensor.numpy()


def get_device() -> torch.device:
    """Return the CUDA device when one is available, otherwise the CPU."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)

Writing submission_vdn/utils/utils.py


# Функция сэмплирования из среды

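Далее реализован сэмплинг эпизодов из параллельной (в терминах PettingZoo `ParallelEnv`) среды `AijMultiagentEnv` при помощи нескольких агентов: на каждом шаге все агенты действуют одновременно. В ходе работы функции у агентов вызываются следующие методы: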

1) `reset_state()` - сброс внутреннего состояния агента в начале каждого эпизода
2) `get_action()` - получение действия из композитного наблюдения

In [18]:
def sample_rollouts(
    n_rollouts: int,
    env: AijMultiagentEnv,
    agents: Dict[str, BaseAgent],
    verbose: Optional[bool] = False
) -> List[List[Dict[str, Any]]]:
    """Play `n_rollouts` full episodes and record every transition.

    Before each episode every agent's internal state is reset. On every
    step only agents still listed in `env.agents` are asked for an action.
    An episode ends once all truncation flags or all termination flags
    returned by `env.step` are set.

    Args:
        n_rollouts: number of episodes to collect
        env: environment to sample from
        agents: policy per agent ID
        verbose: show a tqdm progress bar

    Returns:
        List[List[Dict[str, Any]]]: one list of transitions per episode;
            each transition holds 'observations', 'next_observations',
            'actions', 'rewards', 'terminations' and 'truncations'
    """
    collected = []
    for _ in tqdm(range(n_rollouts), disable=not verbose):
        for policy in agents.values():
            policy.reset_state()
        observations, _infos = env.reset()
        episode = []
        finished = False
        while not finished:
            actions = {}
            for name, policy in agents.items():
                if name in env.agents:
                    actions[name] = policy.get_action(
                        observation=observations[name])
            (next_observations, rewards, terminations,
             truncations, _next_infos) = env.step(actions)
            episode.append({
                'observations': observations,
                'next_observations': next_observations,
                'actions': actions,
                'rewards': rewards,
                'terminations': terminations,
                'truncations': truncations
            })
            observations = next_observations
            finished = (all(truncations.values())
                        or all(terminations.values()))
        collected.append(episode)
    return collected

# Сэмплирование эпизодов при помощи случайных агентов

In [19]:
def get_mean_agent_return(batch, n_agents: int = 8):
    """Mean per-agent episode return over a batch of rollouts.

    For every rollout, the rewards of all agents over all transitions are
    summed and divided by `n_agents`. The per-rollout values are then
    averaged.

    Args:
        batch: list of rollouts, each a list of transitions with a
            'rewards' dict mapping agent ID to reward
        n_agents: number of agents the episode total is shared between
            (8 in this environment; previously hard-coded)

    Returns:
        float: mean per-agent return (NaN, with a NumPy RuntimeWarning,
            for an empty batch)
    """
    episode_means = [
        sum(sum(t['rewards'].values()) for t in path) / n_agents
        for path in batch
    ]
    return np.mean(episode_means)


# Baseline: sample 10 episodes with a uniformly random policy per agent
env = AijMultiagentEnv()
example_agents = {agent_id: RandomAgent() for agent_id in env.possible_agents}
example_batch = sample_rollouts(
    n_rollouts=10,
    env=env,
    agents=example_agents,
    verbose=True,
)
print(f'Mean agent return for RandomAgent: {get_mean_agent_return(example_batch)}')

100%|██████████| 10/10 [00:10<00:00,  1.02s/it]

Mean agent return for RandomAgent: 6.675





# Создаем буфер данных

Создаем простой буфер данных для хранения эпизодов симуляции

In [20]:
class ReplayBuffer(Dataset):
    """Rollout-level replay buffer exposed as a flat transition dataset.

    Rollouts are stored whole, oldest first. Integer indices address
    individual transitions across all stored rollouts in insertion order.
    When the total number of transitions exceeds `n_transitions`, the
    oldest rollouts are evicted as a whole.

    Attributes:
        rollouts: stored rollouts, each a list of transition dicts
        n_transitions: maximum number of transitions kept
        lengths: number of transitions per stored rollout (None until the
            first `add_batch` call)
    """

    def __init__(
        self,
        n_transitions: int
    ):
        self.rollouts = []
        self.n_transitions = n_transitions
        self.lengths = None
        # Cumulative rollout lengths, refreshed in `add_batch`, so that
        # `__getitem__` does not recompute a cumsum on every access
        self._cum_lengths = np.zeros(0, dtype=np.int64)

    def add_batch(self, rollouts):
        """Append rollouts, evict over-capacity ones, refresh bookkeeping."""
        self.rollouts.extend(rollouts)
        self._evict()
        self.lengths = [len(r) for r in self.rollouts]
        self._cum_lengths = np.cumsum(self.lengths, dtype=np.int64)

    def _evict(self) -> None:
        """Drop whole rollouts from the front until capacity is respected."""
        total = sum(len(r) for r in self.rollouts)
        n_drop = 0
        while n_drop < len(self.rollouts) and total > self.n_transitions:
            total -= len(self.rollouts[n_drop])
            n_drop += 1
        # One slice deletion instead of repeated O(n) `pop(0)` calls
        del self.rollouts[:n_drop]

    def __len__(self):
        return sum(len(r) for r in self.rollouts)

    def __getitem__(self, idx: int) -> Dict[str, Dict[str, Union[np.ndarray, int]]]:
        """Get a transition by its flat index.

        Args:
            idx: transition index; negative values count from the end

        Returns:
            dict: the stored transition plus 'rollout_index' (position of
                its rollout) and 'transition_index' (position inside it)

        Raises:
            IndexError: if `idx` is out of range (this also terminates
                plain `for item in buffer` iteration)
        """
        total = int(self._cum_lengths[-1]) if len(self._cum_lengths) else 0
        if idx < 0:
            idx += total
        if not 0 <= idx < total:
            raise IndexError(
                f'Transition index out of range for buffer of size {total}')
        # First rollout whose cumulative length exceeds idx holds it
        r_ind = int(np.searchsorted(self._cum_lengths, idx, side='right'))
        r_ind_last = int(self._cum_lengths[r_ind - 1]) if r_ind > 0 else 0
        t_ind = idx - r_ind_last
        transition = self.rollouts[r_ind][t_ind]
        item = {
            **transition,
            'rollout_index': r_ind,
            'transition_index': t_ind
        }
        return item

In [21]:
def collate_fn(data: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Stack a list of buffer items into per-agent numpy batches.

    Every top-level key of the first item is kept. For each agent in the
    first item's ``'observations'``, the ``'image'`` and ``'proprio'``
    arrays of both ``'observations'`` and ``'next_observations'`` are stacked
    along a new leading batch axis, as are ``'actions'``, ``'rewards'``,
    ``'terminations'`` and ``'truncations'``. ``'rollout_index'`` and
    ``'transition_index'`` become flat arrays.
    """
    head = data[0]
    batch = {key: {} for key in head.keys()}
    for agent in head['observations'].keys():
        for obs_group in ('observations', 'next_observations'):
            batch[obs_group][agent] = {
                field: np.array([item[obs_group][agent][field] for item in data])
                for field in ('image', 'proprio')
            }
        for agent_group in ('actions', 'rewards', 'terminations', 'truncations'):
            batch[agent_group][agent] = np.array(
                [item[agent_group][agent] for item in data]
            )
    for index_key in ('rollout_index', 'transition_index'):
        batch[index_key] = np.array([item[index_key] for item in data])
    return batch

# Создаем пример dataloader

Он понадобится для демонстрации размерностей основных данных

In [22]:
# Demo buffer (capacity 100k transitions) filled with the random-agent
# episodes, wrapped in a shuffling DataLoader that uses the custom collate_fn.
example_buffer = ReplayBuffer(n_transitions=100_000)
example_buffer.add_batch(example_batch)

example_dataloader = DataLoader(
    example_buffer,
    batch_size=32,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=0,
)

In [23]:
# Draw a single shuffled minibatch and print the batched array shapes for
# one agent ('agent_0'); the leading dimension is the DataLoader batch size.
sample = next(iter(example_dataloader))

print(f"Image observation shape: {sample['observations']['agent_0']['image'].shape}")
print(f"Proprio observation shape: {sample['observations']['agent_0']['proprio'].shape}")
print(f"Actions shape: {sample['actions']['agent_0'].shape}")
print(f"Rewards shape: {sample['rewards']['agent_0'].shape}")
print(f"Terminations shape: {sample['terminations']['agent_0'].shape}")

Image observation shape: (32, 60, 60, 3)
Proprio observation shape: (32, 7)
Actions shape: (32,)
Rewards shape: (32,)
Terminations shape: (32,)


# Сеть-критик

Параграф ниже реализует сеть-критик, которая принимает на вход композитное визуально-проприоцептивное наблюдение и аппроксимирует Q-значения для каждого возможного действия.

In [24]:
%%writefile submission_vdn/utils/networks.py
import torch
from torch import nn


class QCNN(nn.Module):
    """Q-network over a composite visual + proprioceptive observation.

    The image stream is a 3-layer CNN and the proprio stream a one-layer MLP.
    Their features are concatenated and mapped by an MLP head to one Q-value
    per discrete action.

    Args:
        in_channels: number of image channels.
        acs_dim: number of discrete actions (size of the output).
        proprio_dim: length of the proprio vector (default 7).
        image_size: side of the square input image (default 60). It is only
            used to infer the flattened CNN width.
    """

    def __init__(self, in_channels, acs_dim, proprio_dim: int = 7, image_size: int = 60):
        super(QCNN, self).__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(in_channels=in_channels, out_channels=48, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(in_channels=48, out_channels=64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(in_channels=64, out_channels=96, kernel_size=3, stride=1),
            nn.ReLU(),
            nn.Flatten(),
        )
        self.mlp = nn.Sequential(
            nn.Linear(proprio_dim, 64),
            nn.ReLU(),
        )
        # Infer the flattened CNN width with a dummy pass instead of
        # hard-coding it (it is 96 * 4 * 4 = 1536 for 60x60 inputs).
        with torch.no_grad():
            dummy = torch.zeros(1, in_channels, image_size, image_size)
            cnn_out_dim = self.cnn(dummy).shape[1]
        self.head = nn.Sequential(
            nn.Linear(cnn_out_dim + 64, 512),
            nn.ReLU(),
            nn.LayerNorm(512),
            nn.Linear(512, acs_dim),
        )

    def forward(self, image: torch.Tensor, proprio: torch.Tensor) -> torch.Tensor:
        """Return Q-values of shape (batch, acs_dim).

        Args:
            image: channels-last batch (batch, H, W, C), presumably raw 0-255
                pixel values; it is scaled to [0, 1] here.
            proprio: batch of proprio vectors (batch, proprio_dim).
        """
        # Image stream: channels-last -> channels-first, then scale pixels.
        image = image.permute(0, 3, 1, 2)
        image = image / 255.
        image_repr = self.cnn(image)
        # Proprio stream
        proprio_repr = self.mlp(proprio)
        # Head
        hidden = torch.cat([image_repr, proprio_repr], 1)
        q_vals = self.head(hidden)
        return q_vals


Writing submission_vdn/utils/networks.py


# Создаем VDN-агента

Ниже определим класс VDN-агента. Стоит отметить, что:

Класс агента должен поддерживать следующие методы:
- `.load()` - подгрузка агента из директории
- `.get_action()` - получение действия из композитного наблюдения
- `.reset_state()` - перезагрузка внутреннего состояния агента с началом эпизода

Класс агента должен быть унаследован от абстрактного класса `aij_multiagent_rl.agent.BaseAgent`.

Файл `model.py` должен содержать фабричный метод агента, как показано ниже.

In [25]:
%%writefile submission_vdn/model.py
import os
import abc
from copy import deepcopy
from typing import Dict

import numpy as np
import torch
from omegaconf import DictConfig
from torch import nn
from utils.networks import QCNN
from utils.utils import from_numpy, get_device, to_numpy

class BaseAgent(metaclass=abc.ABCMeta):
    """Interface every submitted agent must implement.

    Subclasses provide checkpoint loading, per-step action selection and a
    hook for clearing per-episode state.
    """

    @abc.abstractmethod
    def load(self, ckpt_dir: str) -> None:
        """Restore the agent from a checkpoint directory.

        Args:
            ckpt_dir: this agent's own artifact directory; it also holds
                the agent's `agent_config.yaml`.
        """
        ...

    @abc.abstractmethod
    def get_action(self, observation: Dict[str, np.ndarray]) -> int:
        """Choose an action for a single observation.

        Args:
            observation: mapping with two entries:
                "image": np.uint8 array of shape (60, 60, 3), the agent's
                    local field of view;
                "proprio": np.float32 array of shape (7,), the agent's
                    position on the global map and its inventory state.

        Returns:
            int: chosen action index, one of {0, 1, 2, 3, 4, 5, 6, 7, 8}.
        """
        ...

    @abc.abstractmethod
    def reset_state(self) -> None:
        """Clear any per-episode internal context.

        Called before every new simulation. Agents that keep no internal
        context may leave the implementation empty.
        """
        ...


class DQNAgent(BaseAgent, nn.Module):
    """Epsilon-greedy DQN agent with an online and a soft-updated target network.

    Args:
        model: Q-network mapping ``(image, proprio)`` to per-action Q-values.
            It is moved to ``device`` and may be shared between agents; each
            agent deep-copies it into its own target network.
        device: torch device for both networks.
        eval_mode: when True, ``steps_made`` is frozen and epsilon is never
            decayed. Actions are still epsilon-greedy with the current epsilon
            (or fully random while ``steps_made < warmup_steps``).
        warmup_steps: number of steps during which epsilon is 1.
        eps_start: epsilon used once warmup is over.
        eps_decay: multiplicative epsilon decay factor.
        eps_decay_every: after warmup, epsilon is multiplied by ``eps_decay``
            whenever ``steps_made`` hits a multiple of this value.
        acs_dim: number of discrete actions.
        seed: seed for the exploration RNG; drawn from ``np.random`` if None.
    """

    def __init__(
        self,
        model,
        device,
        eval_mode,
        warmup_steps=20000,
        eps_start=0.2,
        eps_decay=0.995,
        eps_decay_every=5000,
        acs_dim=9,
        seed=None
    ):
        super(DQNAgent, self).__init__()
        self.device = device
        self.warmup_steps = warmup_steps
        self.eval_mode = eval_mode
        self.steps_made = 0
        self.n_target_updates = 0
        self.current_eps = eps_start
        self.eps_start = eps_start
        self.eps_decay = eps_decay
        self.eps_decay_every = eps_decay_every
        self.acs_dim = acs_dim
        self.model = model.to(device)
        self.target_model = deepcopy(self.model)
        self.target_model.to(device)
        if seed is None:
            seed = np.random.randint(0, int(1e6), 1)
        self.rng = np.random.default_rng(seed)

    def reset_state(self) -> None:
        """No per-episode context is kept, so there is nothing to reset."""
        pass

    def load(self, ckpt_dir: str) -> None:
        """Load online + target weights from ``<ckpt_dir>/module.pth``."""
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        self.load_state_dict(
            torch.load(
                os.path.join(ckpt_dir, "module.pth"),
                map_location=self.device
            )
        )

    def save(self, ckpt_dir: str) -> None:
        """Save the full module state dict to ``<ckpt_dir>/module.pth``."""
        torch.save(self.state_dict(), os.path.join(ckpt_dir, 'module.pth'))

    def _eps(self):
        """Return the current exploration rate, decaying it on schedule."""
        if self.steps_made < self.warmup_steps:
            return 1.
        # Decay only while training. In eval mode steps_made never changes,
        # so the modulo test would otherwise fire on *every* call once it
        # matched and drive epsilon towards zero.
        if not self.eval_mode and self.steps_made % self.eps_decay_every == 0:
            self.current_eps *= self.eps_decay
        return self.current_eps

    @torch.no_grad()
    def _get_action(self, observation: Dict[str, np.ndarray]) -> int:
        """Greedy action w.r.t. the online network (no autograd graph built)."""
        image = from_numpy(self.device, np.expand_dims(observation['image'], 0))
        proprio = from_numpy(self.device, np.expand_dims(observation['proprio'], 0))
        qvals = self.model.forward(image=image, proprio=proprio)
        qvals = to_numpy(qvals)
        # Plain int, matching the random branch and the BaseAgent contract.
        return int(np.argmax(qvals))

    def update_target_network(self, tau: float) -> None:
        """Polyak update: target <- tau * online + (1 - tau) * target."""
        self.n_target_updates += 1
        for target_param, param in zip(
                self.target_model.parameters(), self.model.parameters()
        ):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

    def forward(self, image: torch.Tensor, proprio: torch.Tensor) -> torch.Tensor:
        """Q-values from the online network."""
        return self.model.forward(image, proprio)

    @torch.no_grad()
    def forward_target(self, image: torch.Tensor, proprio: torch.Tensor) -> torch.Tensor:
        """Q-values from the target network, without gradients."""
        return self.target_model.forward(image, proprio)

    def get_action(self, observation: Dict[str, np.ndarray]) -> int:
        """Epsilon-greedy action; counts a step unless in eval mode."""
        if not self.eval_mode:
            self.steps_made += 1
        eps = self._eps()
        u = self.rng.uniform(0, 1, 1).item()
        if u < eps:
            acs = self.rng.integers(0, self.acs_dim, 1).item()
        else:
            acs = self._get_action(observation=observation)
        return acs


def get_agent(config: DictConfig) -> BaseAgent:
    """Factory used by the test system to build an agent from its config.

    ``config`` must provide the network sizes (``in_channels``, ``acs_dim``),
    ``eval_mode``, ``seed``, the exploration schedule (``warmup_steps``,
    ``eps_start``, ``eps_decay``, ``eps_decay_every``) and the checkpointed
    exploration state (``ckpt_steps_made``, ``ckpt_eps``). That state is
    written onto the agent after construction.
    """
    device = get_device()
    network = QCNN(in_channels=config.in_channels, acs_dim=config.acs_dim)
    agent = DQNAgent(
        model=network,
        device=device,
        eval_mode=config.eval_mode,
        acs_dim=config.acs_dim,
        seed=config.seed,
        warmup_steps=config.warmup_steps,
        eps_start=config.eps_start,
        eps_decay=config.eps_decay,
        eps_decay_every=config.eps_decay_every,
    )
    # Restore the exploration schedule position saved with the checkpoint.
    agent.steps_made = config.ckpt_steps_made
    agent.current_eps = config.ckpt_eps
    return agent


Writing submission_vdn/model.py


# Импортируем необходимые модули

Примечание: как можно видеть ниже, директория с вашими файлами решения будет использована как точка входа для импорта фабричного метода `get_agent`.

In [26]:
import sys
sys.path.insert(1, submission_dir)
from model import DQNAgent, get_agent
from utils.utils import get_device, from_numpy
from utils.networks import QCNN

# Определим VDN Trainer

Основная логика обновления весов алгоритма VDN реализована в методе `update()`

In [27]:
class VDNTrainer(nn.Module):
    """Value-Decomposition Network (VDN) trainer.

    The joint action-value is the sum of the per-agent Q-values,
    ``Q_tot = sum_a Q_a(o_a, u_a)``. It is regressed onto a double-DQN target
    built from the summed (shared) reward.

    Args:
        agents: mapping from agent name to DQNAgent. Agents may share one
            online network; shared parameters are optimised only once.
        learning_rate: Adam learning rate.
        gamma: discount factor.
        td_criterion: loss callable ``f(prediction, target)``.
        tau: Polyak coefficient for soft target-network updates.
        reward_divisor: each agent's reward is divided by this before
            summation (default 10., the value previously hard-coded).
    """

    def __init__(
        self,
        agents: Dict[str, DQNAgent],
        learning_rate: float,
        gamma: float = 0.99,
        td_criterion=F.smooth_l1_loss,
        tau: float = 0.005,
        reward_divisor: float = 10.
    ):
        super(VDNTrainer, self).__init__()
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.td_criterion = td_criterion
        self.tau = tau
        self.reward_divisor = reward_divisor
        self.n_updates = 0
        self.last_logs = {}

        # Set agents
        self.agents = nn.ModuleDict(agents)
        self.devices = {n: a.device for n, a in self.agents.items()}

        # Define optimizer over the (deduplicated) online-network parameters.
        self.optimizer = optim.Adam(
            params=self._get_params(),
            lr=self.learning_rate
        )

    def _get_params(self):
        """Online-network parameters, each shared network counted once."""
        params = []
        seen_models = set()
        for a in self.agents.values():
            model_id = id(a.model)
            if model_id not in seen_models:
                seen_models.add(model_id)
                params.extend(a.model.parameters())
        return params

    def update_target_networks(self) -> None:
        """Soft-update every agent's target network with ``self.tau``."""
        for a in self.agents.values():
            a.update_target_network(tau=self.tau)

    def forward(
        self,
        images: Dict[str, torch.Tensor],
        proprio: Dict[str, torch.Tensor],
    ) -> Dict[str, torch.Tensor]:
        """Per-agent online Q-values, keyed by agent name."""
        output = {}
        for name, agent in self.agents.items():
            output[name] = agent.forward(images[name], proprio[name])
        return output

    @torch.no_grad()
    def forward_target(
        self,
        images: Dict[str, torch.Tensor],
        proprio: Dict[str, torch.Tensor],
    ) -> Dict[str, torch.Tensor]:
        """Per-agent target-network Q-values, keyed by agent name."""
        output = {}
        for name, agent in self.agents.items():
            output[name] = agent.forward_target(images[name], proprio[name])
        return output

    def save(self, dir: str, config: dict) -> None:
        """Write each agent's weights and ``agent_config.yaml`` to ``dir/<name>``."""
        for name, agent in self.agents.items():
            agent_dir = os.path.join(dir, name)
            os.makedirs(agent_dir, exist_ok=True)
            agent.save(ckpt_dir=agent_dir)
            with open(os.path.join(agent_dir, 'agent_config.yaml'), 'w') as outfile:
                yaml.dump(config, outfile, default_flow_style=False)

    def update(self, sample: Dict[str, Any]) -> Dict[str, float]:
        """Perform one VDN gradient step on a collated minibatch.

        Args:
            sample: output of ``collate_fn``. It holds per-agent dicts under
                'observations' / 'next_observations' (each with 'image' and
                'proprio'), 'actions', 'rewards' and 'terminations'.

        Returns:
            Scalar diagnostics for logging.
        """
        devs = self.devices

        # Unpack data: each agent's arrays become tensors on its own device.
        obs_image = {k: from_numpy(devs[k], v['image'])
                     for k, v in sample['observations'].items()}
        next_obs_image = {k: from_numpy(devs[k], v['image'])
                          for k, v in sample['next_observations'].items()}
        obs_proprio = {k: from_numpy(devs[k], v['proprio'])
                       for k, v in sample['observations'].items()}
        next_obs_proprio = {k: from_numpy(devs[k], v['proprio'])
                            for k, v in sample['next_observations'].items()}
        actions = {k: from_numpy(devs[k], v) for k, v in sample['actions'].items()}
        rewards = {k: from_numpy(devs[k], v) / self.reward_divisor
                   for k, v in sample['rewards'].items()}
        terminations = {k: from_numpy(devs[k], v)
                        for k, v in sample['terminations'].items()}

        # VDN team reward: sum of the scaled individual rewards, as (B, 1).
        shared_rewards = torch.cat(
            [r.unsqueeze(-1) for r in rewards.values()], dim=-1
        ).sum(dim=-1, keepdim=True)

        # Double DQN: the online net picks the next action, the target net
        # evaluates it.
        qa_tp1_target = self.forward_target(next_obs_image, next_obs_proprio)
        with torch.no_grad():
            qa_tp1_model = self.forward(next_obs_image, next_obs_proprio)

        q_tp1 = []
        for name, qa_tp1_t_a in qa_tp1_target.items():
            best_next = qa_tp1_model[name].argmax(dim=1, keepdim=True)
            q_tp1_a = torch.gather(qa_tp1_t_a, 1, best_next)
            # No bootstrap past a termination; truncations are not read here,
            # so truncated transitions still bootstrap.
            not_terminated = torch.logical_not(terminations[name].unsqueeze(1))
            q_tp1.append(q_tp1_a * not_terminated)
        q_tp1 = torch.cat(q_tp1, dim=-1).sum(dim=-1, keepdim=True)

        # Create targets
        q_targets = shared_rewards + self.gamma * q_tp1

        # Q_tot(s, u): sum over agents of Q-value of the action actually taken.
        qa_t = self.forward(obs_image, obs_proprio)
        q_t = []
        for name, qa_t_a in qa_t.items():
            acs_a = actions[name].to(torch.long).unsqueeze(1)
            q_t.append(torch.gather(qa_t_a, 1, acs_a))
        q_t = torch.cat(q_t, dim=-1).sum(dim=-1, keepdim=True)

        # compute loss
        loss = self.td_criterion(q_t, q_targets)

        # performing gradient step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        self.n_updates += 1

        return {
            'dqn_loss': loss.item(),
            'mean_q_value': q_t.mean().item(),
            'n_updates': self.n_updates,
            'mean_shared_reward': shared_rewards.mean().item(),
            'mean_agents_steps': np.mean([a.steps_made for a in self.agents.values()]).item(),
            'mean_target_updates': np.mean([a.n_target_updates for a in self.agents.values()]).item(),
            'mean_eps': np.mean([a.current_eps for a in self.agents.values()]).item()
        }

# Инициализация симулятора и буфера данных

In [28]:
# Fresh environment for training, plus a replay buffer capped at
# `config.buffer_size` transitions (ReplayBuffer evicts the oldest episodes).
env = AijMultiagentEnv()
buffer = ReplayBuffer(config.buffer_size)

# Инициализация VDN trainer

Стоит отметить, что все агенты используют общие веса сети-критика: такая архитектура улучшает сходимость алгоритма и сокращает количество обучаемых параметров.

In [29]:
device = get_device()

# One QCNN instance is shared by every agent (parameter sharing). Each
# DQNAgent still deep-copies it into its own target network.
model = QCNN(in_channels=config.in_channels, acs_dim=config.acs_dim)

# eval_mode=False: agents count their steps and follow the epsilon schedule.
# No seed is passed, so each agent draws its RNG seed from np.random in
# construction order.
agents = {a: DQNAgent(
    device=device,
    model=model,
    eval_mode=False,
    warmup_steps=config.warmup_steps,
    eps_start=config.eps_start,
    eps_decay=config.eps_decay,
    eps_decay_every=config.eps_decay_every,
    acs_dim=config.acs_dim
) for a in env.possible_agents}

trainer = VDNTrainer(
    agents=agents,
    learning_rate=config.learning_rate,
    gamma=config.gamma,
    tau=config.tau
)

# Заполняем буфер изначальными рандомизированными данными

In [30]:
# Warm-start the replay buffer with episodes from the freshly built agents.
# They act randomly while steps_made < warmup_steps, because epsilon is 1.
initial_batch = sample_rollouts(
    env=env,
    agents=trainer.agents,
    n_rollouts=config.initial_batch_episodes,
    verbose=True,
)
buffer.add_batch(initial_batch)

dataloader = DataLoader(
    buffer,
    batch_size=config.batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=0,
)
print(f'Initial Buffer Size: {len(buffer)}')

100%|██████████| 10/10 [00:29<00:00,  2.92s/it]

Initial Buffer Size: 10000





# Запускаем обучение

In [None]:
training_logs = []


for it in tqdm(range(config.n_iters)):
    # Collect fresh episodes with the current epsilon-greedy agents.
    batch = sample_rollouts(env=env, agents=trainer.agents,
                            n_rollouts=config.episodes_per_iter, verbose=False)
    # Number of new transitions gathered this iteration (not the SGD batch).
    batch_size = sum(len(e) for e in batch)

    # Add to buffer; the iterator is recreated because the dataset changed.
    buffer.add_batch(rollouts=batch)
    data_iter = iter(dataloader)

    # One gradient step per `update_every` new transitions (at least one).
    iter_n_updates = max(1, batch_size // config.update_every)
    update_logs = []

    for _ in range(iter_n_updates):
        try:
            sample = next(data_iter)
        except StopIteration:
            # A full pass over the buffer finished; start a new shuffled one.
            data_iter = iter(dataloader)
            sample = next(data_iter)
        logs = trainer.update(sample)
        if trainer.n_updates % config.target_updates_freq == 0:
            trainer.update_target_networks()
        update_logs.append(logs)

    # Collect Logs
    mean_agent_reward = get_mean_agent_return(batch)
    mean_episode_length = batch_size / config.episodes_per_iter
    iter_logs = {
        'mean_agent_reward': mean_agent_reward,
        'mean_episode_length': mean_episode_length,
        'batch_size': batch_size,
        'iter_n_updates': iter_n_updates,
        'buffer_size_transitions': len(buffer),
        'buffer_size_episodes': len(buffer.rollouts),
        **{k: np.mean([entry[k] for entry in update_logs]) for k in update_logs[0]},
    }
    training_logs.append(iter_logs)

    # Write artifacts periodically, and always after the last iteration, so
    # the final weights are saved even when n_iters - 1 is not a multiple
    # of iter_per_save.
    is_last_iter = it == config.n_iters - 1
    if (it > 0 and it % config.iter_per_save == 0) or is_last_iter:
        ckpt_steps_made = min([a.steps_made for a in trainer.agents.values()])
        ckpt_eps = min([a.current_eps for a in trainer.agents.values()])
        save_config = {
            **config,
            'eval_mode': True,
            'ckpt_steps_made': ckpt_steps_made,
            'ckpt_eps': ckpt_eps,
            'seed': 42
        }
        trainer.save(dir=config.output_dir, config=save_config)

 22%|██▏       | 43/200 [6:54:11<5:31:23, 126.65s/it]   

# Визуализация логов обучения

Необходимо отметить, что в данном случае средняя награда обучаемых агентов `Mean Agent Reward` не обязательно должна совпадать с целевой метрикой `Mean Focal Score`, вычисляемой тестовой системой. Основная причина заключается в том, что на фазе обучения агенты играют с симметричными агентами, тогда как в тестовой системе симуляции проводятся совместно с различными ботами со скрытыми от Участников стратегиями.

In [None]:
# One subplot per logged metric, laid out on a grid with `ncols` columns.
ncols = 3
plot_keys = list(training_logs[-1].keys())
n_graphs = len(plot_keys)
nrows = ceil(n_graphs / ncols)

# squeeze=False keeps `axs` 2-D even when only one row is needed; the
# original axs[row, col] indexing failed in that case.
fig, axs = plt.subplots(nrows, ncols, figsize=(10, 10), squeeze=False)
for ax, key in zip(axs.flat, plot_keys):
    ax.plot([tl[key] for tl in training_logs])
    ax.set_title(key)

for ax in axs.flat:
    ax.set(xlabel='Training Step', ylabel='Value')
    ax.label_outer()

# Hide grid cells left empty when n_graphs is not a multiple of ncols.
for ax in axs.flat[n_graphs:]:
    ax.set_visible(False)