In [None]:
#remove data from dirs
!rm -r /content/sample_data/epochs

In [None]:
# how to check tensorflow version
import tensorflow as tf
print(tf.__version__)

In [None]:
# how to mount your google drive
from google.colab import drive
drive.mount('/content/drive')

In [None]:
#to see the loaded dataset
!ls -la /root/.keras/datasets/

# Multiclass analysis Bes/Bos Posit/Negat

In [None]:
import PIL
import PIL.Image as Image
import os
import tensorflow as tf
import pathlib
import matplotlib.pyplot as plt
import random
import numpy as np
from tensorflow.keras import layers
import math


# This is a simple script adapated for testing how to train a dataset for multiclass image classification
# Serves as companion to show nuclear medicine physicians how to train a AI model
# Code can be uncommented to try it out different scenarios
# Best used with google drive

#initial configuration options
MAIN_PATH = "/content/drive/PATH_TO_GOOGLE_DRIVE"
# PATH where to store the log results
LOG_PATH = "/content/drive/PATH_TO_GOOGLE_DRIVE/LOG_FOLDER"
# PATH where to store the best models
MODEL_PATH = "/content/sample_data/models/save_at_{epoch}_w11.keras"
IMG_SIZE = (900, 252)

DATASET = "NAME_OF_FILE_WITHOUT_ZIP_SUFFIX"
# loading of the zip file and uncompressing it so the images can be loaded into dataset
archiveTrain = tf.keras.utils.get_file(origin=f"file://{MAIN_PATH}{DATASET}.zip", extract=True)
dataDirTrain = pathlib.Path(os.path.join(pathlib.Path(archiveTrain).with_suffix('.zip'), DATASET))

# class name division
CLASS_NAMES = ['besnegat','besposit','bosnegat','bosposit']

img_height = 900
img_width = 252
# this defines the number of steps in which a epoch will be learned
batch_size = 100
number_of_classes = 4

AUTOTUNE = tf.data.AUTOTUNE
# loading the data from the zip file into the dataset
train_ds, val_ds = tf.keras.utils.image_dataset_from_directory(dataDirTrain,
                                           labels='inferred',
                                           class_names=CLASS_NAMES,
                                           batch_size=batch_size,
                                           #producing train validation split 0.3 means 70% for training 30% for validation
                                           validation_split=0.3,
                                           subset="both",
                                           seed=400,
                                           shuffle=True,
                                           image_size=IMG_SIZE)



#loading of a pretrained model for use
baseModel = tf.keras.applications.Xception(
    include_top=False,
    weights="imagenet",
    input_tensor=None,
    input_shape=(900,252,3),
)
# freezing of all the layers in the model
baseModel.trainable = False

# calculation of initial class_weights
# total_number_of_samples/(number_of_classes*sample_class_count)
def get_class_weights(data_dir,class_names,training_split):
  total_class_count=0
  #to get the full sample count divided by the training set
  for clazz in class_names:
    total_class_count += len(os.listdir(pathlib.Path(os.path.join(dataDirTrain,clazz))))*training_split
  #add each initial class weight to a dictionary
  index = 0
  dict = {}
  for clazz in class_names:
    class_count = len(os.listdir(pathlib.Path(os.path.join(dataDirTrain,clazz))))*training_split
    class_weight = total_class_count/(number_of_classes*class_count)
    dict.update({index:class_weight})
    index += 1
  return dict

#sample caount for the exponential decay rate
SAMPLES = 0
#printing of the train dataset tensors based on batch size and sample size calculation
for x, y in train_ds:
  print(x.shape, y.shape)
  SAMPLES += y.shape[0]
class_names = train_ds.class_names
print("Class names:",class_names)
print("number of samples",SAMPLES)

# print out the shape of the images and labels
for image_batch, labels_batch in train_ds:
  print("image",image_batch.shape)
  print("label", labels_batch.shape)
  break

# augmentation layers
data_augmentation_layers = tf.keras.Sequential([
  tf.keras.layers.RandomFlip("horizontal"),
  #changing these parameters may affect the final result
  tf.keras.layers.RandomRotation((-0.01,0.01)),
  tf.keras.layers.RandomContrast(0.02),
  tf.keras.layers.RandomBrightness(0.02),

])
# adding the augmentation layers to the training dataset
train_ds = train_ds.map(lambda x, y: (data_augmentation_layers(x), y))

with_augmented_samples = 0
for x,y in train_ds:
  with_augmented_samples += y.shape[0]
print("samples + augmented samples",with_augmented_samples)

#obtaining images and labels from the train dataset to be able to show them
#this is just for visualisation to see what we train
image_batch, labels_batch = next(iter(train_ds))
plt.figure(figsize=(10, 10))
take_files = train_ds.take(1)
for images, labels in take_files:
  print(len(labels))
  ha = labels.numpy().astype("uint8")
  print("binary",ha.ndim)
  for i in range(9):
    ax = plt.subplot(3, 3, i + 1)
    plt.imshow(images[i].numpy().astype("uint8"))
    #plt.title(class_names[labels.numpy().astype("uint8")[0][0]])
    plt.title(class_names[labels[i]])
    plt.axis("off")

# Convert labels to one-hot encoding
# this based on the loss function so basically [1,0,0,0] means besnegat
def one_hot_encode(image, label):
  # Assuming your labels are integers from 0 to 3
  return image, tf.one_hot(label, number_of_classes)

train_ds = train_ds.map(one_hot_encode)
val_ds = val_ds.map(one_hot_encode)

#caching of the data is performed
train_ds = train_ds.cache().prefetch(buffer_size=AUTOTUNE)
val_ds = val_ds.cache().prefetch(buffer_size=AUTOTUNE)

def tModel():
  inputs = tf.keras.Input(shape=(900, 252, 3))
  #preprocessing the data so the images are in that format the model awaits
  x = tf.keras.applications.xception.preprocess_input(inputs)
  x = baseModel(x, training=False)
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Flatten()(x)
  # here can be different Fuly connected layers activated also with regularization
  # just for testing
  #x = tf.keras.layers.Dense(512, activation='relu')(x)
  #x = tf.keras.layers.Dropout(0.3)(x)
  #x= tf.keras.layers.Dense(512, activation='relu',kernel_regularizer=tf.keras.regularizers.l2(0.001))(x)
  #x = tf.keras.layers.Dropout(0.3)(x)  # Regularize with dropout
  x= tf.keras.layers.Dense(256,activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)  # Regularize with dropout
  #x = tf.keras.layers.BatchNormalization()(x)
  x = tf.keras.layers.Dense(128,activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)  # Regularize with dropout
  #x = tf.keras.layers.BatchNormalization()(x) #
  x = tf.keras.layers.Dense(64,activation='relu')(x)
  x = tf.keras.layers.Dropout(0.5)(x)  # Regularize with dropout
  #x = tf.keras.layers.BatchNormalization()(x)
  # this is the final layer which classifies the image -> without any activation
  # we use then the distribution function in the verification script -> we get a "raw" tensor output
  outputs = tf.keras.layers.Dense(number_of_classes,name="outputs", activation=None)(x)
  return tf.keras.Model(inputs, outputs)

model = tModel() #creation of the model structure

start_lr = 0.0001
min_lr = 9.98e-5
max_lr = 0.001
rampup_epochs = 50
sustain_epochs = 15
exp_decay = 0.96
#different types of learning schedulers -> for trying out
def my_learner(epoch):
  def lr_schedule(epoch, start_lr, min_lr, max_lr, rampup_epochs, sustain_epochs, exp_decay):
    if epoch < rampup_epochs:
      learning_rate = (max_lr - start_lr)/rampup_epochs * epoch + start_lr
    elif epoch < rampup_epochs + sustain_epochs:
      learning_rate = max_lr
    else:
      learning_rate = (min_lr+max_lr) * math.exp(-exp_decay*epoch)
    return learning_rate
  return lr_schedule(epoch, start_lr, min_lr, max_lr, rampup_epochs, sustain_epochs, exp_decay)

def schedule_exp(epoch):
  def lr(epoch, start_lr, exp_decay):
     return start_lr * math.exp(-exp_decay*epoch)
  return lr(epoch, start_lr, exp_decay)

def schedule_one_cycle(epoch):
  def lr(epoch, start_lr, min_lr, max_lr, rampup_epochs, sustain_epochs, exp_decay):
    if epoch < rampup_epochs:
      lr = (max_lr - start_lr)/rampup_epochs * epoch + start_lr
    elif epoch < rampup_epochs + sustain_epochs:
      lr = max_lr
    else:
      lr = (max_lr - min_lr) * exp_decay**(epoch-rampup_epochs-sustain_epochs) + min_lr
    return lr
  return lr(epoch, start_lr, min_lr, max_lr, rampup_epochs, sustain_epochs, exp_decay)


epochs = 200
initial_learning_rate = 0.0003
final_learning_rate = 0.000001
learning_rate_decay_factor = (final_learning_rate / initial_learning_rate)**(1/epochs)
steps_per_epoch = int(SAMPLES/batch_size)
print("steps per epoch:",steps_per_epoch)
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
                initial_learning_rate=initial_learning_rate,
                decay_steps=steps_per_epoch,
                decay_rate=learning_rate_decay_factor,
                staircase=True)


lr_callback = tf.keras.callbacks.LearningRateScheduler(my_learner, verbose=True)

model.compile(
    # if you use a learning rate scheduler callback this is without the learning parameter
    #optimizer=tf.keras.optimizers.Adam(),
    # this is learning rate scheduler added to the optimizer
    optimizer=tf.keras.optimizers.Adam(learning_rate=lr_schedule),
    # this is a constant learning rate
    #optimizer=tf.keras.optimizers.Adam(learning_rate=9.98e-5),
    loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
    # here are all available metrices to use
    metrics=[tf.keras.metrics.AUC(name="auc", multi_label=True),
           tf.keras.metrics.TruePositives(name='tp'),
           tf.keras.metrics.FalsePositives(name='fp'),
           tf.keras.metrics.TrueNegatives(name='tn'),
           tf.keras.metrics.FalseNegatives(name='fn'),
           #tf.keras.metrics.AUC(name='prc', curve='PR'),
           tf.keras.metrics.Precision(name='precision'),
           tf.keras.metrics.Recall(name='recall'),
           tf.keras.metrics.CategoricalAccuracy(name='accuracy')],
    run_eagerly=False,
)
# model structure
model.summary()

#callback used during the traning to adapt or log the training process
callbacks = [
    # this enables to see the entire learning process in a nice UI
    tf.keras.callbacks.TensorBoard(log_dir=f"{LOG_PATH}",write_graph=False),
    # saves the best models -> google drive can be used
    # monitor -> what si monitored for the EarlyStoppping
    tf.keras.callbacks.ModelCheckpoint(MODEL_PATH, monitor="val_loss"),
    #here is the learning scheduler if this is used the optimizer start without learning parameter
    #lr_callback
    #this can be enabled -> the training will stop when the model begins to overfit
    #tf.keras.callbacks.EarlyStopping(patience=5, start_from_epoch=35,restore_best_weights=True)
]

#training the model
model.fit(
  train_ds,
  validation_data=val_ds,
  epochs=epochs,
  callbacks=callbacks,
  # calculation of initial weights in case of inbalanced dataset, 0.7 is the training data split
  class_weight=get_class_weights(dataDirTrain,CLASS_NAMES,0.7)
)



In [None]:
import tensorflow as tf
model = tf.keras.models.load_model("/content/sample_data/epochs/MODEL_NAME.keras")
model.summary()
model.save('/content/drive/MyDrive/GOOGLE_DRIVE_PATH/NAME_OF_THE_MODEL.keras')


# Tensorboard

In [None]:
%load_ext tensorboard
#%reload_ext tensorboard
#!kill 19207
%tensorboard --logdir '/content/drive/MyDrive/Colab/posnegat/logs11f_w'