In [1]:
import torchvision
import torch

model = torchvision.models.resnet18(pretrained=False)
model.fc = torch.nn.Linear(512, 2)

In [2]:
model.load_state_dict(torch.load('best_steering_model_xy.pth'))

<All keys matched successfully>

In [3]:
device = torch.device('cuda')
model = model.to(device)
model = model.eval().half()

In [4]:
import torchvision.transforms as transforms
import torch.nn.functional as F
import cv2
import PIL.Image
import numpy as np

mean = torch.Tensor([0.485, 0.456, 0.406]).cuda().half()
std = torch.Tensor([0.229, 0.224, 0.225]).cuda().half()

def preprocess(image):
    image = PIL.Image.fromarray(image)
    image = transforms.functional.to_tensor(image).to(device).half()
    image.sub_(mean[:, None, None]).div_(std[:, None, None])
    return image[None, ...]

In [5]:
from IPython.display import display
from traitlets.config.configurable import SingletonConfigurable
import ipywidgets
import traitlets
#using realsense to capture the color and depth image
import pyrealsense2 as rs
#multi-threading is used to capture the image in real time performance
import threading

class Camera(SingletonConfigurable):
    
    #this changing of these two values will be captured by traitlets
    color_value = traitlets.Any()
    depth_value = traitlets.Any()
    
    def __init__(self):
        super(Camera, self).__init__()
        #self.i=0
        #configure the color and depth sensor
        
        self.pipeline = rs.pipeline()
        self.configuration = rs.config()  
        fourcc = cv2.VideoWriter_fourcc('m','p','4','v')
        self.out = cv2.VideoWriter('output.mp4',fourcc, 30.0, (640,480))
        #set resolution for the color camera
        self.color_width = 640
        self.color_height = 480
        self.color_fps = 30
        self.configuration.enable_stream(rs.stream.color, self.color_width, self.color_height, rs.format.bgr8, self.color_fps)

        #set resolution for the depth camera
        self.depth_width = 640
        self.depth_height = 480
        self.depth_fps = 30
        self.configuration.enable_stream(rs.stream.depth, self.depth_width, self.depth_height, rs.format.z16, self.depth_fps)

        self.stop_flag = False
        
        #start capture the first depth and color image
        self.pipeline.start(self.configuration)
        self.pipeline_started = True
        frames = self.pipeline.wait_for_frames()

        color_frame = frames.get_color_frame()   
        image = np.asanyarray(color_frame.get_data())
        self.color_value = image

        depth_frame = frames.get_depth_frame()           
        depth_image = np.asanyarray(depth_frame.get_data())
        depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
        self.depth_value = depth_colormap    

    def _capture_frames(self):
        while(self.stop_flag==False):
            frames = self.pipeline.wait_for_frames()
            color_frame = frames.get_color_frame()
            image = np.asanyarray(color_frame.get_data())
            self.color_value = image

            depth_frame = frames.get_depth_frame()              
            depth_image = np.asanyarray(depth_frame.get_data())
            depth_colormap = cv2.applyColorMap(cv2.convertScaleAbs(depth_image, alpha=0.03), cv2.COLORMAP_JET)
            self.depth_value = depth_colormap
            #cv2.imwrite(str(self.i)+'.jpg',image)
            #self.i+=1
            self.out.write(self.color_value)
    def start(self):
        if not hasattr(self, 'thread') or not self.thread.isAlive():
            self.thread = threading.Thread(target=self._capture_frames)
            self.thread.start()

    def stop(self):
        self.stop_flag = True
        if hasattr(self, 'thread'):
            self.thread.join()        
        if self.pipeline_started:
            self.pipeline.stop()
        self.out.release()
def bgr8_to_jpeg(value,quality=75):
    return bytes(cv2.imencode('.jpg',value)[1])

#create a camera object
camera = Camera.instance()

image_widget = ipywidgets.Image()

traitlets.dlink((camera, 'color_value'), (image_widget, 'value'), transform=bgr8_to_jpeg)

display(image_widget)

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [6]:
from RobotClass import Robot

robot = Robot()

In [7]:
speed_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, description='speed gain')
steering_gain_slider = ipywidgets.FloatSlider(min=0.0, max=1.0, step=0.01, value=0.2, description='steering gain')
steering_dgain_slider = ipywidgets.FloatSlider(min=0.0, max=0.5, step=0.001, value=0.0, description='steering kd')
steering_bias_slider = ipywidgets.FloatSlider(min=-0.3, max=0.3, step=0.01, value=0.0, description='steering bias')

display(speed_gain_slider, steering_gain_slider, steering_dgain_slider, steering_bias_slider)

FloatSlider(value=0.0, description='speed gain', max=1.0, step=0.01)

FloatSlider(value=0.2, description='steering gain', max=1.0, step=0.01)

FloatSlider(value=0.0, description='steering kd', max=0.5, step=0.001)

FloatSlider(value=0.0, description='steering bias', max=0.3, min=-0.3, step=0.01)

In [14]:
x_slider = ipywidgets.FloatSlider(min=-10.0, max=10.0, description='x')
y_slider = ipywidgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='y')
steering_slider = ipywidgets.FloatSlider(min=-1.0, max=1.0, description='steering')
speed_slider = ipywidgets.FloatSlider(min=0, max=1.0, orientation='vertical', description='speed')

display(ipywidgets.HBox([y_slider, speed_slider]))
display(x_slider, steering_slider)

HBox(children=(FloatSlider(value=0.0, description='y', max=1.0, orientation='vertical'), FloatSlider(value=0.0…

FloatSlider(value=0.9427734375000001, description='x', max=10.0, min=-10.0)

FloatSlider(value=0.08897068108303745, description='steering', max=1.0, min=-1.0)

In [15]:
angle = 0.0
angle_last = 0.0

def normalisex(x):
    x = 2*((x+4)/(5)) - 1
    return x

def execute(change):
    global angle, angle_last
    image = change['new']
    xy = model(preprocess(image)).detach().float().cpu().numpy().flatten()
    x= xy[0]
    y = xy[1]
    
    x_slider.value = x
    y_slider.value = y
    
    speed_slider.value = speed_gain_slider.value
    
    angle = np.arctan2(x, y)
    pid = angle * steering_gain_slider.value + (angle - angle_last) * steering_dgain_slider.value
    angle_last = angle
    
    steering_slider.value = pid + steering_bias_slider.value
    robot.set_motors(max(min(speed_slider.value + steering_slider.value, 1.0), 0.0), 
                     max(min(speed_slider.value - steering_slider.value, 1.0), 0.0), 
                     max(min(speed_slider.value + steering_slider.value, 1.0), 0.0), 
                     max(min(speed_slider.value - steering_slider.value, 1.0), 0.0))
    
execute({'new': camera.color_value})

In [16]:
camera.start() # start capturing the data
camera.observe(execute, names='color_value')

In [None]:
import time

camera.unobserve(execute, names='value')

time.sleep(0.1)  # add a small sleep to make sure frames have finished processing

camera.stop()
robot.stop()