# Execution parameters

In [None]:
# Folder that is scanned for the training data archives (only .tar files are used)
TRAINING_DATA_PATH = '/content/drive/MyDrive/Race2TheFuture/TrainingData'

# Folder the trained model files are copied to; it is created when it does not exist and
# files already present with the same name are overwritten
TRAINED_MODEL_OUTPUT_PATH = '/content/drive/MyDrive/Race2TheFuture/TrainedModels'

# Debugging switch, exported as an environment variable
# NOTE(review): presumably read by a custom augmentation step to write images for inspection - confirm
%env CUSTOM_AUGMENTATION_WRITE_IMG = True

# Set to True if you want the trained model files to be downloaded to your local machine
DOWNLOAD_TRAINEDMODEL = False


# Validate preconditions

## Package imports

In [None]:
import os
import shutil

## Variables

In [None]:
# Shared failure flag: a step sets it to True when it fails, and every later cell
# checks it first and skips its work when it is set
executionFailed = False

## Validate preconditions

In [None]:
# Commented out because Google Drive should already be mounted; if it is not, uncomment the code below
# Mount google drive
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# Validate that TRAINING_DATA_PATH exists and is a directory.
# Sets executionFailed when it is not, so that all following steps are skipped.
try:
  trainingDataPathIsAccessible = os.path.isdir(TRAINING_DATA_PATH)
except Exception as error:
  # os.path.isdir already returns False for a missing path, so this branch is only
  # reached for a malformed setting (e.g. a value that is not a path). Catching
  # Exception instead of using a bare except keeps KeyboardInterrupt/SystemExit
  # working, and the cause is reported instead of being hidden.
  trainingDataPathIsAccessible = False
  print('Unable to validate the path: {!r}'.format(error))

if trainingDataPathIsAccessible:
  print('Path is accessible: {}'.format(TRAINING_DATA_PATH))
else:
  executionFailed = True
  print('Path does not exist or is not accessible: {}'.format(TRAINING_DATA_PATH))

In [None]:
# Collect the names of all .tar archives found in the training data folder
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
else:
  # The extension is compared in lower case, so '.TAR' is picked up as well
  TRAINING_DATA_FILES = [
    entryName
    for entryName in os.listdir(TRAINING_DATA_PATH)
    if os.path.splitext(entryName)[1].lower() == '.tar'
  ]
  print(TRAINING_DATA_FILES)

In [None]:
# Stop the run when the training data folder did not contain any archive
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
elif not TRAINING_DATA_FILES:
  # Nothing to train on: flag the failure so the remaining steps are skipped
  executionFailed = True
  print('Execution stopped as not training data is available')
else:
  trainingFileCount = len(TRAINING_DATA_FILES)
  print('Number of training files available: {}'.format(trainingFileCount))

In [None]:
# Validate that TRAINED_MODEL_OUTPUT_PATH exists; create it when it is missing.
# Sets executionFailed when the folder can neither be accessed nor created.
if not executionFailed:
  try:
    if os.path.isdir(TRAINED_MODEL_OUTPUT_PATH):
      print('Path is accessible: {}'.format(TRAINED_MODEL_OUTPUT_PATH))
    else:
      print('Path does not exist, try to create it: {}'.format(TRAINED_MODEL_OUTPUT_PATH))
      # exist_ok avoids a spurious failure when the folder appears between the
      # check above and the creation
      os.makedirs(TRAINED_MODEL_OUTPUT_PATH, exist_ok=True)
      if os.path.isdir(TRAINED_MODEL_OUTPUT_PATH):
        print('=> Path successfully created')
      else:
        executionFailed = True
        print('=> Unable to create path')
  except Exception as error:
    # Catching Exception instead of using a bare except keeps
    # KeyboardInterrupt/SystemExit working; the reason is printed so the
    # failure can be diagnosed
    executionFailed = True
    print('Unable to create or access: {}'.format(TRAINED_MODEL_OUTPUT_PATH))
    print('=> Reason: {!r}'.format(error))
else:
  print('Execution skipped as an error occured in a previous step')

# Setup TensorFlow

In [None]:
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
else:
  # The import only happens when all previous steps succeeded
  import tensorflow
  tensorflowVersion = tensorflow.__version__
  print('Tensorflow version: {}'.format(tensorflowVersion))

# Setup DonkeyCar

## Clone DonkeyCar repository and switch to correct branch

In [None]:
if not executionFailed:
  !git clone https://github.com/autorope/donkeycar.git 
  !git checkout main

  %cd /content/donkeycar
else:
  print('Execution skipped as an error occured in a previous step')

## Install DonkeyCar

In [None]:
if not executionFailed:
  !pip3 install -e .[pc]
else:
  print('Execution skipped as an error occured in a previous step')

## Create project (mycar)

In [None]:
if not executionFailed:
  !donkey createcar --path /content/mycar
else:
  print('Execution skipped as an error occured in a previous step')

# Upload trainingdata

In [None]:
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
else:
  # Folder the training data archives are extracted to
  trainingdata_uploadpath = '/content/mycar/tubs'

## Prepare the upload path

In [None]:
if not executionFailed:
  # Pure Python instead of "!rm -r ... &> /dev/null": "&>" is a bash-only
  # redirection (a POSIX sh starts the command in the background instead, which
  # can race with the folder creation below).
  # ignore_errors covers the case where the folder does not exist.
  shutil.rmtree('/content/mycar/data', ignore_errors=True)
  # Start from an empty upload folder
  shutil.rmtree(trainingdata_uploadpath, ignore_errors=True)
  try:
    # os.mkdir (not os.makedirs): a missing parent folder must be reported as a
    # failure, not silently created
    os.mkdir(trainingdata_uploadpath)
  except OSError as error:
    print('Unable to create path: {!r}'.format(error))

  if os.path.isdir(trainingdata_uploadpath):
    print('Path is accessible: {}'.format(trainingdata_uploadpath))
  else:
    executionFailed = True
    print('Unable to access path: {}'.format(trainingdata_uploadpath))
else:
  print('Execution skipped as an error occured in a previous step')

## Extract the trainingdata (tar files) to the upload location

In [None]:
if not executionFailed:
  # Switch to the mycar directory
  %cd /content/mycar/
  !ls -al
else:
  print('Execution skipped as an error occured in a previous step')

In [None]:
if not executionFailed:
  # Switch to the tub directory
  %cd {trainingdata_uploadpath}
  !ls -al
else:
  print('Execution skipped as an error occured in a previous step')

In [None]:
if not executionFailed:
  # Extract each training data file
  for fileName in TRAINING_DATA_FILES:
    tubName = os.path.splitext(fileName)[0]
    fullTarFilePath = os.path.join(os.sep, TRAINING_DATA_PATH, fileName)
    fullTubPath = os.path.join(os.sep, trainingdata_uploadpath, tubName)
    print('Extract {} => {}'.format(fullTarFilePath, fullTubPath))
    !mkdir {fullTubPath}
    %cd {fullTubPath}
    !tar -xvf {fullTarFilePath}
else:
  print('Execution skipped as an error occured in a previous step')

# Train your model

In [None]:
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
else:
  # One entry per archive: the 'data' folder inside the tub folder named after the archive
  tubDataFolders = [
    os.path.join(
      os.sep,
      os.path.join(os.sep, trainingdata_uploadpath, os.path.splitext(archiveName)[0]),
      'data',
    )
    for archiveName in TRAINING_DATA_FILES
  ]
  # The folders are combined into a single comma separated string
  training_tub_paths = ','.join(tubDataFolders)
  print(training_tub_paths)

In [None]:
if not executionFailed:
  !donkey train --tub '{training_tub_paths}' --model /content/mycar/models/mypilot.h5 --config /content/mycar/config.py
else:
  print('Execution skipped as an error occured in a previous step')

# Retrieve trained model

In [None]:
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
else:
  # Folder that holds the trained model, and the files that are expected in it
  trainedModelPath = '/content/mycar/models'
  expectedTrainingFiles = [
    'database.json',
    'mypilot.h5',
    'mypilot.png',
    'mypilot.tflite',
  ]

## Validate if trained model is available

In [None]:
if not executionFailed:
  !ls -alh {trainedModelPath}
  %cd {trainedModelPath}
else:
  print('Execution skipped as an error occured in a previous step')

In [None]:
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
else:
  # Every expected file must be present in the model folder; a missing file
  # flags the run as failed but the remaining files are still reported
  for fileName in expectedTrainingFiles:
    trainedModelFullFilePath = os.path.join(os.sep, trainedModelPath, fileName)
    if not os.path.isfile(trainedModelFullFilePath):
      executionFailed = True
      print('Training file: {} => Does not exists'.format(trainedModelFullFilePath))
      continue
    print('Training file: {} => Exists'.format(trainedModelFullFilePath))

## Copy files to TRAINED_MODEL_OUTPUT_PATH

In [None]:
# Copy every trained model file to TRAINED_MODEL_OUTPUT_PATH (existing files are overwritten).
# Sets executionFailed when a copy fails.
if not executionFailed:
  for fileName in expectedTrainingFiles:
    trainedModelFullFilePath_Source = os.path.join(os.sep, trainedModelPath, fileName)
    trainedModelFullFilePath_Target = os.path.join(os.sep, TRAINED_MODEL_OUTPUT_PATH, fileName)
    try:
      # shutil.copy raises when the copy fails, so a failure is detected even
      # when an older copy of the file is still present at the target
      shutil.copy(trainedModelFullFilePath_Source, trainedModelFullFilePath_Target)
      # Verify the TARGET of the copy (not a path left over from a previous cell)
      copySucceeded = os.path.isfile(trainedModelFullFilePath_Target)
    except OSError as error:
      copySucceeded = False
      print('File copy error: {!r}'.format(error))

    if copySucceeded:
      print('File copy: {} => SUCCESS'.format(trainedModelFullFilePath_Source))
    else:
      executionFailed = True
      print('File copy: {} => FAILED'.format(trainedModelFullFilePath_Source))
else:
  print('Execution skipped as an error occured in a previous step')

## Download files to your local machine

In [None]:
if executionFailed:
  print('Execution skipped as an error occured in a previous step')
elif DOWNLOAD_TRAINEDMODEL:
  # The download is optional and uses the copies stored in TRAINED_MODEL_OUTPUT_PATH
  from google.colab import files

  for modelFileName in expectedTrainingFiles:
    downloadPath = os.path.join(os.sep, TRAINED_MODEL_OUTPUT_PATH, modelFileName)
    files.download(downloadPath)

# Final

In [None]:
# Summary of the complete run, based on the shared failure flag
if executionFailed:
  print('Something went wrong, please check the logging in the previous steps to see the issue')
else:
  print('No error loggged, training should be successfull')