In [2]:
%%time
# Download the zipped dataset and unzip it into the current directory.
# In this archive the original training data has already been split into training and validation sets, with almost equal representation of every class in both.
!gdown --id 11SGStqp8Vug2GDzSpJDwQYHThLIjZFQn
!unzip -q inaturalist_12K.zip
!ls

gdrive	inaturalist_12K  inaturalist_12K.zip  sample_data
CPU times: user 2.61 s, sys: 430 ms, total: 3.04 s
Wall time: 8min 50s


In [3]:
# !pip install split-folders

In [4]:
# import splitfolders

# The code below was used for splitting the training data into training and validation set with equal representations from each class
# ---------------------------------- Uncomment below code to split folders once again / check ----------------------------------

# splitfolders.ratio('./inaturalist_12K/train', output='./inaturalist_12K', seed=1337, ratio=(.9, .1), group_prefix=None)

In [1]:
import os
def print_count_classes(data_path = './inaturalist_12K/val'):
  """
  Print how many files each directory under `data_path` contains.

  Used to check that the training and validation sets hold a nearly equal number of images per class.
  Every directory reached by os.walk that directly contains at least one file is reported, keyed by its
  path; directories without files are omitted. All files are counted, not only images.

  Arguments :
      data_path -- (string) root directory to scan

  Returns :
      -- None -- (the counts are printed)
  """
  files_per_directory = {
      directory: len(file_names)
      for directory, _, file_names in os.walk(data_path)
      if file_names
  }

  print(f'In path {data_path} : {files_per_directory}')

# --------------------------- Uncomment below code to check count of images in each class in training and validation set ----------------------------

# print_count_classes('./inaturalist_12K/val')
# print_count_classes('./inaturalist_12K/train')

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Activation, Dense, Flatten, BatchNormalization, Conv2D, MaxPooling2D, AveragePooling2D, Dropout
from tensorflow.keras.optimizers import Adam, SGD, RMSprop, Nadam
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing import image_dataset_from_directory
from tensorflow.keras.callbacks import EarlyStopping

# List the GPUs visible to TensorFlow and report how many were found
physical_devices = tf.config.experimental.list_physical_devices('GPU')
print("Num GPUs Available: ", len(physical_devices))

# If at least one GPU is present, enable memory growth on the first GPU only
# (any additional GPUs are left at their default setting)
if len(physical_devices) > 0:
    tf.config.experimental.set_memory_growth(physical_devices[0], True)

Num GPUs Available:  0


In [2]:
from tensorflow.keras import regularizers

def build_model_partA(inp_img_shape, K_list, F_list, no_neurons_dense, no_classes = 10, pooling_list = ['max']*5, activation_fn_list = ['relu']*6, 
                      P_list = ['valid']*10, S_list = [1]*10, reg_list = ['none']*7, lambda_ = 0.001, BN_yes = False, dropout_p = 0):
    '''
    Build the Part-A CNN: 5 blocks of (conv -> [BN] -> activation -> pooling), then
    Flatten -> Dropout -> Dense -> [BN] -> activation, then the output Dense -> [BN] -> softmax.

    Indexing convention for the per-layer lists: block b (0..4) reads its convolution settings from
    index 2*b and its pooling settings from index 2*b+1 of F_list, S_list and P_list. Only the
    even-indexed entries of K_list are used (the number of filters of each convolution).

    Arguments :
        inp_img_shape -- shape of the input image, passed as input_shape to the first convolution
        K_list -- list of filter counts; entry 2*b is used for the convolution of block b
        F_list -- list of (square) kernel / pool sizes for the 10 non-FC layers
        no_neurons_dense -- number of neurons in the hidden dense layer
        no_classes -- number of output classes
        pooling_list -- pooling type for each block ('max' : MaxPooling2D, 'avg' : AveragePooling2D)
        activation_fn_list -- activation for each of the 5 convolutions (indices 0..4) and the hidden dense layer (index 5)
        P_list -- padding option ('valid' or 'same') for the 10 non-FC layers
        S_list -- stride (same in width and height) for the 10 non-FC layers
        reg_list -- kernel regularisation ('none', 'L1' or 'L2') for the 5 convolutions (0..4),
                    the hidden dense layer (5) and the output layer (6)
        lambda_ -- regularisation strength passed to the L1 / L2 regularizers
        BN_yes -- True : add BatchNormalization before every activation (including the softmax)
        dropout_p -- dropout rate; the Dropout layer sits between Flatten and the hidden dense layer

    Returns :
        model -- the keras Sequential model of the CNN created
    '''
    regularizer_for = {
        'none': None,
        'L1': regularizers.l1(lambda_),
        'L2': regularizers.l2(lambda_)
    }
    pooling_class_for = {
        'max': MaxPooling2D,
        'avg': AveragePooling2D
    }

    model = Sequential()

    def add_bn_then_activation(activation_name):
        # Optional batch normalisation followed by the requested activation
        if BN_yes:
            model.add(BatchNormalization())
        model.add(Activation(activation = activation_name))

    # 5 conv-activation-pooling blocks; only the first convolution declares the input shape
    for block in range(5):
        conv_idx = 2*block
        pool_idx = conv_idx + 1
        first_layer_kwargs = {'input_shape': inp_img_shape} if block == 0 else {}

        model.add(Conv2D(filters = K_list[conv_idx],
                         kernel_size = (F_list[conv_idx], F_list[conv_idx]),
                         strides = (S_list[conv_idx], S_list[conv_idx]),
                         padding = P_list[conv_idx],
                         kernel_regularizer = regularizer_for[reg_list[block]],
                         **first_layer_kwargs))
        add_bn_then_activation(activation_fn_list[block])

        pooling_class = pooling_class_for[pooling_list[block]]
        model.add(pooling_class(pool_size = (F_list[pool_idx], F_list[pool_idx]),
                                strides = (S_list[pool_idx], S_list[pool_idx]),
                                padding = P_list[pool_idx]))

    # Hidden dense layer, with dropout applied to the flattened features feeding it
    model.add(Flatten())
    model.add(Dropout(dropout_p))
    model.add(Dense(units = no_neurons_dense, kernel_regularizer = regularizer_for[reg_list[5]]))
    add_bn_then_activation(activation_fn_list[5])

    # Output layer
    model.add(Dense(units = no_classes, kernel_regularizer = regularizer_for[reg_list[6]]))
    add_bn_then_activation('softmax')

    return model
    

In [3]:
!pip install --upgrade wandb
!wandb login 6746f968d95eb71e281d6c7772a0469574430408

Collecting wandb
  Using cached wandb-0.10.25-py2.py3-none-any.whl (2.1 MB)
Collecting sentry-sdk>=0.4.0
  Using cached sentry_sdk-1.0.0-py2.py3-none-any.whl (131 kB)
Collecting Click>=7.0
  Using cached click-7.1.2-py2.py3-none-any.whl (82 kB)
Processing /Users/abishek_programming/Library/Caches/pip/wheels/3e/31/09/fa59cef12cdcfecc627b3d24273699f390e71828921b2cbba2/pathtools-0.1.2-py3-none-any.whl
Collecting docker-pycreds>=0.4.0
  Using cached docker_pycreds-0.4.0-py2.py3-none-any.whl (9.0 kB)
Processing /Users/abishek_programming/Library/Caches/pip/wheels/29/93/c6/762e359f8cb6a5b69c72235d798804cae523bbe41c2aa8333d/promise-2.3-py3-none-any.whl
Processing /Users/abishek_programming/Library/Caches/pip/wheels/50/ca/fa/8fca8d246e64f19488d07567547ddec8eb084e8c0d7a59226a/subprocess32-3.5.4-py3-none-any.whl
Collecting psutil>=5.0.0
  Using cached psutil-5.8.0-cp37-cp37m-macosx_10_9_x86_64.whl (236 kB)
Collecting shortuuid>=0.5.0
  Using cached shortuuid-1.0.1-py3-none-any.whl (7.5 kB)
Colle

In [4]:
from tensorflow.keras import layers

# Preprocessing model: multiplies pixel values by 1/255.
# (No resizing happens here; this model contains only the Rescaling layer.)
image_rescale = Sequential([
    layers.experimental.preprocessing.Rescaling(1./255)
])

# Model for performing random transformations for data augmentation:
# horizontal flip, rotation, translation, zoom and contrast change, applied in that order
data_augmentation = Sequential([
    layers.experimental.preprocessing.RandomFlip("horizontal"),
    layers.experimental.preprocessing.RandomRotation(0.1),
    layers.experimental.preprocessing.RandomTranslation(0.2, 0.2),
    layers.experimental.preprocessing.RandomZoom(0.2, 0.2),
    layers.experimental.preprocessing.RandomContrast(0.2)
])

def prepare_data(data_path, inp_img_shape, batch_size, img_preprocess, data_augmentation, data_augment_yes = False, shuffle = True):
    """
    Build a batched, prefetched tf.data pipeline from a directory of class sub-folders.

    Arguments :
        data_path -- (string) directory handed to image_dataset_from_directory (labels are inferred from it)
        inp_img_shape -- (tuple) input image shape; all but its last entry is used as the target image size
        batch_size -- (int) number of images per batch
        img_preprocess -- callable applied to every image batch (e.g. the rescaling model)
        data_augmentation -- callable applied with training=True to every image batch when augmentation is on
        data_augment_yes -- (bool) True : apply data_augmentation (meant for the training set only)
        shuffle -- (bool) whether the dataset is shuffled (a fixed seed of 123 is used)

    Returns :
        The dataset of (images, one-hot labels) batches with buffered prefetching enabled
    """
    autotune = tf.data.AUTOTUNE

    def preprocess_batch(images, labels):
        return img_preprocess(images), labels

    def augment_batch(images, labels):
        return data_augmentation(images, training=True), labels

    batches = image_dataset_from_directory(
        data_path,
        labels='inferred',
        label_mode='categorical',
        color_mode='rgb',
        batch_size=batch_size,
        image_size=inp_img_shape[:-1],
        shuffle=shuffle,
        seed=123
    )
    batches = batches.map(preprocess_batch, num_parallel_calls=autotune)

    # Augmentation is optional and comes after the preprocessing step
    if data_augment_yes:
        batches = batches.map(augment_batch, num_parallel_calls=autotune)

    # Buffered prefetching on the final dataset
    return batches.prefetch(buffer_size=autotune)


def data_generator(inp_img_shape, batch_size, data_augment_yes = False, train_data_path = None, val_data_path = None, test_data_path = None):
    """
    Create the training, validation and test datasets for whichever paths are given.

    Each dataset is built with prepare_data using the module-level image_rescale and data_augmentation
    models. Only the training set can be augmented (controlled by data_augment_yes); the validation and
    test sets never are. A dataset whose path is None is returned as None.

    Returns :
        (train_data, val_data, test_data)
    """
    def load_split(split_path, augment):
        # A missing path simply yields no dataset for that split
        if split_path is None:
            return None
        return prepare_data(split_path, inp_img_shape, batch_size, image_rescale, data_augmentation, augment)

    train_data = load_split(train_data_path, data_augment_yes)
    val_data = load_split(val_data_path, False)
    test_data = load_split(test_data_path, False)

    return train_data, val_data, test_data


In [5]:
import wandb
from wandb.keras import WandbCallback

In [6]:
def train_model(model, train_data, loss_function, optimizer = 'adam', learning_rate = 1e-3, epochs = 10, val_data = None):
    """
    Compile the model with the requested optimizer and fit it on the training data.

    Arguments :
        model -- keras model to train
        train_data -- training dataset passed to model.fit
        loss_function -- loss passed to model.compile
        optimizer -- 'adam', 'momentum' (SGD, momentum 0.9), 'rmsprop', 'nesterov' (SGD, momentum 0.9, nesterov),
                     'nadam'; any other value selects plain SGD
        learning_rate -- learning rate given to the optimizer
        epochs -- maximum number of epochs
        val_data -- validation dataset; required (asserted after compiling)

    Returns :
        model -- the same model after training
    """
    if optimizer == 'adam':
        chosen_optimizer = Adam(learning_rate=learning_rate)
    elif optimizer == 'momentum':
        chosen_optimizer = SGD(learning_rate=learning_rate, momentum = 0.9)
    elif optimizer == 'rmsprop':
        chosen_optimizer = RMSprop(learning_rate=learning_rate)
    elif optimizer == 'nesterov':
        chosen_optimizer = SGD(learning_rate=learning_rate, momentum = 0.9, nesterov = True)
    elif optimizer == 'nadam':
        chosen_optimizer = Nadam(learning_rate=learning_rate)
    else:
        chosen_optimizer = SGD(learning_rate=learning_rate)

    model.compile(optimizer = chosen_optimizer, loss = loss_function, metrics = ['accuracy'])

    assert val_data is not None

    # Validation accuracy is the quantity to maximise, so both callbacks monitor it
    training_callbacks = [
        WandbCallback(monitor='val_accuracy'),
        EarlyStopping(monitor='val_accuracy', patience=5)
    ]
    model.fit(train_data,
              epochs = epochs,
              validation_data = val_data,
              verbose = 2,
              callbacks = training_callbacks)
    return model


In [7]:
def get_klist(start1, factor):
    """
    Build the 10-entry list of filter counts for the 5 conv+pooling blocks.

    Each block contributes its filter count twice (one entry per layer of the block). The first block
    uses start1; every following block uses int(previous * factor), never going below 1.

    Arguments :
        start1 -- number of filters of the first block
        factor -- multiplier applied from one block to the next

    Returns :
        list of 10 filter counts
    """
    filter_counts = []
    current = start1
    for _ in range(5):
        filter_counts.extend([current, current])
        current = max(int(current*factor), 1)
    return filter_counts

In [8]:
def test_model(model, test_data):
    # Function to get test accuracy and loss for a model on a test data
    assert(test_data is not None)
    test_loss, test_accuracy = model.evaluate(test_data, use_multiprocessing = True, workers = 4)
    test_accuracy = round(test_accuracy*100, 2)
    test_loss = round(test_loss, 4)
    print(f'Test Accuracy : {test_accuracy} | Test Loss : {test_loss}')

    return test_loss, test_accuracy


In [15]:
def CNN_train(inp_img_shape, train_data_path, K_list, F_list, config, no_classes = 10, pooling_list = ['max']*5, activation_fn_list = ['relu']*6, 
              P_list = ['valid']*10, S_list = [1]*10, reg_list = ['none']*7, val_data_path = None, test_data_path = None, 
              wandb_init = True, load_run = None):
    '''
    Train (and optionally test) a Part-A CNN, logging to WANDB.

    The model is either built fresh with build_model_partA or, when load_run is given, restored from the
    'model-best.h5' file of that earlier WANDB run. In both cases the model is then trained on the
    training data and, if a test path is given, evaluated on the test data with the result logged to WANDB.

    Arguments :
        inp_img_shape -- (tuple) shape of input image
        train_data_path -- (string) path to the training data; required
        K_list -- (list) filter counts, as expected by build_model_partA
        F_list -- (list) kernel / pool sizes, as expected by build_model_partA
        config -- (dict-like) hyperparameters; the keys read here are 'no_neurons_dense', 'weight_decay',
                  'batch_normalization', 'dropout', 'batch_size', 'data_augmented', 'loss_function',
                  'optimizer', 'learning_rate' and 'epochs'
        no_classes -- (int) number of output classes
        pooling_list -- (list) pooling option per conv+pooling block ('max' or 'avg')
        activation_fn_list -- (list) activation for each convolution and the hidden dense layer
        P_list -- (list) padding option for each non FC layer ('valid' or 'same')
        S_list -- (list) stride for each non FC layer
        reg_list -- (list) regularisation option for the convolutions, hidden dense and output layers
        val_data_path -- (string) path to the validation data (training asserts that validation data exists)
        test_data_path -- (string) path to the test data; None skips the test evaluation
        wandb_init -- (bool) True : this function creates a new WANDB run (with a generated id) and finishes it
                      at the end | False : no run is created here, one is expected to be active already
        load_run -- (string) WANDB run ID whose best model is downloaded and used (None to build a new model)

    Returns :
        model -- (Keras Model object) the trained model
        id -- (string) the generated WANDB run id, or '' when wandb_init is False
    '''
    run_id = ''
    if wandb_init:
        run_id = wandb.util.generate_id()
        run = wandb.init(id = run_id, project="assignment2", entity="abisheks", reinit=True, config=config)

    tf.keras.backend.clear_session()

    if load_run is not None:
        # Restore the best checkpoint saved by an earlier run
        previous_run = wandb.Api().run('abisheks/assignment2/'+load_run)
        downloaded_model = previous_run.file('model-best.h5').download(replace=True)
        model = tf.keras.models.load_model(downloaded_model.name)
    else:
        model = build_model_partA(inp_img_shape, K_list, F_list, config['no_neurons_dense'],
                                  no_classes = no_classes,
                                  pooling_list = pooling_list,
                                  activation_fn_list = activation_fn_list,
                                  P_list = P_list,
                                  S_list = S_list,
                                  reg_list = reg_list,
                                  lambda_ = config['weight_decay'],
                                  BN_yes = config['batch_normalization'],
                                  dropout_p = config['dropout'])

    assert train_data_path is not None
    train_data, val_data, test_data = data_generator(inp_img_shape, config['batch_size'], config['data_augmented'],
                                                     train_data_path = train_data_path,
                                                     val_data_path = val_data_path,
                                                     test_data_path = test_data_path)
    model = train_model(model, train_data, config['loss_function'],
                        optimizer = config['optimizer'],
                        learning_rate = config['learning_rate'],
                        epochs = config['epochs'],
                        val_data = val_data)

    if test_data is not None:
        test_loss, test_accuracy = test_model(model, test_data)
        wandb.log({'test_accuracy': test_accuracy, 'test_loss': test_loss})

    # Only close the run if it was opened here
    if wandb_init:
        run.finish()

    return model, run_id


In [10]:
# Hyperparameters for building the model for Part-A
# The 10-entry lists below hold one entry per non FC layer (5 convolution + 5 pooling layers)
K_list_1 = [32, 32, 32, 32, 64, 64, 64, 64, 128, 128]           # List of number of filters in each non FC layer
F_list_1 = [3, 3, 3, 3, 3, 3, 3, 3, 3, 3]                       # List of size of filters in each non FC layer  
activation_fn_list_1 = ['relu']*6                               # List of activation function in each convolution and FC layer
P_list_1 = ['valid']*10                                         # List of padding options in each non FC layer ('valid' : no padding, 'same' : padding to make input and output same dimensions)
S_list_1 = [1, 2, 1, 2, 1, 2, 1, 2, 1, 1]                       # List of strides in each non FC layer
reg_list_1 = ['L2', 'L2', 'L2', 'L2', 'L2', 'L2', 'L2']         # List of regularization options for the convolution, one hidden FC and output layers ('none' : no regularization, 'L2' , 'L1')
inp_img_shape_1 = (227, 227, 3)                                 # Shape of input image from data
no_classes_1 = 10                                               # Number of output classes in the classification problem
pooling_list_1 = ['max']*4 + ['avg']                            # List of pooling layer option for each conv+pooling block ('max' : MaxPooling2D, 'avg': AveragePooling2D)

# The dictionary that stores all hyperparameter and architectural information about the model. Will be sent to wandb to describe the model in the run
config_1 = {
    "learning_rate": 1e-3,                                      # Hyperparameter for updating the parameters in gradient descent
    "epochs": 10,                                               # Number of epochs to train the model   
    "optimizer": 'nesterov',                                    # Gradient descent algorithm used for the parameter update
    "batch_size": 64,                                           # Batch size used for the optimizer
    "loss_function": 'categorical_crossentropy',                # Loss function used in the optimizer
    "architecture": 'CNN',                                      # Type of neural network used
    "dataset": "iNaturalist_12K",                               # Name of dataset
    'no_filters': 32,                                           # Number of filters for the first convolution layer
    'filter_organization': 1,                                   # The factor by which the number of filters change in the subsequent convolution layers
    'no_neurons_dense': 64,                                     # Number of neurons in the dense FC layer
    'data_augmented': False,                                    # True : Data augmentation is done during training, False : No data augmentation done
    'dropout' : 0.2,                                            # Probability of dropping out a neuron in dropout technique
    'batch_normalization': True,                                # True : Batch normalisation (BN) should be used, False : BN should not be used
    'weight_decay': 0.01,                                       # weight decay hyperparameter for regularization
    'F_list': F_list_1,                                         # Filter / pool sizes of the non FC layers (see F_list_1)
    'activation_fn_list': activation_fn_list_1,                 # Activation functions (see activation_fn_list_1)
    'P_list': P_list_1,                                         # Padding options (see P_list_1)
    'S_list': S_list_1,                                         # Strides (see S_list_1)
    'regularization_list': reg_list_1,                          # Regularization options (see reg_list_1)
    'input_image_shape': inp_img_shape_1,                       # Shape of input image (see inp_img_shape_1)
    'pooling_layer_list': pooling_list_1                        # Pooling layer options (see pooling_list_1)
}


# PART-A, Question 1 -- Building a model with (5 conv+relu+maxpooling layers + 1 dense FC layer) for image classification objective
# ---------------------------------- To test the CNN_train function uncomment the code below ----------------------------------

# modelA, _ = CNN_train(inp_img_shape_1, './inaturalist_12K/train', get_klist(32, 1.5), F_list_1, config_1, no_classes_1, 
#                       pooling_list_1, activation_fn_list_1, P_list_1, S_list_1, reg_list_1, './inaturalist_12K/val', './inaturalist_12K/test')

In [11]:
# Hyperparameter choices to sweep 
# Only the hyperparameters listed under 'parameters' are varied by the sweep.
sweep_config = {
    'name': 'CNN',
    'method': 'bayes',                   # Possible search : grid, random, bayes
    # The sweep tries to maximize the validation accuracy
    'metric': {
      'name': 'val_accuracy',
      'goal': 'maximize'   
    },
    'parameters': {
        # Number of filters for the first convolution layer
        'no_filters': {
            'values': [32, 64]
        },
        # Factor by which the number of filters changes in the subsequent convolution layers
        'filter_organization': {
            'values': [1, 1.5, 2]
        },
        # Whether data augmentation is done during training
        'data_augmented': {
            'values': [True, False]
        },
        # Dropout probability
        'dropout' :{
            'values': [0, 0.25, 0.4]
        },
        # Whether batch normalisation is used
        'batch_normalization': {
            'values': [True, False]
        },
        # Number of neurons in the dense FC layer
        'no_neurons_dense': {
            'values': [32, 64, 256]
        },
        # Optimizer name
        'optimizer': {
            'values': ['adam', 'nesterov']
        },
        # Weight decay hyperparameter for regularization
        'weight_decay': {
            'values': [0.01, 0.001]
        }
    }
}

In [12]:
def sweep_wrapper(data_path = './inaturalist_12K'):
    """
    Run one sweep trial: start a WANDB run, name it after its hyperparameters and train a CNN.

    The run is initialised with config_1 as the default configuration; the values actually used are read
    back from wandb.config. Training uses '<data_path>/train' and '<data_path>/val'; no test set is
    evaluated here.

    Arguments :
        data_path -- (string) dataset root containing the 'train' and 'val' folders

    Returns :
        -- None --
    """
    # Initialize a new wandb run
    run = wandb.init(config=config_1, reinit=True)

    # Hyperparameters and inputs of this trial
    trial_config = wandb.config

    # Run name: filters, filter organization and dropout, plus '_da' / '_bn' flags when enabled
    wandb.run.name = f'nf_{trial_config.no_filters}_fo_{trial_config.filter_organization}_dr_{trial_config.dropout}'
    wandb.run.name = wandb.run.name + ('_da' if trial_config.data_augmented else '')
    wandb.run.name = wandb.run.name + ('_bn' if trial_config.batch_normalization else '')
    wandb.run.save()
    print(wandb.run.name)

    filters_per_layer = get_klist(trial_config.no_filters, trial_config.filter_organization)
    _trained_model, _unused_id = CNN_train(inp_img_shape_1, f'{data_path}/train', filters_per_layer, F_list_1,
                                           trial_config, no_classes_1, pooling_list_1, activation_fn_list_1,
                                           P_list_1, S_list_1, reg_list_1,
                                           val_data_path = f'{data_path}/val', wandb_init = False)
    run.finish()

In [19]:
# PART-A, Question 2 -- Sweeping across different sets of hyperparameters
# ---------------------------------- To run the sweep uncomment the code below ----------------------------------

# sweep_id = wandb.sweep(sweep_config, entity="abisheks", project="assignment2")
# wandb.agent(sweep_id, lambda : sweep_wrapper())

In [13]:
import numpy as np

def deprocess_image(x):
    """
    Convert an arbitrary array (activation map, filter weights, gradient) into a displayable uint8 image.

    The values are standardised (zero mean, unit std), scaled to std 0.1, shifted to be centred on 0.5,
    clipped to [0, 1] and finally mapped to 0..255.

    The input is never modified: the function works on a floating-point copy, so array views passed by
    the caller keep their values, and integer arrays are accepted too.

    Arguments :
        x -- array-like of numbers

    Returns :
        uint8 numpy array with values in 0..255 (axes moved to channels-last when the Keras backend
        is configured as 'channels_first')
    """
    # Work on a private copy; the in-place arithmetic below needs a floating dtype
    x = np.array(x, copy=True)
    if not np.issubdtype(x.dtype, np.floating):
        x = x.astype(np.float64)

    # normalize tensor: center on 0., ensure std is 0.1 (1e-5 guards against division by zero)
    x -= x.mean()
    x /= (x.std() + 1e-5)
    x *= 0.1

    # shift to be centred on 0.5, then clip to [0, 1]
    x += 0.5
    x = np.clip(x, 0, 1)

    # convert to RGB array
    x *= 255
    if tf.keras.backend.image_data_format() == 'channels_first':
        x = x.transpose((1, 2, 0))
    x = np.clip(x, 0, 255).astype('uint8')
    return x

In [16]:
import yaml
import matplotlib.pyplot as plt
import glob
import random
from PIL import Image

def analyze_best_model(wandb_log = False):
    '''
    Function to evaluate the best model for Part A on test set, plot sample test results and visualize the filters and
    their outputs in first layer.

    Steps :
        1. Download 'model-best.h5' and 'config.yaml' of the best WANDB run and load the model.
        2. Evaluate the model on './inaturalist_12K/test'.
        3. Predict 30 random test images and show them in a 10 x 3 grid with true / predicted labels.
        4. For one random test image, show the outputs of the first convolution layer and its filters.

    Images fed to the model by hand are converted to RGB and multiplied by 1/255, which is the same
    rescaling the data pipeline applies during training and evaluation; the model file itself does
    not contain a rescaling layer.

    Arguments :
        wandb_log -- (bool) True : log the plots and results in WANDB | False : do not log them to WANDB, visualize here alone
    Returns :
        -- None --
    '''
    best_run_path = 'abisheks/assignment2/huyhhsb4'
    api = wandb.Api()
    best_run = api.run(best_run_path)
    model_file = best_run.file('model-best.h5').download(replace=True)
    model = tf.keras.models.load_model(model_file.name)
    config_file = best_run.file('config.yaml').download(replace=True)
    with open(config_file.name, 'r') as file:
        config = yaml.safe_load(file)

    inp_img_shape = config['input_image_shape']['value']
    target_size = tuple(inp_img_shape[:-1])
    no_filters = config['no_filters']['value']

    def load_model_input(image_path):
        # Load an image the way the model expects it: 3 RGB channels, network input size, values scaled by 1/255
        image = Image.open(image_path).convert('RGB').resize(target_size)
        return np.asarray(image, dtype=np.float32) / 255.0

    # Generate test_data and find the test accuracy of the model
    _, _, test_data = data_generator(inp_img_shape, config['batch_size']['value'], test_data_path = './inaturalist_12K/test')
    test_loss, test_accuracy = test_model(model, test_data)
    wandb_run = None
    if wandb_log:
        # Separate run used only for logging the analysis (kept apart from the API handle of the best run)
        wandb_run = wandb.init(project="assignment2", entity="abisheks", reinit=True)
        wandb.log({'test_loss': test_loss, 'test_accuracy': test_accuracy})

    # ----------------------------------------------------------------

    # String labels for the different classes, sorted so that index i corresponds to the i-th class folder name
    class_labels = ['Amphibia', 'Animalia', 'Arachnida', 'Aves', 'Fungi', 'Insecta', 'Mammalia', 'Mollusca', 'Plantae', 'Reptilia']
    class_labels = sorted(class_labels)

    # Choose 30 random test images to show the true and predicted class in a 10 x 3 grid
    random_sample_paths = random.sample(glob.glob('./inaturalist_12K/test/*/*'), 30)
    inp_samples = np.stack([load_model_input(sample_path) for sample_path in random_sample_paths])
    # The true label is the name of the folder containing the image
    true_labels = [os.path.normpath(sample_path).split(os.path.sep)[-2] for sample_path in random_sample_paths]
    predicted_class_nos = np.argmax(model.predict(inp_samples), axis=-1)
    predicted_labels = [class_labels[class_no] for class_no in predicted_class_nos]
    
    R, C = 10, 3
    fig, ax = plt.subplots(R, C, figsize=(15, 18))
    fig.suptitle("Model predictions for 30 sample images", fontsize='x-large')
    for i in range(R):
        for j in range(C):
            idx = i*C + j
            ax[i][j].imshow(Image.open(random_sample_paths[idx]).resize((500, 500)))
            ax[i][j].axis('off')
            ax[i][j].text(1.1, 0.7, "True : ", size='large', ha="left", va='center', color='black', transform=ax[i][j].transAxes)
            ax[i][j].text(1.45, 0.7, true_labels[idx], size='large', ha="left", va='center', color='blue', transform=ax[i][j].transAxes)
            # Green for a correct prediction, red for a wrong one
            pred_color = 'green' if true_labels[idx] == predicted_labels[idx] else 'red'
            ax[i][j].text(1.1, 0.3, "Predicted : ", size='large', ha="left", va='center', color='black', transform=ax[i][j].transAxes)
            ax[i][j].text(1.75, 0.3, predicted_labels[idx], size='large', ha="left", va='center', color=pred_color, transform=ax[i][j].transAxes)

    fig.tight_layout(rect=[0, 0.03, 1, 0.95])

    if wandb_log:
        wandb.log({'sample_predictions': fig})
    
    fig.show()

    # ----------------------------------------------------------------
 
    # Choose a random image from the test data and visualize the filters and output in first layer for that image
    random_image_path = random.sample(glob.glob('./inaturalist_12K/test/*/*'), 1)[0]
    rand_inp_image = load_model_input(random_image_path)
    
    # The first convolution layer of the model is named 'conv2d'
    layer = model.get_layer(name = 'conv2d')
    # Forming an intermediate keras model to get the output of the first layer in suitable format to visualize it
    intermediate_model = tf.keras.Model(inputs=model.input, outputs=layer.output)
    layer_outputs = intermediate_model.predict(rand_inp_image[np.newaxis, ...])
    # Getting layer weights (the first entry is indexed per filter along its last axis below)
    layer_weights = layer.weights

    fig_img = plt.figure()
    plt.title('Random Image used for visualizing filters', fontsize='x-large')
    plt.axis('off')
    plt.imshow(np.array(Image.open(random_image_path)))

    # 8 filters per row; squeeze=False keeps the axes arrays 2-D even when there is a single row
    R, C = no_filters // 8, 8
    fig1, ax1 = plt.subplots(R, C, figsize=(15, 15), squeeze=False)
    fig2, ax2 = plt.subplots(R, C, figsize=(15, 15), squeeze=False)
    fig1.suptitle(f"Visualizing output of {no_filters} filters in first layer", fontsize='x-large')
    fig2.suptitle(f"Visualizing {no_filters} filters in first layer", fontsize='x-large')
    for i in range(R):
        for j in range(C):
            idx = i*C + j
            ax1[i][j].set_title(f'Filter : {idx+1}')
            ax1[i][j].axis('off')
            ax2[i][j].set_title(f'Filter : {idx+1}')
            ax2[i][j].axis('off')
            ax1[i][j].imshow(deprocess_image(layer_outputs[0][:,:,idx]), cmap='viridis')
            ax2[i][j].imshow(deprocess_image(layer_weights[0].numpy()[:,:,:,idx]))
    
    fig1.tight_layout(rect=[0, 0.03, 1, 0.95])
    fig2.tight_layout(rect=[0, 0.03, 1, 0.95])
    if wandb_log:
        wandb.log({'random_image': wandb.Image(Image.open(random_image_path))})
        wandb.log({'filters': fig2})
        wandb.log({'filter_outputs': fig1})
        wandb_run.finish()

    fig1.show()
    fig2.show()

# PART-A, Question 4 -- Analyzing the best model for part A and visualize the filters in 1st layer
# ---------------------------------- To run the analyze function uncomment the code below ----------------------------------

# analyze_best_model()

In [17]:
def guided_backprop(wandb_log = False):
    '''
    Function to do guided backpropogation and visualize results for the CONV-5 layer.

    The best model of Part A is downloaded from WANDB, its ReLU activations (up to the layer named
    'conv2d_4') are replaced by a guided ReLU, and the gradient of the layer output with respect to
    a random test image is plotted: once for each of 10 randomly chosen neurons and once for the
    whole layer.

    The image is converted to RGB and multiplied by 1/255 before it is fed to the model, which is the
    same rescaling the data pipeline applies during training.

    Arguments :
        wandb_log -- (bool) True : log the plots and results in WANDB | False : do not log them to WANDB, visualize here alone
    Returns :
        -- None --
    '''
    @tf.custom_gradient
    def guidedRelu(x):
        # Forward pass is a plain ReLU; the backward pass only lets through positive gradients
        # at positions where the forward input was positive
        def grad(dy):
            return tf.cast(dy>0,"float32") * tf.cast(x>0, "float32") * dy
        return tf.nn.relu(x), grad

    best_run_path = 'abisheks/assignment2/huyhhsb4'
    api = wandb.Api()
    best_run = api.run(best_run_path)
    model_file = best_run.file('model-best.h5').download(replace=True)
    model = tf.keras.models.load_model(model_file.name)
    config_file = best_run.file('config.yaml').download(replace=True)
    with open(config_file.name, 'r') as file:
        config = yaml.safe_load(file)

    target_size = tuple(config['input_image_shape']['value'][:-1])
    random_image_path = random.sample(glob.glob('./inaturalist_12K/test/*/*'), 1)[0]
    resized_image = Image.open(random_image_path).convert('RGB').resize(target_size)
    # Model input: float32 values scaled by 1/255
    rand_inp_image = np.asarray(resized_image, dtype=np.float32) / 255.0

    fig_img = plt.figure()
    plt.title('Random Image used for guided backpropogation', fontsize='x-large')
    plt.axis('off')
    plt.imshow(np.array(resized_image))

    wandb_run = None
    if wandb_log:
        # Separate run used only for logging the analysis (kept apart from the API handle of the best run)
        wandb_run = wandb.init(project="assignment2", entity="abisheks", reinit=True)
        wandb.log({'random_image': wandb.Image(Image.open(random_image_path))})

    # Sub-model that stops at the fifth convolution layer ('conv2d_4')
    gb_model = tf.keras.Model(
        inputs = [model.inputs],
        outputs = [model.get_layer("conv2d_4").output]
    )
    output_shape = model.get_layer("conv2d_4").output.shape[1:]
    # Swap every ReLU activation of the sub-model for the guided version
    layer_dict = [layer for layer in gb_model.layers if hasattr(layer,'activation')]
    for layer in layer_dict:
        if layer.activation == tf.keras.activations.relu:
            layer.activation = guidedRelu

    # The input batch (a single image) is the same for every gradient computation below
    inputs = tf.convert_to_tensor(rand_inp_image[np.newaxis, ...], dtype=tf.float32)
    
    R, C = 10, 1
    fig, ax = plt.subplots(R, C, figsize=(15, 28))
    fig.suptitle("Visualizing gradient for 10 neurons in CONV-5 layer", fontsize='x-large')
    for i in range(10):
        # Random position in the layer output; the leading 0 is the batch index
        rand_neuron_index = [0] + [random.randint(0, dim_max-1) for dim_max in output_shape]

        # One-hot mask so that only the chosen neuron contributes to the gradient
        mask_mat = np.zeros((1, *output_shape), dtype=np.float32)
        mask_mat[rand_neuron_index[0], rand_neuron_index[1], rand_neuron_index[2], rand_neuron_index[3]] = 1

        with tf.GradientTape() as tape:
            tape.watch(inputs)
            outputs = gb_model(inputs) * mask_mat

        grad = tape.gradient(outputs,inputs)[0]

        ax[i].set_title(f'Neuron-{i+1} index : {tuple(rand_neuron_index[1:])}')
        ax[i].axis('off')
        ax[i].imshow(deprocess_image(np.array(grad)))
    
    fig.tight_layout(rect=[0, 0.03, 1, 0.95])
    if wandb_log:
        wandb.log({'guided_backprop_10_neurons_visualization': fig})

    fig.show()

    # Gradient of the whole layer output (no mask) with respect to the input image
    with tf.GradientTape() as tape:
        tape.watch(inputs)
        outputs = gb_model(inputs)

    grad = tape.gradient(outputs,inputs)[0]
    fig_whole = plt.figure()
    plt.title('Visualizing gradient for whole CONV-5 layer', fontsize='x-large')
    plt.axis('off')
    plt.imshow(deprocess_image(np.array(grad)))

    # Log before plt.show(), consistent with the other figures, so the figure is logged while it is still open
    if wandb_log:
        wandb.log({'guided_backprop_whole_visualization': fig_whole})
        wandb_run.finish()

    plt.show()

# PART-A, Question 5 -- Guided backpropogation in CONV-5 layer
# ---------------------------------- To run the guided backprop function uncomment the code below ----------------------------------

# guided_backprop()