In [43]:
import os
import numpy as np
import cv2
import matplotlib.pyplot as plt

from pprint import pprint
from keras.optimizers import Adam
from keras.utils import Sequence
from keras.callbacks import ModelCheckpoint
from keras.layers import Input, Dense
from keras.models import Model
from keras.layers import Convolution2D, MaxPooling2D, BatchNormalization
from keras.layers import Activation, Dropout, Dense, GlobalAveragePooling2D
from sklearn.model_selection import train_test_split

from smartcar.utils.path import get_data_paths
from smartcar.utils.read import read_json_label
from smartcar.learn.brightness import randomize_brightness

In [26]:
data_dir = "C:\Projects\SmartCar\data"
image_fnames, label_fnames = get_data_paths(data_dir)

batch_size = 16

In [72]:
class SmartCarGenerator(Sequence):
    def __init__(self, image_fnames, label_fnames, batch_size, image_shape, shuffle=True):
        self.image_fnames = np.array(image_fnames)
        self.label_fnames = np.array(label_fnames)
        self.batch_size = batch_size
        self.image_shape = image_shape
    
        if shuffle:
            indices = np.array([i for i in range(len(image_fnames))])
            indices = np.random.permutation(indices)
            self.image_fnames = self.image_fnames[indices]
            self.label_fnames = self.label_fnames[indices]
    
    def load_image(self, path, flip=False):
        image = cv2.imread(path)
        image = cv2.resize(image, (self.image_shape[1], self.image_shape[0]), interpolation=cv2.INTER_NEAREST)
        if flip:
            image = cv2.flip(image, 1)
        image = randomize_brightness(image)
        return image
    
    def __len__(self):
        return np.ceil(len(self.image_fnames) / float(self.batch_size)).astype(np.int)
                       
    def __getitem__(self, idx):
        x_fnames = self.image_fnames[idx * self.batch_size:(idx+1) * self.batch_size]
        y_fnames = self.label_fnames[idx * self.batch_size:(idx+1) * self.batch_size]
        
        size = len(x_fnames)
        batch_x = np.zeros((size, self.image_shape[0], self.image_shape[1], 3))
        batch_y = np.zeros((size, 2))
        
        for i in range(size):
            flip = 0.5 >= np.random.rand(1)
            image = self.load_image(x_fnames[i], flip)
            batch_x[i] = image / 255.
            
            angle, speed = read_json_label(y_fnames[i])
            if flip:
                angle = 1 - angle
            batch_y[i][0] = angle
            batch_y[i][1] = speed
        return batch_x, batch_y

In [73]:
X_train, X_test, y_train, y_test = train_test_split(image_fnames, label_fnames, train_size=0.80, random_state=42)

In [74]:
datagen_train = SmartCarGenerator(X_train, y_train, batch_size=16, image_shape=(120, 160, 3), shuffle=True)
datagen_test = SmartCarGenerator(X_test, y_test, batch_size=16, image_shape=(120, 160, 3), shuffle=True)

In [75]:
def CustomCNN(dropout=0.0):
    img_in = Input(shape=(120, 160, 3), name='img_in')
    x = img_in
    x = Convolution2D(8, (3,3), strides=(2,2), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Convolution2D(16, (3,3), strides=(2,2), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)
    x = Convolution2D(32, (3,3), strides=(2,2), activation='relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = GlobalAveragePooling2D(name='flattened')(x)                                        
    x = Dense(32, activation='relu')(x)                                     
    x = Dropout(dropout)(x)
    x = BatchNormalization()(x)
    
    out = Dense(2, activation='sigmoid', name='out')(x)
    model = Model(inputs=[img_in], outputs=[out])
    return model

In [76]:
model = CustomCNN()
                           
opt = Adam(learning_rate=1e-4)
model.compile(optimizer=opt,
              loss={'out' : 'mean_squared_error'})

In [77]:
model_path = os.path.abspath("models/test.h5")
checkpointer = ModelCheckpoint(model_path, save_best_only=True, monitor='val_loss', mode='min')
hist = model.fit_generator(generator=datagen_train,
                           validation_data=datagen_test,
                           epochs=20,
                           shuffle=True,
                           callbacks=[checkpointer])

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
Epoch 11/20
Epoch 12/20
Epoch 13/20
Epoch 14/20
Epoch 15/20
Epoch 16/20
Epoch 17/20
Epoch 18/20
Epoch 19/20
Epoch 20/20


In [None]:
plt.plot(hist.history["loss"][1:])
plt.plot(hist.history["val_loss"][1:])
ax = plt.gca()
ax.set_ylim([0.06, 0.11])
plt.savefig('loss.jpg')