In [None]:
'''
    denoising MNIST digits using an AUTOENCODER CNN on Tensorflow
    Dataset: MNIST
'''
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt

In [None]:
# draw a random minibatch of images
def mini_batch(X, size):
    """Sample `size` random rows of `X` and shape them as 28x28x1 images.

    Rows are picked with `np.random.randint`, so the same row may appear
    more than once in a batch.

    Args:
        X: indexable array of images; every row must hold 28*28 values.
        size: number of images to draw.

    Returns:
        Array of shape (size, 28, 28, 1).
    """
    # Indices are drawn as a (size, 1) column; the extra axis this adds to
    # the fancy-indexed result is folded away by the reshape below.
    picks = np.random.randint(len(X), size = (size, 1))
    return X[picks].reshape(size, 28, 28, 1)

In [None]:
# this function adds gaussian noise in the images for the purpose of training
def create_noise(image, mean = 0.0, stddev = 0.1):
    """Return `image` with additive Gaussian noise.

    The noise is drawn with NumPy so the result is a concrete array that
    can be fed to a placeholder. (Wrapping `tf.random_normal` in
    `np.array` only wraps a symbolic tensor in graph mode, and a standard
    deviation of 0 adds no noise at all.)

    Args:
        image: array-like of pixel values, any shape.
        mean: mean of the noise distribution.
        stddev: standard deviation of the noise. The default of 0.1 is
            chosen on the assumption that pixels are scaled to [0, 1];
            pass a larger value for raw 0-255 data.

    Returns:
        A new array with the same shape as `image`; the input is not
        modified.
    """
    image = np.asarray(image)
    noisy = np.random.normal(loc = mean, scale = stddev, size = image.shape).astype(np.float32)
    return image + noisy

In [None]:
# Fetch MNIST through the Keras dataset helper. load_data() returns a
# (train, test) pair, each of which is an (images, labels) pair.
mnist = tf.keras.datasets.mnist
_train_split, _test_split = mnist.load_data()
x_train, y_train = _train_split
x_test, y_test = _test_split

In [None]:
def _to_unit_images(raw):
    """Reshape images to (N, 28, 28, 1) float32 and scale pixels to [0, 1].

    The loss used later is a sigmoid cross-entropy, which treats its
    labels as probabilities, so raw 0-255 intensities must be rescaled.
    Scaling is applied only when values above 1 are present, so re-running
    this cell on already-scaled data leaves it unchanged.
    """
    images = np.asarray(raw).reshape(len(raw), 28, 28, 1).astype(np.float32)
    if images.size and images.max() > 1.0:
        images = images / np.float32(255.0)
    return images


x_train = _to_unit_images(x_train)
x_test = _to_unit_images(x_test)
print("training data: ", x_train.shape)
print("testing data: ", x_test.shape)

In [None]:
# Discard any previously built default graph before declaring new nodes.
tf.reset_default_graph()

# Placeholders for image batches: `inputs` is what the network is fed,
# `target` is what its output is compared against.
_image_shape = (None, 28, 28, 1)
inputs = tf.placeholder(tf.float32, _image_shape)
target = tf.placeholder(tf.float32, _image_shape)

# Convolution kernels, listed in creation order as
# (variable name, [height, width, in_channels, out_channels]).
_kernel_shapes = [
    ('wc1', [3, 3, 1, 32]),
    ('wc2', [3, 3, 32, 32]),
    ('wc3', [3, 3, 32, 32]),
    ('wc4', [3, 3, 32, 16]),
    ('wc5', [3, 3, 16, 32]),
    ('wc6', [3, 3, 32, 32]),
    ('wc7', [3, 3, 32, 1]),
]
# Every kernel gets its own Xavier initializer instance.
wc1, wc2, wc3, wc4, wc5, wc6, wc7 = [
    tf.get_variable(name, shape, initializer=tf.contrib.layers.xavier_initializer())
    for name, shape in _kernel_shapes
]

# Step size handed to the optimizer.
learning_rate = 0.01

In [None]:
# Convolutional autoencoder: three conv+pool stages compress the image,
# three resize+conv stages expand it back, and a final conv produces
# one logit per pixel.
with tf.device('/cpu:0'):
    # encoder unit
    # The network reads from the `inputs` placeholder so that whatever
    # batch is fed at run time is what gets encoded (wiring the training
    # array in directly would ignore the feed and bake the data into the graph).
    c1 = tf.nn.relu(tf.nn.conv2d(inputs, wc1, strides = [1,1,1,1], padding = 'SAME'))
    m1 = tf.nn.max_pool(c1, ksize = [1,2,2,1], strides = [1,2,2,1], padding = 'SAME')
    # m1 shape = 14 x 14 x 32
    print('m1     :',m1.shape)
    c2 = tf.nn.relu(tf.nn.conv2d(m1, wc2, strides = [1,1,1,1], padding = 'SAME'))
    m2 = tf.nn.max_pool(c2, ksize = [1,2,2,1], strides = [1,2,2,1], padding = 'SAME')
    # m2 shape = 7 x 7 x 32
    print('m2     :',m2.shape)
    c3 = tf.nn.relu(tf.nn.conv2d(m2, wc3, strides = [1,1,1,1], padding = 'SAME'))
    encoded = tf.nn.max_pool(c3, ksize = [1,2,2,1], strides = [1,2,2,1], padding = 'SAME')
    # encoded shape = 4 x 4 x 32
    print('encoder:',encoded.shape)

    # decoder unit: nearest-neighbour upsampling followed by a convolution
    d1 = tf.image.resize_images(encoded, size = (7,7), method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    c4 = tf.nn.relu(tf.nn.conv2d(d1, wc4, strides = [1,1,1,1], padding = 'SAME'))
    print('c4     :', c4.shape)
    d2 = tf.image.resize_images(c4, size = (14,14), method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    c5 = tf.nn.relu(tf.nn.conv2d(d2, wc5, strides = [1,1,1,1], padding = 'SAME'))
    print('c5     :', c5.shape)
    d3 = tf.image.resize_images(c5, size = (28,28), method = tf.image.ResizeMethod.NEAREST_NEIGHBOR)
    c6 = tf.nn.relu(tf.nn.conv2d(d3, wc6, strides = [1,1,1,1], padding = 'SAME'))
    print('c6     :', c6.shape)
    # No activation on the last layer: the loss below applies the sigmoid itself.
    logits = tf.nn.conv2d(c6, wc7, strides = [1,1,1,1], padding = 'SAME')
    print('logits :',logits.shape)
    # sigmoid cross entropy between the reconstruction and the `target`
    # images, averaged over every pixel of the batch to give a scalar loss
    cost_func = tf.reduce_mean(
        tf.nn.sigmoid_cross_entropy_with_logits(logits = logits, labels = target))
    optimize = tf.train.AdamOptimizer(learning_rate).minimize(cost_func)

In [None]:
# creating a tensorflow session
s = tf.Session(config=tf.ConfigProto(log_device_placement=True))
s.run(tf.global_variables_initializer())

# hyperparameters
epochs = 1
batch_size = 32
# An epoch runs enough minibatches to match the size of the training set.
# Batches are sampled at random by mini_batch, so this is "one pass" only
# in terms of the number of images processed.
steps_per_epoch = max(1, len(x_train) // batch_size)

# training loop
for i in range(epochs):
    print("epoch: ",i)
    epoch_costs = []
    for _ in range(steps_per_epoch):
        train_batch = mini_batch(x_train, batch_size)
        noise_batch = np.array(create_noise(train_batch))
        # The noisy batch is the network input; the clean batch is the target.
        _, batch_cost = s.run([optimize, cost_func], {inputs : noise_batch, target : train_batch})
        # np.mean reduces the fetched cost to one number per step.
        epoch_costs.append(np.mean(batch_cost))
    print("mean training cost: ", np.mean(epoch_costs))

In [None]:
# cost and prediction on the test set
# The noisy images are the network input and the clean images are the
# target, matching how the training loop feeds the placeholders.
noise_test = np.array(create_noise(x_test))
cost, prediction = s.run([cost_func, logits], {inputs: noise_test, target: x_test})
# np.mean reduces the fetched cost to a single number for printing.
print("cost: ", np.mean(cost))

# plotting reconstructed images
columns = 4
rows = 5
fig = plt.figure(figsize=(8, 8))
# Take exactly as many images as there are grid cells, and pass the logits
# through a sigmoid so the plotted values are pixel intensities in [0, 1].
# float64 keeps np.exp from overflowing on large negative logits.
shown_logits = np.asarray(prediction[0:columns * rows], dtype=np.float64)
img = 1.0 / (1.0 + np.exp(-shown_logits))
for i, picture in enumerate(img, start=1):
    fig.add_subplot(rows, columns, i)
    # Drop the trailing channel axis: imshow expects a 2-D array for a
    # single-channel image.
    plt.imshow(picture[:, :, 0])

plt.show()