In [1]:
# Importing servos and torch
from SCSCtrl import TTLServo
import torch
# Setting camera little bit lower if it is not yet
TTLServo.servoAngleCtrl(5,10,1,150)
# Transfer the device from CPU memory to the GPU device
device = torch.device('cuda')

Succeeded to open the port
Succeeded to change the baudrate


In [2]:
# Importing TRT optimized models
from torch2trt import TRTModule

model_road = TRTModule()
model_road.load_state_dict(torch.load('best_steering_model_xy_trt.pth')) # well trained road following model

model_collision = TRTModule()
model_collision.load_state_dict(torch.load('best_model_trt.pth')) # well trained collision avoidance model

<All keys matched successfully>

In [3]:
# This is from road_following/live_demo.ipynb
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()
# Converting models to match camera
def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [4]:
# Setting camera, also its from jetbot own notebooks
from IPython.display import display
import ipywidgets
import traitlets
from jetbot import Camera, bgr8_to_jpeg

camera = Camera()

image_widget = ipywidgets.Image()

traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

<traitlets.traitlets.directional_link at 0x7ef25b7be0>

In [5]:
#Importing tobot to control motors
from jetbot import Robot

robot = Robot()

In [6]:
# Road following sliders
speed_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01,  description='steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001,description='steering kd')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, description='steering bias')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)

FloatSlider(value=0.0, description='speed gain', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering gain', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering kd', max=0.5, step=0.001)

FloatSlider(value=0.0, description='steering bias', max=0.3, min=-0.3, step=0.01)

In [7]:
#Collision avoidance sliders
stop_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, orientation='vertical',description='stop')

display(ipywidgets.HBox([stop_slider]))
display(image_widget)

HBox(children=(FloatSlider(value=0.0, description='stop', max=1.0, orientation='vertical'),))

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [10]:
# Importing all needed stuff
import cv2 as cv
import numpy as np
from SCSCtrl import TTLServo
import torch.nn.functional as F
import time
import math
import imutils
import datetime

angle = 0.0
angle_last = 0.0
avg = None

lastMovtionCaptured = datetime.datetime.now()

def movement_detection(imageInput):
    global avg, lastMovtionCaptured
    servos = 0
    # Current timestamp
    timestamp = datetime.datetime.now()
    # Image to gray color space
    gray = cv.cvtColor(imageInput, cv.COLOR_BGR2GRAY)
    # GaussionBlur helps camera to understand movement better
    gray = cv.GaussianBlur(gray, (21, 21), 0)
    # If the background has not been obtained, create a new one.
    if avg is None:
        avg = gray.copy().astype("float")
        return imageInput
    # background update
    cv.accumulateWeighted(gray, avg, 0.5)
    # Comparing the difference between the new frame and the background.
    frameDelta = cv.absdiff(gray, cv.convertScaleAbs(avg))
    # Get the outline of the changed area in the frame.
    thresh = cv.threshold(frameDelta, 5, 255,
        cv.THRESH_BINARY)[1]
    thresh = cv.dilate(thresh, None, iterations=2)
    cnts_motion = cv.findContours(thresh.copy(), cv.RETR_EXTERNAL,
        cv.CHAIN_APPROX_SIMPLE)
    cnts_motion = imutils.grab_contours(cnts_motion)
    
    for c in cnts_motion:
        # smaller value means more sensitive movement, 800 is maximum value.
            if cv.contourArea(c) > 100:
                # Draw elements, including rectangle and text.
                (mov_x, mov_y, mov_w, mov_h) = cv.boundingRect(c)
                # Rectangle color is blue
                cv.rectangle(imageInput, (mov_x, mov_y), (mov_x+mov_w, mov_y+mov_h), (255, 0, 0), 2)

                # Save the current timestamp to mark the time when the change is detected.
                lastMovtionCaptured = timestamp
                # Servo does the movement
                TTLServo.servoAngleCtrl(3, -30, 1, 500)
                servos = 1
            # if there is no movement then servo stays on normal position
    if servos == 0:
        TTLServo.servoAngleCtrl(3, 0, 1, 150)

            
    return imageInput

def color_detection(imageInput):
     # Image to hsv color space
    hsv = cv.cvtColor(imageInput, cv.COLOR_BGR2HSV)
    # Values for yellow, because train have yellow color and stop background dont have that color
    colorUpperYellow = np.array([44, 255, 255])
    colorLowerYellow = np.array([24, 100, 100])
    # Values for blue, because train have blue color and stop background dont have that color
    colorUpperBlue = np.array([140, 255, 255])
    colorLowerBlue = np.array([100, 50, 50])
    
    # Mask for both colors
    maskYellow = cv.inRange(hsv, colorLowerYellow, colorUpperYellow)
    maskBlue = cv.inRange(hsv, colorLowerBlue, colorUpperBlue)
    # Restoring bigger and smaller areas for both masks
    maskYellow = cv.erode(maskYellow, None, iterations=2)
    maskYellow = cv.dilate(maskYellow, None, iterations=2)
    maskBlue = cv.erode(maskBlue, None, iterations=2)
    maskBlue = cv.dilate(maskBlue, None, iterations=2)
    # Creating one mask which contains both colors separately or together
    mask = cv.bitwise_or(maskBlue, maskYellow)
    # Obtaining the conformed area contour
    cnts = cv.findContours(mask.copy(), cv.RETR_EXTERNAL,
        cv.CHAIN_APPROX_SIMPLE)[-2]
    
    if len(cnts) > 0:
        # Finding the largest contour area
        flag_area = max(cnts, key=cv.contourArea)
        # Changing mask values to work better with image
        mask = cv.bitwise_not(mask)
        mask = cv.bitwise_and(imageInput,imageInput, mask=mask)
        # Creating rectangle for the recognised color
        (xg,yg,wg,hg) = cv.boundingRect(flag_area)
        # Rectangle color is green
        cv.rectangle(imageInput,(xg,yg),(xg+wg, yg+hg),(0,255,0),2)
        # Servo does the movement
        TTLServo.servoAngleCtrl(3, -30, 1, 500)
    # If colors that train have are not found then servo is on normal state
    else:
        TTLServo.servoAngleCtrl(3, 0, 1, 150)
        
    return imageInput

def railroad_crossing(imageInput):
    # Global variables
    global angle, angle_last, stop_slider, robot
    global speed_value, steering_bias, steering_gain, steering_dgain
    # Preprocessed road following model
    xy = model_road(preprocess(imageInput)).detach().float().cpu().numpy().flatten()
    x = xy[0]
    y = (0.5 - xy[1]) / 2.0
    # Slider values for road following model
    speed_value = speed_gain_slider.value
    steering_bias = steering_bias_slider.value
    steering_gain = steering_gain_slider.value
    steering_dgain = steering_dgain_slider.value
    # Preprocessed collision avoidance model
    prob_stop = float(F.softmax(model_collision(preprocess(imageInput)), dim=1).flatten()[0])
    # Slider value for stop from collision avoidance
    stop_slider.value = prob_stop
    
    
    # If the robot is in right place it stops and waits movement or right colors
    # Thresshold value is 0.07 but you can change it higher to stop easier 
    if prob_stop < 0.07:
        # All values to 0 because then robot dont move
        robot.right_motor.value = 0.0
        robot.right_motor.value = 0.0
        x = 0.0
        y = 0.0
        speed_value = 0
        steering_value = 0
        steering_gain = 0
        # Movement function, comment it if you dont want to use it
        movement_detection(imageInput)
        # Color function, comment it if you dont want to use it
        color_detection(imageInput)


    # Values from road following notebook to get robot move properly
    angle = np.arctan2(x, y)
    pid = angle * steering_gain + (angle - angle_last) * steering_dgain
    angle_last = angle
    steering_value = pid + steering_bias

    robot.left_motor.value = max(min(speed_value + steering_value, 1.0), 0.0)
    robot.right_motor.value = max(min(speed_value - steering_value, 1.0), 0.0)


    return imageInput

In [11]:
# executing new camera frame
def execute(change):
    global image_widget
    image = change['new']
    image_widget.value = bgr8_to_jpeg(railroad_crossing(image))
    
execute({'new': camera.value})
camera.unobserve_all()
camera.observe(execute, names='value')

In [12]:
# Shutting robot and camera down
import time

camera.unobserve(execute, names='value')

time.sleep(0.1)  # add a small sleep to make sure frames have finished processing

robot.stop()
camera.stop()