In [1]:
import tensorflow as tf
import numpy as np

In [2]:
# load MNIST DIGITS
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# normalize the train and test sets
x_train, x_test = x_train / 255.0, x_test / 255.0

# one-hot encode the labels
y_train = tf.one_hot(y_train, 10)
y_test = tf.one_hot(y_test, 10)

In [3]:
width = 28  # width of the image in pixels
height = 28  # height of the image in pixels
flat = width * height  # number of pixels in one image
class_output = 10  # number of possible classifications for the problem

In [4]:
# reshape train and test data
# the input image is 28 pixels by 28 pixels, 1 channel (grayscale). in this case, the first dimension is the batch number of the image
# it can be of any size (so we set it to -1). the second and third dimensions are width and height, and the last one is the image channels.

x_image_train = tf.reshape(x_train, [-1, 28, 28, 1])
x_image_train = tf.cast(x_image_train, 'float32')

x_image_test = tf.reshape(x_test, [-1, 28, 28, 1])
x_image_test = tf.cast(x_image_test, 'float32')

# creating new dataset with reshaped inputs
train_ds = tf.data.Dataset.from_tensor_slices((x_image_train, y_train)).batch(50)
test_ds = tf.data.Dataset.from_tensor_slices((x_image_test, y_test)).batch(50)

### Weights Biases and Convolutional Layer Definitions

In [5]:
# we define a kernel here. the size of the filter/kernel is 5x5; input channels is 1 (grayscale); and we need 32 different
# feature maps (here, 32 feature maps means 32 different filters are applied on each image, so the output of the convolution
# layer would be 28x28x32). in this step, we create a filter/kernel tensor of shape [filter_height, filter_width, in_channels, out_channels]

W_conv1 = tf.Variable(tf.random.truncated_normal([5, 5, 1, 32], stddev=0.1, seed=0))
b_conv1 = tf.Variable(tf.constant(0.1, shape=[32]))  # need 32 biases for 32 outputs


# 1st convolutional layer
def conv_layer_1(x):
    # 1st convolution
    x = tf.nn.conv2d(x, W_conv1, strides=[1, 1, 1, 1], padding='SAME') + b_conv1
    # relu
    x = tf.nn.relu(x)
    # max_pool
    x = tf.nn.max_pool(
        x,
        ksize=[1, 2, 2, 1],
        strides=[1, 2, 2, 1],
        padding="SAME"
    )
    return x


W_conv2 = tf.Variable(tf.random.truncated_normal([5, 5, 32, 64], stddev=0.1, seed=1))
b_conv2 = tf.Variable(tf.constant(0.1, shape=[64]))  # need 64 biases for 64 outputs


# 2nd convolutional layer
def conv_layer_2(x):
    # 1st convolution
    x = tf.nn.conv2d(x, W_conv2, strides=[1, 1, 1, 1], padding='SAME') + b_conv2
    # relu
    x = tf.nn.relu(x)
    # max_pool
    x = tf.nn.max_pool(
        x,
        ksize=[1, 2, 2, 1],
        strides=[1, 2, 2, 1],
        padding="SAME"
    )
    return x


# flatten for outputs
def flatten_conv_volume(x):
    x = tf.reshape(x, [-1, 7 * 7 * 64])
    return x

### Fully Connected Layer Definitions

In [6]:
# fc weights and biases
W_fc1 = tf.Variable(tf.random.truncated_normal([7 * 7 * 64, 1024], stddev=0.1, seed=2))
b_fc1 = tf.Variable(tf.constant(0.1, shape=[1024]))  # need 1024 biases for 1024 outputs


def fc_1(x):
    x = tf.matmul(x, W_fc1) + b_fc1
    x = tf.nn.relu(x)
    # add some dropout to avoid overfitting
    # set a rate of 0.3 (this was chosen somewhat arbitrarily)
    x = tf.nn.dropout(x, 0.3)
    return x


W_fc2 = tf.Variable(tf.random.truncated_normal([1024, 10], stddev=0.1, seed=2))  # 1024 neurons
b_fc2 = tf.Variable(tf.constant(0.1, shape=[10]))  # 10 possibilities for digits [0,1,2,3,4,5,6,7,8,9]


def fc_2(x):
    x = tf.matmul(x, W_fc2) + b_fc2
    x = tf.nn.relu(x)
    return x

### Full Network with Softmax Output

In [7]:
def y_hat_cnn(x):
    x = conv_layer_1(x)
    x = conv_layer_2(x)
    x = flatten_conv_volume(x)
    x = fc_1(x)
    x = fc_2(x)
    y_hat = tf.nn.softmax(x)
    return y_hat

### Loss Function and Optimizer

In [8]:
def cross_entropy(y_hat, y_pred):
    return -tf.reduce_sum(y_hat * tf.math.log(y_pred + 1.e-10))


optimizer = tf.keras.optimizers.Adam(1e-4)

In [9]:
# variables that gradient tape has to watch
variables = [W_conv1, b_conv1, W_conv2, b_conv2, W_fc1, b_fc1, W_fc2, b_fc2]


# single training step (possibly over a batch)
def train_step(x, y):
    with tf.GradientTape() as tape:
        y_hat = y_hat_cnn(x)
        current_loss = cross_entropy(y, y_hat)
        grads = tape.gradient(current_loss, variables)
        optimizer.apply_gradients(zip(grads, variables))
        return current_loss.numpy()

In [10]:
# train the model
loss_values = []
accuracies = []
epochs = 1

for i in range(epochs):
    j = 0
    # each batch has 50 examples
    print(f"############\nStarting epoch: {i + 1}\n############")
    for x_train_batch, y_train_batch in train_ds:
        j += 1
        current_loss = train_step(x_train_batch, y_train_batch)
        if j % 50 == 0:  #reporting intermittent batch statistics
            correct_prediction = tf.equal(
                tf.argmax(y_hat_cnn(x_train_batch), axis=1),
                tf.argmax(y_train_batch, axis=1)
            )
            #  accuracy
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)).numpy()
            print(f"Batch: {j}, loss: {current_loss}, accuracy: {accuracy}")


    current_loss = cross_entropy(y_train[:10000], y_hat_cnn(x_image_train[:10000, :, :, :])).numpy()
    loss_values.append(current_loss)
    print()
    correct_prediction = tf.equal(
        tf.argmax(y_hat_cnn(x_image_train[:10000, :, :, :]), axis=1),
        tf.argmax(y_train[:10000], axis=1)
    )
    #  accuracy
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)).numpy()
    accuracies.append(accuracy)
    print(f"############\nEnd of epoch: {i + 1}\n\tloss: {current_loss}\n\taccuracy: {accuracy}\n############")

############
Starting epoch: 1
############
Batch: 50, loss: 107.37092590332031, accuracy: 0.07999999821186066
Batch: 100, loss: 92.30964660644531, accuracy: 0.2800000011920929
Batch: 150, loss: 97.21812438964844, accuracy: 0.3400000035762787
Batch: 200, loss: 83.60059356689453, accuracy: 0.4000000059604645
Batch: 250, loss: 74.81949615478516, accuracy: 0.47999998927116394
Batch: 300, loss: 63.296043395996094, accuracy: 0.5400000214576721
Batch: 350, loss: 48.380313873291016, accuracy: 0.6399999856948853
Batch: 400, loss: 54.9945182800293, accuracy: 0.6600000262260437
Batch: 450, loss: 49.34773254394531, accuracy: 0.699999988079071
Batch: 500, loss: 32.57860565185547, accuracy: 0.8600000143051147
Batch: 550, loss: 12.160505294799805, accuracy: 0.9599999785423279
Batch: 600, loss: 17.0562744140625, accuracy: 0.8999999761581421
Batch: 650, loss: 17.437807083129883, accuracy: 0.8999999761581421
Batch: 700, loss: 6.042573928833008, accuracy: 0.9599999785423279
Batch: 750, loss: 24.06845092

In [11]:
# evaluate the model on validation set
j = 0
acc = []
# evaluate accuracy by batch and average...reporting every 100th batch
for x_train_batch, y_train_batch in test_ds:
    j += 1
    correct_prediction = tf.equal(
        tf.argmax(y_hat_cnn(x_train_batch), axis=1),
        tf.argmax(y_train_batch, axis=1)
    )
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)).numpy()
    acc.append(accuracy)
    if j % 100 == 0:
        print(f"\tBatch {j}, accuracy: {accuracy}")

print(f"\nAccuracy of entire set: {np.mean(acc)}")

	Batch 100, accuracy: 0.8999999761581421
	Batch 200, accuracy: 0.9599999785423279

Accuracy of entire set: 0.9519000053405762
