# Rover Logic 

All the logic comes together across all use-cases here. Load all models, create widgets for display, and do the entire logic for each of the use-case situations

Start with importing all libraries

In [2]:
import torch
import torchvision
import torch.nn.functional as F
import numpy as np
import cv2
import time
from jetbot import bgr8_to_jpeg
from jetbot import Camera
from jetbot import Robot
from IPython.display import display
from jetbot import ObjectDetector
import ipywidgets.widgets as widgets

Load labels from the COCO dataset into a list to serve as a lookup table for object IDs for display around bounding boxes

In [3]:
labels_list = open("ms-coco-labels.txt",'r').read().split('\n')

Load the COCO model for object identification with 91 classes of objects. 

This model is used for object identification, navigation goal setting and object following. This model is not used for collision avoidance.

I do not do transfer learning on this model because the rover will not know the exact types of objects on a new planet or moon. I use this model as is

In [4]:
model = ObjectDetector('ssd_mobilenet_v2_coco.engine')

Load the collision avoidance model. This model is trained to classify objects in the camera's view into 2 classes - too near with high risk of collision vs low risk of collision. What the neural network has actually learned is amount of floor visible that's safe vs unsafe. 

In [5]:
collision_model = torchvision.models.alexnet(pretrained=False)
collision_model.classifier[6] = torch.nn.Linear(collision_model.classifier[6].in_features, 2)
collision_model.load_state_dict(torch.load('best_model.pth'))
device = torch.device('cuda')
collision_model = collision_model.to(device)

mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])

normalize = torchvision.transforms.Normalize(mean, stdev)

Preprocessing of images captured in a neat helper function

In [6]:
def preprocess(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.resize(x, (224, 224))
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x

Initialize the camera, and set FPS at 5, which is sufficient for our needs and allows functioning without lag

In [7]:
camera = Camera.instance(width=300, height=300, capture_width=300, capture_height=300, fps=5)

Create all visual widgets of interest

In [8]:
image_widget = widgets.Image(format='jpeg', width=300, height=300)
width = int(image_widget.width)
height = int(image_widget.height)

blocked_widget = widgets.FloatSlider(min=0.0, max=1.0, value=0.0, description='blocked')
speed_widget = widgets.FloatSlider(value=0.2, min=0.0, max=1.0, description='speed')
turn_gain_widget = widgets.FloatSlider(value=0.2, min=0.0, max=2.0, description='turn gain')

objectofinterest_widget = widgets.Textarea()
detections_widget = widgets.Textarea()
found_widget = widgets.Label()

objectofinterest_widget.value = '88' # 88 = teddy bear
found_widget.value = ""

Create and initialize the Robot object

In [9]:
robot = Robot()

Helper function for object annotation. This is valuable for debugging too.

In [10]:
def draw_annotated_boundingbox(image, det):
    bbox = det['bbox']
    left = int(width * bbox[0])
    top = int(height * bbox[1])
    right = int(width * bbox[2])
    bottom = int(height * bbox[3])
    color = (255, 255, 0)
    thickness = 2
    confidence = "{:.0%}".format(det['confidence'])
    cv2.rectangle(image, (left, top), (right, bottom), color, thickness)
    cv2.putText(image, str(labels_list[int(det['label'])]) + ", " + str(confidence), (left, bottom + 12), 0, 1e-3 * height, color, thickness//8)    

Helper function for goal setting. Knowing the center of an object seen in an image, you can reorient the jetbot and its camera to point towards that detected object's center such that further movement of the Jetbot will go in the direction of that object's center.

In [11]:
def find_boundingbox_center(detection):
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)

Helper functions to perform the planning and motion control actions

In [12]:
def keep_exploring():
    robot.forward(0.2)
    
def collision_avoid():
    robot.left(0.2)
    
def goal_setting(matching_detection):
    # move robot forward and steer proportional target's x-distance from center
    center = find_boundingbox_center(matching_detection)
    robot.set_motors(float(speed_widget.value + turn_gain_widget.value * center[0]), 
                     float(speed_widget.value - turn_gain_widget.value * center[0]))
    
def object_follow_reached():
    # Some objects are detected. If the most likely one is a the object of interest
    camera.unobserve_all()
    robot.stop()
    # Found an alien
    found_widget.value = 'Found An Alien!!! Stopping here to stream alien data to Earth.'
    robot.stop()

Central operation of the rover's logic, that's placed as a callback function and is called every time the Jetbot returns a new image.

In [13]:
def execute(change):
    
    image = change['new']
    
    # Perception elements
    collision_output = collision_model(preprocess(image)).detach().cpu()
    prob_blocked = float(F.softmax(collision_output.flatten(), dim=0)[0])
    blocked_widget.value = prob_blocked
    
    detections = model(image)
    detections_widget.value = str(detections)
    
    # For all objects identified, bounding box with classification and probability
    for det in detections[0]:   
        draw_annotated_boundingbox(image, det)
   
    # Planning & Control elements
    if prob_blocked > 0.75:
        # We are blocked by something
        if any(detections) and detections[0][0]['label'] == int(objectofinterest_widget.value):
            # The blocking object matches our target, stop here to observe safely
            object_follow_reached()
        else:
            # Collision probable with any further exploration in this direction
            collision_avoid()
    else:
        # Not blocked by anything, so see if any object in our view matches our target
        matching_detections = [d for d in detections[0] if d['label'] == int(objectofinterest_widget.value)]
        
        if len(matching_detections) > 0:
            # One or more objects in the image matches our target
            goal_setting(matching_detections[0])
        else:
            # No object detected in the image matches our target
            keep_exploring()
    
    cv2.resize(image,(600, 600))
    image_widget.value = bgr8_to_jpeg(image)

Display all UI widgets

In [14]:
display(image_widget)
display(blocked_widget)
display(speed_widget)
display(turn_gain_widget)
display(detections_widget)
display(found_widget)
display(objectofinterest_widget)

Image(value=b'', format='jpeg', height='300', width='300')

FloatSlider(value=0.0, description='blocked', max=1.0)

FloatSlider(value=0.2, description='speed', max=1.0)

FloatSlider(value=0.2, description='turn gain', max=2.0)

Textarea(value='')

Label(value='')

Textarea(value='88')

Close any existing handles to the camera and create a fresh new one which runs the callback function whenever the camera sends in the next image.

In [19]:
camera.unobserve_all()
camera.observe(execute, names='value')

Shut down the camera and the robot

In [30]:
camera.unobserve_all()
robot.stop()
robot.stop()

88
