# Cats Dogs CNN Classifier
###### Kaggle link: https://www.kaggle.com/c/dogs-vs-cats

In [None]:
# import tensorflow for obvious reasons
import tensorflow as tf

In [None]:
# import numpy for data manipulation
import numpy as np

In [None]:
# import glob for getting file names
import glob

In [None]:
# variables
n_epochs = 200
n_batch = 128
buffer_size = 1024
one_hot_cat = np.array([0.0, 1.0])
one_hot_dog = np.array([1.0, 0.0])
resize_shape = [128,128]

In [None]:
# get file names
filenames = np.array(glob.glob("./dataset/train/*.jpg"), dtype=str)

In [None]:
# get total size of dataset
n_samples = len(filenames)

In [None]:
# create placeholder for labels
labels = np.zeros(shape=(n_samples, 2))

In [None]:
# create label vector
for counter, image_name in enumerate(filenames):
    if 'cat' in image_name:
        labels[counter] = one_hot_cat
    elif 'dog' in image_name:
        labels[counter] = one_hot_dog

In [None]:
# split the data
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(filenames, labels, test_size=0.1, random_state=buffer_size)

In [None]:
# create placeholder for input
X = tf.placeholder(tf.float32, [None, 128, 128, 3])

In [None]:
# create placeholder for label
y_ = tf.placeholder(tf.float32, [None, 2])

In [None]:
# create the image parser function
def _parser_function(filename, label):
    image_string = tf.read_file(filename)
    image = tf.image.decode_jpeg(image_string)
    image_resized = tf.image.resize_images(image, size=resize_shape)
    image_resized = image_resized / 255.0
    return image_resized, label

In [None]:
# create input pipeline using tf.data
dataset = tf.data.Dataset.from_tensor_slices((X_train, y_train))
dataset = dataset.shuffle(buffer_size=buffer_size)
dataset = dataset.apply(tf.contrib.data.map_and_batch(map_func=_parser_function, batch_size=n_batch))
iterator = dataset.make_initializable_iterator()

In [None]:
# create batch iterators
input_mini_batch, label_mini_batch = iterator.get_next()

In [None]:
# create test pipeline using tf.data
test_dataset = tf.data.Dataset.from_tensor_slices((X_test, y_test))
test_dataset = test_dataset.shuffle(buffer_size=buffer_size)
test_dataset = test_dataset.apply(tf.contrib.data.map_and_batch(map_func=_parser_function, batch_size=n_batch))
test_iterator = test_dataset.make_initializable_iterator()

In [None]:
# create batch iterators
X_test_mini_batch, y_test_mini_batch = test_iterator.get_next()

In [None]:
# create weights and biases with a small amount of noise
def weight_variable(shape):
    initial = tf.truncated_normal(shape, stddev=0.1)
    return tf.Variable(initial)

def bias_variable(shape):
    initial = tf.constant(0.1, shape=shape)
    return tf.Variable(initial)

In [None]:
# convolution and pooling layers
def conv2d(x, W):
    return tf.nn.conv2d(x, W, strides=[1, 1, 1, 1], padding='SAME')

def max_pool_2x2(x):
    return tf.nn.max_pool(x, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

In [None]:
# initialize weights and biases
# convolutional layers
W_conv1 = weight_variable([5,5,3,32])
b_conv1 = bias_variable([32])

W_conv2 = weight_variable([5,5,32,32])
b_conv2 = bias_variable([32])

W_conv3 = weight_variable([5,5,32,64])
b_conv3 = bias_variable([64])

W_conv4 = weight_variable([5,5,64,64])
b_conv4 = bias_variable([64])

W_conv5 = weight_variable([5,5,64,96])
b_conv5 = bias_variable([96])

W_conv6 = weight_variable([5,5,96,96])
b_conv6 = bias_variable([96])

W_conv7 = weight_variable([5,5,96,128])
b_conv7 = bias_variable([128])

W_conv8 = weight_variable([5,5,128,128])
b_conv8 = bias_variable([128])

# fully connected layers
W_fc1 = weight_variable([8192,4096])
b_fc1 = bias_variable([4096])

W_fc2 = weight_variable([4096,4096])
b_fc2 = bias_variable([4096])

W_fc3 = weight_variable([4096,2])
b_fc3 = bias_variable([2])

In [None]:
# add dropout to conv layers
keep_prob_conv = tf.placeholder(tf.float32)

In [None]:
# add dropout to fc layers
keep_prob_fc = tf.placeholder(tf.float32)

In [None]:
# create tensorflow model
# conv layer 1
h_conv1 = conv2d(X, W_conv1)
h_conv1_activated = tf.nn.relu(h_conv1 + b_conv1)

# conv layer 2
h_conv2 = conv2d(h_conv1_activated, W_conv2)
h_conv2_activated = tf.nn.relu(h_conv2 + b_conv2)
h_pool2 = max_pool_2x2(h_conv2_activated)
h_pool2_dropout = tf.nn.dropout(h_pool2, keep_prob_conv)

# conv layer 3
h_conv3 = conv2d(h_pool2_dropout, W_conv3)
h_conv3_activated = tf.nn.relu(h_conv3 + b_conv3)

# conv layer 4
h_conv4 = conv2d(h_conv3_activated, W_conv4)
h_conv4_activated = tf.nn.relu(h_conv4 + b_conv4)
h_pool4 = max_pool_2x2(h_conv4_activated)
h_pool4_dropout = tf.nn.dropout(h_pool4, keep_prob_conv)

# conv layer 5
h_conv5 = conv2d(h_pool4_dropout, W_conv5)
h_conv5_activated = tf.nn.relu(h_conv5 + b_conv5)

#conv layer 6
h_conv6 = conv2d(h_conv5_activated, W_conv6)
h_conv6_activated = tf.nn.relu(h_conv6 + b_conv6)
h_pool6 = max_pool_2x2(h_conv6_activated)
h_pool6_dropout = tf.nn.dropout(h_pool6, keep_prob_conv)

# conv layer 7
h_conv7 = conv2d(h_pool6_dropout, W_conv7)
h_conv7_activated = tf.nn.relu(h_conv7 + b_conv7)

#conv layer 8
h_conv8 = conv2d(h_conv7_activated, W_conv8)
h_conv8_activated = tf.nn.relu(h_conv8 + b_conv8)
h_pool8 = max_pool_2x2(h_conv8_activated)
h_pool8_dropout = tf.nn.dropout(h_pool8, keep_prob_conv)

In [None]:
# flatten output from conv layers
x_fc = tf.reshape(h_pool8_dropout, shape=[-1, 8192])

In [None]:
# fully conected layer 1
h_fc1 = tf.matmul(x_fc, W_fc1) + b_fc1
h_fc1_activated = tf.nn.relu(h_fc1)
h_fc1_dropout = tf.nn.dropout(h_fc1_activated, keep_prob_fc)

# fully conected layer 2
h_fc2 = tf.matmul(h_fc1_dropout, W_fc2)
h_fc2_activated = tf.nn.relu(h_fc2)

# fully conected layer 3
y = tf.matmul(h_fc2_activated, W_fc3)

In [None]:
# define the cost function 
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=y, labels=y_))

In [None]:
# tell the tensorflow computational graph to minimize the cost using adam optimizer
train = tf.train.AdamOptimizer().minimize(cost)

In [None]:
# predict output using our model
# returns a boolean array
predictions = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))

In [None]:
# cast the boolean array to float and calculate accuracy
accuracy = tf.reduce_mean(tf.cast(predictions, tf.float32))

In [None]:
# initialize tensorflow interactive session
sess = tf.InteractiveSession()

In [None]:
# initialize variables inside the graph
tf.global_variables_initializer().run()

In [None]:
# load graph
saver = tf.train.Saver()
saver.restore(sess, "./model.ckpt")

In [None]:
# save graph
def save_graph(epoch):
    saver = tf.train.Saver()
    save_path = saver.save(sess, "./model_epoch_final.ckpt", global_step=epoch)

In [None]:
# create function to calculate train and test accuracy
def calculate_accuracy():
    # reset iterators
    sess.run(iterator.initializer)
    sess.run(test_iterator.initializer)
    
    train_acc = 0
    train_iters = 0
    
    # check train accuracy over 5 mini batches, to reduce computation
    for _ in range(5):
        train_iters += 1
        X_test, y_test = sess.run([input_mini_batch, label_mini_batch])
        train_acc += sess.run(accuracy, feed_dict={X: X_test, y_: y_test, keep_prob_fc: 1.0, keep_prob_conv: 1.0})

    test_acc = 0
    test_iters = 0
    while True:
        try:
            test_iters += 1
            X_test, y_test = sess.run([X_test_mini_batch, y_test_mini_batch])
            test_acc += sess.run(accuracy, feed_dict={X: X_test, y_: y_test, keep_prob_fc: 1.0, keep_prob_conv: 1.0})
        except tf.errors.OutOfRangeError:
            break
    
    # print accuracy
    print(f"Train accuracy: {train_acc/train_iters}")
    print(f"Test accuracy: {test_acc/test_iters}")

In [None]:
# train the model! (uses mini-batch)
for epoch in range(0,n_epochs):
    epoch_loss = 0
    sess.run(iterator.initializer)
    
    while True:
        try:
            X_for_train, y_for_train = sess.run([input_mini_batch, label_mini_batch])
            t, c = sess.run([train, cost], feed_dict={X: X_for_train, y_: y_for_train, keep_prob_fc: 0.5, keep_prob_conv: 0.8})
            epoch_loss += c
        except tf.errors.OutOfRangeError:
            break

    # print loss
    print(f"Epoch {epoch} out of {n_epochs}, loss: {epoch_loss}")
    
    # print train and test accuracies
    calculate_accuracy()
    
    # Save Graph
    if epoch % 2 == 0 and epoch >= 2:
        save_graph(epoch)

In [None]:
# calculate final accuracy
sess.run(test_iterator.initializer)
acc = 0
iters = 0
while True:
    try:
        iters += 1
        X_for_train, y_for_train = sess.run([X_test_mini_batch, y_test_mini_batch])
        acc += sess.run(accuracy, feed_dict={X: X_for_train, y_: y_for_train, keep_prob_fc: 1.0, keep_prob_conv: 1.0})
    except tf.errors.OutOfRangeError:
        break
        
print(f"Accuracy: {acc/iters}")