In [None]:
import tensorflow as tf
import pandas as pd
import numpy as np
from keras.models import Model
from keras.layers import Conv2D, BatchNormalization, Layer, Input, MaxPool2D, Activation, Flatten, Dense
from keras.activations import relu
from keras import Sequential
import ntpath
import os
import matplotlib.image as img
import cv2
from sklearn.model_selection import train_test_split
import tensorflow_datasets as tfds
import matplotlib.pyplot as plt


In [None]:
columns = ['center', 'left', 'right', 'steering', 'throttle', 'reverse', 'speed']
datadir = '../input/udacityselfdrivingcar/3'
df = pd.read_csv(os.path.join(datadir, 'driving_log.csv'), names=columns)
df.head()


In [None]:
def get_path_tail(path):
  _, tail = ntpath.split(path)
  return tail

for i in range(3):
  df[columns[i]] = df[columns[i]].apply(get_path_tail)

df.head()

In [None]:
def load_img_steering(datadir, df, bias=0.25):
  """Get img and steering data into arrays"""
  image_path = []
  steering = []
  for i in range(len(df)):
    indexed_data = df.iloc[i]
    if indexed_data[3] + bias  <  np.random.rand():
        continue
    center, left, right = indexed_data[0], indexed_data[1], indexed_data[2]
    image_path.append(os.path.join(datadir, center.strip()))
    steering.append(float(indexed_data[3]))
  image_paths = np.asarray(image_path)
  steerings = np.asarray(steering)
  return image_paths, steerings[:, np.newaxis]




In [None]:
def preprocess_images(paths, img_size):
  images = []
  for i in range(len(paths)):
    image = tf.keras.preprocessing.image.load_img(image_paths[i])
    image = tf.keras.preprocessing.image.img_to_array(image)
    image = cv2.resize(image, img_size) / 255
    images.append(image)
  return np.array(images)

In [None]:
image_paths, steerings = load_img_steering(datadir + '/IMG' , df)
plt.plot(steerings, 'r.')

In [None]:
images = preprocess_images(image_paths, input_size)
plt.imshow(images[np.random.randint(0, images.shape[0] - 1)])

In [None]:
X_train, X_test, y_train, y_test = train_test_split(images, steerings, test_size=0.3, random_state=42)
X_validate, X_test, y_validate, y_test = train_test_split(X_test, y_test, test_size=0.5, random_state=42)
print(f'X_training_size: {X_train.shape}')
print(f'X_validation_size: {X_validate.shape}')
print(f'X_testing_size: {X_test.shape}')

print(f'y_training_size: {y_train.shape}')
print(f'y_validation_size: {y_validate.shape}')
print(f'y_testing_size: {y_test.shape}')

In [None]:
class ResidualBlock(Layer):
  def __init__(self,num_channels, kernel_size=3, strides=1, use_1x1Conv=False,**kwargs):
    super(ResidualBlock, self).__init__(**kwargs)
    self.conv1 = Conv2D(filters=num_channels // 4, kernel_size=(1, 1), strides=strides, padding='same')
    self.conv2 = Conv2D(filters=num_channels // 4, kernel_size=(3, 3), padding='same')
    self.conv3 = Conv2D(filters=num_channels, kernel_size=(1, 1), padding='same')
    self.conv4 = None
    if use_1x1Conv:
      self.conv4 = Conv2D(filters=num_channels, kernel_size=(1, 1), strides=strides, padding='same')
    self.bn1 = BatchNormalization()
    self.bn2 = BatchNormalization()
    self.bn3 = BatchNormalization()

  def call(self, inputs):
    Y = self.conv1(relu(self.bn1(inputs)))
    Y = self.conv2(relu(self.bn2(Y)))
    Y = self.conv3(relu(self.bn3(Y)))
    if self.conv4 is not None:
      inputs = self.conv4(relu(inputs))
    return Y + inputs

  def get_config(self):
    config = super().get_config().copy()
    config.update({
        'conv1': self.conv1,
        'conv2': self.conv2,
        'conv3': self.conv3,
        'conv4': self.conv4,
        'bn1': self.bn1,
        'bn2': self.bn2,
        'bn3': self.bn3
    })
    return config


In [None]:
class ResidualStage(Layer):
  def __init__(self, num_of_blocks, num_channels,**kwargs):
    super(ResidualStage, self).__init__(**kwargs)
    self.blocks = []
    for i in range(num_of_blocks):
      if i == 0:
        self.blocks.append(ResidualBlock(num_channels,strides=2,use_1x1Conv=True))
      else:
        self.blocks.append(ResidualBlock(num_channels))

  def call(self, inputs):
    X = inputs
    for block in self.blocks:
      X = block(X)
    return X

  def get_config(self):
    config = super().get_config().copy()
    config.update({
        'blocks': self.blocks,
    })
    return config

In [None]:
input_size = (100, 100)
model = Sequential([
  Input(shape=(input_size[0], input_size[1], 3)),
  Conv2D(filters=64, kernel_size=(7, 7), strides=(2, 2), padding='same'), 
  BatchNormalization(),
  Activation('relu'),
  MaxPool2D(pool_size=3, strides=2, padding='same'),
  ResidualStage(3, 256, name='res_stage_1'),
  ResidualStage(4, 512, name='res_stage_2'),
  ResidualStage(36, 1024, name='res_stage_3'),
  ResidualStage(3, 2048, name='res_stage_4'),
  Flatten(),
  Dense(units=100),
  tf.keras.layers.Dropout(0.5),
  Dense(units=50),
  tf.keras.layers.Dropout(0.5),
  Dense(units=10),
  tf.keras.layers.Dropout(0.5),
  Dense(units=1),
])

In [None]:
model.compile(
    optimizer=tf.keras.optimizers.Adam(1e-3),
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=tf.keras.metrics.MeanSquaredError()
)

In [None]:
model.build()
model.summary()

In [None]:
history= model.fit(
   X_train, y_train,
   batch_size=128,
   epochs=5,
   validation_data = (X_validate, y_validate)
)

In [None]:
plt.plot(history.history['loss'][3:])
plt.plot(history.history['val_loss'][3:])
plt.legend(['training', 'validation'])
plt.title('Loss')
plt.xlabel('Epoch')

In [None]:
loss, mse = model.evaluate(X_test, y_test)
y_pred = model.predict(X_test)
y_test.shape, y_pred.shape

In [None]:
plt.plot(y_pred, 'r.', label='prediction', alpha=0.5)
plt.plot(y_test, 'b.', label='actual', alpha=0.5)
plt.legend()
plt.show()