# Skript für den Jetbot der Roadfollowing und Collision Avoidance kombiniert

In [None]:
import math
import os
import time
from uuid import uuid1

import PIL.Image
import ipywidgets as widgets
import torch
import torch.nn.functional as functional
import torchvision
import torchvision.transforms as transforms
import traitlets
from IPython.display import display

from jetbot import Camera, bgr8_to_jpeg
from jetbot import Robot
from lib.AutomaticDataCollector import AutomaticDataCollector

Load models

In [None]:
model_road = torchvision.models.resnet18(pretrained=False)
model_road.fc = torch.nn.Linear(512, 2)
model_road.load_state_dict(torch.load('models/road_following/best_steering_model_xy_12_04_a.pth'))

model_collision = torchvision.models.alexnet(pretrained=False)
model_collision.classifier[6] = torch.nn.Linear(model_collision.classifier[6].in_features, 2)
model_collision.load_state_dict(torch.load('models/collision/collision_model_13_04_a.pth'))


Add cuda device to models

In [None]:
device = torch.device('cuda')
model_road = model_road.to(device)
model_collision = model_collision.to(device)

Preprocess Image

In [None]:
mean = torch.Tensor([0.485, 0.456, 0.406]).half().float().to(device)
std = torch.Tensor([0.229, 0.224, 0.225]).half().float().to(device)


def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device)
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

Create Camera instance

In [None]:
camera = Camera.instance(width=224, height=224, fps=10)

Widget for information and camera feed display

In [None]:
image_widget = widgets.Image()

speed_output = widgets.Text(
    value='',
    placeholder='',
    description='(l,r):',
    disabled=True
)

model_output = widgets.Textarea(
    value='',
    placeholder='Type something',
    description='Model:',
    disabled=False
)

traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

Sliders and information widgets

In [None]:
# Road Following sliders
speed_control_slider = widgets.FloatSlider(value=0.0, min=0.0, max=1.0, step=0.01, description='speed control')
steering_gain_slider = widgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.04, description='steering gain')
steering_kd_gain_slider = widgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_bias_slider = widgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='steering bias')

#Collision Avoidance sliders
blocked_slider = widgets.FloatSlider(min=0.0, max=1.0, orientation='horizontal', description='blocked')
stop_duration_slider = widgets.IntSlider(min=1, max=1000, step=1, value=10, description='time for stop')
blocked_threshold = widgets.FloatSlider(min=0, max=1.0, step=0.01, value=0.8, description='blocked threshold')

# Steering sliders
diff_gain_limit_slider = widgets.FloatSlider(min=0.0, max=1.8, step=0.01, value=1.5, description='diff_gain_limit')
curve_min_val_slider = widgets.FloatSlider(min=0.0, max=0.25, step=0.01, value=0.08, description='curve_min_val')
alpha_factor_slider = widgets.FloatSlider(min=-1.0, max=1.0, step=0.01, value=0.00, description='alpha_factor')
steering_correction_activation = widgets.FloatSlider(min=0.0, max=math.pi, step=0.05, value=0.25, description='steering_correction_activation')

toggle_exec_btn = widgets.ToggleButton(
    value=False,
    description='Start',
    disabled=False,
    button_style='success',
    tooltip='Description',
    icon='check'
)

snapshot_btn = widgets.Button(
    description='snapshot'
)


make snapshot

In [None]:
def take_snapshot(b=None):
    try:
        os.makedirs("snapshot")
    except FileExistsError:
        pass
    with open(os.path.join('snapshot', str(uuid1()) + '.jpg'), 'wb') as f:
        f.write(image_widget.value)


snapshot_btn.on_click(take_snapshot)

debug slider for speed

In [None]:
a_slider = widgets.FloatSlider(min=-math.pi, max=math.pi, step=0.01, description='a')
xy_out = widgets.Text()

Display the controlling widgets

Variables for slider controls

In [None]:
speed_control, steer_gain, steer_kd_gain, steer_bias = speed_control_slider.value, 0.0, 0.0, 0.0

Init Roboter Object

In [None]:
robot = Robot()

Global Vars

In [None]:
angle_last, stop_counter = 0.0, 0
can_drive = True
stop_counter_limit = 10
x, y = 0.0, 0.0
robot_is_stopped = True

calculate speed values

In [None]:
def calculate_speed(last_a: float, x_in: float, y_in: float) -> (float, float, float):
    global a_slider
    if robot_is_stopped:
        return 0.0, 0.0, 0.0
    a = math.atan2(x_in, y_in)
    a_slider.value = a
    xy_out.value = f"({x:.02f} {y:.02f})"
    pid = a * steer_gain + (a - last_a) * steer_kd_gain
    steer_val = pid + steer_bias
    left = max(min(speed_control + steer_val, 1.0), 0.0)
    right = max(min(speed_control - steer_val, 1.0), 0.0)
    return a, left, right

In [None]:
def get_collision_chance(image) -> float:
    collision = model_collision(image)
    collision_softmax = functional.softmax(collision, dim=1).flatten()
    return float(collision_softmax[0])

In [None]:
def get_road_direction(image) -> (float, float):
    model_xy = model_road(image).detach().float().cpu().numpy().flatten()
    model_x = model_xy[0]
    model_y = (0.5 - model_xy[1]) / 2.0
    return model_x, model_y

In [None]:
def correct_steering(left: float, right: float, angle: float) -> (float, float):
    abs_angle = math.fabs(angle)
    if abs_angle < steering_correction_activation.value:
        return left, right
    
    angle_f = abs_angle / math.pi + alpha_factor_slider.value
    diff_gain_limit = diff_gain_limit_slider.value
    curve_min_val = curve_min_val_slider.value

    if left * diff_gain_limit < right:
        l = (left * diff_gain_limit)
        right = min(l if l > 0 else curve_min_val, right)
        left = left - (angle_f * left)

    elif left > right * diff_gain_limit:
        r = (right * diff_gain_limit)
        left = min(r if r > 0 else curve_min_val, left)
        right = right - (angle_f * right)

    return left, right

drive execute logic

In [None]:
def execute(change):
    global angle_last, blocked_slider, robot, stop_counter, stop_counter_limit, can_drive, x, y, blocked_threshold
    global speed_control, steer_gain, steer_kd_gain, steer_bias

    image = change['new']
    image_preproc = preprocess(image).to(device)

    # get slider values
    steer_gain = steering_gain_slider.value
    steer_kd_gain = steering_kd_gain_slider.value
    steer_bias = steering_bias_slider.value
    stop_counter_limit = stop_duration_slider.value
    max_speed = speed_control_slider.value

    prob_blocked = get_collision_chance(image_preproc)
    blocked_slider.value = prob_blocked

    if can_drive:
        can_drive = prob_blocked <= blocked_threshold.value
        if can_drive:
            stop_counter = 0
            x, y = get_road_direction(image_preproc)
            speed_control = max_speed
        else:
            stop_counter += 1
    else:
        stop_counter += 1
        if stop_counter < stop_counter_limit:
            x, y, speed_control = 0.0, 0.0, 0
        else:
            can_drive = True
            stop_counter = 0

    angle, left, right = calculate_speed(angle_last, x, y)
    angle_last = angle

    left, right = correct_steering(left, right, angle)

    robot.left_motor.value = left
    robot.right_motor.value = right

    speed_output.value = f'{left:.02f}/{right:.02f}'
execute({'new': camera.value})

In [None]:
collector = AutomaticDataCollector(camera, wait_time=2)

def start_stop_toggle(obj):
    global robot_is_stopped
    if obj["new"]:
        toggle_exec_btn.button_style = 'warning'
        toggle_exec_btn.icon = 'warning'
        toggle_exec_btn.description = "Stop"
        camera.observe(execute, names='value')
        robot_is_stopped = False
        collector.start()
    else:
        toggle_exec_btn.button_style = 'success'
        toggle_exec_btn.icon = 'check'
        toggle_exec_btn.description = 'Start'
        camera.unobserve(execute, names='value')
        robot_is_stopped = True
        time.sleep(0.1)
        robot.stop()
        collector.stop()


toggle_exec_btn.observe(start_stop_toggle, 'value')


In [None]:
display(speed_control_slider, steering_gain_slider, steering_kd_gain_slider, steering_bias_slider)

display(diff_gain_limit_slider, curve_min_val_slider, alpha_factor_slider, steering_correction_activation)

display(widgets.HBox([toggle_exec_btn, snapshot_btn]))

display(widgets.HBox([image_widget]))
display(speed_output)

display(widgets.HBox([blocked_slider, blocked_threshold, stop_duration_slider]))
display(a_slider, xy_out)