<h1 align=center><font size = 5>Deep Belief Network </font></h1>

One problem with traditional multilayer perceptrons/artificial neural networks is that backpropagation can often get stuck in a “local minimum”. This happens when the “error surface” contains multiple grooves and, while performing gradient descent, you fall into a groove that is not the lowest possible one.<br>

__Deep belief networks__ solve this problem by using an extra step called __pre-training__. Pre-training is done before backpropagation and can lead to an error rate not far from optimal. This puts us in the “neighborhood” of the final solution. Then we use backpropagation to slowly reduce the error rate from there.

DBNs can be divided into two major parts. The first consists of multiple layers of Restricted Boltzmann Machines (RBMs), which are used to pre-train the network. The second is a feed-forward backpropagation network that further refines the results from the RBM stack.
<img src="https://ibm.box.com/shared/static/15y15xs7w72eer0on3gbi8zu6835imru.png" alt="DBN Model"/>

In [17]:
import os
import urllib.request

# Helper module from the deeplearning.net tutorials (provides tile_raster_images).
UTILS_URL = "http://deeplearning.net/tutorial/code/utils.py"
UTILS_PATH = 'utils.py'

# Only hit the network when the helper is not already on disk, so that
# "Restart & Run All" keeps working offline once the file has been fetched.
if not os.path.exists(UTILS_PATH):
    # A timeout makes an unreachable host fail fast instead of hanging the cell.
    with urllib.request.urlopen(UTILS_URL, timeout=30) as url:
        response = url.read()
    # The context manager closes the file even if the write fails; the explicit
    # encoding avoids depending on the platform's default text encoding.
    with open(UTILS_PATH, 'w', encoding='utf-8') as target:
        target.write(response.decode('utf-8'))
    

URLError: <urlopen error [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond>

In [18]:
import math
import tensorflow as tf
import numpy as np
from PIL import Image
from utils import tile_raster_images

<img src="https://ibm.box.com/shared/static/7th91vjz32jhslacdym7ll3udq2zixjb.png" alt="RBM Model" style="width: 400px;"/>

In [23]:
class RBM(object):
    """Restricted Boltzmann Machine trained with a single Gibbs step per update.

    Attributes:
        input_size: number of visible units.
        output_size: number of hidden units.
        epochs: number of passes over the training data in ``train``.
        learning_rate: step size applied to every parameter update.
        batchsize: number of rows per mini-batch.
        w: weight matrix, shape ``[input_size, output_size]`` (float32).
        hb: hidden bias, shape ``[output_size]`` (float32).
        vb: visible bias, shape ``[input_size]`` (float32).
    """

    def __init__(self, input_size, output_size, epochs=5, learning_rate=1.0, batchsize=100):
        """Create an RBM whose weights and biases are all zero.

        The first parameter used to be misspelled, which made the constructor
        silently read a notebook-level global called ``input_size`` instead of
        its own argument. Positional callers are unaffected by the rename.

        Args:
            input_size: number of visible units.
            output_size: number of hidden units.
            epochs: training epochs (default 5).
            learning_rate: update step size (default 1.0).
            batchsize: mini-batch size (default 100).
        """
        self.input_size = input_size
        self.output_size = output_size
        self.epochs = epochs
        self.learning_rate = learning_rate
        self.batchsize = batchsize

        self.w = np.zeros([input_size, output_size], np.float32)
        self.hb = np.zeros([output_size], np.float32)
        self.vb = np.zeros([input_size], np.float32)

    def prob_h_given_v(self, visible, w, hb):
        """Return sigmoid(visible . w + hb), the hidden activation probabilities."""
        return tf.nn.sigmoid(tf.matmul(visible, w) + hb)

    def prob_v_given_h(self, hidden, w, vb):
        """Return sigmoid(hidden . w^T + vb), the visible activation probabilities."""
        return tf.nn.sigmoid(tf.matmul(hidden, tf.transpose(w)) + vb)

    def sample_prob(self, probs):
        """Draw a 0/1 sample per entry: 1 where probs exceeds a uniform random draw."""
        return tf.nn.relu(tf.sign(probs - tf.random_uniform(tf.shape(probs))))

    def train(self, X):
        """Fit the RBM on ``X`` and store the result in ``self.w/hb/vb``.

        Training always starts from all-zero parameters (not from the current
        ``self.w``). After every epoch the mean squared reconstruction error
        over the whole of ``X`` is printed.

        Args:
            X: 2-D array-like with ``input_size`` columns, sliceable by row.
        """
        cur_w = np.zeros([self.input_size, self.output_size], np.float32)
        cur_hb = np.zeros([self.output_size], np.float32)
        cur_vb = np.zeros([self.input_size], np.float32)

        # Build the ops in a private graph so repeated calls (one per stacked
        # RBM, or a re-run of the cell) do not keep growing the default graph.
        with tf.Graph().as_default():
            _w = tf.placeholder("float", [self.input_size, self.output_size])
            _hb = tf.placeholder("float", [self.output_size])
            _vb = tf.placeholder("float", [self.input_size])
            v0 = tf.placeholder("float", [None, self.input_size])

            # One Gibbs step: v0 -> sampled h0 -> sampled v1 -> h1 probabilities.
            h0 = self.sample_prob(self.prob_h_given_v(v0, _w, _hb))
            v1 = self.sample_prob(self.prob_v_given_h(h0, _w, _vb))
            h1 = self.prob_h_given_v(v1, _w, _hb)

            positive_grad = tf.matmul(tf.transpose(v0), h0)
            negative_grad = tf.matmul(tf.transpose(v1), h1)

            update_w = _w + self.learning_rate * (positive_grad - negative_grad) / tf.to_float(tf.shape(v0)[0])
            update_vb = _vb + self.learning_rate * tf.reduce_mean(v0 - v1, 0)
            update_hb = _hb + self.learning_rate * tf.reduce_mean(h0 - h1, 0)
            updates = [update_w, update_hb, update_vb]

            err = tf.reduce_mean(tf.square(v0 - v1))

            with tf.Session() as sess:
                for epoch in range(self.epochs):
                    # Step through every row, including a final partial batch.
                    for start in range(0, len(X), self.batchsize):
                        batch = X[start:start + self.batchsize]
                        # Fetch all three updates in ONE run so they are computed
                        # from the same random samples h0/v1; separate runs would
                        # redraw the samples for each parameter.
                        cur_w, cur_hb, cur_vb = sess.run(
                            updates,
                            feed_dict={v0: batch, _w: cur_w, _hb: cur_hb, _vb: cur_vb})
                    error = sess.run(err, feed_dict={v0: X, _w: cur_w, _vb: cur_vb, _hb: cur_hb})
                    print ('Epoch: %d' % epoch,'reconstruction error: %f' % error)

        self.w = cur_w
        self.hb = cur_hb
        self.vb = cur_vb

    def rbm_outpt(self, X):
        """Return the hidden activation probabilities of ``X`` under the trained weights.

        Args:
            X: 2-D array-like with ``input_size`` columns.

        Returns:
            numpy array with one row per input row and ``output_size`` columns.
        """
        with tf.Graph().as_default():
            # Feed the data through a placeholder instead of embedding the whole
            # dataset in the graph as a constant.
            input_X = tf.placeholder("float", [None, self.input_size])
            out = self.prob_h_given_v(input_X, tf.constant(self.w), tf.constant(self.hb))
            with tf.Session() as sess:
                return sess.run(out, feed_dict={input_X: X})

<h3>MNIST Dataset of Handwritten Digits</h3>

In [24]:
from tensorflow.examples.tutorials.mnist import input_data

# Read MNIST from the local "MNIST_data" directory with one-hot encoded labels.
mnist = input_data.read_data_sets("MNIST_data", one_hot=True)

# Training split.
trX = mnist.train.images
trY = mnist.train.labels
# Test split.
teX = mnist.test.images
teY = mnist.test.labels

Extracting MNIST_data\train-images-idx3-ubyte.gz
Extracting MNIST_data\train-labels-idx1-ubyte.gz
Extracting MNIST_data\t10k-images-idx3-ubyte.gz
Extracting MNIST_data\t10k-labels-idx1-ubyte.gz


<h3>Creating DBN</h3>
    

In [25]:
# Hidden-layer widths of the stacked RBMs, first layer to last.
RBM_hidden_sizes = [500, 200 , 50 ]
inpX = trX
rbm_list = []

# Consecutive pairs of this list are the (visible, hidden) sizes of each RBM:
# the first RBM sees the raw input, every later one sees the previous hidden layer.
layer_widths = [inpX.shape[1]] + RBM_hidden_sizes
for i, (input_size, size) in enumerate(zip(layer_widths[:-1], layer_widths[1:])):
    print ('RBM: ',i,' ',input_size,'->', size)
    rbm_list.append(RBM(input_size, size))
# Leave input_size at the width of the last layer, as the step-by-step loop would.
input_size = layer_widths[-1]

RBM:  0   784 -> 500
RBM:  1   500 -> 200
RBM:  2   200 -> 50


<h3>RBM Train</h3>

In [26]:
# Greedy layer-wise pre-training: each RBM is trained on the hidden
# activations produced by the RBM below it.
# Start from the raw training images rather than from whatever inpX currently
# holds, so re-running this cell does not feed already-transformed features
# into the first RBM.
layer_input = trX
for rbm in rbm_list:
    print ('New RBM:')
    #Train a new one
    rbm.train(layer_input)
    #Return the output layer
    layer_input = rbm.rbm_outpt(layer_input)

# Keep inpX pointing at the top-level features, as before.
inpX = layer_input

New RBM:
Epoch: 0 reconstruction error: 0.061585
Epoch: 1 reconstruction error: 0.052355
Epoch: 2 reconstruction error: 0.048981
Epoch: 3 reconstruction error: 0.046851
Epoch: 4 reconstruction error: 0.045858
New RBM:
Epoch: 0 reconstruction error: 0.032947
Epoch: 1 reconstruction error: 0.028816
Epoch: 2 reconstruction error: 0.027424
Epoch: 3 reconstruction error: 0.026192
Epoch: 4 reconstruction error: 0.025752
New RBM:
Epoch: 0 reconstruction error: 0.057790
Epoch: 1 reconstruction error: 0.054532
Epoch: 2 reconstruction error: 0.053733
Epoch: 3 reconstruction error: 0.053067
Epoch: 4 reconstruction error: 0.052332


In [27]:
class NN(object):
    """Fully-connected sigmoid network trained with mini-batch momentum SGD.

    The hidden layers can be initialised from pre-trained RBMs via
    ``load_from_rbms``; the output layer always keeps its random initialisation.

    Attributes:
        w_list: one float32 weight matrix per layer (hidden layers + output).
        b_list: one float32 bias vector per layer.
    """

    def __init__(self, sizes, X, Y, learning_rate=1.0, momentum=0.0, epoches=10, batchsize=100):
        """Store the data and randomly initialise every layer.

        Args:
            sizes: hidden-layer widths (any sequence of ints).
            X: training inputs, 2-D, one row per example.
            Y: training targets, 2-D, one row per example (one column per class).
            learning_rate: optimizer step size (default 1.0).
            momentum: optimizer momentum (default 0.0).
            epoches: number of passes over the data in ``train`` (default 10).
            batchsize: mini-batch size (default 100).
        """
        # Copy into a list so tuples (or other sequences) work with the
        # list concatenation below.
        self._sizes = list(sizes)
        self._X = X
        self._Y = Y
        self.w_list = []
        self.b_list = []
        self._learning_rate = learning_rate
        self._momentum = momentum
        self._epoches = epoches
        self._batchsize = batchsize
        input_size = X.shape[1]

        # Hidden layers followed by the output layer (one unit per target column).
        for size in self._sizes + [Y.shape[1]]:
            # Uniform initialisation range scaled by the layer's fan-in + fan-out.
            max_range = 4 * math.sqrt(6. / (input_size + size))
            self.w_list.append(
                np.random.uniform(-max_range, max_range, [input_size, size]).astype(np.float32))
            self.b_list.append(np.zeros([size], np.float32))
            input_size = size

    def load_from_rbms(self, dbn_sizes, rbm_list):
        """Replace the hidden-layer parameters with those of pre-trained RBMs.

        Args:
            dbn_sizes: hidden sizes the RBM stack was built with; must equal
                the sizes this network was constructed with.
            rbm_list: objects exposing ``w`` (weights) and ``hb`` (hidden bias),
                one per hidden layer, in order.

        Raises:
            AssertionError: if the sizes or weight shapes do not match.
        """
        assert len(dbn_sizes) == len(self._sizes), "number of RBM layers differs from the network"

        for i in range(len(self._sizes)):
            #Check if for each RBM the expected sizes are correct
            assert dbn_sizes[i] == self._sizes[i], "hidden size mismatch at layer %d" % i
            # Catch a wrongly-shaped RBM here rather than inside the TF graph later.
            assert rbm_list[i].w.shape == self.w_list[i].shape, "weight shape mismatch at layer %d" % i

        #If everything is correct, bring over the weights and biases
        for i in range(len(self._sizes)):
            self.w_list[i] = rbm_list[i].w
            self.b_list[i] = rbm_list[i].hb

    def train(self):
        """Train on the stored data, printing training-set accuracy after each epoch.

        The cost is the mean squared error between the sigmoid outputs and the
        targets. After every epoch the current parameters are copied back into
        ``self.w_list`` / ``self.b_list``.
        """
        n_layers = len(self._sizes) + 1
        true_labels = np.argmax(self._Y, axis=1)

        # Private graph: calling train() again must not pile more variables
        # onto the global default graph.
        with tf.Graph().as_default():
            _a = [None] * (n_layers + 1)
            _a[0] = tf.placeholder("float", [None, self._X.shape[1]])
            y = tf.placeholder("float", [None, self._Y.shape[1]])

            _w = [tf.Variable(w) for w in self.w_list]
            _b = [tf.Variable(b) for b in self.b_list]
            # _a[i] is the activation of layer i; _a[0] is the input.
            for i in range(1, n_layers + 1):
                _a[i] = tf.nn.sigmoid(tf.matmul(_a[i - 1], _w[i - 1]) + _b[i - 1])

            cost = tf.reduce_mean(tf.square(_a[-1] - y))
            train_op = tf.train.MomentumOptimizer(self._learning_rate, self._momentum).minimize(cost)

            predict_op = tf.argmax(_a[-1], 1)
            with tf.Session() as sess:
                sess.run(tf.global_variables_initializer())
                for i in range(self._epoches):
                    # Step through every row, including a final partial batch.
                    for start in range(0, len(self._X), self._batchsize):
                        end = start + self._batchsize
                        sess.run(train_op, feed_dict={_a[0]: self._X[start:end], y: self._Y[start:end]})

                    #Retrieve weights and biases
                    self.w_list = sess.run(_w)
                    self.b_list = sess.run(_b)

                    print ("Accuracy rating for epoch " + str(i) + ": " + str(np.mean(true_labels == sess.run(predict_op, feed_dict={_a[0]: self._X, y: self._Y}))))
                
                

In [28]:
nNet = NN(RBM_hidden_sizes, trX, trY)
nNet.load_from_rbms(RBM_hidden_sizes,rbm_list)
nNet.train()

Accuracy rating for epoch 0: 0.4789272727272727
Accuracy rating for epoch 1: 0.6366363636363637
Accuracy rating for epoch 2: 0.7413272727272727
Accuracy rating for epoch 3: 0.8189272727272727
Accuracy rating for epoch 4: 0.8644545454545455
Accuracy rating for epoch 5: 0.8844545454545455
Accuracy rating for epoch 6: 0.8958909090909091
Accuracy rating for epoch 7: 0.9039818181818182
Accuracy rating for epoch 8: 0.9097090909090909
Accuracy rating for epoch 9: 0.9143454545454546
