In [None]:
import tensorflow as tf
import numpy as np
from rbms import DBM
import sys
sys.path.append("../base")
from model import Model
from base_func import act_func,out_act_check,Summaries

class DBN(Model):
    """Feed-forward network whose hidden layers may be pre-trained by a DBM.

    The constructor stores the hyper-parameters and immediately builds the
    TensorFlow graph (see ``build_model``).  When ``pre_train`` is true the
    hidden-layer weights and hidden biases are taken from the RBMs of a
    ``DBM``; otherwise fresh variables are created for every hidden layer.
    """

    def __init__(self,
                 hidden_act_func='relu',
                 output_act_func='softmax',
                 loss_func='mse',
                 struct=[784, 100, 100,10],
                 lr=1e-4,
                 momentum=0.5,
                 use_for='classification',
                 bp_algorithm='adam',
                 epochs=100,
                 batch_size=32,
                 dropout=0.3,
                 units_type=['gauss','bin'],
                 rbm_lr=1e-3,
                 rbm_epochs=30,
                 cd_k=1,
                 pre_train=True):
        """Store the configuration and build the graph.

        Args:
            hidden_act_func: activation spec for hidden layers, forwarded to
                ``act_func``.
            output_act_func: requested output activation; the stored value is
                whatever ``out_act_check(output_act_func, loss_func)`` returns.
            loss_func: loss identifier (stored; consumed outside this class).
            struct: layer sizes ``[n_in, hidden..., n_out]``.
            lr, momentum, bp_algorithm, epochs, use_for: stored as-is;
                presumably consumed by the ``Model`` base class.
            batch_size: stored and forwarded to the DBM.
            dropout: when ``> 0`` a dropout op is inserted before every layer.
            units_type, rbm_lr, rbm_epochs, cd_k: forwarded to the DBM.
            pre_train: whether to build a DBM and reuse its parameters.
        """
        Model.__init__(self,'DBN')
        self.loss_func=loss_func
        self.hidden_act_func=hidden_act_func
        self.output_act_func = out_act_check(output_act_func,loss_func)
        self.use_for=use_for
        self.bp_algorithm=bp_algorithm
        self.lr=lr
        self.momentum=momentum
        self.epochs=epochs
        # Copy the list so the shared default argument is never aliased
        # (and therefore never mutated) through the instance.
        self.struct = list(struct)
        self.batch_size = batch_size
        self.dropout = dropout
        self.pre_train=pre_train

        # The DBM covers every layer except the output layer.
        self.dbm_struct = self.struct[:-1]
        self.units_type = list(units_type)
        self.cd_k = cd_k
        self.rbm_lr = rbm_lr
        self.rbm_epochs = rbm_epochs

        self.build_model()

    # DBN_model

    def _dense_params(self, n_in, n_out, w_name, b_name):
        """Create a truncated-normal weight matrix and a zero bias vector."""
        # 2.0 forces true division: with integer sizes, ``2 / (n_in + n_out)``
        # truncates to 0 under Python 2 and would zero the initial weights.
        stddev = np.sqrt(2.0 / (n_in + n_out))
        W = tf.Variable(tf.truncated_normal(shape=[n_in, n_out], stddev=stddev),
                        name=w_name)
        b = tf.Variable(tf.constant(0.0,shape=[n_out]),name=b_name)
        return W, b

    def build_model(self):
        """Build the (optional) pre-training model and the fine-tuning graph.

        Sets ``input_data``, ``label_data``, ``keep_prob``, ``out_W``,
        ``out_b``, ``parameter_list``, ``logits`` and ``pred`` on the
        instance, then calls ``build_train_step`` (not defined in this class).
        """
        print("Start building model...")
        print('DBN:')
        print(self.__dict__)

        # ---- Pre-training ----
        if self.pre_train:
            # Build the DBM whose RBMs provide the hidden-layer parameters.
            self.pt_model = DBM(
                    units_type=self.units_type,
                    dbm_struct=self.dbm_struct,
                    rbm_epochs=self.rbm_epochs,
                    batch_size=self.batch_size,
                    cd_k=self.cd_k,
                    rbm_lr=self.rbm_lr)

        # ---- Fine-tuning ----
        with tf.name_scope('DBN'):
            self.input_data = tf.placeholder(tf.float32, [None, self.struct[0]])
            self.label_data = tf.placeholder(tf.float32, [None, self.struct[-1]])
            self.keep_prob = tf.placeholder(tf.float32)

            # The output layer is created first, as in the original graph, so
            # the variable creation order is unchanged.
            self.out_W, self.out_b = self._dense_params(
                self.struct[-2], self.struct[-1], 'W_out', 'b_out')

            self.parameter_list = list()
            if self.pre_train:
                # Reuse each RBM's weights and hidden bias as a hidden layer.
                for pt in self.pt_model.pt_list:
                    self.parameter_list.append([pt.W,pt.bh])
            else:
                for i in range(len(self.struct)-2):
                    W, b = self._dense_params(
                        self.struct[i], self.struct[i+1],
                        'W'+str(i+1), 'b'+str(i+1))
                    self.parameter_list.append([W,b])

            self.parameter_list.append([self.out_W,self.out_b])

            self.logits,self.pred=self.transform(self.input_data)
            self.build_train_step()

            # ``tbd``, ``loss``, ``accuracy`` and ``name`` are not set in this
            # class; presumably they come from Model / build_train_step.
            if self.tbd:
                for i, (W, b) in enumerate(self.parameter_list):
                    Summaries.scalars_histogram('_W'+str(i+1),W)
                    Summaries.scalars_histogram('_b'+str(i+1),b)
                tf.summary.scalar('loss',self.loss)
                tf.summary.scalar('accuracy',self.accuracy)
                self.merge = tf.summary.merge(tf.get_collection(tf.GraphKeys.SUMMARIES,self.name))

    def transform(self,data_x):
        """Run ``data_x`` through every layer in ``parameter_list``.

        Args:
            data_x: input tensor fed to the first layer.

        Returns:
            ``(logits, pred)``: the last layer's pre-activation and the same
            value passed through the output activation.
        """
        last = len(self.parameter_list) - 1
        next_data = data_x
        for i, (W, b) in enumerate(self.parameter_list):
            # Dropout is applied to the input of every layer, including the
            # raw network input.
            if self.dropout>0:
                next_data = tf.nn.dropout(next_data, self.keep_prob)

            z = tf.add(tf.matmul(next_data, W), b)
            if i == last:
                logits=z
                output_act=act_func(self.output_act_func)
                pred=output_act(z)
            else:
                # ``h_act_p`` is a rotating index into ``hidden_act_func``.
                # NOTE(review): it is not initialised in this class
                # (presumably by Model), and for a plain string such as
                # 'relu' the modulus is the string length - confirm that
                # ``act_func`` tolerates that.
                hidden_act=act_func(self.hidden_act_func,self.h_act_p)
                self.h_act_p = np.mod(self.h_act_p + 1, len(self.hidden_act_func))
                next_data=hidden_act(z)

        return logits,pred

In [None]:
import sys
import numpy
from HiddenLayer import HiddenLayer
from LogisticRegression import LogisticRegression
from RBM import RBM
from utils import *


class DBN(object):
    """Stack of paired sigmoid/RBM layers topped by a logistic-regression layer.

    Each hidden level holds a ``HiddenLayer`` and an ``RBM`` that are built
    from the same weight matrix and hidden bias.  ``pretrain`` trains the RBMs
    level by level, ``finetune`` trains only the logistic-regression layer and
    ``predict`` feeds data through the sigmoid layers and the output layer.
    """

    def __init__(self, input=None, label=None,
                 n_ins=2, hidden_layer_sizes=[3, 3], n_outs=2,
                 rng=None):
        """Build the layer stack.

        Args:
            input: training inputs, stored as ``self.x``.
            label: training labels, stored as ``self.y``.
            n_ins: size of the input layer.
            hidden_layer_sizes: size of each hidden layer; must be non-empty.
                (The default list is only read, never mutated.)
            n_outs: size of the output layer.
            rng: random state handed to every ``HiddenLayer``; defaults to
                ``numpy.random.RandomState(1234)``.
        """
        self.x = input
        self.y = label

        self.sigmoid_layers = []
        self.rbm_layers = []
        self.n_layers = len(hidden_layer_sizes)  # = len(self.rbm_layers)

        if rng is None:
            rng = numpy.random.RandomState(1234)

        assert self.n_layers > 0, "hidden_layer_sizes must not be empty"

        # construct multi-layer
        # ``range`` (not ``xrange``) keeps this runnable on Python 2 and 3.
        for i in range(self.n_layers):
            if i == 0:
                # The first level reads the raw input.
                input_size = n_ins
                layer_input = self.x
            else:
                # Deeper levels read a hidden sample from the level below.
                input_size = hidden_layer_sizes[i - 1]
                layer_input = self.sigmoid_layers[-1].sample_h_given_v()

            # construct sigmoid_layer
            sigmoid_layer = HiddenLayer(input=layer_input,
                                        n_in=input_size,
                                        n_out=hidden_layer_sizes[i],
                                        rng=rng,
                                        activation=sigmoid)
            self.sigmoid_layers.append(sigmoid_layer)

            # construct rbm_layer; W and the hidden bias are the sigmoid
            # layer's own objects, so both layers share them.
            rbm_layer = RBM(input=layer_input,
                            n_visible=input_size,
                            n_hidden=hidden_layer_sizes[i],
                            W=sigmoid_layer.W,
                            hbias=sigmoid_layer.b)
            self.rbm_layers.append(rbm_layer)

        # layer for output using Logistic Regression
        self.log_layer = LogisticRegression(input=self.sigmoid_layers[-1].sample_h_given_v(),
                                            label=self.y,
                                            n_in=hidden_layer_sizes[-1],
                                            n_out=n_outs)

        # finetune cost: the negative log likelihood of the logistic regression layer
        self.finetune_cost = self.log_layer.negative_log_likelihood()

    def pretrain(self, lr=0.1, k=1, epochs=100):
        """Train each RBM in turn with contrastive divergence.

        Args:
            lr: learning rate passed to ``contrastive_divergence``.
            k: value passed as ``k`` to ``contrastive_divergence``.
            epochs: number of contrastive-divergence calls per layer.
        """
        # pre-train layer-wise
        for i in range(self.n_layers):
            if i == 0:
                layer_input = self.x
            else:
                # Propagate the previous level's input one layer up.
                layer_input = self.sigmoid_layers[i-1].sample_h_given_v(layer_input)
            rbm = self.rbm_layers[i]

            for _ in range(epochs):
                rbm.contrastive_divergence(lr=lr, k=k, input=layer_input)
                # To monitor progress, report
                # rbm.get_reconstruction_cross_entropy() here.

    def finetune(self, lr=0.1, epochs=100):
        """Train the logistic-regression layer on the top hidden sample.

        Args:
            lr: initial learning rate; multiplied by 0.95 after every epoch.
            epochs: number of training calls.
        """
        layer_input = self.sigmoid_layers[-1].sample_h_given_v()

        # train log_layer
        epoch = 0
        while epoch < epochs:
            self.log_layer.train(lr=lr, input=layer_input)
            # To monitor progress, report
            # self.log_layer.negative_log_likelihood() here.

            lr *= 0.95
            epoch += 1

    def predict(self, x):
        """Return the output layer's prediction for ``x``.

        ``x`` is passed through every sigmoid layer's ``output`` in order and
        the result is handed to ``log_layer.predict``.
        """
        layer_input = x

        for i in range(self.n_layers):
            sigmoid_layer = self.sigmoid_layers[i]
            layer_input = sigmoid_layer.output(input=layer_input)

        out = self.log_layer.predict(layer_input)
        return out