## Imports

In [None]:
!pip install matplotlib
!pip install sklearn
!pip install scikit-learn

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
import random

In [None]:
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras.models import Model
from keras.optimizers import Adam
from keras.layers import Dense, Conv1D, Conv2D, Input, Reshape, Permute, Add, Flatten, BatchNormalization, Activation, MaxPooling1D, MaxPooling2D, ZeroPadding2D, Concatenate, Dropout, AveragePooling1D, GlobalAveragePooling1D, GlobalMaxPooling1D, UpSampling2D
from keras.regularizers import l2, l1, l1_l2
from keras.losses import CategoricalCrossentropy, SparseCategoricalCrossentropy

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay


## Datasets

In [None]:
from keras.datasets import mnist, cifar100, cifar10, fashion_mnist

In [None]:
def _flatten_labels(labels):
  """Return `labels` as a 1-D array, taking the first column of 2-D (N, 1) input."""
  labels = np.asarray(labels)
  return labels[:, 0] if labels.ndim > 1 else labels


def _to_three_channels(images):
  """Return `images` with a trailing channel axis, replicating it 3 times if it was missing."""
  if images.ndim == 4:
    return images
  return np.repeat(np.expand_dims(images, axis=-1), 3, axis=-1)


def generate_datasets(dataset_name, flat = False, normalize = True):
  """Load a Keras benchmark dataset and split it into train/validation/test sets.

  The training portion returned by Keras is split 80/20 into training and
  validation sets with a fixed `random_state`, so repeated calls for the same
  dataset yield the same split. Dataset statistics are printed along the way.

  Args:
    dataset_name: One of "mnist", "cifar100", "cifar10" or "fashion_mnist".
    flat: If true, every sample is flattened to a 1-D vector. Otherwise samples
      keep their image layout, and images without a channel axis get one that
      is repeated three times.
    normalize: If true, pixel values are divided by 255 and returned as
      float32; otherwise the data is returned as loaded.

  Returns:
    A tuple `(train_data, y_train, validation_data, y_valid, test_data, y_test,
    input_shape, output_shape)`. Labels are 1-D arrays, `input_shape` is the
    per-sample shape of the returned data and `output_shape` is the number of
    classes.

  Raises:
    ValueError: If `dataset_name` is not one of the supported names.
  """
  # Download dataset (an elif chain so that an unknown name fails loudly here
  # instead of as an UnboundLocalError further down).
  if dataset_name == "mnist":
    loader, output_shape = mnist, 10
  elif dataset_name == "cifar100":
    loader, output_shape = cifar100, 100
  elif dataset_name == "cifar10":
    loader, output_shape = cifar10, 10
  elif dataset_name == "fashion_mnist":
    loader, output_shape = fashion_mnist, 10
  else:
    raise ValueError("Unknown dataset_name %r; expected 'mnist', 'cifar100', 'cifar10' or 'fashion_mnist'" % (dataset_name,))
  (x_train_val, y_train_val), (x_test, y_test) = loader.load_data()

  # Create validation set
  x_train, x_valid, y_train, y_valid = train_test_split(x_train_val, y_train_val, test_size=0.2, shuffle=True, random_state=1)

  # Size
  print("Dataset size:" + str(y_train_val.shape[0] + y_test.shape[0]))
  print("Training + Validation set size:" + str(y_train_val.shape[0]) + str(" -- ") + "Training set size:" + str(y_train.shape[0]) + str("  ") + "Validation set size:" + str(y_valid.shape[0]))
  print("Test set size:" + str(y_test.shape[0]))

  # Some loaders return labels as a column vector; downstream code expects 1-D labels.
  y_train, y_valid, y_test = [_flatten_labels(y) for y in (y_train, y_valid, y_test)]

  # Per-label statistics, counted once per split instead of once per label.
  print("Samples per label - total - training - validation - test:")
  splits = (y_train, y_valid, y_test)
  n_bins = max(int(y.max()) for y in splits) + 1
  train_counts, valid_counts, test_counts = [np.bincount(y, minlength=n_bins) for y in splits]
  total_counts = train_counts + valid_counts + test_counts
  for label in np.flatnonzero(total_counts):
    print("Label:" + str(int(label)) + " -- " + "Samples total:" + str(int(total_counts[label])) + " -- " + "Samples training:" + str(int(train_counts[label])) + " -- " + "Samples validation:" + str(int(valid_counts[label])) + " -- " + "Samples test:" + str(int(test_counts[label])))

  # Reshape
  if flat:
    train_data = x_train.reshape((len(x_train), -1))
    validation_data = x_valid.reshape((len(x_valid), -1))
    test_data = x_test.reshape((len(x_test), -1))
  else:
    train_data = _to_three_channels(x_train)
    validation_data = _to_three_channels(x_valid)
    test_data = _to_three_channels(x_test)

  input_shape = tuple(train_data.shape[1:])

  # Normalize (float32 halves the memory of the float64 that `uint8 / 255` would produce)
  if normalize:
    train_data = train_data.astype("float32") / 255.0
    validation_data = validation_data.astype("float32") / 255.0
    test_data = test_data.astype("float32") / 255.0

  # Return train, validation, test sets
  return train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape

## Models

In [None]:
def _upsampling_setup(dataset_name):
  """Return the `(UpSampling2D size, optimizer name)` pair used for `dataset_name`.

  Raises:
    ValueError: If `dataset_name` is not a supported dataset.
  """
  if dataset_name in ("cifar10", "cifar100"):
    return (7, 7), 'sgd'
  if dataset_name in ("mnist", "fashion_mnist"):
    return (8, 8), 'adam'
  raise ValueError("Unknown dataset_name %r; expected 'mnist', 'fashion_mnist', 'cifar10' or 'cifar100'" % (dataset_name,))


def _resnet_backbone_name(model_name):
  """Return the name of the tf.keras.applications ResNet constructor selected by `model_name`."""
  variants = (
    ("50v1", "ResNet50"), ("50v2", "ResNet50V2"),
    ("101v1", "ResNet101"), ("101v2", "ResNet101V2"),
    ("152v1", "ResNet152"), ("152v2", "ResNet152V2"),
  )
  for tag, backbone_name in variants:
    if tag in model_name:
      return backbone_name
  raise ValueError("Unknown Resnet variant in model_name %r; expected one of 50v1, 50v2, 101v1, 101v2, 152v1, 152v2" % (model_name,))


def _build_pretrained_classifier(backbone_name, dataset_name, input_shape, output_shape, metrics):
  """Build and compile an ImageNet-pretrained backbone followed by a small dense head."""
  # Validate the dataset first so a typo fails before any weights are downloaded.
  size, optimizer = _upsampling_setup(dataset_name)

  inputs = Input(shape=input_shape)
  x = UpSampling2D(size=size)(inputs)

  backbone = getattr(tf.keras.applications, backbone_name)
  x = backbone(include_top=False, weights='imagenet', input_shape=(224,224,3), pooling=None)(x)

  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Flatten()(x)
  x = tf.keras.layers.Dense(1024, activation="relu")(x)
  x = tf.keras.layers.Dense(512, activation="relu")(x)
  output = tf.keras.layers.Dense(output_shape, activation="softmax")(x)
  model = Model(inputs, output)

  # For the MNIST-style datasets only the three dense head layers are trained;
  # everything before them (including the backbone) is frozen.
  if dataset_name in ("mnist", "fashion_mnist"):
    for layer in model.layers[:-3]:
      layer.trainable = False

  model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=metrics)
  return model


def _build_alexnet(dataset_name, input_shape, output_shape, metrics):
  """Build and compile the AlexNet-style convolutional classifier."""
  size, optimizer = _upsampling_setup(dataset_name)

  inputs = Input(shape=input_shape)
  x = UpSampling2D(size=size)(inputs)

  # (filters, kernel size, zero-pad before the conv, max-pool after the activation)
  conv_blocks = (
    (96, (11, 11), False, True),
    (256, (5, 5), False, True),
    (512, (3, 3), True, True),
    (1024, (3, 3), True, False),
    (1024, (3, 3), True, True),
  )
  for filters, kernel, pad_first, pool_after in conv_blocks:
    if pad_first:
      x = ZeroPadding2D((1, 1))(x)
    x = Conv2D(filters, kernel, padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    if pool_after:
      x = MaxPooling2D(pool_size=(2, 2))(x)

  x = Flatten()(x)

  # Two fully connected blocks with dropout
  for units in (3072, 4096):
    x = Dense(units)(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = Dropout(0.5)(x)

  # Classification layer
  x = Dense(output_shape)(x)
  x = BatchNormalization()(x)
  output = Activation('softmax')(x)

  model = Model(inputs, output)
  model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=metrics)
  return model


def instantiate_model(model_name, dataset_name, input_shape, output_shape):
  """Build and compile the Keras model identified by `model_name`.

  Supported names: "Logistic regression", "Alexnet", and any name containing
  "Resnet" (with one of 50v1/50v2/101v1/101v2/152v1/152v2), "VGG" ("16"
  selects VGG16, anything else VGG19) or "Densenet" ("121" or "169" select
  those variants, anything else DenseNet201).

  All models are compiled with sparse categorical cross-entropy and the 'acc'
  metric. Image models upsample the input by 7x (CIFAR) or 8x (MNIST-style)
  before the 224x224x3 backbone.
  NOTE(review): this presumes 32x32 and 28x28 inputs respectively -- confirm.

  Args:
    model_name: Name of the architecture to build (see above).
    dataset_name: Dataset the model is built for; selects the upsampling
      factor, the optimizer and whether the pretrained part is frozen. It is
      not used by the logistic regression model.
    input_shape: Per-sample input shape.
    output_shape: Number of output classes.

  Returns:
    The compiled Keras model.

  Raises:
    ValueError: If `model_name` is not supported, or if an image model is
      requested for an unsupported `dataset_name`.
  """
  metrics = ['acc']

  if model_name == "Logistic regression":
    inputs = Input(shape=(np.prod(list(input_shape)), ))
    dense = Dense(output_shape)(inputs)
    dense = BatchNormalization()(dense)
    # NOTE(review): sigmoid outputs are not a probability distribution, which is
    # what sparse categorical cross-entropy expects; softmax may have been
    # intended. Left unchanged so existing results stay comparable.
    output = Activation('sigmoid')(dense)
    model = Model(inputs, output)
    model.compile(optimizer='sgd', loss='sparse_categorical_crossentropy', metrics=metrics)
    return model

  if "Resnet" in model_name:
    return _build_pretrained_classifier(_resnet_backbone_name(model_name), dataset_name, input_shape, output_shape, metrics)

  if model_name == "Alexnet":
    return _build_alexnet(dataset_name, input_shape, output_shape, metrics)

  if "VGG" in model_name:
    backbone_name = "VGG16" if "16" in model_name else "VGG19"
    return _build_pretrained_classifier(backbone_name, dataset_name, input_shape, output_shape, metrics)

  if "Densenet" in model_name:
    if "121" in model_name:
      backbone_name = "DenseNet121"
    elif "169" in model_name:
      backbone_name = "DenseNet169"
    else:
      backbone_name = "DenseNet201"
    return _build_pretrained_classifier(backbone_name, dataset_name, input_shape, output_shape, metrics)

  # Previously an unknown name fell through and returned None, which only
  # surfaced later as an AttributeError inside the training code.
  raise ValueError("Unknown model_name %r" % (model_name,))

    


## Training

In [None]:
def train_model(model_name, dataset_name, batch_size, epochs, flat, normalize):
  """Train `model_name` on `dataset_name`, evaluate it on the test set and plot a confusion matrix.

  Args:
    model_name: Architecture name understood by `instantiate_model`.
    dataset_name: Dataset name understood by `generate_datasets`.
    batch_size: Mini-batch size used for training.
    epochs: Maximum number of epochs; training may stop earlier because of
      early stopping on the validation accuracy.
    flat: Forwarded to `generate_datasets`.
    normalize: Forwarded to `generate_datasets`.

  Returns:
    The trained Keras model.

  Side effects:
    Prints the test accuracy/loss and the confusion matrix, saves the
    confusion-matrix figure under `model_name + "_" + dataset_name` and shows it.
  """
  # Generate datasets
  train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape = generate_datasets(dataset_name, flat, normalize)

  # Instantiate model
  model = instantiate_model(model_name, dataset_name, input_shape, output_shape)

  # Instantiate early stopping
  stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_acc', patience=3, restore_best_weights=True)

  # Train model
  model.fit(train_data, y_train, batch_size=batch_size, epochs=epochs, validation_data=(validation_data, y_valid), callbacks=[stop_early])

  # Evaluate model
  score = model.evaluate(test_data, y_test, verbose=0)
  print('Test accuracy:', score[1])
  print('Test loss:', score[0])

  # Confusion matrix results
  y_prediction = np.argmax(model.predict(test_data), axis=1)

  # Use one sorted label set for both the matrix and its tick labels. Deriving
  # the tick labels from y_test alone (through an unordered set) could disagree
  # with the matrix in order, or in size when a class is only ever predicted.
  model_classes = np.union1d(y_test, y_prediction)
  result = confusion_matrix(y_test, y_prediction, labels=model_classes)

  print("The confusion matrix:")
  print(*result, sep="\n")
  disp = ConfusionMatrixDisplay(confusion_matrix=result, display_labels=model_classes)

  disp.plot()
  # NOTE(review): dpi=4096 produces a very large image; the corrupted-label
  # variant of this routine uses 1024 -- confirm which one is intended.
  plt.savefig(model_name + "_" + dataset_name, dpi=4096)
  plt.show()

  return model

In [None]:
# Baseline runs on clean MNIST labels: every supported architecture is trained
# with batch size 128 for at most 30 epochs. Each cell rebinds `model`, so only
# the most recently executed run stays available afterwards.
# The logistic regression model takes flattened inputs (flat = True); all other
# models take image tensors (flat = False).
model = train_model("Logistic regression", "mnist", 128, 30, flat = True, normalize = True)

In [None]:
model = train_model("Resnet50v1", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Resnet50v2", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Resnet101v1", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Resnet101v2", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Resnet152v1", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Resnet152v2", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("VGG16", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("VGG19", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Densenet121", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Densenet169", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Densenet201", "mnist", 128, 30, flat = False, normalize = True)

In [None]:
model = train_model("Alexnet", "mnist", 128, 30, flat = False, normalize = True)

## Corruption mechanisms

In [None]:
def _sample_corruption_mask(num_labels, fraction, seed):
  """Return a boolean mask selecting `int(num_labels * fraction)` positions, drawn with `random` seeded by `seed`."""
  random.seed(seed)
  chosen = random.sample(list(range(num_labels)), int(num_labels * fraction))
  mask = np.zeros(num_labels, dtype=bool)
  mask[np.asarray(chosen, dtype=np.intp)] = True
  return mask


def _checked_fraction(percentage_random_labels, corruption_type):
  """Validate and return the corruption fraction required by `corruption_type`."""
  if percentage_random_labels is None:
    raise ValueError("percentage_random_labels is required for corruption_type %r" % (corruption_type,))
  if not 0 <= percentage_random_labels <= 1:
    raise ValueError("percentage_random_labels must be a fraction in [0, 1], got %r" % (percentage_random_labels,))
  return percentage_random_labels


def corrupt_labels(corruption_type, train_labels, validation_labels, test_labels, total_labels, percentage_random_labels = None):
  """Apply a deterministic label-corruption strategy to the training data.

  Supported strategies:
    "shift_label": one randomly chosen class is relabelled as another randomly
      chosen class, in the training labels only.
    "random_label": a fraction of the training and validation labels is
      replaced by labels drawn uniformly from all classes (a replacement may
      coincide with the original label).
    "random_map": a fraction of the training and validation labels is passed
      through a fixed random permutation of the classes.
  Any other `corruption_type` leaves all labels untouched.

  The seeds are hard-coded, so the corruption is reproducible. Note that the
  function reseeds the global `random` and `np.random` generators.

  Args:
    corruption_type: Name of the strategy (see above).
    train_labels: 1-D sequence of integer training labels.
    validation_labels: 1-D sequence of integer validation labels.
    test_labels: Test labels; always returned unchanged.
    total_labels: Number of classes; labels are expected in `range(total_labels)`.
    percentage_random_labels: Fraction in [0, 1] of labels to corrupt. Required
      for "random_label" and "random_map", ignored by "shift_label".

  Returns:
    The tuple `(train_labels, validation_labels, test_labels)` after corruption.
    The inputs are not modified in place.

  Raises:
    ValueError: If a strategy that needs `percentage_random_labels` is used
      without a valid fraction.
  """
  if corruption_type == "shift_label":
    labels = list(range(total_labels))
    np.random.seed(73)
    init_label = np.random.choice(labels)
    labels.remove(init_label)
    new_label = np.random.choice(labels)
    train_array = np.asarray(train_labels)
    # Only the training labels are shifted; the validation labels stay clean.
    train_labels = np.where(train_array == init_label, new_label, train_array)
    print("Corruption type: Shift-labels where:" + str(init_label) + " was replaced by:" + str(new_label))

  if corruption_type == "random_label":
    fraction = _checked_fraction(percentage_random_labels, corruption_type)
    train_array = np.asarray(train_labels)
    valid_array = np.asarray(validation_labels)

    # The position mask and the replacement labels are drawn in the same order
    # and with the same seeds as before, so the result is unchanged; the mask
    # only replaces the quadratic `index in list` membership test.
    train_mask = _sample_corruption_mask(len(train_array), fraction, seed=42)
    np.random.seed(81)
    train_noise = np.random.choice(list(range(total_labels)), size=len(train_array), replace=True)

    valid_mask = _sample_corruption_mask(len(valid_array), fraction, seed=53)
    np.random.seed(49)
    valid_noise = np.random.choice(list(range(total_labels)), size=len(valid_array), replace=True)

    train_labels = np.where(train_mask, train_noise, train_array)
    validation_labels = np.where(valid_mask, valid_noise, valid_array)

    print("Corruption type: Random-label where:" + str(percentage_random_labels) + " percent of labels were replaced by random values")

  if corruption_type == "random_map":
    fraction = _checked_fraction(percentage_random_labels, corruption_type)
    train_array = np.asarray(train_labels)
    valid_array = np.asarray(validation_labels)

    # Fixed random permutation of the classes: label i is mapped to label_map[i].
    random.seed(72)
    shuffled_labels = list(range(total_labels))
    random.shuffle(shuffled_labels)
    label_map = np.asarray(shuffled_labels)

    train_mask = _sample_corruption_mask(len(train_array), fraction, seed=42)
    valid_mask = _sample_corruption_mask(len(valid_array), fraction, seed=53)

    train_labels = np.where(train_mask, label_map[train_array], train_array)
    validation_labels = np.where(valid_mask, label_map[valid_array], valid_array)

    print("Corruption type: Random-map where:" + str(percentage_random_labels) + " percent of labels were replaced according to the random map")

  return train_labels, validation_labels, test_labels

# Updated training procedure

In [None]:
def train_model_with_corruption(model_name, dataset_name, batch_size, epochs, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape):
  """Train `model_name` on already prepared (possibly label-corrupted) data and report test results.

  Unlike `train_model`, the datasets are passed in by the caller, so labels can
  be corrupted before training.

  Args:
    model_name: Architecture name understood by `instantiate_model`.
    dataset_name: Dataset name; forwarded to `instantiate_model` and used in
      the file name of the saved figure.
    batch_size: Mini-batch size used for training.
    epochs: Maximum number of epochs; training may stop earlier because of
      early stopping on the validation accuracy.
    train_data, y_train: Training samples and labels.
    validation_data, y_valid: Validation samples and labels.
    test_data, y_test: Test samples and labels.
    input_shape: Per-sample input shape of the data.
    output_shape: Number of classes.

  Returns:
    The trained Keras model.

  Side effects:
    Prints the test accuracy/loss, the true and predicted test labels and the
    confusion matrix, saves the confusion-matrix figure under
    `model_name + "_" + dataset_name` and shows it.
  """
  # Instantiate model
  model = instantiate_model(model_name, dataset_name, input_shape, output_shape)

  # Instantiate early stopping
  stop_early = tf.keras.callbacks.EarlyStopping(monitor='val_acc', patience=3, restore_best_weights=True)

  # Train model
  model.fit(train_data, y_train, batch_size=batch_size, epochs=epochs, validation_data=(validation_data, y_valid), callbacks=[stop_early])

  # Evaluate model
  score = model.evaluate(test_data, y_test, verbose=0)
  print('Test accuracy:', score[1])
  print('Test loss:', score[0])

  # Confusion matrix results
  y_prediction = np.argmax(model.predict(test_data), axis=1)

  # Use one sorted label set for both the matrix and its tick labels. Deriving
  # the tick labels from y_test alone (through an unordered set) could disagree
  # with the matrix in order, or in size when a class is only ever predicted.
  model_classes = np.union1d(y_test, y_prediction)
  result = confusion_matrix(y_test, y_prediction, labels=model_classes)
  print(y_test)
  print(y_prediction)

  print("The confusion matrix:")
  print(*result, sep="\n")
  disp = ConfusionMatrixDisplay(confusion_matrix=result, display_labels=model_classes)

  disp.plot()
  # NOTE(review): the file name does not encode the corruption setting, so this
  # overwrites the figure of a clean run of the same model and dataset.
  plt.savefig(model_name + "_" + dataset_name, dpi=1024)
  plt.show()

  return model

In [None]:
# Experiment configuration: which corruption strategy to apply, on which
# dataset, and which fraction of the labels to corrupt (0.1 = 10 %).
corruption_type = "random_label"
dataset_name = "mnist"
percentage_random_labels = 0.1

# Generate the flattened datasets (inputs for the logistic regression model)
train_data_f, train_labels, validation_data_f, validation_labels, test_data_f, test_labels, input_shape_f, output_shape_f = generate_datasets(dataset_name, flat = True, normalize = True)
print(train_labels[:200])

# Generate the image-shaped datasets (inputs for the convolutional models).
# This call rebinds the label variables from the call above; generate_datasets
# splits with a fixed random_state, so both calls are expected to return the
# same labels (the two printouts allow a visual check).
train_data, train_labels, validation_data, validation_labels, test_data, test_labels, input_shape, output_shape= generate_datasets(dataset_name, flat = False, normalize = True)
print(train_labels[:200])

# Apply the corruption strategy; the resulting labels are used for both the
# flattened and the image-shaped data in the cells below.
y_train, y_valid, y_test = corrupt_labels(corruption_type, train_labels, validation_labels, test_labels, output_shape, percentage_random_labels)
print(y_train[:200])

In [None]:
# Runs on the corrupted labels prepared above: every supported architecture is
# trained with batch size 128 for at most 30 epochs. Each cell rebinds `model`.
# The logistic regression model uses the flattened data (`*_f` variables); all
# other models use the image-shaped data.
# NOTE(review): the dataset name is hard-coded as "mnist" here instead of using
# the `dataset_name` variable the data was generated with -- keep them in sync.
model = train_model_with_corruption("Logistic regression", "mnist", 128, 30, train_data_f, y_train, validation_data_f, y_valid, test_data_f, y_test, input_shape_f, output_shape_f)

In [None]:
model = train_model_with_corruption("Resnet50v1", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Resnet50v2", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Resnet101v1", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Resnet101v2", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Resnet152v1", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Resnet152v2", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("VGG16", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("VGG19", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Densenet121", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Densenet169", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Densenet201", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)

In [None]:
model = train_model_with_corruption("Alexnet", "mnist", 128, 30, train_data, y_train, validation_data, y_valid, test_data, y_test, input_shape, output_shape)