<a href="https://colab.research.google.com/github/Yunodo/maxup/blob/main/MaxUp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf
import os
import re
from tensorflow import keras

In [None]:
# Make sure the Colab runtime exposes a GPU before any heavy work starts.
device_name = tf.test.gpu_device_name()
if device_name == '/device:GPU:0':
  print('Found GPU at: {}'.format(device_name))
else:
  # Abort early: the rest of the notebook is written with a GPU runtime in mind.
  raise SystemError('GPU device not found')

Found GPU at: /device:GPU:0


In [None]:
# Number of images per batch; passed as batch_size when the datasets are built.
BATCH_SIZE = 16 
# Size every image is resized to on load; passed as image_size when the datasets are built.
IMG_SIZE = (256, 256)

In [None]:
# Download the Imagenette (320px) archive and extract it; the extracted
# 'imagenette2-320' folder is expected next to the downloaded file.
_URL = 'https://s3.amazonaws.com/fast-ai-imageclas/imagenette2-320.tgz'
path_to_zip = tf.keras.utils.get_file(
    'imagenette2-320.tgz',
    origin=_URL,
    extract=True,
)
PATH = os.path.join(os.path.dirname(path_to_zip), 'imagenette2-320')

train_dir = os.path.join(PATH, 'train')
validation_dir = os.path.join(PATH, 'val')

# Build batched image datasets; class labels are inferred from the
# sub-directory names and returned as one-hot ('categorical') vectors.
training_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    train_dir,
    labels='inferred',
    label_mode='categorical',
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

validation_dataset = tf.keras.preprocessing.image_dataset_from_directory(
    validation_dir,
    labels='inferred',
    label_mode='categorical',
    image_size=IMG_SIZE,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

Downloading data from https://s3.amazonaws.com/fast-ai-imageclas/imagenette2-320.tgz
Found 9469 files belonging to 10 classes.
Found 3925 files belonging to 10 classes.


In [None]:
def normalise_data(image, cl):
  """Rescale pixel values from the 0-255 range to floats in [0, 1].

  Args:
    image: Tensor of image pixel values.
    cl:    Tensor of class values; passed through untouched.

  Returns:
    A (scaled_image, cl) tuple where scaled_image is float32.
  """
  as_float = tf.cast(image, tf.float32)
  return as_float / 255.0, cl

# Apply the [0, 1] rescaling to every element of both datasets.
# NOTE: these assignments rebind the same names they read from, so re-running
# this cell without first re-creating the datasets divides by 255 a second time.
training_dataset = training_dataset.map(normalise_data)
validation_dataset = validation_dataset.map(normalise_data)

In [None]:
#Initialising a new model


def create_model():
  """Build the 10-class classifier on top of a pre-trained ResNet50V2.

  The ImageNet-initialised backbone (without its classification top) is left
  fully trainable and is followed by global average pooling, dropout (0.2)
  and a 10-unit softmax layer.

  Returns:
    An uncompiled tf.keras.Sequential model whose outputs are class
    probabilities (softmax), not raw logits.
  """
  backbone = tf.keras.applications.ResNet50V2(
      input_shape=IMG_SIZE + (3,),  # (height, width, channels)
      include_top=False,
      weights='imagenet',
  )
  backbone.trainable = True  # fine-tune the whole network, not only the head

  # Classification head stacked on top of the pre-trained backbone.
  pooling = tf.keras.layers.GlobalAveragePooling2D()
  dropout = tf.keras.layers.Dropout(0.2)
  classifier = tf.keras.layers.Dense(10, activation='softmax')

  return tf.keras.Sequential([backbone, pooling, dropout, classifier])

# Single model instance that later cells train, load weights into, and evaluate.
model = create_model()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# Image data augmentations, courtesy of https://www.wouterbulten.nl/blog/tech/data-augmentation-using-tensorflow-data-dataset/

def rotate(x: tf.Tensor) -> tf.Tensor:
    """Rotate an image by a random multiple of 90 degrees.

    Args:
        x: Image

    Returns:
        The image rotated by 0, 90, 180 or 270 degrees
    """
    # Draw the number of quarter turns uniformly from {0, 1, 2, 3}.
    quarter_turns = tf.random.uniform(shape=[], minval=0, maxval=4, dtype=tf.int32)
    return tf.image.rot90(x, k=quarter_turns)

def flip(x: tf.Tensor) -> tf.Tensor:
    """Randomly mirror an image horizontally and then vertically.

    Args:
        x: Image to flip

    Returns:
        Augmented image
    """
    mirrored = tf.image.random_flip_left_right(x)
    return tf.image.random_flip_up_down(mirrored)


def color(x: tf.Tensor) -> tf.Tensor:
    """Randomly jitter hue, saturation, brightness and contrast, in that order.

    Args:
        x: Image

    Returns:
        Augmented image; values are not clipped here, so they may leave the
        input range.
    """
    hue_shifted = tf.image.random_hue(x, max_delta=0.08)
    saturated = tf.image.random_saturation(hue_shifted, lower=0.6, upper=1.6)
    brightened = tf.image.random_brightness(saturated, max_delta=0.05)
    return tf.image.random_contrast(brightened, lower=0.7, upper=1.3)





# Augmentation functions passed to MaxUp from the training loop.
augmentations = [flip, color, rotate]

In [None]:
def MaxUp(model, x_batch_train, y_batch_train, loss_fn, augmentations, times):
  """MaxUp (https://arxiv.org/abs/2002.09024): keep, per sample, the variant with the highest loss.

  For every sample the loss of the unmodified image is the starting point.
  Each augmentation is then applied to the *original* image (augmentations
  are not chained), clipped to [0, 1] and scored with the model in inference
  mode; a variant replaces the current choice only when its loss is strictly
  higher.

    Args:
        model: callable as model(images, training=False) on a batch of one image
        x_batch_train: eager tensor of images, first axis indexes samples
        y_batch_train: labels aligned with x_batch_train along the first axis
        loss_fn: callable as loss_fn(y_true, y_pred), returning a comparable scalar
        augmentations: a list of augmentation functions (image -> image)
        times: the number of passes over the augmentation list per sample

    Returns:
        Tensor with the same layout as x_batch_train where each image is the
        highest-loss variant found (or the original if none was worse)
    """

  hardest_images = x_batch_train.numpy()  # mutable buffer that is overwritten in place
  num_samples = tf.shape(x_batch_train)[0]

  for idx in range(num_samples):
    image = x_batch_train[idx]
    label = tf.expand_dims(y_batch_train[idx], axis = 0)

    # Baseline: loss of the un-augmented image, scored as a batch of one.
    baseline_input = tf.expand_dims(image, axis = 0)
    worst_loss = loss_fn(label, model(baseline_input, training = False))

    for _ in range(times):
      for augment in augmentations:
        # Augment the original image and clip back into the [0, 1] range.
        candidate = tf.expand_dims(tf.clip_by_value(augment(image), 0, 1), axis = 0)
        candidate_loss = loss_fn(label, model(candidate, training = False))

        if candidate_loss > worst_loss:
          worst_loss = candidate_loss
          # candidate carries a leading batch axis of size 1; the assignment drops it.
          hardest_images[idx] = candidate

  return tf.convert_to_tensor(hardest_images)

In [None]:
# Plain SGD with a small learning rate for fine-tuning the pre-trained network.
optimizer = keras.optimizers.SGD(learning_rate=1e-3)

# The model's final Dense layer already applies softmax, so its outputs are
# probabilities. from_logits must therefore be False; with True the loss would
# apply softmax a second time and flatten both the loss values and the gradients.
loss_fn = keras.losses.CategoricalCrossentropy(from_logits=False)

# Keras resolves string metrics by their registered snake_case identifiers.
metrics = ['categorical_accuracy', 'categorical_crossentropy']


model.compile(optimizer, loss_fn, metrics)

# Running accuracy tracked by hand in the custom training loop.
train_acc_metric = keras.metrics.CategoricalAccuracy()

In [None]:
# Custom training loop: for every batch, MaxUp picks the highest-loss variant
# of each image and the model takes one gradient step on that batch.
epochs = 3
for epoch in range(epochs):
    print("\nStart of epoch %d" % (epoch,))

    for step, (x_batch_train, y_batch_train) in enumerate(training_dataset):
        # Replace each image by its hardest (highest-loss) augmented variant.
        augmented_x_batch_train = MaxUp(model, x_batch_train, y_batch_train, loss_fn, augmentations, times = 2)

        # Record the forward pass so gradients can be taken w.r.t. the weights.
        with tf.GradientTape() as tape:
            # Despite the name, these are the model's softmax outputs.
            logits = model(augmented_x_batch_train, training=True)
            loss_value = loss_fn(y_batch_train, logits)

        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        train_acc_metric.update_state(y_batch_train, logits)

        # Every 10 steps: checkpoint the weights and report progress.
        if step % 10 == 0:
            model.save_weights("ckpt")
            print(
                "Training loss (for one batch) at step %d: %.4f"
                % (step, float(loss_value))
            )
            # Use the configured batch size rather than a hard-coded 16 so the
            # count stays right if BATCH_SIZE is changed.
            print("Seen so far: %s samples" % ((step + 1) * BATCH_SIZE))
            print("accuracy {:1.2f}".format(train_acc_metric.result().numpy()))

    # Start each epoch with a fresh running accuracy.
    train_acc_metric.reset_states()



Start of epoch 0
Training loss (for one batch) at step 0: 1.6505
Seen so far: 16 samples
accuracy 0.81
Training loss (for one batch) at step 10: 1.6708
Seen so far: 176 samples
accuracy 0.85


KeyboardInterrupt: ignored

In [None]:
# Colab-only: upload files from the local machine into the runtime
# (the recorded run uploaded my_model.h5).
from google.colab import files
uploaded = files.upload()

Saving my_model.h5 to my_model.h5


In [None]:
# Restore weights from the uploaded file; this overwrites whatever weights `model` currently holds.
model.load_weights('my_model.h5')

In [None]:
# Re-compile for evaluation only (the optimizer is not used by evaluate).
# A real loss is supplied so evaluate() reports the validation cross-entropy
# instead of a placeholder 0.0; the string form expects probabilities, which
# matches the model's softmax output.
model.compile(optimizer = 'adam',
              loss = 'categorical_crossentropy',
              metrics = [keras.metrics.CategoricalAccuracy()])

In [None]:
# Score the model on the validation set; the result lists the compiled loss
# first, followed by the compiled metrics.
model.evaluate(validation_dataset)



[0.0, 0.9556688070297241]

# New Section