# RandAugment

In [None]:
!pip uninstall --yes imgaug
!pip install imgaug==0.4.0

In [None]:
import imgaug.augmenters as iaa
aug = iaa.RandAugment(n=2, m=9)

def randaugment(img):
  return aug(images=img)

# Chargement des données

In [None]:
!git clone https://github.com/axelcarlier/projsemisup.git
path = "./projsemisup/"

In [None]:
import os

CLASSES = os.listdir(path + 'Lab/')
print(CLASSES)

In [None]:
IMAGE_SIZE = 64

In [None]:
import os
import PIL
from PIL import Image
import numpy as np

def load_semisup_data(path, classes, image_size=64):

  file_path_lab = os.listdir(path + 'Lab/')
  nb_lab = 360
  # Initialise les structures de données
  x_lab = np.zeros((nb_lab, image_size, image_size, 3))
  y_lab = np.zeros((nb_lab, 1))
  i = 0
  for c in file_path_lab:

    class_label = classes.index(c)
    list_images = os.listdir(path + 'Lab/' + c + '/')

    for img_name in list_images:
      # Lecture de l'image
      img = Image.open(path + 'Lab/' + c + '/' + img_name)
      # Mise à l'échelle de l'image
      img = img.resize((image_size,image_size), Image.ANTIALIAS)
      img = img.convert('RGB')
      # Remplissage de la variable x
      x_lab[i] = np.asarray(img)
      y_lab[i] = class_label
      i = i + 1


  file_path_test = os.listdir(path + 'Test/')
  nb_test = 1800
  # Initialise les structures de données
  x_test = np.zeros((nb_test, image_size, image_size, 3))
  y_test = np.zeros((nb_test, 1))
  i = 0
  for c in file_path_test:

    class_label = classes.index(c)
    list_images = os.listdir(path + 'Test/' + c + '/')

    for img_name in list_images:
      # Lecture de l'image
      img = Image.open(path + 'Test/' + c + '/' + img_name)
      # Mise à l'échelle de l'image
      img = img.resize((image_size,image_size), Image.ANTIALIAS)
      img = img.convert('RGB')
      # Remplissage de la variable x
      x_test[i] = np.asarray(img)
      y_test[i] = class_label
      i = i + 1


  return x_lab, y_lab, x_test, y_test


x_lab, y_lab, x_test, y_test = load_semisup_data(path, CLASSES, image_size=IMAGE_SIZE)


In [None]:
print(x_lab.shape, y_lab.shape)
print(x_test.shape, y_test.shape)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

# Randomisation des indices et affichage de 9 images aléatoires de la base d'apprentissage
indices = np.arange(x_lab.shape[0])
np.random.shuffle(indices)
plt.figure(figsize=(12, 12))
for i in range(0, 9):
    plt.subplot(3, 3, i+1)
    plt.title(CLASSES[int(y_lab[indices[i]])])
    plt.imshow(x_lab[indices[i]]/255)
plt.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

# Randomisation des indices et affichage de 9 images aléatoires de la base de test
indices = np.arange(x_test.shape[0])
np.random.shuffle(indices)
plt.figure(figsize=(12, 12))
for i in range(0, 9):
    plt.subplot(3, 3, i+1)
    plt.title(CLASSES[int(y_test[indices[i]])])
    plt.imshow(x_test[indices[i]]/255)
plt.tight_layout()
plt.show()

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras import optimizers

def create_model(image_size=64):
  model = Sequential()

  model.add(Conv2D(64,(3,3),input_shape=(image_size,image_size,3),activation='relu', padding='same'))
  model.add(MaxPooling2D(pool_size=(2,2)))
  model.add(Conv2D(128,(3,3),activation='relu', padding='same'))
  model.add(MaxPooling2D(pool_size=(2,2)))
  model.add(Conv2D(256,(3,3),activation='relu', padding='same'))
  model.add(MaxPooling2D(pool_size=(2,2)))
  model.add(Conv2D(256,(3,3),activation='relu', padding='same'))
  model.add(MaxPooling2D(pool_size=(2,2)))
  model.add(Flatten())
  model.add(Dense(512,activation='relu'))
  model.add(Dense(95,activation='softmax'))

  return model

# Apprentissage supervisé avec augmentation

In [None]:
import tensorflow as tf
from tensorflow import keras
import math

x_train_lab = x_lab
y_train_lab = y_lab

# Hyperparamètres de l'apprentissage
epochs = 200
batch_size = 16
steps_per_epoch = math.floor(x_train_lab.shape[0]/batch_size)

model = create_model(image_size=IMAGE_SIZE)

# Instanciation d'un optimiseur et d'une fonction de coût.
optimizer = keras.optimizers.Adam(learning_rate=1e-4)
loss_fn = keras.losses.SparseCategoricalCrossentropy()

# Préparation des métriques pour le suivi de la performance du modèle.
train_acc_metric = keras.metrics.SparseCategoricalAccuracy()
test_acc_metric = keras.metrics.SparseCategoricalAccuracy()

# Indices de l'ensemble labellisé
indices = np.arange(x_train_lab.shape[0])

# Boucle sur les epochs
for epoch in range(epochs):

  # A chaque nouvelle epoch, on randomise les indices de l'ensemble labellisé
  np.random.shuffle(indices) 

  # Et on recommence à cumuler la loss
  cum_loss_value = 0

  for step in range(steps_per_epoch):

    # Sélection des données du prochain batch
    x_batch = x_train_lab[indices[step*batch_size:(step+1)*batch_size]]
    y_batch = y_train_lab[indices[step*batch_size:(step+1)*batch_size]]

    # Augmentation puis des données
    x_batch_augment = randaugment(x_batch.astype('uint8'))
    x_batch_augment = x_batch_augment.astype('float')/255

    # Etape nécessaire pour comparer y_batch à la sortie du réseau
    y_batch = np.expand_dims(y_batch, 1)

    # Les opérations effectuées par le modèle dans ce bloc sont suivies et permettront
    # la différentiation automatique.
    with tf.GradientTape() as tape:

      # Application du réseau aux données d'entrée
      y_pred = model(x_batch_augment, training=True)  # Logits for this minibatch

      # Calcul de la fonction de perte sur ce batch
      loss_value = loss_fn(y_batch, y_pred)

      # Calcul des gradients par différentiation automatique
      grads = tape.gradient(loss_value, model.trainable_weights)

      # Réalisation d'une itération de la descente de gradient (mise à jour des paramètres du réseau)
      optimizer.apply_gradients(zip(grads, model.trainable_weights))

      # Mise à jour de la métrique
      train_acc_metric.update_state(y_batch, y_pred)

      cum_loss_value = cum_loss_value + loss_value

  # Calcul de la précision à la fin de l'epoch
  train_acc = train_acc_metric.result()

  if epoch%5 == 0:
    # Calcul de la précision sur l'ensemble de test à la fin de l'epoch, toutes les 5 epochs
    test_logits = model(x_test/255, training=False)
    test_acc_metric.update_state(np.expand_dims(y_test, 1), test_logits)
    test_acc = test_acc_metric.result()

    print("Epoch %4d : Loss : %.4f, Acc : %.4f, Test Acc : %.4f" % (epoch, float(cum_loss_value/steps_per_epoch), float(train_acc), float(test_acc)))

  else:
    print("Epoch %4d : Loss : %.4f, Acc : %.4f" % (epoch, float(cum_loss_value/steps_per_epoch), float(train_acc)))


  # Remise à zéro des métriques pour la prochaine epoch
  train_acc_metric.reset_states()
  test_acc_metric.reset_states()      