# DRRT star test

In [None]:
%load_ext autoreload
%autoreload 2

Sanity check with one agent

In [None]:
from swarm_prm.utils.gaussian_prm import *
from swarm_prm.envs.roadmap import Roadmap, Obstacle 
from swarm_prm.envs.instance import Instance

# Single-agent sanity-check setup: build a map with two obstacles, define one
# start/goal pair, and construct + visualize a Gaussian PRM over it.
# NOTE(review): `np` is not imported in this cell; presumably it arrives via
# the wildcard import from swarm_prm.utils.gaussian_prm — confirm.
roadmap = Roadmap(100, 100)  # presumably a 100 x 100 workspace — confirm against Roadmap
safety_radius = 2 
pos = np.array([50, 50])  # not referenced again in the visible cells
# Two axis-aligned rectangles spanning x in [40, 80]: one covering y in
# [0, 40], the other y in [60, 100], leaving a gap for 40 < y < 60.
obstacles  = [
    Obstacle(None, "POLYGON", [(40, 0), (40, 40), (80, 40), (80, 0)]),
    Obstacle(None, "POLYGON", [(40, 100), (40, 60), (80, 60), (80, 100)])
]
num_samples = 100 
for obs in obstacles:
    roadmap.add_obstacle(obs)
# fig, ax = roadmap.visualize()
# Random points in [0, 100)^2. They are not read by any visible cell, but the
# two draws still advance NumPy's global random state.
start_mean = np.random.rand(2) * 100
goal_mean = np.random.rand(2) * 100

# Agent config
# num_agents and agent_radius are assigned again in the solver cell before
# they are used; sampling_method and TEST_CVAR are not read by any visible cell.
num_agents = 1
agent_radius = 2
sampling_method = "GAUSSIAN_SAMPLING"
#sampling_method = "SWARMPRM_SAMPLING"
hex_radius = 4
TEST_CVAR = False


# Curated test instance
# One start in the top-left region and one goal in the bottom-right region
# (in the coordinates used above).
starts = np.array([[10, 90]])
goals = np.array([[90, 10]])

# One weight per start / goal node; read later by get_agent_assignment in the
# solver cell.
starts_weight = np.array([1])
goals_weight = np.array([1])

# The meaning of the `None`, "UNIFORM" and 10 arguments is defined by
# GaussianGraphNode, which is not visible here.
g_starts = [GaussianGraphNode(start, None, "UNIFORM", 10) for start in starts]
g_goals = [GaussianGraphNode(goal, None, "UNIFORM", 10) for goal in goals]

instance = Instance(roadmap, g_starts, g_goals)

gaussian_prm = GaussianPRM(instance, num_samples, safety_radius=safety_radius,
                           hex_radius=hex_radius)

# Sample nodes, connect them, then draw the result. The "CVT" / "CVAR" /
# "TRIANGULATION" options are interpreted by GaussianPRM (not visible here).
gaussian_prm.sample_free_space("CVT", "CVAR")
gaussian_prm.build_roadmap(roadmap_method="TRIANGULATION", radius=30)
# Return values of the visualize calls are discarded in this cell.
gaussian_prm.visualize_roadmap()
gaussian_prm.visualize_g_nodes()

import pickle
from pathlib import Path

# Persist the sampled roadmap so the solver cell can reload it.
# open(..., "wb") does not create missing parent directories, so make sure
# the output directory exists first; otherwise this cell fails with
# FileNotFoundError on a fresh checkout.
Path("solutions").mkdir(parents=True, exist_ok=True)

with open("solutions/drrt_single_agent_roadmap.pkl", "wb") as f:
    pickle.dump(gaussian_prm, f)

In [None]:
import pickle
from swarm_prm.utils import get_agent_assignment
from swarm_prm.solvers.macro.drrt_star import DRRT_Star

# Reload the roadmap pickled by the setup cell. The file is produced by this
# notebook itself; do not point this at a pickle from an untrusted source.
with open("solutions/drrt_single_agent_roadmap.pkl", "rb") as f:
    gaussian_prm = pickle.load(f)

# Solver-side agent configuration (note: agent_radius differs from the value
# assigned in the setup cell).
num_agents = 1
agent_radius = 1

# starts_weight / goals_weight come from the setup cell and must already be
# defined in the kernel.
starts_agent_count = get_agent_assignment(num_agents, starts_weight)
goals_agent_count = get_agent_assignment(num_agents, goals_weight)

drrt_solver = DRRT_Star(gaussian_prm, num_agents, agent_radius, starts_agent_count, goals_agent_count)
p, cost = drrt_solver.get_solution()

# Fail here with a clear message instead of with a TypeError in the plotting
# cell. The multi-agent cell guards against a None result in the same way,
# so get_solution() can presumably come back without a path.
if p is None:
    raise RuntimeError("DRRT* did not return a solution for the single-agent instance")

In [None]:
import matplotlib.pyplot as plt

# Draw the roadmap and overlay the solved path of every agent.
fig, ax = gaussian_prm.visualize_roadmap()

for agent in range(num_agents):
    # Each element of `p` is indexed per agent; entry `agent` is an index into
    # gaussian_prm.samples. Index by `agent` (not a hard-coded 0) so the cell
    # stays correct if num_agents is raised, matching the multi-agent plot.
    locations = [gaussian_prm.samples[location_idx[agent]] for location_idx in p]
    x = [loc[0] for loc in locations]
    y = [loc[1] for loc in locations]
    ax.plot(x, y, "-")

plt.show()

Multi-agent DRRT

In [None]:
from swarm_prm.utils.gaussian_prm import *
from swarm_prm.envs.roadmap import Roadmap, Obstacle 
from swarm_prm.envs.instance import Instance

# Multi-agent setup: same two-obstacle map as the single-agent test, with more
# samples, two start nodes and two goal nodes.
# NOTE(review): `np` is not imported in this cell; presumably it arrives via
# the wildcard import from swarm_prm.utils.gaussian_prm — confirm.
roadmap = Roadmap(100, 100)  # presumably a 100 x 100 workspace — confirm against Roadmap
safety_radius = 2 
pos = np.array([50, 50])  # not referenced again in the visible cells
# Two axis-aligned rectangles spanning x in [40, 80]: one covering y in
# [0, 40], the other y in [60, 100], leaving a gap for 40 < y < 60.
obstacles  = [
    Obstacle(None, "POLYGON", [(40, 0), (40, 40), (80, 40), (80, 0)]),
    Obstacle(None, "POLYGON", [(40, 100), (40, 60), (80, 60), (80, 100)])
]
num_samples = 500
for obs in obstacles:
    roadmap.add_obstacle(obs)
# fig, ax = roadmap.visualize()
# Random points in [0, 100)^2. They are not read by any visible cell, but the
# two draws still advance NumPy's global random state.
start_mean = np.random.rand(2) * 100
goal_mean = np.random.rand(2) * 100

hex_radius = 4
TEST_CVAR = False  # not read by any visible cell


# Curated test instance
# Starts sit at x = 10 and goals at x = 90, i.e. on opposite sides of the
# obstacle band (x in [40, 80]).
starts = np.array([[10, 90], [10, 10]])
goals = np.array([[90, 10], [90, 90]])

# One weight per start / goal node; read later by get_agent_assignment in the
# solver cell.
starts_weight = np.array([.5, .5])
goals_weight = np.array([.5, .5])

# The meaning of the `None`, "UNIFORM" and 10 arguments is defined by
# GaussianGraphNode, which is not visible here.
g_starts = [GaussianGraphNode(start, None, "UNIFORM", 10) for start in starts]
g_goals = [GaussianGraphNode(goal, None, "UNIFORM", 10) for goal in goals]
instance = Instance(roadmap, g_starts, g_goals)
gaussian_prm = GaussianPRM(instance, num_samples, safety_radius=safety_radius,
                           hex_radius=hex_radius)

# Sample nodes, connect them, then draw the result. The "CVT" / "CVAR" /
# "TRIANGULATION" options are interpreted by GaussianPRM (not visible here).
gaussian_prm.sample_free_space("CVT", "CVAR")
gaussian_prm.build_roadmap(roadmap_method="TRIANGULATION", radius=50)
# Return values of the visualize calls are discarded in this cell.
gaussian_prm.visualize_roadmap()
gaussian_prm.visualize_g_nodes()

import pickle
from pathlib import Path

# Persist the sampled roadmap so the solver cell can reload it.
# open(..., "wb") does not create missing parent directories, so make sure
# the output directory exists first; otherwise this cell fails with
# FileNotFoundError on a fresh checkout.
Path("solutions").mkdir(parents=True, exist_ok=True)

with open("solutions/drrt_multi_agent_roadmap.pkl", "wb") as f:
    pickle.dump(gaussian_prm, f)

In [None]:
import pickle
from swarm_prm.solvers.macro.drrt_star import DRRT_Star
from swarm_prm.utils import get_agent_assignment


# Reload the multi-agent roadmap pickled by the setup cell. The file is
# produced by this notebook itself; do not load a pickle from an untrusted
# source here.
with open("solutions/drrt_multi_agent_roadmap.pkl", "rb") as f:
    gaussian_prm = pickle.load(f)

num_agents = 3
agent_radius = 2

# starts_weight / goals_weight come from the setup cell and must already be
# defined in the kernel. get_agent_assignment is a project helper; presumably
# it splits num_agents across the nodes according to the weights — confirm.
starts_agent_count = get_agent_assignment(num_agents, starts_weight)
goals_agent_count = get_agent_assignment(num_agents, goals_weight)

# NOTE(review): max_time=300 looks like a time budget; units are not visible
# here — confirm against DRRT_Star.
drrt_solver = DRRT_Star(gaussian_prm, num_agents, agent_radius, starts_agent_count, goals_agent_count, max_time=300)
paths, cost= drrt_solver.get_solution()

# Stop here if the solver produced no path; the cells below iterate `paths`.
assert paths is not None

# Convert the solver output into one list per agent: for every element of
# `paths`, entry `agent` is used as an index into gaussian_prm.samples.
simple_paths = []
for agent in range(num_agents):
    simple_paths.append([gaussian_prm.samples[loc[agent]] for loc in paths])


In [None]:
import matplotlib.pyplot as plt

# Draw the roadmap and overlay one polyline per agent.
fig, ax = gaussian_prm.visualize_roadmap()
for agent in range(num_agents):
    # Entry `agent` of each element of `paths` is an index into
    # gaussian_prm.samples; look up the corresponding sample for every step.
    locations = [gaussian_prm.samples[location_idx[agent]] for location_idx in paths]

    # Split the looked-up samples into x / y coordinate lists for plotting.
    x = [loc[0] for loc in locations]
    y = [loc[1] for loc in locations]
    ax.plot(x, y, "-")

plt.show()

In [None]:
# Add interpolations 
import numpy as np

def add_linear_interpolation_points(path, subdiv=5):
    """
    Insert linearly-interpolated positions between consecutive points.

    Parameters
    ----------
    path : array-like of shape (N, 2)
        The original list/array of N 2D points, e.g. [(x1,y1), (x2,y2), ...].
        An empty path is allowed and yields an empty (0, 2) result.
    subdiv : int
        Number of intervals to subdivide each segment (must be >= 1).
        For each original segment, 'subdiv - 1' new interior points are
        added; shared segment endpoints are emitted only once.

    Returns
    -------
    new_path : np.ndarray of shape ((N - 1) * subdiv + 1, 2)
        Float array containing the original points and the added
        interpolation points, in path order. A single-point path is returned
        as a (1, 2) array; an empty path as a (0, 2) array.

    Raises
    ------
    ValueError
        If `subdiv` is smaller than 1, or if a non-empty `path` is not
        two-dimensional (one row per point).
    """
    if subdiv < 1:
        # subdiv == 0 used to silently collapse the whole path to its last point.
        raise ValueError(f"subdiv must be >= 1, got {subdiv}")

    path = np.asarray(path, dtype=float)
    if path.size == 0:
        # Nothing to interpolate; previously this raised IndexError on path[-1].
        return np.empty((0, 2))
    if path.ndim != 2:
        raise ValueError("path must be a sequence of points with shape (N, 2)")
    if len(path) == 1:
        return path.copy()

    # Interpolate all segments at once: result has shape (N-1, subdiv+1, dim),
    # where [:, 0] is each segment's start and [:, -1] its end.
    segments = np.linspace(path[:-1], path[1:], subdiv + 1, axis=1)

    # Drop each segment's end point (it is the next segment's start) and
    # flatten segment-major so the points stay in path order.
    interior = segments[:, :subdiv].reshape(-1, path.shape[1])

    # Finally add the very last point of the last segment.
    return np.vstack([interior, path[-1:]])

# Densify every agent's path with 50 sub-steps per roadmap edge.
# A comprehension is used so that no loop variable leaks into the notebook
# namespace: the previous loop variable `p` overwrote the single-agent
# solution `p`, which broke re-running the single-agent plotting cell.
interpolated_paths = [
    add_linear_interpolation_points(agent_path, 50) for agent_path in simple_paths
]


In [None]:
import matplotlib.pyplot as plt
from matplotlib.patches import Circle
from matplotlib.animation import FuncAnimation

def animate_solution(fig, ax, speed, paths, radius=None,
                     save_path="solutions/drrt_star_solution.gif", fps=6):
    """
    Animate agents moving along their paths and save the result as a GIF.

    Parameters
    ----------
    fig, ax : matplotlib figure and axes
        Canvas to draw on; agents are added to `ax` as circle patches.
    speed : int
        Number of waypoints advanced per animation frame (must be >= 1).
    paths : sequence of per-agent waypoint sequences
        `paths[i][k]` is the (x, y) position of agent `i` at step `k`.
        The frame count is derived from `paths[0]`; a shorter path holds its
        final position once it runs out of waypoints.
    radius : float, optional
        Radius of each agent circle. Defaults to the notebook-level
        `agent_radius`, which is what this function always used before.
    save_path : str, optional
        Output file, written with the 'pillow' writer.
    fps : int, optional
        Frames per second of the saved animation.

    Raises
    ------
    ValueError
        If `speed` is smaller than 1 or `paths` is empty.
    """
    if speed < 1:
        raise ValueError(f"speed must be >= 1, got {speed}")
    if len(paths) == 0:
        raise ValueError("paths must contain at least one agent path")
    if radius is None:
        # Backward-compatible default: fall back to the notebook global.
        radius = agent_radius

    # Visit every `speed`-th waypoint and always end on the last one.
    # The previous `len(paths[0]) // speed` frame count stopped short of the
    # final waypoint, so the animation never showed agents reaching the goal.
    num_steps = len(paths[0])
    frame_indices = list(range(0, num_steps, speed))
    if frame_indices and frame_indices[-1] != num_steps - 1:
        frame_indices.append(num_steps - 1)

    cmap = plt.get_cmap("tab10")  # hoisted: identical for every frame
    agents = []

    def clear_agents():
        # Remove the circles drawn for the previous frame.
        for patch in agents:
            patch.remove()
        agents.clear()

    def init():
        clear_agents()
        return []

    def update(idx):
        clear_agents()
        for i, path in enumerate(paths):
            # Clamp so a shorter path stays at its final position.
            loc = path[min(idx, len(path) - 1)]
            patch = ax.add_patch(Circle(loc, radius=radius, color=cmap(i % 10)))
            agents.append(patch)
        return agents

    anim = FuncAnimation(fig, update, frames=frame_indices,
                         init_func=init, blit=True, interval=10)
    anim.save(save_path, writer='pillow', fps=fps)

# Render the multi-agent solution on top of the Gaussian-node plot.
# (Alternative background, currently disabled:)
# fig, ax = instance.visualize()
fig, ax = gaussian_prm.visualize_g_nodes()
# speed=10: advance 10 interpolated waypoints per animation frame.
animate_solution(fig, ax, 10, interpolated_paths)
