<a href="https://colab.research.google.com/github/KeanuInterone/AutonomousDrivingGamePlayer/blob/main/AutonoumouseGamePlayerModelTrainer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#🤖 Self Driving Model Trainer 🤖

# Imports

In [1]:
import pickle
import numpy as np
import os
import tensorflow as tf

# Configure

In [2]:
input_shape = (20, 256, 256, 3)
num_classes = 4
batch_size = 32
traing_batched_files = "/content/drive/MyDrive/Projects/AutonomousGamePlayer/BatchedExamples/train"
validation_batched_files = "/content/drive/MyDrive/Projects/AutonomousGamePlayer/BatchedExamples/val"
test_batched_files = "/content/drive/MyDrive/Projects/AutonomousGamePlayer/BatchedExamples/test"
checkpoints_path = "/content/drive/MyDrive/Projects/AutonomousGamePlayer/ModelCheckpoints"

# Get Pickle File Paths

In [3]:
def get_pickle_file_paths(dir):
  """
  Gets the list of pickle files in a directory
  ARGS:
    dir (string): Directory
  RETURNS:
    file_names (list): List of file names
  """
  file_names = []
  for filename in os.listdir(dir):
    if filename.endswith('.pickle'):
      path = os.path.join(dir, filename)
      file_names.append(path)
  return file_names

# Load Pickle Object

In [4]:
def load_pickle_object(file_name):
  """
  Loads a pickle file
  ARGS:
    file_name (string): File name
  RETURNS:
    object (dict): the pickled object
  """
  # Open file
  with open(file_name, 'rb') as handle:
    # Load data
    pickle_object = pickle.load(handle)
    return pickle_object

#Create X and y from examples

In [5]:
def create_X_y_from_examples(examples):
  """
  Create X and y from examples
  ARGS:
    examples (list): the list of examples
  RETURNS:
    X (np array): the list of features
    y (np array): the list of labels
  """
  X = []
  y = []
  for example in examples:
    X.append(example['x_screen_frames'] * 255)
    y.append(example['y_key_state'])
  np_X = np.array(X)
  np_y = np.array(y)
  del X
  del y
  return np_X, np_y

#Batch Example Generator

In [6]:
def batch_example_generator(batch_file_paths, create_x_y_func):
  """
  Batch example generator
  ARGS:
    batch_file_paths (list): the list of batch file paths
    create_x_y_func (function): the function to create X and y from examples
  RETURNS:
    example (dict): the example
  """
  while True:
    for batch_file_path in batch_file_paths:
      examples = load_pickle_object(batch_file_path)
      Xs, ys = create_x_y_func(examples)
      yield Xs, ys

#Train Model

In [7]:
def train_model(
    model,
    model_name,
    train_data_dir,
    val_data_dir,
    create_X_y_from_examples_func,
    checkpoint_path,
    batch_size,
    data_portion = 1.0
    ):
  """
  Train model
  ARGS:
    model
    model_name
    train_data_dir
    val_data_dir
    create_X_y_from_examples_func
    checkpoint_path
    batch_size
    data_portion
  """
  # Checkpoints
  checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
      filepath=f"{checkpoint_path}/{model_name}/model_checkpoint.h5",
      save_best_only=True,
      save_weights_only=False,
      monitor='val_loss',
      mode='min',
      verbose=1
  )

  # Early stoping
  early_stopping_callback = tf.keras.callbacks.EarlyStopping(
      monitor='val_loss',
      patience=2,
      verbose=1,
      restore_best_weights=True
  )

  # Get Train and Val files
  train_batch_file_paths = get_pickle_file_paths(train_data_dir)
  val_batch_file_paths = get_pickle_file_paths(val_data_dir)

  # Portion Train and Val sets
  train_batch_file_paths = train_batch_file_paths[:int(len(train_batch_file_paths) * data_portion)]
  val_batch_file_paths = val_batch_file_paths[:int(len(val_batch_file_paths) * data_portion)]

  # Create generators
  train_generator = batch_example_generator(train_batch_file_paths, create_X_y_from_examples_func)
  val_generator = batch_example_generator(val_batch_file_paths, create_X_y_from_examples_func)

  # Fit the model with callbacks
  model.fit(
      train_generator,
      batch_size=batch_size,
      steps_per_epoch=len(train_batch_file_paths),
      epochs=10,
      validation_data=val_generator,
      validation_steps=len(val_batch_file_paths),
      callbacks=[checkpoint_callback, early_stopping_callback]
  )

# Model

In [8]:
def create_model(input_shape, num_classes):
  # Define input layers
  video_input = tf.keras.layers.Input(shape=input_shape, name='video_input')

  # Resize
  image_size = 96 # in [96, 128, 160, 192, 224]
  video_input_resized = tf.keras.layers.TimeDistributed(tf.keras.layers.Resizing(image_size, image_size), name='resize')(video_input)

  # Preprosess for ResNet
  video_input_preprocess = tf.keras.applications.mobilenet_v2.preprocess_input(video_input_resized)

  # Load MobileNet base
  mobilenet_base = tf.keras.applications.MobileNetV2(
      include_top=False,
      weights='imagenet',
      input_shape=(image_size, image_size, 3),
      pooling='avg'
  )
  mobilenet_base.trainable = False

  # Image features
  image_features = tf.keras.layers.TimeDistributed(mobilenet_base, name='image_features')(video_input_preprocess)

  # LSTM layer for video temporal modeling
  video_lstm_features = tf.keras.layers.LSTM(128, return_sequences=True)(image_features)
  video_lstm_output = tf.keras.layers.LSTM(128, return_sequences=False)(video_lstm_features)

  # Fully connected layer for prediction
  output = tf.keras.layers.Dense(num_classes, activation='sigmoid', name='output')(video_lstm_output)

  # Define the model
  model = tf.keras.models.Model(inputs=video_input, outputs=output)

  # Compile the model
  model.compile(optimizer='rmsprop', loss='binary_crossentropy', metrics=['accuracy'])

  # Return
  return model

In [None]:
model = create_model(input_shape, num_classes)
model.summary()

In [None]:
train_model(
    model,
    'model_1',
    traing_batched_files,
    validation_batched_files,
    create_X_y_from_examples,
    checkpoints_path,
    batch_size
)