In [1]:
# import required packages
from jetbot import Robot, Camera, bgr8_to_jpeg, ObjectDetector

import time

import ipywidgets.widgets as widgets
from IPython.display import display
import traitlets

import torch
import torchvision
import torch.nn.functional as F

import cv2
import numpy as np

In [2]:
# Load the pretrained coco model for use in kChaser
model = ObjectDetector('ssd_mobilenet_v2_coco.engine')




In [3]:
# initialize the camera
camera = Camera.instance(width=300, height=300)

In [4]:
# detect the camera output
# Index 1 is a human, index 18 is a dog
detections = model(camera.value)
print(detections)


[[{'label': 23, 'confidence': 0.639510989189148, 'bbox': [0.3840806484222412, 0.0010841339826583862, 0.674470067024231, 0.3559338450431824]}]]


In [5]:
# print out the detected objects
detections_widget = widgets.Textarea()
detections_widget.value = str(detections)
display(detections_widget)



Textarea(value="[[{'label': 23, 'confidence': 0.639510989189148, 'bbox': [0.3840806484222412, 0.00108413398265…

In [6]:
# print the first detected object 
image_number = 0
object_number = 0
print(detections[image_number][object_number])


{'label': 23, 'confidence': 0.639510989189148, 'bbox': [0.3840806484222412, 0.0010841339826583862, 0.674470067024231, 0.3559338450431824]}


In [7]:
# load the collision detection & balloon models

device = torch.device('cuda')

collision_model = torchvision.models.alexnet(pretrained=False)
collision_model.classifier[6] = torch.nn.Linear(collision_model.classifier[6].in_features, 2)
collision_model.load_state_dict(torch.load('../collision_avoidance/best_model.pth')) 
collision_model = collision_model.to(device)

balloon_model = torchvision.models.alexnet(pretrained=False)
balloon_model.classifier[6] = torch.nn.Linear(balloon_model.classifier[6].in_features, 2)
balloon_model.load_state_dict(torch.load('best_balloon_model.pth'))
balloon_model = balloon_model.to(device)


# define the preprocessing function
mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])

normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.resize(x, (224, 224))
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x

In [8]:
# initialize a robot instance
robot = Robot()

# confirm the robot is functional
robot.left(.4)
time.sleep(1.0)
robot.stop()


In [9]:
#display all the control widgets and connect the network execution function to the camera updates

blocked_widget = widgets.FloatSlider(min=0.0, max=1.0, value=0.0, description='blocked')
balloon_widget = widgets.FloatSlider(min=0.0, max=1.0, value=0.0, description='balloon')
image_widget = widgets.Image(format='jpeg', width=300, height=300)
label_widget = widgets.IntText(value=18, description='tracked label')
speed_widget = widgets.FloatSlider(value=0.4, min=0.0, max=1.0, description='speed')
turn_gain_widget = widgets.FloatSlider(value=0.8, min=0.0, max=2.0, description='turn gain')

display(widgets.VBox([
    widgets.HBox([image_widget, blocked_widget]),
    balloon_widget,
    label_widget,
    speed_widget,
    turn_gain_widget
]))

width = int(image_widget.width)
height = int(image_widget.height)


# define object detection functions

def detection_center(detection):
    """Computes the center x, y coordinates of the object"""
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)
    
def norm(vec):
    """Computes the length of the 2D vector"""
    return np.sqrt(vec[0]**2 + vec[1]**2)

def closest_detection(detections):
    """Finds the detection closest to the image center"""
    closest_detection = None
    for det in detections:
        center = detection_center(det)
        if closest_detection is None:
            closest_detection = det
        elif norm(detection_center(det)) < norm(detection_center(closest_detection)):
            closest_detection = det
    return closest_detection
        
    
# define the execution function to apply the neural networks frame by frame from the video feed
def execute(change):
    image = change['new']
    
    # execute collision model to determine if blocked
    collision_output = collision_model(preprocess(image)).detach().cpu()
    prob_blocked = float(F.softmax(collision_output.flatten(), dim=0)[0])
    blocked_widget.value = prob_blocked
    
    
    # turn left if blocked
    if prob_blocked > 0.5:
        robot.left(0.4)
        image_widget.value = bgr8_to_jpeg(image)
        return
    
    time.sleep(0.001)
    
    # execute balloon to determine if a balloon is present
    balloon_output = balloon_model(preprocess(image)).detach().cpu()
    prob_balloon = float(F.softmax(balloon_output.flatten(), dim=0)[0])
    balloon_widget.value = prob_balloon
    
    # turn right if a balloon is present
    if prob_balloon > 0.5:
        robot.right(.4)
        time.sleep(1)
        image_widget.value = bgr8_to_jpeg(image)
        return
    #else:
        #robot.left(0.4)
        
    time.sleep(0.001)
        
    # compute all detected objects
    detections = model(image)
    
    # draw all detections on image
    for det in detections[0]:
        bbox = det['bbox']
        cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (255, 0, 0), 2)
    
    # select detections that match selected class label
    matching_detections = [d for d in detections[0] if d['label'] == int(label_widget.value)]
    
    # get detection closest to center of field of view and draw it
    det = closest_detection(matching_detections)
    if det is not None:
        bbox = det['bbox']
        cv2.rectangle(image, (int(width * bbox[0]), int(height * bbox[1])), (int(width * bbox[2]), int(height * bbox[3])), (0, 255, 0), 5)
    
        
    # otherwise go forward if no target detected
    if det is None:
        robot.forward(float(speed_widget.value))
        
    # otherwsie steer towards target
    else:
        # move robot forward and steer proportional target's x-distance from center
        center = detection_center(det)
        robot.set_motors(
            float(speed_widget.value - turn_gain_widget.value * center[0]),
            float(speed_widget.value + turn_gain_widget.value * center[0])
        )
    
    # update image widget
    image_widget.value = bgr8_to_jpeg(image)
    
execute({'new': camera.value})

VBox(children=(HBox(children=(Image(value=b'', format='jpeg', height='300', width='300'), FloatSlider(value=0.…

In [14]:
# connect the execute function to each camera frame update

camera.unobserve_all()
camera.observe(execute, names='value')

In [15]:
#manually disconnect the processing from the camera and stop the robot

camera.unobserve_all()
time.sleep(1.0)
robot.stop()

In [None]:
# initialize and load the trained model 

#model = torchvision.models.resnet18(pretrained=False)
#model.classifier[6] = torch.nn.Linear(model.classifier[6].in_features, 2)


#resnet18 = models.resnet18()

# change the model name below as necessary
#model.load_state_dict(torch.load('models_ft_state_dict.pth'))

#model = torch.load(models_ft.pth)

model = torch.load("models_ft.pth")

# execute the code on the gpu vs. cpu
device = torch.device('cuda')
model = model.to(device)


In [None]:
# create the pre-processing function

mean = 255.0 * np.array([0.485, 0.456, 0.406])
stdev = 255.0 * np.array([0.229, 0.224, 0.225])

normalize = torchvision.transforms.Normalize(mean, stdev)

def preprocess(camera_value):
    global device, normalize
    x = camera_value
    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)
    x = x.transpose((2, 0, 1))
    x = torch.from_numpy(x).float()
    x = normalize(x)
    x = x.to(device)
    x = x[None, ...]
    return x



In [None]:
# Start and display the camera

camera = Camera.instance(width=224, height=224)
image = widgets.Image(format='jpeg', width=224, height=224)
blocked_slider = widgets.FloatSlider(description='blocked', min=0.0, max=1.0, orientation='vertical')

camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

display(widgets.HBox([image, blocked_slider]))

In [None]:
# create robot instance to drive the motors

robot = Robot()

In [None]:
# test that the robot is functioning

robot.forward(speed=0.4)
time.sleep(1)
robot.stop()

robot.left(speed=0.4)
time.sleep(1)
robot.stop()

In [None]:
# write the neural network execution function
# if probability that kenai is in the photo is > 0.5, robot will turn

def update(change):
    global blocked_slider, robot
    x = change['new'] 
    x = preprocess(x)
    y = model(x)
    
    # we apply the `softmax` function to normalize the output vector so it sums to 1 (which makes it a probability distribution)
    y = F.softmax(y, dim=1)
    
    prob_blocked = float(y.flatten()[0])
    
    blocked_slider.value = prob_blocked
    
    if prob_blocked < 0.5:
        robot.backward(0.4)
    else:
        robot.left(0.4)
    
    time.sleep(0.001)
        
update({'new': camera.value})  # we call the function once to intialize



In [None]:
# Attach the neural network execution function to the camera for processing 

camera.observe(update, names='value')  # this attaches the 'update' function to the 'value' traitlet of our camera

In [None]:
# stop the robot

camera.unobserve(update, names='value')
time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
robot.stop()

In [None]:
# unlink & link the camera feed as needed

camera_link.unlink()  # don't stream to browser (will still run camera)
camera_link.link()  # stream to browser (wont run camera)


In [None]:
# create and display widgets
# widgets include sliders, buttons, video feed, etc. 

# create a stop button! 
# create a display that highlights identified object in green
# slider for speed
# slider for probability that target objects are identified



In [None]:
# connect the widgets to the robot with traitlets




In [None]:
# unlink the trailets

