In [None]:
from __future__ import print_function

#import os ; os.environ['KERAS_BACKEND']='cntk'

import keras
import keras.backend as K

import numpy as np
import scipy.optimize
import scipy.misc
from collections import OrderedDict, namedtuple

transpose = K.transpose
if K._BACKEND == 'tensorflow':
    import tensorflow as tf
    def tensor_eye(size):
        return tf.eye(size)
elif K._BACKEND == 'theano':
    import theano.tensor as T
    def tensor_eye(size):
        return T.eye(size)
elif K._BACKEND == 'cntk':
    raise Exception('Unsupported')
    import cntk as C
    def transpose(x):
        return C.swapaxes(x, -1, -2)
else:
    raise Exception('Unknown backend')

    
def get_mnist():
    nb_classes = 10
    (X_train, y_train), (X_test, y_test) = keras.datasets.mnist.load_data()
    X_train = np.reshape(X_train, [X_train.shape[0], -1]).astype('float32') / 255.
    X_test  = np.reshape(X_test , [X_test.shape[0] , -1]).astype('float32') / 255.
    X_train = X_train * 2.0 - 1.0
    X_test  = X_test  * 2.0 - 1.0

    Y_train = keras.utils.np_utils.to_categorical(y_train, nb_classes).astype('float32')
    Y_test  = keras.utils.np_utils.to_categorical(y_test, nb_classes).astype('float32')

    Dataset = namedtuple('Dataset',['X','Y','y','nb_classes'])
    trn = Dataset(X_train, Y_train, y_train, nb_classes)
    tst = Dataset(X_test , Y_test, y_test, nb_classes)

    del X_train, X_test, Y_train, Y_test, y_train, y_test
    
    return trn, tst

trn, tst = get_mnist()
PLOT_LAYERS    = [2,3]
NUM_EPOCHS     = 10000
NUMBER_OF_BINS = 3

SGD_BATCHSIZE = 128

ACTIVATION = 'relu'
ACTIVATION = 'tanh'

In [None]:
# Simplified MI computation code from https://github.com/ravidziv/IDNNs

def get_unique_probs(x):
    uniqueids = np.ascontiguousarray(x).view(np.dtype((np.void, x.dtype.itemsize * x.shape[1])))
    _, unique_inverse, unique_counts = np.unique(uniqueids, return_index=False, return_inverse=True, return_counts=True)
    return np.asarray(unique_counts / float(sum(unique_counts))), unique_inverse

def bin_calc_information(inputdata, layerdata, num_of_bins):
    p_xs, unique_inverse_x = get_unique_probs(inputdata)
    
    bins = np.linspace(-1, 1, num_of_bins, dtype='float32') 
    digitized = bins[np.digitize(np.squeeze(layerdata.reshape(1, -1)), bins) - 1].reshape(len(layerdata), -1)
    p_ts, _ = get_unique_probs( digitized )
    
    H2 = -np.sum(p_ts * np.log(p_ts))
    H2X = 0.
    for xval in range(len(p_xs)):
        p_t_given_x, _ = get_unique_probs(digitized[unique_inverse_x == xval, :])
        H2X += - p_xs[xval] * np.sum(p_t_given_x * np.log(p_t_given_x))
    return H2 - H2X


In [None]:

def Kget_dists(X):
    """Keras code to compute the pairwise distance matrix for a set of
    vectors specifie by the matrix X.
    """
    x2 = K.expand_dims(K.sum(K.square(X), axis=1), 1)
    dists = x2 + transpose(x2) - 2*K.dot(X, transpose(X))
    return dists

def get_shape(x):
    dims = K.cast( K.shape(x)[1], K.floatx() ) 
    N    = K.cast( K.shape(x)[0], K.floatx() )
    return dims, N

def kde_entropy_from_dists_loo(dists, x, var):
    dims, N = get_shape(x)
    dists2 = dists + tensor_eye(K.cast(N, 'int32')) * 10e20
    dists2 = dists2 / (2*var)
    normconst = (dims/2.0)*K.log(2*np.pi*var)
    lprobs  = K.logsumexp(-dists2, axis=1) - K.log(N-1) - normconst
    h = -K.mean(lprobs)
    return h

def entropy_estimator(x, var):
    dims, N = get_shape(x)
    dists = Kget_dists(x)
    dists2 = dists / (2*var)
    normconst = (dims/2.0)*K.log(2*np.pi*var)
    lprobs = K.logsumexp(-dists2, axis=1) - K.log(N) - normconst
    h = -K.mean(lprobs)
    return dims/2 + h

def wrapKfunc(f, data):
    def callf(logvar):
        return f([data, 1, np.exp(logvar)])[0].flat[0]
    return callf

class Reporter(keras.callbacks.Callback):
    def __init__(self, on_every=1, *kargs, **kwargs):
        super(Reporter, self).__init__(*kargs, **kwargs)
        self.on_every = on_every
        
    def on_train_begin(self, logs={}):
        self.layerfuncs = []
        var = K.placeholder(ndim=0)
        inputs = self.model.inputs + [ K.learning_phase(),] + [var,]
        for lndx in PLOT_LAYERS:
            l = self.model.layers[lndx]
            cfuncs = {}
            cfuncs['loo']    = K.function(inputs, [kde_entropy_from_dists_loo(Kget_dists(l.output),l.output,var)])
            cfuncs['upper']  = K.function(inputs, [entropy_estimator(l.output,var)])
            #cfuncs['lower']  = K.function(inputs, [entropy_estimator(l.output,var, 0.25)])
            cfuncs['lower'] = K.function(inputs, [entropy_estimator(l.output,4*var)+np.log(0.25)*l.output.shape[1]/2.])
            #cfuncs['lower']  = K.function(inputs, [entropy_estimator(l.output,var, 0.25)])
            cfuncs['output'] = K.function(self.model.inputs + [ K.learning_phase(),], [l.output])
            self.layerfuncs.append(cfuncs)
        self.saved_logs = {}
            
    def on_epoch_end(self, epoch, logs={}):
        if not (epoch % self.on_every == 0):
            return
            
        l = OrderedDict()
        for lndx, cfuncs in enumerate(self.layerfuncs):
            trndata = trn.X[::20]
            tstdata = tst.X[::10]
            r = scipy.optimize.minimize_scalar(wrapKfunc(cfuncs['loo'], trndata), method='brent')
            l['trn_layer_%d_h_loo'%lndx] = r.fun
            l['trn_layer_%d_logvar'%lndx] = r.x
            l['trn_layer_%d_h_upper'%lndx] = wrapKfunc(cfuncs['upper'], trndata)(r.x)
            l['trn_layer_%d_h_lower'%lndx] = wrapKfunc(cfuncs['lower'], trndata)(r.x)

            r = scipy.optimize.minimize_scalar(wrapKfunc(cfuncs['loo'], tstdata), method='brent')
            l['tst_layer_%d_h_loo' %lndx] = r.fun
            l['tst_layer_%d_logvar'%lndx] = r.x
            l['tst_layer_%d_h_upper'%lndx] = wrapKfunc(cfuncs['upper'], tstdata)(r.x)
            l['tst_layer_%d_h_lower'%lndx] = wrapKfunc(cfuncs['lower'], tstdata)(r.x)
            
            trndata = trn.X[::10]
            tstdata = tst.X
            trnlayeroutput= cfuncs['output']([trndata, 0])[0]
            tstlayeroutput= cfuncs['output']([tstdata, 0])[0]
            l['trn_layer_%d_h_bin'%lndx] = bin_calc_information(trndata, trnlayeroutput,num_of_bins=NUMBER_OF_BINS)
            l['tst_layer_%d_h_bin'%lndx] = bin_calc_information(tstdata, tstlayeroutput,num_of_bins=NUMBER_OF_BINS)
            l['trn_layer_%d_h_binstd'%lndx] = bin_calc_information(trndata,0.5* scipy.stats.zscore(trnlayeroutput),num_of_bins=NUMBER_OF_BINS)
            l['tst_layer_%d_h_binstd'%lndx] = bin_calc_information(tstdata,0.5* scipy.stats.zscore(tstlayeroutput),num_of_bins=NUMBER_OF_BINS)

            
        for k,v in l.items():
            print(k,"=",v)
            logs[k] = v
            
        self.saved_logs[epoch] = l.copy()
        
            
input_layer  = keras.layers.Input((trn.X.shape[1],))
hidden_output = keras.layers.Dense(1024, activation=ACTIVATION)(input_layer)
hidden_output = keras.layers.Dense(20  , activation=ACTIVATION)(hidden_output)
hidden_output = keras.layers.Dense(20  , activation=ACTIVATION)(hidden_output)

outputs  = keras.layers.Dense(trn.nb_classes, activation='softmax')(hidden_output)
model = keras.models.Model(inputs=input_layer, outputs=outputs)
optimizer = keras.optimizers.SGD(lr=0.001)
model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

reporter   = Reporter(on_every=20)

In [None]:
r = model.fit(x=trn.X, y=trn.Y, verbose=2, batch_size=SGD_BATCHSIZE, epochs=NUM_EPOCHS, 
              validation_data=(tst.X, tst.Y), callbacks=[reporter,])

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
plt.figure(figsize=(15,5*len(PLOT_LAYERS)))
for rowndx, lndx in enumerate(PLOT_LAYERS):
    for colndx, t in enumerate(['trn','tst']):
        plt.subplot(len(PLOT_LAYERS),2,rowndx*len(PLOT_LAYERS)+colndx+1)
        epochs = sorted(reporter.saved_logs.keys())

        plt.plot(epochs, [reporter.saved_logs[epoch][t+'_layer_%d_h_upper' % rowndx] for epoch in epochs], 'r:', label="$H_{KL}$")
        plt.plot(epochs, [reporter.saved_logs[epoch][t+'_layer_%d_h_lower' % rowndx] for epoch in epochs], 'g:', label="$H_{BD}$")
        
        plt.plot(epochs, [reporter.saved_logs[epoch][t+'_layer_%d_h_loo'   % rowndx] for epoch in epochs], 'r', label="$H_{loo}$")
        plt.plot(epochs, [reporter.saved_logs[epoch][t+'_layer_%d_h_bin'   % rowndx] for epoch in epochs], 'b', label="$H_{bin}$")
        plt.plot(epochs, [reporter.saved_logs[epoch][t+'_layer_%d_h_binstd'% rowndx] for epoch in epochs], 'k', label="$H_{binstd}$")

        plt.title('%s layer %d'%(t,lndx))
        plt.xlabel('Epochs')
        plt.ylabel('Layer Activity Entropy')
        
        if rowndx ==1 and t == 'tst':
            plt.legend(loc='lower right')
    
plt.savefig('run_output_tanh.pdf')

In [None]:
# Original MI computation code from https://github.com/ravidziv/IDNNs

def extract_probs(label, x):
    """calculate the probabilities of the given data and labels p(x), p(y) and (y|x)"""
    pys = np.sum(label, axis=0) / float(label.shape[0])
    b = np.ascontiguousarray(x).view(np.dtype((np.void, x.dtype.itemsize * x.shape[1])))
    unique_array, unique_indices, unique_inverse_x, unique_counts = \
        np.unique(b, return_index=True, return_inverse=True, return_counts=True)
    unique_a = x[unique_indices]
    b1 = np.ascontiguousarray(unique_a).view(np.dtype((np.void, unique_a.dtype.itemsize * unique_a.shape[1])))
    pxs = unique_counts / float(np.sum(unique_counts))
    p_y_given_x = []
    for i in range(0, len(unique_array)):
        indexs = unique_inverse_x == i
        py_x_current = np.mean(label[indexs, :], axis=0)
        p_y_given_x.append(py_x_current)
    p_y_given_x = np.array(p_y_given_x).T
    b_y = np.ascontiguousarray(label).view(np.dtype((np.void, label.dtype.itemsize * label.shape[1])))
    unique_array_y, unique_indices_y, unique_inverse_y, unique_counts_y = \
        np.unique(b_y, return_index=True, return_inverse=True, return_counts=True)
    pys1 = unique_counts_y / float(np.sum(unique_counts_y))
    return pys, pys1, p_y_given_x, b, unique_a, unique_inverse_x, unique_inverse_y, pxs

def calc_information_sampling(data, bins, pys1, pxs, label, b, p_YgX, unique_inverse_x,  unique_inverse_y):
    bins = bins.astype(np.float32)
    num_of_bins = bins.shape[0]
    # bins = stats.mstats.mquantiles(np.squeeze(data.reshape(1, -1)), np.linspace(0,1, num=num_of_bins))
    # hist, bin_edges = np.histogram(np.squeeze(data.reshape(1, -1)), normed=True)
    digitized = bins[np.digitize(np.squeeze(data.reshape(1, -1)), bins) - 1].reshape(len(data), -1)
    b2 = np.ascontiguousarray(digitized).view(
        np.dtype((np.void, digitized.dtype.itemsize * digitized.shape[1])))
    unique_array, unique_inverse_t, unique_counts = \
        np.unique(b2, return_index=False, return_inverse=True, return_counts=True)
    p_ts = unique_counts / float(sum(unique_counts))
    PXs, PYs = np.asarray(pxs).T, np.asarray(pys1).T
    local_IXT, local_ITY = calc_information_from_mat(PXs, PYs, p_ts, digitized, unique_inverse_x, unique_inverse_y,
                                                     unique_array)
    return local_IXT, local_ITY

def calc_condtion_entropy(px, t_data, unique_inverse_x):
    # Condition entropy of t given x
    H2X_array = np.array(list(
        calc_entropy_for_specipic_t(t_data[unique_inverse_x == i, :], px[i])
                                   for i in range(px.shape[0])))
    H2X = np.sum(H2X_array)
    return H2X


def calc_information_from_mat(px, py, ps2, data, unique_inverse_x, unique_inverse_y, unique_array):
    """Calculate the MI based on binning of the data"""
    H2 = -np.sum(ps2 * np.log(ps2))
    H2X = calc_condtion_entropy(px, data, unique_inverse_x)
    H2Y = calc_condtion_entropy(py.T, data, unique_inverse_y)
    IY = H2 - H2Y
    IX = H2 - H2X
    return IX, IY

def calc_entropy_for_specipic_t(current_ts, px_i):
    """Calc entropy for specipic t"""
    b2 = np.ascontiguousarray(current_ts).view(
        np.dtype((np.void, current_ts.dtype.itemsize * current_ts.shape[1])))
    unique_array, unique_inverse_t, unique_counts = \
        np.unique(b2, return_index=False, return_inverse=True, return_counts=True)
    p_current_ts = unique_counts / float(sum(unique_counts))
    p_current_ts = np.asarray(p_current_ts, dtype=np.float64).T
    H2X = px_i * (-np.sum(p_current_ts * np.log(p_current_ts)))
    return H2X

if False:
    inputs = model.inputs + [ K.learning_phase(),] 
    num_of_bins = 10
    bins = np.linspace(-1, 1, num_of_bins)

    for lndx, l in enumerate(model.layers[2:-1]):
        inputdata = trn.X[::20]
        label = trn.Y[::20]
        data = K.function(inputs, [l.output])([inputdata, 0])[0]
        pys, pys1, p_YgX, b, unique_a, unique_inverse_x, unique_inverse_y, pxs = extract_probs(label, inputdata)

        local_IXT, local_ITY = calc_information_sampling(data, bins, pys1, pxs, label, b, p_YgX, unique_inverse_x,
                                  unique_inverse_y)

        print(local_IXT, local_ITY)
    