# Learning to steer

In this notebook we define a way to capture video from our racer along with a vector that contains the racer's steering and throttle information. This allows us to take an imitation learning approach: we collect a form of human priors for throttle and steering given the situations the car might find itself in, and we use the steering and throttle responses at different timestep offsets $n$ relative to the frame that our model is viewing.

We will use the following notation throughout the notebook.

* $\mathcal{S}$ - set of steering states
* $\mathcal{T}$ - set of throttle states
* $\mathcal{E}$ - set of environments
* $c$ - vector of left and right steering and acceleration and braking values
    * $\theta$ - left steering value
    * $r$ - right steering value
    * $a$ - accelerator value
    * $b$ - braking value

$$ c = \{\theta_{t}, r_{t}, a_{t}, b_{t}\} $$

In [1]:
from multiprocessing import Process, Manager, Pool
import traitlets
from traitlets import HasTraits, Bool, observe
import time

class Capture(HasTraits):
    """
    Toggle background saving of captured frames and controller readings.

    Setting ``capture`` to True starts a child process that drains
    ``shared_controller_dicts`` into TFRecord files; setting it back to
    False terminates that process.
    """
    capture = Bool(False, help='Boolean to track image and steering capture state').tag(config=True)

    # Number of queued samples that triggers a flush to disk.
    _BATCH_SIZE = 300
    # Seconds the worker sleeps between queue checks while below _BATCH_SIZE.
    _POLL_INTERVAL = 0.05

    def __init__(self):
        super().__init__()
        self.manager = Manager()
        # Manager-backed list so samples appended in this process are visible
        # to the capture worker running in a child process.
        self.shared_controller_dicts = self.manager.list()
        # Set when capture is switched on; None until then.
        self.capture_process = None

    @observe('capture')
    def _capture(self, change):
        """
        Start capture of frames and acceleration / steering when the trait
        becomes True, and stop it when the trait becomes False.
        """
        if change['new']:
            self.capture_process = Process(target=self._check_memory)
            self.capture_process.start()
            if self.capture_process.is_alive():
                print("Capture Started!")
            return

        worker = self.capture_process
        if worker is None:
            # Nothing was ever started, so there is nothing to stop.
            return
        # NOTE(review): terminate() may interrupt a write in progress, which
        # could leave the last TFRecord file incomplete -- confirm whether
        # that matters for the downstream reader.
        worker.terminate()
        # terminate() only requests termination; wait for the process to
        # actually exit so the liveness check below is meaningful.
        worker.join(timeout=5)
        if not worker.is_alive():
            print("Capture Stopped!")

    def _check_memory(self):
        """
        Copy manager list to local process list for saving TFRecords.

        Runs forever in the capture process. Whenever at least _BATCH_SIZE
        samples are queued they are written to
        ``tfrecords/<timestamp>.tfrecord``.
        """
        shared = self.shared_controller_dicts
        while True:
            pending = len(shared)
            if pending < self._BATCH_SIZE:
                # Avoid spinning a CPU core while waiting for more samples.
                time.sleep(self._POLL_INTERVAL)
                continue
            # Take exactly the samples counted above and delete only those,
            # so samples appended while the copy is in flight stay queued
            # for the next batch instead of being discarded.
            batch = shared[:pending]
            del shared[:pending]
            ts = datetime.datetime.now().timestamp()
            save_tf_records(f"tfrecords/{ts}.tfrecord", batch)
                

In [2]:
# Map a wireless gamepad to the car so that it can be driven manually.
import ipywidgets.widgets as widgets

# Controller widget bound to gamepad index 0; display() renders it in the notebook.
controller = widgets.Controller(index=0)
display(controller)

from jetracer.nvidia_racecar import NvidiaRacecar
# Handle to the car; its `steering` and `throttle` traits are linked to the gamepad below.
car = NvidiaRacecar()

Controller()

In [3]:
# One-way links from the gamepad to the car: axis 3 drives `car.steering` and
# axis 1 drives `car.throttle`. Each axis value is negated before it is applied.
steering_control = traitlets.dlink((controller.axes[3], 'value'), (car, 'steering'), transform=lambda x: -x)
throttle_control = traitlets.dlink((controller.axes[1], 'value'), (car, 'throttle'), transform=lambda x: -x)

from jetcam.csi_camera import CSICamera
import datetime
import numpy as np
from IPython.display import display
import ipywidgets
from jetcam.utils import bgr8_to_jpeg
from jupyter_clickable_image_widget import ClickableImageWidget
import tensorflow as tf 

# Tracker whose `capture` trait switches saving of samples on and off.
capture_tracker = Capture()

# CSI camera configured with width/height 224, capture size 1080x720 and capture_fps 20.
# These values are repeated as literals in update_img; keep the two in sync.
camera = CSICamera(width=224, height=224, capture_width=1080, capture_height=720, capture_fps=20)

# Read a single frame up front (the result is not used further in this cell).
image = camera.read()

# Widget used to preview frames as JPEG.
image_widget = ipywidgets.Image(format='jpeg')

# NOTE(review): presumably starts continuous capture so that `camera.value`
# keeps updating -- confirm against the jetcam documentation.
camera.running = True

# Left disabled: module-level sample list; update_img appends to
# capture_tracker.shared_controller_dicts instead.
#controller_dicts = []

def update_img(change):
    """
    Camera observer: preview the new frame and queue it with the control inputs.

    Args:
        change: traitlets change dict; ``change['new']`` is the new camera frame.

    Side effects:
        Updates ``image_widget`` with the JPEG-encoded frame and appends one
        sample dict to ``capture_tracker.shared_controller_dicts``.
    """
    image = change['new']
    image_widget.value = bgr8_to_jpeg(image)
    controller_dict = {
        "timestamp": datetime.datetime.now().timestamp(),
        "image": image,
        # The car's steering is linked to axes[3] and its throttle to axes[1]
        # (see the traitlets.dlink calls), so the labels must read those same
        # axes; previously the two were swapped. The raw axis values are
        # stored, i.e. without the sign flip the links apply.
        "steering_theta": controller.axes[3].value,
        "accelerator": controller.axes[1].value,
        # Frame geometry and camera settings, mirroring the CSICamera arguments.
        "height": 224,
        "width": 224,
        "capture_width": 1080,
        "capture_height": 720,
        "capture_fps": 20,
        "num_channels": 3,
    }
    # NOTE(review): samples are queued whether or not capture is enabled, and
    # nothing visible here drains the list while capture is off -- confirm
    # this backlog is intended.
    capture_tracker.shared_controller_dicts.append(controller_dict)

def _ndarray_feature(ndarray: np.ndarray):
    assert isinstance(ndarray, np.ndarray) # check if ndarray
    dtype = ndarray.dtype
    if dtype == np.float64 or dtype == np.float32:
        return lambda array: tf.train.Feature(float_list=tf.train.FloatList(value=array))
    elif dtype == np.int64:
        return lambda array: tf.train.Feature(int64_list=tf.train.Int64List(value=array))
    else:
        raise ValueError(f"Input should be a numpy ndarray. Instead got {ndarray.dtype}")
        
def _bytes_feature(value):
    """
    Return a bytes_list feature holding a single value.

    If ``value`` has the same type as ``tf.constant(0)`` it is first
    converted with ``.numpy()``; anything else is used as given.
    """
    tensor_type = type(tf.constant(0))
    payload = value.numpy() if isinstance(value, tensor_type) else value
    wrapped = tf.train.BytesList(value=[payload])
    return tf.train.Feature(bytes_list=wrapped)

def _float_feature(value):
    """
    Wrap one float/double in a feature carrying a single-element float_list.
    """
    wrapped = tf.train.FloatList(value=[value])
    return tf.train.Feature(float_list=wrapped)

def _int64_feature(value):
    """
    Wrap one bool / enum / int / uint in a feature carrying a single-element int64_list.
    """
    wrapped = tf.train.Int64List(value=[value])
    return tf.train.Feature(int64_list=wrapped)

def serialize_data(timestamp: float, image: np.ndarray, steering_theta: float, 
                   accelerator: float, height: int, width: int,
                   capture_height: int, capture_width: int, capture_fps: int,
                   num_channels: int):
    """
    Serialize one captured sample as a tf.train.Example.

    The example holds the features ``timestamp``, ``image`` (raw array
    bytes), ``steering_theta``, ``accelerator``, ``height``, ``width``,
    ``capture_height``, ``capture_width``, ``capture_fps`` and
    ``num_channels``.

    Returns:
        The serialized Example as a byte string.
    """
    # ndarray.tostring() is a deprecated alias that was removed in NumPy 2.0;
    # tobytes() produces the same bytes.
    image_bytes = image.tobytes()
    control_features = {
        # NOTE(review): tf.train.FloatList stores 32-bit floats, so an epoch
        # timestamp loses sub-second precision here. Changing the feature
        # type would alter the record schema, so it is only flagged.
        'timestamp': _float_feature(timestamp),
        'image': _bytes_feature(image_bytes),
        'steering_theta': _float_feature(steering_theta),
        'accelerator': _float_feature(accelerator),
        'height': _int64_feature(height),
        'width': _int64_feature(width),
        'capture_height': _int64_feature(capture_height),
        'capture_width': _int64_feature(capture_width),
        'capture_fps': _int64_feature(capture_fps),
        'num_channels': _int64_feature(num_channels)
    }
    
    sdc_proto = tf.train.Example(features=tf.train.Features(feature=control_features))
    return sdc_proto.SerializeToString()

def save_tf_records(filename: str, examples: list):
    """
    Serialize a batch of samples to a TFRecord file.

    Args:
        filename: path of the TFRecord file to write; its parent directory
            is created if it does not exist yet.
        examples: list of sample dicts with the keys ``timestamp``,
            ``image``, ``steering_theta``, ``accelerator``, ``height``,
            ``width``, ``capture_height``, ``capture_width``,
            ``capture_fps`` and ``num_channels``.

    We'll likely want to use a batching logic here with a predicate from the
    timestamp field of each dictionary -- this will allow us to thread out
    the processing and flushing of the sample list.
    """
    import os

    # Create the output directory first so the writer does not fail on a
    # missing one.
    parent = os.path.dirname(filename)
    if parent:
        os.makedirs(parent, exist_ok=True)

    # tf.io.TFRecordWriter is the current name of the writer; the
    # tf.python_io alias only exists in TensorFlow 1.x.
    with tf.io.TFRecordWriter(filename) as writer:
        for record in examples:
            example = serialize_data(record["timestamp"], record["image"], record["steering_theta"],
                                     record["accelerator"], record["height"], record["width"],
                                     record["capture_height"], record["capture_width"], record["capture_fps"],
                                     record["num_channels"])
            writer.write(example)

In [17]:
# Call update_img whenever the camera's `value` trait changes.
camera.observe(update_img, names='value')
# Additionally link each frame, converted by bgr8_to_jpeg, into the preview widget.
camera_link = traitlets.dlink((camera, 'value'), (image_widget, 'value'), transform=bgr8_to_jpeg)
display(image_widget)

Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [82]:
# Changing the trait to True fires the Capture._capture observer, which starts capturing.
capture_tracker.capture = True

Capture Started!


In [83]:
# Number of samples currently queued in the shared list.
print(len(capture_tracker.shared_controller_dicts))

0


In [84]:
# Changing the trait back to False fires the Capture._capture observer, which stops capturing.
capture_tracker.capture = False

In [79]:
# Stop calling update_img on changes of the camera's `value` trait.
camera.unobserve(update_img, names='value')

In [52]:
import threading

class Capture(HasTraits):
    """
    Thread-based variant: toggle background saving of queued samples.

    Setting ``capture`` to True starts a worker thread that writes batches
    from the module-level ``controller_dicts`` list to TFRecord files;
    setting it to False asks the worker to stop and waits for it.

    NOTE(review): ``controller_dicts`` must exist as a module-level list; the
    only definition visible in this notebook is commented out.
    """
    capture = Bool(False, help='Boolean to track image and steering capture state').tag(config=True)
    
    def __init__(self):
        super().__init__()
        self.current_thread = threading.Thread(target=self._check_memory)

    @observe('capture')
    def _capture(self, change):
        """
        Start capture of frames and acceleration / steering when the trait
        becomes True, and stop it when the trait becomes False.
        """
        if change['new']:
            # A Thread object can only be started once, so replace a thread
            # that has already run; this lets capture be switched on again.
            if self.current_thread.ident is not None:
                self.current_thread = threading.Thread(target=self._check_memory)
            self.current_thread.do_run = True
            self.current_thread.start()
            print("Capture Started!")
        else:
            worker = self.current_thread
            # The worker polls this flag and leaves its loop once it is False.
            worker.do_run = False
            # join() raises on a thread that was never started, so only wait
            # for a worker that is actually running.
            if worker.is_alive():
                worker.join()
            print("Capture Stopped!")
            
    def _check_memory(self):
        """
        Worker loop: save queued samples in batches until asked to stop.

        Whenever at least 1200 samples are queued they are written to
        ``tfrecords/<timestamp>.tfrecord``. Samples still queued when the
        worker stops are left in the list.
        """
        worker = threading.current_thread()
        while getattr(worker, "do_run", True):
            pending = len(controller_dicts)
            if pending < 1200:
                # Avoid spinning a CPU core while waiting for more samples.
                time.sleep(0.05)
                continue
            # Remove exactly the samples copied, so samples appended in the
            # meantime stay queued instead of being discarded.
            batch = controller_dicts[:pending]
            del controller_dicts[:pending]
            ts = datetime.datetime.now().timestamp()
            save_tf_records(f"tfrecords/{ts}.tfrecord", batch)

# Rebind capture_tracker to an instance of the Capture class defined in this cell.
capture_tracker = Capture()

In [6]:
import threading 

class Capture(HasTraits):
    """
    Pause/resume variant: toggle background saving of queued samples.

    A single worker thread writes batches from the module-level
    ``controller_dicts`` list to TFRecord files. Setting ``capture`` to True
    resumes the worker (starting it on first use); setting it to False
    pauses it.

    NOTE(review): ``controller_dicts`` must exist as a module-level list; the
    only definition visible in this notebook is commented out.
    """
    capture = Bool(False, help="Bool to track image and steering state capture").tag(config=True)
    
    def __init__(self):
        # This class is a HasTraits, not a Thread, so initialise the actual
        # base class rather than threading.Thread.
        super().__init__()
        self.paused = False
        self.paused_condition = threading.Condition(threading.Lock())
        # Created on the first resume_capture(); no worker runs before that.
        self._worker = None
    
    @observe('capture')
    def _capture(self, change):
        """
        Start capture of frames and acceleration / steering when the trait
        becomes True, and pause it when the trait becomes False.
        """      
        if change['new']:
            self.resume_capture()
            print("Capture Started!")
        else:
            self.pause_capture()
            print("Capture Stopped!")
            
    def _check_memory(self):
        """
        Worker loop: block while paused, otherwise save queued samples.

        Whenever at least 1200 samples are queued they are written to
        ``tfrecords/<timestamp>.tfrecord``.
        """
        while True:
            # Hold the lock only for the paused check so that pause_capture()
            # and resume_capture() are never blocked behind a file write.
            with self.paused_condition:
                while self.paused:
                    self.paused_condition.wait()

            pending = len(controller_dicts)
            if pending < 1200:
                # Avoid spinning a CPU core while waiting for more samples.
                time.sleep(0.05)
                continue
            # Remove exactly the samples copied, so samples appended in the
            # meantime stay queued instead of being discarded.
            batch = controller_dicts[:pending]
            del controller_dicts[:pending]
            ts = datetime.datetime.now().timestamp()
            save_tf_records(f"tfrecords/{ts}.tfrecord", batch)
    
    def pause_capture(self):
        """Make the worker block before it handles its next batch."""
        with self.paused_condition:
            self.paused = True
    
    def resume_capture(self):
        """Wake the worker, starting it first if it does not exist yet."""
        # notify must be called with the condition's lock held.
        with self.paused_condition:
            self.paused = False
            self.paused_condition.notify_all()
        if self._worker is None:
            # Daemon thread: the loop never returns, so it must not keep the
            # interpreter alive on exit.
            self._worker = threading.Thread(target=self._check_memory, daemon=True)
            self._worker.start()

# Rebind capture_tracker to an instance of the Capture class defined in this cell.
capture_tracker = Capture()