In [30]:
import numpy as np
import random
from typing import Tuple, Literal, Union, Optional, List, Dict, NamedTuple
from queue import Queue

In [45]:
class Map(NamedTuple):
    grid: np.ndarray
    start: Tuple[int, ...]
    goal: Tuple[int, ...]
    obstacles: List[Tuple[int, ...]]
    size: Tuple[int, int]


In [46]:
import numpy as np
import random
from typing import Tuple, Literal, Union, Optional, List, Dict
from queue import Queue

class MapGenerator:
    def __init__(
        self,
        map_type: Literal["random", "multi_narrow", "maze"] = "random",
        map_size: Union[Tuple[int, int], Tuple[int, int, int]] = (50, 50),
        obstacle_percent: float = 0.2,
        min_obstacle_size: Union[Tuple[int, int], Tuple[int, int, int]] = (2, 2),
        max_obstacle_size: Union[Tuple[int, int], Tuple[int, int, int]] = (5, 5),
        max_obstacle_count: Optional[int] = None
    ):
        self.map_type = map_type
        self.map_size = map_size
        self.obstacle_percent = obstacle_percent
        self.obstacle_list: List[Tuple[int, ...]] = []
        self.min_size = min_obstacle_size
        self.max_size = max_obstacle_size
        self.max_count = max_obstacle_count
        self.is_3d = len(map_size) == 3
        self.map = self._init_map()
        self.start = None
        self.goal = None

    def _init_map(self):
        shape = self.map_size[::-1] if self.is_3d else (self.map_size[1], self.map_size[0])
        return np.zeros(shape, dtype=np.uint8)

    def generate(self, start: Tuple[int, ...], goal: Tuple[int, ...]) -> Dict[str, Union[np.ndarray, Tuple[int, ...], List[Tuple[int, ...]], Tuple[int, int]]]:
        self.start = start
        self.goal = goal
        tries = 0
        max_tries = 100

        while tries < max_tries:
            self.map = self._init_map()
            self.obstacle_list.clear()

            if self.map_type == "maze":
                self.map = self._generate_maze()
            elif self.map_type == "random":
                self._generate_random_obstacles(start, goal)
            elif self.map_type == "multi_narrow":
                self._generate_multi_narrow(start, goal)

            if self._path_exists(start, goal):
                return Map(
                    grid=self.map,
                    start=start,
                    goal=goal,
                    obstacles=self.obstacle_list,
                    size=self.map_size
                )
            tries += 1

        raise RuntimeError("Failed to generate a connected map after multiple attempts.")

    def _add_obstacle(self, coords: Tuple[int, ...]):
        self.obstacle_list.append(coords)

    def _path_exists(self, start, goal):
        visited = set()
        q = Queue()
        q.put(start)
        visited.add(start)

        dims = len(start)
        neighbors = [(-1,0), (1,0), (0,-1), (0,1)] if dims == 2 else \
                    [(-1,0,0),(1,0,0),(0,-1,0),(0,1,0),(0,0,-1),(0,0,1)]

        while not q.empty():
            node = q.get()
            if node == goal:
                return True

            for delta in neighbors:
                neighbor = tuple(node[i] + delta[i] for i in range(dims))
                if self._in_bounds(neighbor) and neighbor not in visited:
                    if self._is_free(neighbor):
                        visited.add(neighbor)
                        q.put(neighbor)
        return False

    def _in_bounds(self, p):
        if self.is_3d:
            z, y, x = p
            return 0 <= z < self.map.shape[0] and 0 <= y < self.map.shape[1] and 0 <= x < self.map.shape[2]
        else:
            y, x = p
            return 0 <= y < self.map.shape[0] and 0 <= x < self.map.shape[1]

    def _is_free(self, p):
        if self.is_3d:
            z, y, x = p
            return self.map[z, y, x] == 0
        else:
            y, x = p
            return self.map[y, x] == 0

    def _is_inside(self, point, x, y, z, ow, oh, od):
        px, py, *pz = point
        if z is None:
            return x <= px < x + ow and y <= py < y + oh
        else:
            pz = pz[0]
            return x <= px < x + ow and y <= py < y + oh and z <= pz < z + od

    def _generate_random_obstacles(self, start, goal):
        if self.is_3d:
            w, h, d = self.map_size
            total_voxels = w * h * d
            max_obs = int(total_voxels * self.obstacle_percent)
            count = 0
            for _ in range(10000):
                ow = random.randint(self.min_size[0], self.max_size[0])
                oh = random.randint(self.min_size[1], self.max_size[1])
                od = random.randint(self.min_size[2], self.max_size[2])
                x = random.randint(0, w - ow - 1)
                y = random.randint(0, h - oh - 1)
                z = random.randint(0, d - od - 1)
                if np.any(self.map[z:z+od, y:y+oh, x:x+ow]):
                    continue
                if self._is_inside(start, x, y, z, ow, oh, od) or self._is_inside(goal, x, y, z, ow, oh, od):
                    continue
                self.map[z:z+od, y:y+oh, x:x+ow] = 1
                self._add_obstacle((x, y, z, ow, oh, od))
                count += ow * oh * od
                if self.max_count and count >= self.max_count:
                    break
                if count >= max_obs:
                    break
        else:
            w, h = self.map_size
            total_cells = w * h
            max_obs = int(total_cells * self.obstacle_percent)
            count = 0
            for _ in range(10000):
                ow = random.randint(self.min_size[0], self.max_size[0])
                oh = random.randint(self.min_size[1], self.max_size[1])
                x = random.randint(0, w - ow - 1)
                y = random.randint(0, h - oh - 1)
                if np.any(self.map[y:y+oh, x:x+ow]):
                    continue
                if self._is_inside(start, x, y, None, ow, oh, None) or self._is_inside(goal, x, y, None, ow, oh, None):
                    continue
                self.map[y:y+oh, x:x+ow] = 1
                self._add_obstacle((x, y, ow, oh))
                count += ow * oh
                if self.max_count and count >= self.max_count:
                    break
                if count >= max_obs:
                    break

    def _generate_multi_narrow(self, start, goal):
        if self.is_3d:
            self._generate_random_obstacles(start, goal)
        else:
            w, h = self.map_size
            corridor_width = 2
            spacing = 6
            for i in range(spacing, h - spacing, spacing + corridor_width):
                self.map[i:i+spacing, :] = 1
            self._carve_corridor(start, goal)

    def _carve_corridor(self, start, goal):
        if not self.is_3d:
            y0, x0 = start[1], start[0]
            y1, x1 = goal[1], goal[0]
            for x in range(min(x0, x1), max(x0, x1)+1):
                self.map[y0, x] = 0
            for y in range(min(y0, y1), max(y0, y1)+1):
                self.map[y, x1] = 0

    def _generate_maze(self):
        width, height = self.map_size
        if width % 2 == 0: width += 1
        if height % 2 == 0: height += 1

        maze = np.ones((height, width), dtype=np.uint8)
        sx, sy = 1, 1
        maze[sy, sx] = 0

        walls = [(sx + dx, sy + dy) for dx, dy in [(-2, 0), (2, 0), (0, -2), (0, 2)]
                 if 0 < sx + dx < width and 0 < sy + dy < height]

        while walls:
            wx, wy = walls.pop(random.randint(0, len(walls) - 1))
            if maze[wy, wx] == 1:
                neighbors = [(wx + dx, wy + dy) for dx, dy in [(-2, 0), (2, 0), (0, -2), (0, 2)]
                             if 0 < wx + dx < width and 0 < wy + dy < height and maze[wy + dy, wx + dx] == 0]
                if len(neighbors) == 1:
                    nx, ny = neighbors[0]
                    maze[(wy + ny) // 2, (wx + nx) // 2] = 0
                    maze[wy, wx] = 0
                    for dx, dy in [(-2, 0), (2, 0), (0, -2), (0, 2)]:
                        nx, ny = wx + dx, wy + dy
                        if 0 < nx < width and 0 < ny < height and maze[ny, nx] == 1:
                            walls.append((nx, ny))

        return maze



In [51]:
import plotly.graph_objs as go

def visualize_map_shapes(
    map_array: np.ndarray,
    start: Optional[Tuple[int, ...]] = None,
    goal: Optional[Tuple[int, ...]] = None,
    obs:List[Tuple[int, ...]] = None,
    path: Optional[List[Tuple[float, ...]]] = None,
    visited: Optional[List[Tuple[float, ...]]] = None,
    title: str = "Map Visualization"
):
    fig = go.Figure()

    if map_array.ndim == 2:
        height, width = map_array.shape
        
        for x, y, w, h in obs:
                fig.add_shape(
                    type="rect",
                    x0=x, x1=x+w, y0=y, y1=y+h,
                    fillcolor="purple",opacity=0.5,
                    line=dict(width=0)
                )

        # 경로
        if path:
            px, py = zip(*path)
            fig.add_trace(go.Scatter(
                x=px, y=py, mode="lines+markers",
                line=dict(color="green"),
                marker=dict(size=6),
                name="Path"
            ))

        # 방문 노드
        if visited:
            vx, vy = zip(*visited)
            fig.add_trace(go.Scatter(
                x=vx, y=vy, mode="markers",
                marker=dict(size=4, color="blue"),
                name="Visited"
            ))

        # 시작/목표
        if start:
            fig.add_trace(go.Scatter(
                x=[start[0]], y=[start[1]], mode="markers",
                marker=dict(size=10, color="red"),
                name="Start"
            ))

        if goal:
            fig.add_trace(go.Scatter(
                x=[goal[0]], y=[goal[1]], mode="markers",
                marker=dict(size=10, color="orange"),
                name="Goal"
            ))

        fig.add_shape(
            type="rect",
            x0=0, y0=0,
            x1=width, y1=height,
            line=dict(color="white", width=3),
            fillcolor="rgba(0,0,0,0)",  # 투명 내부
            layer="above"
        )

        fig.update_layout(
            title=title,
            xaxis=dict(scaleanchor="y", showgrid=False),
            # yaxis=dict(showgrid=False, autorange="reversed"),
            yaxis=dict(showgrid=False),
            height=600, width=600
        )

    elif map_array.ndim == 3:
        z, y, x = map_array.nonzero()
        x, y, z = list(x), list(y), list(z)

        # for x,y,w,h in obs:
        #     fig.add_trace(go.Mesh3d(
        #         x=x, y=y, z=z,
        #         color='black',
        #         opacity=1.0,
        #         alphahull=0,
        #         name='Obstacles'
        #     ))

        fig.add_trace(go.Mesh3d(
            x=x, y=y, z=z,
            color='black',
            opacity=1.0,
            alphahull=0,
            name='Obstacles'
        ))

        if visited:
            vx, vy, vz = zip(*visited)
            fig.add_trace(go.Scatter3d(
                x=vx, y=vy, z=vz,
                mode='markers',
                marker=dict(size=2, color='blue'),
                name='Visited'
            ))

        if path:
            px_, py_, pz_ = zip(*path)
            fig.add_trace(go.Scatter3d(
                x=px_, y=py_, z=pz_,
                mode='lines+markers',
                marker=dict(size=3, color='green'),
                name='Path'
            ))

        if start:
            fig.add_trace(go.Scatter3d(
                x=[start[0]], y=[start[1]], z=[start[2]],
                mode='markers',
                marker=dict(size=5, color='red'),
                name='Start'
            ))

        if goal:
            fig.add_trace(go.Scatter3d(
                x=[goal[0]], y=[goal[1]], z=[goal[2]],
                mode='markers',
                marker=dict(size=5, color='orange'),
                name='Goal'
            ))

        fig.update_layout(
            title=title,
            scene=dict(aspectmode='data'),
            height=700, width=700
        )

        

    fig.show()


In [65]:
gen = MapGenerator(map_type="random", map_size=(100, 100), min_obstacle_size=(5, 5), max_obstacle_size=(15, 15), obstacle_percent=0.32)
m = gen.generate(start=(1, 1), goal=(99, 99))
visualize_map_shapes(m.grid, obs=m.obstacles, start=m.start, goal=m.goal)

In [13]:
m.keys()

dict_keys(['grid', 'start', 'goal', 'obstacles', 'bound'])

In [None]:
import plotly.express as px

class MultiMapBenchmarker:
    def __init__(
        self,
        maps: List[np.ndarray],
        starts: List[Tuple],
        goals: List[Tuple],
        algorithm: Callable[[np.ndarray, Tuple, Tuple], Dict[str, Any]],
        name: str = "Algorithm",
        goal_tolerance: float = 1.0
    ):
        assert len(maps) == len(starts) == len(goals), "Length mismatch"
        self.maps = maps
        self.starts = starts
        self.goals = goals
        self.algorithm = algorithm
        self.name = name
        self.goal_tolerance = goal_tolerance
        self.results_df: pd.DataFrame = pd.DataFrame()

    def run(self) -> pd.DataFrame:
        results = []
        for i, (map_, start, goal) in enumerate(zip(self.maps, self.starts, self.goals)):
            start_time = time.time()
            try:
                output = self.algorithm(map_, start, goal)
            except Exception as e:
                output = {"path": [], "visited": [], "nodes": 0}
            end_time = time.time()

            path = output.get("path", [])
            visited = output.get("visited", [])
            num_nodes = output.get("nodes", len(visited))
            success = self._determine_success(path, goal)
            path_length = self._compute_path_length(path)

            results.append({
                "map_id": i,
                "algorithm": self.name,
                "success": success,
                "time_taken": end_time - start_time,
                "num_nodes": num_nodes,
                "path_length": path_length
            })

        self.results_df = pd.DataFrame(results)
        return self.results_df

    def _determine_success(self, path: List[Tuple[float, ...]], goal: Tuple) -> bool:
        if not path or len(path) < 2:
            return False
        return np.linalg.norm(np.array(path[-1]) - np.array(goal)) < self.goal_tolerance

    def _compute_path_length(self, path: List[Tuple[float, ...]]) -> float:
        if not path or len(path) < 2:
            return 0.0
        return sum(np.linalg.norm(np.array(path[i]) - np.array(path[i+1])) for i in range(len(path) - 1))

    def save_results(self, filename: str):
        if self.results_df.empty:
            raise RuntimeError("No results to save. Run benchmark first.")
        self.results_df.to_csv(filename, index=False)

    def plot_metrics(self, metric: str = "time_taken"):
        if self.results_df.empty:
            raise RuntimeError("No results to plot. Run benchmark first.")
        if metric not in self.results_df.columns:
            raise ValueError(f"Invalid metric: {metric}")

        fig = px.bar(
            self.results_df,
            x="map_id",
            y=metric,
            color="success",
            title=f"{self.name} - {metric} per map",
            labels={"map_id": "Map ID", metric: metric.replace('_', ' ').title()}
        )
        fig.show()


array([[0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       ...,
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0],
       [0, 0, 0, ..., 0, 0, 0]], shape=(100, 100), dtype=uint8)

In [15]:
from typing import List, Tuple, Dict, Any
import numpy as np
import random

class RRTPlanner:
    def __init__(
        self,
        bounds: Tuple[int, int],
        start: Tuple[float, float],
        goal: Tuple[float, float],
        obstacles: List[Tuple[float, float, float, float]],
        max_iter: int = 5000
    ):
        self.width, self.height = bounds
        self.start = start
        self.goal = goal
        self.obstacles = obstacles
        self.max_iter = max_iter

        self.step_size = 1.0
        self.goal_sample_rate = 0.05
        self.goal_tolerance = 1.5

        self.tree: Dict[Tuple[float, float], Tuple[float, float]] = {start: None}
        self.visited: List[Tuple[float, float]] = [start]
        self.path: List[Tuple[float, float]] = []

    def plan(self) -> Dict[str, Any]:
        for _ in range(self.max_iter):
            rnd = self._sample()
            nearest = self._nearest_node(rnd)
            new_node = self._steer(nearest, rnd)

            if not self._in_bounds(new_node):
                continue
            if not self._collision_free(nearest, new_node):
                continue

            self.tree[new_node] = nearest
            self.visited.append(new_node)

            if np.linalg.norm(np.array(new_node) - np.array(self.goal)) < self.goal_tolerance:
                self.tree[self.goal] = new_node
                break

        if self.goal in self.tree:
            self._build_path()
        return {
            "path": self.path,
            "visited": self.visited,
            "nodes": len(self.tree)
        }

    def _sample(self) -> Tuple[float, float]:
        if random.random() < self.goal_sample_rate:
            return self.goal
        return (random.uniform(0, self.width), random.uniform(0, self.height))

    def _nearest_node(self, p: Tuple[float, float]) -> Tuple[float, float]:
        return min(self.tree.keys(), key=lambda n: np.linalg.norm(np.array(n) - np.array(p)))

    def _steer(self, from_node: Tuple[float, float], to_node: Tuple[float, float]) -> Tuple[float, float]:
        from_np = np.array(from_node)
        to_np = np.array(to_node)
        direction = to_np - from_np
        distance = np.linalg.norm(direction)
        if distance == 0:
            return from_node
        step = self.step_size * direction / distance
        return tuple(from_np + step)

    def _in_bounds(self, p: Tuple[float, float]) -> bool:
        x, y = p
        return 0 <= x < self.width and 0 <= y < self.height

    def _collision_free(self, p1: Tuple[float, float], p2: Tuple[float, float]) -> bool:
        steps = int(np.linalg.norm(np.array(p2) - np.array(p1)) / 0.5)
        for i in range(steps + 1):
            x = p1[0] + (p2[0] - p1[0]) * i / steps
            y = p1[1] + (p2[1] - p1[1]) * i / steps
            for ox, oy, ow, oh in self.obstacles:
                if ox <= x <= ox + ow and oy <= y <= oy + oh:
                    return False
        return True

    def _build_path(self):
        self.path = [self.goal]
        while self.path[-1] != self.start:
            self.path.append(self.tree[self.path[-1]])
        self.path.reverse()


In [66]:
planner = RRTPlanner(
    bounds=m.size,
    start=m.start,
    goal=m.goal,
    obstacles=m.obstacles,
    max_iter=5000
)

In [67]:
result = planner.plan()

In [56]:
result.keys()

dict_keys(['path', 'visited', 'nodes'])

In [68]:
visualize_map_shapes(m.grid, start=m.start, goal=m.goal, obs=m.obstacles, path=result['path'], visited=result['visited'])