<a href="https://colab.research.google.com/github/Matan-Vinkler/self-driving-robot/blob/main/behavioral_cloning_model_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Model Training with the Behavioral Cloning Approach

Behavioral cloning is a supervised learning approach where a model learns to imitate a driver's behavior directly from examples. You record camera frames while a human (or a reliable controller) drives and log the corresponding steering command at each frame. After preprocessing each frame (crop → color transform → blur → resize → normalize), a CNN (e.g., the NVIDIA end-to-end architecture) maps an image to a single continuous output: the steering angle. The model is trained to minimize the difference between its predicted angle and the logged angle (e.g., MSE or Huber loss). At runtime, the camera feed is fed through the same preprocessing and the network's predicted angle is used to control the vehicle—optionally with smoothing, speed governors, and safety checks. This bypasses explicit lane detection or planning and instead mimics the demonstrated driving policy.
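
To make the two loss options mentioned above concrete, here is a minimal pure-Python sketch comparing MSE and Huber loss on a handful of steering angles. The sample values and the `delta` threshold are made up for illustration; the notebook itself trains with MSE.

```python
def mse(y_true, y_pred):
    """Mean squared error over paired steering angles."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


def huber(y_true, y_pred, delta=1.0):
    """Mean Huber loss: quadratic for |error| <= delta, linear beyond it."""
    total = 0.0
    for t, p in zip(y_true, y_pred):
        err = abs(t - p)
        if err <= delta:
            total += 0.5 * err ** 2
        else:
            total += delta * (err - 0.5 * delta)
    return total / len(y_true)


# Hypothetical logged vs. predicted steering angles; the last pair is an outlier.
logged = [0.0, 0.1, -0.2, 0.0]
predicted = [0.0, 0.2, -0.1, 2.0]

print("MSE:  ", mse(logged, predicted))
print("Huber:", huber(logged, predicted, delta=1.0))
```

The single outlier dominates the MSE, while the Huber loss grows only linearly with it. That is the usual argument for Huber when the steering labels are noisy.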

* [Installing and Importing Libraries](#scrollTo=NKw1owXNOP4i)

* [Data Loading](#scrollTo=6acrZ1dAOM5A)

* [Data Preprocessing](#scrollTo=uqsXizuEOIH9)

* [Model Training](#scrollTo=AiOLWuMDN659)



## Installing and Importing Libraries

Reset to a clean state and download the dataset from GitHub

In [None]:
!rm -rf track
!rm -f nvidia_model.h5
!git clone https://github.com/rslim087a/track
!ls track

Installing necessary libraries and dependencies

In [None]:
# Run this on fresh state
!pip install -q tf2onnx onnx
!pip3 install imgaug
!pip3 install --upgrade --force-reinstall "numpy==1.26.4"

Importing everything we'll need later

In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import keras
from keras.models import Sequential
from keras.layers import Conv2D, MaxPooling2D, Dense, Dropout, Flatten, Input
from keras.optimizers import Adam
from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split
from imgaug import augmenters as iaa
import cv2
import pandas as pd
import random
import ntpath
import tf2onnx
import onnx

from google.colab import files

## Data Loading

Loading the dataset from the `.csv` file into memory

In [None]:
datadir = "track"
columns = ["center", "left", "right", "steering", "throttle", "reverse", "speed"]

data_df = pd.read_csv(os.path.join(datadir, "driving_log.csv"), names=columns)
pd.set_option("display.max_colwidth", None)
data_df.head()

In [None]:
def path_leaf(path):
    head, tail = ntpath.split(path)
    return tail

data_df["center"] = data_df["center"].apply(path_leaf)
data_df["left"] = data_df["left"].apply(path_leaf)
data_df["right"] = data_df["right"].apply(path_leaf)

data_df.head()

In [None]:
num_bins = 25
samples_per_bin = 400

hist, bins = np.histogram(data_df["steering"], num_bins)
center = (bins[:-1] + bins[1:]) * 0.5

plt.bar(center, hist, width=0.05)
plt.plot((np.min(data_df["steering"]), np.max(data_df["steering"])), (samples_per_bin, samples_per_bin))
plt.show()

In [None]:
print("Total data:", len(data_df))

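remove_list = []
for j in range(num_bins):
  # Collect the row positions whose steering angle falls in bin j
  list1_ = []
  for i in range(len(data_df["steering"])):
    if data_df["steering"][i] >= bins[j] and data_df["steering"][i] <= bins[j + 1]:
      list1_.append(i)
  # Shuffle, then mark everything beyond the first samples_per_bin entries for removal
  list1_ = shuffle(list1_)
  list1_ = list1_[samples_per_bin:]
  remove_list.extend(list1_)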

print("Removed:", len(remove_list))
data_df.drop(data_df.index[remove_list], inplace=True)
print("Remaining:", len(data_df))

hist, _ = np.histogram(data_df["steering"], num_bins)
plt.bar(center, hist, width=0.05)
plt.plot((np.min(data_df["steering"]), np.max(data_df["steering"])), (samples_per_bin, samples_per_bin))
plt.show()
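
The balancing step above caps every steering bin at `samples_per_bin` rows. The same idea can be sketched without pandas, on a made-up list of angles. `cap_bins` and its `seed` parameter are hypothetical names, not part of the notebook. Unlike the loop above, which uses inclusive comparisons on both bin edges, this sketch assigns each value to exactly one bin.

```python
import random


def cap_bins(values, num_bins, samples_per_bin, seed=0):
    """Return sorted indices to keep so that no histogram bin exceeds samples_per_bin."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / num_bins or 1.0  # avoid zero width if all values are equal
    buckets = [[] for _ in range(num_bins)]
    for i, v in enumerate(values):
        # Each value lands in exactly one bin; the maximum goes into the last bin.
        b = min(int((v - lo) / width), num_bins - 1)
        buckets[b].append(i)

    rng = random.Random(seed)
    keep = []
    for bucket in buckets:
        rng.shuffle(bucket)                      # pick the survivors at random
        keep.extend(bucket[:samples_per_bin])    # keep at most samples_per_bin
    return sorted(keep)


# Hypothetical steering log dominated by straight driving (angle 0.0).
angles = [0.0] * 50 + [-0.5, -0.25, 0.25, 0.5, 1.0, -1.0]
kept = cap_bins(angles, num_bins=5, samples_per_bin=10)
print(len(angles), "->", len(kept))
```

Only the over-represented straight-driving bin is thinned; the rare turning samples all survive.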

In [None]:
print("Before:", data_df.iloc[1])

def load_img_steering(datadir, df):
  image_path = []
  steering = []

  # Iterate over the DataFrame that was passed in, not the global data_df
  for i in range(len(df)):
    indexed_data = df.iloc[i]
    center, left, right = indexed_data["center"], indexed_data["left"], indexed_data["right"]
    angle = float(indexed_data["steering"])

    # center camera: use the logged steering angle as-is
    image_path.append(os.path.join(datadir, center.strip()))
    steering.append(angle)

    # left camera: shift the steering angle by +0.15
    image_path.append(os.path.join(datadir, left.strip()))
    steering.append(angle + 0.15)

    # right camera: shift the steering angle by -0.15
    image_path.append(os.path.join(datadir, right.strip()))
    steering.append(angle - 0.15)

  image_path = np.asarray(image_path)
  steering = np.asarray(steering)
  return image_path, steering

image_path, steering = load_img_steering(datadir + "/IMG", data_df)

print(len(image_path), len(steering))
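
The loader above turns every logged frame into three training samples by offsetting the steering angle for the side cameras. The sketch below isolates that rule. `expand_cameras` is a hypothetical helper, and the reading that positive angles steer to the right is an assumption inferred from the `+0.15` applied to the left camera.

```python
def expand_cameras(center_angle, correction=0.15):
    """Return (camera, steering) pairs for one logged frame.

    The left camera sees the road as if the car sat further left, so its
    label is nudged in the positive direction; the right camera gets the
    opposite nudge.
    """
    return [
        ("center", center_angle),
        ("left", center_angle + correction),
        ("right", center_angle - correction),
    ]


# One frame logged while driving straight yields three labelled samples.
for camera, angle in expand_cameras(0.0):
    print(f"{camera:>6}: {angle:+.2f}")
```

This is why `len(image_path)` printed above is three times the number of rows remaining in `data_df`.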

Splitting the dataset into training and validation subsets

In [None]:
X_train, X_valid, y_train, y_valid = train_test_split(image_path, steering, test_size=0.2, random_state=6)

print("Training:", len(X_train))
print("Validation:", len(X_valid))

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

axes[0].hist(y_train, bins=num_bins, width=0.05, color="blue")
axes[0].set_title("Training set")
axes[1].hist(y_valid, bins=num_bins, width=0.05, color="red")
axes[1].set_title("Validation set")
plt.show()

## Data Preprocessing

Define zoom function

In [None]:
def zoom(image):
  zoom = iaa.Affine(scale=(1, 1.3))
  image = zoom.augment_image(image)
  return image

image = image_path[random.randint(0, len(image_path) - 1)]
original_image = mpimg.imread(image)
zoomed_image = zoom(original_image)

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
fig.tight_layout()
axes[0].imshow(original_image)
axes[0].set_title("Original Image")
axes[1].imshow(zoomed_image)
axes[1].set_title("Zoomed Image")
plt.show()

Define pan function

In [None]:
def pan(image):
  pan = iaa.Affine(translate_percent={"x": (-0.1, 0.1), "y": (-0.1, 0.1)})
  image = pan.augment_image(image)
  return image

image = image_path[random.randint(0, len(image_path) - 1)]
original_image = mpimg.imread(image)
panned_image = pan(original_image)

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
fig.tight_layout()
axes[0].imshow(original_image)
axes[0].set_title("Original Image")
axes[1].imshow(panned_image)
axes[1].set_title("Panned Image")
plt.show()

Define random brightness function

In [None]:
def img_random_brightness(image):
  brightness = iaa.Multiply((0.2, 1.2))
  image = brightness.augment_image(image)
  return image

image = image_path[random.randint(0, len(image_path) - 1)]
original_image = mpimg.imread(image)
brightness_image = img_random_brightness(original_image)

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
fig.tight_layout()
axes[0].imshow(original_image)
axes[0].set_title("Original Image")
axes[1].imshow(brightness_image)
axes[1].set_title("Brightness Image")
plt.show()

Define random flip function

In [None]:
def img_random_flip(image, steering_angle):
  image = cv2.flip(image, 1)
  steering_angle = -steering_angle
  return image, steering_angle

idx = random.randint(0, len(image_path) - 1)
image = image_path[idx]
steering_angle = steering[idx]

original_image = mpimg.imread(image)
flipped_image, flipped_steering_angle = img_random_flip(original_image, steering_angle)

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
fig.tight_layout()
axes[0].imshow(original_image)
axes[0].set_title(f"Original Image (steering angle: {steering_angle})")
axes[1].imshow(flipped_image)
axes[1].set_title(f"Flipped Image (steering angle: {flipped_steering_angle})")
plt.show()
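
Note that `img_random_flip` always flips; the random decision is made later by the caller. The pairing of a mirrored image with a negated angle can be checked on a toy example. This sketch uses plain nested lists instead of a real image, and `flip_horizontal` is a hypothetical stand-in for the `cv2.flip(image, 1)` call above.

```python
def flip_horizontal(image_rows, steering_angle):
    """Mirror each row left-to-right and negate the steering angle."""
    flipped = [row[::-1] for row in image_rows]
    return flipped, -steering_angle


# A tiny 2x3 "image" of pixel intensities with a hypothetical steering label.
tiny = [[1, 2, 3],
        [4, 5, 6]]

flipped, angle = flip_horizontal(tiny, 0.3)
print(flipped, angle)

# Flipping twice restores both the image and the label.
restored, restored_angle = flip_horizontal(flipped, angle)
print(restored == tiny, restored_angle)
```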

Combine those functions into a single `random_augment` function

In [None]:
def random_augment(image, steering_angle):
  image = mpimg.imread(image)
  if np.random.rand() < 0.5:
    image = pan(image)
  if np.random.rand() < 0.5:
    image = zoom(image)
  if np.random.rand() < 0.5:
    image = img_random_brightness(image)
  if np.random.rand() < 0.5:
    image, steering_angle = img_random_flip(image, steering_angle)
  return image, steering_angle
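
Because each of the four stages in `random_augment` fires independently with probability 0.5, it is easy to work out how often an image passes through untouched. The sketch below only does the arithmetic; the stage names are labels for the four `if` branches above.

```python
from itertools import product

stages = ["pan", "zoom", "brightness", "flip"]
p = 0.5  # each stage fires independently with this probability

# Every on/off combination of the four stages is equally likely when p = 0.5.
combos = list(product([False, True], repeat=len(stages)))

prob_unchanged = (1 - p) ** len(stages)   # no stage fires
expected_stages = p * len(stages)         # average number of stages applied

print("combinations:", len(combos))
print("P(image unchanged):", prob_unchanged)
print("expected stages applied:", expected_stages)
```

So roughly one training image in sixteen reaches the network without augmentation, and on average two stages are applied.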

In [None]:
ncols = 2
nrows = 10

fig, axes = plt.subplots(nrows, ncols, figsize=(15, 50))
fig.tight_layout()

for i in range(10):
  idx = random.randint(0, len(image_path) - 1)
  random_image = image_path[idx]
  random_steering = steering[idx]

  original_image = mpimg.imread(random_image)
  augmented_image, steering_aug = random_augment(random_image, random_steering)

  axes[i][0].imshow(original_image)
  axes[i][0].set_title(f"Original Image (steering angle: {random_steering})")
  axes[i][1].imshow(augmented_image)
  axes[i][1].set_title(f"Augmented Image (steering angle: {steering_aug})")

plt.show()

Define preprocessing function for image

In [None]:
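def img_preprocess(img):
  img = img[60:135,:,:]                       # crop: keep rows 60-134
  img = cv2.cvtColor(img, cv2.COLOR_RGB2YUV)  # color transform: RGB -> YUV
  img = cv2.GaussianBlur(img, (3, 3), 0)      # blur with a 3x3 Gaussian kernel
  img = cv2.resize(img, (200, 66))            # resize to width 200, height 66 (model input size)
  img = img / 255.0                           # normalize by dividing by 255
  return img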

In [None]:
idx = random.randint(0, len(image_path) - 1)
image = image_path[idx]
original_img = mpimg.imread(image)
preprocessed_img = img_preprocess(original_img)

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
fig.tight_layout()
axes[0].imshow(original_img)
axes[0].set_title(f"Original Image {idx}")
axes[1].imshow(preprocessed_img)
axes[1].set_title(f"Preprocessed Image {idx}")
plt.show()

Define a batch generator for more efficient training

In [None]:
def batch_generator(image_paths, steerings_ang, batch_size, is_training):
  while True:
    batch_img = []
    batch_steering = []

    for i in range(batch_size):
      random_index = random.randint(0, len(image_paths) - 1)

      if is_training:
        # Apply augmentation after reading the image
        img, st = random_augment(image_paths[random_index], steerings_ang[random_index])
      else:
        img = mpimg.imread(image_paths[random_index])
        st = steerings_ang[random_index]

      # Preprocess the image
      img = img_preprocess(img)
      batch_img.append(img)
      batch_steering.append(st)

    yield (np.asarray(batch_img), np.asarray(batch_steering))
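
The generator above draws every sample with `random.randint`, that is, with replacement, so within a given number of steps some images can repeat while others are never drawn. As a point of comparison only, here is a minimal sketch of an epoch-style index generator that visits each item once before reshuffling. `epoch_batches` is a hypothetical name and is not used elsewhere in this notebook.

```python
import random


def epoch_batches(items, batch_size, seed=0):
    """Yield index batches forever, visiting every item once per epoch."""
    rng = random.Random(seed)
    order = list(range(len(items)))
    while True:
        rng.shuffle(order)  # new random order at the start of each epoch
        for start in range(0, len(order), batch_size):
            # Slicing copies the indices, so later shuffles do not alter the batch.
            yield order[start:start + batch_size]


gen = epoch_batches(["a", "b", "c", "d", "e"], batch_size=2)
first_epoch = [next(gen) for _ in range(3)]  # batches of 2, 2 and 1 indices
print(first_epoch)
```

Sampling with replacement, as the notebook does, is simpler and works well with on-the-fly augmentation. The epoch-style variant is mainly useful for validation, where each image should be counted exactly once.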

In [None]:
X_train_gen, y_train_gen = next(batch_generator(X_train, y_train, 1, 1))
X_valid_gen, y_valid_gen = next(batch_generator(X_valid, y_valid, 1, 0))

fig, axes = plt.subplots(1, 2, figsize=(15, 10))
fig.tight_layout()

axes[0].imshow(X_train_gen[0])
axes[0].set_title(f"Training Image {y_train_gen[0]}")
axes[1].imshow(X_valid_gen[0])
axes[1].set_title(f"Validation Image {y_valid_gen[0]}")
plt.show()

Generating and downloading a calibration set, used later when converting the model from `ONNX` to `HEF`

In [None]:
calibration_set, _ = next(batch_generator(X_valid, y_valid, batch_size=200, is_training=0))
print(f"Calibration set shape: {calibration_set.shape}")

In [None]:
calib_dir = "calib_set"
os.makedirs(calib_dir, exist_ok=True)

np.save(os.path.join(calib_dir, "calibration_set.npy"), calibration_set)

In [None]:
files.download("calib_set/calibration_set.npy")

## Model Training

Defining the model:

In [None]:
def nvidia_model() -> Sequential:
  model = Sequential(name="nvidia_model")
  model.add(Input(shape=(66, 200, 3)))
  model.add(Conv2D(24, (5, 5), strides=(2, 2), activation="elu", name="conv_1"))
  model.add(Conv2D(36, (5, 5), strides=(2, 2), activation="elu", name="conv_2"))
  model.add(Conv2D(48, (5, 5), strides=(2, 2), activation="elu", name="conv_3"))
  model.add(Conv2D(64, (3, 3), activation="elu", name="conv_4"))
  model.add(Conv2D(64, (3, 3), activation="elu", name="conv_5"))

  model.add(Flatten(name="flat"))

  model.add(Dense(100, activation="elu", name="fc_1"))
  model.add(Dense(50, activation="elu", name="fc_2"))
  model.add(Dense(10, activation="elu", name="fc_3"))
  model.add(Dense(1, name="fc_4"))

  optimizer = Adam(learning_rate=1e-3)
  model.compile(loss="mse", optimizer=optimizer)

  return model
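
To see how the `66 x 200 x 3` input shrinks through the network, the layer shapes and parameter counts can be worked out by hand. The sketch below assumes Keras's default `"valid"` padding for `Conv2D`, where the output size along one axis is `floor((in - kernel) / stride) + 1`. The layer tuples are copied from `nvidia_model()` above, and `conv_out` is a hypothetical helper.

```python
def conv_out(size, kernel, stride=1):
    """Output length of a 'valid' (no padding) convolution along one axis."""
    return (size - kernel) // stride + 1


# (filters, kernel, stride) for conv_1 .. conv_5, copied from nvidia_model().
conv_layers = [(24, 5, 2), (36, 5, 2), (48, 5, 2), (64, 3, 1), (64, 3, 1)]
dense_units = [100, 50, 10, 1]

h, w, c = 66, 200, 3
params = 0
for filters, k, s in conv_layers:
    params += k * k * c * filters + filters  # kernel weights + biases
    h, w, c = conv_out(h, k, s), conv_out(w, k, s), filters
    print(f"conv -> {h} x {w} x {c}")

flat = h * w * c
units_in = flat
for units in dense_units:
    params += units_in * units + units  # dense weights + biases
    units_in = units

print("flattened features:", flat)
print("trainable parameters:", params)
```

The totals (1152 flattened features, 252,219 parameters) should match what `model.summary()` reports in the next cell.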

In [None]:
model = nvidia_model()
model.summary()

Begin training

In [None]:
history = model.fit(
    batch_generator(X_train, y_train, batch_size=100, is_training=1),
    steps_per_epoch=300,
    epochs=10,
    validation_data=batch_generator(X_valid, y_valid, batch_size=100, is_training=0),
    validation_steps=200,
    verbose=1,
    shuffle=1,  # ignored for generator input; batch_generator already samples at random
)

In [None]:
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.legend(["training", "validation"])
plt.title("Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.show()

Save the model in `.h5` format and download it

In [None]:
model.save("nvidia_model.h5")
files.download("nvidia_model.h5")

Convert it to `ONNX` and download it

In [None]:
onnx_model_path = "nvidia_model.onnx"
tf2onnx.convert.from_keras(model, output_path=onnx_model_path)

print(f"Model converted and saved as {onnx_model_path}")

files.download("nvidia_model.onnx")