In [1]:
class Classification_Generation:
    """Top RBM over [features ++ label units], optionally stacked on a lower DBN.

    The visible layer of the top RBM is the concatenation of the feature
    vector (raw input, or the output of the lower DBN) and ``n_k * n_d`` label
    units.  This lets the same model be used for classification (reconstruct
    the label units from the features) and for generation (clamp the label
    units and reconstruct the features).

    ``layers`` is a sequence of ``(n_hidden, model_file_or_None)`` pairs, one
    per layer, bottom first.  The last entry describes the top RBM; any
    earlier entries are handed to ``DBN``.  ``RBM`` and ``DBN`` are defined
    elsewhere in the project.
    """

    # n_k is the number of classes, n_d is the # of dots representing one class
    def __init__(self, n_x, layers, n_k=10, n_d=1, k=1):
        """Build (or load) the top RBM and, for multi-layer configs, the lower DBN.

        Parameters
        ----------
        n_x : int
            Size of the raw input vector.
        layers : sequence of (int, str or None)
            Per-layer ``(hidden size, model file)``; a ``None`` file means the
            layer still has to be trained.
        n_k : int
            Number of classes.
        n_d : int
            Number of label units ("dots") used to represent one class.
        k : int
            Forwarded to the lower ``DBN`` as its ``k`` argument.

        Raises
        ------
        ValueError
            If a pre-trained top model is supplied whose visible size does not
            equal ``feature size + n_k * n_d``.
        """
        if len(layers) == 1:
            # Single layer: the top RBM sees the raw input directly.
            top_rbm_x = n_x
            lower_dbn = None
        else:
            # The top RBM sees the hidden units of the layer just below it.
            top_rbm_x = layers[-2][0]
            # Fix: forward the caller's k instead of a hard-coded 1 (the
            # parameter was previously accepted but ignored).
            lower_dbn = DBN(n_x, layers[:-1], k=k)

        top_rbm_v = top_rbm_x + n_k*n_d
        top_rbm_done = False
        if layers[-1][1] is not None:
            top_rbm = RBM.load_model(layers[-1][1])
            if top_rbm.n_v != top_rbm_v:
                raise ValueError("Incorrect input model for the top classification layer")
            # A loaded model is treated as already trained; train() skips it.
            top_rbm_done = True
        else:
            top_rbm = RBM(top_rbm_v, layers[-1][0])

        self.n_k = n_k
        self.n_d = n_d
        self.n_x = n_x
        self.n_layer = len(layers)
        self.top_rbm = top_rbm
        self.top_rbm_v = top_rbm_v
        self.top_rbm_x = top_rbm_x
        self.top_rbm_done = top_rbm_done

        self.lower_dbn = lower_dbn

    def convert_input(self, X, Label):
        """Append the label units to the feature matrix.

        ``X`` must have shape ``(n_sample, self.top_rbm_x)``.  When ``Label``
        is given it is flattened and used as integer class indices: a one-hot
        row of width ``n_k`` is built per sample and every column is repeated
        ``n_d`` times, so the units of one class are contiguous.  When
        ``Label`` is ``None`` all label units are zero.

        Returns an array of shape ``(n_sample, self.top_rbm_v)``.
        """
        n_sample = X.shape[0]
        n_k = self.n_k; n_d = self.n_d; n_x = self.top_rbm_x
        assert X.shape == (n_sample, n_x)

        if Label is not None:
            Label = Label.flatten()
            # attach label to data
            Y = np.zeros(shape=(n_sample, n_k))
            Index = np.arange(n_sample)
            Y[Index, Label] = 1
            Y = Y.repeat(n_d, axis=1)
        else:
            Y = np.zeros(shape=(n_sample, n_k*n_d))

        V = np.append(X, Y, axis=1)
        assert V.shape == (n_sample, self.top_rbm_v)
        return V

    # this is the API for app to use
    def train(self, X, Label, epochs=1, batch_size=10, learning=0.01, mean_field=True, save_file=None):
        """Train the lower DBN (if any), then the top RBM on features + labels.

        ``save_file`` is a file-name prefix.  The lower DBN receives
        ``<save_file>.dbn_classify.layer<n_layer>`` and the top RBM receives
        that name with ``-<n_layer>`` appended.  When ``save_file`` is ``None``
        (the default), ``None`` is passed through unchanged.

        If the top RBM was loaded from a model file, or has already been
        trained by an earlier call, only the lower DBN step runs.
        """
        # Fix: the default save_file=None used to raise TypeError on `+=`.
        # NOTE(review): assumes DBN.train_model / RBM.train_model accept
        # save_file=None as "do not save" -- confirm against their definitions.
        if save_file is not None:
            save_file += ".dbn_classify.layer" + str(self.n_layer)
        if self.lower_dbn is not None:
            self.lower_dbn.train_model(X, epochs, batch_size, learning, mean_field, save_file)
            # The top RBM is trained on the lower DBN's output, not on raw X.
            X, Hs = self.lower_dbn.forward(X)

        # train the top RBM
        if self.top_rbm_done: return

        X = self.convert_input(X, Label)
        if save_file is not None:
            save_file += "-"+str(self.n_layer)
        self.top_rbm.train_model(X, epochs, batch_size, learning, save_file)
        self.top_rbm_done = True
        return

    def classify(self, x):
        """Classify one sample and reconstruct it.

        ``x`` is reshaped to a single row, pushed through the lower DBN (if
        any), extended with zeroed label units and reconstructed once by the
        top RBM.  The predicted class is the argmax over the reconstructed
        label units after summing the ``n_d`` units of each class.

        Returns ``(reconstruction, pred)``: the reconstructed feature row
        (mapped back through the lower DBN when present) and the predicted
        class index.
        """
        x = x.reshape(1,-1)
        if self.lower_dbn is not None:
            x, hs = self.lower_dbn.forward(x)

        v = self.convert_input(x, None)
        vp, vs = self.top_rbm.reconstruct(v)

        n_k = self.n_k; n_d = self.n_d; n_x = self.top_rbm_x
        result = vp[0, n_x:]
        result = result.reshape(n_k, -1).sum(axis=1) # collapse the k*d dots into k sum dots.
        pred = np.argmax(result)

        vp = vp[:, :n_x]
        if self.lower_dbn is not None:
            vp, vs = self.lower_dbn.backward(vp)

        return vp[0], pred

    def generate(self, label, k_gibbs, init_w, w):
        """Generate one sample for class ``label`` by repeated reconstruction.

        The chain starts from uniform noise in ``[0, 0.9)`` on the feature
        units with the label units of ``label`` set to ``init_w``.  After each
        of the ``k_gibbs`` reconstructions the label units are overwritten so
        that the units of ``label`` hold ``w`` and all others hold 0.

        Note: the global NumPy RNG is re-seeded with 1234 on every call, so
        every call starts from the same noise and the caller's RNG state is
        reset as a side effect.

        Returns the final feature row (mapped back through the lower DBN when
        present).
        """
        n_k = self.n_k; n_d = self.n_d; n_x = self.top_rbm_x

        classes = np.zeros(n_k)
        classes[label] = init_w
        classes = classes.repeat(n_d)

        np.random.seed(1234)
        #v = np.append( np.random.binomial(1, 0.5, n_x), classes)
        #v = np.append( np.zeros(n_x), classes)
        v = np.append( np.random.uniform(0, 0.9, n_x), classes)
        vp = v.reshape(1,-1)

        # Label pattern that is clamped back in after every reconstruction.
        classes = np.zeros(n_k)
        classes[label] = w
        classes = classes.repeat(n_d)

        for i in range(k_gibbs):
            vp, vs = self.top_rbm.reconstruct(vp)
            vp[0, n_x:] = classes

        vp = vp[:, :n_x]
        if self.lower_dbn is not None:
            vp, vs = self.lower_dbn.backward(vp)

        return vp[0]

    def show_features(self, shapes, count=-1):
        """Placeholder: feature visualisation is currently disabled (no-op).

        ``shapes`` and ``count`` are accepted so existing callers keep working
        but are not used.  The disabled drafts are kept below for reference;
        note that they refer to ``self.rbms``, which this class does not define.
        """
#        self.top_rbm.show_features(shapes[-1], "MNIST learned features in DBN layer %d" % (self.n_layer), count)

        #for i in range(self.n_layer):
        #    rbm = self.rbms[i]
        #    rbm.show_features(shapes[i], "MNIST learned features in DBN layer %d" % (i+1), count)

        #lower_rbm = None
        #for i in range(self.n_layer):
        #    rbm = self.rbms[i]
        #    if i!=0:
        #        W = np.dot(rbm.W, lower_rbm.W)
        #        rbm = RBM(W.shape[1], W.shape[0], W=W)
        #
        #    rbm.show_features(shapes[0], "MNIST learned features in DBN layer %d" % (i+1), count)
        #    lower_rbm = rbm
        return
            

In [None]:
# testing of the RBM Classification_Generation code above

%run "mnist.ipynb"
import matplotlib as mpl
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import Grid
import math

class MNIST_DBN_CLASSIFY:
    """MNIST driver around ``Classification_Generation``: train, classify, generate.

    ``MnistInput`` is provided by ``mnist.ipynb``; its ``read(size)`` method is
    iterated as ``(image, label)`` pairs.
    """

    def __init__(self, n_x, layers, n_k=10, n_d=1, folder="../convolution-network"):
        """Open the train/test readers under ``folder`` and build the model."""
        self.train_input = MnistInput("train", folder)
        self.test_input = MnistInput("test", folder)
        self.dbn = Classification_Generation(n_x, layers, n_k, n_d, k=1)
        return

    def train(self, train_size=-1, n_epoch=100, batch_size=10, learning=0.1):
        """Read ``train_size`` training samples, binarise them and train the model.

        Pixels are thresholded at ``> 30`` to 0/1 before training.  Model files
        are written with the prefix ``"mnist"``.
        """
        X = []
        Y = []
        n_sample = 0
        for x, y in self.train_input.read(train_size):
            X.append(x)
            Y.append(y)
            n_sample += 1

        # One flattened, binarised row per image.
        X = (np.array(X).reshape(n_sample, -1) > 30) * 1
        Label = np.array(Y).reshape(n_sample, -1)

        self.dbn.train(X, Label, n_epoch, batch_size, learning, mean_field=True, save_file="mnist")
        return

    def classify(self, test_size=-1, output_size=20):
        """Classify the test set, print the accuracy and plot sample results.

        Up to ``output_size`` (original, reconstruction) pairs are shown side
        by side, ten images per row.  If the reader yields no samples a message
        is printed and nothing is plotted.
        """
        n_total = 0; n_correct = 0
        X=[]; Y=[]; Recon=[]; Preds=[]
        for x, y in self.test_input.read(test_size):
            n_total += 1
            # NOTE(review): training binarises pixels (> 30) while inference
            # scales them to [0, 1]; confirm this mismatch is intentional.
            reco, pred = self.dbn.classify(x/255)
            if y == pred: n_correct += 1
            if len(X) < output_size:
                Preds.append(pred)
                Recon.append(reco)
                X.append(x)
                Y.append(y)

        # Fix: an empty test set used to raise ZeroDivisionError below.
        if n_total == 0:
            print("Classification accuracy: n/a (no test samples were read)")
            return

        accuracy = n_correct/n_total
        print("Classification accuracy: {}".format(accuracy))

        # Fix: size the grid from what was actually collected.  Using the
        # requested output_size raised IndexError when fewer samples were
        # available and asked for a zero-row grid when output_size < 5.
        n_output = len(X)
        if n_output == 0:
            return

        # output
        d = 28
        ncols = 10
        nrows = math.ceil(2*n_output/ncols)  # two cells (original + reco) per sample
        fig = plt.figure(figsize=(ncols, int(nrows*2)), dpi=100)
        grid = Grid(fig, rect=111, nrows_ncols=(nrows,ncols))

        for i, ax in enumerate(grid):
            j = i//2
            if j < n_output:
                if i%2 == 0:
                    ax.imshow(X[j].reshape(d,d), cmap=mpl.cm.Greys)
                    ax.set_title("Orig: {}".format(Y[j]), y=0)
                else:
                    ax.imshow(Recon[j].reshape(d,d), cmap=mpl.cm.Greys)
                    ax.set_title("Reco: {}".format(Preds[j]), y=0)
            # Cells past the last sample stay blank.
            ax.set_axis_off()

        fig.text(0.5,1, "Classified images and reconstructed digits", fontsize=20, horizontalalignment='center')
        fig.tight_layout()
        plt.show()
        return

    def generate(self, k_gibbs=50, init_w=30, w=30):
        """Generate and plot one image per class label.

        ``k_gibbs``, ``init_w`` and ``w`` are forwarded unchanged to
        ``Classification_Generation.generate`` for every label.
        """
        digits = []
        for i in range( self.dbn.n_k ):
            digit = self.dbn.generate(i, k_gibbs, init_w, w)
            digits.append(digit)

        # output
        d = 28
        ncols = self.dbn.n_k
        nrows = 1
        fig = plt.figure(figsize=(ncols, int(nrows*2)), dpi=100)
        grid = Grid(fig, rect=111, nrows_ncols=(nrows,ncols))

        for i, ax in enumerate(grid):
            ax.imshow(digits[i].reshape(d,d), cmap=mpl.cm.Greys)
            ax.set_title(i)
            ax.set_axis_off()

        fig.text(0.5,1, "Generated digit images from labels", fontsize=20, horizontalalignment='center')
        fig.tight_layout()
        plt.show()
        return



In [None]:
%run "DBN.ipynb"

# Experiment 1: train a single-layer classifier from scratch on the full
# training set, then classify the test set.
# `mnist_dbn` is reset first so a stale model from an earlier run is never
# reused if the guarded body below does not execute.
mnist_dbn = None
# The body runs only when no `__file__` global exists -- presumably to limit
# it to an interactive kernel rather than a script run; verify.
if __name__ == "__main__" and '__file__' not in globals():
    np.seterr(all='raise')   # make NumPy floating-point errors raise instead of warn
    plt.close('all')         # close figures left over from earlier runs

    # (rows, cols) shapes; `mult` is defined outside this cell and presumably
    # returns the product of the tuple -- TODO confirm.
    # h1 and h2 are only referenced by the commented-out layer entries below.
    v = (28,28); h1 = (20,25); h2 = (20,25); h3 = (1000,1)
    
    # One (dimension, "model_file") entry per layer, bottom first. A model file
    # of None means the layer is trained here instead of being loaded.
    layers = [
#        (mult(h1), "2mnist_dbn.layer1.epochs100.784x500"), # (dimension, "model_file") of hidden layer 1
#        (mult(h2), "2mnist_dbn.layer2.epochs100.500x500"), # (dimension, "model_file") of hidden layer 2
#        (mult(h3), "mnist_rbm_classify.(784+10)x281.epochs100")  # (dimension, "model_file") of hidden layer 3
        (mult(h3), None)  # (dimension, "model_file") of top layer
    ]

    mnist_dbn = MNIST_DBN_CLASSIFY(mult(v), layers, folder="../convolution-network")
    mnist_dbn.train(train_size=-1, n_epoch=100, batch_size=10)
    # show_features is currently a no-op placeholder in Classification_Generation.
    mnist_dbn.dbn.show_features([v])
    mnist_dbn.classify()

In [None]:
# Experiment 2: load a pre-trained top classification RBM and evaluate it.
# `mnist_dbn` is rebound here, replacing any model left by the previous cell;
# the generation cell further down uses whatever this cell leaves behind.
mnist_dbn = None
# The body runs only when no `__file__` global exists -- presumably to limit
# it to an interactive kernel rather than a script run; verify.
if __name__ == "__main__" and '__file__' not in globals():
    np.seterr(all='raise')   # make NumPy floating-point errors raise instead of warn
    plt.close('all')         # close figures left over from earlier runs

    # (rows, cols) shapes; `mult` is defined outside this cell and presumably
    # returns the product of the tuple -- TODO confirm.
    # h1 and h2 are only referenced by the commented-out layer entries below.
    v = (28,28); h1 = (20,25); h2 = (20,25); h3 = (281,1)
    
    # One (dimension, "model_file") entry per layer, bottom first.
    layers = [
#        (mult(h1), "2mnist_dbn.layer1.epochs100.784x500"), # (dimension, "model_file") of hidden layer 1
#        (mult(h2), "2mnist_dbn.layer2.epochs100.500x500"), # (dimension, "model_file") of hidden layer 2
        (mult(h3), "mnist_rbm_classify.(784+10)x281.epochs100")  # (dimension, "model_file") of top layer
#        (mult(h3), None)  # (dimension, "model_file") of top layer
    ]

    mnist_dbn = MNIST_DBN_CLASSIFY(mult(v), layers, folder="../convolution-network")
    # With a single layer whose model file is given there is no lower DBN and
    # the top RBM is marked as already trained, so this call trains nothing.
    mnist_dbn.train(train_size=1000, n_epoch=100, batch_size=10)
    # show_features is currently a no-op placeholder in Classification_Generation.
    mnist_dbn.dbn.show_features([v], 56)
    mnist_dbn.classify()

In [None]:
# Generate one image per class label with the model left in `mnist_dbn` by the
# most recently executed experiment cell above; that cell must run first.
if __name__ == "__main__" and '__file__' not in globals():
    mnist_dbn.generate(k_gibbs=80, init_w=9, w=10) #"mnist_rbm_classify.(784+10)x281.epochs100"