In [None]:
# 2017-11-22 deep learning club
# digits recognition (mnist) + data augmentation
# https://keras.io
# concepts covered: 
# - [x] NN with dense layers
# - [x] not so deep cNN
# - [x] weights initialization
# - [x] data augmentation
# - [x] saving augmented data
# - [x] saving and loading the model

In [None]:
from __future__ import print_function
import keras
from keras.datasets import mnist
from keras.preprocessing.image import ImageDataGenerator
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers.convolutional import ZeroPadding2D
from keras.models import load_model

# convenient imports
import tensorflow as tf
from keras import backend as K

In [None]:
import sys,os,time,random
import numpy as np

import matplotlib
matplotlib.use('Agg');
import matplotlib.pyplot as plt
plt.set_cmap('Greys');

import pickle

%matplotlib inline

In [None]:
# Report the versions of the two deep-learning libraries in use.
for lib_label, lib_module in (("keras", keras), ("tensorflow", tf)):
    print(lib_label, lib_module.__version__)

In [None]:
# check the backend and the ordering of the channels
print(keras.backend.backend())
# image_dim_ordering() is the legacy (Keras 1) spelling of image_data_format()
# and is not available in every Keras release, so only call it when it exists.
if hasattr(keras.backend, 'image_dim_ordering'):
    print(keras.backend.image_dim_ordering())
print(K.image_data_format())

In [None]:
# reproducibility: feed the same seed to the Python, NumPy and TensorFlow RNGs
seed = 1331
for seed_rng in (random.seed, np.random.seed, tf.set_random_seed):
    seed_rng(seed)

In [None]:
# MNIST as delivered by Keras: shuffled and split between train and test sets.
(x_train, y_train), (x_test, y_test) = mnist.load_data()
print('train samples:', len(x_train))
print('test samples:', len(x_test))
# IMP: the CNN needs an explicit channel axis, so cast the pixels to float32
# and append a trailing axis of size 1 (the number of channels).
x_train = np.expand_dims(x_train.astype('float32'), axis=-1)
x_test = np.expand_dims(x_test.astype('float32'), axis=-1)
print('x_train shape:', x_train.shape)

In [None]:
# normalization: scale the pixel intensities from [0, 255] down to [0, 1].
# The division happens in place, so it is guarded: re-running this cell on data
# that is already scaled (max <= 1) must not divide a second time.
if x_train.max() > 1:
    x_train /= 255
if x_test.max() > 1:
    x_test /= 255

In [None]:
# training hyper-parameters
batch_size, epochs = 128, 10
# dataset description: number of target classes and of image channels
num_classes, num_channels = 10, 1

# shape of one sample: the two image axes of x_train followed by the channels
input_shape = x_train.shape[1:3] + (num_channels,)

print("input shape:", input_shape)

In [None]:
# examples of the images from the training set:
# the first few digits are glued side by side into one strip
n_images_show = 7
plt.rcParams['figure.figsize'] = (15, 5)
first_digits = x_train[:n_images_show, :, :, 0]
digits_strip = np.concatenate(first_digits, axis=1)
plt.imshow(digits_strip, interpolation='none')
plt.axis('off');

In [None]:
# Convert class vectors to binary class matrices.
# The conversion is meant for the 1-D integer label vectors; arrays that are
# already 2-D are left alone so that re-running this cell does not encode
# the labels a second time.
if y_train.ndim == 1:
    y_train = keras.utils.to_categorical(y_train, num_classes)
if y_test.ndim == 1:
    y_test = keras.utils.to_categorical(y_test, num_classes)

In [None]:
# fully connected model
def create_dense_model(initializer):
    """Build and compile a small fully connected classifier.

    Layers: Flatten -> Dense(100) -> BatchNormalization -> ReLU ->
    Dense(num_classes) -> softmax. The input shape is taken from the
    notebook-level ``x_train``.

    Args:
        initializer: value passed as ``kernel_initializer`` to both Dense
            layers (e.g. 'zeros' or 'glorot_uniform').

    Returns:
        The compiled ``Sequential`` model (categorical cross-entropy loss,
        SGD optimizer, accuracy metric).
    """
    model = Sequential()
    model.add(Flatten(input_shape=x_train.shape[1:]))
    model.add(Dense(100, kernel_initializer=initializer))
    model.add(keras.layers.BatchNormalization())
    model.add(Activation('relu'))
    model.add(Dense(num_classes, kernel_initializer=initializer))
    # categorical_crossentropy expects class probabilities, so the raw Dense
    # outputs have to go through a softmax before reaching the loss.
    model.add(Activation('softmax'))

    # plain SGD optimizer: no momentum, no learning-rate decay
    opt = keras.optimizers.SGD(lr=0.0001, decay=0.0, momentum=0.0, nesterov=False)

    # compile for multi-class classification and track accuracy
    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])
    return model

In [None]:
# kernel initializers to experiment with; the last entry is the one used here
initializers = ['zeros', 'ones', 'random_uniform', 'glorot_uniform']
chosen_initializer = initializers[-1]
model = create_dense_model(chosen_initializer)
model.summary()

In [None]:
# train the dense model; the test set is used as validation data
dense_fit_options = dict(batch_size=batch_size,
                         epochs=epochs,
                         validation_data=(x_test, y_test),
                         shuffle=True)
history = model.fit(x_train, y_train, **dense_fit_options)

In [None]:
# plots the training process
def plot_history(history):
    """Plot the accuracy and loss curves recorded during training.

    Args:
        history: object exposing a ``history`` dict of per-epoch metric lists
            (what ``model.fit`` returns). The dict must contain 'loss' and
            'val_loss', plus either 'acc'/'val_acc' or
            'accuracy'/'val_accuracy'.

    Raises:
        KeyError: if one of the expected metrics is missing from the dict.
    """
    metrics = history.history
    print("Available data:", metrics.keys())
    # The accuracy metric is stored as 'acc' or 'accuracy' depending on the
    # Keras version; accept whichever one is present.
    acc_key = 'acc' if 'acc' in metrics else 'accuracy'

    # summarize history for accuracy (on a fresh figure, so the curves never
    # land on a figure left open by earlier code)
    plt.figure()
    plt.plot(metrics[acc_key])
    plt.plot(metrics['val_' + acc_key])
    plt.title('model accuracy')
    plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper right')
    plt.show()

    # summarize history for loss
    plt.figure()
    plt.plot(metrics['loss'])
    plt.plot(metrics['val_loss'])
    plt.title('model loss')
    plt.ylabel('loss')
    plt.xlabel('epoch')
    plt.legend(['train', 'test'], loc='upper right')
    plt.show()

In [None]:
# learning curves (accuracy and loss) of the dense model fitted above
plot_history(history)

In [None]:
# spot check: draw a few random test digits and tag each of them with the
# class the current model predicts for it
n_images_show = 7
picked = np.random.choice(x_test.shape[0], n_images_show, replace=False)
sample = x_test[picked]
predicted = np.argmax(model.predict(sample), axis=-1)
plt.figure(figsize=(16,8))
for slot, (digit_image, digit_label) in enumerate(zip(sample, predicted), start=1):
    plt.subplot(1, n_images_show, slot)
    plt.imshow(digit_image[:, :, 0], interpolation='none')
    plt.text(0, 0, digit_label, color='black',
             bbox=dict(facecolor='white', alpha=1))
    plt.axis('off')

In [None]:
# conv net
def model_simple_conv_model():
    """Build and compile a small convolutional classifier.

    Layers: Conv2D(16, 3x3) -> ReLU -> Conv2D(32, 5x5) -> ReLU -> Flatten ->
    Dense(num_classes) -> softmax. The input shape and the number of classes
    come from the notebook-level ``input_shape`` and ``num_classes``.

    Returns:
        The compiled ``Sequential`` model (categorical cross-entropy loss,
        RMSprop optimizer, accuracy metric).
    """
    model = Sequential()
    model.add(Conv2D(16, (3, 3), input_shape=input_shape))
    model.add(Activation('relu'))
    model.add(Conv2D(32, (5, 5)))
    model.add(Activation('relu'))
    model.add(Flatten())
    # one output unit per class
    model.add(Dense(num_classes))
    model.add(Activation('softmax'))

    # initiate RMSprop optimizer (the class itself rather than the lowercase
    # `rmsprop` alias, which not every Keras release provides)
    opt = keras.optimizers.RMSprop(lr=0.0001, decay=1e-6)

    # compile for multi-class classification and track accuracy
    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])
    return model

In [None]:
# build the conv net; note that this rebinds `model`, which held the dense model until now
model = model_simple_conv_model()
model.summary()

In [None]:
# train the conv net on the original images; the test set is used as validation data
conv_fit_options = dict(batch_size=batch_size,
                        epochs=epochs,
                        validation_data=(x_test, y_test),
                        shuffle=True)
hist = model.fit(x_train, y_train, **conv_fit_options)

In [None]:
# error of the freshly trained conv net on the test set;
# scores[1] is the accuracy metric requested at compile time
scores = model.evaluate(x_test, y_test, verbose=0)
error_pct = 100 - scores[1]*100
print("Baseline Error: %.2f%%" % error_pct)

In [None]:
# saving model using keras
directory = "data/models/"
model_path = os.path.join(directory, "nn-model.h5py")

# create the target folder on first use
if not os.path.exists(directory):
    os.makedirs(directory)

model.save(model_path)

In [None]:
# loading model from keras:
# drop the in-memory model first (if there is one) so that everything used
# below really comes from the file on disk
globals().pop('model', None)
model = load_model(model_path)

In [None]:
# check that we loaded the same model: the error should match the one
# printed before saving
scores = model.evaluate(x_test, y_test, verbose=0)
error_pct = 100 - scores[1]*100
print("Baseline Error: %.2f%%" % error_pct)

In [None]:
# pretty way to show the digits
def plot_digits(x_data, y_pred, n_images=None):
    """Show the first few images in one row, each tagged with its label.

    Args:
        x_data: indexable batch of images; every item is reshaped to
            (input_shape[0], input_shape[1]) for display.
        y_pred: labels to write on the images, aligned with ``x_data``.
        n_images: how many images to show. Defaults to the notebook-level
            ``n_images_show``; capped at the number of images/labels given.
    """
    if n_images is None:
        n_images = n_images_show
    # never index past the end of a batch that is shorter than requested
    n_images = min(n_images, len(x_data), len(y_pred))
    for i in range(n_images):
        plt.subplot(1, n_images, i + 1)
        plt.imshow(x_data[i].reshape(input_shape[0], input_shape[1]), interpolation='none')
        # label in the top-left corner, on a white box so it stays readable
        plt.text(0, 0, y_pred[i], color='black', bbox=dict(facecolor='white', alpha=1))
        plt.axis('off')
    plt.show()

In [None]:
# check that the model is actually working:
# per-class scores for the whole test set, reduced to the most likely digit
proba = model.predict(x_test)
digits_predicted = proba.argmax(axis=1)

plot_digits(x_test, digits_predicted);

In [None]:
# data augmentation
# settings of the random transformations (passed to the generator below)
shift = 0.06
angle = 30 # 45

# IMP: check the fill_mode that you set!
image_data_gen_args = {
    'featurewise_center': False,
    'featurewise_std_normalization': False,
    # 'zca_whitening': True,
    'rotation_range': angle,
    'width_shift_range': shift,
    'height_shift_range': shift,
    'fill_mode': 'constant',
    'cval': 0,
    'horizontal_flip': False,
    'vertical_flip': False,
}

# define the data generator
datagen = ImageDataGenerator(**image_data_gen_args)
# fitting the generator is not necessary for simple transformations
# datagen.fit(x_train, augment=True, seed=seed)

In [None]:
# reference point: error of the loaded model on the original test images
scores = model.evaluate(x_test, y_test, verbose=0)
error_pct = 100 - scores[1]*100
print("Baseline Error: %.2f%%" % error_pct)

In [None]:
# performance on the modified data
augmented_batches = datagen.flow(x_train, y_train, batch_size=batch_size)
j_batch = 0 # batches handled so far; the stream is endless, so we stop by count
while j_batch < 3:
    X_batch, y_batch = next(augmented_batches)
    # probabilities of being a specific digit
    proba = model.predict(X_batch, verbose=0)
    digits_predicted = proba.argmax(axis=1)
    # show the images together with the predicted digits
    plot_digits(X_batch, digits_predicted)
    j_batch += 1

In [None]:
# error of the model on the augmented test data (before it is trained on
# augmented images)
# `steps` has to be a whole number of batches: round up so that the final,
# smaller batch is counted as well.
eval_steps = (len(x_test) + batch_size - 1) // batch_size
scores = model.evaluate_generator(datagen.flow(x_test, y_test, batch_size=batch_size), steps=eval_steps)
print("Baseline Error: %.2f%%" % (100 - scores[1]*100))

In [None]:
# data augmentation: look at a few augmented batches with their true labels
# current_dir = os.getcwd()
# if you want to save the images, also pass to flow():
# save_to_dir=(current_dir + '/data/images'), save_prefix='aug', save_format='png'
augmented_batches = datagen.flow(x_train, y_train, batch_size=batch_size)
j_batch = 0 # batches handled so far; the stream is endless, so we stop by count
while j_batch < 3:
    X_batch, y_batch = next(augmented_batches)
    print(j_batch, "th batch with the shapes:", X_batch.shape, y_batch.shape)
    # labels are one-hot rows, so the position of the 1 is the digit
    y_digits = np.argmax(y_batch, axis=1)
    # show the images together with their labels
    plot_digits(X_batch, y_digits)
    j_batch += 1

In [None]:
# fits the model on batches with real-time data augmentation
# steps_per_epoch has to be a whole number of batches: round up so that the
# final, smaller batch of each pass over the data is included.
steps_per_epoch = (len(x_train) + batch_size - 1) // batch_size
model.fit_generator(datagen.flow(x_train, y_train, batch_size=batch_size),
                    steps_per_epoch=steps_per_epoch, epochs=epochs)

In [None]:
# verify model on the augmented data
# `steps` has to be a whole number of batches: round up so that the final,
# smaller batch is counted as well.
eval_steps = (len(x_test) + batch_size - 1) // batch_size
scores = model.evaluate_generator(datagen.flow(x_test, y_test, batch_size=batch_size), steps=eval_steps)
print("Baseline Error: %.2f%%" % (100 - scores[1]*100))

In [None]:
# verify model on the initial (non-augmented) test data
scores = model.evaluate(x_test, y_test, verbose=0)
error_pct = 100 - scores[1]*100
print("Baseline Error: %.2f%%" % error_pct)