# Custom Batch Normalization Layer


In [None]:
import tensorflow as tf
from tensorflow.keras import layers
import numpy as np
import time
import tensorflow as tf
from tensorflow.keras.layers import Layer
from tensorflow.keras import initializers
from tensorflow.keras import backend as K

class BatchNormLayer(Layer):
    """Batch normalization over every axis except the last one.

    Per-feature statistics are computed across all leading axes of the
    input. Exponential moving averages of the batch mean and standard
    deviation are kept in the non-trainable weights ``mean`` and ``std`` and
    replace the batch statistics when the layer is called with
    ``training=False``.

    Args:
        epsilon: Constant added to the standard deviation before dividing.
        alpha: Weight of the current batch in the moving averages
            (``new = (1 - alpha) * old + alpha * batch``).
        nonlinearity: Optional callable applied to the normalized output.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.
    """

    def __init__(self, epsilon=0.01, alpha=0.5, nonlinearity=None, **kwargs):
        super(BatchNormLayer, self).__init__(**kwargs)
        self.epsilon = epsilon
        self.alpha = alpha
        self.nonlinearity = nonlinearity
        self.beta = None
        self.gamma = None

    def build(self, input_shape):
        """Create the learnable scale/shift and the moving statistics."""
        super(BatchNormLayer, self).build(input_shape)
        rank = len(input_shape)
        # Reduce over every axis EXCEPT the trailing feature axis, so that the
        # statistics have the same per-feature shape as beta/gamma/mean/std.
        axes = list(range(rank - 1))
        shape = [1] * rank
        shape[-1] = input_shape[-1]
        broadcast = [False] * rank
        # Learnable parameters
        self.beta = self.add_weight(name='beta', shape=shape,
                                    initializer='zeros', trainable=True)
        self.gamma = self.add_weight(name='gamma', shape=shape,
                                     initializer='ones', trainable=True)
        # Moving statistics (updated in call, never by the optimizer)
        self.mean = self.add_weight(name='mean', shape=shape,
                                    initializer=initializers.Constant(0),
                                    trainable=False)
        self.std = self.add_weight(name='std', shape=shape,
                                   initializer=initializers.Constant(1),
                                   trainable=False)

        self.axes = axes
        self.broadcast = broadcast

    def call(self, inputs, training=None, **kwargs):
        """Normalize ``inputs`` and apply the optional nonlinearity.

        Args:
            inputs: Tensor whose last axis matches the built feature size.
            training: ``False`` selects the stored moving statistics. ``True``
                or ``None`` uses the statistics of the current batch and
                refreshes the moving averages. Expected to be a Python bool
                or ``None``.

        Returns:
            The normalized tensor, passed through ``nonlinearity`` if set.
        """
        if training is not None and not training:
            # Inference: rely on the statistics accumulated during training.
            mean, std = self.mean, self.std
        else:
            mean = K.mean(inputs, axis=self.axes, keepdims=True)
            std = K.std(inputs, axis=self.axes, keepdims=True)
            # Explicit assignments: TF2 Keras does not interpret
            # (variable, value) pairs handed to add_update.
            self.mean.assign((1 - self.alpha) * self.mean + self.alpha * mean)
            self.std.assign((1 - self.alpha) * self.std + self.alpha * std)
        normalized = (inputs - mean) * (self.gamma / (std + self.epsilon)) + self.beta
        if self.nonlinearity is None:
            return normalized
        return self.nonlinearity(normalized)
# Function to apply Batch Normalization to a layer
def batch_norm(layer):
    """Strip activation and bias from ``layer`` and batch-normalize it.

    The activation removed from ``layer`` is handed to the new
    ``BatchNormLayer`` so that it is applied after normalization instead of
    being lost.

    Args:
        layer: Object whose ``activation`` and ``bias`` attributes (when
            present) are reset to ``None``; it is then passed to the
            ``BatchNormLayer`` call.
            NOTE(review): Keras layers are normally called on tensors, not on
            other layers - confirm what callers pass here.

    Returns:
        The result of calling the new ``BatchNormLayer`` on ``layer``.
    """
    nonlinearity = getattr(layer, 'activation', None)
    if nonlinearity is not None:
        layer.activation = None
    if hasattr(layer, 'bias'):
        layer.bias = None
    # Re-attach the stripped activation after the normalization.
    return BatchNormLayer(nonlinearity=nonlinearity)(layer)






# BinaryConnect layers: custom DenseLayer and utility functions


In [None]:

import tensorflow as tf
import numpy as np
#   Hard Sigmoid Function
def hard_sigmoid(x):
    """Return ``(x + 1) / 2`` clipped to the interval [0, 1]."""
    shifted = (x + 1.0) / 2.0
    return tf.clip_by_value(shifted, clip_value_min=0, clip_value_max=1)
# BinaryConnect weight binarization function
def binarization(W, H, binary=True, deterministic=False, stochastic=False, seed=None):
    """Binarize ``W`` to the two values ``-H`` / ``+H`` (BinaryConnect).

    Args:
        W: Real-valued weights (tensor or variable).
        H: Magnitude of the binary values.
        binary: When false, ``W`` is returned unchanged.
        deterministic: Combined with ``stochastic``: if both are true, ``W``
            is returned unchanged.
        stochastic: Sample each weight's sign with probability
            ``hard_sigmoid(W / H)`` instead of rounding that probability.
        seed: Shape-[2] integer seed for ``tf.random.stateless_binomial``;
            required when stochastic sampling is performed.

    Returns:
        ``W`` itself, or a float32 tensor holding ``+H`` / ``-H``. The
        binarized tensor passes gradients straight through to ``W``: the
        rounding/sampling itself has no gradient, which would otherwise leave
        the weights without any training signal.

    Raises:
        ValueError: If stochastic sampling is requested without a seed.
    """
    if not binary or (deterministic and stochastic):
        return W
    if stochastic and seed is None:
        raise ValueError("stochastic binarization requires a shape-[2] seed")

    probs = hard_sigmoid(W / H)
    if stochastic:
        # One Bernoulli draw per weight; counts shares probs' dtype and shape.
        bits = tf.dtypes.cast(
            tf.random.stateless_binomial(shape=tf.shape(W), seed=seed,
                                         counts=tf.ones_like(probs), probs=probs),
            tf.float32
        )
    else:
        # Deterministic rounding of the probability
        bits = tf.round(probs)

    # Map {0, 1} onto {-H, +H}
    Wb = tf.dtypes.cast(tf.where(bits > 0, H, -H), tf.float32)

    # Straight-through estimator: the added term is exactly zero in the
    # forward pass but carries an identity gradient back to W.
    W_real = tf.dtypes.cast(W, tf.float32)
    return tf.stop_gradient(Wb) + (W_real - tf.stop_gradient(W_real))




# Custom DenseLayer with BinaryConnect
class DenseLayer(tf.keras.layers.Layer):
    """Bias-free fully connected layer whose kernel is binarized on each call.

    Args:
        units: Number of output units.
        binary: Forwarded to ``binarization``; when false the real-valued
            kernel is used as is.
        stochastic: Forwarded to ``binarization`` (stochastic sampling
            instead of rounding).
        H: Magnitude of the binary weight values; also the bound of the
            uniform kernel initializer.
        W_LR_scale: ``"Glorot"`` to derive the scale from the layer's fan-in
            and fan-out when the layer is built, the string ``"none"`` for
            1.0, or an explicit number.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.
    """

    def __init__(self, units, binary=True, stochastic=True, H=1.0, W_LR_scale="Glorot", **kwargs):
        super(DenseLayer, self).__init__(**kwargs)
        self.units = units
        self.binary = binary
        self.stochastic = stochastic
        self.H = H
        self.W_LR_scale = W_LR_scale
        # Fixed seed, used only when no stochastic sampling is requested.
        self.seed = tf.constant([42, 42], dtype=tf.int32)
        # Source of a fresh stateless seed for every stochastic forward pass;
        # re-using one constant seed would repeat the same random draw.
        self.rng = tf.random.Generator.from_seed(42)
        self.W = None   # real-valued kernel, created in build()
        self.Wb = None  # kernel actually used by the most recent call()

        if W_LR_scale == "Glorot":
            self.W_LR_scale = None  # resolved in build() once fan-in is known
        elif isinstance(W_LR_scale, str) and W_LR_scale.lower() == "none":
            self.W_LR_scale = 1.0

    def build(self, input_shape):
        """Create the kernel and resolve a pending Glorot ``W_LR_scale``."""
        if self.W is None:
            input_dim = int(input_shape[-1])
            self.units = self.units if self.units is not None else 1
            self.W = self.add_weight(
                name='kernel',
                shape=(input_dim, self.units),
                initializer=tf.initializers.RandomUniform(-self.H, self.H),
                trainable=True
            )

            if self.W_LR_scale is None:
                self.W_LR_scale = 1.0 / np.sqrt(1.5 / (self.units + input_dim))

        super(DenseLayer, self).build(input_shape)

    def call(self, inputs, training=None, **kwargs):
        """Multiply ``inputs`` by the (possibly binarized) kernel.

        ``not training`` is passed as the ``deterministic`` flag of
        ``binarization``. The kernel that was used is kept in ``self.Wb``.
        """
        if self.stochastic:
            # make_seeds() returns shape [2, 1]; column 0 is one shape-[2] seed.
            seed = self.rng.make_seeds()[:, 0]
        else:
            seed = self.seed
        self.Wb = binarization(self.W, self.H, self.binary, not training, self.stochastic, seed)
        # self.W keeps pointing at the real-valued variable the whole time.
        return tf.matmul(inputs, self.Wb)

# compute gradients for BinaryConnect layers
def compute_grads(loss, network):
    """Return d(loss)/d(param) for each binary parameter of ``network``.

    Binary parameters are the trainable variables of ``network`` whose name
    contains ``"binary"`` (case-insensitive).

    Args:
        loss: Scalar loss tensor.
        network: Object exposing ``trainable_variables``.

    Returns:
        A list with one gradient per binary parameter, in the order of
        ``network.trainable_variables``; empty when there are none.

    Note:
        Relies on ``tf.gradients``, which TensorFlow only supports while
        building a graph (for example inside ``tf.function``).
    """
    params = [var for var in network.trainable_variables
              if 'binary' in var.name.lower()]
    if not params:
        return []
    return list(tf.gradients(loss, params))
# apply clipping and scaling to BinaryConnect layer updates
def clipping_scaling(updates, model):
    """Build clipped, LR-scaled kernel values for every ``DenseLayer``.

    For each ``DenseLayer`` directly listed in ``model.layers`` the value
    ``W + W_LR_scale * (Wb - W)`` is clipped to ``[-H, H]``.

    Args:
        updates: Accepted but not read.
        model: Object exposing ``layers``.

    Returns:
        Dict mapping ``layer.W.ref()`` to the clipped tensor. Nothing is
        assigned to the variables here.

    NOTE(review): the BinaryConnect reference scales the optimizer's proposed
    update rather than ``Wb - W`` - confirm which is intended.
    """
    dense_layers = (lyr for lyr in model.layers if isinstance(lyr, DenseLayer))
    result = {}
    for lyr in dense_layers:
        scaled_step = lyr.W_LR_scale * (lyr.Wb - lyr.W)
        # Keyed by the variable reference (variables themselves are unhashable)
        result[lyr.W.ref()] = tf.clip_by_value(lyr.W + scaled_step, -lyr.H, lyr.H)
    return result




In [None]:
from __future__ import print_function

import sys
import os
import time

import numpy as np
np.random.seed(1234)  # for reproducibility

import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.datasets import cifar10
from tensorflow.keras.utils import to_categorical

from sklearn.decomposition import PCA
from sklearn.utils import shuffle
import tensorflow_datasets as tfds
import pickle
import gzip

from collections import OrderedDict

if __name__ == "__main__":
    # Hyper-parameters; every value is echoed right after it is set.

    # --- Batch normalization ---
    batch_size = 50
    print(f"batch_size = {batch_size}")
    # Exponential moving average factor
    alpha = 0.1
    print(f"alpha = {alpha}")
    epsilon = 1e-4
    print(f"epsilon = {epsilon}")

    # --- Training ---
    num_epochs = 10
    print(f"num_epochs = {num_epochs}")

    # --- Dropout (0. disables it) ---
    dropout_in = 0.
    print(f"dropout_in = {dropout_in}")
    dropout_hidden = 0.
    print(f"dropout_hidden = {dropout_hidden}")

    # --- BinaryConnect ---
    binary = True
    print(f"binary = {binary}")
    # Set to False for the deterministic variant
    stochastic = True
    print(f"stochastic = {stochastic}")
    # -H and +H are the two binary weight values
    H = 1.
    print(f"H = {H}")
    # "Glorot" selects the coefficients from Glorot's paper
    W_LR_scale = "Glorot"
    print(f"W_LR_scale = {W_LR_scale}")

    # --- Learning-rate decay ---
    LR_start = 0.003
    print(f"LR_start = {LR_start}")
    LR_fin = 0.000002
    print(f"LR_fin = {LR_fin}")
    # Per-epoch factor that takes LR_start to LR_fin after num_epochs epochs
    LR_decay = (LR_fin / LR_start) ** (1. / num_epochs)
    print(f"LR_decay = {LR_decay}")



    # Function to perform ZCA whitening


    def fit_zca(data, eps=1e-5):
        """Estimate ZCA whitening parameters from ``data``.

        Args:
            data: Array-like whose first axis indexes samples; the remaining
                axes are flattened into features.
            eps: Regularizer added to each eigenvalue before the inverse
                square root.

        Returns:
            ``(mean, zca_matrix)`` as float32 arrays of shape
            ``(n_features,)`` and ``(n_features, n_features)``.
        """
        flat = np.asarray(data, dtype=np.float32).reshape(len(data), -1)
        pca = PCA()
        pca.fit(flat)
        inv_std = 1.0 / np.sqrt(pca.explained_variance_ + eps)
        # V^T diag(1/sqrt(lambda + eps)) V: rotate into the principal axes,
        # rescale, then rotate BACK - the last step is what makes this ZCA
        # rather than plain PCA whitening.
        zca_matrix = (pca.components_.T * inv_std) @ pca.components_
        return pca.mean_.astype(np.float32), zca_matrix.astype(np.float32)

    def zca_whiten(data, zca_params=None):
        """ZCA-whiten ``data`` and return it as float32 in its original shape.

        Args:
            data: Array-like whose first axis indexes samples.
            zca_params: ``(mean, zca_matrix)`` from ``fit_zca``. When
                omitted, the parameters are estimated from ``data`` itself.
        """
        data = np.asarray(data, dtype=np.float32)
        mean, zca_matrix = fit_zca(data) if zca_params is None else zca_params
        flat = data.reshape(len(data), -1)
        # Center with the fitted mean before applying the whitening matrix.
        whitened = (flat - mean) @ zca_matrix
        return whitened.reshape(data.shape).astype(np.float32)

    # Load CIFAR-10 dataset
    (train_set, train_labels), (test_set, test_labels) = cifar10.load_data()

    # Fit the whitening on the training images only and apply the SAME
    # transform to both splits, so test statistics never enter preprocessing.
    zca_params = fit_zca(train_set)
    train_set = zca_whiten(train_set, zca_params)
    test_set = zca_whiten(test_set, zca_params)

    # Class indices as float32
    train_labels_hinge = np.float32(train_labels)
    test_labels_hinge = np.float32(test_labels)

    # One-hot encode; ravel() first so the result is (N, 10), not (N, 1, 10)
    train_labels = np.float32(np.eye(10)[train_labels.ravel()])
    test_labels = np.float32(np.eye(10)[test_labels.ravel()])

    # Map {0, 1} targets to {-1, +1} for the hinge loss
    train_labels = 2 * train_labels - 1.
    test_labels = 2 * test_labels - 1.


In [None]:
    import numpy as np
    # Drop singleton axes so that both label arrays are two-dimensional
    train_labels = train_labels.squeeze()
    test_labels = test_labels.squeeze()
    # Report the shapes of the training images and labels
    for _arr in (train_set, train_labels):
        print(_arr.shape)

In [None]:
# Shape passed to the model's Input layer (batch axis excluded)
input_shape = (32, 32, 3)
# Define a custom layer for binary convolution
class BinaryConv2DLayer(Layer):
    """Stride-1, SAME-padded 2-D convolution with a binarized kernel.

    Args:
        filters: Number of output channels.
        kernel_size: ``(height, width)`` of the kernel; a single int is
            expanded to a square kernel.
        binary: Forwarded to ``binarization``.
        stochastic: Forwarded to ``binarization`` (stochastic sampling
            instead of rounding).
        H: Magnitude of the binary weight values.
        W_LR_scale: ``"Glorot"`` to derive the scale from fan-in/fan-out when
            the layer is built, or an explicit value.
        seed: Seed of the generator that supplies a fresh stateless seed for
            every forward pass.
        **kwargs: Forwarded to the Keras ``Layer`` constructor.
    """

    def __init__(self, filters, kernel_size, binary=True, stochastic=True, H=1.0,
                 W_LR_scale="Glorot", seed=42, **kwargs):
        super(BinaryConv2DLayer, self).__init__(**kwargs)
        if isinstance(kernel_size, int):
            kernel_size = (kernel_size, kernel_size)
        self.filters = filters
        self.kernel_size = kernel_size
        self.binary = binary
        self.stochastic = stochastic
        self.H = H
        self.W_LR_scale = W_LR_scale
        # `seed` is now a constructor argument; it used to be read from an
        # undefined name.
        self.rng = tf.random.Generator.from_seed(seed)

    def build(self, input_shape):
        """Create the real-valued kernel (Glorot-uniform initialized)."""
        kernel_h, kernel_w = self.kernel_size[0], self.kernel_size[1]
        in_channels = input_shape[-1]
        self.kernel = self.add_weight(
            name='kernel',
            shape=(kernel_h, kernel_w, in_channels, self.filters),
            initializer='glorot_uniform',
            trainable=True
        )
        if isinstance(self.W_LR_scale, str) and self.W_LR_scale == "Glorot":
            # Same coefficient as DenseLayer, with fan-in/out counted per
            # receptive field.
            fan_in = kernel_h * kernel_w * int(in_channels)
            fan_out = kernel_h * kernel_w * self.filters
            self.W_LR_scale = 1.0 / np.sqrt(1.5 / (fan_in + fan_out))
        super(BinaryConv2DLayer, self).build(input_shape)

    def call(self, inputs, training=None, **kwargs):
        """Convolve ``inputs`` with the (possibly binarized) kernel.

        ``not training`` is passed as the ``deterministic`` flag of
        ``binarization``.
        """
        # make_seeds() returns shape [2, 1]; column 0 is the shape-[2] seed
        # that stateless random ops expect.
        seed = self.rng.make_seeds()[:, 0]
        Wb = binarization(self.kernel, self.H, self.binary, not training,
                          self.stochastic, seed)
        # self.kernel keeps pointing at the real-valued variable.
        return tf.nn.conv2d(inputs, Wb, strides=[1, 1, 1, 1], padding='SAME')
#  Define a custom model using the BinaryConv2DLayer
class MyModel(Model):
    """Three-stage binary convolutional feature extractor.

    Each stage is ``conv -> batch norm -> 2x2 max-pool -> batch norm`` with
    128, 256 and 512 filters respectively; each stage consumes the output of
    the previous one.

    Args:
        input_shape: Accepted but not used by the constructor.
        binary, stochastic, H, W_LR_scale: Forwarded to every
            ``BinaryConv2DLayer``.
        **kwargs: Forwarded to the Keras ``Model`` constructor.

    The batch-norm layers read the module-level ``epsilon`` and ``alpha``.
    """

    def __init__(self, input_shape, binary, stochastic, H, W_LR_scale, **kwargs):
        super(MyModel, self).__init__(**kwargs)
        conv_args = dict(kernel_size=(3, 3), binary=binary, stochastic=stochastic,
                         H=H, W_LR_scale=W_LR_scale)
        self.binary_conv1 = BinaryConv2DLayer(filters=128, name='binary_conv1', **conv_args)
        # Not used by call(); kept so existing attribute access keeps working.
        self.batch_norm = BatchNormLayer(epsilon=epsilon, alpha=alpha)
        self.max_pool = layers.MaxPooling2D(pool_size=(2, 2), padding='same')
        self.batch_norm1 = BatchNormLayer(epsilon=epsilon, alpha=alpha)
        self.batch_norm2 = BatchNormLayer(epsilon=epsilon, alpha=alpha)
        self.batch_norm3 = BatchNormLayer(epsilon=epsilon, alpha=alpha)

        self.binary_conv2 = BinaryConv2DLayer(filters=256, name='binary_conv2', **conv_args)

        self.binary_conv3 = BinaryConv2DLayer(filters=512, name='binary_conv3', **conv_args)

    def call(self, inputs, training=None, **kwargs):
        """Run the three stages in sequence and return the last feature map."""
        stages = (
            (self.binary_conv1, self.batch_norm1),
            (self.binary_conv2, self.batch_norm2),
            (self.binary_conv3, self.batch_norm3),
        )
        x = inputs
        for conv, norm in stages:
            # Every stage consumes the previous stage's output. Feeding
            # `inputs` to each convolution made all but the last stage dead.
            x = conv(x, training=training)
            x = norm(x, training=training)
            x = self.max_pool(x)
            x = norm(x, training=training)
        return x

# Usage
# Assemble the network: convolutional trunk, flatten, then three
# binary dense layers, each followed by batch normalization.
input_layer = Input(shape=input_shape, name='input')
my_model = MyModel(input_shape=input_shape, binary=binary, stochastic=stochastic, H=H, W_LR_scale=W_LR_scale)
output = Flatten()(my_model(input_layer))
for _units, _layer_name in ((1024, 'binary_dense1'), (1024, 'binary_dense2'), (10, 'binary_dense3')):
    output = DenseLayer(_units, binary=binary, stochastic=stochastic, H=H,
                        W_LR_scale=W_LR_scale, name=_layer_name)(output)
    output = BatchNormLayer(epsilon=epsilon, alpha=alpha, nonlinearity=None)(output)

# Final model from the input layer to the last batch-norm output
model = Model(inputs=input_layer, outputs=output)


# Training and Metrics

In [None]:
import tensorflow as tf


tf.get_logger().setLevel('ERROR')
from tensorflow.keras.layers import Input, Dropout, Flatten
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.losses import MeanSquaredError, Hinge
from tensorflow.keras.metrics import Mean, SparseCategoricalAccuracy
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.layers import Input, Dropout, Flatten
from tensorflow.keras.models import Model


# Loss object (used by val_step)
loss_object = Hinge()

# Running means of the per-batch losses
train_loss, val_loss = Mean(name="train_loss"), Mean(name="val_loss")

# Optimizer, starting at the initial learning rate
optimizer = tf.keras.optimizers.legacy.Adam(LR_start)

@tf.function
def train_step(inputs, targets, LR):
    """Run one optimization step and return the batch loss.

    The loss is the mean squared hinge loss between ``targets`` and the
    model's predictions. All trainable variables that receive a gradient are
    updated in a single optimizer step. When the module-level ``binary``
    flag is set, every trainable variable whose name contains ``"binary"``
    is then clipped to ``[-H, H]`` (module-level ``H``).

    Args:
        inputs: Batch of model inputs.
        targets: Batch of targets, multiplied element-wise with the
            predictions.
        LR: Accepted but not read here; the optimizer's own learning rate
            is what applies.

    Returns:
        The scalar loss tensor.

    NOTE(review): the per-layer ``W_LR_scale`` factor of the BinaryConnect
    reference is not applied to the binary updates here - confirm whether it
    should be.
    """
    with tf.GradientTape() as tape:
        predictions = model(inputs, training=True)
        loss = tf.reduce_mean(tf.square(tf.maximum(0., 1. - targets * predictions)))

    variables = model.trainable_variables
    gradients = tape.gradient(loss, variables)

    # Variables not connected to the loss come back as None. Drop them, and
    # skip the optimizer entirely if nothing is left, because apply_gradients
    # raises on an empty list.
    grads_and_vars = [(grad, var) for grad, var in zip(gradients, variables)
                      if grad is not None]
    if grads_and_vars:
        optimizer.apply_gradients(grads_and_vars)

    if binary:
        # Keep the real-valued binary weights inside [-H, H] so they cannot
        # drift away from the range the binarization can represent.
        for var in variables:
            if 'binary' in var.name.lower():
                var.assign(tf.clip_by_value(var, -H, H))

    return loss





@tf.function
def val_step(inputs, targets):
    """Return ``loss_object`` evaluated on the model's inference output.

    NOTE(review): ``loss_object`` is a plain hinge loss while ``train_step``
    uses a squared hinge - confirm the two curves are meant to be compared.
    """
    # Targets are cast to float32 to match the dtype of the predictions
    float_targets = tf.cast(targets, dtype=tf.float32)
    outputs = model(inputs, training=False)
    return loss_object(float_targets, outputs)

# Training loop with a per-epoch exponentially decaying learning rate
train_losses = []
val_losses = []
LR = LR_start
for epoch in range(num_epochs):
    # Apply this epoch's learning rate. Without this, LR_decay is computed
    # but the optimizer stays at LR_start for the whole run.
    optimizer.learning_rate = LR
    # Passed as a tensor so that a changing value does not retrace train_step
    lr_tensor = tf.constant(LR, dtype=tf.float32)

    for i in range(0, len(train_set), batch_size):
        x_batch = train_set[i:i + batch_size]
        y_batch = train_labels[i:i + batch_size]

        loss = train_step(x_batch, y_batch, lr_tensor)
        train_loss(loss)

    for i in range(0, len(test_set), batch_size):
        x_val_batch = test_set[i:i + batch_size]
        y_val_batch = test_labels[i:i + batch_size]

        loss = val_step(x_val_batch, y_val_batch)
        val_loss(loss)

    train_losses.append(train_loss.result().numpy())
    val_losses.append(val_loss.result().numpy())

    print(f'Epoch {epoch + 1}/{num_epochs}, Training Loss: {train_loss.result()}, Validation Loss: {val_loss.result()}')

    # Start the next epoch with empty running means
    train_loss.reset_states()
    val_loss.reset_states()

    LR *= LR_decay


# Collect the per-epoch losses in a DataFrame
df_adam = pd.DataFrame({'Epoch': range(1, num_epochs + 1), 'Training Loss': train_losses, 'Validation Loss': val_losses})

# Plot the training and validation losses
plt.plot(train_losses, label='Training Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss over Epochs')
plt.legend()
plt.show()



In [None]:
# Write the per-epoch loss table to disk
_csv_path = 'd_adam.csv'
df_adam.to_csv(_csv_path)