In [None]:
import cv2
import numpy as np
import ipywidgets.widgets as widgets
from IPython.display import display
from ultralytics import YOLO
import traitlets
import ipywidgets

# RobotPal 라이브러리 임포트
# (RobotPal-python 패키지가 설치되어 있거나 PYTHONPATH에 있어야 합니다)
from robotpal import Camera, bgr8_to_jpeg
from robotpal import Robot

# 카메라 인스턴스 연결 (Singleton)
camera = Camera.instance(width=300, height=300)
#카메라로 찍은 frame 을 띄워줄 image 객체 생성, 크기를 맞출 필요는 없다.
# image = widgets.Image(format='jpeg', width=300, height=300) 

# camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

# display(image)

# YOLO 모델 로드
# 최초 실행 시 'yolov8n.pt' 파일을 자동으로 다운로드합니다.
# 'classes=[2]' 옵션을 통해 '자동차(car)'만 감지하도록 설정할 예정입니다.
model = YOLO('yolov8n.pt') 

# print(">>> 모델 로드 완료 & 카메라 연결 성공")

[RobotPal] 서버 시작됨 | WebSocket: 9999, TCP: 9998
[RobotPal] 통신 대기 중... (WS: 9999, TCP: 9998)
[System] WebSocket 프로세서 시작
>>> 모델 로드 완료 & 카메라 연결 성공


[TCP] 클라이언트 연결됨 (('127.0.0.1', 60087))


In [2]:
def bgr8_to_jpeg(value, quality=75):
    """OpenCV BGR 이미지를 ipywidgets Image 위젯용 JPEG 바이트로 변환"""
    return bytes(cv2.imencode('.jpg', value)[1])

def calculate_distance(pixel_width):
    """
    바운딩 박스 너비(pixel_width)를 기반으로 거리를 추정합니다.
    공식: Distance = Calibration_Constant / Pixel_Width
    """
    # [보정 필요] 이 상수는 시뮬레이터 환경과 해상도(224x224)에 맞춰 조절해야 합니다.
    # 예: 실제 거리가 100cm일 때 화면상 차의 폭이 40px라면 -> 100 * 40 = 4000
    CALIBRATION_CONSTANT = 4000.0 
    
    if pixel_width <= 0:
        return 0.0
        
    return CALIBRATION_CONSTANT / pixel_width

In [3]:
# ---------------------------------------------------------
# 3. UI 위젯 생성
# ---------------------------------------------------------

# 실시간 영상을 보여줄 이미지 위젯
image_widget = widgets.Image(format='jpeg', width=300, height=300)

# 거리 정보를 텍스트로 보여줄 라벨 위젯
info_label = widgets.Label(value="대기 중...")

# 화면에 위젯 배치 및 표시
display(widgets.VBox([image_widget, info_label]))

VBox(children=(Image(value=b'', format='jpeg', height='300', width='300'), Label(value='대기 중...')))

In [4]:
# ---------------------------------------------------------
# 4. 메인 루프 (이미지 처리 콜백)
# ---------------------------------------------------------

def update_camera(change):
    # 카메라로부터 들어온 새로운 프레임 (BGR numpy array)
    new_image = change['new']
    
    # 원본 이미지를 복사 (여기에 박스와 텍스트를 그립니다)
    annotated_frame = new_image.copy()
    
    # YOLO 추론 실행
    # classes=[2]: COCO 데이터셋 ID 2번인 'car'만 감지
    # verbose=False: 불필요한 로그 출력 방지
    results = model(new_image, classes=[2], verbose=False)
    
    nearest_distance = None
    
    # 결과 처리 루프
    for r in results:
        boxes = r.boxes
        for box in boxes:
            # 좌표 추출 (정수로 변환)
            x1, y1, x2, y2 = box.xyxy[0]
            x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
            
            # 너비 계산
            w = x2 - x1
            
            # 거리 추정 함수 호출
            dist = calculate_distance(w)
            
            # 가장 가까운 거리 기록 (단순 비교)
            if nearest_distance is None or dist < nearest_distance:
                nearest_distance = dist
            
            # [시각화] 바운딩 박스 그리기 (초록색, 두께 2)
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # [시각화] 거리 텍스트 표시
            text = f"{dist:.1f}cm"
            cv2.putText(annotated_frame, text, (x1, y1 - 10), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # 위젯 업데이트: 처리된 이미지를 화면에 표시
    image_widget.value = bgr8_to_jpeg(annotated_frame)
    
    # 상태 메시지 업데이트
    if nearest_distance:
        info_label.value = f"전방 차량 거리: {nearest_distance:.1f} cm"
    else:
        info_label.value = "차량 검색 중..."

In [None]:
# ---------------------------------------------------------
# 5. 실행 (Observer 등록)
# ---------------------------------------------------------

# 혹시 이전에 실행된 관찰자가 있다면 제거 (중복 실행 방지)
try:
    camera.unobserve_all()
except:
    pass

# 카메라 데이터가 변경될 때마다 update_camera 함수 실행
camera.observe(update_camera, names='value')

print(">>> 실시간 감지 시작됨 (멈추려면 'camera.unobserve_all()'을 실행하세요)")

>>> 실시간 감지 시작됨 (멈추려면 'camera.unobserve_all()'을 실행하세요)


opening handshake failed
Traceback (most recent call last):
  File "c:\jupyter ex\.venv\Lib\site-packages\websockets\asyncio\server.py", line 356, in conn_handler
    await connection.handshake(
  File "c:\jupyter ex\.venv\Lib\site-packages\websockets\asyncio\server.py", line 207, in handshake
    raise self.protocol.handshake_exc
  File "c:\jupyter ex\.venv\Lib\site-packages\websockets\server.py", line 138, in accept
    ) = self.process_request(request)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\jupyter ex\.venv\Lib\site-packages\websockets\server.py", line 233, in process_request
    raise InvalidUpgrade(
websockets.exceptions.InvalidUpgrade: missing Connection header
opening handshake failed
Traceback (most recent call last):
  File "c:\jupyter ex\.venv\Lib\site-packages\websockets\http11.py", line 138, in parse
    request_line = yield from parse_line(read_line)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\jupyter ex\.venv\Lib\site-packages\websockets\http1

[TCP] 예외 발생: [WinError 10054] 현재 연결은 원격 호스트에 의해 강제로 끊겼습니다
[TCP] 연결 종료 (('127.0.0.1', 60087))
