In [None]:
import tensorflow as tf #Deep learning Library
import numpy as np #Matrix Algebra Library

In [None]:
#Getting the MNIST data provided by Tensorflow
from tensorflow.examples.tutorials.mnist import input_data

#Loading in the mnist data
# Labels are loaded as integer class ids (one_hot=False), not one-hot vectors.
mnist = input_data.read_data_sets("MNIST_data/", one_hot=False)

# Train / test images and labels, unpacked one split per line.
trX, trY = mnist.train.images, mnist.train.labels
teX, teY = mnist.test.images, mnist.test.labels

In [None]:
class RBM_DAN(object):
    """Restricted Boltzmann Machine used as one layer of the DAN stack.

    Training uses one-step contrastive divergence (CD-1). After every
    mini-batch the weights additionally take a shrinkage step computed by
    ``gradient_subtract_improved``. When training finishes the weights are
    thresholded and replaced by their sign, so ``self.w`` then only holds
    values from {-1, 0, +1}.
    """

    # Arguments passed to gradient_subtract_improved after each mini-batch.
    REG_G = 0.5
    REG_LAMBDA = 0.0001
    REG_ALPHA = 0.000000001
    # Trained weights with a magnitude below this are set to 0 (DAN_s step).
    WEIGHT_THRESHOLD = 0.05

    def __init__(self, input_size, output_size, learning_rate, batch_size, epochs=5):
        """Store the hyper-parameters and draw small random initial parameters.

        Args:
            input_size: Number of visible units.
            output_size: Number of hidden units.
            learning_rate: Step size of the contrastive-divergence updates.
            batch_size: Number of samples per mini-batch.
            epochs: Number of passes over the training data (default 5).
        """
        self.input_size = input_size #Size of the input layer
        self.output_size = output_size #Size of the hidden layer
        self.epochs = epochs #How many passes over the training data train() makes
        self.learning_rate = learning_rate #How big of a weight update we will perform
        self.batch_size = batch_size #How many images are processed per update
        self.new_input_layer = None #Reconstruction of the training data from the last epoch
        self.new_hidden_layer = None
        self.new_test_hidden_layer = None

        #Weights and biases are drawn from a normal distribution with
        #mean 0 and standard deviation 0.01.
        self.w = np.random.normal(0,0.01,[input_size,output_size]) #weights
        self.hb = np.random.normal(0,0.01,[output_size]) #hidden layer bias
        self.vb = np.random.normal(0,0.01,[input_size]) #input layer bias (sometimes called visible layer)

    def prob_h_given_v(self, visible, w, hb):
        """Return sigmoid(visible * w + hb): the hidden-unit activation probabilities."""
        return tf.nn.sigmoid(tf.matmul(visible, w) + hb)

    def prob_v_given_h(self, hidden, w, vb):
        """Return sigmoid(hidden * w^T + vb): the reconstruction probabilities."""
        return tf.nn.sigmoid(tf.matmul(hidden, tf.transpose(w)) + vb)

    def sample_prob(self, probs):
        """Turn probabilities into binary samples.

        Each entry becomes 1 where it is greater than a fresh uniform random
        draw and 0 otherwise (sign gives -1/0/+1, relu clips -1 to 0).
        """
        return tf.nn.relu(tf.sign(probs - tf.random_uniform(tf.shape(probs))))

    def gradient_subtract(self,w,g,lmda,alpha):
        """Element-by-element reference version of ``gradient_subtract_improved``.

        Every entry w[i, j] is reduced by
        ``lmda * (g * w[i, j] / (alpha + ||row i||) + (1 - g) * w[i, j] / (alpha + ||column j||))``.
        The norms are taken from the matrix as passed in and the input is left
        untouched, so the result agrees with the vectorised method.

        Args:
            w: 2-D weight matrix.
            g: Mixing factor between the row term (g) and the column term (1 - g).
            lmda: Scale of the subtracted term.
            alpha: Constant added to each norm in the denominator.

        Returns:
            A new array holding the updated weights.
        """
        src = np.asarray(w)
        if not np.issubdtype(src.dtype, np.floating):
            # Integer input would silently truncate the fractional update.
            src = src.astype(float)
        # Norms depend only on the incoming matrix, so compute them once.
        row_norm = np.sqrt(np.sum(src**2, axis=1))
        col_norm = np.sqrt(np.sum(src**2, axis=0))
        out = src.copy()
        for i in range(src.shape[0]):
            for j in range(src.shape[1]):
                step = lmda*((g*(src[i,j]/(alpha+row_norm[i])))+((1-g)*(src[i,j]/(alpha+col_norm[j]))))
                out[i,j] = src[i,j]-step
        return out

    def gradient_subtract_improved(self,w,g,lmda,alpha):
        """Vectorised shrinkage step applied to the weights after each batch.

        Computes ``w - lmda * (g * w / (alpha + row_norm) + (1 - g) * w / (alpha + col_norm))``
        where ``row_norm`` / ``col_norm`` are the L2 norms of each row / column of ``w``.

        Args:
            w: 2-D weight matrix.
            g: Mixing factor between the row term (g) and the column term (1 - g).
            lmda: Scale of the subtracted term.
            alpha: Constant added to each norm in the denominator.

        Returns:
            A new array holding the updated weights; ``w`` is not modified.
        """
        w = np.asarray(w)
        # keepdims lets broadcasting stand in for explicitly tiled norm matrices.
        row_norm = np.sqrt(np.sum(w**2, axis=1, keepdims=True))
        col_norm = np.sqrt(np.sum(w**2, axis=0, keepdims=True))
        matSub = lmda*((g*(w/(alpha+row_norm)))+((1-g)*(w/(alpha+col_norm))))
        return w-matSub

    def _iter_batches(self, X):
        """Yield consecutive mini-batches of X, including a final partial one."""
        for start in range(0, len(X), self.batch_size):
            yield X[start:start + self.batch_size]

    def train(self, X, teX):
        """Train the RBM with CD-1 and store the resulting parameters on self.

        Parameters are re-initialised randomly at the start of every call.
        After each epoch the reconstruction error over all of X is printed.
        On return ``self.w`` holds the thresholded, sign-binarised weights,
        ``self.hb`` / ``self.vb`` the trained biases and
        ``self.new_input_layer`` the reconstruction of X from the last epoch.

        Args:
            X: Training data, one sample per row with ``input_size`` columns.
            teX: Unused; kept so existing callers keep working.
        """
        #Fresh parameters for this run; float32 matches the placeholders below
        pre_w = np.random.normal(0,0.01, size = [self.input_size,self.output_size]).astype(np.float32)
        pre_vb = np.random.normal(0,0.01, size = [self.input_size]).astype(np.float32)
        pre_hb = np.random.normal(0,0.01, size = [self.output_size]).astype(np.float32)

        #Build the ops in a private graph so repeated calls do not keep
        #adding nodes to the global default graph.
        graph = tf.Graph()
        with graph.as_default():
            _w = tf.placeholder(tf.float32, shape = [self.input_size, self.output_size])
            _vb = tf.placeholder(tf.float32, shape = [self.input_size])
            _hb = tf.placeholder(tf.float32, shape = [self.output_size])

            #Placeholder variable for input layer
            v0 = tf.placeholder(tf.float32, shape = [None, self.input_size])

            #Binary sample of the hidden layer given the input
            h0 = self.sample_prob(self.prob_h_given_v(v0, _w, _hb ))

            #Binary reconstruction of the input given the sampled hidden layer
            v1 = self.sample_prob(self.prob_v_given_h(h0, _w, _vb))

            #Hidden probabilities given the reconstruction (no sampling needed)
            h1 = self.prob_h_given_v(v1, _w, _hb)

            #Contrastive Divergence
            positive_grad = tf.matmul(tf.transpose(v0), h0) #input' * hidden0
            negative_grad = tf.matmul(tf.transpose(v1), h1) #reconstruction' * hidden1
            #(pos_grad - neg_grad) / total number of input samples
            CD = (positive_grad - negative_grad) / tf.to_float(tf.shape(v0)[0])

            #The biases use the same learning rate as the weights
            update_w1 = _w + self.learning_rate * CD
            update_vb = _vb + self.learning_rate * tf.reduce_mean(v0 - v1, 0)
            update_hb = _hb + self.learning_rate * tf.reduce_mean(h0 - h1, 0)

            #MSE - This is our error function
            err = tf.reduce_mean(tf.square(v0 - v1))

        with tf.Session(graph=graph) as sess:
            for epoch in range(self.epochs):
                for batch in self._iter_batches(X):
                    #One run for all three updates: h0/v1 are random samples, so
                    #separate runs would compute each update from a different sample.
                    cur_w1, pre_hb, pre_vb = sess.run(
                        [update_w1, update_hb, update_vb],
                        feed_dict = {v0:batch, _w:pre_w , _vb:pre_vb, _hb:pre_hb})
                    pre_w = self.gradient_subtract_improved(
                        cur_w1, self.REG_G, self.REG_LAMBDA, self.REG_ALPHA)

                #At the end of each epoch, reconstruct the data and report the
                #error of that same reconstruction. Only the latest
                #reconstruction is kept.
                recon, mse = sess.run([v1, err], feed_dict={v0: X, _w: pre_w, _vb: pre_vb, _hb: pre_hb})
                self.new_input_layer = recon
                print('Learning Rate: {}:  Batch Size: {}:  Hidden Layers: {}: Epoch: {}: Error: {}:'.format(
                    self.learning_rate, self.batch_size, self.output_size, (epoch+1), mse))

        # Threshold the weights (DAN_s): small magnitudes become exactly 0
        final_w = np.array(pre_w, copy=True)
        final_w[np.absolute(final_w) < self.WEIGHT_THRESHOLD] = 0

        # Convert to +1 and -1 (DAN_b); zeros stay zero
        final_w = np.sign(final_w)

        #Store weights in RBM object
        self.w = final_w
        self.hb = pre_hb
        self.vb = pre_vb

    def rbm_output(self, X):
        """Return the hidden-layer probabilities sigmoid(X * w + hb) for X.

        The result is what the next RBM in the stack receives as input.

        Args:
            X: Data with ``input_size`` columns.

        Returns:
            Array with one row per sample and ``output_size`` columns.
        """
        graph = tf.Graph()
        with graph.as_default():
            #Feed the data through placeholders instead of baking the whole
            #data set into the graph as a constant.
            input_x = tf.placeholder(tf.float32, shape = [None, self.input_size])
            _w = tf.placeholder(tf.float32, shape = [self.input_size, self.output_size])
            _hb = tf.placeholder(tf.float32, shape = [self.output_size])
            out = tf.nn.sigmoid(tf.matmul(input_x, _w) + _hb)
        with tf.Session(graph=graph) as sess:
            return sess.run(out, feed_dict={input_x: X, _w: self.w, _hb: self.hb})

In [None]:
class RBM_DBN(object):
    """Restricted Boltzmann Machine used as one layer of the DBN stack.

    Training uses one-step contrastive divergence (CD-1); the trained
    weights are stored unmodified.
    """

    def __init__(self, input_size, output_size, learning_rate, batch_size, epochs=5):
        """Store the hyper-parameters and draw small random initial parameters.

        Args:
            input_size: Number of visible units.
            output_size: Number of hidden units.
            learning_rate: Step size of the contrastive-divergence updates.
            batch_size: Number of samples per mini-batch.
            epochs: Number of passes over the training data (default 5).
        """
        self.input_size = input_size #Size of the input layer
        self.output_size = output_size #Size of the hidden layer
        self.epochs = epochs #How many passes over the training data train() makes
        self.learning_rate = learning_rate #How big of a weight update we will perform
        self.batch_size = batch_size #How many images are processed per update
        self.new_input_layer = None #Reconstruction of the training data from the last epoch
        self.new_hidden_layer = None
        self.new_test_hidden_layer = None

        #Weights and biases are drawn from a normal distribution with
        #mean 0 and standard deviation 0.01.
        self.w = np.random.normal(0,0.01,[input_size,output_size]) #weights
        self.hb = np.random.normal(0,0.01,[output_size]) #hidden layer bias
        self.vb = np.random.normal(0,0.01,[input_size]) #input layer bias (sometimes called visible layer)
        self.p = np.random.normal(0,0.01,[output_size]) # hidden layer probabilities

    def prob_h_given_v(self, visible, w, hb):
        """Return sigmoid(visible * w + hb): the hidden-unit activation probabilities."""
        return tf.nn.sigmoid(tf.matmul(visible, w) + hb)

    def prob_v_given_h(self, hidden, w, vb):
        """Return sigmoid(hidden * w^T + vb): the reconstruction probabilities."""
        return tf.nn.sigmoid(tf.matmul(hidden, tf.transpose(w)) + vb)

    def sample_prob(self, probs):
        """Turn probabilities into binary samples.

        Each entry becomes 1 where it is greater than a fresh uniform random
        draw and 0 otherwise (sign gives -1/0/+1, relu clips -1 to 0).
        """
        return tf.nn.relu(tf.sign(probs - tf.random_uniform(tf.shape(probs))))

    def _iter_batches(self, X):
        """Yield consecutive mini-batches of X, including a final partial one."""
        for start in range(0, len(X), self.batch_size):
            yield X[start:start + self.batch_size]

    def train(self, X, teX):
        """Train the RBM with CD-1 and store the resulting parameters on self.

        Parameters are re-initialised randomly at the start of every call.
        After each epoch the reconstruction error over all of X is printed.
        On return ``self.w`` / ``self.hb`` / ``self.vb`` hold the trained
        parameters and ``self.new_input_layer`` the reconstruction of X from
        the last epoch.

        Args:
            X: Training data, one sample per row with ``input_size`` columns.
            teX: Unused; kept so existing callers keep working.
        """
        #Fresh parameters for this run; float32 matches the placeholders below
        pre_w = np.random.normal(0,.01, size = [self.input_size,self.output_size]).astype(np.float32)
        pre_vb = np.random.normal(0, .01, size = [self.input_size]).astype(np.float32)
        pre_hb = np.random.normal(0, .01, size = [self.output_size]).astype(np.float32)

        #Build the ops in a private graph so repeated calls do not keep
        #adding nodes to the global default graph.
        graph = tf.Graph()
        with graph.as_default():
            _w = tf.placeholder(tf.float32, shape = [self.input_size, self.output_size])
            _vb = tf.placeholder(tf.float32, shape = [self.input_size])
            _hb = tf.placeholder(tf.float32, shape = [self.output_size])

            #Placeholder variable for input layer
            v0 = tf.placeholder(tf.float32, shape = [None, self.input_size])

            #Binary sample of the hidden layer given the input
            h0 = self.sample_prob(self.prob_h_given_v(v0, _w, _hb ))

            #Binary reconstruction of the input given the sampled hidden layer
            v1 = self.sample_prob(self.prob_v_given_h(h0, _w, _vb))

            #Hidden probabilities given the reconstruction (no sampling needed)
            h1 = self.prob_h_given_v(v1, _w, _hb)

            #Contrastive Divergence
            positive_grad = tf.matmul(tf.transpose(v0), h0) #input' * hidden0
            negative_grad = tf.matmul(tf.transpose(v1), h1) #reconstruction' * hidden1
            #(pos_grad - neg_grad) / total number of input samples
            CD = (positive_grad - negative_grad) / tf.to_float(tf.shape(v0)[0])

            #The biases use the same learning rate as the weights
            update_w = _w + self.learning_rate * CD
            update_vb = _vb + self.learning_rate * tf.reduce_mean(v0 - v1, 0)
            update_hb = _hb + self.learning_rate * tf.reduce_mean(h0 - h1, 0)

            #MSE - This is our error function
            err = tf.reduce_mean(tf.square(v0 - v1))

        with tf.Session(graph=graph) as sess:
            for epoch in range(self.epochs):
                for batch in self._iter_batches(X):
                    #One run for all three updates: h0/v1 are random samples, so
                    #separate runs would compute each update from a different sample.
                    pre_w, pre_hb, pre_vb = sess.run(
                        [update_w, update_hb, update_vb],
                        feed_dict = {v0:batch, _w:pre_w , _vb:pre_vb, _hb:pre_hb})

                #At the end of each epoch, reconstruct the data and report the
                #error of that same reconstruction. Only the latest
                #reconstruction is kept.
                recon, mse = sess.run([v1, err], feed_dict={v0: X, _w: pre_w, _vb: pre_vb, _hb: pre_hb})
                self.new_input_layer = recon
                print('Learning Rate: {}:  Batch Size: {}:  Hidden Layers: {}: Epoch: {}: Error: {}:'.format(
                    self.learning_rate, self.batch_size, self.output_size, (epoch+1), mse))

        #Store weights in RBM object
        self.w = pre_w
        self.hb = pre_hb
        self.vb = pre_vb

    def rbm_output(self, X):
        """Return the hidden-layer probabilities sigmoid(X * w + hb) for X.

        The result is what the next RBM in the stack receives as input.

        Args:
            X: Data with ``input_size`` columns.

        Returns:
            Array with one row per sample and ``output_size`` columns.
        """
        graph = tf.Graph()
        with graph.as_default():
            #Feed the data through placeholders instead of baking the whole
            #data set into the graph as a constant.
            input_x = tf.placeholder(tf.float32, shape = [None, self.input_size])
            _w = tf.placeholder(tf.float32, shape = [self.input_size, self.output_size])
            _hb = tf.placeholder(tf.float32, shape = [self.output_size])
            out = tf.nn.sigmoid(tf.matmul(input_x, _w) + _hb)
        with tf.Session(graph=graph) as sess:
            return sess.run(out, feed_dict={input_x: X, _w: self.w, _hb: self.hb})

In [None]:
RBM_hidden_size = [800,800] #Hidden layer sizes: one RBM per entry, so two layers per stack
learning_rate = .01
BATCH_SIZE = 32 #Mini-batch size shared by every RBM in both stacks
input_size = trX.shape[1] #input layer size of original image data

rbm_dbn_list = [] #This will hold all of the RBMs used in our DBN
rbm_dan_list = [] #This will hold all of the RBMs used in our DAN

#Creates one DBN RBM and one DAN RBM per hidden layer size.
#Each layer's input size is the previous layer's hidden size.
for layer in RBM_hidden_size:
    rbm_dbn_list.append(RBM_DBN(input_size, layer, learning_rate, BATCH_SIZE))
    rbm_dan_list.append(RBM_DAN(input_size, layer, learning_rate, BATCH_SIZE))
    input_size = layer

In [None]:
def train_rbm_stack(rbm_list, train_x, test_x):
    """Greedily train a stack of RBMs, one layer after another.

    Each RBM is trained on the current training input; its hidden-layer
    output (from ``rbm_output``) for both the training and the test data then
    becomes the input of the next RBM.

    Args:
        rbm_list: RBM objects exposing ``train(X, teX)`` and ``rbm_output(X)``.
        train_x: Training data fed to the first RBM.
        test_x: Test data fed to the first RBM.

    Returns:
        (train_outputs, test_outputs): two lists holding, per layer, the
        hidden-layer output for the training and the test data.
    """
    train_outputs = []
    test_outputs = []
    for i, rbm in enumerate(rbm_list):

        print('Input Shape: ', train_x.shape)
        print('Layer: ',(i+1))

        rbm.train(train_x, test_x)
        train_x = rbm.rbm_output(train_x)
        test_x = rbm.rbm_output(test_x)
        train_outputs.append(train_x)
        test_outputs.append(test_x)

        print('Output Shape: ', train_x.shape)
        print()
    return train_outputs, test_outputs


#Train the DBN stack first, then the DAN stack, both starting from the raw images.
rbm_dbn_troutputs, rbm_dbn_teoutputs = train_rbm_stack(rbm_dbn_list, trX, teX)
rbm_dan_troutputs, rbm_dan_teoutputs = train_rbm_stack(rbm_dan_list, trX, teX)

#Final-layer outputs under their previous names (raw data if a stack is empty).
inpX1 = rbm_dbn_troutputs[-1] if rbm_dbn_troutputs else trX
test_inpx1 = rbm_dbn_teoutputs[-1] if rbm_dbn_teoutputs else teX
inpX2 = rbm_dan_troutputs[-1] if rbm_dan_troutputs else trX
test_inpx2 = rbm_dan_teoutputs[-1] if rbm_dan_teoutputs else teX


In [None]:
# Save the trained DAN and DBN models and the final-layer outputs for the
# train and test samples. Index -1 selects the last layer whatever the depth
# of the stack (same as index 1 for the two-layer configuration).

# training set output from DAN
np.save('rbm_dan_troutput.npy',rbm_dan_troutputs[-1])
# testing set output from DAN
np.save('rbm_dan_teoutput.npy',rbm_dan_teoutputs[-1])
# training set output from DBN
np.save('rbm_dbn_troutput.npy',rbm_dbn_troutputs[-1])
# testing set output from DBN
np.save('rbm_dbn_teoutput.npy',rbm_dbn_teoutputs[-1])
# RBMs of DBN
# NOTE: the lists hold Python objects, so NumPy stores them pickled; loading
# needs np.load(..., allow_pickle=True) with the RBM classes defined, and
# should only be done with files from a trusted source.
np.save('rbm_dbn_list.npy',rbm_dbn_list)
# RBMs of DAN
np.save('rbm_dan_list.npy',rbm_dan_list)


In [None]:
# Plot layer 1 weights of DAN and DBN models
import matplotlib.pyplot as plt
%matplotlib inline
# np.histogram returns (counts, bin_edges); 100 bins give 101 edges.
a=np.histogram(rbm_dbn_list[0].w,bins=100)
b=np.histogram(rbm_dan_list[0].w,bins=100)
indexa=a[1]
indexb=b[1]
# Plot every count at the centre of its bin rather than at its right edge,
# which would shift the whole curve by half a bin width.
plt.plot((indexa[:-1]+indexa[1:])/2,a[0],'--',label='DBN')
plt.plot((indexb[:-1]+indexb[1:])/2,b[0],'r',label='DAN')
plt.legend(loc='upper right', shadow=True)
plt.title('layer 1 weights')
plt.xlabel('weight value')
plt.ylabel('number of weights')
plt.show()


In [None]:
# plot layer 2 weights of DAN and DBN models
# np.histogram returns (counts, bin_edges); 100 bins give 101 edges.
a=np.histogram(rbm_dbn_list[1].w,bins=100)
b=np.histogram(rbm_dan_list[1].w,bins=100)
indexa=a[1]
indexb=b[1]
# Plot every count at the centre of its bin rather than at its right edge,
# which would shift the whole curve by half a bin width.
plt.plot((indexa[:-1]+indexa[1:])/2,a[0],'--',label='DBN')
plt.plot((indexb[:-1]+indexb[1:])/2,b[0],'r',label='DAN')
plt.legend(loc='upper right', shadow=True)
plt.title('layer 2 weights')
plt.xlabel('weight value')
plt.ylabel('number of weights')
plt.show()