# 第6回講義 宿題

## 課題. Stacked Denoising Autoencoderで事前学習をした上で, MNISTを多層パーセプトロンで学習せよ

### 注意

- homework関数を完成させて提出してください
    - 訓練データはtrain_X, train_y, テストデータはtest_Xで与えられます
    - train_Xとtrain_yをtrain_X, train_yとvalid_X, valid_yに分けるなどしてモデルを学習させてください
    - test_Xに対して予想ラベルpred_yを作り, homework関数の戻り値としてください
- pred_yのtest_yに対する精度(F値)で評価します
- 全体の実行時間がiLect上で60分を超えないようにしてください
- homework関数の外には何も書かないでください

- CNNは使わないでください

次のような内容のコードが**事前**に実行されます

```python
from collections import OrderedDict
from sklearn.utils import shuffle
from sklearn.metrics import f1_score
from sklearn.datasets import fetch_mldata
from sklearn.cross_validation import train_test_split

import numpy as np
import theano
import theano.tensor as T


mnist = fetch_mldata('MNIST original')
mnist_X, mnist_y = shuffle(mnist.data.astype('float32'),
                           mnist.target.astype('int32'),
                           random_state=42)

mnist_X = mnist_X / 255.0

train_X, test_X, train_y, test_y = train_test_split(mnist_X, mnist_y,
                                                    test_size=0.2,
                                                    random_state=??)
```

次のセルを完成させて提出してください
- **上記のコード以外で必要なもの**は全て書いてください

In [None]:
def homework(train_X, train_y, test_X):
    """Classify test_X with an MLP initialised by stacked denoising autoencoders.

    The training data is split 80/20 into training and validation parts, every
    layer except the output layer is greedily pre-trained as a denoising
    autoencoder, and the whole network is then fine-tuned with minibatch SGD
    on the categorical cross-entropy loss.

    Relies on names supplied by the pre-executed setup cell: np, theano, T,
    OrderedDict, shuffle and train_test_split.

    Args:
        train_X: training inputs; the first layer expects 784 features per row.
        train_y: integer class labels in 0..9 (they index np.eye(10)).
        test_X: inputs to label.

    Returns:
        The predicted class index (argmax of the softmax output) for each row
        of test_X.
    """
    # One-hot encode the labels so they match the 10-way softmax output.
    train_y = np.eye(10)[train_y].astype('int32')
    train_X, valid_X, train_y, valid_y = train_test_split(
        train_X, train_y, test_size=0.2, random_state=None)

    class Autoencoder:
        """Tied-weight autoencoder that shares the weight matrix W with a Layer."""

        def __init__(self, visible_dim, hidden_dim, W, function):
            self.visible_dim = visible_dim
            self.hidden_dim = hidden_dim
            self.function = function
            self.W = W
            # a: bias of the reconstruction (visible) side,
            # b: bias of the code (hidden) side.
            # NOTE(review): b is a separate variable from the owning Layer's b,
            # so the pre-trained hidden bias is not used during fine-tuning;
            # confirm whether that is intended.
            self.a = theano.shared(np.zeros(visible_dim).astype('float32'),
                                   name='a')
            self.b = theano.shared(np.zeros(hidden_dim).astype('float32'),
                                   name='b')
            self.params = [self.W, self.a, self.b]

        def encode(self, x):
            """Map visible units to the hidden code."""
            return self.function(T.dot(x, self.W) + self.b)

        def decode(self, x):
            """Map a hidden code back to visible units using the transposed W."""
            return self.function(T.dot(x, self.W.T) + self.a)

        def f_prop(self, x):
            """Encode and then decode x, returning the reconstruction."""
            return self.decode(self.encode(x))

        def reconst_error(self, x, noise):
            """Return (mean reconstruction error, reconstruction).

            The input is corrupted by element-wise multiplication with `noise`,
            but the error is measured against the clean input x.
            """
            tilde_x = x * noise
            reconst_x = self.f_prop(tilde_x)
            error = T.mean(
                T.sum(T.nnet.binary_crossentropy(reconst_x, x), axis=1))
            return error, reconst_x

    def sgd(params, g_params, eps=np.float32(0.1)):
        """Build plain SGD updates: param <- param - eps * gradient.

        The previous version read `g_param` before the loop bound it and used
        an undefined `myu`, so it raised as soon as the first Layer was built.
        """
        updates = OrderedDict()
        for param, g_param in zip(params, g_params):
            updates[param] = param - eps * g_param
        return updates

    class Layer:
        """Fully connected layer that can pre-train itself as an autoencoder."""

        def __init__(self, in_dim, out_dim, function, possibility):
            self.in_dim = in_dim
            self.out_dim = out_dim
            self.function = function
            self.W = theano.shared(
                np.random.uniform(low=-0.08, high=0.08,
                                  size=(in_dim, out_dim)).astype('float32'),
                name='W')
            self.b = theano.shared(np.zeros(out_dim).astype('float32'),
                                   name='b')
            self.params = [self.W, self.b]
            # Probability that an output unit is kept by get_mask().
            self.possibility = possibility
            self.set_pretraining()

        def f_prop(self, x):
            """Return function(x.W + b); u and z are kept as attributes."""
            self.u = T.dot(x, self.W) + self.b
            self.z = self.function(self.u)
            return self.z

        def get_mask(self):
            """Sample a 0/1 mask over the output units (1 with prob. possibility)."""
            keep = np.random.rand(self.out_dim) < self.possibility
            return keep * np.float32(1.0)

        def set_pretraining(self):
            """Compile the denoising-autoencoder training and encoding functions."""
            ae = Autoencoder(self.in_dim, self.out_dim, self.W, self.function)

            x = T.fmatrix(name='x')
            noise = T.fmatrix(name='noise')

            cost, reconst_x = ae.reconst_error(x, noise)
            params = ae.params
            g_params = T.grad(cost=cost, wrt=params)
            updates = sgd(params, g_params)

            self.pretraining = theano.function(inputs=[x, noise],
                                               outputs=[cost, reconst_x],
                                               updates=updates,
                                               allow_input_downcast=True,
                                               name='pretraining')

            hidden = ae.encode(x)
            self.encode_function = theano.function(inputs=[x], outputs=hidden,
                                                   allow_input_downcast=True,
                                                   name='encode_function')

    layers = [
        Layer(784, 600, T.nnet.sigmoid, 1.00),
        Layer(600, 500, T.nnet.sigmoid, 1.00),
        Layer(500, 450, T.nnet.sigmoid, 1.00),
        Layer(450, 300, T.nnet.sigmoid, 1.00),
        Layer(300, 10, T.nnet.softmax, 1.00),
    ]

    batch_size = 100
    corruption_level = np.float32(0.3)

    # Greedy layer-wise pre-training: each layer (except the softmax output)
    # is trained on the codes produced by the layer below it.
    # NOTE(review): the epoch counts are fixed; confirm the total run time
    # stays within the assignment's 60-minute limit.
    X = np.copy(train_X)
    for layer in layers[:-1]:
        n_batches = X.shape[0] // batch_size
        pretrain_costs = []  # mean reconstruction error per epoch (monitoring)

        for epoch in range(121):
            X = shuffle(X)
            batch_errors = []
            for i in range(n_batches):
                start = i * batch_size
                end = start + batch_size

                # Zero out roughly corruption_level of the inputs.
                noise = np.random.binomial(size=X[start:end].shape, n=1,
                                           p=1 - corruption_level)
                err, reconst_X = layer.pretraining(X[start:end], noise)
                batch_errors.append(err)
            # Average over all batches of the epoch, not just the last one.
            pretrain_costs.append(np.mean(batch_errors))

        X = layer.encode_function(X)

    x = T.fmatrix(name='x')
    t = T.imatrix(name='t')

    # Wire the layers into one network. Each layer's output is multiplied by a
    # mask sampled once here; with possibility=1.00 the mask is all ones.
    params = []
    layer_out = x
    for layer in layers:
        params += layer.params
        layer.mask = layer.get_mask()
        layer_out = layer.f_prop(layer_out) * layer.mask

    # The prediction is the (unmasked) activation of the softmax layer.
    y = layers[-1].z

    cost = T.mean(T.nnet.categorical_crossentropy(y, t))

    g_params = T.grad(cost=cost, wrt=params)
    updates = sgd(params, g_params)

    train = theano.function(inputs=[x, t], outputs=cost, updates=updates,
                            allow_input_downcast=True, name='train')
    valid = theano.function(inputs=[x, t],
                            outputs=[cost, T.argmax(y, axis=1)],
                            allow_input_downcast=True, name='valid')
    # allow_input_downcast added for consistency with train/valid.
    test = theano.function([x], T.argmax(y, axis=1),
                           allow_input_downcast=True, name='test')

    n_batches = train_X.shape[0] // batch_size

    # Supervised fine-tuning of the whole network.
    for epoch in range(400):
        train_X, train_y = shuffle(train_X, train_y)
        for i in range(n_batches):
            start = i * batch_size
            end = start + batch_size
            train(train_X[start:end], train_y[start:end])
        # Evaluated every epoch for monitoring; the results are not used to
        # stop training or select a model.
        valid_cost, predict_y = valid(valid_X, valid_y)
    return test(test_X)


In [None]:
from collections import OrderedDict
from sklearn.utils import shuffle
from sklearn.metrics import f1_score
from sklearn.datasets import fetch_mldata
from sklearn.cross_validation import train_test_split

import numpy as np
import theano
import theano.tensor as T


def load_mnist():
    """Fetch MNIST, shuffle it, scale pixels by 1/255 and split 80/20.

    Returns:
        A tuple (train_X, test_X, train_y, test_y). Inputs are float32 and
        labels are int32; both the shuffle and the split use random_state=42.
    """
    dataset = fetch_mldata('MNIST original')
    pixels = dataset.data.astype('float32')
    labels = dataset.target.astype('int32')

    pixels, labels = shuffle(pixels, labels, random_state=42)
    pixels = pixels / 255.0

    # train_test_split yields train_X, test_X, train_y, test_y in that order.
    return tuple(train_test_split(pixels, labels,
                                  test_size=0.2,
                                  random_state=42))


def check_homework():
    """Run homework() on a small MNIST subset and return its macro F1 score.

    Only the first 1000 training rows and the first 1000 test rows are used,
    so this is a quick check that the submission runs rather than a full
    evaluation.
    """
    n_samples = 1000
    train_X, test_X, train_y, test_y = load_mnist()

    pred_y = homework(train_X[:n_samples],
                      train_y[:n_samples],
                      test_X[:n_samples])
    return f1_score(test_y[:n_samples], pred_y, average='macro')

# Run the smoke test only when a `homework` function exists in this namespace.
if 'homework' in globals():
    # The macro F1 score is stored in `result` but is not printed here.
    result = check_homework()

    print("No Error Occured!")

In [1]:
from collections import OrderedDict
from sklearn.utils import shuffle
from sklearn.metrics import f1_score
from sklearn.datasets import fetch_mldata
from sklearn.cross_validation import train_test_split

import numpy as np
import theano
import theano.tensor as T


# Download MNIST and shuffle inputs and labels together with a fixed seed.
mnist = fetch_mldata('MNIST original')
mnist_X, mnist_y = shuffle(mnist.data.astype('float32'),
                           mnist.target.astype('int32'),
                           random_state=42)

# Scale the inputs by 1/255.
mnist_X = mnist_X / 255.0

# Hold out 20% of the data as the test set (seed 122 for this run).
train_X, test_X, train_y, test_y = train_test_split(mnist_X, mnist_y,
                                                    test_size=0.2,
                                                    random_state=122)

Using gpu device 0: GRID K520 (CNMeM is enabled with initial size: 95.0% of memory, cuDNN 4007)


In [None]:
# Print the fraction of test labels predicted correctly (plain accuracy).
# np.mean over the boolean match array replaces the element-by-element
# built-in sum divided by len, and yields the same value.
print(np.mean(homework(train_X, train_y, test_X) == test_y))