# 5장. GUI 원격 제어기로 인공지능 무인운반차량(AGV) 제어하기

유저는 GUI 원격 제어기의 버튼 제어부를 터치하여 명령을 publish 합니다.  
publish 된 명령어들은 인공지능 무인운반차량(AGV) 에서 subscribe 하여 동작합니다.

인공지능 무인운반차량(AGV)는 1초에 한번 씩, 랜덤 값을 생성하여 publish 합니다.  
publish 된 데이터들은 GUI 원격 제어기에서 subscribe 하여 sensingTable에 표시합니다.

## 라이브러리 가져오기

In [1]:
from IPython.display import display
import ipywidgets
import ipywidgets.widgets as widgets
import traitlets
from jetbot import Robot, Camera, bgr8_to_jpeg, ObjectDetector
from SCSCtrl import TTLServo

import threading

import paho.mqtt.client as mqtt
from datetime import datetime
import pytz
import time
import json
import random as rd
import cv2
import base64
import torch
import torchvision
import torch.nn.functional as F
import numpy as np
print("namseok")

Succeeded to open the port
Succeeded to change the baudrate
namseok


## 카메라, 로봇 객체 초기화 하기

In [2]:
robot = Robot()
camera = Camera.instance(width=300, height=300)
model = ObjectDetector('ssd_mobilenet_v2_coco.engine')
image = widgets.Image(format='jpeg', width=300, height=300)
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)
detections = model(camera.value)
TTLServo.xyInput(150, 100)
TTLServo.servoAngleCtrl(4, -10, 1, 150)
print("namseok")

namseok


In [3]:
# control 위젯 코드
blocked_widget = widgets.FloatSlider(min=0.0, max=1.0, value=0.0, description='blocked')
image_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=1, description='tracked label')
speed_widget = widgets.FloatSlider(value=0.4, min=0.0, max=1.0, description='speed')
turn_gain_widget = widgets.FloatSlider(value=0.8, min=0.0, max=2.0, description='turn gain')

# widget 출력 코드
display(widgets.VBox([
    widgets.HBox([image_widget, blocked_widget]),
    label_widget,
    speed_widget,
    turn_gain_widget
]))

width = int(image_widget.width)
height = int(image_widget.height)

# 탐지된 객체 중심 구하는 함수
def detection_center(detection):
    bbox = detection['bbox']
    #if(detection['label'] != 1): 
        #print(detection['label'])
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)

# 중심점을 이용해 벡터의 Euclidean Norm 구하는 함수
def norm(vec):
    return np.sqrt(vec[0]**2 + vec[1]**2)

# 가장 가까운 객체 찾는 함수
def closest_detection(detections):
    closest_detection = None
    for det in detections:
        center = detection_center(det)
        if closest_detection is None:
            closest_detection = det
        elif norm(detection_center(det)) < norm(detection_center(closest_detection)):
            closest_detection = det
    return closest_detection

def execute(change):
    image = change['new']
        
    # 객체 탐지
    detections = model(image)
    
    # 탐지된 객체 중 내가 특정한 객체인지 체크
    matching_detections = [d for d in detections[0] if d['label'] == 44]
    
    # 내가 특정한 객체 중 가장 가까운 객체에 box 그리는 코드
    det = closest_detection(matching_detections)
    if det is not None:
        bbox = det['bbox']
        cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 255, 0), 5)
    
    # 탐지된 객체 없으면 직진
    if det is None:
        robot.forward(float(speed_widget.value))
        
    # 있다면, 객체 쪽으로 회전
    else:
        # 카메라의 중심에서부터 내가 특정한 객체와의 거리에 따라 AGV 회전
        center = detection_center(det)
        robot.set_motors(
            float(speed_widget.value + turn_gain_widget.value * center[0]),
            float(speed_widget.value - turn_gain_widget.value * center[0])
        )
    
    # 이미지 출력
    image_widget.value = bgr8_to_jpeg(image)


VBox(children=(HBox(children=(Image(value=b'', format='jpeg', height='300', width='300'), FloatSlider(value=0.…

## widget 생성하기

widget 은 간단하게 디자인하였습니다.

Start 버튼을 눌러서, AGV에서 GUI Controller 로 랜덤으로 생성한 데이터를 publish 합니다.  
manual 버튼을 누르면, GUI Controller 에서 버튼을 눌러서 AGV를 제어할 수 있습니다.

In [4]:
lbl1 = ipywidgets.Label(value="MQTT Test")

startBtn = widgets.Button(description='Start', button_style='info')

lbl2 = ipywidgets.Label(value="sensingData")
lbl3 = ipywidgets.Label(value="num1 : ")
num1lbl = ipywidgets.Label(value="0")
hbox1 = widgets.HBox([lbl3, num1lbl] )

lbl4 = ipywidgets.Label(value="num2 : ")
num2lbl = ipywidgets.Label(value="0")
hbox2 = widgets.HBox([lbl4, num2lbl] )
vbox1 = widgets.VBox([startBtn, lbl2, hbox1, hbox2])

manualBtn = widgets.Button(description='Manual', button_style='success', disabled = True)
lbl5 = ipywidgets.Label(value="commandData")
lbl6 = ipywidgets.Label(value="command : ")
commandlbl = ipywidgets.Label(value="None")
hbox3 = widgets.HBox([lbl6, commandlbl] )
vbox2 = widgets.VBox([manualBtn, lbl5, hbox3])

hbox4 = widgets.HBox([vbox1, vbox2] )

display(lbl1, hbox4)

servoPos_4 = 0

moveStartTor = 0.15
fb_input = 0           # 전방 및 후방 방향의 속도 매개변수를 저장합니다.
lr_input = 0           # 조향에 대한 매개변수를 저장합니다.

# AGV의 이동이 멈춥니다.
def moveStop():
    global fb_input, lr_input
    fb_input = 0
    lr_input = 0
    robot.set_motors(0,0)
    camera.unobserve_all()
    time.sleep(1.0)
    robot.stop()

# AGV의 이동을 제어합니다.
def moveSmoothCtrl(commandType, valueInput):
    global fb_input, lr_input
    if commandType == 'y':
        fb_input = valueInput
    elif commandType == 'x':
        lr_input = valueInput
    else:
        return

    if abs(fb_input) < moveStartTor and abs(lr_input) < moveStartTor:
        moveStop()
        return
    leftSpeed  = -fb_input + lr_input
    rightSpeed = -fb_input - lr_input
    robot.left_motor.value  = leftSpeed
    robot.right_motor.value = rightSpeed

Label(value='MQTT Test')

HBox(children=(VBox(children=(Button(button_style='info', description='Start', style=ButtonStyle()), Label(val…

## MQTT Protocol을 위한 변수 설정하기

In [5]:
# 한국 시간대 (Asia/Seoul)로 설정
korea_timezone = pytz.timezone("Asia/Seoul")

#Broker IP Address 와 Port
address = "70.12.107.69"
#address = "218.39.192.165"
port = 1883

commandTopic = "AGV/command"
videoTopic = "AGV/video"
video_streamer = None
print("namseok")

namseok


## 인공지능 무인운반차량(AGV) 제어용 함수 정의하기

제어용 함수에 매개변수를 추가하여, arg_string 값으로 세부 동작을 제어할 수 있습니다.

In [6]:
def agv_stop():
    robot.stop()
    
def agv_forward():
    robot.forward(0.4)

def agv_backward():
    robot.backward(0.4)

def agv_left():
    robot.left(0.3)

def agv_right():
    robot.right(0.3)
print("namseok")

namseok


## MQTT protocol을 위한 콜백함수 정의하기

MQTT protocol을 위한 콜백함수를 사용자가 새롭게 정의할 수 있습니다.  
MQTT protocol의 .loop_start() / .loop_stop() 을 이용하면, 내부적으로 thread 로 동작하여 비동기적으로 제어가 가능합니다.

In [7]:
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("connected OK")
    else:
        print("Bad connection Returned code=", rc)

def on_publish(client, userdata, result):
    a = 1
    #print("data published")

def on_message(client, userdata, msg):
    global message
    message = json.loads(msg.payload.decode("utf-8"))
    if "axis" in message:
        axis = message["axis"]
        value = message["value"]
        moveSmoothCtrl(axis, value)
    elif "cmd_string" in message:
        if message["cmd_string"] == "back": moveStop()
        elif message["cmd_string"] == "left": agv_left()
        elif message["cmd_string"] == "right": agv_right()
        elif message["cmd_string"] == "go": #agv_forward()
            execute({'new': camera.value})
            camera.unobserve_all()
            camera.observe(execute, names='value')
print("namseok")

namseok


## sensorReadPublish() 클래스 생성하기

Start 버튼을 누르면, sensorReadPublish() 로 객체를 생성하여 비동기적으로 동작합니다.  
random 라이브러리를 이용하여, 변수 값을 만들고, 현재 시간 데이터와 함께 GUI Remote Controller 로 publish 합니다.

In [8]:
class VideoCapturePublish(threading.Thread):
    def __init__(self):
        super().__init__()
        self.fps = 60
        self.cap = None
        self.th_flag = True

    def run(self):
        while self.th_flag:
            frame = camera.value
            
            #frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            
            frame = cv2.resize(frame, (150, 150))
                
            # 프레임을 JPEG로 압축
            _, buffer = cv2.imencode('.jpg', frame)

            # Base64로 인코딩
            jpg_as_text = base64.b64encode(buffer).decode('utf-8')

            # MQTT로 데이터 전송
            client.publish(videoTopic, jpg_as_text)

            # FPS에 맞게 대기
            time.sleep(1 / self.fps)

    def stop(self):
        self.th_flag = False
        print("Stopping video capture...")
print("namseok")

namseok


In [9]:
# 그리퍼 집기 함수
def grab():
    TTLServo.servoAngleCtrl(4, -10, 1, 150)
    TTLServo.xyInput(85, 150)
    time.sleep(1)
    TTLServo.servoAngleCtrl(4, -70, 1, 150)
    time.sleep(2)
    TTLServo.xyInput(150, 100) #초기상태
    
# 그리퍼 풀기 함수
def loose():
    TTLServo.servoAngleCtrl(4, -10, 1, 150)

## Start 버튼용 함수 정의하고 바인딩하기

Start 버튼을 클릭하면, MQTT Client() 객체를 생성하고, Broker 에 연결을 시도합니다. 
sensorReadPublish() 객체를 생성하고, GUI Controller 로 data를 publish 합니다.  
Manual 버튼을 활성화하고, Start 버튼의 text 를 Stop으로 변경합니다.  

Stop 버튼을 클릭하면, data Publish 를 중단합니다.

In [10]:
client = None
def startPub(change):
    global video_streamer, startBtn, client
    
    if startBtn.description == "Start" :
        client = mqtt.Client()
        client.connect(address, port)
        
        client.on_connect = on_connect
        client.on_publish = on_publish
        client.loop_start()
        
        video_streamer = VideoCapturePublish()
        video_streamer.start()
        
        startBtn.description = "Stop"
        startBtn.button_style = 'danger'
        manualBtn.disabled = False
        
    elif startBtn.description == "Stop" :
        video_streamer.stop()
        
        startBtn.description = "Start"
        startBtn.button_style = 'info'
        #print("Stop", client.is_connected())

startBtn.on_click(startPub)
print("namseok")

namseok


## Manual 버튼용 함수 정의하고 바인딩하기
Manual 버튼을 클릭하면, GUI Controller 에서 보낸 데이터를 Subscribe 하게 됩니다.  
on_message 콜백 함수에서 정의한 것과 같이, GUI Controller 에서 버튼을 누르면 해당 명령어대로 AGV를 조종할 수 있습니다.

In [11]:
def startSub(change):
    global manualBtn, client
    
    if manualBtn.description == "Manual" :
        
        client.subscribe(commandTopic, 1)
        client.on_message = on_message
        
        manualBtn.description = "Auto"
        manualBtn.button_style = 'success'
        

    elif manualBtn.description == "Auto" :
        manualBtn.description = "Manual"
        manualBtn.button_style = 'warning'
        #print("Stop", client.is_connected())
        manualBtn.disabled = True

manualBtn.on_click(startSub)
print("namseok")

namseok


## 프로젝트 종료하기

In [1]:
time.sleep(0.1)
robot.stop()
camera.unobserve_all()
camera.stop()
video_streamer.stop()

print('End')

NameError: name 'time' is not defined