In [1]:
%matplotlib inline

from __future__ import absolute_import, print_function, unicode_literals, division
from sklearn.datasets import fetch_mldata
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import itertools
import random as rand
import copy

In [2]:
alice = {"Interstellar":1, "Whiplash":1, "Selma":0, "Lego Movie":0, "Birdman":0}
eric = {"Interstellar":0, "Whiplash":0, "Selma":0, "Lego Movie":1, "Birdman":0}
nancy = {"Interstellar":1, "Whiplash":1, "Selma":1, "Lego Movie":0, "Birdman":1}
#sarah = {"Interstellar":0, "Whiplash":1, "Selma":0, "Lego Movie":0, "Birdman":1}
#mike = {"Interstellar":1, "Whiplash":1, "Selma":1, "Lego Movie":1, "Birdman":1}
#bob = {"Interstellar":0, "Whiplash":1, "Selma":1, "Lego Movie":0, "Birdman":1}

data = {"alice":alice, "eric":eric, "nancy":nancy}#, "sarah":sarah} "mike":mike, "bob":bob}

data = pd.DataFrame.from_dict(data)
data.head()

Unnamed: 0,alice,eric,nancy
Birdman,0,0,1
Interstellar,1,0,1
Lego Movie,0,1,0
Selma,0,0,1
Whiplash,1,0,1


In [3]:
data_array = np.array(data.T)
print(data_array)
print(data_array.shape)

[[0 1 0 0 1]
 [0 0 1 0 0]
 [1 1 0 1 1]]
(3, 5)


In [5]:
class RBM(object):
    """
    Implementation of Restricted Boltzmann Machine for binary prediction
    
    Inputs:
    Data
    Number of hidden and visible states
    Learning rate, default set to 0.1
    
    Outputs:
    Weight matrix
    Epoch number and error
    """

    def __init__(self, data, num_visible, num_hidden, learning_rate=0.1):
        
        self.data = data
        self.num_data = data.shape[0]
        self.num_hidden = num_hidden
        self.num_visible = num_visible
        self.learning_rate = learning_rate

        # weight matrix first row and first column represent the bias
        self.weights = np.random.rand(self.num_visible, self.num_hidden) 
        self.weights = np.insert(self.weights, 0, 0, axis = 0)
        self.weights = np.insert(self.weights, 0, 0, axis = 1)
    
    def _sigmoid(self,x):
        """
        Logistic Sigmoid Activation Function
        """
        return 1.0/(1.0+np.exp(-x))
    
    def _sample_h_given_v(self, v):
        """
        Predicts hidden layer given visible layer
        """
        hid = self._sigmoid(np.dot(v, self.weights))
        hidden_states = hid > np.random.rand(self.num_hidden + 1)
        return hidden_states
    
    def _sample_v_given_h(self, h):
        """
        Predicts visible layer given hidden layer
        """
        vis = self._sigmoid(np.dot(h, self.weights.T))
        visible_states = vis > np.random.rand(self.num_visible + 1)

        return visible_states
    
    def _propagate_up(self, vis):
        """
        Propagates visible layer activation to hidden layer
        """
        vis_activation = self._sigmoid(np.dot(vis, self.weights))
        sample = vis_activation > np.random.rand(self.num_data, self.num_hidden + 1)
        return sample

    def _propagate_down(self, hid):
        """
        Propagates hidden layer activation to visible layer
        """
        hidden_activation = self._sigmoid(np.dot(hid, self.weights.T))
        return hidden_activation

    
    def CDk(self, max_epochs=1000):
        """
        Trains the RBM
        """
        self.data = np.insert(self.data, 0, 1, axis=1)
        
        for epoch in xrange(0, max_epochs):
            
            # CDk positive phase
            up_data = self._propagate_up(self.data)
            up_associations = np.dot(self.data.T, up_data)
            
            # CDk negative phase
            down_vis_probs = self._propagate_down(up_data)
            #remove the bias layer
            down_vis_probs[:,0] = 1 
            down_associations = self._sigmoid(np.dot(down_vis_probs, self.weights))            
            down_associations = np.dot(down_vis_probs.T, down_associations)
            
            self.weights += self.learning_rate * \
                ((up_associations - down_associations)/self.num_data)
                
            error = np.sum((self.data - down_vis_probs) ** 2)
            print("Epoch: ", epoch, ", Error: ", error)
            if epoch == 100:
                return down_vis_probs
    
    def Gibbs_alternating(self, num_gen_samples):
        """
        """
        
        samples = np.ones((num_gen_samples, self.num_vis + 1))
        samples[0,1:] = np.random.rand(self.num_vis)
        
        for i in xrange(0, self.num_data-1):
            
            v = samples[i-1,:]
            h = _sample_h_given_v(v)
            h[0] = 1

            v = _sample_v_given_h(h)
            samples[i,:] = v      
        
        return samples[:,1:]
            

In [7]:
print("ORIGINAL ARRAY")
print(data_array)
r = RBM(data_array, 3, 3, .1)
sample = r.CDk(max_epochs=100)
print(r.weights)

ORIGINAL ARRAY
[[0 1 0 0 1]
 [0 0 1 0 0]
 [1 1 0 1 1]]


ValueError: shapes (3,6) and (4,4) not aligned: 6 (dim 1) != 4 (dim 0)

In [6]:
user = np.array([[1, 0, 0, 0, 1, 0]])
print(r._sample_h_given_v(user))

ORIGINAL ARRAY
[[0 1 0 0 1]
 [0 0 1 0 0]
 [1 1 0 1 1]]


ValueError: shapes (3,6) and (4,4) not aligned: 6 (dim 1) != 4 (dim 0)