# AI-Driven Autonomous Robotics: Human Tracking and Reactive Behaviour Analysis using AI and Sensing Technologies


### Aims
This project aims to design and implement a program in Python to control mobile robots which will guide a robot on how to effectively manoeuvre its way through an environment while tracking an instance of a human or multiple humans using the pre-trained [ssd_mobilenet_v2_coco](https://docs.openvino.ai/2021.4/omz_models_model_ssd_mobilenet_v2_coco.html) for object detection.
Additionally, the project seeks to implement several robot reactive behaviours and high level behaviours.

### Main tasks:

* Part A: Reactive behaviours -- The robot must demonstrate the following four Braitenberg behaviours: Aggression, Fear, Love, and Curiousity.   

* Part B: Following behaviour -- The robot follows a person whilst maintaining a safe distance. The robot should avoid collisions with the object's in the environment and the person it's tracking. In the event where there is more than one human visible to the robot, the robot is capable of detecting all humans visible in the frame but will only track and follow the movements of the human which is closest to the camera by calculating the distance of all visible humans and prioritising that which is closest.     


### Objectives:
* Design and implement a program in Python which controls the movements of a robot.
* The robot to adhere to the four Braitenberg behaviours. 
* The robot to detect a human using the pretrained model. 
* The robot to track and follow a human’s movements whilst maintaining a safe distance. 
* The robot to avoid collisions with humans and other obstacles.  
* The robot to track and follow only one human at an instance.

**Further Information:**   

This project will use mobile robots built with the latest AI and sensing technology. The robots have a latest Realsense camera from Intel, a Nano board from Nvidia to support image processing and deep learning etc, proximity sensors, IMU, motors and driving circuits.  

### Step 1 load the pre-trained object detection model

In [1]:
import tensorrt as trt
from tensorrt_model import TRTModel
from ssd_tensorrt import load_plugins, parse_boxes,TRT_INPUT_NAME, TRT_OUTPUT_NAME
import ctypes
import numpy as np
import cv2
import os
import ctypes
    
mean = 255.0 * np.array([0.5, 0.5, 0.5])
stdev = 255.0 * np.array([0.5, 0.5, 0.5])

def bgr8_to_ssd_input(camera_value):
    x = camera_value
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1)).astype(np.float32)
    x -= mean[:, None, None]
    x /= stdev[:, None, None]
    return x[None, ...]

class ObjectDetector(object):
    
    def __init__(self, engine_path, preprocess_fn=bgr8_to_ssd_input):
        logger = trt.Logger()
        trt.init_libnvinfer_plugins(logger, '')
        load_plugins()
        self.trt_model = TRTModel(engine_path, input_names=[TRT_INPUT_NAME],output_names=[TRT_OUTPUT_NAME, TRT_OUTPUT_NAME + '_1'])
        self.preprocess_fn = preprocess_fn
        
    def execute(self, *inputs):
        trt_outputs = self.trt_model(self.preprocess_fn(*inputs))
        return parse_boxes(trt_outputs)
    
    def __call__(self, *inputs):
        return self.execute(*inputs)

model = ObjectDetector('ssd_mobilenet_v2_coco.engine')


### Step 2 Initialize the camera instance for the Intel realsense sensor D435i

In [2]:
#use traitlets and widgets to display the image in Jupyter Notebook
import traitlets
from traitlets.config.configurable import SingletonConfigurable

#use opencv to covert the depth image to RGB image for displaying purpose
import cv2
import numpy as np

#using realsense to capture the color and depth image
import pyrealsense2 as rs

#multi-threading is used to capture the image in real time performance
import threading

class Camera(SingletonConfigurable):
    
    
    color_value = traitlets.Any()
    
    def __init__(self):
        super(Camera, self).__init__()
        
        #configure the color and depth sensor
        self.pipeline = rs.pipeline()
        self.configuration = rs.config()  
        
        #set resolution for the color camera
        self.color_width = 640
        self.color_height = 480
        self.color_fps = 30
        self.configuration.enable_stream(rs.stream.color, self.color_width, self.color_height, rs.format.bgr8, self.color_fps)

        #set resolution for the depth camera
        self.depth_width = 640
        self.depth_height = 480
        self.depth_fps = 30
        self.configuration.enable_stream(rs.stream.depth, self.depth_width, self.depth_height, rs.format.z16, self.depth_fps)

        #flag to control the thread
        self.thread_runnning_flag = False
        
        #start the RGBD sensor
        self.pipeline.start(self.configuration)
        self.pipeline_started = True
        frames = self.pipeline.wait_for_frames()

        #start capture the first color image
        color_frame = frames.get_color_frame()   
        image = np.asanyarray(color_frame.get_data())
        self.color_value = image

        #start capture the first depth image
        depth_frame = frames.get_depth_frame()           
        depth_image = np.asanyarray(depth_frame.get_data())
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
        self.depth_value = depth_colormap   

    def _capture_frames(self):
        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
            frames = self.pipeline.wait_for_frames() #receive data from RGBD sensor
            
            color_frame = frames.get_color_frame() #get the color image
            image = np.asanyarray(color_frame.get_data()) #convert color image to numpy array
            self.color_value = image #assign the numpy array image to the color_value variable 

            depth_frame = frames.get_depth_frame() #get the depth image           
            depth_image = np.asanyarray(depth_frame.get_data()) #convert depth data to numpy array
            #conver depth data to BGR image for displaying purpose
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
            self.depth_value = depth_colormap #assign the color BGR image to the depth value
            self.depth_image_ = depth_image
    
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread       

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object
camera = Camera.instance()
camera.start() # start capturing the data


### Step 3 Create helper module

In [3]:
#---Helper Module---
import ipywidgets.widgets as widgets
from IPython.display import display, HTML, Image

width = 640
height = 480

import time
from RobotClass import Robot

#initialize the Robot class
robot = Robot()


def create_blue_box(image, bbox):
    cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 255, 0), 2)    
    
def get_x_y(bbox):
    # ymin, xmin, ymax, xmax
    y_min, x_min, y_max, x_max = bbox[0], bbox[1], bbox[2], bbox[3]
    middle_width = round(y_min + (y_max - y_min)/2, 1)
    middle_height = round(x_min + (x_max - x_min)/2, 1)
    y = width * middle_width
    x = height * middle_height
    return (x, y, middle_width, middle_height)
    
def get_depth_from_data(depth_array, x, y):
    return depth_array[int(x)-1, int(y)-1]
    
def get_closest_object(object_list):
    depth_track = []
    if len(object_list) > 1:
        for object_ in object_list:
            bbox = object_['bbox']
            xy = get_x_y(bbox)
            x, y = xy[0], xy[1]
            detection_depth = get_depth_from_data(camera.depth_image_, x , y)
            depth_track.append(detection_depth)
        return get_closest(object_list, depth_track)
    elif object_list:
        return object_list[0]
    else:
        return None
    
def get_closest(object_list, depth_list):
    if depth_list:
        index = depth_list.index(min(depth_list))
        return object_list[index]  
        
def obstacle_avoid_right():
    robot.right(0.48)
    time.sleep(1)
    robot.forward(0.5)
    time.sleep(1)
    robot.left(0.5)
    time.sleep(1)
    robot.forward(0.5)
    time.sleep(1)
    robot.stop()
    
def obstacle_avoid_left():
    robot.left(0.48)
    time.sleep(1)
    robot.forward(0.5)
    time.sleep(1)
    robot.right(0.5)
    time.sleep(1)
    robot.forward(0.5)
    time.sleep(1)
    robot.stop()
    


In [None]:
#---FOLLOWING AND AVOIDING OBSTACLES---
def processing(change):
    
    depth_track = []
    
    image = change['new']8888
    
    imgsized= cv2.resize(image,(300,300))
    
    
    depth_data = camera.depth_image_
    depth_min = np.amin(depth_data[depth_data != 0])
    if 0 < depth_min < 160:
        robot.backward(2)
        robot.left(0.5)
        robot.stop()
    
    # compute all detected objects
    detections = model(imgsized)
    #
    
    #   AVOID BOTTLES  
    matching_detections = []
    obstacle = []
    
    for d in detections[0]:
        label = d['label']
        if label == 1:
            matching_detections.append(d)
        elif label == 44:
            obstacle.append(d)
    

    closest_obstacle = get_closest_object(obstacle)
    
    if closest_obstacle:
        det = closest_obstacle
        bbox = det['bbox']
        create_blue_box(image, bbox)
        x, y, middle_width, middle_height = get_x_y(bbox)
        depth = get_depth_from_data(camera.depth_image_, x , y)
        
        if depth < 1500:
            if middle_width < 0.5:
                obstacle_avoid_right()
            elif middle_width < 0.5:
                obstacle_avoid_left()
        
    
    closest_human = get_closest_object(matching_detections)

    if closest_human:
        det = closest_human
        bbox = det['bbox']
        create_blue_box(image, bbox)
        x, y, middle_width, middle_height = get_x_y(bbox)
        depth = get_depth_from_data(camera.depth_image_, x , y)

        if middle_width < 0.4:
            robot.left(0.5)
        elif middle_width > 0.6:
            robot.right(0.5)
        else:
            if depth < 600:
                robot.backward(0.3)
            elif 800 > depth > 600:
                robot.stop()
            else:
                robot.forward(0.25)
          
        
    else:
        if depth_min > 300:
            robot.forward(0.4)
        else:
            robot.backward(1)
            robot.left(0.4)
            time.sleep(0.5)
            robot.stop()
          
    image_widget.value = bgr8_to_jpeg(image)      
    
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')    


In [None]:
#---FEAR---
def processing(change):
    
    depth_track = []
    
    image = change['new']
    
    imgsized= cv2.resize(image,(300,300))
    # compute all detected objects
    detections = model(imgsized)
    
    matching_detections = [d for d in detections[0] if d['label'] == 1]

    
    
    if matching_detections:
        for detection in matching_detections:
            det = detection
            bbox = det['bbox']
            create_blue_box(image, bbox)
            x, y, middle_width, middle_height = get_x_y(bbox)
            depth = get_depth_from_data(camera.depth_image_, x , y)
    
            if middle_width < 0.4:
                robot.left(1)
                robot.stop()
            elif middle_width > 0.6:
                robot.right(1)
                robot.stop()
            else:
                if 600 > depth > 0:
                    robot.left(1)
                    time.sleep(1)
                    robot.forward(0.7)
                    time.sleep(1)
                    robot.right(1)
                    time.sleep(1)         
                    robot.stop()
          
          
          
    image_widget.value = bgr8_to_jpeg(image)      
    
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')  

In [None]:
#---AGRESSION---
def processing(change):
    
    depth_track = []
    
    image = change['new']
    
    imgsized= cv2.resize(image,(300,300))
    
    # compute all detected objects
    detections = model(imgsized)
    #
    
    #   AVOID BOTTLES  
    matching_detections = []
    obstacle = []
    
    for d in detections[0]:
        label = d['label']
        if label == 1:
            matching_detections.append(d)
        elif label == 44:
            obstacle.append(d)
    

    closest_obstacle = get_closest_object(obstacle)
    
    if closest_obstacle:
        det = closest_obstacle
        bbox = det['bbox']
        create_blue_box(image, bbox)
        x, y, middle_width, middle_height = get_x_y(bbox)
        depth = get_depth_from_data(camera.depth_image_, x , y)
        
        if depth < 1500:
            if middle_width < 0.5:
                obstacle_avoid_right()
            elif middle_width < 0.5:
                obstacle_avoid_left()
        
    
    closest_human = get_closest_object(matching_detections)

    if closest_human:
        det = closest_human
        bbox = det['bbox']
        create_blue_box(image, bbox)
        x, y, middle_width, middle_height = get_x_y(bbox)
        depth = get_depth_from_data(camera.depth_image_, x , y)

        if middle_width < 0.4:
            robot.left(0.5)
        elif middle_width > 0.6:
            robot.right(0.5)
        else:
            if 600 > depth > 0:
                robot.forward(8)
                time.sleep(1.5)
                robot.stop()
            else:
                robot.forward(0.5)
          
          
          
    image_widget.value = bgr8_to_jpeg(image)      
    
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')    
                 

In [None]:
# Love

def processing(change):
    
    depth_track = []
    
    image = change['new']
    
    imgsized= cv2.resize(image,(300,300))
    
    
    # compute all detected objects
    detections = model(imgsized)
    
robot.stop()
     
    matching_detections = []
    
    for d in detections[0]:
        label = d['label']
        if label == 1:
            matching_detections.append(d)
    
    
    closest_human = get_closest_object(matching_detections)

    if closest_human:
        det = closest_human
        bbox = det['bbox']
        create_blue_box(image, bbox)
        x, y, middle_width, middle_height = get_x_y(bbox)
        depth = get_depth_from_data(camera.depth_image_, x , y)
        print(f'Person Depth: {depth}')

        if middle_width < 0.4:
            print(f'Person left: {middle_width}')
            robot.left(0.5)
        elif middle_width > 0.6:
            print(f'Person right: {middle_width}')
            robot.right(0.5)
        else:
            if depth < 600:
                robot.backward(0.5)
            elif 1000 > depth > 600:
                robot.stop()
            else:
                robot.forward(0.5)
          
          
          
    image_widget.value = bgr8_to_jpeg(image)      
    
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')

In [None]:
# Curious

def processing(change):
    
    depth_track = []
    
    image = change['new']
    
    imgsized= cv2.resize(image,(300,300))
    
    
    # compute all detected objects
    detections = model(imgsized)
    #
     
    matching_detections = []
    
    for d in detections[0]:
        label = d['label']
        if label == 1:
            matching_detections.append(d)
    
    
    closest_human = get_closest_object(matching_detections)

    if closest_human:
        det = closest_human
        bbox = det['bbox']
        create_blue_box(image, bbox)
        x, y, middle_width, middle_height = get_x_y(bbox)
        depth = get_depth_from_data(camera.depth_image_, x , y)
        print(f'Person Depth: {depth}')

        if middle_width < 0.4:
            print(f'Person left: {middle_width}')
            robot.left(0.5)
        elif middle_width > 0.6:
            print(f'Person right: {middle_width}')
            robot.right(0.5)
        else:
            if not depth:
                return
            if depth > 1500:
                robot.stop()
            elif depth < 1500:
                robot.forward(0.25)
                if depth < 500:
                    robot.stop()
          
          
          
    image_widget.value = bgr8_to_jpeg(image)      
    
#the camera.observe function will monitor the color_value variable. If this value changes, the excecute function will be excuted.
camera.observe(processing, names='color_value')    

In [None]:
image_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=1, description='tracked label')
display(widgets.VBox([
    widgets.HBox([image_widget,]),
    label_widget
]))

In [None]:
camera.unobserve_all()
camera.stop()
time.sleep(1.0)
robot.stop()

In [27]:
#---Testing---

def test_get_x_y():
    width = 640
    height = 480
    bbox = [0.2,0.1,0.7,0.9]
    real_middle_width =round(((0.7-0.2)/2)+0.2,1)
    real_middle_height = round(((0.9-0.1)/2)+0.1,1)
    real_y = width*real_middle_width
    real_x = height*real_middle_height
    
    x, y, middle_width, middle_height = get_x_y(bbox)
    correct=0
    if real_middle_height == middle_height:
        print('middle height value correct')
        correct = correct+1
    else:
        print('middle height value incorrect')
        
    if real_middle_width == middle_width:
        print('middle width value correct')
        correct = correct+1
    else:
        print('middle width value incorrect')
        
    if real_y == y:
        print('relative width value correct')
        correct = correct+1
    else:
        print('relative width value incorrect')
        
    if real_x == x:
        print('relative height value correct')
        correct = correct+1
    else:
        print('relative height value incorrect')
        
    print(str(correct)+'/4 were correct')
    
    
def test_part_b():
    
    width = 640
    height = 480
    bbox_left = [0.2,0.1,0.5,0.8]
    bbox_right = [0.6,0.6,0.8,0.8]
    bbox_middle = [0.2,0.1,0.8,0.8]
    x, y, middle_width_left, middle_height = get_x_y(bbox_left)
    x, y, middle_width_right, middle_height = get_x_y(bbox_right)
    x, y, middle_width_middle, middle_height = get_x_y(bbox_middle)    
    
    tests=[middle_width_left,middle_width_right,middle_width_middle]
    results=[]
    print('Expected: left, rigth, middle')
    print('Got:')
    for middle_width in tests:
        if middle_width < 0.4:
            print('bbox centre to the left')
        elif middle_width > 0.6:
            print('bbox cetre to the right')
        else:
            print('bbox centre in the middle')

In [10]:
test_get_x_y()

middle height value correct
middle width value correct
relative width value correct
relative height value correct
4/4 were correct


In [28]:
test_part_b()

Expected: left, rigth, middle
Got:
bbox centre to the left
bbox cetre to the right
bbox centre in the middle
