In [15]:
### Python Script to drive the Jetbot autonomous while reacting to signs on the track
import torchvision
import torch

# LOAD MODELS
# Model for Sign Detection:
modelSignDetection = torchvision.models.squeezenet1_1(pretrained=False)
modelSignDetection.classifier[1] = torch.nn.Conv2d(512, 7, kernel_size=1)
modelSignDetection.num_classes = 7
modelSignDetection.load_state_dict(torch.load('signDetection_V2_squeeze_conv.pth'))

# Model for Road Following
modelRoadFollowing = torchvision.models.squeezenet1_1(pretrained=False)
modelRoadFollowing.classifier[1] = torch.nn.Conv2d(512, 1, kernel_size=1)
modelRoadFollowing.num_classes = 1
modelRoadFollowing.load_state_dict(torch.load('roadFollowing_V3_squeeze_conv.pth'))

device = torch.device('cuda')

# Push models to GPU and apply half() datatype
modelSignDetection = modelSignDetection.to(device)
modelSignDetection = modelSignDetection.eval().half()
modelRoadFollowing = modelRoadFollowing.to(device)
modelRoadFollowing = modelRoadFollowing.eval().half()

In [16]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    """ 
    Method to preprocess images for the sign detection and road following models. 
    In(1): image - The image to preprocess as array
    Out(1): preprocessed image
    """
    # Activate the following line to mask the left half of the picture
    # -> Sign detection is only neccessary on right side
    # -> Is deactivated to combine preprocessing for both models in one method and to safe runtime
    # image[0:224, int(224*0): int(224*2/4)] =(0,0,0)
    
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

'\nmean = 255.0 * np.array([0.485, 0.456, 0.406])\nstdev = 255.0 * np.array([0.229, 0.224, 0.225])\n\nnormalize = torchvision.transforms.Normalize(mean, stdev)\n\ndef preprocessSign(camera_value):\n    global device, normalize\n    x = camera_value\n    x[0:224, int(224*0): int(224*2/4)] =(0,0,0)\n    x = cv2.cvtColor(x, cv2.COLOR_BGR2RGB)\n    x = x.transpose((2, 0, 1))\n    x = torch.from_numpy(x).float()\n    x = normalize(x)\n    x = x.to(device)\n    x = x[None, ...]\n    return x\n'

In [18]:
import traitlets
from IPython.display import display
import ipywidgets.widgets as widgets
from jetbot import Camera, bgr8_to_jpeg

FPS = 10  # NOTE: System runs stable with about 10 FPS
display_image = False  # NOTE: Displaying the image will cost a lot of computational ressources. Only display for debugging!

# DISPLAY DASHBOARD
camera = Camera.instance(width=224, height=224, fps=FPS)
image = widgets.Image(format='jpeg', width=224, height=224)
camera_link = traitlets.dlink((camera, 'value'), (image, 'value'), transform=bgr8_to_jpeg)

# Sign Detection Sliders
fast_slider = widgets.FloatSlider(description='fast', min=0.0, max=1.0, orientation='vertical')
left_slider = widgets.FloatSlider(description='left', min=0.0, max=1.0, orientation='vertical')
right_slider = widgets.FloatSlider(description='right', min=0.0, max=1.0, orientation='vertical')
slow_slider = widgets.FloatSlider(description='slow', min=0.0, max=1.0, orientation='vertical')
stop_slider = widgets.FloatSlider(description='stop', min=0.0, max=1.0, orientation='vertical')
nothing_slider = widgets.FloatSlider(description='nothing', min=0.0, max=1.0, orientation='vertical')
attention_slider = widgets.FloatSlider(description='attention', min=0.0, max=1.0, orientation='vertical')

if display_image:
    display(widgets.HBox([image, fast_slider, left_slider, right_slider, slow_slider, stop_slider, attention_slider, nothing_slider]))
else:
    display(widgets.HBox([fast_slider, left_slider, right_slider, slow_slider, stop_slider, attention_slider, nothing_slider]))

# Driving Sliders
speed_gain_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.0, description='steering gain')
steering_dgain_slider = widgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_slider = widgets.FloatSlider(min=-1.0, max=1.0, description='steering')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, brake_slider, steering_slider)

# Init Robot to access ressources
from jetbot import Robot
robot = Robot()

HBox(children=(FloatSlider(value=0.0, description='fast', max=1.0, orientation='vertical'), FloatSlider(value=…

FloatSlider(value=0.0, description='speed gain', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering gain', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering kd', max=0.5, step=0.001)

FloatSlider(value=0.0, description='braking', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering', max=1.0, min=-1.0)

In [19]:
import torch.nn.functional as F
import time
import numpy as np

### GLOBAL VARIABLES ###
angle = 0.0
angle_last = 0.0

# Timer related signs
isStop = False
stopTimer = 100
isAttention = False
attentionTimer = 100

# Slow and fast
isSlow = False
driveSlow = False
isFast = False

# Turning signs
isRight = False
goRight = False
isLeft = False
goLeft = False
straightTimerRight = 100 
straightTimerLeft = 100


def update(change):    
    """ 
    Method that is called for every new incoming image from the camera stream.
    Note: EVERY image will access this method because they are stacked as a queue. Therefore it is not possible to apply functionalities that 
    stop the program flow in this method because the image queue will grow and cause stability issues.
    """
    global fast_slider, left_slider, right_slider, slow_slider, stop_slider, nothing_slider, attention_slider, speed_slider
    global robot, angle, angle_last 
    global isStop, stopTimer, isAttention, attentionTimer, isSlow, driveSlow, isFast, isRight, goRight, isLeft, goLeft, straightTimerRight, straightTimerLeft
    image = camera.value
    image_preprocessed = preprocess(image)
    
    # Read speed
    target_velocity = speed_gain_slider.value
    
    ### SIGN DETECTION ###
    # Strategy:
    # A sign will be detected when its probability coming from sign detection CNN reaches a certain value.
    # When a sign is detected the procedure waits till it disappears to ensure that the robot drives nearby
    # the sign before reacting to it. After this certain reaction strategies are implemented depending
    # on the actual sign.
    
    signs = modelSignDetection(image_preprocessed).detach().float().cpu()
    
    # Read out probabilities
    signs = F.softmax(signs, dim=1)
    prob_fast = float(signs.flatten()[1])
    prob_left = float(signs.flatten()[2])
    prob_right = float(signs.flatten()[4])
    prob_slow = float(signs.flatten()[5])
    prob_stop = float(signs.flatten()[6])
    prob_nothing = float(signs.flatten()[3])
    prob_attention = float(signs.flatten()[0])
    
    # STOP SIGN HANDLING
    # ---> Stop for 2 seconds
    if prob_stop >= 0.95:
        isStop = True  
    
    if prob_stop <= 0.1 and isStop == True: 
        stopTimer = 0
        isStop = False
    
    if stopTimer < (2 * FPS):  # Wait for two seconds
        robot.left_motor.value = 0.0
        robot.right_motor.value = 0.0
        stopTimer += 1
        return
    
    # ATTENTION SIGN HANDLING
    # ---> Drive slow for 2 seconds
    if prob_attention >= 0.95:
        isAttention = True
    
    if prob_attention <= 0.1 and isAttention == True:
        attentionTimer = 0
        isAttention = False
    
    if attentionTimer < (2 * FPS):
        target_velocity = 0.09
        attentionTimer += 1
        
    # 30 SIGN HANDLING
    # ---> Drive slow until fast sign appears
    if prob_slow >= 0.95:
        isSlow = True
        
    if prob_slow <= 0.1 and isSlow == True:
        driveSlow = True
        isSlow = False
    
    if driveSlow:
        target_velocity = 0.09
        
    # FAST SIGN HANDLING
    # ---> Release slow sign state
    if prob_fast >= 0.7:
        isFast = True
    
    if prob_fast <= 0.1 and isFast == True:
        driveSlow = False
        isFast = False
        target_velocity = speed_gain_slider.value
        
    # RIGHT SIGN HANDLING
    # ---> Turn right
    if prob_right >= 0.95:
        isRight = True
        
    if prob_right <= 0.1 and isRight == True:
        goRight = True
        isRight = False
        straightTimerRight = 0
        
    if goRight and straightTimerRight < (2 * FPS):
        # Run straight into crossroad
        target_velocity = 0.12
        target_velocity = 0.12
        straightTimerRight += 1
    elif goRight and straightTimerRight < (3 * FPS):
        # Turn Right
        robot.left_motor.value = 0.14
        robot.right_motor.value = - 0.14
        straightTimerRight += 1
        return
    else:
        goRight = False
        
    # Left SIGN HANDLING
    # ---> Turn left
    if prob_left >= 0.95:
        isLeft = True
        
    if prob_left <= 0.1 and isLeft == True:
        goLeft = True
        isLeft = False
        straightTimerLeft = 0
        
    if goLeft and straightTimerLeft < (2 * FPS):
        # Run straight into crossroad
        target_velocity = 0.12
        target_velocity = 0.12
        straightTimerLeft += 1
    elif goLeft and straightTimerLeft < (3 * FPS):
        # Turn Left
        robot.left_motor.value = - 0.14
        robot.right_motor.value = 0.14
        straightTimerLeft += 1
        return
    else:
        goLeft = False
    
    # Visualization
    fast_slider.value = prob_fast
    left_slider.value = prob_left
    right_slider.value = prob_right
    slow_slider.value = prob_slow
    stop_slider.value = prob_stop
    nothing_slider.value = prob_nothing
    attention_slider.value = prob_attention
    
    ### LANE DETECTION ###
    angle = modelRoadFollowing(image_preprocessed).detach().float().cpu().numpy().flatten()
    angle = angle - 90
    angle = np.deg2rad(angle)
    
    pid = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
    angle_last = angle
    
    steering_slider.value = pid  # Visualize steering 
    
    robot.left_motor.value = max(min(target_velocity + steering_slider.value, 1.0), 0.0)
    robot.right_motor.value = max(min(target_velocity - steering_slider.value, 1.0), 0.0)
        
update({'new': camera.value}) 

In [20]:
### LAUNCH CONTROL ###
camera.observe(update, names='value')

In [None]:
### BLANK CELL TO AVIOD KILLING ###

In [14]:
### KILL ###
import time
camera.unobserve(update, names='value')
time.sleep(0.1)  # add a small sleep to make sure frames have finished processing
robot.stop()