# Part B - Following behaviours

In [9]:
import tensorrt as trt
from tensorrt_model import TRTModel
from ssd_tensorrt import load_plugins, parse_boxes,TRT_INPUT_NAME, TRT_OUTPUT_NAME
import ctypes
import os

#use traitlets and widgets to display the image in Jupyter Notebook
import traitlets
from traitlets.config.configurable import SingletonConfigurable

#use opencv to covert the depth image to RGB image for displaying purpose
import cv2
import numpy as np

#using realsense to capture the color and depth image
import pyrealsense2 as rs

#multi-threading is used to capture the image in real time performance
import threading

import ipywidgets.widgets as widgets
from IPython.display import display, HTML

import time
from RobotClass import Robot

COLORS = {
    "RED": {
        "lower": np.array([136, 87, 111], np.uint8),
        "upper": np.array([180, 255, 255], np.uint8),
        "bgr": (0, 0, 255),
    },
     "GREEN": {
        "lower": np.array([35, 125, 72], np.uint8),
        "upper": np.array([70, 255, 255], np.uint8),
        "bgr": (0, 255, 0),
    },
     "BLUE": {
        "lower": np.array([94, 80, 2], np.uint8),
        "upper": np.array([120, 255, 255], np.uint8),
        "bgr": (255, 0, 0),
    }
}

OBSTACLE_MAX_DEPTH = 500

mean = 255.0 * np.array([0.5, 0.5, 0.5])
stdev = 255.0 * np.array([0.5, 0.5, 0.5])

width = 640
height = 480

width_left_edge = 100
width_right_edge = 200

MAX_DEPTH = 500

robot_state = "FOLLOWING"


MIN_COLOUR_AREA = 5000

color_to_detect = "GREEN"

image_widget = widgets.Image(format='jpeg', width=640, height=480)
# color_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=1, description='tracked label')


In [2]:

def bgr8_to_ssd_input(camera_value):
    """Converts BGR to SSD for input."""
    x = camera_value
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1)).astype(np.float32)
    x -= mean[:, None, None]
    x /= stdev[:, None, None]
    return x[None, ...]

class ObjectDetector(object):
    """Creates a model for detecting objects from the given engine."""
    
    def __init__(self, engine_path, preprocess_fn=bgr8_to_ssd_input):
        logger = trt.Logger()
        trt.init_libnvinfer_plugins(logger, '')
        load_plugins()
        self.trt_model = TRTModel(engine_path, input_names=[TRT_INPUT_NAME],output_names=[TRT_OUTPUT_NAME, TRT_OUTPUT_NAME + '_1'])
        self.preprocess_fn = preprocess_fn
        
    def execute(self, *inputs):
        trt_outputs = self.trt_model(self.preprocess_fn(*inputs))
        return parse_boxes(trt_outputs)
    
    def __call__(self, *inputs):
        return self.execute(*inputs)

model = ObjectDetector('ssd_mobilenet_v2_coco.engine')


### Initialize the camera instance for the Intel realsense sensor D435i

In [3]:
class Camera(SingletonConfigurable):
    """Captures real time camera information from Intel realsense sensor D435i."""
    color_value = traitlets.Any()
    
    def __init__(self):
        super(Camera, self).__init__()
        
        #configure the color and depth sensor
        self.pipeline = rs.pipeline()
        self.configuration = rs.config()  
        
        #set resolution for the color camera
        self.color_width = 640
        self.color_height = 480
        self.color_fps = 30
        self.configuration.enable_stream(rs.stream.color, self.color_width, self.color_height, rs.format.bgr8, self.color_fps)

        #set resolution for the depth camera
        self.depth_width = 640
        self.depth_height = 480
        self.depth_fps = 30
        self.configuration.enable_stream(rs.stream.depth, self.depth_width, self.depth_height, rs.format.z16, self.depth_fps)

        #flag to control the thread
        self.thread_runnning_flag = False
        
        #start the RGBD sensor
        self.pipeline.start(self.configuration)
        self.pipeline_started = True
        frames = self.pipeline.wait_for_frames()

        #start capture the first color image
        color_frame = frames.get_color_frame()   
        image = np.asanyarray(color_frame.get_data())
        self.color_value = image

        #start capture the first depth image
        depth_frame = frames.get_depth_frame()           
        depth_image = np.asanyarray(depth_frame.get_data())
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
        self.depth_value = depth_colormap   
        
    def _capture_frames(self):
        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
            frames = self.pipeline.wait_for_frames() #receive data from RGBD sensor
            
            color_frame = frames.get_color_frame() #get the color image
            image = np.asanyarray(color_frame.get_data()) #convert color image to numpy array
            self.color_value = image #assign the numpy array image to the color_value variable 

            depth_frame = frames.get_depth_frame() #get the depth image           
            depth_image = np.asanyarray(depth_frame.get_data()) #convert depth data to numpy array
            
            #conver depth data to BGR image for displaying purpose
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
            
            self.depth_value = depth_colormap #assign the color BGR image to the depth value
            
            self.depth_image = depth_image             
                
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread       
    
    def right_collision(self):
        """Returns the min depth for all objects on right side of the frame."""
        
        right_image = np.copy(self.depth_image)
        
        right_image[:160,:]=0         # TOP   
        right_image[290:480, :]=0     # BOTTOM
        right_image[160:290, :480]=0  # LEFT
        
        # Clear noise from central area
        right_image[right_image<100]=0
        right_image[right_image>1000]=0
        
        # Set one of the values to be a non-zero to avoid error
        right_image[0,0]=2000
        
        return right_image[right_image!=0].min()
    
    
    def left_collision(self):
        """Returns the min depth for all objects on left side the frame."""

        left_image = np.copy(self.depth_image)
        
        # Set rest area depth values to 0
        left_image[:160,:]=0          # TOP  
        left_image[290:480, :]=0      # BOTTOM
        left_image[160:290,190:]=0    # RIGHT
        
        # Clear noise from central area
        left_image[left_image<100]=0
        left_image[left_image>1000]=0
        
        # Set one of the values to be a non-zero to avoid error
        left_image[0,0]=2000        
        
        return left_image[left_image!=0].min()
            
    def best_direction(self):
        """Returns the direction where the depth distance is highest."""
        
        left_distance = self.left_collision()
        right_distance = self.right_collision()
        
        if (left_distance > right_distance):
            return 'LEFT'
        else:
            return 'RIGHT'
         
def bgr8_to_jpeg(value):
    """convert numpy array to jpeg coded data for displaying."""
    
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object
camera = Camera.instance()
camera.start() # start capturing the data

#initialize the Robot class
robot = Robot()

In [4]:
def detect_color_of_box(x,y,w,h):
    """Get the color area for the given coordinates."""

    imageFrame = np.copy(camera.color_value)
    imageFrame = imageFrame[y:y+h, x:x+w]

    # color space
    hsvFrame = cv2.cvtColor(imageFrame, cv2.COLOR_BGR2HSV)
  

    # define mask        
    mask = cv2.inRange(hsvFrame, COLORS[color_to_detect]["lower"], COLORS[color_to_detect]["upper"])
      
    # Morphological Transform, Dilation for each color and bitwise_and operator between
    # imageFrame and mask determines to detect only that particular color
    kernal = np.ones((5, 5), "uint8")
    
    mask = cv2.dilate(mask, kernal)
    res = cv2.bitwise_and(imageFrame, imageFrame, mask = mask)
   
    # Creating contour to track red color
    contours, hierarchy = cv2.findContours(mask, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
            
    areas = []
    for pic, contour in enumerate(contours):
        area = cv2.contourArea(contour)
        areas.append(area)
          
    sum_area = sum(areas) if len(areas) else 0
    
    return sum_area, imageFrame

In [5]:

def min_depth(bbox):
    """Returns the min depth of the given coordinates."""
    
    depth_image = np.copy(camera.depth_image)
    
    #we only consider the central area of the vision sensor
    depth_image[:int(height * bbox[1]),:]=0
    depth_image[int(height * bbox[3]):,:]=0
    depth_image[:,:int(width * bbox[0])]=0
    depth_image[:,int(width * bbox[2]):]=0
            
    #For object avoidance, we don't consider the distance that are lower than 100mm or bigger than 1000mm
    depth_image[depth_image<100]=0
    depth_image[depth_image>1000]=0
            
    #If all of the values in the depth image is 0, the depth[depth!=0] command will fail
    #we set a specific value here to prevent this failure
    depth_image[0,0]=2000
                    
    return(depth_image[depth_image!=0].min())

# source 1 - [s1]
def detection_center(detection):
    """Computes the center x, y coordinates of the object"""
    
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)
# end of source 1 - [s1]

def detect_section(coord):
    """Detects what section the object is in and calculates the turn gain.
    turn gain is between 0 and 1."""
    
    direction = "CENTER"
    center_x = coord[0] 
    x = center_x * width
    
    if(x < width_left_edge):
        direction= "LEFT"
 
    elif(x > width_right_edge):
        direction= "RIGHT"
    return direction

def move_robot(center):
    """ Move robot based on the center."""
    
    speed = 0.5
    left_speed = float(speed + 0.5 * center[0])
    right_speed = float(speed - 0.5 * center[0])        
    robot.set_motors(left_speed, right_speed, left_speed, right_speed)

def turn_left():
    """Turn robot left on the spot."""
    
    speed = 0.1
    left_speed = float(speed - 0.5 )
    right_speed = float(speed + 0.5 )  
    robot.set_motors(left_speed, right_speed, left_speed, right_speed)


def turn_right():
    """Turn robot right on the spot."""
    speed = 0.1
    left_speed = float(speed + 0.5 )
    right_speed = float(speed - 0.5 )   
    robot.set_motors(left_speed, right_speed, left_speed, right_speed)
    
    
def turn_forward_right():
    """Turn robot forward right on the spot."""
    
    speed = 0.5
    right_speed = float(speed / 5)
    left_speed = float(speed*2 )  
    robot.set_motors(left_speed, right_speed, left_speed, right_speed)
    
def turn_forward_left():
    """Turn robot forward left on the spot."""
    
    speed = 0.5
    left_speed = float(speed / 5)
    right_speed = float(speed *2)  
    robot.set_motors(left_speed, right_speed, left_speed, right_speed)

In [6]:
class Behavior(object):
    """Behaviour class for robot."""
    def __init__(self):
        self.target_center = None
        self.target_direction = None        
        self.target_depth = None
        self.target_last_position = None
        pass
    
    def move(self):
        pass
    
class Follow(Behavior):
    """
        Implements the follow behaviour to find, follow, and stop the robot when near the target.        
    """
    def __init__(self):
        super().__init__()
        
    def find_target(self, _robot_state, use_robot_state=False): 
        """Turn the robot towards the target using target_last_position."""
        if self.target_last_position is not None:
            if use_robot_state and _robot_state == "AVOIDING":
                return
            
            new_turn_direction = detect_section(self.target_last_position)
            if new_turn_direction == "LEFT": 
                turn_left()
            else:
                turn_right()
        
        else:
            robot.stop()
            self.target_detected = False
        
    def target_reached(self):
        """Stop the robot when when in target range."""
        self.target_last_position = None
        robot.stop()
    
    def move(self, center):
        """Move towards the target center when its detected."""
        self.target_center = center
        move_robot(center)
               
        
    
class Avoid(Behavior):
    """Implements the avoid behaviour to move away from ."""
    def __init__(self):
        super().__init__()
        
    def move(self, last_follow_direction):
        """Move away from obstacle based on the target's last_follow_direction and obstacle direction."""
        if self.target_direction == 'CENTER':
            
            if last_follow_direction == 'LEFT':
                turn_forward_left()
            
            elif last_follow_direction == 'RIGHT':
                turn_forward_right()
            
            else:                
                best_direction = camera.best_direction()
                if(best_direction == 'LEFT'):
                    turn_left()
                else:
                    turn_right()

        elif self.target_direction == 'RIGHT':
            turn_forward_left()

        else:
            turn_forward_right()
    
    
follow_behaviour = Follow()
avoid_behaviour = Avoid()

In [14]:
def process_detections(detections, detect_color = False, image = None, draw_rect=False, draw_color=(255, 0, 0) ):    
    """Returns the depths and colors for each detections."""
    
    depths = np.array([])        
    colors = np.array([])
    
    for det in detections:    
        
        bbox = det['bbox']
        depths = np.append(depths, min_depth(bbox)) 

        if draw_rect :
            cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), draw_color , 2)
        
        if detect_color:
            x, y = int(width * bbox[0]), int(height * bbox[1])
            w = int(width * bbox[2]) - int(width * bbox[0])
            h = int(height * bbox[3]) - int(height * bbox[1])

            target_area, imageFrame = detect_color_of_box(x, y, w, h)
            colors = np.append(colors, target_area)
        
    return depths, colors

def processing(change):
    """Runs every frame based on camera feed and processes the image."""
    
    global robot_state
    
    image = change['new']
    
    imgsized= cv2.resize(image,(300,300))
    obstacle = None
    
    # compute all detected objects
    detections = model(imgsized)
    
    human_detections = [d for d in detections[0] if d['label'] == 1]
    
    object_detections = [d for d in detections[0] if d['label'] != 1]
    
    # get depth and colors for each detections.
    detection_depths, detection_colors =  process_detections(human_detections, detect_color = True)    
    object_detection_depths, _ =  process_detections(object_detections, image=image, draw_rect=True)
    
    # Get the closest object and show on screen.
    if len(object_detections) != 0:        
        closest_idx = np.where(object_detection_depths == np.amin(object_detection_depths))[0][0]
        obstacle = object_detections[closest_idx]
        obstacle_depth = object_detection_depths[closest_idx] 
        bbox = obstacle['bbox']
        cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 0, 255), 2)
           
    if len(human_detections)==0 :
        follow_behaviour.find_target(robot_state)
        robot_state = "SEARCHING"
    else:
        # get the index of max colour area of detected humans
        idx = np.where(detection_colors == np.amax(detection_colors))[0][0]
                
        if detection_colors[idx] < MIN_COLOUR_AREA:
            follow_behaviour.find_target(robot_state, use_robot_state=True)
            robot_state = "SEARCHING"
        else:
            target = human_detections[idx]
            bbox = target['bbox']
            target_depth = detection_depths[idx]
            # Draw rectangle for target
            cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 255, 0), 2)
            
            # Obstacle collision avoidance
            if obstacle is not None and obstacle_depth <= OBSTACLE_MAX_DEPTH and obstacle_depth < target_depth:
                                
                robot_state = "AVOIDING" 

                target_center = detection_center(target)                    
                target_direction = detect_section(target_center)

                avoid_behaviour.target_center = detection_center(obstacle) 
                avoid_behaviour.target_direction = detect_section(avoid_behaviour.target_center)                    
                avoid_behaviour.move(target_direction)

                follow_behaviour.target_last_position = target_center                    
            
            # Robot reached target
            elif(target_depth <= MAX_DEPTH):                
                follow_behaviour.target_reached()
                robot_state = "STOPPED"

            # Follow the target
            else:
                robot_state = "FOLLOWING"
                center = detection_center(target)
                follow_behaviour.move(center)
                follow_behaviour.target_last_position = center
#                 print(center)
    
    cv2.putText(image, robot_state, (320,40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2, cv2.LINE_AA)            
    
   
    image_widget.value = bgr8_to_jpeg(image)
        
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')

In [10]:
display(widgets.VBox([
    image_widget,
    label_widget
]))

VBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

In [15]:
camera.unobserve_all()
camera.stop()
camera.start()
time.sleep(1.0)
robot.stop()
follow_behaviour.target_last_position = None

## References

[s1]- https://github.com/NVIDIA-AI-IOT/jetbot/blob/master/notebooks/object_following/live_demo.ipynb
