In [1]:

import time

import numpy as np
import scipy.sparse as sp
from scipy.special import expit  # logistic function

from sklearn.base import BaseEstimator
from sklearn.base import TransformerMixin
from sklearn.utils import gen_even_slices

# Compute the log of the logistic function, ``log(1 / (1 + e ** -x))``
from sklearn.utils.extmath import log_logistic 



In [2]:
# this is just straight from scipy...
# this is a simplified solution without
# some of the "nice" optimization

class BernoulliRBM(BaseEstimator, TransformerMixin):
    def __init__(self, n_components=256, learning_rate=0.1, batch_size=10,
                 n_iter=10):
        self.n_components = n_components
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.n_iter = n_iter
    
    def transform(self, X):     
        """P(h=1|v=X)
        """
        return self._mean_hiddens(X)
    
    def _mean_hiddens(self, v):
        """Compute P(h=1|v)
        """
        p = np.dot(v, self.components_.T)
        p += self.intercept_hidden_
        return expit(p, out=p)
    
    def _sample_hiddens(self, v):
        """Sample from the distribution P(h|v).
        """
        p = np.dot(h, self.components_)
        p += self.intercept_visible_
        expit(p, out=p)
        # see hinton chpt 3
        return (np.random.random_sample(size=p.shape) < p)
    
    def _sample_visibles(self, h):
        """Sample from the distribution P(v|h).
        """
        p = np.dot(h, self.components_)
        p += self.intercept_visible_
        expit(p, out=p)
        return (np.random.random_sample(size=p.shape) < p)
    
   
    def gibbs(self, v):
        """Perform one Gibbs sampling step.
        """
        h_ = self._sample_hiddens(v)
        v_ = self._sample_visibles(h_)

        return v_
    
    def partial_fit(self, X, y=None):
        """Fit the model to the data X which should contain a partial
        segment of the data.
        """
        if not hasattr(self, 'components_'):
            self.components_ = np.asarray(
                self.random_state_.normal(
                    0,
                    0.01,
                    (self.n_components, X.shape[1])
                ),
                order='F')
        if not hasattr(self, 'intercept_hidden_'):
            self.intercept_hidden_ = np.zeros(self.n_components, )
        if not hasattr(self, 'intercept_visible_'):
            self.intercept_visible_ = np.zeros(X.shape[1], )
        if not hasattr(self, 'h_samples_'):
            self.h_samples_ = np.zeros((self.batch_size, self.n_components))

        self._fit(X, self.random_state_)
        

    def _fit(self, v_pos):
        """Inner fit for one mini-batch.
        Adjust the parameters to maximize the likelihood of v using
        Stochastic Maximum Likelihood (SML).
        """
        h_pos = self._mean_hiddens(v_pos)
        v_neg = self._sample_visibles(self.h_samples_)
        h_neg = self._mean_hiddens(v_neg)

        lr = float(self.learning_rate) / v_pos.shape[0]
        update = np.dot(v_pos.T, h_pos).T
        update -= np.dot(h_neg.T, v_neg)
        self.components_ += lr * update
        self.intercept_hidden_ += lr * (h_pos.sum(axis=0) - h_neg.sum(axis=0))
        self.intercept_visible_ += lr * (np.asarray(
                                         v_pos.sum(axis=0)).squeeze() -
                                         v_neg.sum(axis=0))

        h_neg[np.random.uniform(size=h_neg.shape) < h_neg] = 1.0  # sample binomial
        self.h_samples_ = np.floor(h_neg, h_neg)
    
    
    def fit(self, X, y=None):
        """Fit the model to the data X.
        """
        n_samples = X.shape[0]
        
        self.components_ = np.random.normal(0, 0.01, (self.n_components, X.shape[1]))
        self.intercept_hidden_ = np.zeros(self.n_components, )
        self.intercept_visible_ = np.zeros(X.shape[1], )
        self.h_samples_ = np.zeros((self.batch_size, self.n_components))

        n_batches = int(np.ceil(float(n_samples) / self.batch_size))
        batch_slices = list(gen_even_slices(n_batches * self.batch_size,
                                            n_batches, n_samples))
        begin = time.time()
        for iteration in range(1, self.n_iter + 1):
            for batch_slice in batch_slices:
                self._fit(X[batch_slice])
        return self

In [3]:
rbm = BernoulliRBM(100)

In [4]:
X = np.random.randint(low=0, high=1, size=(100, 100))
rbm.fit(X)

BernoulliRBM(batch_size=10, learning_rate=0.1, n_components=100, n_iter=10)

In [5]:
rbm.transform(X)

array([[ 0.56985902,  0.57098287,  0.57070786, ...,  0.56883462,
         0.56916813,  0.56876622],
       [ 0.56985902,  0.57098287,  0.57070786, ...,  0.56883462,
         0.56916813,  0.56876622],
       [ 0.56985902,  0.57098287,  0.57070786, ...,  0.56883462,
         0.56916813,  0.56876622],
       ..., 
       [ 0.56985902,  0.57098287,  0.57070786, ...,  0.56883462,
         0.56916813,  0.56876622],
       [ 0.56985902,  0.57098287,  0.57070786, ...,  0.56883462,
         0.56916813,  0.56876622],
       [ 0.56985902,  0.57098287,  0.57070786, ...,  0.56883462,
         0.56916813,  0.56876622]])