In [None]:
from __future__ import print_function
import keras
import tensorflow as tf
from keras.layers import Dense, Conv2D, BatchNormalization, Activation
from keras.layers import AveragePooling2D, Input, Flatten
from tensorflow.keras.optimizers import Adam
from keras.callbacks import ModelCheckpoint, LearningRateScheduler
from keras.callbacks import ReduceLROnPlateau
from keras.preprocessing.image import ImageDataGenerator
from keras.regularizers import l2
from keras import backend as K
from keras.models import Model
from keras.datasets import cifar100
from keras.utils.np_utils import to_categorical
import numpy as np
import os
from keras.layers import Conv2D,Conv1D,Reshape,MaxPooling2D,Multiply,GlobalAveragePooling2D,Multiply,Lambda, Activation,GlobalMaxPooling2D,Dense,Input

In [None]:

# Training parameters
batch_size = 32  # orig paper trained all networks with batch_size=128
epochs = 200
data_augmentation = True
num_classes = 100  # number of columns used when one-hot encoding the labels

# Subtracting pixel mean improves accuracy
subtract_pixel_mean = True


# Size parameter of the network; depth is derived from it below.
n = 9 # For resnet56

# Model version
# Orig paper: version = 1 (ResNet v1), Improved ResNet: version = 2 (ResNet v2)
version = 1

# Computed depth from supplied model parameter n
if version == 1:
    depth = n * 6 + 2
elif version == 2:
    depth = n * 9 + 2
else:
    # Without this branch `depth` stays undefined and the failure only shows
    # up later as a NameError when the model name is built.
    raise ValueError('version should be 1 or 2, got %r' % (version,))

# Model name, depth and version
model_type = 'ResNet%dv%d' % (depth, version)

# Load the CIFAR-100 data.
(x_train, y_train), (x_test, y_test) = cifar100.load_data()

# Input image dimensions (everything after the leading sample axis).
input_shape = x_train.shape[1:]

# Normalize data.
x_train = x_train.astype('float32') / 255
x_test = x_test.astype('float32') / 255

# If subtract pixel mean is enabled, centre both splits with the mean
# computed over the training samples only.
if subtract_pixel_mean:
    x_train_mean = np.mean(x_train, axis=0)
    x_train -= x_train_mean
    x_test -= x_train_mean

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')
print('y_train shape:', y_train.shape)

# Convert class vectors to binary class matrices.
y_train = to_categorical(y_train, num_classes)
y_test = to_categorical(y_test, num_classes)


def lr_schedule(epoch):
    """Learning Rate Schedule

    Starts from a base rate of 1e-3 and scales it down once the epoch index
    is past 80, 120, 160 and 180. Only the latest milestone that has been
    passed applies; the factors are not cumulative.

    # Arguments
        epoch (int): The number of epochs

    # Returns
        lr (float32): learning rate
    """
    # (milestone, multiplier) pairs, ordered from the latest milestone down so
    # that the first match is the one that applies.
    milestones = (
        (180, 0.5e-3),
        (160, 1e-3),
        (120, 1e-2),
        (80, 1e-1),
    )
    lr = 1e-3
    for milestone, factor in milestones:
        if epoch > milestone:
            lr *= factor
            break
    print('Learning rate: ', lr)
    return lr


def resnet_layer(inputs,
                 num_filters=16,
                 kernel_size=3,
                 strides=1,
                 activation='relu',
                 batch_normalization=True,
                 conv_first=True):
    """Build one Conv2D / BatchNormalization / Activation unit.

    # Arguments
        inputs (tensor): input tensor from input image or previous layer
        num_filters (int): Conv2D number of filters
        kernel_size (int): Conv2D square kernel dimensions
        strides (int): Conv2D square stride dimensions
        activation (string): activation name, or None to skip the activation
        batch_normalization (bool): whether to include batch normalization
        conv_first (bool): conv-bn-activation (True) or
            bn-activation-conv (False)

    # Returns
        x (tensor): tensor as input to the next layer
    """
    # The convolution is always 'same'-padded, He-initialised and
    # L2-regularised; it is created up front in both orderings.
    conv = Conv2D(num_filters,
                  kernel_size=kernel_size,
                  strides=strides,
                  padding='same',
                  kernel_initializer='he_normal',
                  kernel_regularizer=l2(1e-4))

    def _norm_then_activate(tensor):
        # Optional BN followed by the optional activation, in that order.
        if batch_normalization:
            tensor = BatchNormalization()(tensor)
        if activation is not None:
            tensor = Activation(activation)(tensor)
        return tensor

    if conv_first:
        return _norm_then_activate(conv(inputs))
    return conv(_norm_then_activate(inputs))


def resnet_v1(input_shape, depth, num_classes=100):
    """ResNet Version 1 Model builder [a]

    Three stages of residual blocks, each block being two 3 x 3
    Conv2D-BN units with the final ReLU applied after the shortcut addition.
    The first block of stages 1 and 2 uses strides=2 and a 1 x 1 convolution
    on the shortcut so both branches agree in shape. The filter count doubles
    from one stage to the next, starting at 16.
    Features maps sizes (for 32x32 inputs):
    stage 0: 32x32, 16
    stage 1: 16x16, 32
    stage 2:  8x8,  64

    # Arguments
        input_shape (tensor): shape of input image tensor
        depth (int): number of core convolutional layers, must be 6n+2
        num_classes (int): size of the softmax output

    # Returns
        model (Model): Keras model instance

    # Raises
        ValueError: if depth is not of the form 6n+2
    """
    quotient, remainder = divmod(depth - 2, 6)
    if remainder != 0:
        raise ValueError('depth should be 6n+2 (eg 20, 32, 44 in [a])')
    blocks_per_stage = int(quotient)

    inputs = Input(shape=input_shape)
    x = resnet_layer(inputs=inputs)

    # One pass per stage; the tuple spells out the doubling filter counts.
    for stage, filters in enumerate((16, 32, 64)):
        for block in range(blocks_per_stage):
            # Only the first block of a later stage downsamples.
            downsample = stage > 0 and block == 0
            strides = 2 if downsample else 1

            y = resnet_layer(inputs=x,
                             num_filters=filters,
                             strides=strides)
            y = resnet_layer(inputs=y,
                             num_filters=filters,
                             activation=None)
            if downsample:
                # Project the shortcut so it matches the strided, wider
                # residual branch.
                x = resnet_layer(inputs=x,
                                 num_filters=filters,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)
            x = Activation('relu')(keras.layers.add([x, y]))

    # Classifier head: v1 has no BN after the last shortcut-ReLU.
    pooled = AveragePooling2D(pool_size=8)(x)
    flat = Flatten()(pooled)
    outputs = Dense(num_classes,
                    activation='softmax',
                    kernel_initializer='he_normal')(flat)

    return Model(inputs=inputs, outputs=outputs)


def resnet_v2(input_shape, depth, num_classes=10):
    """ResNet Version 2 Model builder [b]

    Stacks of (1 x 1)-(3 x 3)-(1 x 1) BN-ReLU-Conv2D or also known as
    bottleneck layer
    First shortcut connection per layer is 1 x 1 Conv2D.
    Second and onwards shortcut connection is identity.
    At the beginning of each stage, the feature map size is halved (downsampled)
    by a convolutional layer with strides=2, while the number of filter maps is
    doubled. Within each stage, the layers have the same number filters and the
    same filter map sizes.
    Features maps sizes:
    conv1  : 32x32,  16
    stage 0: 32x32,  64
    stage 1: 16x16, 128
    stage 2:  8x8,  256

    # Arguments
        input_shape (tensor): shape of input image tensor
        depth (int): number of core convolutional layers, must be 9n+2
        num_classes (int): size of the softmax output. The default is 10,
            so pass the dataset's class count explicitly for anything else.

    # Returns
        model (Model): Keras model instance

    # Raises
        ValueError: if depth is not of the form 9n+2
    """
    if (depth - 2) % 9 != 0:
        raise ValueError('depth should be 9n+2 (eg 56 or 110 in [b])')
    # Start model definition.
    num_filters_in = 16
    num_res_blocks = int((depth - 2) / 9)

    inputs = Input(shape=input_shape)
    # v2 performs Conv2D with BN-ReLU on input before splitting into 2 paths
    x = resnet_layer(inputs=inputs,
                     num_filters=num_filters_in,
                     conv_first=True)

    # Instantiate the stack of residual units
    for stage in range(3):
        for res_block in range(num_res_blocks):
            activation = 'relu'
            batch_normalization = True
            strides = 1
            if stage == 0:
                # Stage 0 widens the bottleneck output to 4x its input width.
                num_filters_out = num_filters_in * 4
                if res_block == 0:  # first layer and first stage
                    # The stem above already applied BN-ReLU, so the very
                    # first unit skips its own pre-activation.
                    activation = None
                    batch_normalization = False
            else:
                num_filters_out = num_filters_in * 2
                if res_block == 0:  # first layer but not first stage
                    strides = 2    # downsample

            # bottleneck residual unit
            y = resnet_layer(inputs=x,
                             num_filters=num_filters_in,
                             kernel_size=1,
                             strides=strides,
                             activation=activation,
                             batch_normalization=batch_normalization,
                             conv_first=False)
            y = resnet_layer(inputs=y,
                             num_filters=num_filters_in,
                             conv_first=False)
            y = resnet_layer(inputs=y,
                             num_filters=num_filters_out,
                             kernel_size=1,
                             conv_first=False)
            if res_block == 0:
                # linear projection residual shortcut connection to match
                # changed dims
                x = resnet_layer(inputs=x,
                                 num_filters=num_filters_out,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)
            x = keras.layers.add([x, y])

        # The next stage takes this stage's output width as its input width.
        # NOTE: num_filters_out is only assigned inside the inner loop, so a
        # depth of 2 (zero blocks per stage) fails here with a NameError.
        num_filters_in = num_filters_out

    # Add classifier on top.
    # v2 has BN-ReLU before Pooling
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = AveragePooling2D(pool_size=8)(x)
    y = Flatten()(x)
    outputs = Dense(num_classes,
                    activation='softmax',
                    kernel_initializer='he_normal')(y)

    # Instantiate model.
    model = Model(inputs=inputs, outputs=outputs)
    return model


# Build the network for the selected version. num_classes is passed
# explicitly: resnet_v2 defaults to 10 outputs, which would not match labels
# that are one-hot encoded with `num_classes` columns.
if version == 2:
    model = resnet_v2(input_shape=input_shape, depth=depth,
                      num_classes=num_classes)
else:
    model = resnet_v1(input_shape=input_shape, depth=depth,
                      num_classes=num_classes)

# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=lr_schedule(0)),
              metrics=['accuracy'])
model.summary()
print(model_type)

# Prepare the model saving directory.
save_dir = os.path.join(os.getcwd(), 'saved_models')
model_name = 'cifar10_%s_model.{epoch:03d}.h5' % model_type
os.makedirs(save_dir, exist_ok=True)
filepath = os.path.join(save_dir, model_name)

# Prepare callbacks for model saving and for learning rate adjustment.
# The model is compiled with metrics=['accuracy'], which TF2 Keras logs as
# 'val_accuracy' on the validation data. Monitoring the old 'val_acc' key
# makes save_best_only skip every checkpoint.
checkpoint = ModelCheckpoint(filepath=filepath,
                             monitor='val_accuracy',
                             verbose=1,
                             save_best_only=True)

lr_scheduler = LearningRateScheduler(lr_schedule)

lr_reducer = ReduceLROnPlateau(factor=np.sqrt(0.1),
                               cooldown=0,
                               patience=5,
                               min_lr=0.5e-6)

callbacks = [checkpoint, lr_reducer, lr_scheduler]

# Run training, with or without data augmentation.
# NOTE: in the augmentation branch the fit_generator call is commented out,
# so with data_augmentation=True this cell only builds `datagen` and does not
# train `model`.
if not data_augmentation:
    print('Not using data augmentation.')
    model.fit(x_train, y_train,
              batch_size=batch_size,
              epochs=epochs,
              validation_data=(x_test, y_test),
              shuffle=True,
              callbacks=callbacks)
else:
    print('Using real-time data augmentation.')
    # This will do preprocessing and realtime data augmentation:
    datagen = ImageDataGenerator(
        # set input mean to 0 over the dataset
        featurewise_center=False,
        # set each sample mean to 0
        samplewise_center=False,
        # divide inputs by std of dataset
        featurewise_std_normalization=False,
        # divide each input by its std
        samplewise_std_normalization=False,
        # apply ZCA whitening
        zca_whitening=False,
        # epsilon for ZCA whitening
        zca_epsilon=1e-06,
        # randomly rotate images in the range (deg 0 to 180)
        rotation_range=0,
        # randomly shift images horizontally
        width_shift_range=0.1,
        # randomly shift images vertically
        height_shift_range=0.1,
        # set range for random shear
        shear_range=0.,
        # set range for random zoom
        zoom_range=0.,
        # set range for random channel shifts
        channel_shift_range=0.,
        # set mode for filling points outside the input boundaries
        fill_mode='nearest',
        # value used for fill_mode = "constant"
        cval=0.,
        # randomly flip images horizontally
        horizontal_flip=True,
        # randomly flip images vertically
        vertical_flip=False,
        # set rescaling factor (applied before any other transformation)
        rescale=None,
        # set function that will be applied on each input
        preprocessing_function=None,
        # image data format, either "channels_first" or "channels_last"
        data_format=None,
        # fraction of images reserved for validation (strictly between 0 and 1)
        validation_split=0.0)

    # Compute quantities required for featurewise normalization
    # (std, mean, and principal components if ZCA whitening is applied).
    # NOTE(review): all of those options are switched off above, so this call
    # presumably has no effect here; confirm before removing.
    datagen.fit(x_train)

    # Fit the model on the batches generated by datagen.flow().
    # (disabled)
    #model.fit_generator(datagen.flow(x_train, y_train, batch_size=batch_size),
                       # validation_data=(x_test, y_test),
                       # epochs=epochs, verbose=1, workers=4,
                        #callbacks=callbacks)

# Score trained model. (disabled here; a later cell runs the evaluation)
#scores = model.evaluate(x_test, y_test, verbose=1)
#print('Test loss:', scores[0])
#print('Test accuracy:', scores[1])

In [None]:
# Evaluate the baseline model on the test split. `scores` is indexed as
# [loss, accuracy], matching the loss and the single metric set at compile time.
scores = model.evaluate(x_test, y_test, verbose=1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

In [None]:
# Persist the baseline model's weights so later cells can restore them.
model.save_weights('ResNet56_100.ckpt')

In [None]:
# Restore the baseline weights from the checkpoint written by save_weights.
model.load_weights('ResNet56_100.ckpt')

# Latency Inference

In [None]:
import time
# Prepare a sample input image for inference (adjust shape as needed)
input_image = x_test[0:1]  # slice (not index) so the batch axis is kept

# Warm-up the model (optional); this call is not included in the timing
model.predict(input_image)

# Measure inference latency.
# The repetition count lives in `num_runs` rather than `n`, so the ResNet
# size parameter `n` defined with the training parameters is not overwritten.
num_runs = 100  # Number of repetitions
total_time = 0

for _ in range(num_runs):
    # perf_counter is the clock intended for measuring short intervals;
    # time.time() follows the wall clock, which can be coarse or adjusted.
    start_time = time.perf_counter()
    model.predict(input_image)
    end_time = time.perf_counter()
    total_time += (end_time - start_time)

# Mean seconds per call, converted to milliseconds.
average_latency = (total_time / num_runs) *1000
print("Average Latency:", average_latency, "Milli seconds")


# LP

In [None]:
import tensorflow_addons as tfa
from tensorflow_addons.layers import GroupNormalization
import tensorflow as tf


In [None]:
class GlobalStdPooling(tf.keras.layers.Layer):
    """Global standard-deviation pooling over axes 1 and 2.

    For a 4-D feature map the result has one value per (sample, last-axis
    entry): the standard deviation of that slice over axes 1 and 2.
    Assumes a channels-last 4-D input, as produced by the Conv2D stacks in
    this file; confirm before using it on other ranks.
    """

    def call(self, inputs):
        """Return sqrt(mean((x - mean(x))^2) + 1e-5) reduced over axes 1, 2.

        # Arguments
            inputs (tensor): 4-D feature map

        # Returns
            tensor: 2-D tensor (axis 0 and the last axis of `inputs` kept)
        """
        # Mean over axes 1 and 2, kept broadcastable for the subtraction.
        mean = tf.reduce_mean(inputs, axis=[1, 2], keepdims=True)
        # Reducing without keepdims yields the 2-D result directly, so no
        # Flatten layer has to be instantiated inside call().
        variance = tf.reduce_mean(tf.square(inputs - mean), axis=[1, 2])
        # The epsilon keeps the square root away from zero, where its
        # gradient is unbounded.
        return tf.sqrt(variance + 1e-5)  # epsilon=1e-5

    
def cbam_attention(inputs, ratio=16):
    """Rescale `inputs` along its last axis with a pooled sigmoid gate.

    Two branches are built from global descriptors of the feature map:
    average pooling plus standard-deviation pooling, and max pooling. Each
    branch goes through its own GroupNormalization (4 groups); the two are
    summed, squashed with a sigmoid, reshaped to (1, 1, channels) and
    multiplied onto the input.

    # Arguments
        inputs (tensor): feature map; the size of its last axis is used as
            the number of gate values
        ratio (int): not used by the current implementation

    # Returns
        tensor: `inputs` multiplied by the gate
    """
    feature_map = inputs

    # Branch 1: average + standard-deviation descriptors.
    avg_descriptor = GlobalAveragePooling2D()(feature_map)
    std_descriptor = GlobalStdPooling()(feature_map)
    avg_std_sum = avg_descriptor + std_descriptor
    avg_std_branch = GroupNormalization(groups=4)(avg_std_sum)

    # Branch 2: max descriptor.
    max_descriptor = GlobalMaxPooling2D()(feature_map)
    max_branch = GroupNormalization(groups=4)(max_descriptor)

    # Merge both branches into one gate value per channel.
    merged = avg_std_branch + max_branch
    gate = Activation("sigmoid")(merged)

    # Reshape so the gate broadcasts over the two middle axes.
    gate = Reshape((1, 1, feature_map.shape[-1]))(gate)
    return Multiply()([feature_map, gate])

In [None]:
def cbam_resnet_v1(input_shape, depth, num_classes=100):
    """ResNet Version 1 builder with an attention gate in every block.

    Same layout as the plain v1 network: three stages of residual blocks,
    each made of two 3 x 3 Conv2D-BN units, with the filter count doubling
    per stage from 16 and the first block of stages 1 and 2 downsampling
    with strides=2. The difference is that the residual branch is passed
    through cbam_attention before it is added to the shortcut.
    Features maps sizes (for 32x32 inputs):
    stage 0: 32x32, 16
    stage 1: 16x16, 32
    stage 2:  8x8,  64

    # Arguments
        input_shape (tensor): shape of input image tensor
        depth (int): number of core convolutional layers, must be 6n+2
        num_classes (int): size of the softmax output

    # Returns
        model (Model): Keras model instance

    # Raises
        ValueError: if depth is not of the form 6n+2
    """
    quotient, remainder = divmod(depth - 2, 6)
    if remainder != 0:
        raise ValueError('depth should be 6n+2 (eg 20, 32, 44 in [a])')
    blocks_per_stage = int(quotient)

    inputs = Input(shape=input_shape)
    x = resnet_layer(inputs=inputs)

    # One pass per stage; the tuple spells out the doubling filter counts.
    for stage, filters in enumerate((16, 32, 64)):
        for block in range(blocks_per_stage):
            # Only the first block of a later stage downsamples.
            downsample = stage > 0 and block == 0
            strides = 2 if downsample else 1

            y = resnet_layer(inputs=x,
                             num_filters=filters,
                             strides=strides)
            y = resnet_layer(inputs=y,
                             num_filters=filters,
                             activation=None)
            # Gate the residual branch before it meets the shortcut.
            y = cbam_attention(y)

            if downsample:
                # Project the shortcut so it matches the strided, wider
                # residual branch.
                x = resnet_layer(inputs=x,
                                 num_filters=filters,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)
            x = Activation('relu')(keras.layers.add([x, y]))

    # Classifier head: v1 has no BN after the last shortcut-ReLU.
    pooled = AveragePooling2D(pool_size=8)(x)
    flat = Flatten()(pooled)
    outputs = Dense(num_classes,
                    activation='softmax',
                    kernel_initializer='he_normal')(flat)

    cbam_model = Model(inputs=inputs, outputs=outputs)
    return cbam_model

# Build the attention-augmented network with the same input shape and depth
# as the baseline, then print its layer summary and the model tag.
cbam_model = cbam_resnet_v1(input_shape=input_shape, depth=depth)

cbam_model.summary()
print(model_type)

# Prepare the model saving directory.
# NOTE(review): the file name pattern is the same one the baseline run uses,
# so checkpoints from both runs land on the same paths; confirm this is
# intended.
save_dir = os.path.join(os.getcwd(), 'saved_models')
model_name = 'cifar10_%s_model.{epoch:03d}.h5' % model_type
os.makedirs(save_dir, exist_ok=True)
filepath = os.path.join(save_dir, model_name)


# Defined before the callbacks are built: LearningRateScheduler keeps a
# reference to whichever function object it is given, so creating the
# callback first would bind the earlier 200-epoch schedule instead.
def lr_schedule(epoch):
    """Learning Rate Schedule

    Starts from a base rate of 1e-3 and scales it down once the epoch index
    is past 40, 60, 80 and 90. Only the latest milestone that has been
    passed applies; the factors are not cumulative.
    Called automatically every epoch as part of callbacks during training.

    # Arguments
        epoch (int): The number of epochs

    # Returns
        lr (float32): learning rate
    """
    lr = 1e-3
    if epoch > 90:
        lr *= 0.5e-3
    elif epoch > 80:
        lr *= 1e-3
    elif epoch > 60:
        lr *= 1e-2
    elif epoch > 40:
        lr *= 1e-1
    print('Learning rate: ', lr)
    return lr


# Prepare callbacks for model saving and for learning rate adjustment.
# The model is compiled with metrics=['accuracy'], which TF2 Keras logs as
# 'val_accuracy' on the validation data. Monitoring the old 'val_acc' key
# makes save_best_only skip every checkpoint.
checkpoint = ModelCheckpoint(filepath=filepath,
                             monitor='val_accuracy',
                             verbose=1,
                             save_best_only=True)

lr_scheduler = LearningRateScheduler(lr_schedule)

lr_reducer = ReduceLROnPlateau(factor=np.sqrt(0.1),
                               cooldown=0,
                               patience=5,
                               min_lr=0.5e-6)

callbacks = [checkpoint, lr_reducer, lr_scheduler]

# `learning_rate` is the supported keyword; `lr` is a deprecated alias.
cbam_model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=lr_schedule(0)),
              metrics=['accuracy'])

In [None]:
# Copy conv layer weights from the baseline model to the attention model.
# The k-th Conv2D of `model` is paired with the k-th Conv2D of `cbam_model`.
# Collecting both lists once and zipping them gives the same pairing as
# rescanning cbam_model.layers for every baseline conv layer. zip stops at
# the shorter list, so unmatched layers are left untouched.
model_conv_layers = [layer for layer in model.layers
                     if isinstance(layer, Conv2D)]
cbam_conv_layers = [layer for layer in cbam_model.layers
                    if isinstance(layer, Conv2D)]
model_conv_count = len(model_conv_layers)  # kept for any later cell that reads it

for layer, cbam_layer in zip(model_conv_layers, cbam_conv_layers):
    cbam_layer.set_weights(layer.get_weights())

# Copy the last (dense) layer's weights.
cbam_model.layers[-1].set_weights(model.layers[-1].get_weights())

In [None]:
# Fit the model on the batches generated by datagen.flow().
# Fit the model on the batches generated by datagen.flow().
# Model.fit accepts the generator directly; fit_generator is deprecated.
# epochs=100 matches the milestones (40/60/80/90) of the second lr_schedule.
cbam_model.fit(datagen.flow(x_train, y_train, batch_size=batch_size),
               validation_data=(x_test, y_test),
               epochs=100, verbose=1, workers=4,
               callbacks=callbacks)

# Score trained model.
scores = cbam_model.evaluate(x_test, y_test, verbose=1)
print('Test loss:', scores[0])
print('Test accuracy:', scores[1])

In [None]:
def activation_average_cbam(cbam_model):
    """Average the output of every Reshape layer over correctly classified samples.

    The first 1000 training samples (module-level `x_train` / `y_train`) are
    classified by `cbam_model`; only the correctly predicted ones are kept.
    For each `Reshape` layer of the model, a sub-model ending at that layer
    is evaluated on those samples and its output is averaged over the
    sample axis.

    # Arguments
        cbam_model (Model): Keras model whose Reshape layers are inspected

    # Returns
        list of np.ndarray: one averaged activation vector per Reshape layer,
        in `cbam_model.layers` order
    """
    # The selection of correctly classified samples does not depend on the
    # layer being inspected, so it is computed once instead of once per layer.
    test_data = x_train[:1000]
    true_labels = np.argmax(y_train[:1000], axis=1)
    predicted_classes = np.argmax(cbam_model.predict(test_data), axis=1)
    correct_test_data = test_data[predicted_classes == true_labels]

    activation_average = []
    for layer in cbam_model.layers:
        if not isinstance(layer, tf.keras.layers.Reshape):
            continue

        # Sub-model from the network input up to this layer's output.
        activation_model = tf.keras.Model(inputs=cbam_model.input,
                                          outputs=layer.output)
        activation_SE = activation_model.predict(correct_test_data)

        # Collapse the singleton axes explicitly. np.squeeze would also drop
        # the sample axis when exactly one sample is left, and the average
        # would then run over channels instead of samples.
        # Assumes the Reshape output is (samples, 1, 1, channels), as built
        # by cbam_attention.
        per_sample = activation_SE.reshape((-1, activation_SE.shape[-1]))
        activation_average.append(np.average(per_sample, axis=0))
        print(f'{layer.name} Done')
    return activation_average

# Collect the averaged Reshape-layer activations of the attention model.
activation_average=activation_average_cbam(cbam_model)
#print(activation_average)

In [None]:
import pickle
# Load the data from the file. This replaces any `activation_average` value
# computed earlier in the session.
# NOTE(review): pickle.load can execute code embedded in the file; only load
# files from a trusted source.
with open('res56activation_average_cifar100_layerpruned.pkl', "rb") as file:
    activation_average = pickle.load(file)

In [None]:
# For every entry of activation_average, compute the percentage of values
# that exceed 1.2x the mean of that entry.
data_list = activation_average
percentage_list = []
for data in data_list:
    threshold = 1.2 * np.mean(data)
    above_threshold = np.sum(data > threshold)
    percentage_list.append((above_threshold / len(data)) * 100)

#percentage_list[0]=100
# Downsampling priority: entries 9 and 18 (the first entry of the second and
# third group below) are forced to the top score so the ranking keeps them.
for downsampling_index in (9, 18):
    percentage_list[downsampling_index] = 100

# Resulting list of percentages
for percentage in percentage_list:
    print(f"Percentage of values over mean: {percentage:.2f}%")

# Local block ranking: the list is split into three groups of nine entries.
# argsort is ascending, so dropping the first five positions keeps the four
# highest-scoring entries of each group. The offsets map them back to
# positions in the full list.
group1 = np.argsort(percentage_list[0:9])[5:]
group2 = np.argsort(percentage_list[9:18])[5:] + 9
group3 = np.argsort(percentage_list[18:27])[5:] + 18

print(f'group1:{group1}')
print(f'group2:{group2}')
print(f'group3:{group3}')

# Merge the three groups into one ascending index array.
conv_layer_kept_index = np.sort(np.concatenate([group1, group2, group3]))

print(conv_layer_kept_index)

In [None]:
# Global block ranking: rank all entries together instead of per group.
#0 5 10 15 20 24
# argsort is ascending, so skipping the first 12 positions leaves the
# highest-scoring entries.
ascending_rank = np.argsort(percentage_list)
layer_pruned_index = ascending_rank[12:]
print(layer_pruned_index)
# Same selection, sorted by index rather than by score.
conv_layer_kept_index = np.sort(ascending_rank[12:])
print(conv_layer_kept_index)

# FP

In [None]:
def resnet_layer(inputs,
                 num_filters=16,
                 kernel_size=3,
                 strides=1,
                 activation='relu',
                 batch_normalization=True,
                 conv_first=True):
    """2D Convolution-Batch Normalization-Activation stack builder

    This redefinition replaces the earlier `resnet_layer` in the module
    namespace. `num_filters` keeps its default of 16 so builders that call
    `resnet_layer(inputs=...)` without a filter count still work after this
    cell has run.

    # Arguments
        inputs (tensor): input tensor from input image or previous layer
        num_filters (int): Conv2D number of filters
        kernel_size (int): Conv2D square kernel dimensions
        strides (int): Conv2D square stride dimensions
        activation (string): activation name, or None to skip the activation
        batch_normalization (bool): whether to include batch normalization
        conv_first (bool): conv-bn-activation (True) or
            bn-activation-conv (False)

    # Returns
        x (tensor): tensor as input to the next layer
    """
    # 'same'-padded, He-initialised, L2-regularised convolution; created up
    # front for both orderings.
    conv = Conv2D(num_filters,
                  kernel_size=kernel_size,
                  strides=strides,
                  padding='same',
                  kernel_initializer='he_normal',
                  kernel_regularizer=l2(1e-4))

    x = inputs
    if conv_first:
        x = conv(x)
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
    else:
        if batch_normalization:
            x = BatchNormalization()(x)
        if activation is not None:
            x = Activation(activation)(x)
        x = conv(x)
    return x


def filter_pruned_model(input_shape, remain_filter_in_stage, depth=56,
                        num_classes=100):
    """ResNet Version 1 builder with a custom filter count per stage.

    Same layout as the plain v1 network (three stages of residual blocks,
    each block two 3 x 3 Conv2D-BN units, ReLU after the shortcut addition,
    strides=2 in the first block of stages 1 and 2), except that the number
    of filters in each stage is read from `remain_filter_in_stage` instead
    of following the 16/32/64 doubling rule.

    # Arguments
        input_shape (tensor): shape of input image tensor
        remain_filter_in_stage (sequence of int): number of filters for
            stage 0, 1 and 2; the stem convolution uses the stage-0 value
        depth (int): number of core convolutional layers, must be 6n+2
        num_classes (int): size of the softmax output

    # Returns
        model (Model): Keras model instance

    # Raises
        ValueError: if depth is not of the form 6n+2
    """
    if (depth - 2) % 6 != 0:
        raise ValueError('depth should be 6n+2 (eg 20, 32, 44 in [a])')

    num_res_blocks = int((depth - 2) / 6)

    inputs = Input(shape=input_shape)

    # The stem uses the stage-0 width so the identity shortcuts of stage 0
    # line up without a projection.
    x = resnet_layer(inputs=inputs, num_filters=remain_filter_in_stage[0])

    # Instantiate the stack of residual units
    for stack in range(3):
        # Width of every convolution in this stage.
        num_filters = remain_filter_in_stage[stack]

        for res_block in range(num_res_blocks):
            strides = 1
            if stack > 0 and res_block == 0:  # first layer but not first stack
                strides = 2  # downsample
            y = resnet_layer(inputs=x,
                             num_filters=num_filters,
                             strides=strides)
            y = resnet_layer(inputs=y,
                             num_filters=num_filters,
                             activation=None)

            if stack > 0 and res_block == 0:  # first layer but not first stack
                # linear projection residual shortcut connection to match
                # changed dims
                x = resnet_layer(inputs=x,
                                 num_filters=num_filters,
                                 kernel_size=1,
                                 strides=strides,
                                 activation=None,
                                 batch_normalization=False)

            x = keras.layers.add([x, y])
            x = Activation('relu')(x)

    # Add classifier on top.
    # v1 does not use BN after last shortcut connection-ReLU
    x = AveragePooling2D(pool_size=8)(x)
    y = Flatten()(x)
    outputs = Dense(num_classes,
                    activation='softmax',
                    kernel_initializer='he_normal')(y)

    # Instantiate model.
    pruned_model = Model(inputs=inputs, outputs=outputs)
    return pruned_model




In [None]:
def filter_pruning_using_activation_cbam(model, activation_average,remain_filter_in_stage):
    """Zero out the lowest-activation filters of every Conv2D layer in place.

    Conv2D layers are visited in `model.layers` order. The i-th one is
    scored with `activation_average[i]`, and the filters with the smallest
    scores have their kernels set to zero. Biases are left unchanged. The
    number of filters zeroed per layer is the stage's original width
    (16 / 32 / 64) minus the requested remaining width.

    NOTE(review): this needs one `activation_average` entry per Conv2D layer
    of `model`, each at least as long as that layer's filter count. A list
    with one entry per attention block is shorter than that and raises
    IndexError; confirm which list is meant to be passed.

    # Arguments
        model (Model): Keras model to prune; its layers are modified in place
        activation_average (sequence of array): per-layer filter scores
        remain_filter_in_stage (sequence of int): filters to keep in
            stage 1, 2 and 3

    # Returns
        model (Model): the same model instance that was passed in
    """
    # Filter counts of the unpruned network, per stage.
    original_filters = (16, 32, 64)

    conv_count = 0
    for layer in model.layers:
        if not isinstance(layer, tf.keras.layers.Conv2D):
            continue

        weights = layer.get_weights()  # [kernel, bias]
        # Scores for this layer, indexed by 0-based Conv2D position.
        filter_activation_pruning = activation_average[conv_count]
        conv_count += 1

        # ResNet-56 stage identification from the 1-based Conv2D position:
        # 1..19 is the stem plus stage 1, 20..38 is stage 2, the rest is
        # stage 3.
        if conv_count <= 19:
            stage = 0
        elif conv_count <= 38:
            stage = 1
        else:
            stage = 2

        # Clamp at zero: a negative count would turn the slice below into
        # "everything except the last k" and prune the wrong filters.
        num_prune = max(original_filters[stage] - remain_filter_in_stage[stage], 0)

        # Indices of the filters with the lowest scores.
        prune_indices = np.argsort(filter_activation_pruning)[:num_prune]
        pruned_weights = weights[0].copy()
        # Axis 3 of the kernel is indexed by output filter.
        pruned_weights[:, :, :, prune_indices] = 0
        layer.set_weights([pruned_weights, weights[1]])
    return model

In [None]:
# Number of filters to keep in stage 1, 2 and 3 (the pruning function counts
# these against 16, 32 and 64 original filters).
remain_filter_in_stage=[12,24,36]


# Start from the saved baseline weights.
model.load_weights('ResNet56_100.ckpt')
   
# Zero the selected filters of the baseline model; the function modifies the
# model it is given and returns that same instance.
model=filter_pruning_using_activation_cbam(model,activation_average, remain_filter_in_stage)


# Build the narrower network that the surviving filters will be copied into.
pruned_model = filter_pruned_model(input_shape,remain_filter_in_stage)

pruned_model.summary()

# weight copy

In [None]:
# Copy the surviving filters of `model` into `pruned_model`.
#
# Both layer lists are walked in lockstep, so the two models must list their
# layers in the same order.  A filter counts as surviving when its kernel
# slice still contains at least one non-zero value.  Only Conv2D layers whose
# kernel is not 1x1 are handled here, together with the BatchNormalization
# layer that follows them and the Dense layers; 1x1 convolutions are copied
# separately afterwards.

count = 1  # 1-based position among the Conv2D layers whose kernel is not 1x1
previous_layer_input_channel = np.array([], dtype=bool)
source_layers = model.layers
target_layers = pruned_model.layers

for layer_index, (layer, new_layer) in enumerate(zip(source_layers, target_layers)):
    if isinstance(layer, Conv2D) and layer.kernel_size != (1, 1):
        print('...................')
        print(layer.kernel_size)
        weights = layer.get_weights()
        # Boolean mask over output filters: True where any weight is non-zero.
        non_zero_filters = np.any(weights[0], axis=(0, 1, 2))
        print(non_zero_filters)

        # Keep the surviving output filters; from the second convolution on,
        # also keep only the input channels the previous convolution kept.
        new_model_weights = weights[0][:, :, :, non_zero_filters]
        if count != 1:
            new_model_weights = new_model_weights[:, :, previous_layer_input_channel, :]
        non_zero_biases = weights[1][non_zero_filters]
        new_layer.set_weights([new_model_weights, non_zero_biases])

        # BN copy: the BatchNormalization layer sits either directly after
        # the convolution or one layer further on.
        next_layer_index = layer_index + 1
        next_next_layer_index = layer_index + 2
        if isinstance(source_layers[next_layer_index], BatchNormalization):
            bn_layer = source_layers[next_layer_index]
        elif isinstance(source_layers[next_next_layer_index], BatchNormalization):
            bn_layer = source_layers[next_next_layer_index]
        else:
            bn_layer = None

        if bn_layer is not None:
            if isinstance(target_layers[next_layer_index], BatchNormalization):
                pruned_bn_layer = target_layers[next_layer_index]
            else:
                pruned_bn_layer = target_layers[next_next_layer_index]
            # Every BN parameter vector is filtered with the same mask.
            bn_weights = [w[non_zero_filters] for w in bn_layer.get_weights()]
            pruned_bn_layer.set_weights(bn_weights)

        previous_layer_input_channel = non_zero_filters
        # The masks of convolutions 19 and 37 are remembered separately; the
        # 1x1 convolution copy that follows uses them as input-channel masks.
        if count == 19:
            previous_layer_input_channel_for_11_conv_1 = non_zero_filters
        if count == 37:
            previous_layer_input_channel_for_11_conv_2 = non_zero_filters

        count += 1

    if isinstance(layer, Dense):
        # Keep only the input rows that match the filters kept by the most
        # recently copied convolution; the bias is copied unchanged.
        dense_weights = layer.get_weights()
        new_dense_weight = dense_weights[0][previous_layer_input_channel]
        new_layer.set_weights([new_dense_weight, dense_weights[1]])

    #conv block weight copy ########## projection conv
    
# Copy the 1x1 Conv2D layers.  The first one met takes its input-channel mask
# from previous_layer_input_channel_for_11_conv_1, every later one from
# previous_layer_input_channel_for_11_conv_2.  BatchNormalization parameters
# are not copied in this loop.
count = 1  # 1 until the first 1x1 convolution has been copied, then 2

for layer, new_layer in zip(model.layers[4:], pruned_model.layers[4:]):
    if not (isinstance(layer, Conv2D) and layer.kernel_size == (1, 1)):
        continue

    is_first_projection = count == 1

    weights = layer.get_weights()
    non_zero_filters = np.any(weights[0], axis=(0, 1, 2))
    # Select the surviving output channels first ...
    non_zero_weights = weights[0][:, :, :, non_zero_filters]
    # ... then the input channels kept by the convolution feeding this layer.
    if is_first_projection:
        input_mask = previous_layer_input_channel_for_11_conv_1
    else:
        input_mask = previous_layer_input_channel_for_11_conv_2
    new_model_weights = non_zero_weights[:, :, input_mask, :]
    non_zero_biases = weights[1][non_zero_filters]
    new_layer.set_weights([new_model_weights, non_zero_biases])

    if is_first_projection:
        count += 1



In [None]:

def lr_schedule(epoch):
    """Learning Rate Schedule

    The base rate of 1e-3 is multiplied by 1e-1, 1e-2, 1e-3 and 0.5e-3 once
    `epoch` is greater than 40, 60, 80 and 90 respectively.
    Called automatically every epoch as part of callbacks during training.

    # Arguments
        epoch (int): The number of epochs

    # Returns
        lr (float): learning rate
    """
    lr = 1e-3
    if epoch > 90:
        lr *= 0.5e-3
    elif epoch > 80:
        lr *= 1e-3
    elif epoch > 60:
        lr *= 1e-2
    elif epoch > 40:
        lr *= 1e-1
    print('Learning rate: ', lr)
    return lr


# Prepare callbacks for model saving and for learning rate adjustment.
# `lr_schedule` is defined above so that the scheduler is bound to this
# definition and not to one left over from an earlier cell.
#
# The model is compiled with metrics=['accuracy'], so the validation metric is
# logged as 'val_accuracy' (Keras >= 2.3 / tf.keras naming).  Monitoring
# 'val_acc' would never find the metric and no checkpoint would be written.
checkpoint = ModelCheckpoint(filepath=filepath,
                             monitor='val_accuracy',
                             verbose=1,
                             save_best_only=True)

lr_scheduler = LearningRateScheduler(lr_schedule)

lr_reducer = ReduceLROnPlateau(factor=np.sqrt(0.1),
                               cooldown=0,
                               patience=5,
                               min_lr=0.5e-6)

callbacks = [checkpoint, lr_reducer, lr_scheduler]
# Compile the pruned network.  `learning_rate` is the current argument name;
# `lr` is a deprecated alias.
pruned_model.compile(loss='categorical_crossentropy',
                     optimizer=Adam(learning_rate=lr_schedule(0)),
                     metrics=['accuracy'])

# Fit the model on the batches generated by datagen.flow().
# Model.fit accepts generators directly; fit_generator is deprecated.
pruned_model.fit(datagen.flow(x_train, y_train, batch_size=batch_size),
                 validation_data=(x_test, y_test),
                 epochs=100, verbose=1, workers=4,
                 callbacks=callbacks)


# Evaluate the fine-tuned pruned network on the test split.
loss, accuracy = pruned_model.evaluate(x_test, y_test, verbose=1)

print(f'Test Accuracy:{accuracy}')
#print(f'block per stage:{len(stage_1)}-{len(stage_2)}-{len(stage_3)}')


def flops_count(new_model):
    """Return the summed operation estimate of the Conv2D and Dense layers.

    Conv2D layers contribute
    ``(k_h * k_w * in_channels * out_channels + out_channels) * height * width``
    where ``height`` and ``width`` are read from ``output_shape[1]`` and
    ``output_shape[2]`` and the channel counts from the last axis of the
    input and output shapes.  Dense layers contribute
    ``inputs * units + units``.  All other layer types contribute nothing.

    # Arguments
        new_model: object exposing a ``layers`` iterable of Keras layers.

    # Returns
        The total as a number (0 when there is no Conv2D or Dense layer).
    """
    def layer_flops(lyr):
        # Estimate for a single layer; 0 for layer types that are not counted.
        if isinstance(lyr, tf.keras.layers.Conv2D):
            k_size = lyr.kernel_size
            c_in = lyr.input_shape[-1]
            c_out = lyr.output_shape[-1]
            out_h = lyr.output_shape[1]
            out_w = lyr.output_shape[2]
            # Weights plus one bias term per output channel, applied at every
            # output position.
            per_position = (k_size[0] * k_size[1] * c_in) * c_out + c_out
            return per_position * out_h * out_w
        if isinstance(lyr, tf.keras.layers.Dense):
            n_in = lyr.input_shape[-1]
            n_out = lyr.units
            return (n_in * n_out) + n_out
        return 0

    return sum(layer_flops(lyr) for lyr in new_model.layers)


# Report parameter counts of the original and the pruned network.
params_before = model.count_params()
print(f"Total number of parameters before pruning: {params_before}")
params_after = pruned_model.count_params()
print(f"Total number of parameters after pruning: {params_after}")

# Report the operation estimates of both networks.
flops_before = flops_count(model)
print(f'Model Flops: {flops_before}')

flops_after = flops_count(pruned_model)
print(f'Layer pruned Model Flops: {flops_after}')

# Relative savings, in percent of the unpruned network.
print(f'Flops reduction:{(1-(flops_after/flops_before))*100}%')
print(f'Parameters reduction:{(1-(params_after/params_before))*100}%')