In [None]:
## Stacked Autoencoders with MNIST ##
## url : http://deeplearning.net/tutorial/SdA.html ##
import numpy as np
import theano
import theano.tensor as T

from logistic import load_data,LogisticRegression
from mlp import HiddenLayer
from DA_module import DenoisingAutoencoder

from theano.tensor.shared_randomstreams import RandomStreams

In [None]:
class StackedAutoencoder(object):
    """Stacked denoising autoencoder with a logistic-regression output layer.

    The model is a chain of sigmoid ``HiddenLayer`` objects.  Every hidden
    layer is paired with a ``DenoisingAutoencoder`` that receives the same
    symbolic input and the hidden layer's ``w`` / ``b`` (parameter sharing).
    A ``LogisticRegression`` layer fed by the last hidden layer supplies the
    supervised fine-tuning cost and the error expression.
    """

    def __init__(self,rng,hidden_units,n_in=784,n_out=10,corruption_levels=[0.1,0.2,0.3],pre_lr=0.001,fine_lr=0.1):
        """Build the symbolic graph for all layers.

        Parameters
        ----------
        rng : numpy.random.RandomState
            Passed to every layer and used to seed the Theano random stream.
        hidden_units : sequence of int
            Size of each hidden layer, from the input side to the output side.
            Must not be empty.
        n_in : int
            Dimensionality of the input.
        n_out : int
            Number of output units of the logistic layer.
        corruption_levels : sequence of float
            Corruption level used when pretraining layer ``i``; must provide at
            least one entry per hidden layer (extra entries are ignored).
        pre_lr : float
            Learning rate used by the pretraining functions.
        fine_lr : float
            Learning rate used by the fine-tuning updates.

        Raises
        ------
        ValueError
            If ``hidden_units`` is empty or ``corruption_levels`` has fewer
            entries than ``hidden_units``.
        """
        self.num_layers = len(hidden_units)
        if self.num_layers == 0:
            raise ValueError('hidden_units must contain at least one layer size')

        # Copy the list: the default argument is a single shared list object,
        # so storing it by reference would alias it across instances.
        corruption_levels = list(corruption_levels)
        if len(corruption_levels) < self.num_layers:
            raise ValueError(
                'need one corruption level per hidden layer: got %d levels for %d layers'
                % (len(corruption_levels), self.num_layers))

        self.sigmoid_layers = []
        self.dA_layers = []
        self.params = []
        # input data : x
        self.x = T.matrix('x')
        # output data : y
        self.y = T.ivector('y')
        # Theano random stream handed to the autoencoders.  (An earlier stray
        # `theano.rng = ...` assignment onto the theano module was dropped; it
        # only consumed one extra draw from `rng`.)
        theano_rng = RandomStreams(rng.randint(2 ** 30))
        # corruption_levels
        self.corruption_levels = corruption_levels
        # learning rates
        self.pre_lr = pre_lr
        self.fine_lr = fine_lr

        # set up the layers : input, size_of_input
        for i in range(self.num_layers):
            if i == 0:
                # layer 0 takes the n_in-dimensional network input
                layer_input_size = n_in
                layer_input = self.x
            else:
                # the list is built step by step, so its last element is the
                # previous layer
                layer_input_size = hidden_units[i-1]
                layer_input = self.sigmoid_layers[-1].output

            # construct a sigmoid layer
            sigmoid_layer = HiddenLayer(rng=rng,
                                        input=layer_input,
                                        n_in=layer_input_size,
                                        n_out=hidden_units[i],
                                        activation=T.nnet.sigmoid)
            self.sigmoid_layers.append(sigmoid_layer)
            # collect this layer's params for fine-tuning
            self.params.extend(sigmoid_layer.params)
            # corresponding autoencoder, given the sigmoid layer's w and b
            da = DenoisingAutoencoder(input=layer_input, rng=rng, num_v = layer_input_size,
                                      num_h = hidden_units[i],theano_rng=theano_rng,
                                      w=sigmoid_layer.w,bh=sigmoid_layer.b)# param sharing
            self.dA_layers.append(da)

        # set up the last logistic layer
        self.output_layer = LogisticRegression(input=self.sigmoid_layers[-1].output,n_in=hidden_units[-1],
                                         n_out = n_out)
        # Add the logistic layer's params to the global params so that
        # fine-tuning also trains the output layer (previously omitted).
        # NOTE(review): relies on LogisticRegression exposing `params` the way
        # HiddenLayer does -- confirm in logistic.py.
        self.params.extend(self.output_layer.params)
        # fine_tune cost
        self.fine_tune_cost = self.output_layer.neg_log_likelihood(self.y)
        # errors
        self.errors = self.output_layer.errors(self.y)


    def get_pretrain_functions(self,train_set_x,batch_size):
        """Compile one pretraining function per autoencoder layer.

        Parameters
        ----------
        train_set_x : Theano shared variable
            Training inputs; sliced by minibatch index inside each function.
        batch_size : int
            Number of rows per minibatch.

        Returns
        -------
        list
            ``fns[i](index)`` applies the updates returned by layer ``i``'s
            ``step`` on minibatch ``index`` and returns that step's cost.
        """
        # one symbolic minibatch index shared by all compiled functions
        index = T.lscalar('index')
        batch = train_set_x[index*batch_size : (index+1)*batch_size]

        fns = []
        for i in range(self.num_layers):
            # cost and updates of the i-th DA; its inputs and parameters were
            # wired up when the dA list was built
            cost,updates = self.dA_layers[i].step(corruption_level = self.corruption_levels[i],
                                             learning_rate = self.pre_lr)
            fns.append(theano.function([index],cost,updates=updates,
                                       givens={ self.x : batch }))
        return fns

    def get_finetune_functions(self,datasets,batch_size):
        """Compile the supervised fine-tuning and test-evaluation functions.

        Parameters
        ----------
        datasets : sequence
            ``datasets[0]`` is the ``(x, y)`` training pair and ``datasets[2]``
            the ``(x, y)`` test pair; both are sliced by minibatch index.
        batch_size : int
            Number of rows per minibatch.

        Returns
        -------
        (train, test)
            ``train(index)`` applies one gradient-descent update with learning
            rate ``self.fine_lr`` on training minibatch ``index`` and returns
            the fine-tuning cost.  ``test(index)`` returns ``self.errors``
            evaluated on test minibatch ``index``.
        """
        train_set_x, train_set_y = datasets[0]
        test_set_x, test_set_y = datasets[2]

        # index
        index = T.lscalar('index')
        lo = index*batch_size
        hi = (index+1)*batch_size

        # gradients of the fine-tuning cost w.r.t. every collected parameter
        gparams = T.grad(self.fine_tune_cost,self.params)
        # plain SGD updates using the configured fine-tuning learning rate
        updates = [ (param, param - (gparam * self.fine_lr))
                    for param,gparam in zip(self.params,gparams)
                    ]

        train = theano.function([index],self.fine_tune_cost,updates=updates,
                                    givens = { self.x : train_set_x[lo:hi],
                                               self.y : train_set_y[lo:hi]
                                              }
                                    )
        # evaluate on the held-out test split, not on the training data
        test = theano.function([index],self.errors,
                                    givens = { self.x : test_set_x[lo:hi],
                                               self.y : test_set_y[lo:hi]
                                              }
                                    )

        return train,test

In [None]:
# Build the stacked autoencoder: three hidden layers of 1000 units on
# 784-dimensional inputs with 10 output classes, seeded for reproducibility.
rng = np.random.RandomState(8437)
sda = StackedAutoencoder(
    rng,
    [1000, 1000, 1000],
    n_in=784,
    n_out=10,
    corruption_levels=[0.1, 0.2, 0.3],
)

In [None]:
# load data; indexed below as [0] = training pair and [2] = test pair
datasets = load_data('mnist.pkl.gz')

# minibatch size that the batch counts below are derived from
batch_size = 20

train_set_x, train_set_y = datasets[0]
test_set_x, test_set_y = datasets[2]

# Number of whole minibatches for training and testing.  Explicit floor
# division keeps the counts integers even under true-division semantics
# (Python 3 or `from __future__ import division`), where `/` would yield a
# float that cannot be used as a loop bound.
n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size
n_test_batches = test_set_x.get_value(borrow=True).shape[0] // batch_size

In [None]:
# Layer-wise unsupervised pretraining.
#
# The number of minibatches must be derived from the SAME batch size the
# pretraining functions are compiled with.  Previously the functions used a
# batch size of 1 while the loop ran over a count computed for batches of 20,
# so only the first 1/20th of the training set was ever visited.
pretrain_batch_size = 1   # raise this to trade per-sample updates for speed
pretrain_epochs = 15
n_pretrain_batches = train_set_x.get_value(borrow=True).shape[0] // pretrain_batch_size

# get pretraining functions, one per layer
ptrain_fns = sda.get_pretrain_functions(train_set_x=train_set_x,
                                        batch_size=pretrain_batch_size)

# train layer by layer
for fn_id in range(sda.num_layers):
    # `epoch` rather than `iter`, which would shadow the builtin for later cells
    for epoch in range(pretrain_epochs):
        # per-minibatch costs for this epoch
        c = [ptrain_fns[fn_id](index=batch_id)
             for batch_id in range(n_pretrain_batches)]
        print("Layer : %d, Iteration %d, cost : " % (fn_id, epoch))
        print(np.mean(c))
                                       

In [None]:
# Supervised fine-tuning.
# Compile with the same `batch_size` that `n_train_batches` was computed from,
# instead of repeating the literal, so the two cannot drift apart.
finetune_epochs = 100
fine_train, fine_test = sda.get_finetune_functions(datasets=datasets,
                                                   batch_size=batch_size)

# `epoch` rather than `iter`, which would shadow the builtin for later cells
for epoch in range(finetune_epochs):
    cost_sum = 0
    for i in range(n_train_batches):
        cost_sum += fine_train(index=i)
    print("Iteration %d, Avg Cost : %f" % (epoch, cost_sum / n_train_batches))

In [None]:
# Run fine_test on every minibatch index, report each batch's error, then the
# average over all n_test_batches batches.
batch_errors = []
for batch_id in range(n_test_batches):
    batch_error = fine_test(index=batch_id)
    batch_errors.append(batch_error)
    print('Error : %f' % (batch_error))
er_sum = sum(batch_errors)
print('Avg error : %f' % (er_sum / n_test_batches))