In [8]:
import os
import pickle
import cv2
import numpy as np
import math
import time
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import bgr8_to_jpeg
from jetbot import ObjectDetector
from jetbot import Camera
from jetbot import Robot

In [2]:
model = ObjectDetector('../Notebooks/object_following/ssd_mobilenet_v2_coco.engine')
camera = Camera.instance(width=300, height=300)
robot = Robot()

In [9]:
# Load COCO labels
filename = "coco_labels.dat"
filehandler = open(filename, 'rb')
COCO_labels = pickle.load(filehandler)

# Load camera calibration data for undistort
filename = "calibration.dat"
filehandler = open(filename, 'rb')
camera_cal = pickle.load(filehandler)
mtx = camera_cal['mtx']
dist = camera_cal['dist']
f_u = mtx[0,0]   # focal center coordinates
f_v = mtx[1,1]
focal_center = np.array([f_u, f_v])

# Mapping between set_motor "speed" and measured wheel angular velocity "omega"
# for 0.1 second motor running time
wheel_calibration = {
    "speed": [0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8],
    "omega": [0.0, 3.85, 9.23, 15.0, 25.8, 29.2, 35.4]
}

# Open Image Widget
image_widget = widgets.Image(format='jpeg', width=300, height=300)
width = int(image_widget.width)
height = int(image_widget.height)

BLUE = (255, 0, 0)
GREEN = (0, 255, 0)
RED = (0, 0, 255)

diag_dir = 'diagnostics'

# we have this "try/except" statement because these next functions can throw an error if the directories exist already
try:
    os.makedirs(diag_dir)
except FileExistsError:
    print('Directories not created because they already exist')

Directories not created because they already exist


In [10]:
# Bbox for TV and stool at various camera poses
# For (0,0,0) to (1,0,0)
beg_bbox_TV = [(72, 92), (136, 137)]  # Pose (0,0,0)
mid_bbox_TV = [(33, 69), (114, 124)]  # Pose ~(0.5,0,0)
end_bbox_stool = [(175, 84), (237, 186)]   # Pose (1.0,0,0)

# Bbox for 2nd stool at various camera poses
# For (1,0,0) to (2,0,pi)
end_bbox_horse = [(147, 103), (247, 201)]
mid_bbox_stool_2 = [(163, 80), (263, 185)]   # midpoint to Pose (1,2,0)

In [43]:
def undistort(img, mtx, dist, crop=False):
    """Undistort camera image based on calibration data"""
    h,w = img.shape[:2]
    # print (h,w)
    newcameramtx, roi=cv2.getOptimalNewCameraMatrix(mtx,dist,(w,h),1,(w,h))
    
    # undistort
    dst = cv2.undistort(img, mtx, dist, None, newcameramtx)

    # crop the image (optional)
    if crop:
        x,y,w,h = roi
        dst = dst[y:y+h, x:x+w]
    return dst

def draw_bbox(img, width, height, bbox, color, line_width):
    bbox_pixel = [(int(width * bbox[0]), int(height * bbox[1])), 
                  (int(width * bbox[2]), int(height * bbox[3]))]
    cv2.rectangle(img, bbox_pixel[0], bbox_pixel[1], color, line_width)
    return bbox_pixel

def display_detected(img, detections, width, height, debug=False):
    """ put blue bounding boxes on detected objects on image """
    
    for det in detections[0]:
        label = COCO_labels[det['label']]
        bbox = det['bbox']
        bbox_pixel = draw_bbox(img, width, height, bbox, BLUE, 1)
        if debug:
            print(label,det['label'], bbox_pixel)
    return

def detection_center(detection):
    """Computes the center x, y coordinates of the object"""
    bbox = detection['bbox']
    center_x = (bbox[0] + bbox[2]) / 2.0 - 0.5
    center_y = (bbox[1] + bbox[3]) / 2.0 - 0.5
    return (center_x, center_y)

def object_center(object_features):
    """Computes the center x, y coordinates of the object using its feature points"""
    UL_fp, UR_fp, LR_fp, LL_fp = obj_features
    UL_fp_x, UL_fp_y = UL_fp
    LR_fp_x, LR_fp_y = LR_fp
    center_x = (UL_fp_x + LR_fp_x) / 2.0 - 0.5
    center_y = (UL_fp_y + LR_fp_y) / 2.0 - 0.5
    return np.array([center_x, center_y])
    
def norm(vec):
    """Computes the length of the 2D vector"""
    return np.sqrt(vec[0]**2 + vec[1]**2)

def closest_detection(detections, debug= False):
    """Finds the detection closest to the image center"""
    closest_detection = None
    for det in detections:
        center = detection_center(det)
        if debug:
            print(center)
        if closest_detection is None:
            closest_detection = det
        elif norm(detection_center(det)) < norm(detection_center(closest_detection)):
            closest_detection = det
    return closest_detection

def create_feature_pts(bbox):
    """ Return 4 feature points from a bounding box """
    # extract bounding x and y coordinates
    UL_fp, LR_fp = bbox
    UL_fp_x, UL_fp_y = UL_fp
    LR_fp_x, LR_fp_y = LR_fp
   
    # return UL(upper left), UR(upper right), LR(lower right), LL(lower left) feature points
    return [(UL_fp_x, UL_fp_y), (LR_fp_x, UL_fp_y), (LR_fp_x, LR_fp_y), (UL_fp_x, LR_fp_y)]

def generate_ref_obj_features(ref_obj, img, detections, width, height, debug=False):
    """ Find reference object in detected objects and generate 4 feature points """
    matching_detections = [d for d in detections[0] if d['label'] == ref_obj]

    # get detection closest to center of field of view and draw it (if model detects multiple
    # reference objects)
    det = closest_detection(matching_detections)

    if det is not None:
        bbox = det['bbox']
        # bound reference object in green
        draw_bbox(img, width, height, bbox, GREEN, 1)
        # convert to pixel units
        bbox_pixel = [(int(width * bbox[0]), int(height * bbox[1])), 
                       (int(width * bbox[2]), int(height * bbox[3]))]
        return create_feature_pts(bbox_pixel) # return 4 feature points of bounding box
    else:
        return None
    
def fp_error(current, goal):
    """ Return error of a feature point """
    current_x, current_y = current
    goal_x, goal_y = goal
    # Use Corke's convention of (p*-p) --> gain is a +ve number
    return (goal_x-current_x, goal_y-current_y)  

def features_error(currentFeatures, goalFeatures):
    """ Return feature points error vector """
    UL_current, UR_current, LR_current, LL_current = currentFeatures 
    UL_goal, UR_goal, LR_goal, LL_goal = goalFeatures 
    
    # calculate error for each feature point
    UL_error = fp_error(UL_current, UL_goal)
    error_vector = np.asarray(UL_error)
    UR_error = fp_error(UR_current, UR_goal)
    error_vector = np.concatenate((error_vector, UR_error), axis=None)
    LR_error = fp_error(LR_current, LR_goal)
    error_vector = np.concatenate((error_vector, LR_error), axis=None)
    LL_error = fp_error(LL_current, LL_goal)
    error_vector = np.concatenate((error_vector, LL_error), axis=None)

    # return 1x8 error vector of UL, UR, LR, LL feature points
    return np.reshape(error_vector, (1,-1))

def image_jacobian(fp, mtx, depth):
    """ Generate image jacobiab L for a feature point """
    # focal lengths in pixel unit
    f_u = mtx[0,0]
    f_v = mtx[1,1]
    c_u = mtx[0,2]
    c_v = mtx[1,2]

    # Estimated distance of reference object (m)
    Z = depth

    # Calculate J of feature point
    u_raw, v_raw = fp
    u = u_raw - c_u
    v = v_raw - c_v    
    
    L = np.array([[-f_u/Z, 0,      u/Z, u*v/f_u,     -(f_u+u*u/f_u), v],
                  [0,      -f_v/Z, v/Z, f_v+v*v/f_v, -u*v/f_v,      -u]])
    return L

def robot2world(robot_orientation):
    """ calculate the robot jacobian """
    theta = robot_orientation
    
    return np.array([[math.cos(theta),0],
                  [math.sin(theta),0],
                  [0,0],
                  [0,0],
                  [0,0],
                  [0,1]])

def control2robot(wheel_radius, axle_length):
    """ transform wheel speeds to robot motion in world frame """
    l = axle_length
    r = wheel_radius

    return np.array([[r/2, r/2],
                  [r/l, -r/l]])

def save_snapshot(img, directory, name, i):
    image_path = os.path.join(directory, 'detect_'+name+str(i+1)+'.jpg')
    cv2.imwrite(image_path, img) 
    return

def omega2speed(in_val, mapping):
    """ Map wheel angular speed to motor speed setting based on a calibration mapping """
    
    if in_val < 0:
        sign = -1
        in_val = abs(in_val)
    else:
        sign = 1
        
    out_lower = 0
    in_lower = 0
    out_val = 0

    for i, in_upper in enumerate(mapping["omega"]):
        # print (i, in_upper)
        if in_val < in_upper:
            out_upper = mapping["speed"][i]
            out_val = out_lower + (in_val - in_lower)/(in_upper - in_lower) \
                *(out_upper-out_lower)
            # print("yes", out_val)
            break
        else:
            # print("no")
            out_lower = mapping["speed"][i]
            in_lower = in_upper
            
    if out_val is 0:
        print ("Input is too high!!!", in_val)
        out_val = 0
        
    return sign*out_val

def rotate_left_2wheels(speed, Rtime):
    
    robot.set_motors(-speed, speed)
    time.sleep(Rtime)
    robot.stop()
    
    return

def clamp(wheel_velocities, lower_limit, debug=False):
    """ Put limits on motor speed """
    robot_velocities = np.dot(T, wheel_velocities)  # get (tranl_vel, ang_vel)
    tranl_velocity = robot_velocities[0,0]
    
    # Set lower limit on translational speed to 0.15m/s (motor setting 0.3)
    
    if abs(tranl_velocity) < lower_limit:
        robot_velocities[0,0] = tranl_velocity/abs(tranl_velocity) * 0.15
        if debug:
            print("Robot speed boosted", robot_velocities) 
    """
    elif abs(tranl_velocity) > 0.9:
        robot_velocities[0,0] = tranl_velocity/abs(tranl_velocity) * 0.8
    """ 
    # return clamped wheel velocities
    return np.dot(np.linalg.pinv(T), robot_velocities)
        
 

## IBVS Main Procedure

In [60]:
def ibvs_control(robot_params, control_params):
    """ Control loop for IBVS """
    
    # Load control loop parameters
    gain = control_params["gain"]
    Rtime = control_params["Rtime"]
    num_iter = control_params["num_iter"]
    interval = control_params["interval"]
    no_motion = control_params["no_motion"]
    debug = goal_bbox = control_params["debug"]
    ref_obj = control_params["ref_obj"]
    obj_name = control_params["obj_name"]
    goal_bbox = control_params["goal_bbox"]
    error_window = control_params["error_window"]
    speed_lower_limit = control_params["speed_lower"]
    
    # load robot control parameters
    depth = robot_params["depth"]
    orientation = robot_params["orientation"]
    wheel_radius = robot_params["wheel_radius"]
    axle_length = robot_params["axle_length"]
    
    goal_features = create_feature_pts(goal_bbox)
    print("Tracking", COCO_labels[ref_obj]," to ", goal_features)

    start = time.perf_counter()
    for i in range(num_iter):
        
        print("Iteration ", i)
        
        """ Grab camera image, undistort and detect objects """
        image = undistort(camera.value, mtx, dist) # undistort camera image
        detections = model(image) # Use SSD model to detect objects
        display_detected(image, detections, width, height, debug = False) # put bounding boxes on detected objects
        
        """ If ref object is detected, bound it in green box """
        obj_features = generate_ref_obj_features(ref_obj, image, detections, width, height)
    
        if obj_features is None:
            print("Reference object not detected!!!")
            continue
        
        """ Generate error vector (goal-current) """
        # Paint ref object's bbox at robot's goal pose in red
        cv2.rectangle(image, goal_features[0], goal_features[2], RED, 2)
        error = features_error(obj_features,goal_features)
        error_norm = np.linalg.norm(error)
    
        if i%interval == 0 and debug:
            save_snapshot(image, diag_dir, obj_name, i)  # save snap shot
            print ("Ref Object:", obj_features)
            print("Error:", error, error_norm)

        if error_norm < error_window:  # exit control loop if error is within acceptable window
            print ("Goal point achieved!!!")
            break
        
        image_widget.value = bgr8_to_jpeg(image)  # update image widget with camera image
        
        """ Implementation of IBVS control: w = gain*pseudo_inv(LHT).error """        
        # T transforms wheel angular velocities (R,L) to robot velocities (translation, angular)
        T = control2robot(wheel_radius, axle_length) 
        # H transform robot velocities to world frame velocities (v_x,v_y,v_z, w_x,w_y, w_z)
        H = robot2world(orientation)  
        # Image Jacobian L - formed by stacking image jacobians of 4 fp of the ref object
        UL_fp, UR_fp, LR_fp, LL_fp = obj_features
        L = np.vstack((image_jacobian(UL_fp, mtx, depth), image_jacobian(UR_fp, mtx, depth), \
              image_jacobian(LR_fp, mtx, depth), image_jacobian(LL_fp, mtx, depth)))
        
        # implement w = gain * pinv(LHT).error
        J = np.dot(L,np.dot(H,T))
        inv_J = np.linalg.pinv(J)
        unclamped_velocities = gain * np.dot(np.linalg.pinv(J), error.T)
       
        wheel_velocities = clamp(unclamped_velocities, speed_lower_limit, debug=True) 
        robot_velocities = np.dot(T, wheel_velocities)
        
        
        """ Map wheel angular velocities to motor setting, then run motors """ 
        w_r = omega2speed(wheel_velocities[0,0],wheel_calibration) 
        w_l = omega2speed(wheel_velocities[1,0],wheel_calibration)
        
        if no_motion is False:
            robot.set_motors(w_l, w_r)  # left, right
            time.sleep(Rtime)
            robot.stop()
        
        if i%interval == 0 and debug:
            print(wheel_velocities, (w_l, w_r))  
            print(robot_velocities)
        
        """ Update robot's depth and orientation """ 
        transl_velocity = robot_velocities[0,0]
        ang_velocity = robot_velocities[1,0]
        orientation += Rtime*ang_velocity
        # depth = online_est()  TBD
        
        if i%interval == 0 and debug:
            print(wheel_velocities, (w_l, w_r))
            print(robot_velocities)
            print("Orientation:", orientation)
            print("Depth:", depth)
            
    save_snapshot(image, diag_dir, obj_name, i)    # save the last image
    end = time.perf_counter()
    print ("FPS: {:.1f}".format(i/(end-start)))
    
    return

### (0,0,0) --> (1,0,0)

In [58]:
# Display camera image with bounding boxes for detected objects
display(widgets.HBox([image_widget]))

robot_params = {
    "depth": 2.0,
    "orientation": 0,
    "wheel_radius": 0.0325,
    "axle_length": 0.12
}

control_params = {
    "gain": 1.0,
    "Rtime": 0.1, # The loop run at 8fps --> 0.125s duration
    "num_iter": 80,
    "interval": 3,
    "no_motion": False,  # Flag to disable motor (for debugging)
    "debug": True,
    "ref_obj": 72,  # Use TV as reference object
    "obj_name": "TV",
    "goal_bbox": mid_bbox_TV,  # This should get the robot to ~(0.5,0,0) 
    "error_window": 15,
    "speed_lower": 0.14  # This ensures motor settings > ~0.3
}

ibvs_control(robot_params, control_params)
robot.stop()

HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

Tracking laptop  to  [(33, 69), (114, 69), (114, 124), (33, 124)]
Iteration  0
Ref Object: [(62, 92), (123, 92), (123, 133), (62, 133)]
Error: [[-29 -23  -9 -23  -9  -9 -29  -9]] 55.35341001239219
[[12.60284779]
 [13.34030081]] (0.4712357159094587, 0.45845490103128816)
[[ 0.42157616]
 [-0.19972686]]
[[12.60284779]
 [13.34030081]] (0.4712357159094587, 0.45845490103128816)
[[ 0.42157616]
 [-0.19972686]]
Orientation: -0.019972685916907672
Depth: 2.0
Iteration  1
Iteration  2
Iteration  3
Ref Object: [(66, 90), (126, 90), (126, 134), (66, 134)]
Error: [[-33 -21 -12 -21 -12 -10 -33 -10]] 59.565090447341724
[[14.54174797]
 [15.37763904]] (0.5034966578165561, 0.4920580237964005)
[[ 0.48619004]
 [-0.22638717]]
[[14.54174797]
 [15.37763904]] (0.5034966578165561, 0.4920580237964005)
[[ 0.48619004]
 [-0.22638717]]
Orientation: -0.08863310005172818
Depth: 2.0
Iteration  4
Iteration  5
Robot speed boosted [[ 0.15      ]
 [-0.12127066]]
Iteration  6
Ref Object: [(42, 81), (112, 81), (112, 133), (42,

In [63]:
# Display camera image with bounding boxes for detected objects
display(widgets.HBox([image_widget]))

robot_params = {
    "depth": 1.5,
    "orientation": 0,
    "wheel_radius": 0.0325,
    "axle_length": 0.12
}

control_params = {
    "gain": 1.0,
    "Rtime": 0.1, # The loop run at 8fps --> 0.125s duration
    "num_iter": 80,
    "interval": 3,
    "no_motion": False,  # Flag to disable motor (for debugging)
    "debug": True,
    "ref_obj": 62,  # Use TV as reference object
    "obj_name": "stool_1",
    "goal_bbox": end_bbox_stool,  # This should get the robot to ~(1,0,0) 
    "error_window": 15,
    "speed_lower": 0.14  # This ensures motor settings > ~0.3
}

ibvs_control(robot_params, control_params)
robot.stop()

HBox(children=(Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C…

Tracking couch  to  [(175, 84), (237, 84), (237, 186), (175, 186)]
Iteration  0
Ref Object: [(159, 124), (194, 124), (194, 183), (159, 183)]
Error: [[ 16 -40  43 -40  43   3  16   3]] 86.18584570566097
[[-10.27944619]
 [-11.25510308]] (-0.4350971070227968, -0.41818797553180437)
[[-0.34993643]
 [ 0.26424041]]
[[-10.27944619]
 [-11.25510308]] (-0.4350971070227968, -0.41818797553180437)
[[-0.34993643]
 [ 0.26424041]]
Orientation: 0.02642404069040298
Depth: 1.5
Iteration  1
Iteration  2
Iteration  3
Ref Object: [(175, 128), (214, 128), (214, 184), (175, 184)]
Error: [[  0 -44  23 -44  23   2   0   2]] 70.27090436304346
[[-4.0229221 ]
 [-5.17994914]] (-0.32472024426878654, -0.3032141654878862)
[[-0.14954666]
 [ 0.31336149]]
[[-4.0229221 ]
 [-5.17994914]] (-0.32472024426878654, -0.3032141654878862)
[[-0.14954666]
 [ 0.31336149]]
Orientation: 0.11716009285781792
Depth: 1.5
Iteration  4
Robot speed boosted [[-0.15      ]
 [ 0.26282143]]
Iteration  5
Robot speed boosted [[-0.15      ]
 [ 0.2746

In [55]:
w = np.array([[4],[4]])
T = control2robot(0.0325, 0.12) 
np.dot(T,w)

array([[0.13],
       [0.  ]])

In [59]:
save_snapshot(image, diag_dir, obj_name, 70) 

NameError: name 'image' is not defined