In [1]:
from util import *
import numpy as np

In [2]:
from util import *

class RestrictedBoltzmannMachine():
    '''
    Restricted Boltzmann machine (RBM) with binary stochastic units, trained
    with one-step contrastive divergence (CD-1). It can also act as one layer
    of a deep belief net once its weights have been untwined into separate
    recognition / generative matrices.

    For more details : A Practical Guide to Training Restricted Boltzmann Machines https://www.cs.toronto.edu/~hinton/absps/guideTR.pdf
    '''
    def __init__(self, ndim_visible, ndim_hidden, is_bottom=False, image_size=(28,28), is_top=False, n_labels=10, batch_size=10):

        """
        Args:
          ndim_visible: Number of units in visible layer.
          ndim_hidden: Number of units in hidden layer.
          is_bottom: True only if this rbm is at the bottom of the stack in a deep belief net. Used to interpret visible layer as image data with dimensions "image_size".
          image_size: Image dimension for visible layer (only stored when "is_bottom" is True).
          is_top: True only if this rbm is at the top of stack in deep belief net. Used to interpret visible layer as concatenated with "n_labels" units of label data at the end.
          n_labels: Number of label categories (only stored when "is_top" is True).
          batch_size: Size of mini-batch.
        """

        self.ndim_visible = ndim_visible
        self.ndim_hidden = ndim_hidden

        self.is_bottom = is_bottom
        if is_bottom:
            # Stored as a private list copy so the caller's sequence (or the
            # shared default) is never aliased.
            self.image_size = list(image_size)

        self.is_top = is_top
        if is_top:
            # Honour the caller's value instead of a hard-coded 10.
            self.n_labels = n_labels

        self.batch_size = batch_size

        # Most recent parameter increments of the undirected model.
        self.delta_bias_v = 0
        self.delta_weight_vh = 0
        self.delta_bias_h = 0

        # Small zero-mean Gaussian initialisation. The draw order
        # (bias_v, weight_vh, bias_h) is kept so seeded runs stay reproducible.
        self.bias_v = np.random.normal(loc=0.0, scale=0.01, size=(self.ndim_visible))
        self.weight_vh = np.random.normal(loc=0.0, scale=0.01, size=(self.ndim_visible,self.ndim_hidden))
        self.bias_h = np.random.normal(loc=0.0, scale=0.01, size=(self.ndim_hidden))

        # Directed (untwined) weights and their increments; the weights stay
        # None until untwine_weights() is called.
        self.delta_weight_v_to_h = 0
        self.delta_weight_h_to_v = 0
        self.weight_v_to_h = None
        self.weight_h_to_v = None

        self.learning_rate = 0.01

        # NOTE: momentum is stored but not used by any update in this class.
        self.momentum = 0.7

        self.print_period = 5000

        rf_grid = [5,5]
        self.rf = { # receptive-fields. Only applicable when visible layer is input data
            "period" : 5000, # iteration period to visualize
            "grid" : rf_grid, # size of the grid
            "ids" : np.random.randint(0,self.ndim_hidden,rf_grid[0]*rf_grid[1]) # pick some random hidden units, one per grid cell
            }

        return


    def cd1(self, visible_trainset, n_iterations=10000):

        """Contrastive Divergence with k=1 full alternating Gibbs sampling

        Mini-batches are taken sequentially from the training set and wrap
        around to the beginning when the end is reached.

        Args:
          visible_trainset: training data for this rbm, shape is (size of training set, size of visible layer)
          n_iterations: number of iterations of learning (each iteration learns a mini-batch)
        """

        print ("learning CD1")

        n_samples = visible_trainset.shape[0]
        index = 0

        for it in range(n_iterations):
            # Select next mini-batch, wrapping around the end of the training set
            next_index = index + self.batch_size

            if next_index < n_samples:
                v_0 = visible_trainset[index:next_index]
            else:
                v_0 = np.concatenate((visible_trainset[index:],visible_trainset[:next_index-n_samples]))
            index = next_index % n_samples

            # [Done TASK 4.1] run k=1 alternating Gibbs sampling : v_0 -> h_0 ->  v_1 -> h_1.
            # Following the practical guide: hidden units driven by data are
            # sampled (binary) before reconstructing, while the reconstruction
            # and the hidden units it drives are kept as probabilities. This
            # also keeps the negative statistics (v_1_prob, h_1_prob)
            # consistent with each other.
            h_0_prob, h_0_bin = self.get_h_given_v(v_0)
            v_1_prob, _ = self.get_v_given_h(h_0_bin)
            h_1_prob, _ = self.get_h_given_v(v_1_prob)

            # [Done TASK 4.1] update the parameters using function 'update_params'
            self.update_params(v_0, h_0_bin, v_1_prob, h_1_prob)

            # visualize once in a while when visible layer is input images

            if it % self.rf["period"] == 0 and self.is_bottom:
                viz_rf(weights=self.weight_vh[:,self.rf["ids"]].reshape((self.image_size[0],self.image_size[1],-1)), it=it, grid=self.rf["grid"])

            # print progress: reconstruction error of the current mini-batch

            if it % self.print_period == 0 :
                print ("iteration=%7d recon_loss=%4.4f"%(it, np.linalg.norm(v_0 - v_1_prob)))

        return


    def update_params(self,v_0,h_0,v_k,h_k):

        """Update the weight and bias parameters.

        Applies the CD update: learning_rate times the difference between the
        data statistics (v_0, h_0) and the reconstruction statistics
        (v_k, h_k), averaged over the mini-batch. No weight decay or momentum
        is applied.

        Args:
           v_0: activities or probabilities of visible layer (data to the rbm)
           h_0: activities or probabilities of hidden layer
           v_k: activities or probabilities of visible layer
           h_k: activities or probabilities of hidden layer
           all args have shape (size of mini-batch, size of respective layer)
        """

        # [DONE TASK 4.1] get the gradients from the arguments and update the weight and bias parameters

        batch_len = v_0.shape[0]

        self.delta_bias_v = self.learning_rate * np.mean(v_0 - v_k, axis=0)
        self.delta_weight_vh = self.learning_rate * (v_0.T @ h_0 - v_k.T @ h_k) / batch_len
        self.delta_bias_h = self.learning_rate * np.mean(h_0 - h_k, axis=0)

        assert self.delta_bias_v.shape[0] == v_0.shape[1]

        self.bias_v += self.delta_bias_v
        self.weight_vh += self.delta_weight_vh
        self.bias_h += self.delta_bias_h

        return

    def get_h_given_v(self, visible_minibatch):

        """Compute probabilities p(h|v) and activations h ~ p(h|v)

        Uses undirected weight "weight_vh" and bias "bias_h"

        Args:
           visible_minibatch: shape is (size of mini-batch, size of visible layer)
        Returns:
           tuple ( p(h|v) , h)
           both are shaped (size of mini-batch, size of hidden layer)
        """

        assert self.weight_vh is not None

        # [Done TASK 4.1] compute probabilities and activations (samples from probabilities) of hidden layer

        probs = sigmoid(visible_minibatch @ self.weight_vh + self.bias_h)
        binary_states = np.random.binomial(1, probs, size=None)

        return probs, binary_states


    def get_v_given_h(self,hidden_minibatch):

        """Compute probabilities p(v|h) and activations v ~ p(v|h)

        Uses undirected weight "weight_vh" and bias "bias_v"

        When "is_top" is True the last "n_labels" visible units are treated as
        one softmax group (a single active label per sample) and the remaining
        units as independent logistic units.

        Args:
           hidden_minibatch: shape is (size of mini-batch, size of hidden layer)
        Returns:
           tuple ( p(v|h) , v)
           both are shaped (size of mini-batch, size of visible layer)
        """

        assert self.weight_vh is not None

        # Total input to every visible unit (identical for both cases).
        support = hidden_minibatch @ self.weight_vh.T + self.bias_v

        if self.is_top:

            # Here the visible layer has both data and labels: split the
            # support, apply sigmoid / Bernoulli sampling to the data part and
            # softmax / one-hot sampling to the label part, then concatenate.
            data_probs = sigmoid(support[:, :-self.n_labels])
            label_probs = self._softmax(support[:, -self.n_labels:])

            probs = np.concatenate((data_probs, label_probs), axis=1)
            binary_states = np.concatenate(
                (np.random.binomial(1, data_probs, size=None), self._sample_one_hot(label_probs)),
                axis=1)

        else:

            # [DONE TASK 4.1] compute probabilities and activations (samples from probabilities) of visible layer

            probs = sigmoid(support)
            binary_states = np.random.binomial(1, probs, size=None)

        return probs, binary_states


    @staticmethod
    def _softmax(support):

        """Row-wise softmax of "support", shifted by the row maximum for numerical stability."""

        exps = np.exp(support - np.max(support, axis=1, keepdims=True))
        return exps / np.sum(exps, axis=1, keepdims=True)


    @staticmethod
    def _sample_one_hot(probabilities):

        """Draw one category per row of "probabilities" and return the draws as one-hot rows."""

        n_rows, n_categories = probabilities.shape
        thresholds = np.random.random_sample((n_rows, 1))
        # Index of the first cumulative probability that reaches the threshold;
        # clipped in case rounding leaves the last cumulative value below 1.
        picked = np.minimum(np.sum(np.cumsum(probabilities, axis=1) < thresholds, axis=1), n_categories - 1)
        one_hot = np.zeros((n_rows, n_categories))
        one_hot[np.arange(n_rows), picked] = 1
        return one_hot


    # rbm as a belief layer : the functions below are only needed when running a deep belief net


    def untwine_weights(self):

        """Split the undirected weight into directed copies.

        "weight_v_to_h" becomes a copy of "weight_vh", "weight_h_to_v" a copy
        of its transpose, and "weight_vh" is set to None, so the undirected
        inference methods can no longer be used afterwards.
        """

        self.weight_v_to_h = np.copy( self.weight_vh )
        self.weight_h_to_v = np.copy( np.transpose(self.weight_vh) )
        self.weight_vh = None

    def get_h_given_v_dir(self,visible_minibatch):

        """Compute probabilities p(h|v) and activations h ~ p(h|v)

        Uses directed weight "weight_v_to_h" and bias "bias_h"

        Args:
           visible_minibatch: shape is (size of mini-batch, size of visible layer)
        Returns:
           tuple ( p(h|v) , h)
           both are shaped (size of mini-batch, size of hidden layer)
        """

        assert self.weight_v_to_h is not None

        # [Done TASK 4.2] same computation as 'get_h_given_v' but with directed connections

        probs = sigmoid(visible_minibatch @ self.weight_v_to_h + self.bias_h)
        binary_states = np.random.binomial(1, probs, size=None)

        return probs, binary_states


    def get_v_given_h_dir(self,hidden_minibatch):


        """Compute probabilities p(v|h) and activations v ~ p(v|h)

        Uses directed weight "weight_h_to_v" and bias "bias_v"

        Args:
           hidden_minibatch: shape is (size of mini-batch, size of hidden layer)
        Returns:
           tuple ( p(v|h) , v)
           both are shaped (size of mini-batch, size of visible layer)
        Raises:
           RuntimeError: if this rbm is the top of the stack ("is_top" is True).
        """

        if self.is_top:

            # [Done TASK 4.2] this case should never be executed : when the RBM
            # is part of a DBN and is at the top, it does not have directed
            # connections, so calling this method is an error.
            raise RuntimeError("get_v_given_h_dir called on the top RBM, which has no directed connections")

        assert self.weight_h_to_v is not None

        # [Done TASK 4.2] same computation as 'get_v_given_h' but with directed connections

        probs = sigmoid(hidden_minibatch @ self.weight_h_to_v + self.bias_v)
        binary_states = np.random.binomial(1, probs, size=None)

        return probs, binary_states

    def update_generate_params(self,inps,trgs,preds):

        """Update generative weight "weight_h_to_v" and bias "bias_v"

        Applies the delta rule: the increment is learning_rate times the
        mini-batch average of inps^T (trgs - preds) for the weight and of
        (trgs - preds) for the bias.

        Args:
           inps: activities or probabilities of input unit
           trgs: activities or probabilities of output unit (target)
           preds: activities or probabilities of output unit (prediction)
           all args have shape (size of mini-batch, size of respective layer)
        """

        # [Done TASK 4.3] gradients from the arguments, then update the weight and bias parameters.

        residual = trgs - preds

        self.delta_weight_h_to_v = self.learning_rate * (inps.T @ residual) / inps.shape[0]
        self.delta_bias_v = self.learning_rate * np.mean(residual, axis=0)

        self.weight_h_to_v += self.delta_weight_h_to_v
        self.bias_v += self.delta_bias_v

        return

    def update_recognize_params(self,inps,trgs,preds):

        """Update recognition weight "weight_v_to_h" and bias "bias_h"

        Applies the delta rule: the increment is learning_rate times the
        mini-batch average of inps^T (trgs - preds) for the weight and of
        (trgs - preds) for the bias.

        Args:
           inps: activities or probabilities of input unit
           trgs: activities or probabilities of output unit (target)
           preds: activities or probabilities of output unit (prediction)
           all args have shape (size of mini-batch, size of respective layer)
        """

        # [Done TASK 4.3] gradients from the arguments, then update the weight and bias parameters.

        residual = trgs - preds

        self.delta_weight_v_to_h = self.learning_rate * (inps.T @ residual) / inps.shape[0]
        self.delta_bias_h = self.learning_rate * np.mean(residual, axis=0)

        self.weight_v_to_h += self.delta_weight_v_to_h
        self.bias_h += self.delta_bias_h

        return


In [3]:
# Image dimensions passed both to the data loader and to the RBM.
image_size = [28, 28]
train_imgs, train_lbls, test_imgs, test_lbls = read_mnist(
    dim=image_size,
    n_train=60000,
    n_test=10000,
)

# ---- restricted boltzmann machine ----

print("\nStarting a Restricted Boltzmann Machine..")

# Stand-alone RBM on the raw pixels: one visible unit per pixel, 200 hidden
# units, flagged as the bottom layer so its visible layer is treated as images.
# A batch size of 1 makes every CD step operate on a single image.
rbm = RestrictedBoltzmannMachine(
    image_size[0] * image_size[1],
    200,
    is_bottom=True,
    image_size=image_size,
    is_top=False,
    n_labels=10,
    batch_size=1,
)




Starting a Restricted Boltzmann Machine..


In [4]:
# Use the training images returned by read_mnist as the visible-layer training set.
visible_trainset = train_imgs

In [5]:
# Size of the training set (first axis) and the start offset of the next mini-batch.
n_samples = visible_trainset.shape[0]
index = 0 

In [6]:
# Exclusive end offset of the mini-batch that starts at `index`.
next_index = index + rbm.batch_size


In [7]:
# Take the next mini-batch; when it would reach or pass the end of the training
# set, stitch the tail of the set together with samples from its beginning.
if next_index >= n_samples:
    overflow = next_index - n_samples
    v_0 = np.concatenate((visible_trainset[index:], visible_trainset[:overflow]))
    print(index)
else:
    v_0 = visible_trainset[index:next_index]

# Advance the offset, wrapping around the training set.
index = next_index % n_samples

In [8]:
# Positive phase: hidden probabilities p(h|v_0) and binary states sampled from them.
h_0_prob, h_0_bin = rbm.get_h_given_v(v_0)
# Last expression of the cell, so the notebook displays it (states first, then probabilities).
h_0_bin, h_0_prob

(array([[1, 1, 1, 0, 1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1, 1,
         0, 1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0, 1, 0, 1,
         0, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 0, 1,
         0, 0, 1, 0, 1, 1, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1,
         1, 1, 1, 0, 0, 0, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1, 1, 0, 1, 1,
         1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0, 0, 0, 0, 1, 1, 1, 0,
         0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1, 0, 0,
         1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 1, 1, 0,
         1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 0, 1, 0, 0, 1, 0, 0, 0, 0,
         0, 1]]),
 array([[0.46457698, 0.49697262, 0.49671357, 0.51406587, 0.52268889,
         0.47675835, 0.52013394, 0.48928487, 0.45233671, 0.50509469,
         0.52744822, 0.53031384, 0.47647737, 0.48563413, 0.5252661 ,
         0.49514727, 0.5055053 , 0.48871174, 0.49695796, 0.51598274,
         0.4815522 , 0.49712749

In [9]:
# Reconstruct the visible layer; note it is driven by the hidden probabilities, not the sampled states.
v_1_prob, v_1_bin = rbm.get_v_given_h(h_0_prob)
# Last expression of the cell, so the notebook displays it (probabilities first, then states).
v_1_prob, v_1_bin

(array([[0.51859372, 0.52708026, 0.51145361, 0.50539682, 0.51989391,
         0.47328972, 0.51115767, 0.49147438, 0.52741735, 0.52600718,
         0.49576255, 0.50500476, 0.49554818, 0.4831602 , 0.50514371,
         0.51847034, 0.46825674, 0.51251083, 0.54221939, 0.51748312,
         0.46517445, 0.48731263, 0.53785792, 0.50879864, 0.52196705,
         0.48711381, 0.51875704, 0.53681066, 0.5159137 , 0.52492783,
         0.49502331, 0.50015567, 0.5303455 , 0.51110817, 0.54799885,
         0.49900224, 0.53290973, 0.47790792, 0.47970638, 0.49339148,
         0.50823776, 0.507716  , 0.4767085 , 0.48385257, 0.51055709,
         0.49888516, 0.48491613, 0.47622552, 0.52236081, 0.49820741,
         0.50742376, 0.50183798, 0.51336134, 0.51468213, 0.47596284,
         0.49941286, 0.4903138 , 0.51230829, 0.51430776, 0.49266989,
         0.48967819, 0.4856134 , 0.4980682 , 0.47612477, 0.49015931,
         0.50320566, 0.47516042, 0.50501029, 0.48829282, 0.48574115,
         0.48517929, 0.49674043, 0

In [10]:
# Negative phase: hidden probabilities and states inferred from the sampled reconstruction v_1_bin.
h_1_prob, h_1_bin = rbm.get_h_given_v(v_1_bin)
# Last expression of the cell, so the notebook displays it (probabilities first, then states).
h_1_prob, h_1_bin

(array([[0.51912145, 0.57101545, 0.54124681, 0.57508801, 0.50042054,
         0.51782055, 0.44547597, 0.48703537, 0.50017765, 0.50447032,
         0.45391978, 0.70482323, 0.47039377, 0.52674292, 0.49390315,
         0.50355785, 0.56572676, 0.53671538, 0.52246848, 0.52199877,
         0.38453354, 0.54490468, 0.43824068, 0.60948713, 0.48945792,
         0.47869123, 0.48865211, 0.4035239 , 0.48247302, 0.53410165,
         0.49831314, 0.54721711, 0.44951284, 0.52692613, 0.48367667,
         0.50117526, 0.54659916, 0.4741652 , 0.48071837, 0.50417887,
         0.52729122, 0.50623876, 0.57916525, 0.50026158, 0.54005486,
         0.52098616, 0.52447848, 0.50191599, 0.39588933, 0.50001755,
         0.44123025, 0.49860757, 0.4502808 , 0.51812957, 0.44967458,
         0.57939755, 0.52781212, 0.50832405, 0.46068523, 0.47033749,
         0.55328493, 0.51155104, 0.51267123, 0.47428436, 0.49260477,
         0.42084247, 0.52808219, 0.42136681, 0.60549858, 0.52271525,
         0.53787636, 0.53416481, 0

In [11]:
# Apply one CD-1 update in place: data statistics (v_0, h_0_bin) minus
# reconstruction statistics (v_1_prob, h_1_prob).
# NOTE(review): h_1_prob was inferred from v_1_bin, not from v_1_prob - confirm this pairing is intended.
rbm.update_params(v_0, h_0_bin, v_1_prob, h_1_prob)

In [12]:
# Display the undirected weight matrix (visible x hidden) after the update.
rbm.weight_vh

array([[ 0.01140949, -0.00959612, -0.00564764, ...,  0.00719526,
        -0.01769582, -0.00851558],
       [-0.01005587, -0.00566965,  0.00064863, ..., -0.01776827,
        -0.00714199,  0.00479012],
       [ 0.00047399, -0.00457341,  0.01053612, ..., -0.0122025 ,
         0.00602502, -0.00434662],
       ...,
       [ 0.00832434, -0.00977634,  0.01161037, ..., -0.01052388,
        -0.00697904,  0.00113687],
       [ 0.00059595, -0.01485199,  0.00287014, ...,  0.00276023,
        -0.01165741,  0.00236436],
       [-0.00519468,  0.0039604 , -0.00808148, ..., -0.0030103 ,
        -0.00749945,  0.01783543]])

In [13]:
# Display the sampled binary reconstruction of the visible layer.
v_1_bin

array([[1, 1, 1, 1, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0,
        1, 1, 0, 1, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 1,
        1, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0, 1, 1, 0, 1, 0, 1,
        0, 0, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0,
        1, 1, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 0, 1, 1, 1, 0, 1, 0, 0, 1, 0,
        0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1,
        0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1,
        1, 1, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1, 0, 0, 1, 0, 1, 1, 1, 1, 1, 0,
        1, 0, 1, 0, 1, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 1, 1, 0,
        0, 1, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1,
        1, 1, 0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 1, 0, 1, 1,
        0, 1, 0, 1, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 1, 1, 0, 0, 1,
        0, 1, 0, 1, 0, 1, 1, 0, 0, 1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 0,
        1, 1, 1, 0, 1, 0, 0, 1, 1, 0, 

In [14]:
# The cells above walk through a single CD-1 step by hand, i.e. iteration 0.
# `it` is otherwise undefined at notebook scope.
it = 0

# visualize the receptive fields when the visible layer is input images
if it % rbm.rf["period"] == 0 and rbm.is_bottom:
    viz_rf(weights=rbm.weight_vh[:,rbm.rf["ids"]].reshape((rbm.image_size[0],rbm.image_size[1],-1)), it=it, grid=rbm.rf["grid"])

# print progress

# Reconstruction error of the mini-batch against its own reconstruction.
# Subtracting the whole training set would broadcast the reconstruction
# against every training image and measure nothing useful.
if it % rbm.print_period == 0:
    print ("iteration=%7d recon_loss=%4.4f"%(it, np.linalg.norm(v_0 - v_1_prob)))