In [None]:
import pandas as pd
from os import path
from os import environ
import os
import random as random
from tensorflow.keras.preprocessing import image as imgproc
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import layers
from tensorflow.keras import metrics
from tensorflow.keras import optimizers
from tensorflow.keras import models
from tensorflow.keras import callbacks
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import random
from sklearn.preprocessing import MinMaxScaler

%matplotlib inline

### Initialisation

In [None]:
# Attempt to make runs more reproducible
seed_value=20212042
print("Using random seed: %d" % seed_value)
environ['PYTHONHASHSEED'] = str(seed_value)
random.seed(seed_value)
np.random.seed(seed_value)
tf.random.set_seed(seed_value) # tensorflow 2.x

print("GPUs Available:", tf.config.list_physical_devices('GPU'))
print("Tensorflow version:", tf.__version__)

### Constants

In [None]:
DATA_DIR = './imagenette2-320'
TRAIN_DIR = path.join(DATA_DIR, 'train')
VALIDATION_DIR = path.join(DATA_DIR, 'validation') # a split off 'train' used as validation set during NN training
TEST_DIR = path.join(DATA_DIR, 'val') # the original Imagenette test dir
MODELS_DIR = path.join('./models')
LABELS_FILE = path.join(DATA_DIR, 'noisy_imagenette.csv')
CLASS1 = 'n03445777' # e.g. n03445777 -> golf ball
CLASS2 = 'n03888257' # e.g. n03888257 -> parachute
CLASSES = [CLASS1, CLASS2]
IMG_SIZE = (150, 150)
COLOUR_SCALE = 1/255.
BATCH_SIZE = 32

### Helper functions

In [None]:
# Data generation flow from train/validation directory
def create_flow(datagen, path, batch_size):
    return datagen.flow_from_directory(
        path,
        target_size=IMG_SIZE,
        classes=CLASSES,
        class_mode='binary',
        batch_size=batch_size
    )

def evaluate_model(model_file):
    model = models.load_model(model_file)
    test_flow = create_flow(ImageDataGenerator(rescale=COLOUR_SCALE), TEST_DIR, BATCH_SIZE)
    loss_accuracy = model.evaluate(test_flow, steps=test_flow.samples // BATCH_SIZE, verbose=False)
    print('Test loss: %.2f, accuracy: %.2f%%' % (loss_accuracy[0], loss_accuracy[1] * 100.))
    
def plot_model_history(history):
    # Loss
    plt.subplot(1, 2, 1)
    plt.plot(history.history['loss'])
    plt.plot(history.history['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    
    # Binary Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history.history['binary_accuracy'])
    plt.plot(history.history['val_binary_accuracy'])
    plt.title('model accuracy')
    plt.ylabel('binary_accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'validation'], loc='upper left')
    plt.tight_layout()
    plt.show()
    
def load_random_image(filepath):
    img_file = random.choice(os.listdir(filepath))
    img = imgproc.load_img(path.join(filepath, img_file))
    img = img.resize(IMG_SIZE)
    img_array = imgproc.img_to_array(img)
    return img_array * COLOUR_SCALE

### Make train/validation split and organize directory structure accordingly

In [None]:
if not path.isdir(VALIDATION_DIR):
    ground_truth = pd.read_csv(LABELS_FILE)
    ground_truth = ground_truth[ground_truth['noisy_labels_0'].isin(CLASSES)]
    test_df = ground_truth[ground_truth['is_valid']==True]
    imagenette_train = ground_truth[ground_truth['is_valid']==False]
    train_df, val_df = train_test_split(imagenette_train, test_size=0.2) # the dataset is balanced
    val_df = val_df.rename(columns={'path': 'orig_path'})
    val_df['path'] = val_df['orig_path'].str.replace('train/', 'validation/')
    val_df.apply(lambda v: os.renames(path.join(DATA_DIR, v['orig_path']), path.join(DATA_DIR, v['path'])), axis=1)
    del val_df['orig_path']

### Define and train baseline CNN

In [None]:
ImageDataGenerator(rescale=COLOUR_SCALE)
baseline_model_file = path.join(MODELS_DIR, 'imagenette', 'baseline.h5')
if not path.isfile(baseline_model_file):
    os.makedirs(path.dirname(baseline_model_file), exist_ok=True)
    
    # Define model architecture
    inputs = layers.Input(shape=IMG_SIZE + (3,))
    x = layers.Conv2D(32, (3, 3), activation='relu')(inputs)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(128, (3, 3), activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(128, (3, 3), activation='relu')(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Flatten()(x)
    x = layers.Dense(512, activation='relu')(x)
    output = layers.Dense(1, activation='sigmoid')(x)
    baseline_model = keras.Model(inputs=inputs, outputs=output)
    baseline_model.summary()
    
    # Train model
    epochs_count = 35
    datagen = ImageDataGenerator(rescale=COLOUR_SCALE)
    train_flow = create_flow(datagen, TRAIN_DIR, BATCH_SIZE)
    val_flow = create_flow(datagen, VALIDATION_DIR, BATCH_SIZE)
    save_best_cb = callbacks.ModelCheckpoint(filepath=baseline_model_file,
                                             monitor='val_loss', mode='min', save_best_only=True,
                                             verbose=False) # set to True to see best model's epoch
    baseline_model.compile(
        optimizers.RMSprop(lr=1e-4),
        'binary_crossentropy',
        metrics=[metrics.BinaryAccuracy()]
    )
    history = baseline_model.fit(train_flow, steps_per_epoch=train_flow.samples // BATCH_SIZE,
                       validation_data=val_flow, validation_steps=val_flow.samples // BATCH_SIZE,
                       epochs=epochs_count,
                       callbacks=[save_best_cb],
                       verbose=False)
    plot_model_history(history)

# Evaluate against test dataset
evaluate_model(baseline_model_file)

### Use data augmentation to improve model performance

In [None]:
improved_model_file = path.join(MODELS_DIR, 'imagenette', 'improved.h5')
if not path.isfile(improved_model_file):
    os.makedirs(path.dirname(improved_model_file), exist_ok=True)
    
    # Define model architecture
    inputs = layers.Input(shape=IMG_SIZE + (3,))
    x = layers.Conv2D(32, (3, 3), activation='relu')(inputs)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(64, (3, 3), activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(128, (3, 3), activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Conv2D(128, (3, 3), activation='relu')(x)
    x = layers.BatchNormalization()(x)
    x = layers.MaxPooling2D(pool_size=(2, 2))(x)
    x = layers.Flatten()(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(512, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    output = layers.Dense(1, activation='sigmoid')(x)
    improved_model = keras.Model(inputs=inputs, outputs=output)
    improved_model.summary()

    # Train model
    epochs_count = 100
    train_gen = ImageDataGenerator( # TODO document choices
        rescale=COLOUR_SCALE,
        rotation_range=40,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=0.2,
        horizontal_flip=True,
        fill_mode='nearest'
    )
    train_flow = create_flow(train_gen, TRAIN_DIR, BATCH_SIZE)
    val_flow = create_flow(ImageDataGenerator(rescale=COLOUR_SCALE), VALIDATION_DIR, BATCH_SIZE)
    save_best_cb = callbacks.ModelCheckpoint(filepath=improved_model_file,
                                             monitor='val_loss', mode='min', save_best_only=True,
                                             verbose=False) # set to True to see best model's epoch
    improved_model.compile(
        optimizers.RMSprop(lr=1e-4),
        'binary_crossentropy',
        metrics=[metrics.BinaryAccuracy()]
    )
    history = improved_model.fit(train_flow, steps_per_epoch=val_flow.samples // BATCH_SIZE,
                       validation_data=val_flow, validation_steps=val_flow.samples // BATCH_SIZE,
                       epochs=epochs_count,
                       callbacks=[save_best_cb],
                       verbose=False)
    plot_model_history(history)

# Evaluate against test dataset
evaluate_model(improved_model_file)

### Visualize intermediate activations

In [None]:
model = models.load_model(path.join(MODELS_DIR, 'imagenette', 'improved.h5'))

layer_outputs = []
for each in model.layers:
    if 'conv2d' in each.name:
        layer_outputs.append(each.output)
activation_model = models.Model(inputs=model.input, outputs=layer_outputs)

scaler = MinMaxScaler(feature_range=(0, 255)) # makes activation values ready for drawing
channels_per_row = 32
for c in CLASSES:
    img = load_random_image(path.join(TRAIN_DIR, c))
    plt.imshow(img)
    plt.show()

    img = np.expand_dims(img, axis=0) # array with a single image
    activations = activation_model.predict(img) 
    for layer_activation in activations:
        layer_channel_count = layer_activation.shape[-1]
        img_size = layer_activation.shape[1]
        row_count = layer_channel_count // channels_per_row
        display_grid = np.zeros((img_size * row_count, channels_per_row * img_size))
        for row in range(row_count):
            for col in range(channels_per_row):
                channel_image = layer_activation[0, # CNN layer has a single output tensor
                                                 :, :, # the activations for each output channel
                                                 row * channels_per_row + col] # row and offset in the display grid
                # scale the activations and populate in display grid
                channel_image = scaler.fit_transform(channel_image)
                display_grid[row * img_size : (row + 1) * img_size, col * img_size : (col + 1) * img_size] = channel_image
        
        # draw
        plt.figure(figsize=(display_grid.shape[1] / img_size, display_grid.shape[0] / img_size))
        plt.imshow(display_grid, aspect='auto', cmap='pink')
        plt.show()