In [6]:
import numpy as np
import cv2

We have several steps:
Let W and H denote the width and height of your chosen input image size. If the frames from the camera are too large, you can downscale them to a reasonable size (perform a parameter estimate to choose it).
1. Preprocess images. The output is a tuple (X_train, y_train, X_val, y_val, X_test, y_test) of examples. The data should be split 60% training, 20% validation, and 20% test. Each image array in the tuple has shape (N, 3, H, W), where 3 is the number of YUV color channels (a function is defined below to convert from RGB).
2. Build model. Use a VGG net as the convolutional base, with a couple of dense layers on top.
3. Train model, validate
4. Unfreeze the last layers of the convnet and fine-tune


## Preprocessing
First, we convert the video to a set of images using OpenCV.

In [30]:
import os
def get_steering_angle(framenum):
    """Return a placeholder steering angle for a frame.

    `framenum` is accepted but not used yet: the result is a single draw from
    numpy's global random generator, scaled to the range [0, 5).
    """
    random_fraction = np.random.rand()
    return 5 * random_fraction

def generate_pictures_from_video(video_name, output_dir='data'):
    """Write every frame of a video to `output_dir` as a JPEG file.

    Frames are numbered from 1. Each file is named
    ``frame_<framenum>_angle_<angle>.jpg``, where the angle comes from
    get_steering_angle(framenum) and is formatted with three decimals.

    Args:
        video_name: path of the video file to read.
        output_dir: directory that receives the images; created if missing.
            Defaults to 'data', the directory the notebook has always used.

    Raises:
        IOError: if OpenCV cannot open `video_name`. Without this check a
            missing or unreadable video would silently produce no images.
    """
    # exist_ok avoids the check-then-create race of listdir() + mkdir().
    os.makedirs(output_dir, exist_ok=True)

    cap = cv2.VideoCapture(video_name)
    try:
        if not cap.isOpened():
            raise IOError('Could not open video: %s' % video_name)

        framenum = 1
        ret, frame = cap.read()
        while ret:
            # get steering angle
            angle = get_steering_angle(framenum)
            imgname = os.path.join(
                output_dir, 'frame_%d_angle_%.3f.jpg' % (framenum, angle))
            cv2.imwrite(imgname, frame)

            framenum += 1
            ret, frame = cap.read()
    finally:
        # Always release the capture handle, even if reading or writing fails.
        # No GUI windows are ever opened here, so there is nothing to destroy.
        cap.release()

# Video to split into frames; a relative path, so it is looked up from the
# current working directory.
VIDEO_NAME = "video01.avi"
# Runs the extraction as soon as this cell executes, writing the frame images
# under data/.
generate_pictures_from_video(VIDEO_NAME)

In [2]:
# Dummy parameters: placeholder values used until real data is loaded.
# N is the total number of images in the dataset.
N = 1000
# H and W are the image height and width (square 120x120 inputs for now).
H = W = 120

# Let N = total number of images.
# load_data() returns (data, labels), where:
# data is a numpy array of shape (N, 3, H, W), i.e. channels-first.
#     Each data[i] is one image; images start out as RGB and are converted
#     to YUV by rgb2yuv() before being returned
# labels is a numpy array of shape (N,)
#     labels[i] represents a steering angle (float)
def rgb2yuv(data):
    """Convert a batch of channels-first RGB images to YUV, in place.

    Args:
        data: numpy array of shape (num_images, 3, H, W) holding RGB images.

    Returns:
        The same `data` array, with every image overwritten by its YUV
        conversion. The conversion itself is done in float32; the result is
        then stored back into `data`, so it is cast to `data`'s dtype (pass a
        float array to avoid truncating the converted values).
    """
    # Walk over the images actually passed in. The batch size must come from
    # the array itself, not from the module-level N, or a batch of any other
    # size is either partly skipped or indexed out of range.
    for i in range(data.shape[0]):
        # OpenCV works on channels-last (H, W, 3) images.
        hwc = np.transpose(data[i], axes=(1, 2, 0)).astype(np.float32)
        yuv = cv2.cvtColor(hwc, cv2.COLOR_RGB2YUV)
        # Back to channels-first before storing the image in the batch.
        data[i] = np.transpose(yuv, axes=(2, 0, 1))
    return data
 
    
# generate placeholder random images for now
def load_data():
    """Return a placeholder dataset of random images and steering angles.

    Returns:
        (data, labels) where `data` is a float32 array of shape (N, 3, H, W)
        holding random images after rgb2yuv() conversion, and `labels` is a
        float32 array of shape (N,) with random values in [0, 1).
    """
    # TODO: Set N = number of examples
    # randint's upper bound is exclusive, so 256 covers the full 0-255 pixel
    # range. The array is float32 so that rgb2yuv(), which writes its result
    # back into this array, does not truncate the converted values.
    data = np.random.randint(256, size=(N, 3, H, W)).astype(np.float32)

    labels = np.random.rand(N).astype(np.float32)

    data = rgb2yuv(data)

    return data, labels

def get_data():
    """Load the dataset and split it 60% / 20% / 20% into train/val/test.

    The split is positional (no shuffling): the first 60% of the examples
    form the training set, the next 20% the validation set, and whatever
    remains the test set.

    Returns:
        (X_train, y_train, X_val, y_val, X_test, y_test), where each X_* is a
        slice of the image data and each y_* is the matching slice of the
        labels.
    """
    data, labels = load_data()

    # Size the split from what was actually loaded.
    num_examples = len(data)
    train_end = int(0.6 * num_examples)
    val_end = train_end + int(0.2 * num_examples)

    X_train = data[:train_end]
    X_val = data[train_end:val_end]
    X_test = data[val_end:]

    # Targets must be cut from `labels` with the same boundaries as the
    # images, so that every X_* lines up with its y_*.
    y_train = labels[:train_end]
    y_val = labels[train_end:val_end]
    y_test = labels[val_end:]

    return X_train, y_train, X_val, y_val, X_test, y_test

In [3]:
from tensorflow.keras import *
def make_model():
    """Build and compile the steering-angle regression model.

    The network is a VGG16 convolutional base (ImageNet weights, no
    classifier head, frozen) followed by two 256-unit ReLU dense layers and
    a single linear output unit. Inputs have shape (H, W, 3).

    Returns:
        The compiled, untrained Keras model.
    """
    conv_base = applications.VGG16(weights='imagenet',
                                   include_top=False,
                                   input_shape=(H, W, 3))
    # Keep the pretrained convolutional weights fixed; only the dense layers
    # added below are trainable.
    conv_base.trainable = False

    # TODO: cross validation pipeline
    model = models.Sequential()
    model.add(conv_base)
    model.add(layers.Flatten())
    model.add(layers.Dense(256, activation='relu'))
    model.add(layers.Dense(256, activation='relu'))
    # Single linear unit: the output is a continuous steering angle.
    model.add(layers.Dense(1))
    # `learning_rate` replaces the deprecated `lr` argument, which current
    # Keras releases no longer accept. Accuracy is meaningless for a
    # continuous target, so mean absolute error is tracked instead.
    model.compile(loss='mse',
                  optimizer=optimizers.Adam(learning_rate=1e-2),
                  metrics=['mae'])
    # TODO: Add model training here
    return model

In [4]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator
def make_generators(X_train, y_train, X_val=None, y_val=None):
    """Create batch generators for training and, optionally, validation.

    Training batches are rescaled by 1/255 and randomly augmented
    (rotation, shifts, shear, zoom). Validation batches are only rescaled.
    Both generators use a batch size of 32.

    Args:
        X_train, y_train: training images and their steering-angle labels.
        X_val, y_val: optional validation images and labels.

    Returns:
        Only the training generator when no validation set is supplied,
        otherwise the tuple (train_generator, val_generator).
    """
    # Flips are deliberately not enabled: mirroring a driving image changes
    # the steering angle that goes with it, but ImageDataGenerator transforms
    # only the images and leaves the labels untouched, so flipped samples
    # would be trained against the wrong target.
    # NOTE(review): rotation_range=40 is large for forward-facing road
    # footage - confirm it is intended.
    train_datagen = ImageDataGenerator(rescale=1. / 255,
                                       rotation_range=40,
                                       width_shift_range=.2,
                                       height_shift_range=.2,
                                       shear_range=.2,
                                       zoom_range=.2,
                                       fill_mode='nearest')
    train_generator = train_datagen.flow(X_train,
                                         y_train,
                                         batch_size=32)
    # A validation generator needs both images and labels.
    if X_val is None or y_val is None:
        return train_generator

    # Validation data is never augmented, only rescaled like the training set.
    val_datagen = ImageDataGenerator(rescale=1. / 255)
    val_generator = val_datagen.flow(X_val,
                                     y_val,
                                     batch_size=32)
    return train_generator, val_generator
    

In [12]:
X_train, y_train, X_val, y_val, X_test, y_test = get_data()

# The image arrays are channels-first (N, 3, H, W), but make_model() builds
# VGG16 with input_shape=(H, W, 3) and ImageDataGenerator treats the last axis
# as the channel axis by default. Move the channels to the end so the
# augmentation and the model both see (N, H, W, 3) batches.
X_train_nhwc = np.transpose(X_train, (0, 2, 3, 1))
X_val_nhwc = np.transpose(X_val, (0, 2, 3, 1))

train_generator, val_generator = make_generators(X_train_nhwc, y_train, X_val_nhwc, y_val)
model = make_model()

# Training is still disabled while the pipeline runs on placeholder data.
# model.fit(train_generator,
#          epochs=5,
#          validation_data=val_generator,
#          verbose=True)


(32, 3, 120, 120)
