# 交通标识牌识别



![title](other_data/01.jpg)

* 模型简介：

    该模型模拟真实汽车行驶在马路上的场景，我们的机器人会在给定的图中进行行驶，当机器人看到斑马线时就开始进行标识牌识别，识别的标识牌有直行、左转、右转、停止、掉头、红灯、绿灯、黄灯，然后在识别到标识牌后做出相应的动作。

* 模型最佳运行环境：室内，白色灯光下。
    模型所需材料：机器人1台，地图1张，标识牌5张，红绿灯1个。

* 模型玩法：
    1. 将机器人放置在地图跑道上；
    2. 将标识牌放置在地图上；
    3. 运行该页面所有代码块；
    4. 调整速度值与转向增益值；
    5. 点击开始按钮开始进行巡线；
    6. 根据实际场景调整巡线（黑色）HSV颜色范围值。

![title](other_data/02.jpg)

## 1.导入所需模块

In [1]:
from nxbot import Robot,event,bgr8_to_jpeg
import torchvision
import torch
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL
import numpy as np
from IPython.display import display
import ipywidgets
import traitlets
import ipywidgets.widgets as widgets
import time
import threading
from PIL import ImageDraw,ImageFont
from PIL import Image
from ipywidgets import Layout, Box, Dropdown, Label
import random


********************************************************
*                NXROBO - 机器人SDK                    *
********************************************************
                 当前SDK版本: 0.6.2
            


## 2.创建可视化小部件

In [2]:
image_widget = widgets.Image(format='jpeg')
mask_widget = widgets.Image(format='jpeg')

# 控制滑块
speed_gain_slider = ipywidgets.FloatSlider(min=0, max=0.3, step=0.01, value=0.13, description='速度')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=2.0, step=0.01, value=1.2, description='转向增益')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.05, step=0.01, value=0.03, description='回正微调')


state_info = widgets.Textarea(
    placeholder='NXROBO',
    description='状态信息',
    disabled=False
)

# HSV默认范围值
H_MIN_slider = ipywidgets.IntSlider(min=0, max=180, step=1, value=0, description='H_MIN')

H_MAX_slider = ipywidgets.IntSlider(min=0, max=180, step=1, value=180, description='H_MAX')

S_MIN_slider = ipywidgets.IntSlider(min=0, max=255, step=1, value=0, description='S_MIN')

S_MAX_slider = ipywidgets.IntSlider(min=0, max=255, step=1, value=255, description='S_MAX')

V_MIN_slider = ipywidgets.IntSlider(min=0, max=255, step=1, value=0, description='V_MIN')

V_MAX_slider = ipywidgets.IntSlider(min=0, max=255, step=1, value=120, description='V_MAX')

HSV_BOX = widgets.VBox([H_MIN_slider,H_MAX_slider,S_MIN_slider,S_MAX_slider,V_MIN_slider,V_MAX_slider])


## 3.创建可编辑HSV颜色阈值功能模块
可以通过拖动滑块去设置需要识别颜色的HSV范围值

In [3]:
def line_hsv():
    h_min = H_MIN_slider.value
    s_min = S_MIN_slider.value
    v_min = V_MIN_slider.value
    
    h_max = H_MAX_slider.value
    s_max = S_MAX_slider.value
    v_max = V_MAX_slider.value
    
    color_lower = np.array([h_min,s_min,v_min])
    color_upper = np.array([h_max,s_max,v_max])
    
    return color_lower,color_upper

## 4.交通标识牌识别模型准备

In [4]:
# 数据预处理。
img_size = 224
mean = 255.0 * np.array([0.52, 0.516, 0.501])
stdev = 255.0 * np.array([0.33, 0.311, 0.33])
normalize = transforms.Normalize(mean, stdev)

def preprocess(image):
    image = cv2.resize(image,(img_size, img_size),interpolation=cv2.INTER_CUBIC)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = image.transpose((2, 0, 1))
    image = np.ascontiguousarray(image, dtype=np.float32)
    image = normalize(torch.from_numpy(image)).unsqueeze(0).cuda()
    return image

# 加载交通标识牌识别模型
model_path = '../../../models/local/mobilenetv2/signal_classification.pth'
detect_model = torch.load(model_path)
detect_model.cuda()

# 测试模型是否正常
try:
    img_data = np.ones([img_size, img_size, 3],np.float32)
    detect_model(preprocess(img_data))
    print('模型已加载！')
except Exception as result:
    print('请检查模型是否正确',result)

模型已加载！


## 5.创建交通标识牌类别与机器人状态信息

In [5]:
global label_list
label_list = ['bg','直行','左转','右转','停止标志','掉头','红灯','红黄灯','绿灯','黄灯']
# 通过字典表示标识牌其识别到的次数
init_signal = {'bg':0, '直行':0,'左转':0,'右转':0,'停止标志':0,'掉头':0,'红灯':0,'红黄灯':0,'绿灯':0,'黄灯':0}

global rbs
rbs = {'cx':None,'dis_center':None,'task':None,'label':None,'last_label':'','steering':0,'find_turn':False,
       'interval':11,'停止标志':0,'直行':0,'line_area':False,'last_dis':200,'start_time':0,'start_button':True}

global signals
signals = init_signal

## 6.创建中文字体显示模块

In [6]:
def cv2ImgAddText(img, text, left, top, textColor=(0, 255, 0), textSize=20):
    if (isinstance(img, np.ndarray)):  #判断是否OpenCV图片类型
        img = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
    draw = ImageDraw.Draw(img)
    fontText = ImageFont.truetype(
        "/usr/share/fonts/windows/simhei.ttf", textSize, encoding="utf-8")
    draw.text((left, top), text, textColor, font=fontText)
    return cv2.cvtColor(np.asarray(img), cv2.COLOR_RGB2BGR)

## 7.创建开始与停止按钮

In [7]:
# 创建按钮外观。
button_layout = widgets.Layout(width='100px', height='80px', align_self='center')

#创建开始与停止按钮。
start_button = widgets.Button(description='开始运行', layout=button_layout)
stop_button = widgets.Button(description='停止运行', button_style='danger', layout=button_layout)

def start(change):
    global rbs
    rbs['task']='find_color_white'
    rbs['start_button'] = True
def stop(change):
    global rbs
    rbs['task']=None

# 将开始与停止绑定到按钮上
start_button.on_click(start)
stop_button.on_click(stop)


## 8.定义颜色识别功能模块

In [8]:
kernel = np.ones((5, 5), np.uint8)

def color_detection(image,rbs):
    hsv=cv2.cvtColor(image,cv2.COLOR_BGR2HSV)
    # HSV颜色范围
    color_lower, color_upper = line_hsv()
    # 固定HSV颜色范围
    mask=cv2.inRange(hsv,color_lower,color_upper)
    # 图像腐蚀
    mask=cv2.erode(mask,kernel,iterations=1)
    # 图像膨胀
    mask=cv2.dilate(mask,kernel,iterations=1)
    # 图像滤波，卷积核5×5，标准差为0
    mask=cv2.GaussianBlur(mask,(5,5),0)
    
    regin = np.array([[(40, image.shape[0]), (100, 250), (200, 250), (260, image.shape[0])]])
    mask_ = np.zeros_like(mask)
    mask_color = 255  # src3图像的通道数是1，且是灰度图像，所以颜色值在0-255
    cv2.fillPoly(mask_, regin, mask_color)
    mask = cv2.bitwise_and(mask, mask_)
    # 显示二值图
    mask_widget.value = bgr8_to_jpeg(mask)
    
    # 找出滤波后的图像轮廓
    cnts=cv2.findContours(mask.copy(),cv2.RETR_EXTERNAL,cv2.CHAIN_APPROX_SIMPLE)[0]
    # 如果有轮廓
    if len(cnts)>0:
        rbs['last_dis'] = 200
        for cnt in cnts:
            area = int(cv2.contourArea(cnt))
            # 限制轮廓大小
            if area in range(500,3000):
                rbs['line_area'] = area
                # 计算轮廓顶点
                cx,cy,w,h = cv2.boundingRect(cnt)
                cv2.rectangle(image, (cx, cy), (cx + w, cy + h), (0, 255, 0), 2)
                cv2.drawContours(image,[cnt],0,(0,0,255),-1)
                box_center = np.asarray([cx,cy])
                img_center = np.asarray([image.shape[1]/2,image.shape[0]/2])
                # 计算轮廓到图像中心点的距离
                rbs['dis_center'] = np.sqrt(np.sum((box_center - img_center) ** 2))
                # 找出距离中心点最近的轮廓
                if rbs['dis_center']<rbs['last_dis']:
                    rbs['last_dis']=rbs['dis_center']
                    rbs['cx'] = cx
                
                
    return image

## 9.定义交通标识牌识别模块

In [9]:
def signal_detection(image,signals,rbs):
    # 图像经过预处理再输入模型
    out = detect_model(preprocess(image))
    # 得出预测概率最大的概率值与对应的索引编号
    prob, index = torch.max(F.softmax(out,1), 1)
    # 通过索引编号找到对应实际类别
    label = label_list[index]
    # 如果不是背景类别
    if label!='bg':
        # 判断是否连续看到相同类别
        if rbs['last_label'] == label:
            signals[label]+=1
        else:
            signals[label]=0
        rbs['last_label'] = label
        display_txt = '检测到标志：'+ label
        # 如果连续2次看到相同类别，就将该类别进行保存
        if signals[label]>2:
            rbs['label']=label
        image = cv2ImgAddText(image, display_txt, 20, 20, (255, 0, 0), 25)
        
    else:
        rbs['label']=None
        signals['bg']+=1
        if signals['bg']>20:
            # 初始化所有标志数量为0
            signals=init_signal
            signals[label]=0
    return image

## 10.定义图像获取线程

In [10]:
global detect_flag
detect_flag = True
def detection():
    while detect_flag:
        time.sleep(0.03)
        image = rbt.camera.read()
        global rbs
        global signals
        
        if image is not None:
            if rbs['task'] is not None:
                # 如果任务为交通标识牌检测
                if rbs['task']=='detect_signal':
                    image = signal_detection(image,signals,rbs)
                # 如果是巡线
                else:
                    image = color_detection(image,rbs)
            # 将图像传输给图像显示部件
            image_widget.value = bgr8_to_jpeg(cv2.resize(image, (320, 240)))
            
# 创建线程
process1 = threading.Thread(target=detection,)

## 11.定义任务执行线程
该线程作为机器人动作执行模块，当机器人执行相应的任务，机器人就会做出相应的动作

In [11]:
global speech_word
speech_word=None

def action():
    global speech_word
    last_steering = 0
    lost_line_time = 0
    first_start = True
    while detect_flag:
        time.sleep(0.1)
        global rbs
        global signals
        rbs['interval'] = time.time()-rbs['start_time']
        if rbs['task'] is not None:
            state_info.value = rbs['task']
        if rbs['task'] is None:
            rbt.base.move(0,0,0)
        elif rbs['task']=='find_color_white':
            speed = speed_gain_slider.value
            if rbs['cx'] is not None:
                steering = (150-rbs['cx'])/150 * steering_gain_slider.value
                steering = (steering - (steering + last_steering)*steering_dgain_slider.value)
                rbt.base.move(speed, 0, steering)
                if rbs['line_area'] in range(1500,3000):
                    if first_start:
                        rbt.base.move(0, 0, 0)
                        state_info.value = '发现斑马线'
                        speech_word = '发现斑马线'
                        rbs['task'] = 'find_zebera_line'
                        first_start = False
                    else:
                        if rbs['interval']>10:
                            rbt.base.move(0, 0, 0)
                            state_info.value = '发现斑马线'
                            speech_word = '发现斑马线'
                            rbs['task'] = 'find_zebera_line'
                        
                       
                last_steering = steering
                lost_line_time = 0
                rbs['start_button']=False
            else:
                state_info.value = '没有找到line'
                lost_line_time+=1

                if lost_line_time<20 and rbs['start_button']==False:
                    rbt.base.move(speed*0.8, 0, last_steering)
                else:
                    rbt.base.move(0, 0, 0)
            rbs['cx'] = None

        elif rbs['task']=='find_zebera_line':
            
            state_info.value = '开始检测标识牌'
            speech_word = '开始检测标识牌'
            rbt.base.move(0,0,0)
            rbt.base.set_ptz(20)
            rbs['task']='detect_signal'
            time.sleep(3)

        elif rbs['task']=='detect_signal':
            if rbs['label'] is None:
                state_info.value = '没有发现标识牌'
                speech_word = '没有发现标识牌'
                rbt.base.set_ptz(-45)
                time.sleep(2)
                rbs['task'] = 'flollowing_line'
                rbs['start_time'] = time.time()
            else:
                speech_word = '识别到'+ rbs['label']
                if rbs['label'] in ['停止标志','红灯','黄灯','红黄灯']:
                    rbt.base.move(0,0,0)
                else:
                    
                    rbt.base.set_ptz(-45)
                    time.sleep(1)
                    if rbs['label'] == '绿灯':
                        rbs['start_time'] = time.time()
                        rbs['task']='flollowing_line'
                        
                    elif rbs['label'] =='直行':
                        rbs['start_time'] = time.time()
                        rbs['task']='flollowing_line'
                        
                    elif rbs['label'] == '左转':
                        rbs['start_time'] = time.time()
                        rbs['task'] = 'flollowing_line'
                        rbs['find_turn'] = True
                        
                    elif rbs['label'] == '右转':
                        rbs['start_time'] = time.time()
                        rbs['task'] = 'flollowing_line'
                        rbs['find_turn'] = True
                        
                    elif rbs['label'] == '掉头':
                        if random.random()<0.5:
                            rbt.base.turn(180,True)
                        rbs['task'] = 'find_color_white'
                        rbs['label']=None
                    
        
        elif rbs['task']=='flollowing_line':
            speed = speed_gain_slider.value
            
            if rbs['find_turn']:
                if rbs['label']=='左转':
                    if rbs['line_area'] in range(1300,3000):
                        if rbs['interval']>2:
                            rbt.base.forward(0.12, 1, True)
                            rbt.base.turn(90)
                            time.sleep(5)
                            rbs['find_turn'] = False
                            rbs['label']=None
                            rbs['task']='find_color_white'
                            rbs['start_time'] = time.time()
                    else:
                        if rbs['cx'] is not None:
                            steering = (150-rbs['cx'])/150 * steering_gain_slider.value
                            steering = (steering - (steering + last_steering)*steering_dgain_slider.value)
                            rbt.base.move(0.13, 0, steering)
                        else:
                            rbt.base.move(0, 0, 0)

                elif rbs['label']=='右转':
                    if rbs['line_area'] in range(1300,3000):
                        if rbs['interval']>2:
                            rbt.base.forward(0.12, 1, True)
                            rbt.base.turn(-90)
                            time.sleep(5)
                            rbs['find_turn'] = False
                            rbs['label']=None
                            rbs['task']='find_color_white'
                            rbs['start_time'] = time.time()
                    else:
                        if rbs['cx'] is not None:
                            steering = (150-rbs['cx'])/150 * steering_gain_slider.value
                            steering = (steering - (steering + last_steering)*steering_dgain_slider.value)
                            rbt.base.move(0.13, 0, steering)
                        else:
                            rbt.base.move(0, 0, 0) 
                
            else:    
                if rbs['interval']<5:
                    if rbs['cx'] is not None and int(rbs['cx']) in range(80,220):
                        steering = (150-rbs['cx'])/150 * steering_gain_slider.value
                        steering = (steering - (steering + last_steering)*steering_dgain_slider.value)
                        rbt.base.move(speed, 0, steering)
                        lost_line_time = 0
                    else:
                        lost_line_time+=1
                        
                    if lost_line_time<30:
                        rbt.base.move(speed*0.8, 0, last_steering)
                    else:
                        rbt.base.move(0, 0, 0)
                        
                else:
                    rbs['task']='find_color_white'
                    rbs['label']=None
             
            last_steering = steering
        
process2 = threading.Thread(target=action,)

## 12.定义语音合成线程
该线程用于接收需要外放的词语，并通过语音合成进行执行

In [12]:
def run_speech():
    last_word = ''
    while detect_flag:
        time.sleep(0.05)
        global speech_word
        if speech_word is not None:
            state_info.value = speech_word
            if last_word!= speech_word:
                rbt.speech.play_text(speech_word,True)
                last_word = speech_word
                speech_word = None
# 创建线程
process3 = threading.Thread(target=run_speech,)

## 13.启动机器人，启动设定的线程，打开可视化小部件
* 运行成功后会显示机器人视角（彩色图像和巡线HSV过滤后的二值图像），机器人参数调整小部件。

* 默认有固定速度与转向增益值，只需要点击开始即可运行。

* 当机器人识别不到巡线的颜色时（可以通过HSV过滤后的二值图像判断），手动调整HSV颜色范围值。

In [13]:
rbt = Robot()
rbt.connect()
rbt.camera.start()
rbt.base.set_ptz(-45)
# 启动线程
process1.start()
process2.start()
process3.start()
display(widgets.HBox([image_widget,mask_widget]))
display(widgets.VBox([speed_gain_slider,steering_gain_slider, steering_dgain_slider]))
display(widgets.HBox([start_button,stop_button]))
display(HSV_BOX,state_info)

HBox(children=(Image(value=b'', format='jpeg'), Image(value=b'', format='jpeg')))

VBox(children=(FloatSlider(value=0.13, description='速度', max=0.3, step=0.01), FloatSlider(value=1.2, descripti…

HBox(children=(Button(description='开始运行', layout=Layout(align_self='center', height='80px', width='100px'), st…

VBox(children=(IntSlider(value=0, description='H_MIN', max=180), IntSlider(value=180, description='H_MAX', max…

Textarea(value='', description='状态信息', placeholder='NXROBO')

## 18.断开与机器人的连接

In [14]:
# detect_flag = False
# rbt.disconnect()