In [1]:
#import libraries to build the model

%matplotlib inline
from matplotlib import pyplot as plt

import cv2
import numpy as np

import keras

from keras.applications import VGG16
from keras.layers import GlobalAveragePooling2D
from keras.models import Model
from sklearn.mixture import GaussianMixture
from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist
from keras.models import Sequential, load_model
from keras.layers import Dense, Activation, Flatten, Conv2D, MaxPooling2D
from keras.preprocessing.image import ImageDataGenerator

Using TensorFlow backend.


In [2]:
# Load MNIST, already split into train and test sets.
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# The "normal" class consists of the training images labelled 1.
normal_x = x_train[y_train == 1]

# Keep only the digits 0 and 1 in the test set. The mask is built once from the
# unfiltered labels so that images and labels stay aligned.
keep = np.isin(y_test, (0, 1))
x_test, y_test = x_test[keep], y_test[keep]

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


Exception: URL fetch failure on https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz: None -- [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:841)

In [None]:
# Training hyper-parameters.
batch_size = 250  # mini-batch size handed to the data generator
epochs = 10       # number of training epochs

# Number of distinct labels found in y_train (the digits 0-9 for MNIST).
num_classes = np.unique(y_train).size


In [None]:
# Build a Gaussian mixture model with one component per unique value in y_train.
# random_state pins the initialisation so that Restart & Run All reproduces
# the same fit (and therefore the same anomaly threshold) every time.
gmm = GaussianMixture(n_components=num_classes, random_state=0)

In [None]:
def reshape_x(x, size=56):
    """Resize a batch of 2-D images and replicate them to three channels.

    Args:
        x: Sequence or array of 2-D images, each accepted by ``cv2.resize``.
        size: Side length in pixels of the square output images. Defaults to
            56, the value that used to be hard-coded.

    Returns:
        A float64 array of shape ``(len(x), size, size, 3)`` whose three
        channels are identical copies of the resized image.
    """
    resized = np.empty((len(x), size, size))  # np.empty defaults to float64
    for i, image in enumerate(x):
        resized[i] = cv2.resize(image, (size, size))

    # (N, size, size) -> (N, size, size, 1) -> (N, size, size, 3)
    return np.repeat(resized[..., np.newaxis], 3, axis=-1)

In [None]:
# Add an explicit channel axis, (N, 28, 28) -> (N, 28, 28, 1), which is the
# input shape the CNN is built for.
x_train = x_train.reshape(x_train.shape[0], 28, 28, 1)
x_test = x_test.reshape(x_test.shape[0], 28, 28, 1)

# Three-channel copy of the normal images: (N, 28, 28) -> (N, 28, 28, 3).
# The existing channel axis is repeated directly; calling expand_dims on the
# already 4-D array would yield a 5-D (N, 28, 28, 1, 3) tensor instead.
x_normal = normal_x.reshape(normal_x.shape[0], 28, 28, 1)
x_normal = np.repeat(x_normal, 3, axis=-1)

x_normal = x_normal.astype('float32')
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')

# Scale pixel values into [0, 1].
# NOTE: x_train and x_test are rescaled in place, so re-running this cell
# without reloading the data divides them by 255 a second time.
x_normal /= 255
x_train /= 255
x_test /= 255

print('x_normal shape:', x_normal.shape)
print('x_train shape:', x_train.shape)

print(x_normal.shape[0], 'normal samples')
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

In [None]:
# Inspect the shape of the preprocessed "normal" image array.
x_normal.shape

In [None]:
# One-hot encode the integer labels, using num_classes columns for both sets.
y_train, y_test = (
    keras.utils.to_categorical(labels, num_classes)
    for labels in (y_train, y_test)
)

In [None]:
# CNN classifier: two convolution / ReLU / max-pooling stages followed by a
# fully connected head with a softmax output.
model = Sequential([
    # Stage 1: 32 filters of size 3x3 applied to the 28x28 single-channel input.
    Conv2D(32, (3, 3), input_shape=(28, 28, 1)),
    Activation('relu'),
    MaxPooling2D(pool_size=(2, 2)),

    # Stage 2: 64 filters of size 3x3.
    Conv2D(64, (3, 3)),
    Activation('relu'),
    MaxPooling2D(pool_size=(2, 2)),

    Flatten(),

    # Fully connected part: one hidden layer of 512 units, then one output
    # unit per class.
    Dense(512),
    Activation('relu'),
    Dense(num_classes, activation='softmax'),
])

In [None]:
# Display the shape of x_normal again as a sanity check.
x_normal.shape

In [None]:
# Feed the training data to the model in mini-batches of `batch_size`, so the
# model can be trained on a single CPU in a short time.
# The ImageDataGenerator is created with its default settings.
gen = ImageDataGenerator()
train_generator = gen.flow(x=x_train, y=y_train, batch_size=batch_size)

In [None]:
# Shapes of the test images and of their one-hot encoded labels.
x_test.shape , y_test.shape

In [None]:
# Categorical cross-entropy matches the one-hot encoded labels.
model.compile(
    loss='categorical_crossentropy',
    optimizer=keras.optimizers.Adam(),
    metrics=['accuracy'],
)

# One epoch should pass over the training set exactly once, i.e. take
# ceil(n_samples / batch_size) batches. Passing batch_size itself as the step
# count confuses "samples per batch" with "batches per epoch".
steps_per_epoch = (len(x_train) + batch_size - 1) // batch_size

model.fit_generator(
    train_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=epochs,
    validation_data=(x_test, y_test),  # validate on the whole test set
)

# Export the trained model.
model.save("model.hdf5")

In [None]:
# Display the shape of x_normal once more as a sanity check.
x_normal.shape

In [None]:
# Reload the trained CNN; its softmax outputs serve as feature vectors.
model = load_model("model.hdf5")

# The GMM has to be fitted on features of the normal training images
# (digit 1), not on the test set, otherwise the threshold derived from these
# features is computed on the very data it is supposed to judge.
# Preprocess exactly like x_train: add the channel axis, cast to float32 and
# scale into [0, 1].
normal_inputs = normal_x.reshape(normal_x.shape[0], 28, 28, 1).astype('float32') / 255
features = model.predict(normal_inputs)


In [None]:
# Fit the Gaussian mixture model to the extracted feature vectors.
gmm.fit(features)

In [None]:
# Score the features the GMM was fitted on and derive the anomaly threshold:
# three standard deviations below their mean score.
OKscore = gmm.score_samples(features)
thred = np.mean(OKscore) - 3 * np.std(OKscore)

# Run the CNN (not VGG) on the test images to obtain their feature vectors,
# and evaluate the classifier on the test set.
test_features = model.predict(x_test)
score = model.evaluate(x_test, y_test, verbose=0)


In [None]:
# Show the values returned by model.evaluate on the test set.
score

In [None]:
# Anomaly-detection accuracy from the GMM scores. A test sample is predicted
# "normal" when its score reaches the threshold `thred`, otherwise "abnormal".
# (Subtracting the loss value score[0] from 100 does not give an accuracy.)
test_scores = gmm.score_samples(test_features)
predicted_normal = test_scores >= thred

# Ground truth: digit 1 is the normal class, every other digit is abnormal.
is_normal = np.argmax(y_test, axis=1) == 1

print('normal accuracy:', 100 * predicted_normal[is_normal].mean())
print('abnormal accuracy:', 100 * (~predicted_normal[~is_normal]).mean())

In [None]:
# Reload the saved classifier and predict class probabilities for the test set.
model = load_model("model.hdf5")
predictions = model.predict(x_test)

# Show each misclassified test image among the first 10000 predictions.
for idx, probs in enumerate(predictions[:10000]):
    true_label = np.argmax(y_test[idx])
    guess = np.argmax(probs)

    # Correctly classified samples are skipped.
    if true_label == guess:
        continue

    print("predict: ",guess," actual: ",true_label)
    plt.gray()
    plt.imshow(x_test[idx].reshape([28, 28]))
    plt.show()