In [1]:
from load import mnist
import numpy as np

import pylab

import theano
import theano.tensor as T
from theano.sandbox.rng_mrg import MRG_RandomStreams as RandomStreams

In [2]:
# 1 encoder, decoder and a softmax layer

def init_weights(n_visible, n_hidden):
    initial_W = np.asarray(
        np.random.uniform(
            low=-4 * np.sqrt(6. / (n_hidden + n_visible)),
            high=4 * np.sqrt(6. / (n_hidden + n_visible)),
            size=(n_visible, n_hidden)),
        dtype=theano.config.floatX)
    return theano.shared(value=initial_W, name='W', borrow=True)

def init_bias(n):
    return theano.shared(value=np.zeros(n,dtype=theano.config.floatX),borrow=True)

In [19]:
def plot_mnist_data(X, file_name):
    pylab.figure()
    pylab.gray()
    size = int(np.sqrt(X[0].shape[0]))
    for i in range(100):
        pylab.subplot(10, 10, i+1); pylab.axis('off'); pylab.imshow(X[i,:].reshape(size,size))
    pylab.savefig('./Graph/' + file_name)
    pylab.close()
    

In [22]:
def plot_weight(weight, tag):
    # Plot 100 samples of weights (as images) learned at each layer
    w = weight.get_value()
    pylab.figure()
    pylab.gray()
    size = int(np.sqrt(w.shape[0]))
    for i in range(100):
        pylab.subplot(10, 10, i+1); pylab.axis('off'); pylab.imshow(w[:,i].reshape(size,size))
    pylab.savefig('./Graph/' + tag + '_weight.png')
    pylab.close()
    
    print('plot_weight finished!')

In [5]:
def plot_traning_error(d, tag):
    global training_epochs
    pylab.figure()
    pylab.plot(range(training_epochs), d)
    pylab.xlabel('iterations')
    pylab.ylabel('cross-entropy training error')
    pylab.savefig('./Graph/' + tag + '_training_error.png')
    pylab.close()
    
def plot_test_accuracy(acc, tag):
    global training_epochs
    pylab.figure()
    pylab.plot(range(training_epochs), acc)
    pylab.xlabel('iterations')
    pylab.ylabel('test accuracy')
    pylab.savefig('./Graph/' + tag + '_test_acc.png')
    pylab.close()

In [6]:
# load data
trX, teX, trY, teY = mnist()

trX, trY = trX[:12000], trY[:12000]
teX, teY = teX[:2000], teY[:2000]

print(trX.shape)

(12000, 784)


In [9]:
# question B (1) & B(2)
# construct the network
def construct_nn_part1_2():
    x = T.fmatrix('x')  
    d = T.fmatrix('d')

    rng = np.random.RandomState(123)
    theano_rng = RandomStreams(rng.randint(2 ** 30))

    corruption_level=0.1
    learning_rate = 0.1

    no_hidden1 = 900
    no_hidden2 = 625
    no_hidden3 = 400

    # 3 layers for encoder
    W1, b1 = init_weights(28*28, no_hidden1) , init_bias(no_hidden1)
    W2, b2 = init_weights(no_hidden1, no_hidden2), init_bias(no_hidden2)
    W3, b3 = init_weights(no_hidden2, no_hidden3), init_bias(no_hidden3)
    W4, b4 = init_weights(no_hidden3, 10), init_bias(10) # output layer for question B(2)

    # 3 layers for decoder
    b1_prime = init_bias(28*28)
    W1_prime = W1.transpose() 
    b2_prime = init_bias(no_hidden1)
    W2_prime = W2.transpose()
    b3_prime = init_bias(no_hidden2)
    W3_prime = W3.transpose()

    tilde_x = theano_rng.binomial(size=x.shape, n=1, p=1 - corruption_level,
                                  dtype=theano.config.floatX)*x
    # 3 layers for encoder
    y1 = T.nnet.sigmoid(T.dot(tilde_x, W1) + b1)
    y2 = T.nnet.sigmoid(T.dot(y1, W2) + b2)
    y3 = T.nnet.sigmoid(T.dot(y2, W3) + b3)

    # 3 layers for decoder
    z1 = T.nnet.sigmoid(T.dot(y3, W3_prime) + b3_prime)
    z2 = T.nnet.sigmoid(T.dot(z1, W2_prime) + b2_prime)
    z3 = T.nnet.sigmoid(T.dot(z2, W1_prime) + b1_prime)

#     crossentropy(py, Y))
    cost_da = - T.mean(T.sum(x * T.log(z3) + (1 - x) * T.log(1 - z3), axis=1))

    params_da = [W1, b1, W2, b2, W3, b3, b1_prime, b2_prime, b3_prime]
    grads_da = T.grad(cost_da, params_da)
    updates_da = [(param_da, param_da - learning_rate * grad_da)
               for param_da, grad_da in zip(params_da, grads_da)]
    train_da = theano.function(inputs=[x], outputs = cost_da, updates = updates_da, allow_input_downcast = True)
    test_da = theano.function(inputs=[x], outputs = z3, allow_input_downcast = True)
    encoder = theano.function(inputs=[x], outputs = y3, allow_input_downcast = True)
    # five-layer feedforward neuron network
    output_ff = T.nnet.softmax(T.dot(y3, W4)+b4)
    predicted_result_ff = T.argmax(output_ff, axis=1)
    cost_ff = T.mean(T.nnet.categorical_crossentropy(output_ff, d))

    params_ff = [W1, b1, W2, b2, W3, b3, W4, b4]
    grads_ff = T.grad(cost_ff, params_ff)
    updates_ff = [(param_ff, param_ff - learning_rate * grad_ff)
               for param_ff, grad_ff in zip(params_ff, grads_ff)]
    noisy_data = theano.function(inputs=[x], outputs = tilde_x, allow_input_downcast = True)
    train_ffn = theano.function(inputs=[x, d], outputs = cost_ff, updates = updates_ff, allow_input_downcast = True)
    test_ffn = theano.function(inputs=[x], outputs = predicted_result_ff, allow_input_downcast=True)
    
    return train_da, test_da, train_ffn, test_ffn, noisy_data, encoder, W1, W2, W3

In [10]:
train_da, test_da, train_ffn, test_ffn, noisy_data, encoder, W1, W2, W3 = construct_nn_part1_2()
print('training dae1 ...')
training_epochs = 25
batch_size = 128
reconstruction_error = []

for epoch in range(training_epochs):
    # go through trainng set
    c = []
    for start, end in zip(range(0, len(trX), batch_size), range(batch_size, len(trX), batch_size)):
        c.append(train_da(trX[start:end])) # costs
    reconstruction_error.append(np.mean(c, dtype='float64')) # reconstruction errors
    print(reconstruction_error[epoch])


training dae1 ...
253.312447671
203.755491445
166.736343807
133.610677384
118.168223202
107.835275016
100.136433503
94.5610479353
91.1413092506
87.3553519551
84.7786094544
82.4206637132
81.1944287626
79.5419997444
77.7325956122
76.5600525367
75.7449724294
75.0447745767
74.0869395147
73.4045476896
72.5246029346
71.8316064684
71.2151892642
71.0634954106
70.218303709


In [20]:
encoded_x = encoder(teX)
reconstructed_x = test_da(teX)
tilde_x = noisy_data(trX)
plot_mnist_data(trX, "1_original")
plot_mnist_data(tilde_x, "1_tilde")
plot_mnist_data(teX, "1_test")
plot_mnist_data(reconstructed_x, "1_reconstructed")
plot_mnist_data(encoded_x, "1_encoded")

In [14]:
np.sqrt(tilde_x[0].shape[0])

28.0

In [23]:
plot_weight(W1, '1_W1')
# do not know how to reshape
plot_weight(W2, '1_W2')  
plot_weight(W3, '1_W3')

plot_weight finished!
plot_weight finished!
plot_weight finished!


In [27]:
W1.get_value().shape

(784, 900)

In [None]:
# plot learning curves (i.e., reconstruction errors on training data) for training each epoch
plot_traning_error(reconstruction_error, '1')


In [None]:
print('\ntraining ffn ...')
ff_training_cost, ff_acc = [], []
for epoch in range(training_epochs):
    # go through trainng set
    c = []
    for start, end in zip(range(0, len(trX), batch_size), range(batch_size, len(trX), batch_size)):
        c.append(train_ffn(trX[start:end], trY[start:end]))
    ff_training_cost.append(np.mean(c, dtype='float64')) # training cost
    ff_acc.append(np.mean(np.argmax(teY, axis=1) == test_ffn(teX))) # accuracy
    print(ff_acc[epoch])

In [None]:
plot_traning_error(ff_training_cost, '2')
plot_test_accuracy(ff_acc, '2')