<a href="https://colab.research.google.com/github/TakafumiMiwa/the-theory-of-statistic/blob/master/intro_chainer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### [1] Chainer のインストールとインストールの確認

In [0]:
# Install Chainer via the Colab helper script, then print Chainer's runtime info to confirm the install.
# NOTE(review): this pipes a remotely fetched script straight into `sh` and pins no version;
# presumably intended for Colab only — confirm before running it in any other environment.
!curl https://colab.chainer.org/install | sh -
!python -c 'import chainer; chainer.print_runtime_info()'

### [2] Fashion MNIST データの読み込み

In [0]:
from chainer.datasets.fashion_mnist import get_fashion_mnist
# Load Fashion-MNIST as a (train, test) pair of datasets.
train, test = get_fashion_mnist()

### [3] データの確認

In [0]:
# sub functions
import matplotlib.pyplot as plt
# Human-readable class names; the list position is the index used for lookup.
LABEL_NAMES = [
    'T-shirt/top',  # 0
    'Trouser',      # 1
    'Pullover',     # 2
    'Dress',        # 3
    'Coat',         # 4
    'Sandal',       # 5
    'Shirt',        # 6
    'Sneaker',      # 7
    'Bag',          # 8
    'Ankle boot',   # 9
]

def get_label_name(label):
    """Return the entry of ``LABEL_NAMES`` stored at index ``label``."""
    name = LABEL_NAMES[label]
    return name

データの長さ，表示

In [0]:
# Report how many samples each split holds.
print('train data size =', len(train))
print('test data size =',  len(test))

In [0]:
# Inspect a single test sample: its array `x` and its label `t`.
x, t = test[11]
print('Shape of x:', x.shape)
print('label:', t)
print('label name:', get_label_name(t))


In [0]:
# Line plot of the sample's raw values against their index (x is not reshaped into an image here).
plt.plot(x)

In [0]:
# Show the same sample as a 28x28 grayscale image.
plt.imshow(x.reshape(28, 28), 'gray')

train データを学習用と validation 用に分割

In [0]:
import chainer
# Randomly split the training set with a fixed seed: 50000 samples stay in `train`,
# the rest become `validation`. Note that this rebinds `train`.
train, validation = chainer.datasets.split_dataset_random(train, 50000, seed=0)

In [0]:
# sub functions
import numpy as np
def sub_category(x):
    """Return a length-10 float32 vector of zeros with position ``x`` set to 1.0."""
    one_hot = np.zeros(10, dtype=np.float32)
    one_hot[x] = 1.0
    return one_hot

NN part 1

In [0]:
import numpy as np
import chainer.functions as F
import chainer.links as L
from chainer import Chain
from chainer import Variable
from chainer import datasets

LEARNING_RATE = 0.001  # step size for the manual gradient-descent updates in the loop below

# Weights and biases, drawn from a standard normal distribution.
# NOTE(review): no random seed is set, so each run starts from different values.
W1 = np.random.randn(200, 784).astype(np.float32)  # input (784) -> hidden (200)
W2 = np.random.randn(10, 200).astype(np.float32)   # hidden (200) -> output (10)

b1 = np.random.randn(200, 1).astype(np.float32)    # hidden-layer bias, column vector
b2 = np.random.randn(10, 1).astype(np.float32)     # output-layer bias, column vector

# Train with per-sample gradient descent: a single pass over `train`,
# updating W1/b1/W2/b2 in place after every sample.
for t in range(len(train)):
    x, y = train[t]
    x = x.reshape(784, 1)                 # input as a column vector
    y = sub_category(y).reshape(10, 1)    # one-hot target as a column vector

    # Forward pass: linear -> ReLU -> linear.
    h1 = W1.dot(x) + b1
    h2 = np.maximum(h1, 0)
    h3 = W2.dot(h2) + b2

    loss = np.square(h3 - y).sum() / 10   # mean squared error over the 10 outputs (not used again in this loop)

    # These reshapes keep the shapes the arrays already have from the forward pass.
    h1 = h1.reshape(200, 1)
    h2 = h2.reshape(200, 1)
    h3 = h3.reshape(10, 1)


    # Backward pass. Despite its name, grad_L_h2 holds dL/dh3 = 2 * (h3 - y) / 10.
    grad_L_h2 = 0.2*(h3 - y)

    W2_grad = grad_L_h2.dot(h2.T)        # dL/dW2
    b2_grad = grad_L_h2                           # dL/db2
    W1_grad = grad_L_h2.T.dot(W2).T      # dL/dh2 (back-propagated through W2); renamed step by step below
    W1_grad[h1 < 0] = 0                            # ReLU derivative: zero gradient where the pre-activation was negative -> dL/dh1
    b1_grad = W1_grad                              # dL/db1 (equals dL/dh1)
    W1_grad = W1_grad.dot(x.T)             # dL/dW1; rebinds the name, so b1_grad keeps the dL/dh1 array

    # Gradient-descent step.
    W2 -= LEARNING_RATE * W2_grad
    b2 -= LEARNING_RATE * b2_grad
    W1 -= LEARNING_RATE  * W1_grad
    b1 -= LEARNING_RATE  * b1_grad

    

テストケースで確認

In [0]:
# Spot-check the hand-written network on 10 randomly drawn test samples.
# NOTE(review): the sample indices are unseeded, so the printed rows differ between runs.

for t in range(10):
    index = np.random.randint(len(test))
    x, y = test[index]

    x = x.reshape(784, 1)
    y = sub_category(y).reshape(10, 1)

    # Same forward pass as in training: linear -> ReLU -> linear.
    h1 = W1.dot(x) + b1
    h2 = np.maximum(h1, 0)
    h3 = W2.dot(h2) + b2

    # The predicted class is the index of the largest output.
    print("predict", h3.argmax(), "true", y.argmax())

NN part 2     もう少し chainer っぽく

In [0]:
import chainer.functions as F
import chainer.links as L
from chainer import Variable

LEARNING_RATE = 0.001  # step size of the manual update applied in the training loop of this cell

class MLP(chainer.Chain):
    """Two-layer perceptron: 784 inputs -> 200 hidden units (ReLU) -> 10 outputs."""

    def __init__(self):
        super(MLP, self).__init__()
        with self.init_scope():
            # Links are assigned inside init_scope, as in Chainer's usual model definition.
            self.l1 = L.Linear(784, 200)
            self.l2 = L.Linear(200, 10)

    def __call__(self, x):
        """Return the raw 10-way output of the network for input ``x``."""
        hidden = F.relu(self.l1(x))
        return self.l2(hidden)
            
model = MLP()

# One pass of per-sample SGD, written by hand on top of Chainer's autograd.
for t in range(len(train)):
    x, y = train[t]
    x = x.reshape(1, 784)

    y_p = model(x)
    y_t = sub_category(y).reshape(1, 10)

    loss = F.mean_squared_error(y_p, y_t)
    print(loss.data)

    # Clear the gradients left over from the previous sample before back-propagating.
    model.cleargrads()

    loss.backward()

    # Apply the SGD step to every parameter of the model, weights and biases alike.
    # Updating only l1.W / l2.W would leave both bias vectors stuck at their initial values.
    for param in model.params():
        param.data -= LEARNING_RATE * param.grad



NN part 3   loss function を任意に設定できるようにclass化

In [0]:
import chainer.functions as F
import chainer.links as L
from chainer import Chain
from chainer import optimizers, training
from chainer.training import extensions
from chainer import Variable
from chainer import reporter

batchsize = 128  # mini-batch size handed to the train/validation iterators
n_epoch = 5      # number of epochs the Trainer runs for

def sub_cate(x, line):
    """Build a one-hot float32 matrix for the first ``line`` labels in ``x``.

    Args:
        x: Indexable collection of labels; each ``x[t]`` must hold exactly one
            numeric value (a scalar or a size-1 array).
        line: Number of rows to produce.

    Returns:
        ``numpy.ndarray`` of shape ``(line, 10)`` and dtype ``float32`` with a
        single 1 per row, at the column given by the label.
    """
    tp = np.zeros((line, 10), dtype=np.float32)
    for t in range(line):
        # `np.int` was only an alias of the built-in `int` and has been removed
        # from NumPy; `.item()` extracts the Python scalar from a size-1 array
        # before the conversion.
        index = int(np.asarray(x[t]).item())
        tp[t][index] = 1
    return tp

# model
class MLP(Chain):
    """Two-layer perceptron: 784 inputs -> 200 hidden units (ReLU) -> 10 outputs."""

    def __init__(self):
        super(MLP, self).__init__()
        with self.init_scope():
            self.l1 = L.Linear(784, 200)
            self.l2 = L.Linear(200, 10)

    def __call__(self, x):
        """Return the raw 10-way output of the network for input ``x``."""
        hidden = self.l1(x)
        activated = F.relu(hidden)
        return self.l2(activated)
        
class LossCalculator(chainer.Chain):
    """Chain that wraps a predictor and returns its mean-squared-error loss.

    The wrapped predictor is stored as ``self.model``.
    """

    def __init__(self, model):
        super(LossCalculator, self).__init__()
        with self.init_scope():
            self.model = model

    def __call__(self, x, y):
        """Compute, report under the key ``'loss'`` and return the MSE loss.

        ``y`` is converted to a one-hot matrix with ``sub_cate`` before being
        compared with the predictor's output.
        """
        prediction = self.model(x)
        n_rows = len(y)
        label_column = np.float32(y).reshape(n_rows, 1)
        target = sub_cate(label_column, n_rows)
        loss = F.mean_squared_error(prediction, target)
        reporter.report({'loss': loss}, self)
        return loss

    
# Wrap the network so that calling `model(x, y)` returns the loss to minimise.
model1 = MLP()
model = LossCalculator(model1)

# Plain SGD with the optimizer's default settings.
optimizer = chainer.optimizers.SGD()
optimizer.setup(model)

# The validation iterator makes a single, unshuffled pass (repeat=False, shuffle=False).
train_iter = chainer.iterators.SerialIterator(train, batchsize)
validation_iter = chainer.iterators.SerialIterator(validation, batchsize, repeat=False, shuffle=False)

# Run for n_epoch epochs; trainer output goes to the 'out' directory.
updater = training.StandardUpdater(train_iter, optimizer)
trainer = chainer.training.Trainer(updater, (n_epoch, 'epoch'), out='out')
trainer.extend(extensions.Evaluator(validation_iter, model))  # evaluates `model` on the validation iterator
trainer.extend(extensions.LogReport())
trainer.extend(extensions.PrintReport(['epoch', 'main/loss',  'validation/main/loss',  'elapsed_time']))

trainer.run()

NN part 4  

In [0]:
import chainer 
import chainer.functions as F
import chainer.links as L
from chainer import Chain
from chainer import optimizers, training
from chainer.training import extensions

n_epoch = 5      # number of epochs the Trainer runs for
batchsize = 256  # mini-batch size handed to the train/validation iterators
device = 0       # device id passed to to_gpu / updater / evaluator; a negative value skips the to_gpu call below

class MLP(Chain):
    """Two-layer perceptron: 784 inputs -> 200 hidden units (tanh) -> 10 outputs."""

    def __init__(self):
        super(MLP, self).__init__()
        with self.init_scope():
            self.l1 = L.Linear(784, 200)
            self.l2 = L.Linear(200, 10)

    def __call__(self, x):
        """Return the raw 10-way output of the network for input ``x``."""
        hidden = F.tanh(self.l1(x))
        return self.l2(hidden)
    
# L.Classifier wraps the predictor; the 'main/loss' and 'main/accuracy' keys used in the reports below come from it.
model = L.Classifier(MLP()) 

# Move the model to the selected GPU unless a negative device id was configured.
if device >= 0:
    model.to_gpu(device)

optimizer = chainer.optimizers.Adam()
optimizer.setup(model)

# The validation iterator makes a single, unshuffled pass (repeat=False, shuffle=False).
train_iter = chainer.iterators.SerialIterator(train, batchsize)
validation_iter = chainer.iterators.SerialIterator(validation, batchsize, repeat=False, shuffle=False)

# Run for n_epoch epochs; trainer output goes to the 'out' directory.
updater = training.StandardUpdater(train_iter, optimizer, device=device)
trainer = chainer.training.Trainer(updater, (n_epoch, 'epoch'), out='out')

trainer.extend(extensions.LogReport())
# Registered under the name 'val', which is why the validation keys below start with 'val/'.
trainer.extend(extensions.Evaluator(validation_iter, model, device=device), name='val')
trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'main/accuracy', 'val/main/loss', 'val/main/accuracy', 'elapsed_time']))
# Loss and accuracy curves per epoch, saved as loss.png / accuracy.png.
trainer.extend(extensions.PlotReport(['main/loss', 'val/main/loss'],x_key='epoch', file_name='loss.png'))
trainer.extend(extensions.PlotReport( ['main/accuracy', 'val/main/accuracy'], x_key='epoch', file_name='accuracy.png'))
trainer.extend(extensions.dump_graph('main/loss'))


trainer.run()
    
    

CNN final 

In [0]:
import chainer 
import chainer.functions as F
import chainer.links as L
from chainer import Chain
from chainer import optimizers, training
from chainer.training import extensions

# Reload the dataset with ndim=3 and redo the 50000-sample / seed=0 split.
# presumably ndim=3 yields image-shaped samples suitable for Convolution2D — TODO confirm.
# NOTE(review): this rebinds `train`, `test` and `validation`, so earlier cells that
# reshape samples to 784 values should not be re-run after this one without reloading.
train, test = get_fashion_mnist(ndim=3)
train, validation = chainer.datasets.split_dataset_random(train, 50000, seed=0)

n_epoch = 30     # number of epochs the Trainer runs for
batchsize = 512  # mini-batch size handed to the train/validation iterators
device = 0       # device id passed to to_gpu / updater / evaluator; a negative value skips the to_gpu call below

class MLP(Chain):
    """Small convolutional classifier (kept under the name ``MLP``).

    Two convolution + ReLU + max-pooling stages, followed by a 500-unit
    linear layer with ReLU and dropout, and a final 10-way linear layer.
    """

    def __init__(self):
        super(MLP, self).__init__()
        with self.init_scope():
            self.cn1 = L.Convolution2D(1, 20, 5)
            self.cn2 = L.Convolution2D(20, 50, 5)
            self.fc1 = L.Linear(800, 500)
            self.fc2 = L.Linear(500, 10)

    def __call__(self, x):
        """Return the raw 10-way output of the network for input ``x``."""
        conv1 = F.relu(self.cn1(x))
        pooled1 = F.max_pooling_2d(conv1, 2)
        conv2 = F.relu(self.cn2(pooled1))
        pooled2 = F.max_pooling_2d(conv2, 2)
        dense = F.relu(self.fc1(pooled2))
        dropped = F.dropout(dense)
        return self.fc2(dropped)
    
    
# L.Classifier wraps the predictor; the 'main/loss' and 'main/accuracy' keys used in the reports below come from it.
model = L.Classifier(MLP()) 

# Move the model to the selected GPU unless a negative device id was configured.
if device >= 0:
    model.to_gpu(device)

optimizer = chainer.optimizers.Adam()
optimizer.setup(model)

# The validation iterator makes a single, unshuffled pass (repeat=False, shuffle=False).
train_iter = chainer.iterators.SerialIterator(train, batchsize)
validation_iter = chainer.iterators.SerialIterator(validation, batchsize, repeat=False, shuffle=False)

# Run for n_epoch epochs; trainer output goes to the 'out' directory.
updater = training.StandardUpdater(train_iter, optimizer, device=device)
trainer = chainer.training.Trainer(updater, (n_epoch, 'epoch'), out='out')

trainer.extend(extensions.LogReport())
# Registered under the name 'val', which is why the validation keys below start with 'val/'.
trainer.extend(extensions.Evaluator(validation_iter, model, device=device), name='val')
trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'main/accuracy', 'val/main/loss', 'val/main/accuracy', 'elapsed_time']))
# Loss and accuracy curves per epoch, saved as loss.png / accuracy.png.
trainer.extend(extensions.PlotReport(['main/loss', 'val/main/loss'],x_key='epoch', file_name='loss.png'))
trainer.extend(extensions.PlotReport( ['main/accuracy', 'val/main/accuracy'], x_key='epoch', file_name='accuracy.png'))
trainer.extend(extensions.dump_graph('main/loss'))


trainer.run()
    