In [1]:
import numpy as np
from keras.utils import np_utils
from keras.datasets import mnist

# Add noise for train and test images.
def img_noise(x, ratio):
    number = np.shape(x)[0]
    x = (x * ratio + np.random.normal(loc=0.5, scale=0.16, size=(number, 28, 28, 1))) / (ratio + 1)
    x[np.where(x > 1.0)] = 1.0
    x[np.where(x < 0.0)] = 0.0
    return x

# load MNIST dataset.
def load_data():
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    number = 10000
    x_train = x_train[0:number]
    y_train = y_train[0:number]
    x_train = x_train.reshape(number, 28, 28, 1)
    x_train = x_train.astype('float32')
    x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)
    x_test = x_test.astype('float32')
    
    y_train = np_utils.to_categorical(y_train, 10)
    y_test = np_utils.to_categorical(y_test, 10)
    
    x_train = x_train / 255
    x_test = x_test / 255
    ratio = 10.0
    x_train = img_noise(x_train, ratio)
    x_test = img_noise(x_test, ratio)
    
    return (x_train, y_train), (x_test, y_test)

(x_train, y_train), (x_test, y_test) = load_data()
print('Shape of x_train: ', np.shape(x_train))
print('Shape of y_train: ', np.shape(y_train))
print('Shape of x_test: ', np.shape(x_test))
print('Shape of y_test: ', np.shape(y_test))

Using TensorFlow backend.


Shape of x_train:  (10000, 28, 28, 1)
Shape of y_train:  (10000, 10)
Shape of x_test:  (10000, 28, 28, 1)
Shape of y_test:  (10000, 10)


In [9]:
import os
import time
import numpy as np
from keras.models import Model
from keras.layers.core import Dense, Dropout, Activation
from keras.layers import Input, Conv2D, MaxPooling2D, AveragePooling2D, ZeroPadding2D, Flatten
from keras.optimizers import SGD, Adam, Adadelta
import keras.callbacks

patience = 5
model_name = 'MNIST_model_fixed_20180131'
base_dir = 'C:/Users/User/Raw data/CNN_test'
mdl_dir = os.path.join(base_dir, 'model')
if not os.path.exists(mdl_dir):
    os.makedirs(mdl_dir)
model_dir = os.path.join(mdl_dir, model_name)
if not os.path.exists(model_dir):
    os.makedirs(model_dir)

# Build CNN model and optimization.
def build_model():
    input_img = Input(shape=(28, 28, 1))
    
    block1 = Conv2D(8, (5, 5), padding='valid', data_format='channels_last',  activation='relu')(input_img)
    block1 = ZeroPadding2D(padding=(2,2), data_format = 'channels_last')(block1)
    block1 = MaxPooling2D((2,2), data_format = 'channels_last')(block1)
    
    block2 = Conv2D(8, (3, 3), padding='valid', data_format='channels_last',  activation='relu')(block1)
    block2 = ZeroPadding2D(padding=(1,1), data_format = 'channels_last')(block2)
    #block2 = MaxPooling2D((2,2), data_format = 'channels_last')(block2)
    '''
    block3 = Conv2D(32, (3, 3), padding='valid', data_format='channels_last',  activation='relu')(block2)
    block3 = ZeroPadding2D(padding=(1,1), data_format = 'channels_last')(block3)
    block3 = MaxPooling2D((2,2), data_format = 'channels_last')(block3)
    '''
    block4 = Conv2D(4, (3, 3), padding='valid', data_format='channels_last',  activation='relu')(block2)
    #block4 = ZeroPadding2D(padding=(1,1), data_format = 'channels_last')(block4)
    #block4 = MaxPooling2D((2,2), data_format = 'channels_last')(block4)
    
    block5 = Flatten()(block4)
    
    block6 = Dense(128, activation = 'relu')(block5)
    block6 = Dropout(0.5)(block6)
    
    output_labels = Dense(10, activation = 'softmax')(block6)
    
    model = Model(inputs = input_img, outputs = output_labels)
    opt = Adadelta(lr = 0.05, rho = 0.95, epsilon = 1e-06)
    #opt = 'adam'
    model.compile(loss = 'categorical_crossentropy', optimizer=opt, metrics = ['accuracy'])
    model.summary()
    return model

def train():
    epoch = 4
    batch = 128
    pretrain = False
    save_every = 1
    
    (x_train, y_train), (x_test, y_test) = load_data()
    
    train_model(batch, epoch, pretrain, save_every, x_train, y_train, np.asarray(x_test),np.asarray(y_test), model_name)

# Run and train model each epoch. 
def train_model(batch_size, num_epoch, pretrain, save_every, x_train, y_train, x_test, y_test, model_name=None):
    if pretrain == False:
        model = build_model()
    else:
        model = load_model(model_name)
    
    num_instances = len(y_train)
    iter_per_epoch = int(num_instances / batch_size)
    if (num_instances % batch_size) > 0:
        iter_per_epoch = iter_per_epoch + 1
    batch_cutoff = [0]
    for i in range(iter_per_epoch - 1):
        batch_cutoff.append(batch_size * (i+1))
    batch_cutoff.append(num_instances)
    
    total_start_t = time.time()
    best_metrics = 0.0
    early_stop_counter = 0
    for e in range(num_epoch):
        rand_idxs = np.random.permutation(num_instances)
        print('#######')
        print('epoch' + str(e+1))
        print('#######')
        start_t = time.time()
        for i in range(iter_per_epoch):
            if i % 50 ==0:
                print('Iteration' + str(i+1))
            X_batch = []
            Y_batch = []
            
            for n in range(batch_cutoff[i],batch_cutoff[i+1]):
                X_batch.append(x_train[rand_idxs[n]])
                Y_batch.append(y_train[rand_idxs[n]])
            model.train_on_batch(np.asarray(X_batch),np.asarray(Y_batch))
        
        loss_and_metrics = model.evaluate(x_test, y_test, batch_size)
        print('\nLoss & metrics:')
        print(loss_and_metrics)
        
        if loss_and_metrics[1] >= best_metrics:
            best_metrics = loss_and_metrics[1]
            print('save best score!!'+str(loss_and_metrics[1]))
            early_stop_counter = 0
        else:
            early_stop_counter += 1
            
        print('Elapsed time in epoch ' + str(e+1) + ': ' + str(time.time() - start_t))
        
        if (e+1) % save_every == 0:
            model_path = os.path.join(model_dir, 'model-%d.h5' %(e+1))
            model.save(model_path)
            model_weights_path = os.path.join(model_dir, 'model_weights-%d.h5' %(e+1))
            model.save_weights(model_weights_path)
            print('Saved model %s!' %str(e+1))
            
        if early_stop_counter >= patience:
            print('Stop by early stopping')
            print('Best score: ' + str(best_metrics))
            break
            
    print('Elapsed time in total: ' + str(time.time() - total_start_t))
    
train()

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_3 (InputLayer)         (None, 28, 28, 1)         0         
_________________________________________________________________
conv2d_7 (Conv2D)            (None, 24, 24, 8)         208       
_________________________________________________________________
zero_padding2d_5 (ZeroPaddin (None, 28, 28, 8)         0         
_________________________________________________________________
max_pooling2d_3 (MaxPooling2 (None, 14, 14, 8)         0         
_________________________________________________________________
conv2d_8 (Conv2D)            (None, 12, 12, 8)         584       
_________________________________________________________________
zero_padding2d_6 (ZeroPaddin (None, 14, 14, 8)         0         
_________________________________________________________________
conv2d_9 (Conv2D)            (None, 12, 12, 4)         292       
__________

In [2]:
import os
from keras.utils.vis_utils import plot_model
from sklearn.metrics import confusion_matrix
from keras.models import load_model

# Model validation.
base_dir = 'C:/Users/User/Raw data/CNN_test'
model_name = 'MNIST_model_fixed_20180118'
epoch = 32
model_plot_path = os.path.join(base_dir, 'image', model_name)
if not os.path.exists(model_plot_path):
    os.makedirs(model_plot_path)
model_path = os.path.join(base_dir, 'model', model_name, 'model-{}.h5'.format(epoch))

def draw_plot():
    emotion_model = load_model(model_path)
    plot_model(emotion_model, to_file=model_plot_path+'/model.png')
    
    predictions = emotion_model.predict(x_test)
    predictions = predictions.argmax(axis=-1)
    y_labels = y_test.argmax(axis=-1)
    confusions = confusion_matrix(y_labels, predictions)
    print('confusions [prediction\True]')
    print(confusions)

    result = emotion_model.evaluate(x_test, y_test)
    print('\nTest score:',result[0])
    print('Test accuracy:',result[1])
draw_plot()

confusions [prediction\True]
[[ 966    1    1    0    1    0    8    1    2    0]
 [   0 1123    3    2    0    0    6    0    1    0]
 [   7    2  990    4    2    0    3    8   15    1]
 [   2    0   12  958    0    7    0    5   19    7]
 [   0    3    2    0  949    0   10    1    2   15]
 [   4    0    1    6    0  859    8    1    9    4]
 [  10    3    1    0    4    2  935    1    2    0]
 [   1    4   16    3    0    0    0  993    2    9]
 [   7    3   10    3    4    3    4   10  924    6]
 [   6    7    0    8   11    1    1   12    7  956]]
Test score: 0.11351544203
Test accuracy: 0.9653
