In [None]:
!pip install optuna

import tensorflow as tf
import optuna

import os
import numpy as np
import pandas as pd
import pickle as pkl
import itertools
from datetime import datetime
import math


import warnings
warnings.filterwarnings("ignore")

In [24]:
# GLOBAL VARIABLES NEEDED TO RUN THE EXPERIMENT

# 1] Data locations: one .npz archive per split, all stored under base_dir.
base_dir  = "/content/drive/MyDrive/CIFAR_Dataset/CIFAR_10/Batch3_1+10K"
train_dir = f"{base_dir}/cifar10_train.npz"
val_dir   = f"{base_dir}/cifar10_val.npz"
test_dir  = f"{base_dir}/cifar10_test.npz"

# 2] File that receives the pickled results of the Optuna study.
OPTUNA_MODEL_DIRECTORY = "/content/drive/MyDrive/CIFAR_Dataset/CIFAR_10/cnn_optuna.pickle"

# 3] Study settings.
NUMBER_OF_DENSE_LAYERS = 3       # or 5; includes the output layer (one regularization strength per dense layer).
TYPE_OF_SAMPLER        = 'grid'  # one of: 'grid', 'random', 'qmc', 'tpe'
NUM_OPTUNA_TRIALS      = 10      # number of trials passed to study.optimize

# **OPTUNA STUDY**

## **Loading Data**

In [25]:
# Helpers that load the CIFAR-10 training, validation and test splits and preprocess the images for the chosen model type.
def preprocess_image_input(input_images):
  """Cast a batch of images to float32 and apply the Keras ResNet50 preprocessing.

  Only used by ``data_loader`` when ``model_type == 'resnet50'``.

  Args:
    input_images: array-like exposing ``astype`` (presumably a NumPy image batch — TODO confirm).

  Returns:
    The output of ``tf.keras.applications.resnet50.preprocess_input`` on the float32 images.
  """
  as_float32 = input_images.astype('float32')
  return tf.keras.applications.resnet50.preprocess_input(as_float32)

def data_loader( train_dir, val_dir, test_dir, model_type ): # model_type : 'cnn' OR 'resnet50' (string; specify exactly one)
    """Load the train/validation/test splits from .npz archives and preprocess the images.

    Each archive must contain the keys ``x_<split>`` and ``y_<split>`` (for example
    ``x_train`` / ``y_train``). Images and labels are cast to float32.

    Args:
        train_dir: path to the training .npz archive.
        val_dir: path to the validation .npz archive.
        test_dir: path to the test .npz archive.
        model_type: 'cnn' divides the images by 255; 'resnet50' applies
            ``preprocess_image_input`` to them.

    Returns:
        Tuple ``(x_train, x_val, x_test, y_train, y_val, y_test)``.

    Raises:
        ValueError: if ``model_type`` is neither 'cnn' nor 'resnet50'.
    """
    # Validate first so a typo fails immediately instead of after loading and casting every split.
    if model_type not in ('cnn', 'resnet50'):
        raise ValueError('Error: Please enter correct \'model_type\' variable value. Correct values are \'cnn\' or \'resnet50\' (strings).')

    # np.load on an .npz returns an archive object that keeps the file open;
    # the context manager closes it once the arrays have been materialised.
    with np.load(train_dir) as train_dataset:
        y_train = train_dataset['y_train'].astype("float32")
        x_train = train_dataset['x_train'].astype("float32")
    with np.load(val_dir) as val_dataset:
        y_val = val_dataset['y_val'].astype("float32")
        x_val = val_dataset['x_val'].astype("float32")
    with np.load(test_dir) as test_dataset:
        y_test = test_dataset['y_test'].astype("float32")
        x_test = test_dataset['x_test'].astype("float32")

    if model_type == 'cnn':
        x_train, x_val, x_test = x_train/255, x_val/255, x_test/255
    else:  # 'resnet50' (guaranteed by the validation above)
        x_train = preprocess_image_input(x_train)
        x_val = preprocess_image_input(x_val)
        x_test = preprocess_image_input(x_test)

    return x_train, x_val, x_test, y_train, y_val, y_test

## ============================================  LOADING DATA  =====================================================
# Load the three splits once at module level. model_type='cnn' makes data_loader scale the images by 1/255.
# The resulting arrays are read as globals further down (fmin_loss uses x_train/y_train; optuna_optimizer uses all six).
x_train, x_val, x_test, y_train, y_val, y_test = data_loader( train_dir, val_dir, test_dir, model_type='cnn' )

## **Custom Model Definition**

In [26]:
class CNN:
    """Factory for a small convolutional classifier with a configurable dense head."""

    def __init__(self, input_size, num_classes):
        # Side length of the square 3-channel input images.
        self.input_size = input_size
        # Stored for callers; generate_model itself takes the head layout from layer_info.
        self.num_classes = num_classes

    def generate_model(self, layer_info = None ):
        """Build the Sequential model and print its trainable-parameter count.

        Args:
            layer_info: optional sequence of dicts with keys 'type' and 'params'.
                Entries whose 'type' is 'dense' become ``Dense(**params)`` layers;
                every such entry except the last one in the sequence is followed
                by BatchNormalization. Entries of any other type are ignored.
                With ``None`` the model ends at the global-average-pooling layer.

        Returns:
            The assembled ``tf.keras.Sequential`` model (not compiled).
        """
        layers = tf.keras.layers
        image_shape = (self.input_size, self.input_size, 3)

        # Convolutional feature extractor, listed in the order the layers are stacked.
        feature_layers = [
            layers.Conv2D(32, (3, 3), padding='same', activation='relu', input_shape=image_shape),
            layers.BatchNormalization(),

            layers.Conv2D(32, (3, 3), padding='same', activation='relu'),
            layers.BatchNormalization(),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Dropout(0.25),

            layers.Conv2D(64, (3, 3), padding='same', activation='relu'),
            layers.BatchNormalization(),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Dropout(0.25),

            # Global average pooling is used in place of Flatten.
            layers.GlobalAveragePooling2D(),
        ]

        model = tf.keras.Sequential()
        for feature_layer in feature_layers:
            model.add(feature_layer)

        if layer_info is not None:
            last_position = len(layer_info) - 1
            for position, spec in enumerate(layer_info):
                dense_kwargs = spec['params']
                if spec['type'] != 'dense':
                    continue
                model.add(layers.Dense(**dense_kwargs))
                # Hidden dense layers are batch-normalised; the final entry is left bare.
                if position != last_position:
                    model.add(layers.BatchNormalization())

        # Calculate the number of trainable parameters in the model
        trainable_count = sum(tf.keras.backend.count_params(w) for w in model.trainable_weights)
        print(f"Trainable parameters: {trainable_count:,}")

        return model

In [27]:
@tf.function
def loss_function_optuna( y_dataset, logits, loss ): # logits = model(x_dataset)
    """Evaluate ``loss(y_dataset, logits)`` and return the result cast to float32.

    Args:
        y_dataset: target labels for the batch.
        logits: model outputs for the same batch, i.e. ``model(x_dataset)``.
        loss: callable invoked as ``loss(y_true, y_pred)``.

    Returns:
        The loss value as a ``tf.float32`` tensor.
    """
    return tf.cast( loss(y_dataset, logits), dtype=tf.float32 )

In [28]:
def fmin_loss( model, loss_function, optimizer, batch_size , epochs, record = True ):
    """Train ``model`` on the global ``x_train`` / ``y_train`` with a custom mini-batch loop.

    The objective minimised on each batch is the data loss from
    ``loss_function_optuna`` plus the model's regularization losses
    (``model.losses``), so layer regularizers take part in training.

    Args:
        model: Keras model to train in place.
        loss_function: callable ``loss(y_true, y_pred)`` forwarded to ``loss_function_optuna``.
        optimizer: Keras optimizer used for ``apply_gradients``.
        batch_size: mini-batch size.
        epochs: number of passes over the training set.
        record: when True, collect per-epoch weights and mean gradients.

    Returns:
        ``(All_Epoch_Gradients, All_Epoch_Weights)`` when ``record`` is True:
        per epoch, the gradients averaged over all batches of that epoch and
        the trainable weights (NumPy arrays) captured at the start of that
        epoch. ``(0, 0)`` when ``record`` is False.
    """
    train_df = tf.data.Dataset.from_tensor_slices((x_train,y_train))
    train_df = train_df.shuffle(buffer_size = 1024).batch(batch_size)

    All_Epoch_Gradients, All_Epoch_Weights = [], []

    for _ in range(epochs):

        if record:
            # Only the trainable weights, i.e. those the gradients are taken with respect to.
            # Note : model.get_weights() would also return the non-trainable ones.
            weights0 = [var.numpy() for var in model.trainable_weights]

        Step_Gradient, Num_batch = [], 0

        for x_train_, y_train_ in train_df:
            # A non-persistent tape is enough: gradient() is called exactly once per batch.
            with tf.GradientTape() as tape:

                logits = model(x_train_, training=True)
                total_loss1 = loss_function_optuna( y_train_, logits, loss = loss_function )

                # Layer regularizers (e.g. kernel_regularizer) are exposed through model.losses and are
                # NOT part of loss_function; they must be added by hand in a custom training loop.
                regularization_losses = model.losses
                if regularization_losses:
                    total_loss1 = total_loss1 + tf.cast( tf.add_n(regularization_losses), total_loss1.dtype )

            vars_list = model.trainable_weights
            grads = tape.gradient(total_loss1, vars_list)      # for ref  - https://www.tensorflow.org/tutorials/customization/custom_training_walkthrough
            optimizer.apply_gradients(zip(grads,vars_list))

            if record :
                if Num_batch == 0 :
                    Step_Gradient = list(grads)
                else:
                    Step_Gradient = [ tf.add(acc, g) for acc, g in zip(Step_Gradient, grads) ]
            # Count batches (not the last index) so the mean below divides by the true number of batches.
            Num_batch += 1

        if record :
            if Num_batch :  # guard against an empty dataset
                Step_Gradient = [ g/Num_batch for g in Step_Gradient ]
            All_Epoch_Gradients.append( Step_Gradient )
            All_Epoch_Weights.append(weights0)

    if record :
        return All_Epoch_Gradients, All_Epoch_Weights
    else:
        return 0, 0

#### Note: Training and local tuning are done based on the loss, not the accuracy.

In [29]:
def _evaluate_split(model, x_data, y_data):
    """Return ``(unregularized cross-entropy, accuracy)`` of ``model`` on one data split."""
    y_pred = model.predict(x_data)
    loss_value = tf.keras.losses.SparseCategoricalCrossentropy()(y_data, y_pred).numpy()
    # Keras metrics are stateful: calling one instance on several splits accumulates across them.
    # A fresh metric per split keeps each accuracy independent.
    accuracy = tf.keras.metrics.SparseCategoricalAccuracy()(y_data, y_pred).numpy()
    return loss_value, accuracy


def optuna_optimizer(trial):
    """Optuna objective: train a CNN with trial-suggested L2 strengths and score it on validation loss.

    One L2 regularization strength is suggested per dense layer
    (``NUMBER_OF_DENSE_LAYERS`` of them, the last being the softmax output layer).
    The model is trained with ``fmin_loss``, evaluated on the global train/val/test
    arrays, and the model plus the recorded gradients/weights are pickled to
    ``<trial.number>.pickle`` and ``training_info_<trial.number>.pickle`` in the
    current working directory.

    Args:
        trial: the ``optuna`` trial supplying the hyperparameters.

    Returns:
        The unregularized validation cross-entropy (lower is better).
    """
    tf.keras.backend.clear_session()

    alphas = [ trial.suggest_float(f'regularization{i}', 1e-6, 1e-1, log=True) for i in range(NUMBER_OF_DENSE_LAYERS) ]

    # Define the new model
    input_shape = 32
    output_shape = 10

    layer_info_ = [ {'type': 'dense', 'params': {'units': 64, 'activation': 'relu', 'kernel_regularizer':tf.keras.regularizers.l2(i)}} for i in alphas[:-1]]
    layer_info_ += [ {'type': 'dense', 'params': {'units': output_shape, 'activation': 'softmax', 'kernel_regularizer':tf.keras.regularizers.l2(alphas[-1])}} ]

    model = CNN( input_shape, output_shape).generate_model( layer_info_ )

    # Optimizing
    optimizer         = tf.keras.optimizers.Adam()
    # The output layer already applies softmax, so the loss must treat the model outputs as
    # probabilities (from_logits=False); from_logits=True would apply softmax a second time.
    loss_function     = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
    # batch_size = 128, epochs = 100, record gradients and weights
    All_Epoch_Gradients, All_Epoch_Weights = fmin_loss( model, loss_function, optimizer,  128,  100, True)

    # Getting Scores
    train_loss_unregularized, train_acc = _evaluate_split(model, x_train, y_train)
    val_loss_unregularized, val_acc     = _evaluate_split(model, x_val, y_val)
    test_loss_unregularized, test_acc   = _evaluate_split(model, x_test, y_test)

    print("\nTraining:  Loss ", train_loss_unregularized, "  Accuracy", train_acc*100 )
    print("Validation: Loss ", val_loss_unregularized, "  Accuracy", val_acc*100 )
    print("Test:       Loss", test_loss_unregularized, "  Accuracy", test_acc*100, "\n\n")

    # Persist the per-trial artefacts so optuna_training can reload those of the best trial.
    with open("{}.pickle".format(trial.number), "wb") as fout:
        pkl.dump(model, fout)

    with open("training_info_{}.pickle".format(trial.number), "wb") as fout:
        Dict_ = { "Gradients" : All_Epoch_Gradients, "Weights" : All_Epoch_Weights }
        pkl.dump(Dict_, fout)

    # Model selection is based on the validation loss, not the accuracy.
    return float(val_loss_unregularized)


def optuna_training( num_trials ):
    """Run the Optuna study and return the artefacts of its best trial.

    The sampler is chosen by the global ``TYPE_OF_SAMPLER``. After the study,
    the pickles written by ``optuna_optimizer`` for the best trial are reloaded.

    Args:
        num_trials: number of trials passed to ``study.optimize``.

    Returns:
        Tuple ``(trainable_weights_list, full_weights_list, init_hyperparameters,
        weight_sets, grad_sets, delta)`` where the first two come from the best
        model, ``init_hyperparameters`` holds one ``tf.Variable`` per best-trial
        parameter, ``weight_sets`` / ``grad_sets`` are the per-epoch records saved
        during training, and ``delta`` is the ``timedelta`` spent on the study.

    Raises:
        ValueError: if ``TYPE_OF_SAMPLER`` is not 'grid', 'random', 'qmc' or 'tpe'.
    """
    time1 = datetime.now()

    if TYPE_OF_SAMPLER == 'grid':
        search_space = { f"regularization{i}" : list(np.linspace(1e-6,1e-1,10)) for i in range(NUMBER_OF_DENSE_LAYERS) }
        sampler = optuna.samplers.GridSampler(search_space)
    elif TYPE_OF_SAMPLER == 'random':
        sampler = optuna.samplers.RandomSampler()
    elif TYPE_OF_SAMPLER == 'qmc':
        sampler = optuna.samplers.QMCSampler()
    elif TYPE_OF_SAMPLER == 'tpe':
        sampler = optuna.samplers.TPESampler()
    else:
        # Fail loudly: previously this branch only printed a message and the code
        # then crashed with a NameError on the undefined `study`.
        raise ValueError(
            f"Unknown TYPE_OF_SAMPLER {TYPE_OF_SAMPLER!r}: expected one of 'grid', 'random', 'qmc', 'tpe'."
        )

    study = optuna.create_study(sampler=sampler, direction = "minimize")
    study.optimize(optuna_optimizer, n_trials = num_trials)

    print('\n\n')
    trial = study.best_trial
    print("Best Score: ", trial.value)
    print("Best Params: ")
    for key, value in trial.params.items():
        print("  {}: {}".format(key, value))

    print( "\n\n ", "Trial Number: ", trial.number, "\n" )

    time2 = datetime.now()
    delta = time2 - time1
    print(f"Time difference is {delta.total_seconds()} seconds")

    # Loading Best OPTUNA model to get initial feasible weights for trainable layers.
    # NOTE: pickle.load executes arbitrary code; these files are the ones optuna_optimizer
    # wrote to the working directory during this study — do not point this at untrusted files.
    with open("{}.pickle".format(trial.number), "rb") as fin:
        best_clf = pkl.load(fin)

    with open("training_info_{}.pickle".format(trial.number), "rb") as fin_:
        wt_grad = pkl.load(fin_)

    # Getting Optimal Model's Weights and Gradients values for Hessian Approximations
    weight_sets = wt_grad['Weights']
    grad_sets   = wt_grad['Gradients']

    # Getting Optimal Model's Weights and Hyperparameters
    full_weights_list          = best_clf.get_weights()
    trainable_weights_list     = best_clf.trainable_weights
    init_hyperparameters = [ tf.Variable(value) for value in trial.params.values() ]

    return trainable_weights_list, full_weights_list, init_hyperparameters, weight_sets, grad_sets, delta

In [None]:
# Run the Optuna study and unpack the artefacts of its best trial.
trainable_weights_list, full_weights_list, init_hyperparameters, weight_sets, grad_sets, optuna_time = optuna_training(NUM_OPTUNA_TRIALS)

# Bundle everything into one list, in the same order optuna_training returns it.
model__ = [ trainable_weights_list, full_weights_list, init_hyperparameters, weight_sets, grad_sets, optuna_time ]
# SAVING THE OPTUNA RESULTS (best model's weights, hyperparameters, recorded weights/gradients, elapsed time)
with open(OPTUNA_MODEL_DIRECTORY, "wb") as fout:
        pkl.dump(model__, fout)