* 경로 생성기 (WaypointGraph + Dijkstra)

In [1]:
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import math
import heapq

@dataclass
class Pose2D:
    x: float
    y: float
    yaw: float = 0.0

class WaypointGraph:
    def __init__(self, waypoints: Dict[str, Pose2D], edges: Dict[str, List[str]]):
        self.waypoints = waypoints
        self.edges = edges

    def dist(self, a: str, b: str) -> float:
        pa, pb = self.waypoints[a], self.waypoints[b]
        return math.hypot(pa.x - pb.x, pa.y - pb.y)

    def dijkstra(self, start: str, goal: str) -> List[str]:
        """가중치=유클리드 거리인 최단 경로 노드 시퀀스 반환. 실패하면 []"""
        if start == goal:
            return [start]
        if start not in self.waypoints or goal not in self.waypoints:
            return []

        pq: List[Tuple[float, str]] = [(0.0, start)]
        dist: Dict[str, float] = {start: 0.0}
        prev: Dict[str, Optional[str]] = {start: None}

        while pq:
            cost, u = heapq.heappop(pq)

            # 이미 더 좋은 값으로 방문된 적 있으면 스킵 (다익스트라 최적화)
            if cost != dist.get(u, 1e18):
                continue

            if u == goal:
                break

            for v in self.edges.get(u, []):
                if v not in self.waypoints:
                    continue
                nd = cost + self.dist(u, v)
                if nd < dist.get(v, 1e18):
                    dist[v] = nd
                    prev[v] = u
                    heapq.heappush(pq, (nd, v))

        if goal not in prev:
            return []

        # reconstruct
        path = []
        cur: Optional[str] = goal
        while cur is not None:
            path.append(cur)
            cur = prev.get(cur)
        path.reverse()
        return path

    def path_length(self, path_nodes: List[str]) -> float:
        """경로 총 길이(거리)"""
        if len(path_nodes) <= 1:
            return 0.0
        total = 0.0
        for i in range(len(path_nodes) - 1):
            total += self.dist(path_nodes[i], path_nodes[i+1])
        return total


* Pinky 그래프 입력 (waypoints / edges)

In [2]:
# ---- Waypoints / Graph ----
waypoints = {
    "A": Pose2D(0.0,    0.0,    0.0),
    "B": Pose2D(0.060,  0.395,  0.070),
    "C": Pose2D(0.4506, 0.4250, 1.6023),
    "D": Pose2D(0.5491, -0.4896, -1.6009),
    "M": Pose2D(0.2692, -1.1414, 3.0821),
}
edges = {
    "A": ["B", "D", "M"],
    "B": ["A", "C"],
    "C": ["B", "D"],
    "D": ["A", "C", "M"],
    "M": ["A", "D"],
}

graph = WaypointGraph(waypoints, edges)
print("Graph ready. Nodes:", list(waypoints.keys()))


Graph ready. Nodes: ['A', 'B', 'C', 'D', 'M']


* “경로 생성만” 실험 (원하는 start/goal로 테스트)

In [3]:
def test_path(start: str, goal: str):
    path = graph.dijkstra(start, goal)
    if not path:
        print(f"{start} -> {goal}: (no path)")
        return
    length = graph.path_length(path)
    print(f"{start} -> {goal}: path={path}, length={length:.4f}")

# 예시 테스트들
test_path("A", "C")
test_path("C", "A")
test_path("A", "M")
test_path("B", "M")

A -> C: path=['A', 'B', 'C'], length=0.7913
C -> A: path=['C', 'B', 'A'], length=0.7913
A -> M: path=['A', 'M'], length=1.1727
B -> M: path=['B', 'A', 'M'], length=1.5722


* R1/R2 경로 “생성”해서 출력하기

    - (이제 path_r1, path_r2를 하드코딩이 아니라 다익스트라 결과로 만들기)

In [4]:
# "로봇"은 여기선 그냥 start/goal만 다르다고 가정
path_r1 = graph.dijkstra("A", "C")
path_r2 = graph.dijkstra("C", "A")

print("path_r1 =", path_r1)
print("path_r2 =", path_r2)
print("len_r1  =", graph.path_length(path_r1))
print("len_r2  =", graph.path_length(path_r2))


path_r1 = ['A', 'B', 'C']
path_r2 = ['C', 'B', 'A']
len_r1  = 0.7912813547257209
len_r2  = 0.7912813547257209


In [5]:
# 양방향 누락된 엣지 찾기
missing = []
for u, vs in edges.items():
    for v in vs:
        if u not in edges.get(v, []):
            missing.append((u, v))
print("Missing reverse edges:", missing)

Missing reverse edges: []


### “락 생성 + 락을 피해서 서로 겹치지 않는(critical 구간이 겹치지 않는) 경로 생성” 데모
* 이 방식은 흔히 Token/Next-Node Lock, Hand-over-hand locking(한 칸씩 락)이라고 부르
    - Option A: 모든 노드를 critical로 지정 ✅

    - Option C: 경로 전체를 한 번에 lock 하지 말고, “다음 1개 노드만” lock 하면서 진행 ✅
 
- 다만 중요한 전제가 하나 있어:

    - System1이 “중간 단계에서 멈췄다가 다음 노드 락을 받을 때까지 대기”를 해주지 않으면,
    - FMS가 1칸 락만 잡는 전략이 의미가 없어.

- 그래서 “System1을 거의 안 바꾸고” 하려면 가장 쉬운 구현은:

    - ✅ FMS가 ExecuteMission을 ‘한 번에 전체 path’로 보내지 않고, ‘한 칸(다음 move_to 1개)’씩 끊어서 보내는 방식이야.
    - (즉, System1은 그냥 받은 미션 1스텝 수행하고 끝. FMS가 다음 스텝을 다시 보내는 식)

- 아래 코드가 그 방식으로 Option C를 실제로 구현한 FleetManager야.

* 1) 클래스 정의 (Lock + Lock-aware Dijkstra)

In [None]:
import math
import heapq
import time
from dataclasses import dataclass
from typing import Dict, Any, List, Optional, Tuple, Set

@dataclass
class Pose2D:
    x: float
    y: float
    yaw: float = 0.0

class NodeLockManager:
    """
    Option A + C:
      - 모든 노드를 critical로 간주
      - "다음 노드 1개"만 lock 잡는다
      - lock은 node 단위만 관리
    """
    def __init__(self, all_nodes: Set[str]):
        self.all_nodes = set(all_nodes)
        self.locks: Dict[Tuple, Dict[str, Any]] = {}   # ("node",name)->{robot,mission_id,ts}
            # 현재 누가 어떤 노드를 점유(lock) 중인지 저장하는 “락 테이블”이야.
            # 전체 구조는:
            #     self.locks = {
            #       ("node","B"): {"robot":"pinky1","mission_id":"m001","ts":...},
            #       ("node","C"): {"robot":"pinky2","mission_id":"m002","ts":...},
            #     }
            # ✅ 왜 키가 Tuple이야?
            #     키를 ("node", node) 같은 형태로 만들어두면,
            #     나중에 edge lock까지 확장할 때도 통일된 방식으로 관리 가능해.
            #     예: ("edge","A","B") 같은 형태로도 확장 가능

    # 노드 이름을 “락 테이블에서 쓰는 표준 키(key)” 형태로 바꿔주는 함수
    # 왜 이런 구조로 쓰냐?
    # ✅ 장점 1) 키 충돌 방지
    #     나중에 edge lock을 추가하고 싶을 때도 같은 딕셔너리를 쓸 수 있어.
    #         노드락 키: ("node", "B")
    #         간선락 키: ("edge", "A", "B")
    #     이렇게 하면 "B"랑 ("A","B") 같은 게 섞여도 헷갈리지 않고 충돌이 없음.
    def _k(self, node: str) -> Tuple:
        return ("node", node)

    def is_locked_by_other(self, node: str, robot: str) -> bool:
        k = self._k(node)           # ("node", node) 형태의 키 생성
        if k not in self.locks:     # 락이 아예 없으면
            return False            # -> 다른 로봇이 잠근 것도 아님(자유)
        return self.locks[k]["robot"] != robot    # 락이 있으면, 잠근 로봇이 내가 아니면 True
        # 현재 락 테이블이 이렇게 있다고 하자:
        #     self.locks = {
        #       ("node","B"): {"robot":"pinky1", "mission_id":"m1", "ts":...}
        #     }
        #     is_locked_by_other("B", "pinky2") → True
        #         (B는 pinky1이 잠궜고, pinky2 입장에서는 “남이 잠금”)
        #     is_locked_by_other("B", "pinky1") → False
        #         (내가 잠근 거라 통과 허용)
        #     is_locked_by_other("C", "pinky2") → False
        #         (C는 잠금 자체가 없음)

    def try_lock_node(self, robot: str, mission_id: str, node: str) -> bool:
        k = self._k(node)
        v = self.locks.get(k)
        if v and v["robot"] != robot:
            return False
        self.locks[k] = {"robot": robot, "mission_id": mission_id, "ts": time.time()}
        return True

    # “그 노드 락을 내가 잡고 있으면 풀어주고(True), 내가 잡은 게 아니면 안 풀고(False)”
    def release_node_if_owned(self, robot: str, node: str) -> bool:
        k = self._k(node)            # ("node", node) 키 만들기
        v = self.locks.get(k)        # 그 키로 현재 락 정보 가져오기
    
        if not v:                    # 1) 락이 아예 없으면
            return False             #    -> 풀 것도 없음
    
        if v.get("robot") != robot:  # 2) 락은 있는데, 잡은 로봇이 내가 아니면
            return False             #    -> 남의 락이니까 못 품
    
        self.locks.pop(k, None)      # 3) 락이 있고, 내가 잡은 게 맞으면
        return True                  #    -> 락 해제 성공
        # 왜 이런 함수가 필요해?
        #     “다음 노드 1개만 lock” 전략(슬라이딩 락)을 쓸 때,
        #     노드에 도착하면 ✅ 이전 노드 락을 풀어줘야 하고
        #     다른 로봇이 실수로 남의 락을 풀면 ❌ 큰 사고가 나니까

    # “특정 로봇(robot)이 특정 미션(mission_id) 때문에 잡아둔 락을 전부 찾아서 한 번에 싹 풀어주는 함수”
    # 그리고 몇 개를 풀었는지 개수(int) 를 돌려줘
    def release_all_for_mission(self, robot: str, mission_id: str) -> int:
        to_del = []
        for k, v in self.locks.items():   # self.locks를 전부 훑어봄
            if v.get("robot") == robot and v.get("mission_id") == mission_id:
                to_del.append(k)
                    # 락 정보(v) 안에
                    #     "robot" == robot 이고
                    #     "mission_id" == mission_id 인 것만 골라서
                    #     그 키들을 to_del에 모아둠
        # to_del에 있는 락들을 전부 pop()으로 삭제(해제)
        for k in to_del:    
            self.locks.pop(k, None)
        return len(to_del)
        # 예시
        #     락 테이블이 이렇게 있을 때:
        #         self.locks = {
        #           ("node","B"): {"robot":"pinky1", "mission_id":"m1"},
        #           ("node","C"): {"robot":"pinky1", "mission_id":"m1"},
        #           ("node","D"): {"robot":"pinky1", "mission_id":"m2"},
        #           ("node","M"): {"robot":"pinky2", "mission_id":"m1"},
        #         }
        #     release_all_for_mission("pinky1","m1") 실행하면
        #         B, C 락만 풀림

    # 현재 락(점유) 상태를 사람이 보기 좋은 형태로 “요약 출력용 데이터”로 만들어주는 함수
    def dump(self) -> Dict[str, Any]:
        out = []
        for k, v in self.locks.items():
            out.append({"type": "node", "node": k[1], **v})
        return {"locks": out}
        # dump() 결과는:
        #     {
        #       "locks": [
        #         {"type": "node", "node": "B", "robot": "pinky1", "mission_id": "m1", "ts": 123.4},
        #         {"type": "node", "node": "C", "robot": "pinky2", "mission_id": "m9", "ts": 130.1},
        #       ]
        #     }

### 이 클래스 이해는 _dijkstra_wp_path_EX.ipynb 파일 참고 !!! ###
class WaypointGraph:
    def __init__(self, waypoints: Dict[str, Pose2D], edges: Dict[str, List[str]]):
        self.waypoints = waypoints
        self.edges = edges

    def dist(self, a: str, b: str) -> float:
        pa, pb = self.waypoints[a], self.waypoints[b]
        return math.hypot(pa.x - pb.x, pa.y - pb.y)

    def dijkstra(self, start: str, goal: str) -> List[str]:
        if start == goal:
            return [start]
        pq = [(0.0, start)]
        dist = {start: 0.0}
        prev = {start: None}

        while pq:
            cost, u = heapq.heappop(pq)
            if cost != dist.get(u, 1e18):
                continue
            if u == goal:
                break
            for v in self.edges.get(u, []):
                nd = cost + self.dist(u, v)
                if nd < dist.get(v, 1e18):
                    dist[v] = nd
                    prev[v] = u
                    heapq.heappush(pq, (nd, v))

        if goal not in prev:
            return []
        path = []
        cur = goal
        while cur is not None:
            path.append(cur)
            cur = prev.get(cur)
        path.reverse()
        return path
        
    

* 실제 로봇 구동 대신 가상의 성공 호출을 보내는 클래스
* step_time 만큼 시간이 흐른 뒤 성공 콜백을 호출

In [6]:
class FakeActionRunner:
    """
    ROS2 action 대신:
      - (robot, plan, on_done)를 큐에 넣고
      - step_time 후에 on_done(True, "ok")를 호출
    """
    def __init__(self, step_time_sec: float = 1.0):
        self.step_time_sec = step_time_sec
        self.queue: List[Dict[str, Any]] = []  # {robot, ready_t, on_done, plan}

    def send(self, robot: str, plan: Dict[str, Any], on_done):
        ready_t = time.time() + self.step_time_sec
        self.queue.append({"robot": robot, "ready_t": ready_t, "on_done": on_done, "plan": plan})

    def tick(self):
        """ready된 작업들을 완료 처리"""
        now = time.time()
        done = [item for item in self.queue if item["ready_t"] <= now]
        self.queue = [item for item in self.queue if item["ready_t"] > now]
        for item in done:
            item["on_done"](True, "ok")  # 항상 성공으로 가정

NameError: name 'Any' is not defined

* 주피터용 FleetManagerOptionC (핵심 로직 동일)