# Reactive Behaviour Implementation

This program is used to demontrate the following four Braitenberg behaviours: aggressive, bear, and love as well as curious behaviours.
The first step is to read the labels.txt file. We also needed to download the pretrained model ('alexnet-owt-4df8aa71.pth') and upload it to the robot. The model can be downloaded from the following link https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth

In [1]:
# code is from source[1]
#open the txt file and read object categories.
f = open("labels.txt", "r") 
labels = []
for x in f: # read the txt file line by line
    labels.append(x.split(':')[1]) #add it to the list
print(len(labels))


1000


Run the following code to capture image data from the RGBD sensor 

In [2]:
#use traitlets and widgets to display the image in Jupyter Notebook
import traitlets
from traitlets.config.configurable import SingletonConfigurable

#use opencv to covert the depth image to RGB image for displaying purpose
import cv2
import numpy as np

#using realsense to capture the color and depth image
import pyrealsense2 as rs

#multi-threading is used to capture the image in real time performance
import threading

predict_id = 0
class Camera(SingletonConfigurable):
    
    #this changing of this value will be captured by traitlets
    color_value = traitlets.Any()
    
    def __init__(self):
        super(Camera, self).__init__()
        
        #configure the color and depth sensor
        self.pipeline = rs.pipeline()
        self.configuration = rs.config()  
        
        #set resolution for the color camera
        self.color_width = 640
        self.color_height = 480
        self.color_fps = 30
        self.configuration.enable_stream(rs.stream.color, self.color_width, self.color_height, rs.format.bgr8, self.color_fps)

        #set resolution for the depth camera
        self.depth_width = 640
        self.depth_height = 480
        self.depth_fps = 30
        self.configuration.enable_stream(rs.stream.depth, self.depth_width, self.depth_height, rs.format.z16, self.depth_fps)

        #flag to control the thread
        self.thread_runnning_flag = False
        
        #start the RGBD sensor
        self.pipeline.start(self.configuration)
        self.pipeline_started = True
        frames = self.pipeline.wait_for_frames()

        #start capture the first color image
        color_frame = frames.get_color_frame()   
        image = np.asanyarray(color_frame.get_data())
        self.color_value = image

        #start capture the first depth image
        depth_frame = frames.get_depth_frame()           
        depth_image = np.asanyarray(depth_frame.get_data())
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
        self.depth_value = depth_colormap   


    def _capture_frames(self):
        while(self.thread_runnning_flag==True): #continue until the thread_runnning_flag is set to be False
            frames = self.pipeline.wait_for_frames() #receive data from RGBD sensor
            
            color_frame = frames.get_color_frame() #get the color image
            image = np.asanyarray(color_frame.get_data()) #convert color image to numpy array
            self.color_value = image #assign the numpy array image to the color_value variable 

            depth_frame = frames.get_depth_frame() #get the depth image           
            depth_image = np.asanyarray(depth_frame.get_data()) #convert depth data to numpy array
                
            #we only consider the central area of the vision sensor
            depth_image[:100,:]=0
            depth_image[350:,:]=0
            depth_image[:,:100]=0
            depth_image[:,550:]=0
            
            #For object avoidance, we don't consider the distance that are lower than 100mm or bigger than 1000mm
            depth_image[depth_image<100]=0

            
            #If all of the values in the depth image is 0, the depth[depth!=0] command will fail
            #we set a specific value here to prevent this failure
            depth_image[0,0]=2000
            
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
            if (depth_image[depth_image!=0].min()<650):

                self.warning_flag=1
            else:
                self.warning_flag=0
            self.depth_value = depth_colormap               
    
    def start(self): #start the data capture thread
        if self.thread_runnning_flag == False: #only process if no thread is running yet
            self.thread_runnning_flag=True #flag to control the operation of the _capture_frames function
            self.thread = threading.Thread(target=self._capture_frames) #link thread with the function
            self.thread.start() #start the thread

    def stop(self): #stop the data capture thread
        if self.thread_runnning_flag == True:
            self.thread_runnning_flag = False #exit the while loop in the _capture_frames
            self.thread.join() #wait the exiting of the thread       

def bgr8_to_jpeg(value):#convert numpy array to jpeg coded data for displaying 
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object
camera = Camera.instance()
camera.start() # start capturing the data


Run the following code to perform classification and display the images

In [None]:
#The pytorch platform is used in this tutorial
import torch
import torchvision
import torch.nn as nn
import cv2
import numpy as np

# the following AlexNet model is defined by torchvision
class AlexNet(nn.Module):

    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(64, 192, kernel_size=5, padding=2),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
            nn.Conv2d(192, 384, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(384, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

model = AlexNet()
model.load_state_dict(torch.load('alexnet-owt-4df8aa71.pth'))
# 'alexnet-owt-4df8aa71.pth' is the pretrained model, it can be downloaded from the following link
# https://download.pytorch.org/models/alexnet-owt-4df8aa71.pth
# this model should be placed under the same folder of this file

#We use GPU for classification 
device = torch.device('cuda')
model = model.to(device)

import time
from RobotClass import Robot
import ipywidgets.widgets as widgets
from IPython.display import display, HTML
import sys

#create widgets for the displaying of the image
display_color = widgets.Image(format='jpeg', width='45%') #determine the width of the color image
display_depth = widgets.Image(format='jpeg', width='45%')  #determine the width of the depth image
layout=widgets.Layout(width='100%')

sidebyside = widgets.HBox([display_color, display_depth],layout=layout) #horizontal 
display(sidebyside) #display the widget

#callback function, invoked when traitlets detects the changing of the color image
def process(change):
    
    image = change['new'] #retrieve data from the input dict
    display_color.value = bgr8_to_jpeg(cv2.resize(image,(320,240)))
    display_depth.value = bgr8_to_jpeg(cv2.resize(camera.depth_value,(320,240)))

#processing({'new': camera.color_value})
#the camera.observe function will monitor the color_value variable. If this value changes, the processing function will be excuted.
camera.observe(process, names='color_value')


#the following code will classify the image and measure the time 
while (1):
    t1 = cv2.getTickCount() 
    imgsized= cv2.resize(camera.color_value,(224,224)) #resize the image
    x = cv2.cvtColor(imgsized, cv2.COLOR_BGR2RGB) #convert to RGB as required by the model
    x = x.transpose((2, 0, 1)) #swith the image channels
    x = torch.from_numpy(x).float() #convert to type float
    mean = 255.0 * np.array([0.485, 0.456, 0.406]) #mean value
    stdev = 255.0 * np.array([0.229, 0.224, 0.225]) # for the nomalization of the input image
    normalize = torchvision.transforms.Normalize(mean, stdev)
    x = normalize(x)
    x = x.to(device)# send the data to GPU device
    x = x[None, ...] # increase the image dimension, the model takes a batch of images and the batch size is 1
    output = model(x) #classfy the images
    predict_id = output.max(1, keepdim=True)[1].item() #get the label
    #print('id ', predict_id," prediction time ", (cv2.getTickCount()-t1)/cv2.getTickFrequency())
    
    #end of source[1]
    
    
    
    #initialize the Robot class
    robot = Robot()
    

    
    
# Start of Function definition for reactive behaviours:

    def aggression():
        '''
        
        This function is used to show the emotion of aggression
        on detecting a running shoe.
        
        '''
        robot.forward(1)  # goes forward full speed
        time.sleep(0.5)
        robot.backward(0.2)  # goes a bit backward to avoid crashing with the object
        time.sleep(0.3)
        robot.stop()
            
            
            
            
    def fear():
        '''
        
        This function is used to show the emotion of fear
        on detecting a water bottle.
        
        Movement : At first, it moves forward a little, after detecting
        the 'water bottle',it goes a bit backward at a slow speed
        and then it goes backward at full speed. After that, it turns 
        right to change direction and escape from the intended object by 
        moving a bit forward.
        
        '''
        robot.forward(0.5)
        time.sleep(0.2)
        robot.backward(0.7)
        time.sleep(0.3)
        robot.backward(1)
        time.sleep(0.5)
        robot.right(0.6)
        time.sleep(0.6)
        robot.right(0.6)
        time.sleep(0.6)
        robot.forward(0.9)
        time.sleep(1)
        robot.stop()
            
         
        
    def love():
        '''
        
        This function is used to show the emotion of love
        on detecting a 'cellphone'.
        
        Movement : At first, it turns left, after detecting
        the 'cellphone', it goes forward and turns right
        8 times to form a circle which shows the robot's interest
        on the object.
        
        '''

       
        robot.left(0.6) 
        time.sleep(0.5)
        for i in range(6):
            robot.forward(0.6)
            time.sleep(0.6)
            robot.right(0.6)
            time.sleep(0.5)

        robot.stop()
            
            
            
            
    def look(speed,t):
        '''
        
        This function is used to make robot looking around
        using robot.left and robot.right on detecting an umbrella.
        
        '''
        robot.right(speed)
        time.sleep(t)
        robot.left(speed)
        time.sleep(t)
        robot.right(speed)
       
    
    def curious():
        '''
        This function is used to show the behaviour of curiosity on detecting an umbrella.
        
        Movement : when robot detects an umbrella, at first, by using look function the robot 
        starts to look around itself and then go forward to get close to the object.
        
        '''
        look(0.5,0.3)
        robot.forward(0.4)
        time.sleep(0.3)
        look(0.5,0.3)
        robot.forward(0.4)
        time.sleep(0.3)
        robot.stop()
            
            
            
# End of function definition for reactive behaviour     
            
            
  

            
    if(camera.warning_flag):        
        if predict_id == 770: #running shoe
                aggression()
        elif predict_id == 487: #cellphone
                fear()
        elif predict_id == 898: #water bottle
                love()
        elif predict_id == 879: #umbrella
                curious()
    else:
            robot.stop()
        


#source[1] https://learn.lboro.ac.uk/mod/resource/view.php?id=1042759

HBox(children=(Image(value=b'', format='jpeg', width='45%'), Image(value=b'', format='jpeg', width='45%')), la…

References:

source[1] https://learn.lboro.ac.uk/mod/resource/view.php?id=1042759