<a href="https://colab.research.google.com/github/Nithyarajoman/Machine-learning-Trial/blob/main/Library_Functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, models

In [None]:
# Read data
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1).astype('float')/255
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1).astype('float')/255

In [None]:
# Create Model
model_hierarchy = models.Sequential(
    [        
        layers.Conv2D(32, kernel_size=(3, 3),
                      input_shape=(28, 28, 1),
                      strides=(2,2), padding="same",
                      use_bias=False),
        layers.BatchNormalization(),
        layers.Activation(activation="relu"),
        #layers.MaxPooling2D(pool_size=(2, 2)),
        models.Sequential([
            layers.Conv2D(64, kernel_size=(3, 3),
                          strides=(2,2), padding="valid",
                          use_bias=False),
            layers.BatchNormalization(),
            layers.Activation(activation="relu")
            ]),            
        #layers.MaxPooling2D(pool_size=(2, 2)),
        layers.Flatten(),
        layers.Dropout(0.5),
        layers.Dense(10, activation='softmax')
    ]
)

In [None]:
# Train
model_hierarchy.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])
model_hierarchy.fit(x=x_train, y=y_train, epochs=5, batch_size=128, validation_split=0.1 )
score = model_hierarchy.evaluate(x_test, y_test, verbose=0)

In [None]:
# Function to calculate return MAC from each layer

def mac_each_layer(l):
    o_shape, i_shape, strides, ks, filters = ['', '', ''], ['', '', ''], [1, 1], [0, 0], [0, 0]        
    macc = 0
    
    if ('Dense' in str(l)):
        i_shape = l.input.shape[1:4].as_list()[0]
        o_shape = l.output.shape[1:4].as_list()
        macc = (o_shape[0] * i_shape)

    
    if ('Conv2D ' in str(l) and 'DepthwiseConv2D' not in str(l) and 'SeparableConv2D' not in str(l)):
        strides = l.strides
        ks = l.kernel_size
        filters = l.filters
        i_shape = l.input.get_shape()[1:4].as_list()
        o_shape = l.output.get_shape()[1:4].as_list()

        if (filters == None):
            filters = i_shape[2]

        macc = ((filters * ks[0] * ks[1] * i_shape[2]) * (
                (i_shape[0] / strides[0]) * (i_shape[1] / strides[1])))

    if ('Conv2D ' in str(l) and 'DepthwiseConv2D' in str(l) and 'SeparableConv2D' not in str(l)):
        strides = l.strides
        ks = l.kernel_size
        filters = l.filters
        i_shape = l.input.get_shape()[1:4].as_list()
        o_shape = l.output.get_shape()[1:4].as_list()

        if (filters == None):
            filters = i_shape[2]

        macc = ((ks[0] * ks[1] * i_shape[2]) * ((i_shape[0] / strides[0]) * (i_shape[1] / strides[1])))

    return macc

In [None]:
# Function to pass layers to the calculating function and to get the macc of each layer

def MAC_summary(model):
  print('%25s | %6s' % ('Layer Name', 'MACs'))
  print('*' * 100)
  t_macc = 0

  for l in model.layers:
    if (type(l) in [models.Sequential, keras.Model]):
      for ib_layer in l.layers:
        mac_obtained = mac_each_layer(ib_layer)
        print('%25s(%5s) | %5.4f' % (
                l.name,ib_layer.name, mac_obtained ))
        t_macc += mac_obtained  

    else:
      mac_obtained = mac_each_layer(l)
      print('%25s | %5.4f' % (
                l.name, mac_obtained ))
      
    t_macc += mac_obtained
  print('\nTotal MACs operations in the complete model: %10.8f\n' % (t_macc))
  return

In [None]:
# Function to check if a layer is model or not
def is_hier_layer(layer):
    "Finds if layer is actually a model instead of a single layer"
    return type(layer) in [models.Sequential, keras.Model]


# Function to add communicated data to the metrics
def add_metrics(m2):
    "Annotates recursiverly a (non-compiled) model with metrics that calculate probability of non-zero activity"
    prefix_name = m2.name + "_"
    for ll in m2.layers:
        if is_hier_layer(ll):
            add_metrics(ll)
        else:
            my_metric = tf.reduce_sum( tf.math.count_nonzero(ll.output, dtype=tf.int32) / tf.size( ll.output)  )
            name=prefix_name + ll.name
            m2.add_metric(my_metric, name=name)
    return


In [None]:
# Function to print the effective data communicated 

def data_summary(model):

  # Function to add communicated data to the metrics
  def add_metrics(m2):
    "Annotates recursiverly a (non-compiled) model with metrics that calculate probability of non-zero activity"
    prefix_name = m2.name + "_"
    for ll in m2.layers:
        if is_hier_layer(ll):
            add_metrics(ll)
        else:
            my_metric = tf.reduce_sum( tf.math.count_nonzero(ll.output, dtype=tf.int32) / tf.size( ll.output)  )
            name=prefix_name + ll.name
            m2.add_metric(my_metric, name=name)
    return

  # Function to check if a layer is model or not
  def is_hier_layer(layer):
      "Finds if layer is actually a model instead of a single layer"
      return type(layer) in [models.Sequential, keras.Model]

  # Clone the model. Note that it is not compiled!
  m2 = keras.models.clone_model(model)
  add_metrics(m2)
  
  # Compiling the cloned model and taking the inference
  m2.compile(optimizer='adam',
                loss='sparse_categorical_crossentropy',
                metrics=['accuracy'])

  aux = m2.evaluate(x_test[0:1000], y_test[0:1000]) 
  
  print('%40s | %20s' % ('Metrics and Layer Name', 'Effective data communicated'))
  print("*" * 150)
  for l in m2.metrics:
    print('%40s | %5.4f' % (l.name, l.result().numpy()))
  return

In [None]:
MAC_summary(model_hierarchy)

In [None]:
data_summary(model_hierarchy)