In [1]:
############################################
# 0) UDP 수신 쓰레드  ―  종료 지원 버전
############################################
import socket, threading

UDP_PORT = 5555
flag = 0                         # 0/1 = run, 2 = stop
stop_event = threading.Event()   # ← 종료 플래그

def listen_udp():
    global flag
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(("", UDP_PORT))
    sock.settimeout(0.5)         # 0.5 초마다 깨어나 stop 이벤트 확인
    print(f"[LISTEN] UDP *:{UDP_PORT}")

    while not stop_event.is_set():
        try:
            data, addr = sock.recvfrom(8)
            msg = int(data.decode().strip())
            if msg in (0, 1, 2):
                flag = msg
                print(f"[RX] {addr[0]} : {msg}")
        except socket.timeout:
            continue             # 타임아웃 → stop_event 다시 체크
        except Exception as e:
            print("decode err:", e)

    sock.close()
    print("[UDP] listener terminated")

listener = threading.Thread(target=listen_udp, daemon=True)
listener.start()


[LISTEN] UDP *:5555


In [2]:
import torchvision
import torch

model = torchvision.models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(512, 2)

model.load_state_dict(torch.load('best_steering_model_big3.pth'))

device = torch.device('cuda')
model = model.to(device)
model = model.eval().half()

import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

from IPython.display import display
import ipywidgets
import traitlets
from jetbot import Camera, bgr8_to_jpeg

camera = Camera()

image_widget = ipywidgets.Image()

traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

display(image_widget)

from jetbot import Robot

robot = Robot()

speed_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.001, value=0.001, description='steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='steering bias')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)


x_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, description='x')
y_slider = ipywidgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='y')
steering_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, description='steering')
speed_slider = ipywidgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='speed')

display(ipywidgets.HBox([y_slider, speed_slider]))
display(x_slider, steering_slider)

[RX] 192.168.0.16 : 1


Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

FloatSlider(value=0.0, description='speed gain', max=1.0, step=0.01)

FloatSlider(value=0.001, description='steering gain', max=1.0, step=0.001)

FloatSlider(value=0.0, description='steering kd', max=0.5, step=0.001)

FloatSlider(value=0.0, description='steering bias', max=0.3, min=-0.3, step=0.01)

HBox(children=(FloatSlider(value=0.0, description='y', max=1.0, orientation='vertical'), FloatSlider(value=0.0…

FloatSlider(value=0.0, description='x', max=1.0, min=-1.0)

FloatSlider(value=0.0, description='steering', max=1.0, min=-1.0)

In [3]:
import time

# ────── 매뉴버 파라미터 ──────
TURN_SPEED         = 0.1   # 회전 속도
DRIVE_SPEED        = 0.15   # 전진/후진 속도
DANGER_TURN_TIME   = 1.5   # 좌회전(90°) 지속 시간 (s)
DANGER_DRIVE_TIME  = 1.5   # 전진 지속 시간 (s)
RECOVERY_BACK_TIME = 1.5   # 후진 지속 시간 (s)
RECOVERY_TURN_TIME = 1.5   # 우회전(90°) 지속 시간 (s)

# 상태 변수
danger_action_done  = False
recovery_action_done = False
angle = angle_last = 0.0

def execute(change):
    global angle, angle_last, flag
    global danger_action_done, recovery_action_done

    img = change['new']

    # 1) 위험 상태(danger) 매뉴버
    if flag == 2:
        if not danger_action_done:
            # 좌회전 90°
            robot.left_motor.value  = -TURN_SPEED
            robot.right_motor.value =  TURN_SPEED
            time.sleep(DANGER_TURN_TIME)
            # 전진
            robot.left_motor.value  = DRIVE_SPEED
            robot.right_motor.value = DRIVE_SPEED
            time.sleep(DANGER_DRIVE_TIME)
            # 정지
            robot.stop()
            danger_action_done   = True
            recovery_action_done = False
        return

    # 2) 정상 복귀 시 리커버리 매뉴버
    if flag in (0,1) and danger_action_done and not recovery_action_done:
        # 후진
        robot.left_motor.value  = -DRIVE_SPEED
        robot.right_motor.value = -DRIVE_SPEED
        time.sleep(RECOVERY_BACK_TIME)
        # 우회전 90°
        robot.left_motor.value  = TURN_SPEED
        robot.right_motor.value = -TURN_SPEED
        time.sleep(RECOVERY_TURN_TIME)
        # 정지
        robot.stop()
        danger_action_done = False
        recovery_action_done = True
        # 이후 정상 주행으로 넘어갑니다

    # 3) 정상 주행 루틴 (flag 0 or 1)
    #    여기부터는 원래 execute() 안의 주행 코드 복사
    xy  = model(preprocess(img))[0].detach().float().cpu().numpy()
    x   = xy[0]
    y   = (0.5 - xy[1]) / 2.0
    y   = max(0.0, min(1.0, y))

    # 슬라이더·위젯 업데이트
    x_slider.value        = x
    y_slider.value        = y
    speed_slider.value    = speed_gain_slider.value

    # PID 계산
    angle = np.arctan2(x, y)
    pid   = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
    angle_last = angle

    steering = pid + steering_bias_slider.value
    steering_slider.value = steering

    # 모터 구동
    left  = np.clip(speed_slider.value + steering, 0.0, 1.0)
    right = np.clip(speed_slider.value - steering, 0.0, 1.0)
    robot.left_motor.value, robot.right_motor.value = left, right

# 첫 프레임 실행 & 옵저버 등록
execute({'new': camera.value})
camera.observe(execute, names='value')


In [None]:
############################################
# 3) 중지 셀 (원할 때 실행) ---------------
############################################
import time

stop_event.set()      # 쓰레드에게 “끝내라” 신호
listener.join()       # 쓰레드 종료 대기

camera.unobserve_all('value')
time.sleep(0.1)
robot.stop()
camera.stop()


[UDP] listener terminated
