In [1]:
import cv2
import time
import json
import requests
from collections import defaultdict
import numpy as np
import ipywidgets as widgets
from IPython.display import display
import warnings
import os
from tiki.mini import TikiMini
from ultralytics import YOLO
warnings.filterwarnings('ignore', category=UserWarning)
os.environ['GST_DEBUG'] = '0'

##############################################
# 1. YOLOv8 학습 모델 로드 (가장 먼저 실행)
##############################################
model = YOLO("./best.pt")  # 🔥 학습한 best.pt 경로 확인

##############################################
# 2. ArUco 설정
##############################################
aruco_dict = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
aruco_params = cv2.aruco.DetectorParameters()
aruco_detector = cv2.aruco.ArucoDetector(aruco_dict, aruco_params)

marker_to_point = {
    1: "alpha",
    2: "sector1", 3: "sector2", 4: "sector3",
    5: "sector4", 6: "sector5", 7: "sector6",
    8: "bravo",
    9: "sector7",
    10: "charlie",
    11: "sector8", 12: "sector9",
    13: "finish"
}

visited_points = set()

object_names = ['box', 'car', 'enemy', 'hazmat', 'missile', 'mortar', 'tank']

##############################################
# 4. 로봇 초기화
##############################################
tiki = TikiMini()
tiki.set_motor_mode(tiki.MOTOR_MODE_PWM)
print("로봇 초기화 완료")

# 주행 파라미터
base_speed = 70
max_steering = 25
frame_center_x = 160  # 320x240 기준 중앙 X

# PID 제어
class PIDController:
    def __init__(self, kp=0.6, ki=0.0, kd=0.15):
        self.kp = kp
        self.ki = ki
        self.kd = kd
        self.integral = 0
        self.prev_error = 0

    def compute(self, error):
        self.integral += error
        derivative = error - self.prev_error
        self.prev_error = error
        self.integral = np.clip(self.integral, -200, 200)
        return self.kp * error + self.ki * self.integral + self.kd * derivative

pid = PIDController()

##############################################
# 5. 서버 전송 함수 (필요 시 사용)
##############################################
def send_to_server(point, detected_objects):
    data = {
        "mission_code": "####",
        "points": [point],
        "detection": detected_objects
    }

    json_content = json.dumps(data, indent=2, ensure_ascii=False)
    files = {
        'file': (f"{point}.json", json_content, 'application/json')
    }

    try:
        print(f"[SEND] {point} → 서버 전송 중…")
        rsp = requests.post("http://58.229.150.23:5000/dashboard_json", files=files, timeout=10)
        print("[Server Response]", rsp.text)
    except Exception as e:
        print("[ERROR] 서버 전송 실패:", e)

##############################################
# 6. 단일 카메라 파이프라인 (주행 + YOLO 공용)
##############################################
pipeline = (
    "nvarguscamerasrc ! "
    "video/x-raw(memory:NVMM), width=640, height=480, format=NV12, framerate=30/1 ! "
    "nvvidconv ! video/x-raw, format=BGRx ! videoconvert ! "
    "video/x-raw, format=BGR ! appsink drop=true max-buffers=1"
)

cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
if not cap.isOpened():
    raise RuntimeError("카메라를 열 수 없습니다.")

# 최신 프레임만 사용하도록 버퍼 최소화
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)

##############################################
# 7. Jupyter용 두 화면 위젯 (주행 / 객체인식)
##############################################
drive_widget = widgets.Image(
    format='jpeg',
    layout=widgets.Layout(width='640px', height='480px')
)
yolo_widget = widgets.Image(
    format='jpeg',
    layout=widgets.Layout(width='640px', height='480px')
)

display(widgets.HBox([drive_widget, yolo_widget]))

##############################################
# 8. Bird's-Eye View 설정 (주행용)
##############################################
bird_width, bird_height = 320, 240

src_points = np.float32([
    [60, 120],
    [260, 120],
    [310, 230],
    [10, 230],
])

dst_points = np.float32([
    [0, 0],
    [bird_width - 1, 0],
    [bird_width - 1, bird_height - 1],
    [0, bird_height - 1],
])

M = cv2.getPerspectiveTransform(src_points, dst_points)


def frame_to_bytes_drive(frame):
    """주행 디버그 패널 전송용 (낮은 품질, 부하 적음)"""
    _, buf = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 30])
    return buf.tobytes()


def frame_to_bytes_yolo(frame):
    """YOLO/ArUco 시각화용 (더 높은 품질, 사람이 보기 좋게)"""
    _, buf = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])  # 필요시 60~80 사이에서 조절
    return buf.tobytes()


# ============================================================
#   9) 슬라이딩 윈도우 + 시각화 (주행용)
# ============================================================
def sliding_window_center_with_vis(binary_mask, num_windows=8, margin=20, min_pixels=30):
    h, w = binary_mask.shape[:2]
    window_height = h // num_windows
    ys, xs = np.where(binary_mask == 255)

    vis = cv2.cvtColor(binary_mask, cv2.COLOR_GRAY2BGR)

    if len(xs) == 0:
        return None, vis

    bottom_thresh = int(h * 0.75)
    bottom_inds = ys >= bottom_thresh
    if np.any(bottom_inds):
        current_x = int(np.mean(xs[bottom_inds]))
    else:
        current_x = int(np.mean(xs))

    centers = []
    pts = []

    for i in range(num_windows):
        y_low = h - (i + 1) * window_height
        y_high = h - i * window_height
        y_low = max(0, y_low)
        if y_high <= y_low:
            continue

        inds = (ys >= y_low) & (ys < y_high)
        win_xs = xs[inds]
        if len(win_xs) == 0:
            continue

        x_min = max(0, current_x - margin)
        x_max = min(w - 1, current_x + margin)

        lane_inds = (win_xs >= x_min) & (win_xs <= x_max)
        if np.sum(lane_inds) < min_pixels:
            continue

        current_x = int(np.mean(win_xs[lane_inds]))
        centers.append(current_x)

        cy = (y_low + y_high) // 2
        pts.append((current_x, cy))

        cv2.rectangle(vis, (x_min, y_low), (x_max, y_high), (0, 255, 0), 1)
        cv2.circle(vis, (current_x, cy), 3, (255, 0, 255), -1)

    if len(centers) == 0:
        return int(np.mean(xs)), vis

    pts_np = np.array(pts, np.int32)
    cv2.polylines(vis, [pts_np], False, (255, 0, 255), 2)

    return int(np.mean(centers)), vis


# ============================================================
#   10) 중앙 흰색 라인 + 디버그뷰 생성 (주행용)
# ============================================================
def detect_lane_center(frame_bgr, white_threshold=180):
    h, w = frame_bgr.shape[:2]

    # ROI (하단부)
    roi_y1 = int(h * 0.60)
    roi_y2 = h
    roi_x1 = int(w * 0.10)
    roi_x2 = int(w * 0.90)
    roi = frame_bgr[roi_y1:roi_y2, roi_x1:roi_x2]
    roi_h, roi_w = roi.shape[:2]

    # ① BGR -> HSV 변환
    hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)

    # 흰색만 남기기 위한 조건
    S_MAX = 60
    V_MIN = 180

    lower_white = np.array([0, 0, V_MIN], dtype=np.uint8)
    upper_white = np.array([180, S_MAX, 255], dtype=np.uint8)
    white_mask = cv2.inRange(hsv, lower_white, upper_white)

    # 노란색 제거 (필요 시)
    lower_yellow = np.array([15, 80, 80], dtype=np.uint8)
    upper_yellow = np.array([40, 255, 255], dtype=np.uint8)
    yellow_mask = cv2.inRange(hsv, lower_yellow, upper_yellow)

    white_mask = cv2.bitwise_and(white_mask, cv2.bitwise_not(yellow_mask))
    white_mask = cv2.GaussianBlur(white_mask, (5, 5), 0)

    ys, xs = np.where(white_mask == 255)
    white_mask_bgr = cv2.cvtColor(white_mask, cv2.COLOR_GRAY2BGR)

    if len(xs) == 0:
        return None, False, {
            "roi": roi,
            "white": white_mask_bgr,
            "sliding": white_mask_bgr.copy(),
        }

    # 슬라이딩윈도우
    center_x_rel, sliding_vis = sliding_window_center_with_vis(white_mask)

    if center_x_rel is None:
        center_x_rel = int(np.mean(xs))

    center_x_frame = roi_x1 + center_x_rel

    # 가로 흰줄 판단 (STOP LINE 등)
    x_range = np.max(xs) - np.min(xs)
    y_range = np.max(ys) - np.min(ys)
    is_horizontal_bar = (x_range > roi_w * 0.6 and y_range < roi_h * 0.25)

    return center_x_frame, is_horizontal_bar, {
        "roi": roi,
        "white": white_mask_bgr,
        "sliding": sliding_vis,
    }


# ============================================================
#   11) 2×2 디버그 패널 생성 (주행용)
# ============================================================
def make_panel(bird, dbg):
    def R(img):
        return cv2.resize(img, (320, 240))

    panel_top = cv2.hconcat([R(bird), R(dbg["roi"])] )
    panel_bot = cv2.hconcat([R(dbg["white"]), R(dbg["sliding"])])
    return cv2.vconcat([panel_top, panel_bot])


##############################################
# 12. 메인 루프 (단일 카메라 → 주행 + YOLO 두 화면 출력)
##############################################
print("[START] 주행 + YOLO + ArUco 시스템 실행")

frame_idx = 0
annotated = None  # 마지막 YOLO 결과 프레임 보관

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            time.sleep(0.01)
            continue

        frame_idx += 1

        # 공통 원본 프레임 (카메라 기준 회전 보정)
        frame = cv2.flip(frame, -1)

        # 주행용 처리를 위해 320x240으로 다운스케일한 프레임 사용
        frame_small = cv2.resize(frame, (bird_width, bird_height))

        # ----- (1) 주행용 버드뷰 / 차선 인식 -----
        bird = cv2.warpPerspective(frame_small, M, (bird_width, bird_height))
        center_x, is_horizontal_bar, dbg = detect_lane_center(bird)
        view = bird.copy()

        if center_x is not None:
            error = center_x - frame_center_x

            steering = pid.compute(error)
            steering *= 0.5
            steering = float(np.clip(steering, -max_steering, max_steering))
            if is_horizontal_bar:
                steering = 0

            L = int(np.clip(base_speed + steering, 0, 127))
            Rm = int(np.clip(base_speed - steering, 0, 127))

            tiki.set_motor_power(tiki.MOTOR_LEFT, L)
            tiki.set_motor_power(tiki.MOTOR_RIGHT, Rm)
        else:
            tiki.stop()

        panel = make_panel(view, dbg)

        # ----- (2) 객체 인식 (YOLO + ArUco) : N프레임마다만 수행 -----
        # 8프레임마다 한 번씩만 YOLO 실행 (해상도는 그대로, 빈도만 더 낮춤)
        frame_for_aruco = frame.copy()

        # (A) ArUco 포인트 감지 (원본 해상도 사용)
        corners, ids, _ = aruco_detector.detectMarkers(frame_for_aruco)

        if ids is not None:
            ids_list = [int(i[0]) for i in ids]

            for marker_id in ids_list:
                if marker_id in marker_to_point:
                    point_name = marker_to_point[marker_id]

                    if point_name not in visited_points:
                        visited_points.add(point_name)
                        print(f"[POINT] {point_name} 최초 통과 → 서버 전송")
                        send_to_server(point_name, {"info": "passing point"})

            cv2.aruco.drawDetectedMarkers(frame_for_aruco, corners, ids)

            # (B) YOLO 객체 감지 (프레임 단위, 카메라 원본 해상도 640x480 사용)
        if frame_idx % 5 == 0:  # 8프레임마다 한 번씩만 YOLO 실행 (해상도는 그대로, 빈도만 더 낮춤)
            frame_for_yolo = frame.copy()
            
            results = model(
                frame_for_yolo,
                conf=0.45,
            )

            detected_counts = defaultdict(int)

            if results and len(results[0].boxes) > 0:
                classes = results[0].boxes.cls.cpu().numpy()

                for cls in classes:
                    class_name = object_names[int(cls)]
                    detected_counts[class_name] += 1

            if len(detected_counts) > 0:
                print("[DETECTION] 객체 감지:", dict(detected_counts))

            annotated = results[0].plot()
            cv2.putText(
                annotated,
                "Object Detection",
                (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX,
                1,
                (255, 0, 0),
                2,
                cv2.LINE_AA,
            )

        # ----- (3) 두 화면을 Jupyter 위젯으로 출력 -----
        # 주행 화면은 매 프레임 갱신 (낮은 품질 JPEG)
        drive_widget.value = frame_to_bytes_drive(panel)

        # YOLO 화면은 마지막 YOLO 결과가 있을 때만 갱신 (고품질 JPEG)
        if annotated is not None and frame_idx % 2 == 0:  # 너무 자주 갱신하지 않도록 2프레임마다
            yolo_widget.value = frame_to_bytes_yolo(annotated)

        time.sleep(0.02)

except KeyboardInterrupt:
    tiki.stop()
    cap.release()
    cv2.destroyAllWindows()
    print("[END] 프로그램 종료")


로봇 초기화 완료
GST_ARGUS: Creating output stream
CONSUMER: Waiting until producer is connected...
GST_ARGUS: Available Sensor modes :
GST_ARGUS: 3264 x 2464 FR = 21.000000 fps Duration = 47619048 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 3264 x 1848 FR = 28.000001 fps Duration = 35714284 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1920 x 1080 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1640 x 1232 FR = 29.999999 fps Duration = 33333334 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1280 x 720 FR = 59.999999 fps Duration = 16666667 ; Analog Gain range min 1.000000, max 10.625000; Exposure Range min 13000, max 683709000;

GST_ARGUS: 1280 x 720 FR = 120.000005 fps Duration = 8333333 ; Analog Gain range min 1.000000, max 10.62500



HBox(children=(Image(value=b'', format='jpeg', layout="Layout(height='480px', width='640px')"), Image(value=b'…

[START] 주행 + YOLO + ArUco 시스템 실행

0: 320x416 1 mortar, 76.3ms
Speed: 130.3ms preprocess, 76.3ms inference, 75.1ms postprocess per image at shape (1, 3, 320, 416)
[DETECTION] 객체 감지: {'mortar': 1}

0: 320x416 1 car, 1 mortar, 74.7ms
Speed: 7.5ms preprocess, 74.7ms inference, 7.5ms postprocess per image at shape (1, 3, 320, 416)
[DETECTION] 객체 감지: {'mortar': 1, 'car': 1}

0: 320x416 (no detections), 74.7ms
Speed: 6.8ms preprocess, 74.7ms inference, 2.6ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 (no detections), 75.8ms
Speed: 6.9ms preprocess, 75.8ms inference, 2.4ms postprocess per image at shape (1, 3, 320, 416)

0: 320x416 1 car, 76.5ms
Speed: 6.9ms preprocess, 76.5ms inference, 5.7ms postprocess per image at shape (1, 3, 320, 416)
[DETECTION] 객체 감지: {'car': 1}

0: 320x416 1 car, 76.7ms
Speed: 7.9ms preprocess, 76.7ms inference, 5.7ms postprocess per image at shape (1, 3, 320, 416)
[DETECTION] 객체 감지: {'car': 1}

0: 320x416 (no detections), 75.8ms
Speed: 7.7ms preproce