# Алгоритмы сокращения пространства поиска на графе регулярной декомпозиции за счет симметрии и смежных техник (JPS и его модификации).

_Федор Мамаев, Прохор Архипов, 22Б09_

TODO: написать какую-то очень умную воду

<center><img src="./img/1.png"/></center>

In [1]:
import random
import math
import traceback
import time
import sys
import asyncio
from heapq import heappop, heappush
from pathlib import Path
from textwrap import dedent
from typing import *

import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
from PIL import Image, ImageDraw

%matplotlib inline

# Dedicated RNG with a fixed seed, so random choices made through `rand`
# repeat from run to run.
rand = random.Random(192837)

## 1. Представление карты

In [2]:
class Cell:
    """Grid position given by a row index ``i`` and a column index ``j``.

    Coordinates are exposed read-only. Two cells are equal when both
    coordinates match, and the hash agrees with equality, so cells can be
    used as dictionary keys and set members.
    """

    def __init__(self, i: int, j: int):
        self._i, self._j = i, j

    @property
    def i(self):
        """First coordinate (row)."""
        return self._i

    @property
    def j(self):
        """Second coordinate (column)."""
        return self._j

    def __eq__(self, other):
        # Anything that is not a Cell is simply "not equal".
        if not isinstance(other, Cell):
            return False
        return self._i == other._i and self._j == other._j

    def __hash__(self) -> int:
        coords = (self._i, self._j)
        return hash(coords)

In [3]:
class Direction:
    """Displacement on the grid: ``di`` along rows and ``dj`` along columns.

    Directions compare and hash by their ``(di, dj)`` pair.
    """

    def __init__(self, di: int, dj: int):
        self._di, self._dj = di, dj

    @property
    def di(self):
        """Row component of the displacement."""
        return self._di

    @property
    def dj(self):
        """Column component of the displacement."""
        return self._dj

    @property
    def delta(self):
        """The displacement as a ``(di, dj)`` tuple."""
        return self._di, self._dj

    @property
    def is_diagonal(self):
        """True when both components are non-zero."""
        return not (self._di == 0 or self._dj == 0)

    def reversed(self):
        """Return a new direction pointing the opposite way."""
        return Direction(-self.di, -self.dj)

    def __eq__(self, other):
        if not isinstance(other, Direction):
            return False
        return self.delta == other.delta

    def __hash__(self):
        return hash(self.delta)

# Named unit steps attached to the class; each delta is (row change, column change).
Direction.NONE = Direction(0, 0)

# The four cardinal moves.
(
    Direction.DOWN,
    Direction.RIGHT,
    Direction.UP,
    Direction.LEFT,
) = Direction(1, 0), Direction(0, 1), Direction(-1, 0), Direction(0, -1)

# The four diagonal moves (D/U = down/up, R/L = right/left).
(
    Direction.DIAG_DR,
    Direction.DIAG_UR,
    Direction.DIAG_DL,
    Direction.DIAG_UL,
) = Direction(1, 1), Direction(-1, 1), Direction(1, -1), Direction(-1, -1)

# All eight moves: cardinals first, then diagonals. NONE is deliberately excluded.
Direction.ALL_DIRECTIONS = [
    Direction.DOWN, Direction.RIGHT, Direction.UP, Direction.LEFT,
    Direction.DIAG_DR, Direction.DIAG_UR, Direction.DIAG_DL, Direction.DIAG_UL,
]

def direction_from_to(a: Cell, b: Cell):
    """Return the (not normalised) displacement that leads from `a` to `b`."""
    di = b.i - a.i
    dj = b.j - a.j
    return Direction(di, dj)

def step(cell: Cell, d: Direction):
    """Return the cell reached by applying displacement `d` to `cell`."""
    target_i = cell.i + d.di
    target_j = cell.j + d.dj
    return Cell(target_i, target_j)

In [4]:
class Map:
    """Rectangular grid map backed by a 2-D array of obstacle flags.

    `cells[i, j]` being truthy means that the cell in row `i`, column `j`
    is blocked; a falsy value means it is free.
    """

    def __init__(self, cells: npt.NDArray):
        self._width = cells.shape[1]
        self._height = cells.shape[0]
        self._cells = cells

    def in_bounds(self, cell: Cell) -> bool:
        """Return True if `cell` lies inside the grid."""
        return 0 <= cell.j < self._width and 0 <= cell.i < self._height

    def traversable(self, cell: Cell) -> bool:
        """Return True if `cell` is inside the grid and not blocked."""
        return self.in_bounds(cell) and not self._cells[cell.i, cell.j]

    def get_neighbours(self, cell: Cell) -> List[Cell]:
        """Return the traversable cells among the 8 neighbours of `cell`.

        The result follows the order of `Direction.ALL_DIRECTIONS`. Corner
        cutting is allowed: a diagonal neighbour is returned whenever it is
        traversable itself, regardless of the two adjacent cardinal cells.
        """
        candidates = (step(cell, d) for d in Direction.ALL_DIRECTIONS)
        return [candidate for candidate in candidates if self.traversable(candidate)]

    @property
    def size(self) -> Tuple[int, int]:
        """Grid dimensions as ``(height, width)``."""
        return (self._height, self._width)

    @property
    def height(self) -> int:
        """Number of rows."""
        return self._height

    @property
    def width(self) -> int:
        """Number of columns."""
        return self._width

In [5]:
def compute_cost(a: Cell, b: Cell) -> Union[int, float]:
    """Return the cost of a single move between adjacent cells `a` and `b`.

    Cardinal moves cost 1, diagonal moves cost sqrt(2).

    Raises:
        ValueError: if the cells are not cardinal or diagonal neighbours.
    """
    di = abs(a.i - b.i)
    dj = abs(a.j - b.j)

    if di + dj == 1:  # exactly one coordinate changes by one
        return 1
    if di == 1 and dj == 1:  # both coordinates change by one
        return math.sqrt(2)
    raise ValueError("Trying to compute the cost of a non-supported move! ONLY cardinal and diagonal moves are supported.")

## 2. Вершина поиска

In [6]:
class Node:
    """Search-tree node: a cell together with its g/h/f values and a parent link.

    Equality and hashing look only at the cell, so two nodes for the same
    cell are considered equal even if their costs differ. Ordering (`<`) is
    by `f`, with ties broken in favour of the smaller `h`.
    """

    def __init__(
        self,
        cell: "Cell",
        g: Union[float, int] = 0,
        h: Union[float, int] = 0,
        f: Optional[Union[float, int]] = None,
        parent: Optional["Node"] = None,
    ):
        """
        Args:
            cell: grid cell this node stands for.
            g: cost of the path from the start to this node.
            h: heuristic estimate of the remaining cost.
            f: priority; defaults to ``g + h`` when omitted.
            parent: predecessor node, or None for the root.
        """
        self.cell = cell
        self.g = g
        self.h = h
        self.f = self.g + h if f is None else f
        self.parent = parent

    @property
    def i(self):
        """Row index of the underlying cell."""
        return self.cell.i

    @property
    def j(self):
        """Column index of the underlying cell."""
        return self.cell.j

    def __eq__(self, other):
        # Comparing with a non-Node (e.g. `node == None`) must not blow up on
        # a missing `.cell` attribute; let Python fall back to "not equal".
        if not isinstance(other, Node):
            return NotImplemented
        return self.cell == other.cell

    def __hash__(self):
        # XOR with a constant so a Node does not hash identically to its cell.
        return hash(self.cell) ^ 0x19283746

    def __lt__(self, other):
        # Smaller f first; on equal f prefer the smaller h (a better tie-break).
        return (self.f < other.f) or (self.f == other.f and self.h < other.h)

## 3. Дерево поиска

In [7]:
class SearchTreePQD:
    """Search tree with a binary-heap OPEN list and a dictionary CLOSED set.

    Duplicates are allowed in OPEN: a node is pushed without checking whether
    its cell is already queued, and stale entries (cells that have already
    been expanded) are discarded lazily when the best node is requested.
    """

    def __init__(self):
        self._open = []                # heap of nodes
        self._closed = {}              # cell -> expanded node
        self._enc_open_duplicates = 0  # stale OPEN entries discarded so far

    def __len__(self) -> int:
        """Total number of stored nodes (OPEN, duplicates included, plus CLOSED)."""
        return len(self._closed) + len(self._open)

    def open_is_empty(self) -> bool:
        """Return True when OPEN holds no entries at all."""
        return not self._open

    def add_to_open(self, item: Node):
        """Push `item` onto OPEN without any duplicate check."""
        heappush(self._open, item)

    def get_best_node_from_open(self) -> Optional[Node]:
        """Pop and return the best not-yet-expanded node, or None if there is none.

        Every popped entry whose cell was already expanded is counted as an
        encountered duplicate and dropped.
        """
        while not self.open_is_empty():
            candidate = heappop(self._open)
            if not self.was_expanded(candidate):
                return candidate
            self._enc_open_duplicates += 1
        return None

    def add_to_closed(self, item: Node):
        """Record `item` as expanded, keyed by its cell."""
        self._closed[item.cell] = item

    def was_expanded(self, item: Node) -> bool:
        """Return True if the cell of `item` is already in CLOSED."""
        return item.cell in self._closed

    @property
    def opened(self):
        """The underlying OPEN heap (may contain stale duplicates)."""
        return self._open

    @property
    def expanded(self):
        """View of the expanded nodes."""
        return self._closed.values()

    @property
    def number_of_open_duplicates(self):
        """How many stale OPEN entries have been discarded so far."""
        return self._enc_open_duplicates

## 4. Валидация результатов

In [8]:
def make_path(goal: Optional[Node]) -> Tuple[Optional[List[Node]], Union[float, int]]:
    """Rebuild the path that ends in `goal` by following parent links.

    Returns:
        ``(path, length)`` where `path` lists the nodes from the root to
        `goal` and `length` is ``goal.g``; ``(None, 0)`` when `goal` is None.
    """
    if goal is None:
        return None, 0

    chain = []
    node = goal
    while True:
        chain.append(node)
        if not node.parent:  # reached the root
            break
        node = node.parent

    chain.reverse()  # collected goal -> root, callers expect root -> goal
    return chain, goal.g

## 5. Чтение заданий из файла

In [9]:
class Task:
    """One pathfinding problem instance: a map, a start, a goal and reference data."""

    # TODO: optimal_dist is currently read from the scenario file; later we
    # need a way to estimate it with standard A* without running it twice.
    def __init__(self, bucket: int, map: Map, start: Cell, goal: Cell, optimal_dist: float):
        # What has to be solved.
        self.map = map
        self.start = start
        self.goal = goal
        # Reference information that comes with the task.
        self.bucket = bucket
        self.optimal_dist = optimal_dist

In [10]:
def read_map_from_file(filename: str) -> Map:
    """Read a grid map from a text file.

    The file is expected to start with four header lines (a type line,
    ``height N``, ``width M`` and a ``map`` line), followed by `height` rows
    of at least `width` characters. The characters ``@``, ``O`` and ``T``
    mark blocked cells; every other character is treated as free.

    Raises:
        ValueError: if a map row is shorter than the declared width
            (including a file that ends before all rows were read).
    """
    obstacle_chars = ('@', 'O', 'T')

    with open(filename, 'r') as file:
        file.readline()                                   # type octile
        height = int(file.readline().strip().split()[1])  # height XX
        width = int(file.readline().strip().split()[1])   # width YY
        file.readline()                                   # map

        # A zero-initialised boolean grid: no cell is ever left undefined.
        cells = np.zeros((height, width), dtype=bool)
        for i in range(height):
            # Drop the line terminator so it can never be read as a map cell.
            row = file.readline().rstrip('\r\n')
            if len(row) < width:
                raise ValueError(
                    f'{filename}: map row {i} has {len(row)} cells, expected {width}'
                )
            for j in range(width):
                cells[i, j] = row[j] in obstacle_chars

    return Map(cells)

In [11]:
# Cache of parsed maps keyed by file name, so each map file is read only once.
map_storage: Dict[str, Map] = {}

def get_map(filename: str) -> Map:
    """Return the map stored in ``./tasks/maps/<filename>``, loading it on first use."""
    try:
        return map_storage[filename]
    except KeyError:
        pass

    loaded = read_map_from_file(f'./tasks/maps/{filename}')
    map_storage[filename] = loaded
    return map_storage[filename]

In [12]:
def read_scenario_from_file(filename: str) -> List[Task]:
    """Read a scenario file and return its tasks.

    The first line (version header) is skipped. Every following non-blank
    line must hold nine tab-separated fields: bucket, map file name, two map
    dimensions (unused here), start column, start row, goal column, goal row
    and the reference optimal path length.

    Raises:
        ValueError: if a non-blank line does not consist of exactly nine
            fields or a numeric field cannot be parsed.
    """
    tasks = []
    with open(filename, 'r') as file:
        file.readline()  # version x.x

        for line in file:
            line = line.strip()
            if not line:
                # Blank lines (typically one at the end of the file) carry no task.
                continue

            (
                bucket_s, map_name, _map_dim_a_s, _map_dim_b_s,
                start_j_s, start_i_s, goal_j_s, goal_i_s, optimal_dist_s,
            ) = line.split('\t')

            tasks.append(Task(
                int(bucket_s),
                get_map(map_name),
                # The file stores the column first, Cell expects (row, column).
                Cell(int(start_i_s), int(start_j_s)),
                Cell(int(goal_i_s), int(goal_j_s)),
                float(optimal_dist_s)
            ))
    return tasks

## 6. Параметры алгоритма A*

In [13]:
# Type aliases

# (cell, goal cell) -> estimated cost between them
HeuristicFunc = Callable[[Cell, Cell], float]
NeighboursFunc = Callable[[Node, Map, Cell, HeuristicFunc], List[Node]]  # curr node, map, goal, heuristic -> neighbours

In [14]:
def heuristic_octile(a: Cell, b: Cell):
    """Octile distance between `a` and `b`.

    The shorter axis offset is covered with diagonal steps (sqrt(2) each),
    the remaining difference with cardinal steps (1 each).
    """
    di = abs(a.i - b.i)
    dj = abs(a.j - b.j)
    longer, shorter = max(di, dj), min(di, dj)
    return (longer - shorter) + shorter * math.sqrt(2)

In [15]:
def weighted_heuristic(heuristic_func: HeuristicFunc, weight: float) -> HeuristicFunc:
    """Return a heuristic that multiplies the value of `heuristic_func` by `weight`."""
    def scaled(a, b):
        return heuristic_func(a, b) * weight

    return scaled

In [16]:
def neighbours_all(curr_node: Node, map: Map, goal: Cell, heuristic_func: HeuristicFunc):
    """Generate a successor node for every traversable neighbour of `curr_node`.

    Each successor gets `g` equal to the parent's `g` plus the move cost, `h`
    from `heuristic_func` towards `goal`, and `curr_node` as its parent.
    """
    origin = curr_node.cell
    successors = []
    for next_cell in map.get_neighbours(origin):
        successor = Node(
            cell=next_cell,
            g=curr_node.g + compute_cost(origin, next_cell),
            h=heuristic_func(next_cell, goal),
            parent=curr_node,
        )
        successors.append(successor)
    return successors

### 6.1. Функция поиска точек прыжка для алгоритма JPS

In [17]:
# Whether canonical ordering allows a move in `next_d` right after a move in `curr_d`.
def can_turn(curr_d: Direction, next_d: Direction):
    """Return True if `next_d` may follow `curr_d`.

    Going straight on is always allowed; after a diagonal move it is also
    allowed to switch to either of its two cardinal components.
    """
    if curr_d == next_d:
        return True
    if not curr_d.is_diagonal:
        return False
    cardinal_components = ((curr_d.di, 0), (0, curr_d.dj))
    return next_d.delta in cardinal_components

# Direction of the first step from a to b according to canonical ordering.
def canonical_direction_from_to(a: Cell, b: Cell):
    """Return the unit direction whose components are the signs of ``b - a``."""
    def sign(value):
        # -1, 0 or 1 depending on the sign of `value`.
        return (value > 0) - (value < 0)

    offset = direction_from_to(a, b)
    return Direction(sign(offset.di), sign(offset.dj))

# See the paper.
def get_forced_neighbours(curr_cell: Cell, map: Map, d: Direction):
    """Return the forced neighbours of `curr_cell` when it is entered by a step `d`.

    The parent is the cell one step behind `curr_cell`. A traversable
    neighbour is forced when the canonical first step from the parent towards
    it differs from `d` and the cell reached by that first step is blocked.
    """
    parent_cell = step(curr_cell, d.reversed())

    forced = []
    for next_cell in map.get_neighbours(curr_cell):
        first_step = canonical_direction_from_to(parent_cell, next_cell)
        if first_step == d:
            continue  # reachable by simply continuing in direction d
        if map.traversable(step(parent_cell, first_step)):
            continue  # the parent can start towards it directly
        forced.append(next_cell)
    return forced

def jps_jump(curr_node: Node, map: Map, d: Direction, goal: Cell, heuristic_func: HeuristicFunc, bound: Optional[int]) -> Optional[Node]:
    """Scan from `curr_node` in direction `d` and return the first jump point.

    A node is created for every scanned cell and chained to the previous one
    through `parent`, so the returned node links back to `curr_node` one grid
    step at a time. The scan stops at a cell when

    * the cell is the goal;
    * the remaining `bound` is 0;
    * the cell has forced neighbours;
    * `d` is diagonal and a scan along either cardinal component of `d`
      started at that cell finds a jump point.

    Args:
        bound: remaining budget; it is reduced by one for each further step
            and for the cardinal side scans. ``None`` means no limit.

    Returns:
        The node of the jump point, or None if an obstacle or the map border
        is reached first.

    The continuation in the same direction is a loop rather than recursion:
    the recursive form needed one stack frame per scanned cell and could
    exceed Python's recursion limit on long free runs of a large map. The
    only remaining recursion is the cardinal side scan of a diagonal jump,
    which is at most one level deep.
    """
    while True:
        curr_cell = curr_node.cell
        next_cell = step(curr_cell, d)

        if not map.traversable(next_cell):
            return None

        next_node = Node(
            cell=next_cell,
            g=curr_node.g + compute_cost(curr_cell, next_cell),
            h=heuristic_func(next_cell, goal),
            parent=curr_node
        )

        if next_cell == goal:
            return next_node
        if bound is not None and bound == 0:
            return next_node

        if get_forced_neighbours(next_cell, map, d):
            return next_node

        remaining = (bound - 1) if bound is not None else None

        if d.is_diagonal:
            for d_proj in (Direction(d.di, 0), Direction(0, d.dj)):
                if jps_jump(next_node, map, d_proj, goal, heuristic_func, remaining) is not None:
                    return next_node

        # Nothing interesting here: keep scanning in the same direction.
        curr_node = next_node
        bound = remaining

In [18]:
# Directions to expand in, according to full canonical ordering.
def canonical_directions_for_expansion(curr_node: Node, map: Map):
    """Return the directions in which `curr_node` should be expanded.

    * The root (no parent) and nodes with forced neighbours use all eight
      directions.
    * A node reached by a diagonal step continues diagonally and along both
      cardinal components of that step.
    * A node reached by a cardinal step only continues straight.
    """
    parent = curr_node.parent
    if parent is None:
        return Direction.ALL_DIRECTIONS

    incoming = direction_from_to(parent.cell, curr_node.cell)
    if get_forced_neighbours(curr_node.cell, map, incoming):
        return Direction.ALL_DIRECTIONS

    if not incoming.is_diagonal:
        return [incoming]
    return [incoming, Direction(incoming.di, 0), Direction(0, incoming.dj)]

def jps_generate_neighbours(curr_node: Node, map: Map, goal: Cell, heuristic_func: HeuristicFunc, bound: Optional[int]) -> List[Node]:
    """Return the jump points reachable from `curr_node`.

    One jump is attempted per canonical expansion direction; directions in
    which the jump finds nothing are left out.
    """
    jump_points = []
    for d in canonical_directions_for_expansion(curr_node, map):
        landed = jps_jump(curr_node, map, d, goal, heuristic_func, bound)
        if landed is not None:
            jump_points.append(landed)
    return jump_points

def neighbours_bounded_jps(bound: Optional[int]) -> NeighboursFunc:
    """Return a neighbour generator that performs JPS jumps limited by `bound`.

    Args:
        bound: budget passed to every jump; ``None`` leaves jumps unlimited.
    """
    def generate(curr_node, map, goal, heuristic_func):
        return jps_generate_neighbours(curr_node, map, goal, heuristic_func, bound)

    return generate

# bound = 0: every jump stops right after its first step
neighbours_canonical_astar = neighbours_bounded_jps(0)
# bound = None: the jump length is not limited
neighbours_jps = neighbours_bounded_jps(None)

## 7. Алгоритм A*

In [19]:
def astar_search(
    task_map: Map,
    start: Cell,
    goal: Cell,
    heuristic_func: HeuristicFunc,
    neighbours_func: NeighboursFunc,
    search_tree: Type[SearchTreePQD],
) -> Tuple[bool, Optional[Node], int, int]:
    """Run a best-first search from `start` to `goal`.

    Args:
        task_map: map to search on.
        start: start cell.
        goal: goal cell.
        heuristic_func: estimate of the cost between a cell and the goal.
        neighbours_func: successor generator (plain neighbours, JPS, ...).
        search_tree: class of the OPEN/CLOSED container; a fresh instance is
            created for the search.

    Returns:
        ``(found, final_node, steps, tree_size)`` where `final_node` is the
        goal node (None if no path exists), `steps` is the number of
        expansions (taking the goal from OPEN is not counted) and
        `tree_size` is ``len()`` of the search tree when the search stops.
    """
    ast = search_tree()
    steps = 0

    ast.add_to_open(Node(cell=start, g=0, h=heuristic_func(start, goal)))

    while True:
        # None means OPEN holds no entry that still needs to be expanded.
        curr_node: Optional[Node] = ast.get_best_node_from_open()
        if curr_node is None:
            break
        ast.add_to_closed(curr_node)

        if curr_node.cell == goal:
            return True, curr_node, steps, len(ast)

        steps += 1
        for next_node in neighbours_func(curr_node, task_map, goal, heuristic_func):
            if not ast.was_expanded(next_node):
                ast.add_to_open(next_node)

    return False, None, steps, len(ast)

In [20]:
# (map, start, goal) -> (found, final node, steps, tree size)
AstarAlgorithm = Callable[[Map, Cell, Cell], Tuple[bool, Optional[Node], int, int]]


def standard_astar(map, start, goal):
    """A* with the octile heuristic over all traversable neighbours."""
    return astar_search(map, start, goal, heuristic_octile, neighbours_all, SearchTreePQD)


def canonical_astar(map, start, goal):
    """A* with the octile heuristic and canonical-ordering successors (jump bound 0)."""
    return astar_search(map, start, goal, heuristic_octile, neighbours_canonical_astar, SearchTreePQD)


def jps(map, start, goal):
    """A* with the octile heuristic and unbounded JPS successors."""
    return astar_search(map, start, goal, heuristic_octile, neighbours_jps, SearchTreePQD)

def weighted_astar(weight: float):
    """Return an A* variant whose octile heuristic is multiplied by `weight`."""
    def search(map, start, goal):
        heuristic = weighted_heuristic(heuristic_octile, weight)
        return astar_search(map, start, goal, heuristic, neighbours_all, SearchTreePQD)

    return search
def weighted_jps(weight: float):
    """Return a JPS variant whose octile heuristic is multiplied by `weight`."""
    def search(map, start, goal):
        heuristic = weighted_heuristic(heuristic_octile, weight)
        return astar_search(map, start, goal, heuristic, neighbours_jps, SearchTreePQD)

    return search
def bounded_jps(bound: int):
    """Return a JPS variant whose jumps are limited by `bound`."""
    def search(map, start, goal):
        neighbours = neighbours_bounded_jps(bound)
        return astar_search(map, start, goal, heuristic_octile, neighbours, SearchTreePQD)

    return search
def weighted_bounded_jps(weight: float, bound: int):
    """Return a JPS variant with a heuristic scaled by `weight` and jumps limited by `bound`."""
    def search(map, start, goal):
        heuristic = weighted_heuristic(heuristic_octile, weight)
        neighbours = neighbours_bounded_jps(bound)
        return astar_search(map, start, goal, heuristic, neighbours, SearchTreePQD)

    return search

## 8. Давайте проведём бенчмаркинг

TODO: написать ещё какую-нибудь умную воду (вообще её надо написать везде, где только можно)

Все карты скачаны с https://movingai.com/benchmarks/grids.html

<center><img src="./img/2.png"/></center>

### 8.0. Полезные функции

In [21]:
async def run_on_task(
    task: Task, algorithm: AstarAlgorithm
) -> Tuple[bool, Optional[List[Node]], float, int, int, float]:
    """Run `algorithm` on a single task and measure how long the search takes.

    Returns:
        ``(found, path, path_length, steps, tree_size, seconds)``. `path` is
        None and `path_length` is 0 when no path was found. Only the search
        itself is timed, not the path reconstruction.
    """
    # perf_counter is a monotonic high-resolution clock; the wall clock can be
    # adjusted while the search is running and would then distort the result.
    start_time = time.perf_counter()
    found, final_node, steps, tree_size = algorithm(
        task.map,
        task.start,
        task.goal
    )
    elapsed = time.perf_counter() - start_time

    result_path, path_length = make_path(final_node)
    return found, result_path, path_length, steps, tree_size, elapsed

In [22]:
async def run_on_tasks(
    tasks: List[Task],
    algorithms: List[Tuple[str, AstarAlgorithm]],
    show_info: bool = True,
    progressbar_length: int = 50
) -> Dict[str, Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]]:
    """Run every algorithm on every task and collect per-task statistics.

    Args:
        tasks: tasks to solve; `optimal_dist` of each task is the reference
            length for the optimality ratio.
        algorithms: ``(name, algorithm)`` pairs.
        show_info: print a text progress bar while running.
        progressbar_length: width of the progress bar in characters.

    Returns:
        For each algorithm name a tuple of five arrays with one entry per
        task: optimality ratio (path length / reference length), path length,
        search tree size, number of expansions and time in seconds.

    The searches are awaited one after another in the caller's event loop.
    They are CPU-bound and never yield, so tasks scheduled on a separate
    event loop would gain nothing, and such tasks cannot be awaited from the
    running loop at all.
    """
    summary = {
        alg_name: ([], [], [], [], []) for alg_name, _ in algorithms
    }

    tasks_num = len(tasks)
    progress_units_completed = 0
    if show_info:
        print(' ' + '_' * progressbar_length + ' ')
        print('[', end='')

    for tasks_completed, task in enumerate(tasks, start=1):
        for alg_name, algorithm in algorithms:
            _, _, path_length, steps, tree_size, time_spent = await run_on_task(task, algorithm)

            # A zero reference length (e.g. start == goal) gives no meaningful
            # ratio; report such a task as optimal instead of dividing by zero.
            optimal_dist = task.optimal_dist
            optimality = path_length / optimal_dist if optimal_dist else 1.0

            ratios, lengths, tree_sizes, expansions, times = summary[alg_name]
            ratios.append(optimality)
            lengths.append(path_length)
            tree_sizes.append(tree_size)
            expansions.append(steps)
            times.append(time_spent)

        if show_info:
            # Print as many bar units as the share of completed tasks allows.
            while (
                progress_units_completed < progressbar_length
                and tasks_completed >= int(tasks_num / progressbar_length * (progress_units_completed + 1))
            ):
                progress_units_completed += 1
                print('|', end='')
                sys.stdout.flush()

    if show_info:
        print(']')

    return {
        alg_name: tuple(np.array(column) for column in columns)
        for alg_name, columns in summary.items()
    }

In [23]:
def plot_summary(summary: Dict[str, Tuple[np.array, np.array, np.array, np.array, np.array]], bar_color: str):
    """Print and plot benchmark statistics for every algorithm in `summary`.

    Each value of `summary` holds five per-task sequences: optimality ratio,
    path length, search tree size, expansions and time. Their averages are
    printed, then a 5x2 figure is drawn with a box plot (left column) and a
    bar chart of the averages (right column) per metric, coloured with
    `bar_color`. The optimality ratio is shown in percent.
    """
    algorithm_names = list(summary.keys())

    def per_algorithm(metric):
        # Raw per-task values of one metric, one entry per algorithm.
        return [summary[name][metric] for name in algorithm_names]

    def averages(metric):
        return [float(np.mean(summary[name][metric])) for name in algorithm_names]

    optimality_data = [np.array(values) * 100 for values in per_algorithm(0)]
    length_data = per_algorithm(1)
    tree_size_data = per_algorithm(2)
    steps_data = per_algorithm(3)
    time_data = per_algorithm(4)

    optimality_avg = [average * 100 for average in averages(0)]
    length_avg = averages(1)
    tree_size_avg = averages(2)
    steps_avg = averages(3)
    time_avg = averages(4)

    # NOTE: the row labels say "Weight", but the values are algorithm names.
    report = (
        ('=== Average optimality ratio: ===', 'Weight = %s: %d%%', map(int, optimality_avg)),
        ('=== Average path length: ===', 'Weight = %s: %.6f', length_avg),
        ('=== Average search tree size: ===', 'Weight = %s: %d', tree_size_avg),
        ('=== Average expansions: ===', 'Weight = %s: %d', steps_avg),
        ('=== Average time usage: ===', 'Weight = %s: %.4f s', time_avg),
    )
    print('===== SUMMARY =====')
    for header, row_format, values in report:
        print(header)
        print('\n'.join([row_format % row for row in zip(algorithm_names, values)]))

    fig, axs = plt.subplots(nrows=5, ncols=2, figsize=(12, 12))

    # One figure row per metric: (box plot title, bar chart title, data, averages).
    panels = (
        ('Cost Overhead (%)', 'Average Cost Overhead (%)', optimality_data, optimality_avg),
        ('Path Length', 'Average Path Length', length_data, length_avg),
        ('Search Tree Size', 'Average Search Tree Size', tree_size_data, tree_size_avg),
        ('Expansions', 'Average Expansions', steps_data, steps_avg),
        ('Time Spent (s)', 'Average Time (s)', time_data, time_avg),
    )
    for (box_ax, bar_ax), (box_title, bar_title, data, avg) in zip(axs, panels):
        box_ax.set_title(box_title)
        bar_ax.set_title(bar_title)

        boxplot = box_ax.boxplot(data, tick_labels=algorithm_names, patch_artist=True)
        for patch in boxplot['boxes']:
            patch.set_facecolor(bar_color)

        bar_ax.bar(algorithm_names, avg, color=bar_color)

    # Box plots are positioned from 1, bars from 0.
    algorithm_count = len(algorithm_names)
    for column, first_tick in ((0, 1), (1, 0)):
        for ax in axs[:, column]:
            ax.set_xticks(range(first_tick, first_tick + algorithm_count))
            ax.set_xticklabels(algorithm_names, rotation=45)

    fig.tight_layout()
    plt.show()

### 8.1. Список алгоритмов

### TODO: убрать и написать там. Примечание:
Здесь из списка заданий случайным образом выбирается не более 500 заданий, которые затем сортируются по длине оптимального пути и делятся по уровням сложности: первые 250 — Easy, следующие 150 — Medium, следующие 75 — Hard, последние 25 — Brutal. Длина оптимального пути оценивается стандартным A*.

In [24]:
# Reference algorithm: its path lengths serve as the optimal distances.
optimal_algorithm = ('A*', standard_astar)

# Every benchmarked algorithm as a (display name, search function) pair.
all_algorithms = [
    optimal_algorithm,
    ('Canonical A*', canonical_astar),
    # Bounded JPS with increasing jump bounds.
    *[(f'BJPS (bound = {bound})', bounded_jps(bound)) for bound in (1, 4, 16, 64)],
    ('JPS', jps),
    # Weighted A* and weighted JPS side by side for each weight.
    *[
        entry
        for weight in (1.2, 2, 5)
        for entry in (
            (f'WA* (W = {weight})', weighted_astar(weight)),
            (f'WJPS (W = {weight})', weighted_jps(weight)),
        )
    ],
    ('WBJPS (W = 2, bound = 16)', weighted_bounded_jps(2, 16)),
]

In [25]:
# Maximum number of tasks per difficulty level
# Maximum number of tasks per difficulty level
EASY_NUM = 250
MEDIUM_NUM = 150
HARD_NUM = 75
BRUTAL_NUM = 25
# Total number of tasks kept for a benchmark run (sum of the per-level limits)
TASK_NUM = EASY_NUM + MEDIUM_NUM + HARD_NUM + BRUTAL_NUM

In [None]:
async def _test_on_scenarios_async(scenario_filenames: List[str]):
    """Coroutine doing the actual work of `test_on_scenarios`."""
    tasks: List[Task] = []

    for scen_fn in scenario_filenames:
        tasks += read_scenario_from_file(f'tasks/scenarios/{scen_fn}')

    # Randomly drop tasks until at most TASK_NUM are left.
    while len(tasks) > TASK_NUM:
        del tasks[rand.randint(0, len(tasks) - 1)]

    print('Running A* on tasks to estimate difficulty level...')
    reference = await run_on_tasks(tasks, [optimal_algorithm])
    optimal_path_length = reference[optimal_algorithm[0]][1]
    for task, path_length in zip(tasks, optimal_path_length):
        task.optimal_dist = path_length
    tasks = sorted(tasks, key=lambda task: task.optimal_dist)

    # Consecutive slices of the sorted tasks: (label, tasks, plot colour).
    medium_start = EASY_NUM
    hard_start = medium_start + MEDIUM_NUM
    brutal_start = hard_start + HARD_NUM
    levels = (
        ('Easy', tasks[:medium_start], 'Green'),
        ('Medium', tasks[medium_start:hard_start], 'Orange'),
        ('Hard', tasks[hard_start:brutal_start], 'Red'),
        ('Brutal', tasks[brutal_start:], '#66023c'),
    )

    for label, level_tasks, color in levels:
        if level_tasks:
            print(f'Running tests on {label} tasks, {len(level_tasks)} tasks total...')
            plot_summary(await run_on_tasks(level_tasks, all_algorithms), color)
        else:
            print(f'No {label} tasks found in the scenarios')


def test_on_scenarios(scenario_filenames: List[str]):
    """Benchmark all algorithms on the tasks of the given scenario files.

    Tasks are read from ``tasks/scenarios/<name>``, randomly thinned to at
    most TASK_NUM, solved with the reference algorithm to obtain optimal path
    lengths, sorted by that length and split into Easy / Medium / Hard /
    Brutal groups. Every non-empty group is run through all algorithms and
    plotted.

    The work is asynchronous. Without a running event loop (a plain script)
    this function blocks until the benchmark is finished and returns None.
    Inside a running event loop (e.g. a Jupyter cell) it schedules the
    benchmark on that loop and returns the task, which can be awaited.
    """
    benchmark = _test_on_scenarios_async(scenario_filenames)
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running: drive the coroutine to completion right here.
        return asyncio.run(benchmark)
    return loop.create_task(benchmark)

### 8.2. Карты городов 1024x1024

In [None]:
# Scenario files for the four 1024x1024 city maps
city1024_scenarios = [
    'Berlin_1_1024.map.scen',
    'Milan_1_1024.map.scen',
    'Moscow_0_1024.map.scen',
    'Shanghai_0_1024.map.scen'
]

# Run the benchmark on these scenarios
test_on_scenarios(city1024_scenarios)