# Part B Group 3

Code for following human and avoiding obstacle

In [1]:
import tensorrt as trt
from tensorrt_model import TRTModel
from ssd_tensorrt import load_plugins, parse_boxes,TRT_INPUT_NAME, TRT_OUTPUT_NAME
import ctypes
import numpy as np
import cv2
import os
import ctypes
    
mean = 255.0 * np.array([0.5, 0.5, 0.5])
stdev = 255.0 * np.array([0.5, 0.5, 0.5])

def bgr8_to_ssd_input(camera_value):
    x = camera_value
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1)).astype(np.float32)
    x -= mean[:, None, None]
    x /= stdev[:, None, None]
    return x[None, ...]

class ObjectDetector(object):
    
    def __init__(self, engine_path, preprocess_fn=bgr8_to_ssd_input):
        logger = trt.Logger()
        trt.init_libnvinfer_plugins(logger, '')
        load_plugins()
        self.trt_model = TRTModel(engine_path, input_names=[TRT_INPUT_NAME],output_names=[TRT_OUTPUT_NAME, TRT_OUTPUT_NAME + '_1'])
        self.preprocess_fn = preprocess_fn
        
    def execute(self, *inputs):
        trt_outputs = self.trt_model(self.preprocess_fn(*inputs))
        return parse_boxes(trt_outputs)
    
    def __call__(self, *inputs):
        return self.execute(*inputs)

model = ObjectDetector('ssd_mobilenet_v2_coco.engine')


In [2]:
#use traitlets and widgets to display the image in Jupyter Notebook
import traitlets
from traitlets.config.configurable import SingletonConfigurable

#use opencv to covert the depth image to RGB image for displaying purpose
import cv2
import numpy as np

#using realsense to capture the color and depth image
import pyrealsense2 as rs

#multi-threading is used to capture the image in real time performance
import threading

class Camera(SingletonConfigurable):
    
    #this changing of this value will be captured by traitlets
    color_value = traitlets.Any()
    
    def __init__(self):
        super(Camera, self).__init__()
        
        #configure the color and depth sensor
        self.pipeline = rs.pipeline()
        self.configuration = rs.config()  
        
        #set resolution for the color camera
        self.color_width = 640
        self.color_height = 480
        self.color_fps = 30
        self.configuration.enable_stream(rs.stream.color, self.color_width, self.color_height, rs.format.bgr8, self.color_fps)

        #set resolution for the depth camera
        self.depth_width = 640
        self.depth_height = 480
        self.depth_fps = 30
        self.configuration.enable_stream(rs.stream.depth, self.depth_width, self.depth_height, rs.format.z16, self.depth_fps)

        #flag to control the thread
        self.thread_runnning_flag = False
        
        #start the RGBD sensor
        self.pipeline.start(self.configuration)
        self.pipeline_started = True
        frames = self.pipeline.wait_for_frames()

        #start capture the first color image
        color_frame = frames.get_color_frame()   
        image = np.asanyarray(color_frame.get_data())
        self.color_value = image

        #start capture the first depth image
        depth_frame = frames.get_depth_frame()           
        self.depth_image = np.asanyarray(depth_frame.get_data())
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(self.depth_image, alpha=0.03), cv2.COLORMAP_JET)
        self.depth_value = depth_colormap   

    def _capture_frames(self):
        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
            frames = self.pipeline.wait_for_frames() #receive data from RGBD sensor
            
            color_frame = frames.get_color_frame() #get the color image
            image = np.asanyarray(color_frame.get_data()) #convert color image to numpy array
            self.color_value = image #assign the numpy array image to the color_value variable 

            depth_frame = frames.get_depth_frame() #get the depth image           
            self.depth_image = np.asanyarray(depth_frame.get_data()) #convert depth data to numpy array
            #conver depth data to BGR image for displaying purpose
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(self.depth_image, alpha=0.03), cv2.COLORMAP_JET)
            self.depth_value = depth_colormap #assign the color BGR image to the depth value
    
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread       

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object
camera = Camera.instance()
camera.start() # start capturing the data


In [3]:
import ipywidgets.widgets as widgets
from IPython.display import display, HTML

width = 640
height = 480
left=260
right=380

image_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=1, description='tracked label')
display(widgets.VBox([
    widgets.HBox([image_widget,]),
    label_widget
]))


import time
from RobotClass import Robot

#initialize the Robot class
robot = Robot()

def detection_center(detection):
    """Computes the center x, y coordinates of the object"""
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)
    
def norm(vec):
    """Computes the length of the 2D vector"""
    return np.sqrt(vec[0]**2 + vec[1]**2)

def closest_detection(detections):
    """Finds the detection closest to the image center"""
    closest_detection = None
    for det in detections:
        center = detection_center(det)
        if closest_detection is None:
            closest_detection = det
        elif norm(detection_center(det)) < norm(detection_center(closest_detection)):
            closest_detection = det
    return closest_detection

def processing(change):
    """processes the camera input to follow humans and avoid obstacles"""
    
    image = change['new']
     
    imgsized= cv2.resize(image,(300,300))
    detections = model(imgsized)
    
    matching_detections = [d for d in detections[0] if d['label'] == int(label_widget.value)]
    #matching all the identified objects other than humans and mark them as obstacles
    obstacles = [d for d in detections[0] if d['label'] != int(label_widget.value)]
    #find the closest obstacle
    closet_obstacle = closest_detection(obstacles)
    a=0
    b=0
    obstacle_depth=701
    if closet_obstacle is not None:
        #calculating the center of the closest obstacle
        a=int((width * closet_obstacle["bbox"][0] + width * closet_obstacle["bbox"][2])/2)
        b=int((height * closet_obstacle["bbox"][1] + height * closet_obstacle["bbox"][3])/2)
        #getting the depth of the centre of the obstacle from the depth image
        obstacle_depth=camera.depth_image[b,a]
    
   
    #find the closest human
    closest=closest_detection(matching_detections)
    human_x=0
    human_y=0
    human_depth=0
    if closest is not None:
        #calculating the center of the closest human
        human_x=int((width * closest["bbox"][0] + width * closest["bbox"][2])/2)
        human_y=int((height * closest["bbox"][1] + height * closest["bbox"][3])/2)
        #getting the depth of the centre of the human from the depth image
        human_depth=camera.depth_image[human_y,human_x]
    
        
        
    #Region of interst for the whole time for the robot to take the manuvering decisions        
    camera.depth_image[:left,:]=0
    camera.depth_image[290:,:]=0
    camera.depth_image[:,:160]=0
    camera.depth_image[:,right:]=0
    camera.depth_image[camera.depth_image<100]=0
    camera.depth_image[camera.depth_image>1000]=0
    camera.depth_image[0,0]=2000
    
    #plotting bounding boxes over the obstacles
    for i in obstacles:
        bbox=i['bbox']
        cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 0, 255), 2)
    #case when there is no human in the frame
    if len(matching_detections)==0:
        robot.forward(.4)
        if camera.depth_image[camera.depth_image!=0].min()<500:
            robot.left(.8)
    else:
        #case when there is a human in the frame
        center = detection_center(closest)
        bbox=closest["bbox"]
        #plotting bounding boxes over humans  
        cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (255, 0, 0), 2)
        cv2.circle(image,(a,b),10,(0,0,255),-1)
        cv2.circle(image,(human_x,human_y),10,(255,0,0),-1)
        

        if closest is not None and human_depth>600 and human_depth<900:
            robot.stop()

        
        elif closet_obstacle is not None and obstacle_depth>600 and obstacle_depth<800:


            for i in range(0,5000):
                robot.left(1)
            for i in range(0,17000):
                robot.forward(.5)
            for i in range(0,5000):
                 robot.right(1)
                    

        elif closest is not None and human_depth>0 and human_depth<600:

            robot.set_motors(
            -.7,
            -.7,
            -.7,
            -.7)
        else:
            robot.set_motors(
            float(.4 + 2 * center[0]),
            float(.4 - 2 * center[0]),
            float(.4 + 2 * center[0]),
            float(.4 - 2 * center[0]))
            
    image_widget.value = bgr8_to_jpeg(image)
    
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')

VBox(children=(HBox(children=(Image(value=b'', format='jpeg', height='300', width='300'),)), IntText(value=1, …

In [4]:
camera.unobserve_all()
camera.stop()
time.sleep(1.0)
robot.stop()
