In [1]:
import cv2
import time
import json
import numpy as np
from collections import deque
from ultralytics import YOLO

# my_model_path = "yolov8m.onnx"
# stop_target_list = ["cup", "bottle"]
# signal_target_list = ["cell phone", "remote"]
my_model_path = "best.onnx"
stop_target_list = ["person"]
signal_target_list = ["circle", "cross"]

my_map_path = "map_data.json"



class LegosRobot:
    # [상태 상수 정의 - 하드웨어 프로토콜]
    STOP = -1
    PAUSE = 0
    STRAIGHT = 1
    TURN_RIGHT = 2
    TURN_LEFT = 3
    ADJUST = 7
    TURN_180 = 4

    def __init__(self, model_path):
        # 1. 모델 로드
        self.model = YOLO(model_path)

        # 2. 지도 데이터 로드 및 초기화
        self.my_map_list = None
        self.my_map_numpy = None

        # [상태 제어 멤버 변수 초기화]
        self.drive_path = None
        self.all_commands = []
        self.current_cmd_idx = 0
        self.is_completed = False
        self.target_ready_for_next = True   # Target을 지났는지 확인
        self.last_execution_time = 0        # 마지막으로 Target 인식이 실행된 시점
        self.cooldown_duration = 3.0        # Target 인식 최소 간격 (3초)
        self.current_state = self.STOP      # Robot 의 동작 상태 변수
        self.is_action_finished = True
        self.state_before_stop = self.STRAIGHT # 정지 전 상태 저장용

    def move(self, new_state):
        # 상태가 변할 때만 로그를 출력하도록 하여 가독성을 높임
        if self.current_state != new_state:
            self.current_state = new_state
            # 새로운 명령이 들어오면 일단 동작 중(False)으로 설정
            self.is_action_finished = False

            if self.current_state == self.STRAIGHT:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (전진 ↑)")

            elif self.current_state == self.TURN_RIGHT:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (우회전 →)")
                time.sleep(1.0)
                self.is_action_finished = True

            elif self.current_state == self.TURN_LEFT:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (좌회전 ←)")
                time.sleep(1.0)
                self.is_action_finished = True

            elif self.current_state == self.PAUSE:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (일시정지)")
                self.is_action_finished = False

            elif self.current_state == self.STOP:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (정지)")
                self.is_action_finished = True

            elif self.current_state == self.ADJUST:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (위치조정)")
                time.sleep(0.5)
                self.current_state = self.STOP # 위치 조정 후 정지
                self.is_action_finished = True

            elif self.current_state == self.TURN_180:
                print(f">>> [MOVE] 신호 발생: {self.current_state} (180도 유턴 ↻)")
                # 90도 회전(1.0초)의 두 배인 2.0초 동안 회전 (로봇 성능에 맞춰 조절)
                time.sleep(2.0)
                self.current_state = self.STOP
                self.is_action_finished = True

    def load_map(self, map_path):
        try:
            with open(map_path, 'r', encoding='utf-8') as f:
                self.my_map_list = json.load(f)
            # 리스트를 NumPy 배열로 변환하여 저장
            self.my_map_numpy = np.array(self.my_map_list, dtype=object)
            print(f">>> [Legos] Map Loaded: {map_path}")
            return True
        except Exception as e:
            print(f">>> [Error] Map Load Failed: {e}")
            return False

    def find_path(self, start, goal):
        # 클래스 멤버인 self.grid_map을 사용 (기존 grid 인자 대체)
        grid = self.my_map_numpy

        # 행과 열의 수를 정의 (NumPy Shape 활용)
        rows, cols = grid.shape

        # 방문 기록을 위한 NumPy 배열 (True | False)
        visited = np.zeros((rows, cols), dtype=bool)
        # 경로 역추적을 위한 부모 좌표 저장 배열 (행, 열 저장용 3차원 배열)
        # 첫 번째 층 : 행 정보 / 두 번째 층 : 열 정보
        parent_map = np.full((rows, cols, 2), -1, dtype=int)

        # 3. 가용 경로 마스크 생성 (NumPy의 강력한 비교 연산)
        # 0이거나 "+"인 곳을 True로 표시
        is_passable = (grid == 0) | (grid == "+")

        # 상, 하, 좌, 우
        # 각 값을 현재 위치 index 에 더하면 각 방향으로 이동한 좌표가 된다.
        directions = [(-1, 0), (1, 0), (0, -1), (0, 1)]

        # queue를 deque로 정의 ( 양방향 input이 가능함 | FIFO/LIFO 형식 모두 구현 가능 )
        queue = deque([start])

        # start 좌표에 방문 기록 update
        visited[start] = True

        while queue:
            # queue에 담긴 위치를 하나 pop
            curr_r, curr_c = queue.popleft()

            # 만약 pop 된 위치가 목적지일 경우
            if (curr_r, curr_c) == goal:
                path = []
                # 현재 위치부터 부모 위치를 역탐색하여 path list에 append
                while curr_r != -1:
                    path.append((curr_r, curr_c))
                    curr_r, curr_c = parent_map[curr_r, curr_c]
                # path list의 순서를 역방향으로 뒤집어서 return
                return path[::-1]

            # 아직 목적지가 아닐 경우, 현재 위치에서 이동 가능한 위치들을 방문.
            for dr, dc in directions:
                nr, nc = curr_r + dr, curr_c + dc

                # 경계 검사 및 방문 여부, 통과 가능 여부 확인
                if 0 <= nr < rows and 0 <= nc < cols:
                    if not visited[nr, nc] and is_passable[nr, nc]:
                        visited[nr, nc] = True
                        parent_map[nr, nc] = [curr_r, curr_c]
                        queue.append((nr, nc))

        return None

    def get_commands(self, path):
        # 입력된 경로를 NumPy 배열로 변환
        path = np.array(path)

        if len(path) < 2:
            return ["경로가 너무 짧아 명령을 생성할 수 없습니다."]

        # 1. 모든 이동 벡터를 한 번에 계산 (다음 좌표 - 현재 좌표)
        diffs = np.diff(path, axis=0)

        # 2. 방향 매핑
        heading_lookup = {(-1, 0): 0, (0, 1): 1, (1, 0): 2, (0, -1): 3}
        heading_names = {0: "상(↑)", 1: "우(→)", 2: "하(↓)", 3: "좌(←)"}

        # 모든 구간의 절대 방향(Heading)을 리스트로 변환
        headings = [heading_lookup[tuple(d)] for d in diffs]
        headings = np.array(headings)

        commands = []

        # --- 1단계: 초기 헤딩 설정 ---
        current_heading = headings[0]
        commands.append(f"1단계: 직진 (Go Straight)")

        # --- 2단계: 상대적 회전 계산 (NumPy 차분 활용) ---
        rotations = np.diff(headings) % 4

        # 회전 명령 매핑
        cmd_map = {0: "직진 (Go Straight)", 1: "우회전 (Turn Right)",
                   2: "유턴 (Turn Back)", 3: "좌회전 (Turn Left)"}

        # 계산된 회전값(rotations)을 바탕으로 명령어 생성
        for i, rot in enumerate(rotations):
            target_heading = headings[i+1]
            cmd = cmd_map[rot]
            commands.append(f"{i+2}단계: {cmd}")

        return commands

    def run_auto_drive(self, start_pos, goal_pos, m=0):
            # [경로 탐색 및 이동 경로 정의]
            """
            m=0: 새 경로 탐색 (Normal)
            m=1: 복귀모드 (Return)
            """
            if m == 0:
                self.drive_path = self.find_path(start_pos, goal_pos)
                if self.drive_path:
                    self.all_commands = self.get_commands(self.drive_path)
                else:
                    self.all_commands = ["경로를 찾을 수 없습니다."]
                    return False
            elif m == 1:
                # 복귀 mode
                if self.drive_path:
                    # 기존경로 그대로 뒤집기
                    reversed_path = self.drive_path[::-1]
                    # 뒤집힌 경로 주행 command 재설정 (회전 방향 자동 보정)
                    self.all_commands = self.get_commands(reversed_path)
                    self.drive_path = reversed_path
                    self.move(self.TURN_180)
                else:
                    print(">>> [오류] 이전에 주행한 경로 데이터가 없습니다.")
                    return False

            total_cmds = len(self.all_commands)
            self.current_cmd_idx = 0
            self.is_completed = False
            self.target_ready_for_next = True
            self.last_execution_time = 0

            cap = cv2.VideoCapture(0)
            if not cap.isOpened():
                print("웹캡을 열 수 없습니다.")
                return

            while True:
                # [영상 정보 정제]
                ret, frame = cap.read()
                if not ret:
                    print("프레임을 가져올 수 없습니다.")
                    break
                fliped_frame = cv2.flip(frame, 1)

                # [Target 탐지 영역 설정 (roi 영역)]
                height, width, _ = fliped_frame.shape
                rx1, rx2 = width // 3, 2 * width // 3
                ry1, ry2 = 2 * height // 3, height - 1

                # [YOLO 추론]
                results = self.model.predict(source=fliped_frame, conf=0.25, verbose=False)
                annotated_frame = results[0].plot()

                # [객체 분석]
                stop_target = False
                target_in_roi = False

                for box in results[0].boxes:
                    name = self.model.names[int(box.cls[0])]
                    b_cx, b_cy, b_width, b_height = box.xywh[0].tolist()

                    if name in stop_target_list:
                        if ((b_width * b_height) / (width * height)) >= 0.1 :
                            stop_target = True
                    elif name in signal_target_list:
                        if (rx1 <= b_cx <= rx2) and (ry1 <= b_cy <= ry2):
                            target_in_roi = True

                # --- [제어 로직] ---
                print("-" * 30)
                current_time = time.time()
                time_passed = (current_time - self.last_execution_time) > self.cooldown_duration

                if self.is_completed:
                    status_text = "ALL MISSIONS DONE"
                    display_color = (255, 0, 0)
                    print(">>> [주행 종료] 자동 주행을 종료합니다.")
                    self.move(self.STOP)
                    cap.release()
                    cv2.destroyAllWindows()
                    return True
                elif stop_target:
                    status_text = "STOP: PERSON DETECTED"
                    display_color = (0, 0, 255)
                    # 정지하기 전, 현재가 STOP 이나 PAUSE 가 아닐 때만 상태를 백업
                    if self.current_state != self.STOP and self.current_state != self.PAUSE:
                        self.state_before_stop = self.current_state
                    print(f">>> [정지] 사람이 감지되었습니다.")
                    self.move(self.PAUSE)
                else:
                    if not target_in_roi:
                        self.target_ready_for_next = True

                    if target_in_roi and self.target_ready_for_next:
                        if time_passed:
                            print(f">>> [신호 승인] {self.all_commands[self.current_cmd_idx]} 완료. 다음 단계로!")
                            self.current_cmd_idx += 1
                            self.last_execution_time = current_time
                            self.target_ready_for_next = False

                            if self.current_cmd_idx >= total_cmds:
                                self.is_completed = True
                                status_text = "COMPLETED"
                                print(">>> [위치 조정 중] 주행 종료 전 최종 위치 조정중입니다.")
                                print("move() 호출")
                                self.move(self.ADJUST)
                                print(">>> [주행 완료] 주행을 완료하여 정지합니다.")
                            else:
                                status_text = "NEXT STEP LOADED"
                        else:
                            print(f">>> [위치 조정] 쿨다운 중 재인식됨. 위치조정 실행 중...")
                            print("move() 호출")
                            self.move(self.ADJUST)
                            self.target_ready_for_next = False
                            continue

                    if not self.is_completed:
                        if not self.target_ready_for_next:
                            status_text = "WAIT: TARGET을 지나치고 있습니다."
                            print(">>> [위치 조정 중] 다음 경로 진행을 위해 위치 조정중입니다.")
                            print("move() 호출")
                            self.move(self.ADJUST)
                        else:
                            if self.is_action_finished:
                                # print(f">>> [수행 중] 현재 동작: {self.all_commands[self.current_cmd_idx]}")
                                current_cmd = self.all_commands[self.current_cmd_idx]
                                if "직진" in current_cmd:
                                    self.move(self.STRAIGHT)
                                elif "우회전" in current_cmd:
                                    self.move(self.TURN_RIGHT)
                                    self.move(self.STRAIGHT)
                                elif "좌회전" in current_cmd:
                                    self.move(self.TURN_LEFT)
                                    self.move(self.STRAIGHT)
                                status_text = f"STATE: {self.current_state} (MOVING)"
                                print(status_text)
                            else:
                                if self.current_state == self.PAUSE:
                                    print(f">>> {self.state_before_stop} 재개")
                                    self.move(self.state_before_stop)
                                else:
                                    status_text = f"STATE: {self.current_state} (IN PROGRESS...)"
                                    print(status_text)
                    display_color = (0, 255, 0)

                # [화면 표시]
                cv2.putText(annotated_frame, status_text, (30, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, display_color, 2)
                cv2.rectangle(annotated_frame, (rx1, ry1), (rx2, ry2), (0, 0, 255), 2)
                cv2.imshow("Robot Command Center", annotated_frame)

                if cv2.waitKey(1) & 0xFF == 27:
                    self.move(self.STOP)
                    cap.release()
                    cv2.destroyAllWindows()
                    return False

            cap.release()
            cv2.destroyAllWindows()

# --- [메인 실행부] ---
if __name__ == "__main__":
    print(">>> [Legos] booting...")

    # 로봇 객체 생성 (모델과 맵 데이터 로드)
    try:
        robot = LegosRobot(model_path=my_model_path)
        robot.load_map(my_map_path)
        print(">>> [Legos] Center Map & Model Loading Success!")
    except Exception as e:
        print(f">>> [Error] 로딩 중 오류 발생: {e}")
        exit()

    print(">>> [Legos] starting...")

    # 모드 정의
    USER_DRIVE = 1
    AUTO_DRIVE = 2
    EXIT_PROGRAM = 0

    while True:
        print("\n" + "="*40)
        print("="*40)

        try:
            user_input = input(">>> [Legos] 이동방식을 선택해주세요 : ")

            # 입력값이 비어있는 경우 방지
            if not user_input: continue
            Drive_method = int(user_input)

            if Drive_method == USER_DRIVE:
                print(">>> [Legos] 사용자 조종 모드로 진입합니다.")
                # TODO: 사용자 조종 로직(키보드 제어 등) 함수를 여기에 연결하세요.
                print(">>> [알림] 현재 사용자 조종 모드는 준비 중입니다.")

            elif Drive_method == AUTO_DRIVE:
                print(">>> [Legos] 자동 이동 모드로 진입합니다.")

                # 1. 지도 데이터 존재 여부 먼저 확인
                if robot.my_map_numpy is None:
                    print(">>> [경고] 지도 데이터가 없습니다! 먼저 지도를 로드해 주세요.")
                    # GUI라면 여기서 경고 알림창(MessageBox)을 띄우면 좋습니다.
                    continue

                # 목적지 입력 예시 (추후 input으로 좌표를 받을 수도 있습니다)
                start_node = (0, 3)
                goal_node = (3, 4)
                print(f">>> [Legos] {start_node}에서 {goal_node}로 자동 이동을 시작합니다.")
                # 클래스 내부의 자동 주행 메서드 호출
                auto_d = robot.run_auto_drive(start_node, goal_node, 0)
                if auto_d:
                    print("\n>>> [Legos] 자동 주행이 완료되었습니다.")
                    is_return = input(">>> [Legos] 복귀하시겠습니까?(y or n) : ")
                    if is_return == "y":
                        print(f">>> [Legos] {goal_node}에서 {start_node}로 복귀를 시작합니다.")
                        robot.run_auto_drive(start_node, goal_node, 1)
                        print("\n>>> [Legos] 복귀가 완료되었습니다.")
                    else :
                        print(">>> [Legos] 메인 메뉴로 돌아갑니다.")
                        continue
                else:
                    print("\n>>> [Legos] 자동 주행이 비정상적으로 종료되었습니다.")
                    print(">>> [Legos] 메인 메뉴로 돌아갑니다.")
                    continue

            elif Drive_method == EXIT_PROGRAM:
                print(">>> [Legos] 프로그램을 종료합니다. Bye!")
                break

            else:
                print(">>> [경고] 잘못된 번호입니다. 다시 선택해주세요.")

        except ValueError:
            print(">>> [경고] 숫자만 입력 가능합니다.")
        except KeyboardInterrupt:
            print("\n>>> [Legos] 강제 종료 신호 감지. 종료합니다.")
            break

>>> [Legos] booting...
>>> [Legos] Map Loaded: map_data.json
>>> [Legos] Center Map & Model Loading Success!
>>> [Legos] starting...

>>> [Legos] 자동 이동 모드로 진입합니다.
>>> [Legos] (0, 3)에서 (3, 4)로 자동 이동을 시작합니다.
Loading best.onnx for ONNX Runtime inference...
Using ONNX Runtime 1.23.2 with CPUExecutionProvider
------------------------------
>>> [정지] 사람이 감지되었습니다.
>>> [MOVE] 신호 발생: 0 (일시정지)


error: OpenCV(4.13.0) D:\a\opencv-python\opencv-python\opencv\modules\highgui\src\window.cpp:1301: error: (-2:Unspecified error) The function is not implemented. Rebuild the library with Windows, GTK+ 2.x or Cocoa support. If you are on Ubuntu or Debian, install libgtk2.0-dev and pkg-config, then re-run cmake or configure script in function 'cvShowImage'


In [1]:
import cv2

print(f"OpenCV Version: {cv2.__version__}")

# 현재 버전의 빌드 정보를 확인 (GUI 지원 여부 체크 가능)
# 'GUI' 항목이나 'Win32 UI' 등이 YES로 되어 있는지 보세요.
print(cv2.getBuildInformation())

OpenCV Version: 4.13.0

  Version control:               4.13.0-1-gb4c5ec4042

  Platform:
    Timestamp:                   2026-01-15T16:45:22Z
    Host:                        Windows 10.0.26100 AMD64
    CMake:                       3.24.2
    CMake generator:             Visual Studio 17 2022
    CMake build tool:            C:/Program Files/Microsoft Visual Studio/2022/Enterprise/MSBuild/Current/Bin/amd64/MSBuild.exe
    MSVC:                        1944
    Configuration:               Debug Release
    Algorithm Hint:              ALGO_HINT_ACCURATE

  CPU/HW features:
    Baseline:                    SSE SSE2 SSE3
      requested:                 SSE3
    Dispatched code generation:  SSE4_1 SSE4_2 AVX FP16 AVX2 AVX512_SKX
      SSE4_1 (17 files):         + SSSE3 SSE4_1
      SSE4_2 (1 files):          + SSSE3 SSE4_1 POPCNT SSE4_2
      AVX (9 files):             + SSSE3 SSE4_1 POPCNT SSE4_2 AVX
      FP16 (0 files):            + SSSE3 SSE4_1 POPCNT SSE4_2 AVX FP16
      AVX2 (3