<a href="https://colab.research.google.com/github/dimahdera/PremiUm-CNN-CIFAR10-Tensorflow-2.x/blob/main/1D_Conv_Bayesian.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import tensorflow as tf
from tensorflow.keras import layers, initializers, regularizers, constraints

class Deterministic_Conv1D(layers.Layer):
    """Bias-free 1-D convolution layer built on ``tf.nn.conv1d``.

    Args:
        filters: Number of output channels.
        kernel_size: Width of the convolution window (an int).
        strides: Step of the window along the sequence axis.
        padding: ``'same'`` (case-insensitive) zero-pads the sequence axis
            before the convolution; any other value results in no padding.
        activation: Anything accepted by ``tf.keras.activations.get``.
        kernel_initializer: Initializer for the kernel weights.
        kernel_regularizer: Optional regularizer for the kernel weights.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, filters, kernel_size, strides=1, padding='valid',
                 activation=None,
                 kernel_initializer='glorot_uniform',
                 kernel_regularizer=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        self.padding = padding.lower()
        self.activation = tf.keras.activations.get(activation)
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)

    def build(self, input_shape):
        """Create the kernel and pre-compute the padding amounts.

        Args:
            input_shape: Shape whose last entry is the number of input
                channels, e.g. (batch_size, sequence_length, input_channels).
        """
        input_channels = input_shape[-1]
        self.kernel = self.add_weight(
            shape=(self.kernel_size, input_channels, self.filters),
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            trainable=True,
            name='kernel'
        )
        if self.padding == 'same':
            # A window of width k needs k - 1 padded steps in total.  Padding
            # (k - 1) // 2 on BOTH sides only adds up to k - 1 when k is odd;
            # for even k the extra step goes on the right.
            self.pad_size = (self.kernel_size - 1) // 2
            self._pad_right = self.kernel_size // 2
        else:
            self.pad_size = 0
            self._pad_right = 0

    def call(self, inputs):
        """Convolve ``inputs`` with the kernel and apply the activation.

        Args:
            inputs: Tensor laid out as (batch, sequence_length, channels).

        Returns:
            The convolved (and activated) tensor.
        """
        if self.padding == 'same':
            # Pad only the sequence axis; batch and channel axes are untouched.
            inputs = tf.pad(
                inputs, [[0, 0], [self.pad_size, self._pad_right], [0, 0]]
            )
        # Padding (if any) was applied explicitly above, hence 'VALID' here.
        outputs = tf.nn.conv1d(
            inputs, self.kernel, stride=self.strides, padding='VALID'
        )
        if self.activation is not None:
            outputs = self.activation(outputs)
        return outputs

In [2]:
class Bayesian_Conv1D_first(layers.Layer):
    """First Bayesian 1-D convolution layer: takes a plain tensor as input.

    Besides the usual convolution output it returns a second tensor of the
    same shape derived from a per-filter ``kernel_sigma`` parameter and the
    squared inputs (presumably the propagated output variance - confirm
    against the model's derivation), plus a scalar regularization term.

    Args:
        filters: Number of output channels.
        kernel_size: Width of the convolution window (an int).
        strides: Step of the window along the sequence axis.
        padding: ``'VALID'`` or ``'SAME'``; accepted case-insensitively.
        kernel_initializer: Initializer for the kernel weights.
        kernel_regularizer: Optional regularizer for the kernel weights.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, filters, kernel_size, strides=1, padding='VALID',
                 kernel_initializer='glorot_uniform',
                 kernel_regularizer=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        # tf.nn.conv1d expects the upper-case spelling; normalise so that the
        # Keras-style lower-case 'same' / 'valid' also work.
        self.padding = padding.upper()
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)

    def build(self, input_shape):
        """Create the kernel and the per-filter ``kernel_sigma`` weights.

        Args:
            input_shape: Shape whose last entry is the number of input
                channels, e.g. (batch_size, sequence_length, input_channels).
        """
        input_channels = input_shape[-1]
        # kernel_sigma is drawn uniformly from [min_sigma, ini_sigma]; it is
        # passed through softplus before use, so these are pre-softplus values.
        ini_sigma = -2.25
        min_sigma = -4.6
        self.kernel = self.add_weight(
            shape=(self.kernel_size, input_channels, self.filters),
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            trainable=True,
            name='kernel'
        )
        self.kernel_sigma = self.add_weight(
            shape=(self.filters,),
            initializer=tf.random_uniform_initializer(
                minval=min_sigma, maxval=ini_sigma, seed=None
            ),
            trainable=True,
            name='kernel_sigma'
        )

    def call(self, inputs):
        """Run the convolution and compute the sigma output and KL term.

        Args:
            inputs: Tensor laid out as (batch, sequence_length, channels).

        Returns:
            Tuple ``(outputs, sigma_out, kl_conv)``: the convolution result,
            a tensor of the same shape computed as
            ``softplus(softplus(kernel_sigma) * sum_of_squared_inputs)``, and
            the value returned by ``kl_regularizer_conv``.
        """
        outputs = tf.nn.conv1d(
            inputs, self.kernel, stride=self.strides, padding=self.padding
        )
        # Convolving the squared inputs with an all-ones kernel sums x**2 over
        # every receptive field, replicated across all filters.
        xxT = tf.nn.conv1d(
            tf.square(inputs), tf.ones(self.kernel.shape),
            stride=self.strides, padding=self.padding
        )
        # softplus(x) == log(1 + exp(x)), evaluated in a numerically stable way.
        sigma_out = tf.multiply(tf.math.softplus(self.kernel_sigma), xxT)
        kl_conv = kl_regularizer_conv(self.kernel, self.kernel_sigma)
        sigma_out = tf.math.softplus(sigma_out)
        return outputs, sigma_out, kl_conv

In [3]:
def kl_regularizer_conv(mu, logvar):
    """Return a scalar KL-style penalty for a kernel and its sigma parameter.

    The value is ``0.5 * mean(log(p) - 1 - logvar + softplus(logvar) / p
    + mu**2 / p)`` with the constant ``p = 0.01``; the terms broadcast against
    each other before the mean is taken.

    Args:
        mu: Tensor of kernel weights.
        logvar: Tensor of sigma parameters, broadcastable against ``mu``.

    Returns:
        Scalar tensor.
    """
    # NOTE(review): the variance enters as log(1 + exp(logvar)) in the ratio
    # term but as raw `logvar` in the log term - confirm this is intended.
    prior_var = 0.01
    variance = tf.math.log(1 + tf.math.exp(logvar))
    per_weight = (
        tf.math.log(prior_var)
        - 1
        - logvar
        + (variance / prior_var)
        + (tf.square(mu) / prior_var)
    )
    return 0.5 * tf.math.reduce_mean(per_weight)

class Bayesian_Conv1D_intermidiate(layers.Layer):
    """Bayesian 1-D convolution layer that consumes a ``(mu, sigma)`` pair.

    The mean is convolved with the kernel as usual.  The second output
    combines three terms built from the incoming sigma, the squared kernel and
    the per-filter ``kernel_sigma`` parameter, each divided by
    ``kernel_size * input_channels``, and passes the sum through softplus.

    Args:
        filters: Number of output channels.
        kernel_size: Width of the convolution window (an int).
        strides: Step of the window along the sequence axis.
        padding: ``'VALID'`` or ``'SAME'``; accepted case-insensitively.
        kernel_initializer: Initializer for the kernel weights.
        kernel_regularizer: Optional regularizer for the kernel weights.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, filters, kernel_size, strides=1, padding='VALID',
                 kernel_initializer='glorot_uniform',
                 kernel_regularizer=None,
                 **kwargs):
        super().__init__(**kwargs)
        self.filters = filters
        self.kernel_size = kernel_size
        self.strides = strides
        # tf.nn.conv1d / tf.image.extract_patches expect upper-case padding.
        self.padding = padding.upper()
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.kernel_regularizer = regularizers.get(kernel_regularizer)

    def build(self, input_shape):
        """Create the kernel and the per-filter ``kernel_sigma`` weights.

        Args:
            input_shape: Pair ``(mu_shape, sigma_shape)``; the last entry of
                ``mu_shape`` is the number of input channels.
        """
        mu_shape, _ = input_shape
        input_channels = mu_shape[-1]
        # kernel_sigma is drawn uniformly from [min_sigma, ini_sigma]; it is
        # passed through softplus before use, so these are pre-softplus values.
        ini_sigma = -2.25
        min_sigma = -4.6
        self.kernel = self.add_weight(
            shape=(self.kernel_size, input_channels, self.filters),
            initializer=self.kernel_initializer,
            regularizer=self.kernel_regularizer,
            trainable=True,
            name='kernel'
        )
        self.kernel_sigma = self.add_weight(
            shape=(self.filters,),
            initializer=tf.random_uniform_initializer(
                minval=min_sigma, maxval=ini_sigma, seed=None
            ),
            trainable=True,
            name='kernel_sigma'
        )

    def _extract_windows(self, tensor):
        """Return the sliding windows of a [batch, length, channels] tensor,
        flattened to [batch, new_length, kernel_size * channels]."""
        patches = tf.image.extract_patches(
            tf.expand_dims(tensor, axis=1),  # [batch, 1, length, channels]
            sizes=[1, 1, self.kernel_size, 1],
            strides=[1, 1, self.strides, 1],
            rates=[1, 1, 1, 1],
            padding=self.padding,
        )
        # Remove ONLY the dummy axis added above.  A bare tf.squeeze would
        # also drop the batch or sequence axis whenever its size is 1.
        return tf.squeeze(patches, axis=1)

    def call(self, inputs):
        """Propagate a ``(mu, sigma)`` pair through the convolution.

        Args:
            inputs: Pair ``(mu_input, sigma_input)`` of tensors laid out as
                (batch, sequence_length, channels).

        Returns:
            Tuple ``(outputs, Sigma_out, kl_conv)``: the convolved mean, the
            sigma output of the same shape, and the value returned by
            ``kl_regularizer_conv``.
        """
        mu_input, sigma_input = inputs
        # softplus(x) == log(1 + exp(x)), evaluated in a numerically stable way.
        kernel_sigma2 = tf.math.softplus(self.kernel_sigma)  # [filters]

        outputs = tf.nn.conv1d(
            mu_input, self.kernel, stride=self.strides, padding=self.padding
        )

        diag_sigma_g = self._extract_windows(sigma_input)
        mu_in_patches = self._extract_windows(mu_input)

        # Squared kernel flattened to [kernel_size * channels, filters] so it
        # lines up with the flattened windows.
        mu_cov_square = tf.reshape(tf.square(self.kernel), [-1, self.filters])
        mu_dim = tf.cast(tf.shape(mu_cov_square)[0], tf.float32)

        # Term 1: incoming sigma weighted by the squared kernel.
        mu_wT_sigmag_mu_w = tf.matmul(diag_sigma_g, mu_cov_square) / mu_dim

        # Term 2: kernel sigma times the summed incoming sigma per window.
        # [filters] * [batch, new_length, 1] broadcasts to [.., filters].
        trace = tf.math.reduce_sum(diag_sigma_g, axis=-1, keepdims=True)
        trace = tf.multiply(kernel_sigma2, trace) / mu_dim

        # Term 3: kernel sigma times the squared norm of each mean window.
        mu_gT_mu_g = tf.math.reduce_sum(
            tf.square(mu_in_patches), axis=-1, keepdims=True
        )
        sigmaw_mu_gT_mu_g = tf.multiply(kernel_sigma2, mu_gT_mu_g) / mu_dim

        Sigma_out = trace + mu_wT_sigmag_mu_w + sigmaw_mu_gT_mu_g
        Sigma_out = tf.math.softplus(Sigma_out)
        kl_conv = kl_regularizer_conv(self.kernel, self.kernel_sigma)
        return outputs, Sigma_out, kl_conv

In [4]:
# Smoke test: chain the first and intermediate Bayesian conv layers on random
# data and print the shape of each mean output.
batch = 2
sequence_length = 15
input_channels = 3
filter_width = 3
output_channels = 4
# NOTE(review): `stride` is not passed to either layer below, so both run
# with their default strides=1.
stride = 2


layer = Bayesian_Conv1D_first(output_channels, filter_width)
layer2 = Bayesian_Conv1D_intermidiate(output_channels, filter_width)
# Input tensor: [batch, sequence_length, input_channels]
input_tensor = tf.random.normal([batch, sequence_length, input_channels])

# Filters: [filter_width, input_channels, output_channels]
# NOTE(review): `filters` is not used anywhere in this cell.
filters = tf.random.normal([filter_width, input_channels, output_channels])

# First layer: plain tensor in, (mean, sigma, KL term) out
output1 = layer(input_tensor)
outputs, Sigma_out, kl_conv = output1
# Print the shape of the first layer's mean output
print("Output shape:", outputs.shape)
# Second layer consumes the (mean, sigma) pair produced by the first
output2 = layer2((outputs, Sigma_out))
outputs2, Sigma_out2, kl_conv2 = output2

print("Output shape:", outputs2.shape)


Output shape: (2, 13, 4)
Output shape: (2, 11, 4)


In [6]:
class Bayesian_Batch_Normalization(layers.Layer):
    """Parameter-free batch normalization for a (mean, sigma) pair.

    The mean tensor is normalized with moments taken over axes 0, 1 and 2;
    the sigma tensor is multiplied by ``1 / (variance + var_epsilon)``.

    Args:
        var_epsilon: Small constant added to the variance.
    """

    def __init__(self, var_epsilon):
        super().__init__()
        self.var_epsilon = var_epsilon

    def call(self, mu_in, Sigma_in):
        """Return the normalized mean and the rescaled sigma.

        Args:
            mu_in: Mean tensor; moments are reduced over axes 0, 1 and 2.
            Sigma_in: Sigma tensor to rescale.

        Returns:
            Tuple ``(mu_out, Sigma_out)``.
        """
        batch_mean, batch_var = tf.nn.moments(mu_in, [0, 1, 2])
        normalized_mu = tf.nn.batch_normalization(
            mu_in,
            batch_mean,
            batch_var,
            offset=None,
            scale=None,
            variance_epsilon=self.var_epsilon,
        )
        # The mean is divided by sqrt(var + eps), so sigma gets 1 / (var + eps).
        inverse_var = 1 / (batch_var + self.var_epsilon)
        rescaled_sigma = tf.multiply(Sigma_in, inverse_var)
        return normalized_mu, rescaled_sigma

In [10]:
# With learnable shifting and scaling parameters
class Bayes_BatchNorm(layers.Layer):
    """Batch normalization for a ``(mean, sigma)`` pair with learnable
    per-channel scale (gamma) and shift (beta).

    Args:
        momentum (float): Momentum for the moving averages.
        eps (float): Small constant for numerical stability.
        **kwargs: Forwarded to ``layers.Layer``.
    """

    def __init__(self, momentum=0.99, eps=1e-6, **kwargs):
        super().__init__(**kwargs)
        self.momentum = momentum
        self.eps = eps

    def build(self, input_shape):
        """Create gamma, beta and the moving statistics, one value per channel.

        Args:
            input_shape: Pair ``(mean_shape, sigma_shape)``; the last entry of
                ``mean_shape`` is the number of channels.
        """
        input_shape_mu, _ = input_shape
        channel_shape = input_shape_mu[-1:]
        # Gamma (scale) and beta (shift) parameters
        self.gamma = self.add_weight(
            shape=channel_shape,
            initializer=tf.keras.initializers.Ones(),
            name='gamma',
            trainable=True
        )
        self.beta = self.add_weight(
            shape=channel_shape,
            initializer=tf.keras.initializers.Zeros(),
            name='beta',
            trainable=True
        )
        # Moving mean and variance for inference
        self.moving_mean = self.add_weight(
            shape=channel_shape,
            initializer=tf.keras.initializers.Zeros(),
            name='moving_mean',
            trainable=False
        )
        self.moving_variance = self.add_weight(
            shape=channel_shape,
            initializer=tf.keras.initializers.Ones(),
            name='moving_variance',
            trainable=False
        )
        super().build(input_shape)

    def call(self, input, training=None):
        """Normalize the mean and rescale the sigma accordingly.

        Args:
            input: Pair ``(x, sigma_x)`` of tensors with channels last.
            training (bool): If truthy, batch statistics are used and the
                moving averages are updated; otherwise the moving averages
                are used.

        Returns:
            Tuple ``(x_normalized, Sigma_out)``.
        """
        x, sigma_x = input
        if training:
            # Reduce over every axis except the channel axis so the statistics
            # have the same per-channel shape as the moving averages.
            reduce_axes = list(range(len(x.shape) - 1))
            batch_mean = tf.reduce_mean(x, axis=reduce_axes)
            batch_variance = tf.math.reduce_variance(x, axis=reduce_axes)

            # Update moving mean and variance
            self.moving_mean.assign(
                self.momentum * self.moving_mean
                + (1.0 - self.momentum) * batch_mean
            )
            self.moving_variance.assign(
                self.momentum * self.moving_variance
                + (1.0 - self.momentum) * batch_variance
            )

            mean, variance = batch_mean, batch_variance
        else:
            # Use moving mean and variance for inference
            mean, variance = self.moving_mean, self.moving_variance

        # Normalize, then scale and shift
        x_normalized = (x - mean) / tf.sqrt(variance + self.eps)
        x_normalized = self.gamma * x_normalized + self.beta

        # The mean is multiplied by gamma / sqrt(var + eps); a variance under
        # that linear map is multiplied by its square, gamma**2 / (var + eps).
        variance_scale = tf.square(self.gamma) / (variance + self.eps)
        Sigma_out = tf.math.multiply(variance_scale, sigma_x)
        Sigma_out = tf.math.softplus(Sigma_out)
        return x_normalized, Sigma_out

    def compute_output_shape(self, input_shape):
        """Return ``input_shape`` unchanged."""
        return input_shape

In [11]:
# Smoke test: first conv -> intermediate conv -> Bayesian batch norm on random
# data, printing the shape of each mean output.
batch = 2
sequence_length = 15
input_channels = 3
filter_width = 3
output_channels = 4
# NOTE(review): `stride` is not passed to any layer below, so the conv layers
# run with their default strides=1.
stride = 2


layer = Bayesian_Conv1D_first(output_channels, filter_width)
layer2 = Bayesian_Conv1D_intermidiate(output_channels, filter_width)
layer3 = Bayes_BatchNorm()
# Input tensor: [batch, sequence_length, input_channels]
input_tensor = tf.random.normal([batch, sequence_length, input_channels])

# Filters: [filter_width, input_channels, output_channels]
# NOTE(review): `filters` is not used anywhere in this cell.
filters = tf.random.normal([filter_width, input_channels, output_channels])

# First layer: plain tensor in, (mean, sigma, KL term) out
output1 = layer(input_tensor)
outputs, Sigma_out, kl_conv = output1

# Print the shape of the first layer's mean output
print("Output shape:", outputs.shape)
# Second layer consumes the (mean, sigma) pair produced by the first
output2 = layer2((outputs, Sigma_out))
outputs2, Sigma_out2, kl_conv2 = output2
print("Output shape:", outputs2.shape)

# Batch norm is called without `training`, so it takes its non-training branch
output3 = layer3((outputs2, Sigma_out2))
outputs3, Sigma_out3 = output3
print("Output shape:", outputs3.shape)

Output shape: (2, 13, 4)
Output shape: (2, 11, 4)
Output shape: (2, 11, 4)
