In [1]:
import dataclasses
import sys
from typing import Dict, Optional
from typing import List

import numpy as np
import plotly.graph_objects as go
import shapely.affinity
import torch
import torch.nn.functional
from shapely.geometry import Polygon, MultiPolygon
from torch import nn
from tqdm import tqdm

import_paths = [".."]
for import_path in import_paths:
    sys.path.append(import_path)

from swarm_nfomp.utils.math import interpolate_1d_pytorch, unfold_angles, wrap_angles
from swarm_nfomp.utils.position2d import Position2D
from swarm_nfomp.utils.position_array2d import PositionArray2D
from swarm_nfomp.utils.timer import Timer
from swarm_nfomp.utils.universal_factory import UniversalFactory

SyntaxError: invalid syntax (position_array2d.py, line 43)

# Collision detection class

In [None]:
@dataclasses.dataclass
class RobotSwarmState:
    positions: PositionArray2D
    shapes: List[Polygon]


class CollisionDetector:
    def __init__(self, inside_rectangle_region: MultiPolygon, outside_rectangle_region: Polygon,
                 robot_shapes: List[Polygon]):
        self.inside_rectangle_region = inside_rectangle_region
        self.outside_rectangle_region = outside_rectangle_region
        self.robot_shapes = robot_shapes

    @staticmethod
    def affine_transform(position: Position2D):
        matrix = position.as_matrix()
        return [matrix[0, 0], matrix[0, 1], matrix[1, 0], matrix[1, 1], matrix[0, 2], matrix[1, 2]]

    def transformed_robot_shapes(self, robot_swarm_positions) -> List[Polygon]:
        return [shapely.affinity.affine_transform(robot_shape, self.affine_transform(robot_position))
                for robot_shape, robot_position in zip(self.robot_shapes, robot_swarm_positions)]

    def is_collision(self, robot_swarm_positions: PositionArray2D) -> np.array:
        transformed_robot_shapes: List[Polygon] = self.transformed_robot_shapes(robot_swarm_positions)
        for shape in transformed_robot_shapes:
            if self.inside_rectangle_region.intersects(shape):
                return True
            if not self.outside_rectangle_region.contains(shape):
                return True
        for i in range(len(transformed_robot_shapes)):
            for j in range(i + 1, len(transformed_robot_shapes)):
                if transformed_robot_shapes[i].intersects(transformed_robot_shapes[j]):
                    return True
        return False


    def is_collision_for_each_robot(self, robot_swarm_positions: PositionArray2D) -> np.array:
        transformed_robot_shapes: List[Polygon] = self.transformed_robot_shapes(robot_swarm_positions)
        result = np.zeros(len(transformed_robot_shapes), dtype=bool)
        for i in range(len(transformed_robot_shapes)):
            shape = transformed_robot_shapes[i]
            if self.inside_rectangle_region.intersects(shape):
                result[i] = True
            elif not self.outside_rectangle_region.contains(shape):
                result[i] = True
            for j in range(i + 1, len(transformed_robot_shapes)):
                if transformed_robot_shapes[i].intersects(transformed_robot_shapes[j]):
                    result[i] = True
                    break
        return np.array(result)

    def is_collision_for_list(self, robot_swarm_positions: List[PositionArray2D]) -> np.array:
        return np.array([self.is_collision(robot_swarm_position) for robot_swarm_position in robot_swarm_positions])


    def is_collision_for_each_robot_for_list(self, robot_swarm_positions: List[PositionArray2D]) -> np.array:
        return np.array([self.is_collision_for_each_robot(robot_swarm_position) for robot_swarm_position in robot_swarm_positions])


class CollisionDetectorFactory:
    @staticmethod
    def make(data: Dict) -> CollisionDetector:
        inside_region = MultiPolygon([Polygon(p) for p in data["inside_polygon"]])
        outside_region = Polygon(data["outside_polygon"])
        robot_shapes = [Polygon(p) for p in data["robot_shapes"]]
        return CollisionDetector(inside_region, outside_region, robot_shapes)


In [None]:
@dataclasses.dataclass
class CollisionDetectionResultVisualizerConfig:
    xmin: float
    xmax: float
    ymin: float
    ymax: float


class CollisionDetectionResultVisualizer:
    def __init__(self, parameters: CollisionDetectionResultVisualizerConfig):
        self._fig = go.Figure()
        self.parameters = parameters

    def visualize(self, detector: CollisionDetector, robot_positions: List[PositionArray2D]):
        self.draw_initial_polygons(detector)
        self.add_frames(detector, robot_positions)
        self.update_layout()

    def draw_initial_polygons(self, detector: CollisionDetector):
        self._fig.add_trace(self.draw_multipolygon(detector.inside_rectangle_region))
        self._fig.add_trace(self.draw_polygon(detector.outside_rectangle_region))
        self._fig.add_trace(self.draw_polygons(detector.robot_shapes, color="green"))

    @staticmethod
    def draw_multipolygon(polygon: MultiPolygon):
        # create an empty list to hold the coordinates of the polygons
        x, y = [], []

        # iterate over the polygons in the MultiPolygon
        for poly in polygon.geoms:
            # get the exterior coordinates of the polygon
            poly_coords = poly.exterior.coords.xy
            x.extend(poly_coords[0].tolist())
            y.extend(poly_coords[1].tolist())
            x.append(None)
            y.append(None)

        # create a scatter trace with the polygon coordinates
        return go.Scatter(x=x, y=y, mode='lines', fill="toself", line=dict(color='blue'), name="inside polygon")

    @staticmethod
    def draw_polygon(polygon: Polygon):
        # get the exterior coordinates of the polygon
        poly_coords = polygon.exterior.coords.xy
        x = poly_coords[0].tolist()
        y = poly_coords[1].tolist()

        # create a scatter trace with the polygon coordinates
        return go.Scatter(x=x, y=y, mode='lines', line=dict(color='red'), name="outside polygon")

    @staticmethod
    def draw_polygons(polygons: List[Polygon], color: str):
        # create an empty list to hold the coordinates of the polygons
        x, y = [], []

        # iterate over the polygons in the MultiPolygon
        for poly in polygons:
            # get the exterior coordinates of the polygon
            poly_coords = poly.exterior.coords.xy
            x.extend(poly_coords[0].tolist())
            y.extend(poly_coords[1].tolist())
            x.append(None)
            y.append(None)

        # create a scatter trace with the polygon coordinates
        return go.Scatter(x=x, y=y, mode='lines', line=dict(color=color), fill="toself", name="robot shapes")

    def add_frames(self, detector: CollisionDetector, robot_positions: List[PositionArray2D]):
        frames = []
        for robot_position in robot_positions:
            is_collision = collision_detector.is_collision(robot_position)
            robot_shapes = detector.transformed_robot_shapes(robot_position)
            color = "red" if is_collision else "green"
            frame = go.Frame(
                data=[
                    self.draw_multipolygon(detector.inside_rectangle_region),
                    self.draw_polygon(detector.outside_rectangle_region),
                    self.draw_polygons(robot_shapes, color=color),
                ],
            )
            frames.append(frame)
        self._fig.frames = frames

    def update_layout(self):
        self._fig.update_layout(
            updatemenus=[
                dict(
                    type="buttons",
                    direction="left",
                    showactive=False,
                    x=0.1,
                    y=1.2,
                    buttons=list([
                        dict(
                            label="Play",
                            method="animate",
                            args=[None, {
                                "frame": {
                                    "duration": 50,
                                    "redraw": False
                                },
                                "fromcurrent": True,
                                "transition": {
                                    "duration": 0,
                                }
                            }]
                        ),
                    ]),
                )
            ]
        )
        self._fig.update_layout(
            yaxis={
                "scaleanchor": "x",
                "scaleratio": 1,
                "range": [self.parameters.ymin, self.parameters.ymax]
            },
            xaxis={
                "range": [self.parameters.xmin, self.parameters.xmax]
            }
        )

    def save(self, filename):
        self._fig.write_html(filename)

In [None]:
collision_detector_parameters = {
    "outside_polygon": [
        [-5, -5], [-5, 5], [5, 5], [5, -5]
    ],
    "inside_polygon": [
        [[-5, -5], [-5, -2], [-2, -2], [-2, -5]],
        [[2, -5], [2, -2], [5, -2], [5, -5]],
        [[-5, 2], [-5, 5], [-2, 5], [-2, 2]],
        [[2, 2], [2, 5], [5, 5], [5, 2]],
    ],
    "robot_shapes": [
        [[-1, -0.5], [-1, 0.5], [1, 0.5], [1, -0.5]],
        [[-1, -0.5], [-1, 0.5], [1, 0.5], [1, -0.5]],
    ]
}
collision_detector = CollisionDetectorFactory.make(collision_detector_parameters)

In [None]:
random_robot_positions = np.random.uniform(0, 1, size=(100, 2, 3))
random_robot_positions[:, :, :2] = random_robot_positions[:, :, :2] * 10 - 5
random_robot_positions[:, :, 2] = random_robot_positions[:, :, 2] * 2 * np.pi - np.pi
random_robot_positions = [PositionArray2D.from_vec(position) for position in random_robot_positions]

visualizer_parameters = CollisionDetectionResultVisualizerConfig(
    xmin=-10, xmax=10, ymin=-5, ymax=5)
visualizer = CollisionDetectionResultVisualizer(visualizer_parameters)
visualizer.visualize(collision_detector, random_robot_positions)
visualizer.save("data/warehouse_collision_detector_visualizer.html")

# NFOMP

In [None]:
@dataclasses.dataclass
class ONFModelConfig:
    mean: float
    sigma: float
    input_dimension: int
    encoding_dimension: int
    output_dimension: int
    hidden_dimensions: List[int]


class ONF(nn.Module):
    def __init__(self, parameters: ONFModelConfig):
        super().__init__()
        self.encoding_layer = nn.Linear(parameters.input_dimension, parameters.encoding_dimension)
        self.mlp1 = self.make_mlp(parameters.encoding_dimension, parameters.hidden_dimensions[:-1],
                                  parameters.hidden_dimensions[-1])
        self.mlp2 = self.make_mlp(parameters.encoding_dimension + parameters.hidden_dimensions[-1], [], parameters.output_dimension)
        self._mean = parameters.mean
        self._sigma = parameters.sigma

    @classmethod
    def make_mlp(cls, input_dimension, hidden_dimensions, output_dimension):
        modules = []
        for dimension in hidden_dimensions:
            modules.append(nn.Linear(input_dimension, dimension))
            modules.append(nn.ReLU())
            input_dimension = dimension
        modules.append(nn.Linear(input_dimension, output_dimension))
        return nn.Sequential(*modules)

    def forward(self, x):
        x = (x - self._mean) / self._sigma
        x = self.encoding_layer(x)
        x = torch.sin(x)
        input_x = x
        x = self.mlp1(input_x)
        x = torch.cat([x, input_x], dim=1)
        x = self.mlp2(x)
        return x


In [7]:
@dataclasses.dataclass
class PathPlannerTask:
    start: PositionArray2D
    goal: PositionArray2D
    collision_detector: CollisionDetector


@dataclasses.dataclass
class ResultPath:
    positions_: np.array

    @property
    def numpy_positions(self):
        return self.positions_



@dataclasses.dataclass
class PathOptimizedState:
    positions_: torch.Tensor  # [ n_optimized_states, n_robots, 3]
    direction_constraint_multipliers: torch.Tensor
    start_position: torch.Tensor  #[n_robots, 3]
    goal_position: torch.Tensor  #[n_robots, 3]

    @property
    def result_path(self) -> ResultPath:
        return ResultPath(
            self.positions.cpu().detach().numpy()
        )

    @property
    def positions(self) -> torch.Tensor:
        return torch.cat(
            [self.start_position[None], self.positions_, self.goal_position[None]], dim=0)

    def reparametrize(self):
        distances = self.calculate_distances()
        old_times = torch.cumsum(distances, dim=0)
        old_times = torch.cat([torch.zeros(1), old_times], dim=0)
        new_times = torch.linspace(0, old_times[-1], old_times.shape[0])
        # self.positions_[:, :, 2] = unfold_angles(self.positions_[:, :, 2])
        positions: torch.Tensor = self.positions
        reshaped_positions = positions.reshape(self.positions.shape[0], -1)
        interpolated_positions = interpolate_1d_pytorch(reshaped_positions, old_times, new_times)[1:-1]
        self.positions_.data = interpolated_positions.reshape(*self.positions_.shape)
        multipliers_old_times = (old_times[:-1] + old_times[1:]) / 2
        multipliers_new_times = (new_times[:-1] + new_times[1:]) / 2
        self.direction_constraint_multipliers.data = interpolate_1d_pytorch(
            self.direction_constraint_multipliers, multipliers_old_times, multipliers_new_times)

    def calculate_distances(self) -> torch.Tensor:
        points = self.positions[:, :, :2]
        drone_distances = torch.linalg.norm(points[1:] - points[:-1], dim=2)
        return torch.max(drone_distances, dim=1).values

    @property
    def optimized_parameters(self):
        return [self.positions_]


@dataclasses.dataclass
class OptimizerImplConfig:
    lr: float
    beta1: float
    beta2: float


class OptimizerImpl:
    def __init__(self, parameters: OptimizerImplConfig):
        self._optimizer = None
        self._parameters = parameters

    def setup(self, model_parameters):
        self._optimizer = torch.optim.Adam(model_parameters, lr=self._parameters.lr,
                                           betas=(self._parameters.beta1, self._parameters.beta2))

    def zero_grad(self):
        self._optimizer.zero_grad()

    def step(self):
        self._optimizer.step()


@dataclasses.dataclass
class OptimizerWithLagrangeMultipliersConfig(OptimizerImplConfig):
    lagrange_multiplier_lr: float
    base_lr: float
    max_lr: float
    step_size_up: int
    step_size_down: int


class OptimizerWithLagrangeMultipliers(OptimizerImpl):
    def __init__(self, parameters: OptimizerWithLagrangeMultipliersConfig):
        super().__init__(parameters)
        self._parameters = parameters
        self._lagrange_multiplier_parameters = None
        self._scheduler = None

    # noinspection PyMethodOverriding
    def setup(self, model_parameters, lagrange_multiplier_parameters):
        self._optimizer = torch.optim.Adam(model_parameters, lr=self._parameters.lr,
                                           betas=(self._parameters.beta1, self._parameters.beta2))
        self._scheduler = torch.optim.lr_scheduler.CyclicLR(self._optimizer, base_lr=self._parameters.base_lr,
                                                            max_lr=self._parameters.max_lr,
                                                            step_size_up=self._parameters.step_size_up,
                                                            step_size_down=self._parameters.step_size_down,
                                                            cycle_momentum=False)
        self._lagrange_multiplier_parameters = lagrange_multiplier_parameters

    def zero_grad(self):
        self._optimizer.zero_grad()
        for p in self._lagrange_multiplier_parameters:
            p.grad = None

    def step(self):
        self._optimizer.step()
        with torch.no_grad():
            for p in self._lagrange_multiplier_parameters:
                p.data += self._parameters.lagrange_multiplier_lr * p.grad
        self._scheduler.step()


class PathOptimizedStateInitializer:
    def __init__(self, planner_task: PathPlannerTask, path_state_count: int, device: str):
        self._planner_task = planner_task
        self._path_state_count = path_state_count
        self._device = device

    def init(self) -> PathOptimizedState:
        with torch.no_grad():
            positions = self._initialize_positions()
            start_position=torch.tensor(self._planner_task.start.as_vec(), requires_grad=False, device=self._device,
                                        dtype=torch.float32)
            goal_position=torch.tensor(self._planner_task.goal.as_vec(), requires_grad=False, device=self._device,
                                       dtype=torch.float32)
            goal_position[:, 2] = start_position[:, 2] + wrap_angles(goal_position[:, 2] - start_position[:, 2])
            return PathOptimizedState(
                positions_=positions[1:-1].clone().detach().requires_grad_(True),
                start_position=start_position,
                goal_position=goal_position,
                direction_constraint_multipliers=torch.zeros(self._path_state_count + 1, len(self._planner_task.start), requires_grad=True,
                                                             device=self._device, dtype=torch.float32)
            )

    def _initialize_positions(self) -> torch.Tensor:
        start_point: np.ndarray = self._planner_task.start.as_vec()
        goal_point: np.ndarray = self._planner_task.goal.as_vec()
        trajectory_length = self._path_state_count + 2
        trajectory = torch.zeros(self._path_state_count + 2, len(start_point), 3, requires_grad=True,
                                 device=self._device, dtype=torch.float32)
        for i in range(len(start_point)):
            trajectory[:, i, 0] = torch.linspace(start_point[i, 0], goal_point[i, 0], trajectory_length)
            trajectory[:, i, 1] = torch.linspace(start_point[i, 1], goal_point[i, 1], trajectory_length)
            trajectory[:, i, 2] = start_point[i, 2] + torch.linspace(0, wrap_angles(goal_point[i, 2] - start_point[i, 2]), trajectory_length)
        return trajectory


@dataclasses.dataclass
class PathLossBuilderConfig:
    regularization_weight: float
    collision_weight: float
    direction_constraint_weight: float
    second_differences_weight: float


class PathLossBuilder:
    def __init__(self, planner_task: PathPlannerTask, parameters: PathLossBuilderConfig):
        self._parameters = parameters
        self._planner_task = planner_task

    def get_loss(self, collision_model: nn.Module, optimized_state: PathOptimizedState):
        loss = self._parameters.regularization_weight * self._distance_loss(optimized_state)
        loss = loss + self._parameters.collision_weight * self._collision_loss(collision_model, optimized_state)
        loss = loss + self._direction_constraint_loss(optimized_state)
        loss = loss + self._parameters.second_differences_weight * self._second_differences_loss(optimized_state)
        return loss

    @staticmethod
    def _distance_loss(optimized_state: PathOptimizedState):
        points = optimized_state.positions
        delta = points[1:] - points[:-1]
        delta[:, :, 2] = wrap_angles(delta[:, :, 2])
        return torch.mean(torch.abs(delta) ** 1.5)

    def _collision_loss(self, collision_model: nn.Module, optimized_state: PathOptimizedState):
        positions: torch.Tensor = optimized_state.positions
        positions = self.get_intermediate_points(positions)
        positions = positions.reshape(positions.shape[0], -1)
        return torch.mean(torch.nn.functional.softplus(collision_model(positions)))

    @staticmethod
    def get_intermediate_points(positions):
        t = torch.rand(positions.shape[0] - 1)
        delta = positions[1:] - positions[:-1]
        delta[:, :, 2] = wrap_angles(delta[:, :, 2])
        return positions[1:] + t[:, None, None] * delta

    def _direction_constraint_loss(self, optimized_state: PathOptimizedState):
        deltas = self.non_holonomic_constraint_deltas(optimized_state.positions)
        return self._parameters.direction_constraint_weight * torch.mean(deltas ** 2) + torch.mean(optimized_state.direction_constraint_multipliers * deltas)

    @staticmethod
    def non_holonomic_constraint_deltas(positions):
        dx = positions[1:, :, 0] - positions[:-1, :, 0]
        dy = positions[1:, :, 1] - positions[:-1, :, 1]
        angles = positions[:, :, 2]
        delta_angles = wrap_angles(angles[1:] - angles[:-1])
        mean_angles = angles[:-1] + delta_angles / 2
        return dx * torch.sin(mean_angles) - dy * torch.cos(mean_angles)

    def _second_differences_loss(self, optimized_state: PathOptimizedState):
        return torch.mean((optimized_state.positions[2:] - 2 * optimized_state.positions[1:-1] + optimized_state.positions[:-2]) ** 2)


class GradPreconditioner:
    def __init__(self, device, velocity_hessian_weight: float):
        self._device = device
        self._velocity_hessian_weight = velocity_hessian_weight
        self._inverse_hessian = None

    def precondition(self, optimized_state: PathOptimizedState):
        point_count = optimized_state.positions_.shape[0]
        if self._inverse_hessian is None:
            self._inverse_hessian = self._calculate_inv_hessian(point_count)
        reshaped_grad = optimized_state.positions_.grad.reshape(point_count, -1)
        reshaped_grad = self._inverse_hessian @ reshaped_grad
        optimized_state.positions_.grad = reshaped_grad.reshape(optimized_state.positions_.grad.shape)

    def _calculate_inv_hessian(self, point_count):
        hessian = self._velocity_hessian_weight * self._calculate_velocity_hessian(point_count) + np.eye(point_count)
        inv_hessian = np.linalg.inv(hessian)
        return torch.tensor(inv_hessian.astype(np.float32), device=self._device)

    @staticmethod
    def _calculate_velocity_hessian(point_count):
        hessian = np.zeros((point_count, point_count), dtype=np.float32)
        for i in range(point_count):
            hessian[i, i] = 4
            if i > 0:
                hessian[i, i - 1] = -2
                hessian[i - 1, i] = -2
        return hessian


class PathOptimizer:
    def __init__(self, timer: Timer, optimizer: OptimizerWithLagrangeMultipliers, loss_builder: PathLossBuilder,
                 state_initializer: PathOptimizedStateInitializer, grad_preconditioner: GradPreconditioner):
        self._loss_builder = loss_builder
        self._optimizer = optimizer
        self._timer = timer
        self._grad_preconditioner = grad_preconditioner
        self._state_initializer = state_initializer
        self._optimized_state: Optional[PathOptimizedState] = None

    def setup(self):
        self._optimized_state = self._state_initializer.init()
        self._optimizer.setup(self._optimized_state.optimized_parameters,
                              [self._optimized_state.direction_constraint_multipliers])

    def step(self, collision_model):
        self._optimizer.zero_grad()
        loss = self._loss_builder.get_loss(collision_model, self._optimized_state)
        self._timer.tick("trajectory_backward")
        loss.backward()
        self._timer.tock("trajectory_backward")
        self._timer.tick("inv_hes_grad_multiplication")
        self._grad_preconditioner.precondition(self._optimized_state)
        self._timer.tock("inv_hes_grad_multiplication")
        self._timer.tick("trajectory_optimizer_step")
        self._optimizer.step()
        self._timer.tock("trajectory_optimizer_step")

    def reparametrize(self):
        with torch.no_grad():
            self._optimized_state.reparametrize()

    @property
    def result_path(self) -> ResultPath:
        return self._optimized_state.result_path


class CollisionModelPointSampler:
    def __init__(self, fine_random_offset: float, course_random_offset: float, angle_random_offset):
        self._fine_random_offset = fine_random_offset
        self._course_random_offset = course_random_offset
        self._angle_random_offset = angle_random_offset

    def sample(self, result_path: ResultPath) -> np.ndarray:
        positions: np.ndarray = result_path.numpy_positions
        points = positions[:, :, :2]
        angles = positions[:, :, 2]

        course_points = points + np.random.randn(*points.shape) * self._course_random_offset
        fine_points = points + np.random.randn(*points.shape) * self._fine_random_offset
        points = np.concatenate([course_points, fine_points], axis=0)
        angles = np.concatenate([angles, angles], axis=0) + np.random.randn(2 * angles.shape[0], angles.shape[1]) * self._angle_random_offset

        positions = np.concatenate([points, angles[:, :, None]], axis=2)
        return positions


class CollisionModelFactory:
    def __init__(self, parameters: ONFModelConfig):
        self._parameters = parameters

    def make_collision_model(self):
        return ONF(self._parameters)


class CollisionNeuralFieldModelTrainer:
    def __init__(self, timer: Timer, planner_task: PathPlannerTask,
                 collision_model_point_sampler: CollisionModelPointSampler,
                 optimizer: OptimizerImpl, collision_model_factory: CollisionModelFactory,
                 device: str):
        self._timer = timer
        self._collision_model_point_sampler = collision_model_point_sampler
        self._optimizer = optimizer
        self._collision_model_factory = collision_model_factory
        self._planner_task = planner_task
        self._collision_model: Optional[nn.Module] = None
        self._device = device
        self._collision_loss_function = nn.BCEWithLogitsLoss()
        self._timer = timer

    def setup(self):
        self._collision_model = self._collision_model_factory.make_collision_model()
        self._optimizer.setup(self._collision_model.parameters())

    def learning_step(self, path: ResultPath):
        self._timer.tick("optimize_collision_model")
        points = self._collision_model_point_sampler.sample(path)
        self._collision_model.requires_grad_(True)
        self._optimizer.zero_grad()
        predicted_collision = self._calculate_predicted_collision(points)
        truth_collision = self._calculate_truth_collision(points)
        truth_collision = torch.tensor(truth_collision.astype(np.float32), device=self._device)
        loss = self._collision_loss_function(predicted_collision, truth_collision)
        loss.backward()
        self._optimizer.step()
        self._collision_model.requires_grad_(False)
        self._timer.tock("optimize_collision_model")

    def _calculate_predicted_collision(self, positions: np.array):
        positions = positions.reshape(positions.shape[0], -1)
        result = self._collision_model(torch.tensor(positions.astype(np.float32), device=self._device))
        return result

    def _calculate_truth_collision(self, positions: np.array):
        result = self._planner_task.collision_detector.is_collision_for_each_robot_for_list([PositionArray2D.from_vec(x) for x in positions])
        return result

    @property
    def collision_model(self) -> nn.Module:
        return self._collision_model


class NeuralFieldOptimalPathPlanner:
    def __init__(self, planner_task: PathPlannerTask,
                 collision_neural_field_model_trainer: CollisionNeuralFieldModelTrainer, path_optimizer: PathOptimizer,
                 iterations: int, reparametrize_rate: int):
        self._reparametrize_rate = reparametrize_rate
        self._planner_task = planner_task
        self._path_optimizer = path_optimizer
        self._collision_neural_field_model_trainer = collision_neural_field_model_trainer
        self._iterations = iterations

    def plan(self) -> ResultPath:
        self._path_optimizer.setup()
        self._collision_neural_field_model_trainer.setup()
        for i in tqdm(range(self._iterations)):
            self.step()
            if self._reparametrize_rate != -1 and i % self._reparametrize_rate == 0:
                self._path_optimizer.reparametrize()
        self._path_optimizer.reparametrize()
        return self._path_optimizer.result_path

    def step(self):
        path: ResultPath = self._path_optimizer.result_path
        self._collision_neural_field_model_trainer.learning_step(path)
        collision_model = self._collision_neural_field_model_trainer.collision_model
        self._path_optimizer.step(collision_model)

In [8]:
class WarehousePathPlannerResultVisualizer(CollisionDetectionResultVisualizer):
    def visualize(self, detector: CollisionDetector, path: ResultPath):
        self.draw_initial_polygons(detector)
        self.draw_paths(path)
        self.add_frames(detector, [PositionArray2D.from_vec(x) for x in path.numpy_positions])
        self.update_layout()

    def draw_initial_polygons(self, detector):
        super().draw_initial_polygons(detector)

    def draw_paths(self, path: ResultPath):
        positions: np. ndarray = path.numpy_positions
        for i in range(positions.shape[1]):
            xs = []
            ys = []
            for position in positions[:, i]:
                xs.append(position[0])
                xs.append(position[0] + np.cos(position[2]) * 1)
                xs.append(None)
                ys.append(position[1])
                ys.append(position[1] + np.sin(position[2]) * 1)
                ys.append(None)
            trace = go.Scatter(
                x=xs,
                y=ys,
                mode="lines",
                name=f"Robot {i} orientation",
                line=dict(
                    width=2
                )
            )
            self._fig.add_trace(trace)
            trace = go.Scatter(
                x=positions[:, i, 0],
                y=positions[:, i, 1],
                mode="lines+markers",
                name=f"Robot {i}",
                line=dict(
                    width=2
                )
            )
            self._fig.add_trace(trace)

In [9]:
global_timer = Timer()
device_parameter = "cpu"

collision_detector_parameters = {
    "outside_polygon": [
        [-5, -5], [-5, 5], [5, 5], [5, -5]
    ],
    "inside_polygon": [
        [[-5, -5], [-5, -2], [-2, -2], [-2, -5]],
        [[2, -5], [2, -2], [5, -2], [5, -5]],
        [[-5, 2], [-5, 5], [-2, 5], [-2, 2]],
        [[2, 2], [2, 5], [5, 5], [5, 2]],
    ],
    "robot_shapes": [
        [[-1, -0.5], [-1, 0.5], [1, 0.5], [1, -0.5]],
        [[-1, -0.5], [-1, 0.5], [1, 0.5], [1, -0.5]],
        [[-1, -0.5], [-1, 0.5], [1, 0.5], [1, -0.5]],
        [[-1, -0.5], [-1, 0.5], [1, 0.5], [1, -0.5]],
    ]
}

planner_parameters = {
    "collision_neural_field_model_trainer": {
        "collision_model_point_sampler": {
            "course_random_offset": 5.,
            "fine_random_offset": 1.,
            "angle_random_offset": 1,
        },
        "optimizer": {
            "lr": 5e-2,
            "beta1": 0.9,
            "beta2": 0.9
        },
        "collision_model_factory": {
            "mean": 0,
            "sigma": 3.,
            "input_dimension": len(collision_detector_parameters["robot_shapes"]) * 3,
            "encoding_dimension": 128,
            "output_dimension": len(collision_detector_parameters["robot_shapes"]),
            "hidden_dimensions": [128, 128, 128]
        }
    },
    "path_optimizer": {
        "optimizer": {
            "lr": 1e-1,
            "beta1": 0.9,
            "beta2": 0.9,
            "lagrange_multiplier_lr": 1.0,
            "base_lr": 5e-3,
            "max_lr": 5e-1,
            "step_size_up": 1,
            "step_size_down": 100,
        },
        "loss_builder": {
            "regularization_weight": 0.5,
            "collision_weight": 0.06,
            "direction_constraint_weight": 10,
            "second_differences_weight": 1

        },
        "state_initializer": {
            "path_state_count": 100,
        },
        "grad_preconditioner": {
            "velocity_hessian_weight": 0.5,
        }
    },
    "iterations": 1500,
    "reparametrize_rate": 10,
}


collision_detector = CollisionDetectorFactory.make(collision_detector_parameters)
simple_planner_task = PathPlannerTask(
    start=PositionArray2D.from_vec(np.array([[0, -3, np.pi / 2], [-3, 0, 0]])),
    goal=PositionArray2D.from_vec(np.array([[0, 3, np.pi / 2], [3, 0, 0]])),
    collision_detector=collision_detector
)

three_robots_planner_task = PathPlannerTask(
    start=PositionArray2D.from_vec(np.array([[0, -3, np.pi / 2], [-3, 0, 0], [3, 0, 0], [0, 3, np.pi / 2]])),
    goal=PositionArray2D.from_vec(np.array([[0, 3, np.pi / 2], [3, 0, 0], [-3, 0, 0], [0, -3, np.pi / 2]])),
    collision_detector=collision_detector
)

swarm_path_planner = UniversalFactory().make(NeuralFieldOptimalPathPlanner, planner_parameters,
                                             planner_task=three_robots_planner_task, timer=global_timer,
                                             device=device_parameter)
swarm_path: ResultPath = swarm_path_planner.plan()

visualizer_parameters = CollisionDetectionResultVisualizerConfig(
    xmin=-10, xmax=10, ymin=-5, ymax=5)
visualizer = WarehousePathPlannerResultVisualizer(parameters=visualizer_parameters)
visualizer.visualize(collision_detector, swarm_path)
visualizer.save("data/warehouse_path_planner_result.html")

100%|██████████| 1500/1500 [02:29<00:00, 10.05it/s]
