In [None]:
# @title Installation of required packages with pip. { run: "auto", display-mode: "form" }
# Downloads a requirements file from a fixed gist revision, installs it, and
# then upgrades the Keras / TensorFlow packages on top of it.
location_to_requirements: str = 'requirements.txt'
download_requirements_from: str = 'https://gist.githubusercontent.com/KentVejrupMadsen/d0400cf3a07bbcf4f8abd9b2ea087d86/raw/c0dcbb239892e5844ca0edf3d5ab696e5a740742/requirements.txt'

# Save the downloaded file under `location_to_requirements`.
!wget {download_requirements_from} -O {location_to_requirements}

%pip install -r {location_to_requirements}
# NOTE(review): this upgrade is unpinned, so it may replace versions that the
# requirements file installed just above - confirm this is intended.
%pip install keras-cv keras keras-core tensorflow --upgrade -q

In [None]:
from os     \
  import    \
  listdir,  \
  mkdir,    \
  environ,  \
  remove

from os.path  \
  import      \
  isdir,      \
  join,       \
  isfile

In [None]:
from random \
  import SystemRandom

In [None]:
from datetime \
  import      \
  date,       \
  datetime

In [None]:
import xml.etree.ElementTree \
  as ET

In [None]:
import logging

from logging \
  import CRITICAL

In [None]:
from tempfile \
  import gettempdir

In [None]:
# Run-level flags, names and file locations shared by later cells.
is_dataset_saved: bool = False
stop_when_done: bool = False # @param {type:"boolean"}

model_name: str = 'mjoelner' # @param {type:"string"}
wandb_label_model: str = 'model' # @param {type:"string"}

# Cache file locations, one per dataset split.
save_cache_at_for_training: str = '/tmp/training.cache' # @param {type:"string"}
save_cache_at_for_validation: str = '/tmp/validation.cache' # @param {type:"string"}
save_cache_at_for_evaluation: str = '/tmp/evaluation.cache' # @param {type:"string"}

location_for_tensorboard = '/tmp/tensorboard/' # @param {type:"string"}

# Create the TensorBoard log directory when it does not exist yet.
# mkdir creates a single level only, so the parent directory must exist.
if not isdir(location_for_tensorboard):
  mkdir(location_for_tensorboard)

In [None]:
# @title Settings - Notebook Configuration { run: "auto", display-mode: "form" }
# Platform and runtime switches for this notebook.
notebook_name: str = 'Targeting System, YOLO, Mjoelner.ipynb' # @param {type:"string"}

# Directory that is removed later when `delete_sample_data` is enabled.
location_for_sample_data: str = '/content/sample_data' # @param {type:"string"}

is_google_colab_platform: bool = True # @param {type:"boolean"}
use_of_multiprocessing: bool = True # @param {type:"boolean"}
delete_sample_data: bool = True # @param {type:"boolean"}
shutdown_on_finish: bool = True # @param {type:"boolean"}

tensorflow_verbosity: int = 1 # @param {type:"number"}





In [None]:
use_of_workers: int = 0 # @param {type:"number"}

# None or 0 means "auto": use the number of CPU cores reported by the system.
if use_of_workers in (None, 0):
  from multiprocessing import cpu_count
  use_of_workers = cpu_count()

In [None]:
image_channels: int = 3 # @param {type:"number"}

def get_image_channels() -> int:
  """Return the module-level number of image channels."""
  return image_channels

def set_image_channels(
  value: int
) -> None:
  """Replace the module-level number of image channels with ``value``."""
  global image_channels
  image_channels = value

In [None]:
# @title Settings - WanDB Configuration { run: "auto", display-mode: "form" }
use_of_wandb: bool = True # @param {type:"boolean"}

wandb_to_save_code: bool = False # @param {type:"boolean"}
wandb_to_sync_tensorboard: bool = True # @param {type:"boolean"}

wandb_project_name: str = 'Mjølner' # @param {type:"string"}
wandb_entity_name: str = 'designermadsen' # @param {type:"string"}

wandb_job_type: str = 'Training' # @param {type:"string"}
wandb_allow_reinitialisation: bool = True # @param {type:"boolean"}

finish_wandb_when_done: bool = True # @param {type:"boolean"}

# The API key is a secret and must never be stored in the notebook itself.
# It is read from the WANDB_API_KEY environment variable; when that variable
# is not set the value is an empty string.
# NOTE(review): the key that used to be hard-coded here should be revoked.
wandb_api_key: str = environ.get('WANDB_API_KEY', '')

wandb_project_tags: str = 'YOLOv8, TMOD, Targeting-System, Tensorflow, Keras, Keras-CV, Training, Evaluation, Experiment, NVIDIA, CUDA, Colab, V100' # @param {type:"string"}

# Remove spaces and drop empty entries first, then sort. Sorting the raw
# pieces (as before) ordered them by their leading space, not by tag name.
wandb_tags: list = sorted(
  tag
  for tag in (
    raw_tag.replace(' ', '')
    for raw_tag in wandb_project_tags.split(',')
  )
  if tag != ''
)

print(wandb_tags)


In [None]:
# @title Settings - Hyper Parameters { run: "auto", display-mode: "form" }
# Training hyper-parameters. Later cells replace None / 0 values with fallbacks.
hp_initial_learning_rate: float = 1.0 # @param {type:"raw"}

# Parameters passed to the ExponentialDecay learning-rate schedule.
hp_decay_rate: float = 0.8624242424242429 # @param {type:"raw"}
hp_decay_steps: int = 550 # @param {type:"number"}

hp_epochs: int = 11 # @param {type:"number"}
hp_batch_size: int = 10 # @param {type:"number"}

# Passed to the optimizer as its global gradient clip norm.
hp_global_clip_at: float = 5.0 # @param {type:"raw"}

In [None]:
# @title Settings - DataSet{ run: "auto", display-mode: "form" }
# Dataset source, split fraction and augmentation switches.
download_dataset_at: str = 'https://nextcloud.nathue.dk/s/gqqte49KC8cg9Dj/download/TMOD.zip' # @param {type:"string"}
iterate_training: int = 1 # @param {type:"number"}
# Fraction of the annotated samples taken for validation.
dataset_split_at: float = 0.25 # @param {type:"raw"}

# Each switch below adds one layer to the augmentation pipeline when True.
use_resize: bool = True # @param {type:"boolean"}
use_jitter_resize: bool = False # @param {type:"boolean"}

use_random_flip: bool = True # @param {type:"boolean"}

use_random_contrast: bool = False # @param {type:"boolean"}
use_random_brightness: bool = False # @param {type:"boolean"}
use_grayscale_augmentation: bool = False # @param {type:"boolean"}

use_grid_mask: bool = False # @param {type:"boolean"}

# Used as the number of parallel calls for the dataset map() steps.
tune_value: int = 4 # @param {type:"number"}



In [None]:
def get_tune_value() -> int:
  """Return the module-level ``tune_value`` setting."""
  return tune_value

In [None]:
def get_save_cache_at_for_evaluation() -> str:
  """Return the configured cache file location for the evaluation data."""
  return save_cache_at_for_evaluation

In [None]:
def zero() -> int:
  """Return the integer constant 0."""
  value: int = 0
  return value

In [None]:
def one() -> int:
  """Return the integer constant 1."""
  value: int = 1
  return value

In [None]:
def two() -> int:
  """Return the integer constant 2."""
  value: int = 2
  return value

In [None]:
def three() -> int:
  """Return the integer constant 3."""
  value: int = 3
  return value

In [None]:
def four() -> int:
  """Return the integer constant 4."""
  value: int = 4
  return value

In [None]:
def five() -> int:
  """Return the integer constant 5."""
  value: int = 5
  return value

In [None]:
def six() -> int:
  """Return the integer constant 6."""
  value: int = 6
  return value

In [None]:
def seven() -> int:
  """Return the integer constant 7."""
  value: int = 7
  return value

In [None]:
def eight() -> int:
  """Return the integer constant 8."""
  value: int = 8
  return value

In [None]:
def nine() -> int:
  """Return the integer constant 9."""
  value: int = 9
  return value

In [None]:
def gpu_label() -> str:
  """Return the device-type label used when listing physical devices."""
  device_type: str = 'GPU'
  return device_type

In [None]:
def max_limit_for_rgb_color() -> int:
  """Return 255, the upper limit used for an RGB colour value."""
  upper_limit: int = 255
  return upper_limit

In [None]:
def label_dataset_annotations() -> str:
  """Return the key used for the annotations part of the dataset."""
  key: str = 'annotations'
  return key

In [None]:
def label_dataset_images() -> str:
  """Return the key used for the images part of the dataset."""
  key: str = 'images'
  return key

In [None]:
def get_label_name() -> str:
  """Return the XML tag that holds an object's class name."""
  return 'name'

In [None]:
def get_label_object() -> str:
  """Return the XML tag of an annotated object element."""
  return 'object'

In [None]:
def get_label_bounding_box() -> str:
  """Return the XML tag of an object's bounding-box element."""
  return 'bndbox'

In [None]:
def get_label_x_minimum() -> str:
  """Return the XML tag of the bounding box's minimum x value."""
  return 'xmin'

In [None]:
def get_label_y_minimum() -> str:
  """Return the XML tag of the bounding box's minimum y value."""
  return 'ymin'

In [None]:
def get_label_x_maximum() -> str:
  """Return the XML tag of the bounding box's maximum x value."""
  return 'xmax'

In [None]:
def get_label_y_maximum() -> str:
  """Return the XML tag of the bounding box's maximum y value."""
  return 'ymax'

In [None]:
def is_equal_to_zero(
    value: int | float
) -> bool:
  """Return True when ``value`` equals zero.

  A float is compared against ``float(zero())``; any other value is compared
  against ``zero()`` directly.
  """
  reference = zero()

  if isinstance(value, float):
    reference = float(reference)

  return value == reference

# Setup

In [None]:
# Inclusive bounds used by generate_decay_steps() when drawing a random value.
decay_steps_low_point: int = 10 # @param {type:"number"}
decay_steps_high_point: int = 500 # @param {type:"number"}

# Bounds used by generate_decay_rate() when drawing a random value.
decay_rate_low_point: float = 0.2387111124 # @param {type:"raw"}
decay_rate_high_point: float = 0.9940009441 # @param {type:"raw"}


In [None]:
def generate_decay_steps() -> int:
  """Draw a random integer between the configured decay-step bounds.

  Both ``decay_steps_low_point`` and ``decay_steps_high_point`` are included
  in the range, as ``randint`` is inclusive at both ends.
  """
  generator = SystemRandom()
  return generator.randint(decay_steps_low_point, decay_steps_high_point)

In [None]:
def generate_decay_rate() -> float:
  """Draw a random float between the configured decay-rate bounds."""
  generator = SystemRandom()
  return generator.uniform(decay_rate_low_point, decay_rate_high_point)

In [None]:
# @title Settings - YOLOv8 { run: "auto", display-mode: "form" }
full_hd_width: int = 1920 # @param {type:"number"}
full_hd_height: int = 1080 # @param {type:"number"}

scale_by_size: float = 2 # @param {type:"raw"}

# Full-HD dimensions divided by the scale factor, truncated to whole pixels.
has_fhd_width: int = int(full_hd_width / scale_by_size)
has_fhd_height: int = int(full_hd_height / scale_by_size)

show_summary_of_model: bool = False # @param {type:"boolean"}


In [None]:
bounding_format = 'xyxy' # @param {type:"string"}
# The configured format name is lower-cased once, here.
bounding_format = bounding_format.lower()

def get_bounding_format() -> str:
  """Return the module-level bounding-box format name."""
  return bounding_format

def set_bounding_format(
  value: str
) -> None:
  """Replace the module-level bounding-box format name with ``value`` as given."""
  global bounding_format
  bounding_format = value

In [None]:
# No decay rate configured (None or 0.0): draw a random one instead.
if hp_decay_rate is None or hp_decay_rate == float(zero()):
  hp_decay_rate = generate_decay_rate()

In [None]:
# No decay steps configured (None or 0): draw a random count instead.
if hp_decay_steps is None or hp_decay_steps == zero():
  hp_decay_steps = generate_decay_steps()

In [None]:
# No epoch count configured (None or 0): fall back to 11 epochs.
if hp_epochs is None or hp_epochs == zero():
  hp_epochs = 11

In [None]:
# No clip value configured (None or 0.0): fall back to 10.0.
if hp_global_clip_at is None or hp_global_clip_at == float(zero()):
  hp_global_clip_at = 10.0

In [None]:
# No initial learning rate configured (None or 0.0): fall back to 0.01.
if hp_initial_learning_rate is None or hp_initial_learning_rate == float(zero()):
  hp_initial_learning_rate = 0.01

In [None]:
# Remove every space from the configured dataset URL, when one was given.
if download_dataset_at is not None:
  download_dataset_at = download_dataset_at.replace(' ', '')

# Without a usable URL the dataset has to be chosen from `download_links`.
is_to_select_dataset: bool = download_dataset_at is None or download_dataset_at == ''

if is_to_select_dataset:
  download_links: list = []

In [None]:
# Pick one of the candidate links at random when no dataset URL was configured.
if is_to_select_dataset:
  # With no candidates there is nothing to choose from. Previously this case
  # reached randint(0, -1), which raised an unrelated "empty range" ValueError.
  if not download_links:
    raise ValueError(
      'No dataset URL configured: set download_dataset_at or add entries to download_links.'
    )

  download_links.sort()

  min_random_dl: int = zero()
  max_random_dl: int = len(download_links) - one()

  # randint includes both ends, so the upper bound is the last valid index.
  download_dataset_at = download_links[
    SystemRandom().randint(
      min_random_dl,
      max_random_dl
    )
  ]

# Run Script

In [None]:
from numpy \
  import array

In [None]:
# @title Package - Keras { run: "auto", display-mode: "code" }
import keras

from keras.utils                \
  import                        \
  load_img,                     \
  img_to_array

from keras.optimizers           \
  import                        \
  SGD,                          \
  Adam

from keras \
  import Model

from keras.models               \
  import Sequential

from keras.layers               \
  import Rescaling

from keras.optimizers.schedules \
  import ExponentialDecay

from keras.utils                \
  import plot_model

from keras.models               \
  import load_model

from keras.callbacks            \
  import                        \
  Callback,                     \
  TensorBoard

from keras.mixed_precision \
  import set_global_policy

In [None]:
import keras_cv

from keras_cv   \
  import        \
  bounding_box, \
  visualization

from keras_cv.metrics \
  import BoxCOCOMetrics

from keras_cv   \
  import layers \
  as cv_layers

from keras_cv.models  \
  import              \
  YOLOV8Backbone,     \
  YOLOV8Detector

from keras_cv.layers \
  import Resizing

In [None]:
import tensorflow

from tensorflow.ragged \
  import constant

from tensorflow \
  import get_logger

from tensorflow.config \
  import list_physical_devices

from tensorflow.config.experimental \
  import set_memory_growth

from tensorflow.data \
  import Dataset

In [None]:
from tqdm.auto \
  import tqdm

In [None]:
import wandb

from wandb  \
  import    \
  init,     \
  finish

from wandb.integration.keras \
  import WandbMetricsLogger

In [None]:
if is_google_colab_platform:
  from google.colab \
    import runtime

In [None]:
def is_to_be_replace(
    class_label: str
) -> str:
  """Return ``class_label`` unchanged; no replacement is applied here."""
  return class_label

In [None]:
def normalise_token(
    class_label: str
) -> str:
  """Return the lower-cased form of ``class_label``."""
  lowered: str = class_label.lower()
  return lowered

In [None]:
def find_class_name(
  object
):
  """Return the normalised class name stored under an object element.

  The text of the child found via ``get_label_name()`` is converted to ``str``,
  passed through ``is_to_be_replace`` and then through ``normalise_token``.
  """
  name_node = object.find(get_label_name())
  raw_name = str(name_node.text)

  return normalise_token(
    is_to_be_replace(raw_name)
  )

In [None]:
# @title Login to wandb and download old model { run: "auto", display-mode: "form" }
notebook_name = 'Mjoelner - Training Only.ipynb'

# Export the key only when one was supplied, so that an empty value is never
# written into the environment.
if wandb_api_key:
  environ['WANDB_API_KEY'] = wandb_api_key
environ['WANDB_NOTEBOOK_NAME'] = notebook_name

is_wandb_silent: bool = True # @param {type:"boolean"}
environ['WANDB_SILENT'] = str(is_wandb_silent)
environ['WANDB_SHOW_RUN'] = str(False)

# The notebook is saved to git automatically, so wandb's own git tracking
# is not needed.
environ['WANDB_DISABLE_GIT'] = str(True)

if use_of_wandb:
  wandb.login()

# Remove the sample-data directory with the standard library instead of an
# unquoted shell `rm`, which breaks on paths that contain spaces.
if delete_sample_data and isdir(location_for_sample_data):
  from shutil import rmtree
  rmtree(location_for_sample_data)


In [None]:
# @title Configuration Tensorflow { run: "auto", display-mode: "form" }
# Value passed to set_memory_growth for every GPU found.
set_memory_growth_to: bool = True # @param {type:"boolean"}
# When True, the matching logger is set to the CRITICAL level.
tensorflow_is_to_log_critical_information_only: bool = True # @param {type:"boolean"}
wandb_is_to_log_critical_information_only: bool = True # @param {type:"boolean"}

In [None]:
# GPUs for which memory growth was configured successfully.
use_devices: list = list()

for device in list_physical_devices(
    gpu_label()
):
  try:
    set_memory_growth(
        device,
        set_memory_growth_to
    )

    use_devices.append(
        device
    )
  except (RuntimeError, ValueError) as error:
    # set_memory_growth is documented to raise ValueError for an invalid
    # device and RuntimeError once the runtime is already initialised.
    # Only those are reported; any other error now propagates instead of
    # being hidden by a bare `except:`.
    print(
      f'Invalid device {device}: {error}'
    )

# Restrict TensorFlow's logger to critical messages when requested.
if tensorflow_is_to_log_critical_information_only:
  get_logger().setLevel(
      CRITICAL
  )

# Restrict the 'wandb' logger to critical messages when requested.
if wandb_is_to_log_critical_information_only:
  wandb_logger = logging.getLogger(
      'wandb'
  )

  wandb_logger.setLevel(
      CRITICAL
  )

## Configuration

### Randomizers

In [None]:
# @title DataSet Parameters { run: "auto", display-mode: "form" }
# Lower and upper scale factor passed to the JitteredResize augmentation layer.
scale_boundary_lowest: float = 0.25 # @param {type:"raw"}
scale_boundary_highest: float = 1.0 # @param {type:"raw"}

In [None]:
# @title Model Parameters & Target
# Input size expected by the augmentation pipeline.
target_input_width: int = has_fhd_width
target_input_height: int = has_fhd_height
target_input_channels: int = image_channels

# Size reported for the model in the run configuration.
model_width: int = has_fhd_width
model_height: int = has_fhd_height
model_channels: int = image_channels

batch_size: int = hp_batch_size

graph_model: bool = False
global_clip_at: float = hp_global_clip_at

# Shuffle buffer size: four batches worth of samples.
shuffle_sample_by: int = batch_size * 4

use_coco_metrics: bool = True
load_model_from: str | None = None

dataset_name: str = 'TMOD-10000.annotations.v1'
dataset_directory_name: str = 'dataset'
temporary_destination: str = '/tmp'

# Paths derived from the temporary directory.
dataset_compressed_destination: str = join(temporary_destination, 'dataset_folder.zip')
destination_to_dataset: str = join(temporary_destination, dataset_name)
move_to: str = join(temporary_destination, dataset_directory_name)

In [None]:
# Name given to the wandb run.
project_name: str = 'id (Colab)'

epoch: int = hp_epochs

is_to_fork_model: bool = True
url_to_dataset: str = download_dataset_at

# Starts as a list of label names; a later cell turns it into an id -> name dict.
labels_mapped: list | dict = []

# One-time initialisation guards used by the setup cell.
is_tensorboard_initialised: bool = False
is_wandb_patched: bool = False
is_wandb_initialised: bool = False

In [None]:
def get_epochs() -> int:
  """Return the module-level number of epochs."""
  return epoch

def set_epochs(
  value: int
) -> None:
  """Replace the module-level number of epochs with ``value``."""
  global epoch
  epoch = value

In [None]:
def get_is_to_fork_model() -> bool:
  """Return the module-level ``is_to_fork_model`` flag."""
  return is_to_fork_model

def set_is_to_fork_model(
  value: bool
) -> None:
  """Replace the module-level ``is_to_fork_model`` flag with ``value``."""
  global is_to_fork_model
  is_to_fork_model = value

In [None]:
def get_is_tensorboard_initialised() -> bool:
  """Return the module-level ``is_tensorboard_initialised`` flag."""
  return is_tensorboard_initialised

def set_is_tensorboard_initialised(
  value: bool
) -> None:
  """Replace the module-level ``is_tensorboard_initialised`` flag with ``value``."""
  global is_tensorboard_initialised
  is_tensorboard_initialised = value

In [None]:
def get_is_wandb_patched() -> bool:
  """Return the module-level ``is_wandb_patched`` flag."""
  return is_wandb_patched

def set_is_wandb_patched(
  value: bool
) -> None:
  """Replace the module-level ``is_wandb_patched`` flag with ``value``."""
  global is_wandb_patched
  is_wandb_patched = value

In [None]:
def get_is_wandb_initialised() -> bool:
  """Return the module-level ``is_wandb_initialised`` flag."""
  return is_wandb_initialised

def set_is_wandb_initialised(
  value: bool
) -> None:
  """Replace the module-level ``is_wandb_initialised`` flag with ``value``."""
  global is_wandb_initialised
  is_wandb_initialised = value

In [None]:
# Download and unpack the dataset archive unless the final dataset directory
# (`move_to`) already exists.
if (
  not isdir(
    move_to
  )
):
  !wget {url_to_dataset} -O {dataset_compressed_destination}

  # Unpack only when the download produced a file.
  # NOTE(review): the unzip target is the literal /tmp rather than
  # `temporary_destination`; the two match only while that setting is '/tmp'.
  if(
    isfile(
      dataset_compressed_destination
    )
  ):
    !unzip -qq {dataset_compressed_destination} -d /tmp

  # The archive is removed whether or not it was unpacked.
  !rm {dataset_compressed_destination}

In [None]:
def get_label_layout() -> dict:
  """Return the module-level ``labels_mapped`` object."""
  return labels_mapped

In [None]:
def label_map() -> str:
  """Return the label string 'MaP'."""
  label: str = 'MaP'
  return label

In [None]:
def is_in_labels_mapped(
    key: str
) -> bool:
  """Return True when iterating ``labels_mapped`` yields an item equal to ``key``.

  While ``labels_mapped`` is a list this checks the label names; once it has
  been turned into a dict, iteration yields its keys instead.
  """
  return any(
    label == key
    for label in labels_mapped
  )

In [None]:
def parse_for_labels(
  xml_file: str
) -> list:
  """Return the class name of every object element in an annotation file.

  The file is parsed with ElementTree; each element whose tag matches
  ``get_label_object()`` contributes one name, in document order.
  """
  document_root = ET.parse(xml_file).getroot()

  return [
    find_class_name(annotated_object)
    for annotated_object in document_root.iter(get_label_object())
  ]

In [None]:
if (
  not isdir(
    move_to
  )
):
  final_destination: str = join(
    destination_to_dataset,
    'Dataset'
  )

  if (
    isdir(
      final_destination
    )
  ):
    !mv {final_destination} {move_to}
    !rm {destination_to_dataset} -R
    !ls {move_to}
  else:
    !mv {destination_to_dataset} {move_to}

locations_of_annotations: str = join(
    move_to,
    'Annotations'
)

for xml_file in listdir(
    locations_of_annotations
):
  full_path_to_xml = join(
      locations_of_annotations,
      xml_file
  )

  labels = parse_for_labels(
      full_path_to_xml
  )

  for label in labels:
    normalised_label = label.lower()

    if not is_in_labels_mapped(
        normalised_label
    ):
      labels_mapped.append(
          normalised_label
      )

labels_mapped.sort()

size_of_labels: int = len(
    labels_mapped
)

labels_mapped: dict = dict(
    zip(
        range(
            size_of_labels
        ),
        labels_mapped
    )
)

found_images = listdir(
    join(
        move_to,
        'Images'
    )
)

found_annotations = listdir(
    join(
        move_to,
        'Annotations'
    )
)

found_images.sort()
found_annotations.sort()

found: list = list()

for idx in range(
    len(
        found_images
    )
):
  found.append(
      (
          found_images[idx],
          found_annotations[idx]
      )
  )

del found_images, found_annotations
size_of_found_files: int = len(
  found
)


In [None]:
def get_target_configuration() -> dict:
  """Return the target input width, height and channel count as a dict."""
  return dict(
    width = target_input_width,
    height = target_input_height,
    channels = target_input_channels
  )

In [None]:
def get_dataset_configuration() -> dict:
  """Return the dataset settings: epochs, batch size, split and target size."""
  summary: dict = {}

  summary['epochs'] = get_epochs()
  summary['batch_size'] = get_batch_size()
  summary['split'] = dataset_split_at
  summary['target'] = get_target_configuration()

  return summary

In [None]:
def get_initial_learning_rate() -> float:
  """Return the module-level ``hp_initial_learning_rate`` value."""
  return hp_initial_learning_rate

In [None]:
def get_learning_configuration() -> dict:
  """Return the initial learning rate together with the decay steps and rate."""
  initial_rate = get_initial_learning_rate()
  decay_settings: dict = {
    'steps': hp_decay_steps,
    'rate': hp_decay_rate
  }

  return {
    'initial': initial_rate,
    'decay': decay_settings
  }

In [None]:
def get_vision_configuration() -> tuple:
  """Return the model input size as a ``(width, height, channels)`` tuple.

  The return annotation used to say ``list`` although a tuple is returned;
  the returned value itself is unchanged.
  """
  global                      \
    model_width,              \
    model_height,             \
    model_channels

  return (
      model_width,
      model_height,
      model_channels
  )

In [None]:
# Build label reported as the model 'name' in the run configuration.
model_project_version_build: str = 'build.alpha.YoloV8'

In [None]:
def get_model_configuration() -> dict:
  """Return the model settings: vision size, learning settings and build name."""
  summary: dict = {}

  summary['vision'] = get_vision_configuration()
  summary['learning'] = get_learning_configuration()
  summary['name'] = model_project_version_build

  return summary

In [None]:
def get_batch_size() -> int:
  """Return the module-level ``batch_size`` value."""
  return batch_size

In [None]:
# @title Configuration for wandb { run: "auto", display-mode: "code" }
def configuration():
  """Return the nested run configuration that is handed to wandb ``init``."""
  model_settings = get_model_configuration()
  dataset_settings = get_dataset_configuration()
  format_settings: dict = {
    'bounding box': get_bounding_format()
  }

  return {
    'model': model_settings,
    'dataset': dataset_settings,
    'formats': format_settings,
  }

## Start of Script

In [None]:
# @title Setup of Tensorboard & WanDB { run: "auto", display-mode: "code" }
# Make sure the TensorBoard log directory exists.
if not isdir(
    location_for_tensorboard
):
  mkdir(
      location_for_tensorboard
  )

# Load and start the TensorBoard extension once per kernel session.
if not(
  is_tensorboard_initialised
):
  %load_ext tensorboard
  %tensorboard --logdir {location_for_tensorboard}

  is_tensorboard_initialised = True


# Patch TensorBoard for wandb once, when syncing is enabled.
# NOTE(review): this runs even when `use_of_wandb` is False - confirm intended.
if(
    (wandb_to_sync_tensorboard)
    and
    not(is_wandb_patched)
):
  wandb.tensorboard.patch(
      root_logdir = location_for_tensorboard
  )

  is_wandb_patched = True

# Start the wandb run once, using the settings from the configuration cells.
if (
  use_of_wandb
  and
  not(
    is_wandb_initialised
  )
):
  init(
      name              = project_name,
      project           = wandb_project_name,
      entity            = wandb_entity_name,
      save_code         = wandb_to_save_code,
      config            = configuration(),
      sync_tensorboard  = wandb_to_sync_tensorboard,
      job_type          = wandb_job_type,
      tags              = wandb_tags,
      group             = 'Colab',
      settings          = wandb.Settings(
        start_method = 'thread'
      )
  )

  is_wandb_initialised = True

In [None]:
# Fetch the latest stored model artifact, load it uncompiled, and compile it
# with an SGD optimizer driven by an exponential-decay learning-rate schedule.
# NOTE(review): presumably this needs the wandb run started by the setup
# cell - confirm what happens when `use_of_wandb` is False.
model_artifact = wandb.use_artifact(
    'designermadsen/Mjølner/tensorflow__mjoelner_model:latest'
)

# download() returns the location that load_model reads from.
load_model_from = model_artifact.download()

model = load_model(
  load_model_from,
  compile = False
)

# Schedule built from the hyper-parameter settings.
learning_rate: ExponentialDecay = ExponentialDecay(
  hp_initial_learning_rate,
  decay_steps = hp_decay_steps,
  decay_rate  = hp_decay_rate
)

optimizer = SGD(
    learning_rate   = learning_rate,
    global_clipnorm = global_clip_at,
    use_ema         = True
)

model.compile(
    classification_loss = 'binary_crossentropy',
    box_loss = 'ciou',
    optimizer = optimizer
)

if show_summary_of_model:
  model.summary()

## Setup Dataset

In [None]:
def get_color_spectrum() -> tuple:
  """Return ``(zero(), max_limit_for_rgb_color())`` as the colour value range."""
  lowest = zero()
  highest = max_limit_for_rgb_color()
  return (lowest, highest)

In [None]:
def get_shuffle_sample_by() -> int:
  """Return the module-level ``shuffle_sample_by`` value."""
  return shuffle_sample_by

In [None]:
def generate_random_integer() -> int:
  """Draw a random integer between ``one()`` and 65535, both included."""
  lower: int = one()
  upper: int = 65535

  generator = SystemRandom()
  return generator.randint(lower, upper)

In [None]:
def is_in_formats(
    value: str
) -> bool:
  """Return True when ``value`` equals an item of the global ``formats``.

  NOTE(review): ``formats`` is not defined in the part of the notebook seen
  here - confirm it is defined before this function is called.
  """
  return any(
    value == entry
    for entry in formats
  )

In [None]:
def generate_grid_ratio() -> float:
  """Draw a random float between 0.125 and 0.999981."""
  lower, upper = 0.125, 0.999981
  return SystemRandom().uniform(lower, upper)

In [None]:
def generate_grid_rotation() -> float:
  """Draw a random float between 0.0 and 0.9."""
  lower, upper = 0.0, 0.9
  return SystemRandom().uniform(lower, upper)

In [None]:
def default_image_range() -> tuple:
  """Return ``(0, 255)``, the value range passed to the augmentation layers."""
  lowest, highest = 0, 255
  return (lowest, highest)

In [None]:
# Build the resize/augmentation pipeline once. Each enabled switch appends one
# layer, and the layers are applied in the order they are appended here.
if not is_dataset_saved:
  augmentation_layers: list = list()

  # Fixed-size resize that pads to keep the aspect ratio.
  if use_resize:
    augmentation_layers.append(
        cv_layers.Resizing(
            height              = target_input_height,
            width               = target_input_width,
            pad_to_aspect_ratio = True,
            bounding_box_format = get_bounding_format()
        )
    )

  # Resize with a scale factor between the configured scale boundaries.
  if use_jitter_resize:
    augmentation_layers.append(
      cv_layers.JitteredResize(
        target_size = (
          target_input_height,
          target_input_width
        ),
        scale_factor = (
          scale_boundary_lowest,
          scale_boundary_highest
        ),
        bounding_box_format = get_bounding_format()
      )
    )

  if use_random_brightness:
    augmentation_layers.append(
        cv_layers.RandomBrightness(
            factor      = (-0.5, 0.5),
            value_range = default_image_range()
        )
    )

  if use_random_flip:
    augmentation_layers.append(
        cv_layers.RandomFlip(
            mode                = 'horizontal',
            bounding_box_format = get_bounding_format()
        )
    )

  # NOTE(review): confirm that RandomContrast accepts a negative lower factor.
  if use_random_contrast:
    augmentation_layers.append(
        cv_layers.RandomContrast(
            factor      = (-0.5, 0.5),
            value_range = default_image_range()
        )
    )

  if use_grayscale_augmentation:
    augmentation_layers.append(
        cv_layers.Grayscale(
            output_channels = get_image_channels()
        )
    )

  # The random ratio and rotation are drawn once, when this cell runs, and
  # are then fixed arguments of the layer.
  if use_grid_mask:
    augmentation_layers.append(
        cv_layers.GridMask(
            ratio_factor    = (
              zero(),
              generate_grid_ratio()
            ),
            rotation_factor = generate_grid_rotation(),
            fill_mode       = 'constant',
            fill_value      = zero()
        )
    )

  resize_and_augmentations = keras.Sequential(
      layers = augmentation_layers
  )

In [None]:
# Directory of each dataset part, keyed by its label; the directory name is
# the capitalised label placed under `move_to`.
dataset_paths: dict = {
  label_dataset_images(): join(
    move_to,
    label_dataset_images().capitalize()
  ),
  label_dataset_annotations(): join(
    move_to,
    label_dataset_annotations().capitalize()
  )
}

def get_dataset_paths() -> dict:
  """Return the module-level ``dataset_paths`` dict."""
  return dataset_paths


In [None]:
def get_dataset_paths_entry(
  key: str
) -> str:
  """Return the dataset path stored under ``key``; a missing key raises KeyError."""
  paths = get_dataset_paths()
  return paths[key]

In [None]:
def file_has_jpeg_extension(
      file_name: str
) -> bool:
  """Return True when ``file_name`` ends in ``.jpg`` or ``.jpeg``, ignoring case."""
  return file_name.lower().endswith(
    ('.jpg', '.jpeg')
  )


In [None]:
def file_has_xml_extension(
      file_name: str
) -> bool:
  """Return True when ``file_name`` ends in ``.xml``, ignoring case."""
  lowered_name = file_name.lower()
  return lowered_name.endswith('.xml')

In [None]:
def find_filename(
  root
) -> str:
  """Return the text of the ``filename`` child of ``root`` as a string."""
  filename_node = root.find('filename')
  return str(filename_node.text)

In [None]:
def find_x_minimum_boundary(
  bounderies
) -> float:
  """Return the minimum x value of a bounding-box element as a float."""
  node = bounderies.find(get_label_x_minimum())
  return float(node.text)

In [None]:
def find_y_minimum_boundary(
  bounderies
) -> float:
  """Return the minimum y value of a bounding-box element as a float."""
  node = bounderies.find(get_label_y_minimum())
  return float(node.text)

In [None]:
def find_x_maximum_boundary(
  bounderies
) -> float:
  """Return the maximum x value of a bounding-box element as a float."""
  node = bounderies.find(get_label_x_maximum())
  return float(node.text)

In [None]:
def find_y_maximum_boundary(
  bounderies
) -> float:
  """Return the maximum y value of a bounding-box element as a float."""
  node = bounderies.find(get_label_y_maximum())
  return float(node.text)

In [None]:
def find_bounding_boxes(
  bounderies
):
  """Return one box as ``[xmin, ymin, xmax, ymax]``, each value a float."""
  readers = (
    find_x_minimum_boundary,
    find_y_minimum_boundary,
    find_x_maximum_boundary,
    find_y_maximum_boundary
  )

  # The readers run in the listed order, which is also the output order.
  return [
    read_value(bounderies)
    for read_value in readers
  ]

In [None]:
def find_bounderies(
  object
):
  """Return the box coordinates found under an object's bounding-box element."""
  box_node = object.find(get_label_bounding_box())
  return find_bounding_boxes(box_node)

In [None]:
def parse_xml(
    xml_file
  ):
    """Parse one annotation file into an image path, boxes and label ids.

    Args:
      xml_file: Path of the annotation XML file.

    Returns:
      A tuple ``(image_path, boxes, label_ids)``. ``image_path`` joins the
      images directory with the file name stored in the XML. ``boxes`` holds
      one ``[xmin, ymin, xmax, ymax]`` list per object and ``label_ids`` the
      matching key from ``labels_mapped``, both in document order.

    Raises:
      ValueError: When an object's class name is not a value of
        ``labels_mapped``.
    """
    root = ET.parse(
      xml_file
    ).getroot()

    image_path = join(
      get_dataset_paths()[
        label_dataset_images()
      ],
      find_filename(
        root
      )
    )

    boxes: list = []
    classes: list = []

    for object in root.iter(
      get_label_object()
    ):
      classes.append(
        find_class_name(
          object
        )
      )

      boxes.append(
          find_bounderies(
            object
          )
      )

    # Invert id -> name once per file. The earlier version rebuilt the key
    # and value lists and scanned them for every single object. setdefault
    # keeps the first id of a name, which is what list.index() returned.
    ids_by_name: dict = {}
    for label_id, label_name in labels_mapped.items():
      ids_by_name.setdefault(label_name, label_id)

    label_ids: list = []
    for cls in classes:
      if cls not in ids_by_name:
        # Same exception type as the earlier list.index() lookup.
        raise ValueError(
          f'{cls!r} is not in the label map'
        )
      label_ids.append(ids_by_name[cls])

    return (
      image_path,
      boxes,
      label_ids
    )

In [None]:
# @title DataSet Filetypes { run: "auto", display-mode: "code" }
# Collect the annotation and image files, parse every annotation, and build a
# dataset of (image path, class ids, boxes) slices.
if not is_dataset_saved:
  prefetch_buffer_size: int = get_tune_value()

  # Full paths of all annotation files, sorted.
  xml_files = [
      join(dataset_paths[label_dataset_annotations()], file_name)
      for file_name in listdir(dataset_paths[label_dataset_annotations()])
      if file_has_xml_extension(file_name)
  ]
  xml_files.sort()

  size_of_annotations = len(xml_files)

  # Full paths of all JPEG image files, sorted.
  image_files = [
      join(dataset_paths[label_dataset_images()], file_name)
      for file_name in listdir(dataset_paths[label_dataset_images()])
      if file_has_jpeg_extension(file_name)
  ]
  image_files.sort()

  image_paths: list = list()
  boundaries: list = list()
  classes: list = list()

  for file in tqdm(xml_files):
      image_path, boxes, label_ids = parse_xml(file)

      image_paths.append(image_path)
      boundaries.append(boxes)
      classes.append(label_ids)

  # Convert with the ragged `constant` imported from tensorflow.ragged.
  image_paths = constant(image_paths)
  classes = constant(classes)
  boundaries = constant(boundaries)

  dataset = tensorflow.data.Dataset.from_tensor_slices(
    (image_paths, classes, boundaries)
  )

In [None]:
def load_image(
  image_path: str
):
  """Read the file at ``image_path`` and decode it as a JPEG image.

  The image is decoded with the channel count from ``get_image_channels()``.
  """
  encoded = tensorflow.io.read_file(image_path)

  return tensorflow.image.decode_jpeg(
    encoded,
    channels = get_image_channels()
  )

In [None]:
def load_dataset(
  image_path,
  classes,
  bounderies
):
  """Turn one dataset slice into the images / bounding_boxes dict.

  The image is loaded and cast to float32, the class ids are cast to float32,
  and the boxes are passed through unchanged.
  """
  pixels = tensorflow.cast(
    load_image(image_path),
    dtype=tensorflow.float32
  )

  class_ids = tensorflow.cast(
    classes,
    dtype=tensorflow.float32
  )

  return {
    label_dataset_images(): pixels,
    'bounding_boxes': {
      'classes': class_ids,
      'boxes': bounderies
    }
  }

In [None]:
# Split the dataset into validation and training parts and batch both.
if not is_dataset_saved:
  number_of_validation: int = int(
      size_of_annotations
      *
      dataset_split_at
  )

  # Shuffle once with a fixed order. By default shuffle() reshuffles on every
  # iteration, so the take()/skip() split below would pick different samples
  # each epoch and validation samples would leak into training.
  dataset = dataset.shuffle(
      get_shuffle_sample_by(),
      reshuffle_each_iteration = False
  )

  validation_data = dataset.take(
      number_of_validation
  )

  training_data = dataset.skip(
      number_of_validation
  )

  training_data = training_data.map(
      load_dataset,
      num_parallel_calls = prefetch_buffer_size
  )

  # Per-epoch reshuffling inside a split is fine; it cannot cross the split.
  training_data = training_data.shuffle(
      get_shuffle_sample_by()
  )

  training_data = training_data.ragged_batch(
      batch_size,
      drop_remainder = True
  )

  validation_data = validation_data.map(
      load_dataset,
      num_parallel_calls = prefetch_buffer_size
  )

  validation_data = validation_data.shuffle(
      get_shuffle_sample_by()
  )

  validation_data = validation_data.ragged_batch(
      batch_size,
      drop_remainder = True
  )

In [None]:
def dict_to_tuple(
      inputs: dict
  ) -> tuple:
    """Convert an images / bounding_boxes dict into an ``(images, boxes)`` tuple.

    The bounding boxes are converted with ``bounding_box.to_dense`` using
    ``max_boxes = 32``.
    """
    images = inputs[label_dataset_images()]

    dense_boxes = bounding_box.to_dense(
      inputs['bounding_boxes'],
      max_boxes = 32
    )

    return (images, dense_boxes)

In [None]:
# Run both splits through the resize/augmentation pipeline.
if not is_dataset_saved:
  training_data = training_data.map(resize_and_augmentations, num_parallel_calls = prefetch_buffer_size)
  validation_data = validation_data.map(resize_and_augmentations, num_parallel_calls = prefetch_buffer_size)


In [None]:
def flag_dataset_is_save() -> None:
  """Set ``is_dataset_saved`` to True unless it is already truthy."""
  global is_dataset_saved

  if is_dataset_saved:
    return

  is_dataset_saved = True

In [None]:
# Convert both splits to (images, boxes) tuples, then mark the pipeline as
# built so the dataset cells are skipped when they are run again.
if not is_dataset_saved:
  training_data = training_data.map(dict_to_tuple, num_parallel_calls = prefetch_buffer_size)
  validation_data = validation_data.map(dict_to_tuple, num_parallel_calls = prefetch_buffer_size)

  flag_dataset_is_save()

## Callbacks and training

In [None]:
# @title Evaluate model by metrics { run: "auto", display-mode: "code" }
def save_best_model():
  """Walk the global ``callbacks`` looking for COCO evaluation callbacks.

  This is a placeholder: a matching callback is found but nothing is done
  with it.
  """
  for callback in callbacks:
    if not isinstance(callback, EvaluateCOCOMetricsByCallback):
      continue
    # Placeholder: no action is taken for a matching callback yet.

class EvaluateCOCOMetricsByCallback(
    Callback
):
  """
  Callback that evaluates the model with BoxCOCOMetrics at the end of every
  epoch and saves the model whenever the monitored score matches or beats
  the best score seen so far.
  """

  def __init__(
      self,
      data,
      bounding_box: str,
      evaluation_frequency: int = int(
          1e9
      )
  ):
    """
    Prepare the evaluation data, the metric and the save settings.

    :param data: dataset yielding batches where index zero() holds the
      images and index one() the ground truth; it is cached at
      get_save_cache_at_for_evaluation() and prefetched.
    :param bounding_box: bounding box format given to BoxCOCOMetrics; when
      empty, get_bounding_format() is used instead.
    :param evaluation_frequency: passed to BoxCOCOMetrics as evaluate_freq.
    """
    super().__init__()

    self.data = data.cache(
        get_save_cache_at_for_evaluation()
    ).prefetch(
        buffer_size = one()
    )

    # Honour the format supplied by the caller (it used to be ignored) and
    # only fall back to the notebook-wide setting when nothing was given.
    self.metrics = BoxCOCOMetrics(
        bounding_box_format = bounding_box or get_bounding_format(),
        evaluate_freq = evaluation_frequency
    )

    # Destination and format handed to model.save() in trigger_save_model.
    self.save_directory: str = '/tmp/model.best.tf'
    self.save_format: str = 'tf'

    # Starts at -1.0 and is replaced by the first score that is >= it.
    self.best_map = -1.0

    # Holds the most recently built wandb artifact under the key 'best'.
    self.wandb: dict = {
        'best': None
    }

    # When True, the artifact is also logged with wandb.log_artifact.
    self.save_best_version_of_model_to_wandb: bool = False

  def is_to_save_best_version_of_model_to_wandb(self) -> bool:
    """Return whether the best model artifact is to be logged to wandb."""
    return self.save_best_version_of_model_to_wandb

  def get_best_map(
    self
  ):
    """Return the best score recorded so far."""
    return self.best_map

  def set_best_map(
    self,
    value
  ) -> None:
    """Replace the best score recorded so far with value."""
    self.best_map = value

  def is_best_map(
    self,
    evaluate_by
  ):
    """
    Return whether evaluate_by is greater than or equal to the best score;
    a tie therefore also counts as a new best.
    """
    return (
        evaluate_by
        >=
        self.get_best_map()
    )

  def __save_bm_tp(
    self
  ) -> None:
    """
    Build a wandb artifact from the save directory, remember it in
    self.wandb['best'] and log it when logging to wandb is enabled.
    """
    best_model_artifact = wandb.Artifact(
        name = 'best_yolo_model',
        type = 'model'
    )

    best_model_artifact.add_dir(
        self.save_directory
    )

    self.wandb['best'] = best_model_artifact

    if self.is_to_save_best_version_of_model_to_wandb():
      wandb.log_artifact(
        best_model_artifact
      )

  def trigger_save_model(
    self
  ) -> None:
    """
    Save the model to self.save_directory (overwriting what is there) and
    then build the matching wandb artifact.
    """
    self.model.save(
        self.save_directory,
        overwrite = True,
        save_format = self.save_format
    )

    self.__save_bm_tp()

  def on_epoch_end(
      self,
      epoch,
      logs = None
  ):
    """
    Recompute the metrics over self.data, merge them into logs and save the
    model when the score found at label_map() is a new best.

    :param epoch: epoch index supplied by the training loop (unused here).
    :param logs: dictionary of epoch results; an empty one is used when
      None is given.
    :return: the logs dictionary extended with the computed metrics.
    """
    # Keras documents logs as optional, so do not assume a dictionary.
    if logs is None:
      logs = {}

    self.metrics.reset_state()

    for batch in self.data:
      images, y_true = (
          batch[
              zero()
          ],
          batch[
              one()
          ]
      )

      y_pred = self.model.predict(
          images,
          verbose = zero()
      )

      self.metrics.update_state(
          y_true,
          y_pred
      )

    metrics = self.metrics.result(
        force = True
    )

    logs.update(
        metrics
    )

    current_map = metrics[
        label_map()
    ]

    if self.is_best_map(
        current_map
    ):
      self.set_best_map(
          current_map
      )
      self.trigger_save_model()

    return logs


## Setup of training and validation

In [None]:
# Read by get_tensorboard_location(): when True, the returned log directory
# gets a time_ns() sub-directory appended; when False, the base directory
# location_for_tensorboard is returned unchanged.
treat_fit_as_seperate: bool = False

In [None]:
def get_tensorboard_location():
  """
  Return the directory TensorBoard logs are written to.

  :return: location_for_tensorboard itself, or - when
    treat_fit_as_seperate is set - that directory joined with the current
    time_ns() value as a sub-directory name.
  """
  global location_for_tensorboard, treat_fit_as_seperate

  from time import time_ns

  if treat_fit_as_seperate:
    stamp = f'{time_ns()}'
    return join(location_for_tensorboard, stamp)

  return location_for_tensorboard

In [None]:
def generate_tensorboard_callback():
  """
  Build a new TensorBoard callback that logs to get_tensorboard_location().

  :return: the TensorBoard callback created with the settings below.
  """
  settings = {
    'log_dir': get_tensorboard_location(),
    'histogram_freq': one(),
    'write_graph': True,
    'write_images': True,
    'write_steps_per_second': True,
    'update_freq': 'epoch',
    'embeddings_freq': one()
  }

  return TensorBoard(**settings)


In [None]:
def refresh_tensorboard_callback():
  """
  Replace every TensorBoard instance in the global callbacks list, in
  place, with a callback from generate_tensorboard_callback().
  """
  global callbacks

  for position, existing in enumerate(callbacks):
    if not isinstance(existing, TensorBoard):
      continue

    callbacks[position] = generate_tensorboard_callback()

In [None]:
# Collected training histories; filled by the append helpers of this notebook.
histories: list = []

def get_histories() -> list:
  """Return the global list of collected histories."""
  return histories

def set_histories(
  value: list
) -> None:
  """Rebind the global list of collected histories to value."""
  global histories
  histories = value


In [None]:
def append_history(
  value
) -> None:
  """
  Add value to the end of the global histories list.

  :param value: entry to store as-is.
  """
  histories.append(value)

In [None]:
def generate_callbacks():
  """
  Assemble the callbacks used for training.

  :return: list with a WandbMetricsLogger, a TensorBoard callback from
    generate_tensorboard_callback() and - when use_coco_metrics is set - an
    EvaluateCOCOMetricsByCallback over validation_data.
  """
  prepared: list = [
    WandbMetricsLogger(),
    generate_tensorboard_callback()
  ]

  if not use_coco_metrics:
    return prepared

  prepared.append(
    EvaluateCOCOMetricsByCallback(
      validation_data,
      get_bounding_format()
    )
  )

  return prepared

# Notebook-wide callback list; the training helpers below read it through
# `global callbacks`, and refresh_tensorboard_callback() edits it in place.
callbacks = generate_callbacks()

In [None]:
# @title { run: "auto", display-mode: "code" }
def refresh_dataset():
  """
  Rebind the global training_data and validation_data to the result of
  calling .shuffle(shuffle_sample_by) on each of them.
  """
  global training_data, validation_data

  # NOTE(review): other cells call get_shuffle_sample_by() instead of
  # reading shuffle_sample_by directly - confirm both yield the same value.
  shuffle_argument = shuffle_sample_by

  training_data = training_data.shuffle(shuffle_argument)
  validation_data = validation_data.shuffle(shuffle_argument)


In [None]:
def flag_evaluation_has_run():
  """
  Set the global eval_has_run to True unless it is already truthy.
  """
  global eval_has_run

  if eval_has_run:
    return

  eval_has_run = True

def get_evaluation_batch_size() -> int:
  """
  Return the value of get_batch_size() converted with int().
  """
  configured_size = get_batch_size()
  return int(configured_size)

In [None]:
# @title { run: "auto", display-mode: "code" }
def range_for_histories_size():
  """
  Return a range covering every index of the global histories list.
  """
  number_of_histories = len(histories)
  return range(number_of_histories)

def append_history_to_log(
  history
):
  """
  Store the .history attribute of history at the end of the global
  histories list.

  :param history: object exposing a .history attribute, such as the value
    returned by fit() in training_of_model.
  """
  recorded = history.history
  histories.append(recorded)

In [None]:
def retrieve_location_for_best_model():
  """
  Look up where the best model is saved.

  :return: save_directory of the last EvaluateCOCOMetricsByCallback found
    in the global callbacks list, or None when the list holds none.
  """
  candidate_locations = [
    registered.save_directory
    for registered in callbacks
    if isinstance(registered, EvaluateCOCOMetricsByCallback)
  ]

  # The last matching callback wins, as with a plain overwrite-in-loop.
  return candidate_locations[-1] if candidate_locations else None

In [None]:
def set_model(
  value: Model
) -> None:
  """
  Rebind the global model to value.

  :param value: model that get_model() returns from now on.
  """
  global model
  model = value

In [None]:
def get_model() -> Model:
  """
  Return the object currently bound to the global model.
  """
  return model

In [None]:
def save_model() -> None:
  """
  Save the current model (see get_model) to the location held by the
  global load_model_from, using save_format 'tf'.
  """
  model_to_save = get_model()

  model_to_save.save(
    load_model_from,
    save_format = 'tf'
  )

In [None]:
def get_autotune():
  """
  Return the AUTOTUNE constant of tensorflow.data.
  """
  from tensorflow import data as tensorflow_data

  return tensorflow_data.AUTOTUNE

In [None]:
# @title Training Loop { run: "auto", display-mode: "code" }
def training_of_model(
  use_of_multiprocessing: bool,
  use_of_workers: int
) -> None:
  """
  Run one fit() of the current model and record its history.

  Both datasets are cached at their configured cache locations and
  prefetched with get_autotune() before they are handed to fit().

  :param use_of_multiprocessing: forwarded to fit() as use_multiprocessing.
  :param use_of_workers: forwarded to fit() as workers.
  """
  model_to_fit = get_model()

  prepared_training = training_data.cache(
    save_cache_at_for_training
  ).prefetch(
    buffer_size = get_autotune()
  )

  prepared_validation = validation_data.cache(
    save_cache_at_for_validation
  ).prefetch(
    buffer_size = get_autotune()
  )

  fit_history = model_to_fit.fit(
    prepared_training,
    validation_data     = prepared_validation,
    epochs              = get_epochs(),
    callbacks           = callbacks,
    workers             = use_of_workers,
    use_multiprocessing = use_of_multiprocessing,
    verbose             = tensorflow_verbosity
  )

  append_history_to_log(fit_history)

In [None]:
def training_process():
  """
  Repeat the training round iterate_training times.

  Each round fits the model, reloads the best saved weights and swaps in
  new TensorBoard callbacks.
  """
  remaining_rounds = iterate_training

  for _ in range(remaining_rounds):
    training_of_model(use_of_multiprocessing, use_of_workers)
    load_best_model()
    refresh_tensorboard_callback()

In [None]:
def is_to_stop_when_done() -> None:
  """
  Halt execution after training when the global stop_when_done is set.

  :raises Exception: with the message 'Done' when stop_when_done is truthy.
  """
  if not stop_when_done:
    return

  raise Exception('Done')

In [None]:
def load_best_model() -> None:
  """
  Load the weights of the best saved model into the current model.

  Nothing is loaded when retrieve_location_for_best_model() returns None,
  which is the case when no EvaluateCOCOMetricsByCallback is registered;
  the current weights are then kept.
  """
  location_of_best_model = retrieve_location_for_best_model()

  # Without a known location, load_weights(None) would fail.
  if location_of_best_model is None:
    return

  get_model().load_weights(
    location_of_best_model
  )

In [None]:
def training() -> None:
  """
  Run the complete training: all training rounds, then save the model,
  then stop the notebook when stop_when_done is set.

  :raises Exception: raised by is_to_stop_when_done() when stop_when_done
    is set.
  """
  training_process()
  save_model()
  is_to_stop_when_done()

In [None]:
# Start training; raises Exception('Done') afterwards when stop_when_done
# is set.
training()

In [None]:
# Directory named 'assets' inside the location the model was saved to.
model_assets_location: str = join(
    load_model_from,
    'assets'
)

# Create it when it is not there yet; mkdir creates a single level only, so
# load_model_from itself has to exist already.
if not isdir(
    model_assets_location
):
  mkdir(
      model_assets_location
  )

In [None]:
def save_current_model() -> None:
  """
  Set the description of the global model_artifact to 'Trained the model'
  and call its save() method.
  """
  current_artifact = model_artifact

  current_artifact.description = 'Trained the model'
  current_artifact.save()

In [None]:
def make_new_model_version() -> None:
  """
  Rebind the global model_artifact to a new wandb artifact of type 'model'
  named 'tensorflow__mjoelner_model', describe it and add the directory
  load_model_from to it.
  """
  global model_artifact

  new_artifact = wandb.Artifact(
    name = 'tensorflow__mjoelner_model',
    type = 'model'
  )

  # Publish the artifact globally before it is filled.
  model_artifact = new_artifact

  new_artifact.description = 'Trained the model'
  new_artifact.add_dir(load_model_from)

In [None]:
# Directory named 'logs' inside the model's assets directory.
model_asset_log_location: str = join(
    model_assets_location,
    'logs'
)

if not isdir(
    model_asset_log_location
):
  mkdir(
      model_asset_log_location
  )

# Local time of this run, formatted as e.g. '2024-01-31, 13:37'.
current = datetime.now()

trained_on_str: str = str(
    current.strftime('%Y-%m-%d, %H:%M')
)

# Opened in append mode, so every run adds one line to trained_on.log.
with open(
    join(
        model_asset_log_location,
        'trained_on.log'
    ),
    'a'
) as write_to_log:
  write_to_log.write(
      str(
          trained_on_str + '\n'
      )
  )

# Either build a new artifact from the saved model directory or update and
# save the existing one, depending on get_is_to_fork_model().
if get_is_to_fork_model():
  make_new_model_version()
else:
  save_current_model()

# Log whichever artifact model_artifact refers to at this point.
wandb.log_artifact(
    model_artifact
)

## Done

In [None]:
# @title Shutdown Process { run: "auto", display-mode: "code" }
# Call finish() only when wandb is in use and finishing is requested.
# NOTE(review): finish is presumably wandb.finish - confirm against imports.
if use_of_wandb and finish_wandb_when_done:
  finish()

# Release the runtime only when shutdown is requested on a Colab platform.
if shutdown_on_finish and is_google_colab_platform:
  runtime.unassign()