# Step size policies

In this notebook I, Philip Blagoveschensky, tried out different step size policies of batch gradient descent. A lot of code is copied from https://github.com/stevenliuyi/information-bottleneck.

## Data generation

First, we will generate a very simple dataset for the demonstration. The inputs are vectors of 10 binaries, and the outputs are just single binaries. The inputs could be represented by integers from 0 to 1023 ($=2^{10}-1$). The 1024 possible inputs are divided into 16 groups (each group has 64 numbers), and each integer input $n\in[0,1023]$ belongs to group $i$ if $x\equiv i \pmod{16}$, where $i \in [0,15]$. Each group $i$ is then associated with a random binary number (output).

In [1]:
import numpy as np
from random import randint, seed

In [2]:
n_train_samples = 50000 # number of train samples
n_test_samples = 10000 # number of test samples
batch_size = 100

groups = np.append(np.zeros(8),np.ones(8)) # 16 groups
np.random.seed(1234)
np.random.shuffle(groups)
p_noise_y = 0.1
p_noise_x = 0.0

def bin_noise(x, p):
    return x if np.random.rand() > 2 * p else np.random.randint(0, 1)

# generate samples
seed(1234)
def generate_samples(n_samples):
    x_data = np.zeros((n_samples, 10)) # inputs
    x_int = np.zeros(n_samples) # integers representing the inputs
    y_data = np.zeros((n_samples, 2)) # outputs
    
    for i in range(n_samples):
        random_int = randint(0, 1023)
        x_data[i,:] = [bin_noise(int(b), p_noise_x) for b in list("{0:b}".format(random_int).zfill(10))]
        x_int[i] = np.sum([x_data[i, k] * (2 ** (9 - k)) for k in range(10)])
        y_data[i,0] = bin_noise(groups[random_int % 16], p_noise_y)
        y_data[i,1] = 1 - y_data[i,0]
        
    return x_data, y_data, x_int

x_train, y_train, x_train_int = generate_samples(n_train_samples) # training dataset
x_test, y_test, _ = generate_samples(n_test_samples) # testing dataset

For our dataset, the theoritical mutual information between $X$ and $Y$ would be
\begin{align}
I(X;Y) & = \sum_{x\in X, y\in Y}P(x,y)\log\Big(\frac{P(x,y)}{P(x)P(y)}\Big) \\
& = \sum_{x\in X}\Big[P(x,y=0)\log\Big(\frac{P(x,y=0)}{P(x)P(y=0)}\Big) + P(x,y=1)\log\Big(\frac{P(x,y=1)}{P(x)P(y=1)}\Big)\Big] \\
& = 1024 \Big[ \frac{1}{1024}\log\Big(\frac{1/1024}{0.5/1024}\Big) + 0\Big] \\
& = 0.693.
\end{align}
Note that terms with $P(x,y)=0$ are set to $0$ for entropy calculation.

## Neural network

In [3]:
import tensorflow as tf
from tensorflow.keras.optimizers import SGD

def fully_conn(tensor, n_outputs, name):
    size = tensor.get_shape().as_list()[1]
    
    # weight and bias tensor
    weight = tf.Variable(tf.truncated_normal([size, n_outputs]))
    bias = tf.Variable(tf.zeros([n_outputs]))
    
    # fully-connected layer
    tensor = tf.matmul(tensor, weight)
    bias = tf.nn.bias_add(tensor, bias)
    
    # hyperbolic tangent activation
    tensor = tf.tanh(tensor, name=name)
    
    return tensor

In [4]:
def output(tensor, n_outputs):
    size = tensor.get_shape().as_list()[1]
    
    # weight and bias tensor
    weight = tf.Variable(tf.truncated_normal([size, n_outputs]))
    bias = tf.Variable(tf.zeros([n_outputs]))
    
    # fully-connected layer
    tensor = tf.matmul(tensor, weight)
    bias = tf.nn.bias_add(tensor, bias)
    
    return tensor

In [5]:
def mlp(x, n_neurons): # x is the input layer  
    # mlp stands for multilayer perceptron
    # hidden layers
    hidden = x
    for n in range(len(n_neurons)):
        hidden = fully_conn(hidden, n_neurons[n], 'hidden%s' % (n+1))
    
    # output layer
    y = output(hidden, 2)
    
    return y

In [6]:
def build_network(hidden_layer_neurons):
    global x, y, learning_rate, logits, cost, optimizer, accuracy
    
    tf.reset_default_graph()
    x = tf.placeholder(tf.float32, (None, 10), name='x')
    y = tf.placeholder(tf.float32, (None, 2), name='y')
    learning_rate = tf.placeholder(tf.float32, shape=[], name="lr")

    tf.set_random_seed(12345)
    logits = mlp(x, hidden_layer_neurons)
    logits = tf.identity(logits, name='logits')

    # cross-entropy coss function
    cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=y), name='cost')
    
    # optimizer
    optimizer = tf.train.GradientDescentOptimizer(learning_rate).minimize(cost)
    
    # accuracy
    correct_predictions = tf.equal(tf.argmax(logits, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_predictions, tf.float32), name='accuracy')
    
def print_out_summary(sess, epoch, lr=None):
    acc, loss = sess.run([accuracy, cost], feed_dict={x: x_test, y: y_test})
    print(
        'Epoch {:>4}:  Testing accuracy {:>.4f} - Testing loss {:>.4f} - lr {:>.4f}'
        .format(epoch, acc, loss, lr)
    )

## Mutual information

Now we are ready to explore the information bottleneck theory for our network. To estimate the mutual information between all the hidden layers and intput/output layers, we could binned the output activations as stated in the paper  (here we choose 30 bins, the same as in the paper), so that the hidden layer random variables $T_i$ (each $i$ corresponds to one hidden layer) would be discrete. Then, we will be able to estimate the joint distribution $P(X,T_i)$ and $P(T_i,Y)$, and use them to calculate the encoder mutual information (between input $X$ and hidden layer $T_i$)
\begin{equation}
    I(X;T_i) = \sum_{x\in X, t\in T_i}P(x,t)\log\Big(\frac{P(x,t)}{P(x)P(t)}\Big)
\end{equation}

and decoder mutual information (between hidden layer $T_i$ and desired output $Y$, note that it is not the model output $\widehat{Y}$)
\begin{equation}
    I(T_i;Y) = \sum_{t\in T_i, y\in Y}P(t,y)\log\Big(\frac{P(t,y)}{P(t)P(y)}\Big).
\end{equation}

In [7]:
from collections import Counter

def calc_mutual_information(hidden):
    #print("hidden.shape: {}".format(hidden.shape))
    n_neurons = hidden.shape[1]
  
    # discretization 
    n_bins = 30
    bins = np.linspace(-1, 1, n_bins+1)
    indices = np.digitize(hidden, bins)
    
    # initialize pdfs
    pdf_x = Counter(); pdf_y = Counter(); pdf_t = Counter(); pdf_xt = Counter(); pdf_yt = Counter()

    for i in range(n_train_samples):
        pdf_x[x_train_int[i]] += 1/float(n_train_samples)
        pdf_y[y_train[i,0]] += 1/float(n_train_samples)      
        pdf_xt[(x_train_int[i],)+tuple(indices[i,:])] += 1/float(n_train_samples)
        pdf_yt[(y_train[i,0],)+tuple(indices[i,:])] += 1/float(n_train_samples)
        pdf_t[tuple(indices[i,:])] += 1/float(n_train_samples)
    
    # calcuate encoder mutual information I(X;T)
    mi_xt = 0
    for i in pdf_xt:
        # P(x,t), P(x) and P(t)
        p_xt = pdf_xt[i]; p_x = pdf_x[i[0]]; p_t = pdf_t[i[1:]]
        # I(X;T)
        mi_xt += p_xt * np.log(p_xt / p_x / p_t)
 
    # calculate decoder mutual information I(T;Y)
    mi_ty = 0
    for i in pdf_yt:
        # P(t,y), P(t) and P(y)
        p_yt = pdf_yt[i]; p_t = pdf_t[i[1:]]; p_y = pdf_y[i[0]]
        # I(X;T)
        mi_ty += p_yt * np.log(p_yt / p_t / p_y)
            
    return mi_xt, mi_ty

# get mutual information for all hidden layers
def get_mutual_information(hiddens):
    mi_xt_list = []; mi_ty_list = []
    for hidden in hiddens:
        mi_xt, mi_ty = calc_mutual_information(hidden)
        mi_xt_list.append(mi_xt)
        mi_ty_list.append(mi_ty)
    return mi_xt_list, mi_ty_list

We are now able to estimate the mutual information while training the network. We'll save the mutual information for later use.

In [8]:
def get_hidden_layers(names):
    hidden_layers = []
    for name in names:
        hidden_layers.append(tf.get_default_graph().get_tensor_by_name("%s:0" % name))
    return hidden_layers

# train the neural network and obtain mutual information
def train_with_mi(n_epochs, n_hidden_layers, lr_callable):
    mi_xt_all = []; mi_ty_all = []; epochs = []
    
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer()) # initialization
    
        hidden_layer_names = ['hidden%s' % i for i in range(1,n_hidden_layers+1)]
        for epoch in range(n_epochs):
            # run one step of optimizer
            batch_indices = np.random.randint(low=0, high=n_train_samples, size=batch_size)
            sess.run(optimizer, feed_dict={
                x: x_train[batch_indices],
                y: y_train[batch_indices],
                learning_rate: lr_callable(epoch)
            })
#                x: [x_train[epoch]],
#                y: [y_train[epoch]]})

            if epoch % print_summary_every_n == 0:
                print_out_summary(sess, epoch, lr_callable(epoch))
                
            if epoch % calc_mi_every_n_epochs == 0:
                #print("calculating mi for epoch {}".format(epoch))
                _, hidden_layers = sess.run(
                    [cost, get_hidden_layers(hidden_layer_names)],
                    feed_dict={x: x_train, y: y_train}
                )
                mi_xt, mi_ty = get_mutual_information(hidden_layers)
                mi_xt_all.append(mi_xt)
                mi_ty_all.append(mi_ty)
                epochs.append(epoch)
    
    return np.array(mi_xt_all), np.array(mi_ty_all), np.array(epochs)

## Visualization

In [9]:
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML

In [10]:
cmap = plt.cm.get_cmap('cool')

def animate(i):
    i = i
    title.set_text('Epoch %s' % str(epochs[i]).zfill(4))
    ax.plot(mi_xt_all[i,:], mi_ty_all[i,:], 'k-',alpha=0.2)
    if i > 0:
        for j in range(n_hidden_layers):
            ax.plot(mi_xt_all[(i-1):(i+1),j],mi_ty_all[(i-1):(i+1),j],'.-',c=cmap(j*.2))
    return

In [11]:
np.random.seed(600)
seed(600)
n_epochs = 30001
calc_mi_every_n_epochs = n_epochs // 40
print_summary_every_n = n_epochs // 10
n_hidden_layers = 5
batch_size = 100
build_network([8,7,6,5,3]) # 5 hidden layers

# you can use one of learning rate (step size policy) definitions below
#lr_callable = lambda epoch: 3 * 0.5**(epoch // 4000)
#lr_callable = lambda epoch: 0.03
#lr_callable = lambda epoch: 0.1 * 0.5**(epoch/4000)
lr_callable = lambda epoch: 0.1 / (1 + epoch/1000)

mi_xt_all, mi_ty_all, epochs = train_with_mi(
    n_epochs, n_hidden_layers,
    lr_callable=lr_callable
)

Epoch    0:  Testing accuracy 0.5659 - Testing loss 0.7423 - lr 0.1000
Epoch 3000:  Testing accuracy 0.6940 - Testing loss 0.6043 - lr 0.0250
Epoch 6000:  Testing accuracy 0.7191 - Testing loss 0.5815 - lr 0.0143
Epoch 9000:  Testing accuracy 0.7240 - Testing loss 0.5637 - lr 0.0100
Epoch 12000:  Testing accuracy 0.7317 - Testing loss 0.5406 - lr 0.0077
Epoch 15000:  Testing accuracy 0.7453 - Testing loss 0.5215 - lr 0.0063
Epoch 18000:  Testing accuracy 0.7550 - Testing loss 0.5014 - lr 0.0053
Epoch 21000:  Testing accuracy 0.7867 - Testing loss 0.4772 - lr 0.0045
Epoch 24000:  Testing accuracy 0.7994 - Testing loss 0.4507 - lr 0.0040
Epoch 27000:  Testing accuracy 0.8036 - Testing loss 0.4358 - lr 0.0036
Epoch 30000:  Testing accuracy 0.8085 - Testing loss 0.4253 - lr 0.0032


In [12]:
fig, ax = plt.subplots(figsize=(8,8))
ax.set_xlim((0.5,7))
ax.set_ylim((0.1,0.43))
ax.set_xlabel('I(X;T)')
ax.set_ylabel('I(T;Y)')
title = ax.set_title('')
plt.close(fig)

#_ = list(map(animate, range(mi_ty_all.shape[0])))
anim = animation.FuncAnimation(fig,
                               animate,
                               init_func=None,
                               frames=len(epochs),
                               interval=100)
HTML(anim.to_html5_video())