In [1]:
"""
Mounts your drive for reading and writing datasets, results, weights.
"""
# Mount Google Drive at /content/drive; later cells build their dataset and
# weight paths underneath this mount point.
# NOTE(review): the recorded output shows an interactive authorization-code
# prompt, so this cell presumably cannot run unattended -- confirm.
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [3]:
import struct
import numpy as np
import math

def one_hot(a, num_classes):
  """
  Encodes an array of integer class indices as one hot vectors.

  Parameters
  ==========

  a: numpy.array
    Integer class indices. The array is flattened before encoding,
    so its original shape is not preserved.
  num_classes: int
    The length of each one hot vector.

  Returns
  =======

  np.array
    ``float32`` array with one row per element of ``a``. Axes of
    length one are squeezed away, so a single index yields a 1-D
    vector of length ``num_classes``.
  """
  # Row ``k`` of the identity matrix is the one hot vector of class ``k``.
  identity = np.eye(num_classes, dtype=np.float32)
  flat_indices = a.reshape(-1)
  encoded = identity[flat_indices]
  return np.squeeze(encoded)

def read_idx(filename, flatten=True, normalize=True, show_logs=True,
             num_data=1000000, to_one_hot=False,
             cache_result=False, use_cache=False):
    """
    Reads datasets from binary IDX files.

    Parameters
    ==========

    filename: str
      The path to the file where the dataset is stored.
      Make sure to not add any extensions at the end of
      the filepath.
    flatten: bool
      If True, every data point is flattened to a 1-D vector and
      the result is cast to ``float32``. By default, True.
    normalize: bool
      If True, the data points are normalised
      by dividing each element by 126 and the result is cast to
      ``float32``. By default, True.
    show_logs: bool
      If True, prints one log line per data point for every
      processing step. By default, True.
    num_data: int
      Maximum number of data points to be fetched from the
      dataset (taken from the start of the file).
      By default, 1000000.
    to_one_hot: bool
      If True, converts data points to one hot
      notation (10 classes) using 0 for low and 1 for high.
      By default, False.
    cache_result: bool
      If True, caches the pre-processed data to
      ``<filename>_cache.npy``. By default, False.
    use_cache: bool
      If True, loads ``<filename>_cache.npy`` and returns it
      immediately; every other option is ignored in that case.
      By default, False.

    Returns
    =======

    np.array

    Note
    ====

    This function was specifically written for MNIST database
    of handwritten digits. The payload is always interpreted as
    unsigned bytes; the data type code in the header is not checked.
    """
    if use_cache:
        cache_path = filename + "_cache.npy"
        ret_val = np.load(cache_path)
        print("Loaded cached data from %s"%(cache_path))
        return ret_val

    with open(filename, 'rb') as f:
        # Header: two zero bytes, a data type code, the number of dimensions,
        # followed by one big-endian uint32 per dimension.
        _zero, _data_type, dims = struct.unpack('>HBB', f.read(4))
        shape = tuple(struct.unpack('>I', f.read(4))[0] for _ in range(dims))
        # ``np.frombuffer`` replaces the deprecated binary mode of
        # ``np.fromstring``; it returns a read-only view of the bytes.
        raw = np.frombuffer(f.read(), dtype=np.uint8).reshape(shape)

    num_data = max(0, min(raw.shape[0], num_data))
    # Truncate once up front so that ``num_data`` is honoured on every code
    # path; the copy makes the returned array writable again.
    ret_val = raw[:num_data].copy()

    if normalize:
        if show_logs:
            for i in range(num_data):
                print("Normalized %s-th data"%(i+1))
        # The division is carried out in float64 and then rounded to float32.
        ret_val = (ret_val / 126.).astype(np.float32)

    if flatten:
        if show_logs:
            for i in range(num_data):
                print("Flattened %s-th data"%(i+1))
        # An explicit feature count (instead of -1) keeps the reshape valid
        # for an empty selection and maps scalar items to length-1 vectors.
        features = int(np.prod(ret_val.shape[1:]))
        ret_val = ret_val.reshape(num_data, features).astype(np.float32)

    if to_one_hot:
        ret_val = one_hot(ret_val, 10)

    if cache_result:
        np.save(filename+"_cache", ret_val)
        print("Saved cached data to %s_cache"%(filename))
    return ret_val

In [4]:
import tensorflow as tf

class ANNLayer(tf.keras.layers.Layer):
    """
    Base class for ANN layers.

    Holds a single weight matrix ``kernel_mu`` of shape
    ``[num_inputs, num_outputs]``; there is no bias term.

    Parameters
    ==========

    num_inputs: int
        Number of inputs to the layer.
    num_outputs: int
        Number of outputs from the layer.
    activation:
        Activation from tensorflow.keras.activations.
    """

    def __init__(self, num_inputs, num_outputs, activation):
        super(ANNLayer, self).__init__(dtype=tf.float32)
        self.num_outputs = num_outputs
        self.activation = activation
        # ``add_weight`` replaces the deprecated ``add_variable``. Keyword
        # arguments are used throughout because the positional order of
        # ``add_weight`` is not the same across Keras versions.
        self.kernel_mu = self.add_weight(name="kernel_mu",
                                         shape=[num_inputs,
                                                self.num_outputs],
                                         initializer=tf.keras.initializers.TruncatedNormal(),
                                         dtype=tf.float32)

    def call(self, input):
        """
        Implements the feed forward operation when layer
        is called on the given input.

        Parameters
        ==========

        input: tf.Tensor
          The input; it is matrix-multiplied with ``kernel_mu``.

        Returns
        =======

        tf.Tensor
          ``activation(input @ kernel_mu)``.
        """
        prod = self.activation(tf.matmul(input, self.kernel_mu))
        return prod

class BNNLayer(tf.keras.layers.Layer):
    """
    Base class for BNN layers.

    Holds two variables of shape ``[num_inputs, num_outputs]``:
    ``kernel_mu`` and ``kernel_rho``. Subclasses turn them into a
    sampled weight matrix by overriding ``_reparametrize``.

    Parameters
    ==========

    num_inputs: int
        Number of inputs to the layer.
    num_outputs: int
        Number of outputs from the layer.
    activation:
        Activation from tensorflow.keras.activations.
    """

    def __init__(self, num_inputs, num_outputs, activation):
        super(BNNLayer, self).__init__(dtype=tf.float32)
        self.num_outputs = num_outputs
        self.activation = activation
        # ``add_weight`` replaces the deprecated ``add_variable``. Keyword
        # arguments are used throughout because the positional order of
        # ``add_weight`` is not the same across Keras versions.
        self.kernel_mu = self.add_weight(name="kernel_mu",
                                         shape=[num_inputs,
                                                self.num_outputs],
                                         initializer=tf.keras.initializers.TruncatedNormal(),
                                         dtype=tf.float32)
        # The variable name stays "kernel_sigma" although the attribute is
        # ``kernel_rho``; the name is left untouched so that previously saved
        # weights keep resolving to the same variable.
        self.kernel_rho = self.add_weight(name="kernel_sigma",
                                          shape=[num_inputs,
                                                 self.num_outputs],
                                          initializer=tf.keras.initializers.TruncatedNormal(),
                                          dtype=tf.float32)

    def _reparametrize(self):
        """
        Abstract method which implements the
        reparametrisation technique.

        The base implementation returns None; subclasses are expected
        to return a sampled weight tensor.
        """
        return None

    def call(self, input, weights):
        """
        Implements the inference operation when layer
        is called on the given input and weights.

        Parameters
        ==========

        input: tf.Tensor
          The input.
        weights: tf.Tensor
          The weights the input is matrix-multiplied with. The layer's
          own variables are not read here.

        Returns
        =======

        tf.Tensor
          ``activation(input @ weights)``.
        """
        prod = self.activation(tf.matmul(input, weights))
        return prod

class BNNLayer_Normal_Normal(BNNLayer):
    """
    BNN layer which samples its weights with the reparametrisation
    trick: Gaussian noise is scaled by ``log(1 + exp(kernel_rho))``
    and shifted by ``kernel_mu``.

    Note
    ====

    The noise is drawn with mean 0 and standard deviation 0.01.
    """

    def _reparametrize(self):
        """
        Draws one weight sample with the same shape as ``kernel_mu``.

        Returns
        =======

        tf.Tensor
        """
        noise_shape = self.kernel_mu.shape
        noise = tf.random.normal(noise_shape, 0, 0.01, dtype=tf.float32)
        # ``kernel_rho`` is clipped before exponentiation.
        bounded_rho = tf.clip_by_value(self.kernel_rho, -87.315, 88.722)
        ones = tf.constant(1., shape=noise_shape, dtype=tf.float32)
        scale = tf.math.log(tf.math.add(tf.math.exp(bounded_rho), ones))
        perturbation = tf.math.multiply(noise, scale)
        return tf.math.add(self.kernel_mu, perturbation)

In [5]:
from tensorflow.keras.activations import relu as Relu, elu as Elu, softmax as Softmax
import math

class ANN(tf.keras.Model):
    """
    Artificial Neural Network using point estimates
    of underlying distribution of training data.

    The network is ``input -> 400 (relu) -> 400 (relu) -> 10 (softmax)``.

    Parameters
    ==========

    input_shape: tuple
      ``(batch_size, num_features)``. Although the default is None,
      a tuple is required because its entries are read to build
      the layers.

    Raises
    ======

    ValueError
      If ``input_shape`` is None.
    """

    def __init__(self, input_shape=None):
        if input_shape is None:
            # Indexing None below would otherwise fail with an opaque TypeError.
            raise ValueError("input_shape must be a (batch_size, num_features) tuple, got None")
        super(ANN, self).__init__()
        self.InputLayer = tf.keras.layers.InputLayer(input_shape=(input_shape[1],),
                            batch_size=input_shape[0], dtype=tf.float32)
        self.Dense_1 = ANNLayer(int(input_shape[-1]), 400, activation=Relu)
        self.Dense_2 = ANNLayer(400, 400, activation=Relu)
        self.Output = ANNLayer(400, 10, activation=Softmax)
        # Order matters: ``run`` applies the layers in this sequence.
        self.Layers = [self.Dense_1, self.Dense_2, self.Output]

    def run(self, inputs):
        """
        Performs feed forward operation on the given inputs.

        Parameters
        ==========

        inputs: tf.Tensor

        Returns
        =======

        tf.Tensor
          Output of the last (softmax) layer.
        """
        layer_output = self.InputLayer(inputs)
        for layer in self.Layers:
            layer_output = layer(layer_output)
        return layer_output

    def get_loss(self, inputs, targets, inference=False):
        """
        Computes the total training loss, i.e. the mean categorical
        cross-entropy between ``targets`` and the network outputs.

        Parameters
        ==========

        inputs: tf.Tensor
            Input to the layers.
        targets: tf.Tensor
            True targets that the model wants to learn from.
        inference: bool
            Used to determine the order of the outputs in the tuple
            being returned.

        Returns
        =======

        tuple
          ``(loss, outputs)`` by default, ``(outputs, loss)`` when
          ``inference`` is True.
        """
        outputs = self.run(inputs)
        loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(targets, outputs))

        if inference:
            return outputs, loss
        return loss, outputs

class BNN_Normal_Normal(tf.keras.Model):
    """
    Neural Network which uses, `BNNLayer_Normal_Normal` layers.

    The network is ``input -> 400 (relu) -> 400 (relu) -> 10 (softmax)``.

    Parameters
    ==========

    input_shape: tuple
      ``(batch_size, num_features)``. Although the default is None,
      a tuple is required because its entries are read to build
      the layers.

    Raises
    ======

    ValueError
      If ``input_shape`` is None.
    """

    # Smallest positive normal float32. Used as the lower clip bound before
    # taking logarithms: ``tf.float32.min`` is the most negative float, so
    # clipping a non-negative density against it never prevented log(0).
    _MIN_POSITIVE = 1.17549435e-38

    def __init__(self, input_shape=None):
        if input_shape is None:
            # Indexing None below would otherwise fail with an opaque TypeError.
            raise ValueError("input_shape must be a (batch_size, num_features) tuple, got None")
        super(BNN_Normal_Normal, self).__init__()
        self.InputLayer = tf.keras.layers.InputLayer(input_shape=(input_shape[1],),
                            batch_size=input_shape[0], dtype=tf.float32)
        self.Dense_1 = BNNLayer_Normal_Normal(int(input_shape[-1]), 400, activation=Relu)
        self.Dense_2 = BNNLayer_Normal_Normal(400, 400, activation=Relu)
        self.Output = BNNLayer_Normal_Normal(400, 10, activation=Softmax)
        # Order matters: ``run`` pairs the i-th layer with the i-th weight tensor.
        self.Layers = [self.Dense_1, self.Dense_2, self.Output]

    def run(self, inputs, *weights):
        """
        Produces neural network's outputs for
        given inputs and weights.

        Parameters
        ==========

        inputs: tf.Tensor/np.array
        weights: tf.Tensor
          One weight tensor per layer, in the order of ``self.Layers``.

        Returns
        =======

        tf.Tensor
          Output of the last (softmax) layer.
        """
        layer_output = self.InputLayer(inputs)
        for index, layer in enumerate(self.Layers):
            layer_output = layer(layer_output, weights[index])
        return layer_output

    def log_prior(self, weights):
        """
        Computes the natural logarithm of scale
        mixture prior of weights.

        Parameters
        ==========

        weights: tf.Tensor

        Returns
        =======

        tf.Tensor
          Scalar; the log densities summed over all weight entries.

        Note
        ====

        The two standard deviations of the scale mixture are,
        exp(0) and exp(-6). The zero-mean normal with standard
        deviation exp(0) has mixture weight 0.25 and the one with
        standard deviation exp(-6) has mixture weight 0.75.
        """
        shape = weights.shape
        sigma_1 = tf.constant(math.exp(0), shape=shape, dtype=tf.float32)
        sigma_2 = tf.constant(math.exp(-6), shape=shape, dtype=tf.float32)
        def pdf(w, sigma):
            # Zero-mean Gaussian density; the exponent is clipped before
            # it is exponentiated.
            res1 = tf.math.divide(tf.math.square(w), tf.math.square(sigma)*2)
            return tf.math.divide(tf.math.exp(tf.clip_by_value(-res1, -87.315, 88.722)), sigma*(2*math.pi)**0.5)
        # The positive lower bound keeps the logarithm below finite even when
        # a density underflows to zero.
        part_1 = tf.clip_by_value(0.25*pdf(weights, sigma_1), self._MIN_POSITIVE, tf.float32.max//2)
        part_2 = tf.clip_by_value(0.75*pdf(weights, sigma_2), self._MIN_POSITIVE, tf.float32.max//2)
        return tf.math.reduce_sum(tf.math.log(part_1 + part_2))

    def log_posterior(self, weights, mu, rho):
        """
        Computes the natural logarithm of Gaussian
        posterior on weights.

        Parameters
        ==========

        weights: tf.Tensor
        mu: tf.Tensor
          The mean of the posterior Gaussian distribution.
        rho: tf.Tensor
          Used to compute the standard deviation of the posterior
          Gaussian distribution as ``log(1 + exp(rho))``.

        Returns
        =======

        tf.Tensor
          Scalar; the log densities summed over all weight entries.
        """
        def pdf(w, mu, sigma):
            # Gaussian density; the exponent is clipped before it is
            # exponentiated.
            res1 = tf.math.divide(tf.math.square(w - mu), tf.math.square(sigma)*2)
            return tf.math.divide(tf.math.exp(tf.clip_by_value(-res1, -87.315, 88.722)), sigma*(2*math.pi)**0.5)
        sigma = tf.math.log(tf.math.add(
                                  tf.math.exp(tf.clip_by_value(rho, -87.315, 88.722)),
                                  tf.constant(1., shape=rho.shape, dtype=tf.float32)))
        # The positive lower bound keeps the logarithm finite when the density
        # underflows to zero.
        log_q = tf.math.log(tf.clip_by_value(pdf(weights, mu, sigma), self._MIN_POSITIVE, tf.float32.max//2))
        return tf.math.reduce_sum(log_q)

    def get_loss(self, inputs, targets, samples, weight=1., inference=False):
        """
        Computes the total training loss.

        For every weight sample the mean categorical cross-entropy is
        computed. During training the term
        ``(log_posterior - log_prior) * weight`` is added to it; with
        ``inference`` set, only the cross-entropy is accumulated.
        The accumulated loss is divided by ``samples``.

        Parameters
        ==========

        inputs: tf.Tensor/np.array
            Input to the layers.
        targets: tf.Tensor/np.array
            True targets that the model wants to learn from.
        samples: int
            The number of samples to be drawn for weights.
        weight: tf.float32 or equivalent.
            Multiplier applied to ``log_posterior - log_prior``.
            By default, 1.
        inference: bool
            Used to determine the order of the outputs in the tuple
            being returned, and whether the prior/posterior term is
            part of the loss.

        Returns
        =======

        tuple
          ``(loss, outputs_list)`` by default, ``(outputs_list, loss)``
          when ``inference`` is True. ``outputs_list`` holds one output
          tensor per weight sample.
        """
        loss = tf.constant(0., dtype=tf.float32)
        outputs_list = []
        for _ in range(samples):
            weights = []
            pw, qw = tf.constant(0, dtype=tf.float32), tf.constant(0, dtype=tf.float32)
            for layer in self.Layers:
                sampled = layer._reparametrize()
                weights.append(sampled)
                pw += self.log_prior(sampled)
                qw += self.log_posterior(sampled, layer.kernel_mu, layer.kernel_rho)

            outputs = self.run(inputs, *weights)
            outputs_list.append(outputs)
            cse = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(targets, outputs))
            if inference:
                loss += cse
            else:
                loss += (qw - pw)*weight + tf.cast(cse, tf.float32)

        if inference:
            return outputs_list, loss/samples
        return loss/samples, outputs_list

In [6]:
from datetime import datetime
from tensorflow.keras.datasets import fashion_mnist
# Number of weight samples drawn per evaluation; read as a global by the
# test helpers and by the evaluation cells below.
samples = 10
# Root folder (inside the mounted drive) that the dataset and weight paths
# are appended to.
prefix = "./drive/My Drive/bnn/"

def shannon_entropy(preds, samples):
    """
    Computes the average entropy-style uncertainty of the mean prediction.

    The first ``samples`` entries of ``preds`` are averaged into one
    mean prediction per example; for every example
    ``-mean(p * log(p))`` is taken over the classes, and the result
    is averaged over all examples found in ``preds``.

    Parameters
    ==========

    preds: list containing tf.Tensor
      Predictions made by the neural network, one tensor of shape
      ``(num_examples, num_classes)`` per inference pass.
    samples: int
      Number of times inferencing was done.

    Returns
    =======

    tf.float32

    Note
    ====

    NOTE(review): the per-example value averages ``p * log(p)`` over
    the classes instead of summing it, so it equals the Shannon
    entropy divided by the number of classes. The scaling is kept so
    that previously reported numbers stay comparable -- confirm
    whether a sum was intended.
    """
    stacked = tf.stack([preds[j] for j in range(samples)], axis=0)
    mean_prediction = tf.reduce_mean(stacked, axis=0)
    # ``xlogy`` evaluates p * log(p) as 0 where p == 0, so a saturated
    # softmax output does not turn the whole result into NaN.
    plogp = tf.math.xlogy(mean_prediction, mean_prediction)
    per_example = -tf.reduce_mean(plogp, axis=-1)
    return tf.reduce_mean(per_example)

def test_mnist(file_path, input_shape, input_data, output_data, cache_result, use_cache):
    """
    Used for evaluating a trained `BNN_Normal_Normal` on a test split
    stored as binary files (at most the first 10000 data points).

    Parameters
    ==========

    file_path: str
      The path to the file from where the saved `BNN_Normal_Normal` is to be loaded.
    input_shape: tuple
      Forwarded to the `BNN_Normal_Normal` constructor.
    input_data: str
      The path to the file containing input data. Must be binary.
    output_data: str
      The path to the file containing output_data. Must be binary.
    cache_result: bool
      If True, caches the pre-processed data to
      cache files of `npy` format.
    use_cache: bool
      If True, uses the previously cached data, if possible.

    Returns
    =======

    Tuple of the following,

    preds: list containing tf.Tensor
      Predictions made by neural network, one entry per weight sample.
    targets: np.array
      The target labels in one hot notation.
    total_loss: tf.float32
    uncertainty: tf.float32

    Note
    ====

    The number of weight samples is taken from the module level
    variable ``samples``.
    """
    network = BNN_Normal_Normal(input_shape=input_shape)
    network.load_weights(file_path)
    # Images: flattened and normalised. Labels: left raw and one hot encoded.
    images = read_idx(input_data, flatten=True, normalize=True, show_logs=True,
                      num_data=10000, to_one_hot=False,
                      cache_result=cache_result, use_cache=use_cache)
    labels = read_idx(output_data, flatten=False, normalize=False, show_logs=True,
                      num_data=10000, to_one_hot=True,
                      cache_result=cache_result, use_cache=use_cache)
    predictions, mean_loss = network.get_loss(images, labels, samples,
                                              weight=1., inference=True)
    entropy = shannon_entropy(predictions, samples)
    return predictions, labels, mean_loss, entropy

def test_fmnist(file_path, input_shape):
    """
    Used for evaluating a trained `BNN_Normal_Normal` on the test split
    of the Fashion-MNIST dataset.

    Parameters
    ==========

    file_path: str
      The path to the file from where the saved `BNN_Normal_Normal` is to be loaded.
    input_shape: tuple
      Forwarded to the `BNN_Normal_Normal` constructor.

    Returns
    =======

    Tuple of the following,

    preds: list containing tf.Tensor
      Predictions made by neural network, one entry per weight sample.
    y_test: tf.Tensor
      The target labels in one hot notation.
    total_loss: tf.float32
    uncertainty: tf.float32

    Note
    ====

    The number of weight samples is taken from the module level
    variable ``samples``.
    """
    _, (x_test, y_test) = fashion_mnist.load_data()
    # Flatten every image to one row and scale by 126, the same constant
    # ``read_idx`` uses for normalisation. The reshape result has to be
    # assigned: ``ndarray.reshape`` does not modify the array in place.
    x_test = x_test.reshape((x_test.shape[0], -1)).astype(np.float32)
    x_test = x_test / 126.
    y_test = tf.one_hot(y_test, 10)
    model = BNN_Normal_Normal(input_shape=input_shape)
    model.load_weights(file_path)
    preds, total_loss = model.get_loss(x_test, y_test, samples, 1., True)
    uncertainty = shannon_entropy(preds, samples)
    return preds, y_test, total_loss, uncertainty


In [7]:
preds, targets, _, uncertainty = test_mnist(prefix + "weights/mnist/2020_08_09_08_27_21_674420", (None, 784), 
                                            prefix + "datasets/mnist_test_images", prefix + "datasets/mnist_test_labels",
                                            False, True)
# An input is counted as out-of-distribution (OOD) when the weight samples do
# not all agree on the predicted class. Accuracy only credits inputs on which
# every sample agrees and the agreed class matches the target.
# The counts are computed with whole-tensor ops instead of one eager op per
# (example, sample) pair.
test_points = int(preds[0].shape[0])
sample_votes = tf.stack([tf.math.argmax(preds[j], axis=-1) for j in range(samples)], axis=0)
first_vote = sample_votes[0]
is_ood = tf.reduce_any(tf.math.not_equal(sample_votes, first_vote), axis=0)
true_labels = tf.math.argmax(targets, axis=-1)
is_correct = tf.logical_and(tf.logical_not(is_ood), tf.math.equal(first_vote, true_labels))
oods = int(tf.reduce_sum(tf.cast(is_ood, tf.int32)).numpy())
acc = int(tf.reduce_sum(tf.cast(is_correct, tf.int32)).numpy())
print("Accuracy: ", acc/(test_points/100))
print("OOD Inputs: ", oods)
print("Uncertainty: ", uncertainty)

Instructions for updating:
Please use `layer.add_weight` method instead.
Loaded cached data from ./drive/My Drive/bnn/datasets/mnist_test_images_cache.npy
Loaded cached data from ./drive/My Drive/bnn/datasets/mnist_test_labels_cache.npy
Accuracy:  94.01
OOD Inputs:  527
Uncertainty:  tf.Tensor(0.011013616, shape=(), dtype=float32)


In [8]:
preds, targets, _, uncertainty = test_fmnist(prefix + "weights/mnist/2020_08_09_08_27_21_674420", (None, 784))
# An input is counted as out-of-distribution (OOD) when the weight samples do
# not all agree on the predicted class. Accuracy only credits inputs on which
# every sample agrees and the agreed class matches the target.
# The counts are computed with whole-tensor ops instead of one eager op per
# (example, sample) pair.
test_points = int(preds[0].shape[0])
sample_votes = tf.stack([tf.math.argmax(preds[j], axis=-1) for j in range(samples)], axis=0)
first_vote = sample_votes[0]
is_ood = tf.reduce_any(tf.math.not_equal(sample_votes, first_vote), axis=0)
true_labels = tf.math.argmax(targets, axis=-1)
is_correct = tf.logical_and(tf.logical_not(is_ood), tf.math.equal(first_vote, true_labels))
oods = int(tf.reduce_sum(tf.cast(is_ood, tf.int32)).numpy())
acc = int(tf.reduce_sum(tf.cast(is_correct, tf.int32)).numpy())
print("Accuracy: ", acc/(test_points/100))
print("OOD Inputs: ", oods)
print("Uncertainty: ", uncertainty)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/train-images-idx3-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-labels-idx1-ubyte.gz
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/t10k-images-idx3-ubyte.gz
Accuracy:  3.43
OOD Inputs:  3108
Uncertainty:  tf.Tensor(0.044517577, shape=(), dtype=float32)
