Reference: https://www.tensorflow.org/tutorials/quickstart/advanced

In [None]:
import tensorflow as tf
import numpy as np
ks = tf.keras
print("TensorFlow version:", tf.__version__)

# Load the data

In [None]:
# Download the MNIST dataset as NumPy arrays.

(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# Add a trailing channel axis and rescale the pixel values to [0, 1].
# Cast to float32 *before* dividing: dividing the raw integer images by a
# Python int would silently produce float64 arrays, doubling memory use and
# not matching the float32 dtype Keras layers compute in by default.
x_train = x_train[..., None].astype(np.float32) / 255
x_test = x_test[..., None].astype(np.float32) / 255

# Use an explicit 64-bit integer type for the labels. np.int_ has a
# platform-dependent width (it can be 32-bit on some platforms / NumPy
# versions), whereas tf.math.argmax returns int64 by default; keeping the
# labels int64 avoids dtype mismatches when comparing predictions to labels.
y_train = y_train.astype(np.int64)
y_test = y_test.astype(np.int64)

In [None]:
# Dataset size and element type: print shape and dtype of every array,
# with a blank line separating the train split from the test split.

splits = [(x_train, y_train), (x_test, y_test)]
for position, split in enumerate(splits):
  if position > 0:
    print()
  for array in split:
    print(array.shape, ', ', array.dtype)

In [None]:
batch_size = 32

# Training pipeline: one element per example, shuffled through a
# 10000-element buffer, then grouped into mini-batches of `batch_size`.
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
train_ds = train_ds.shuffle(10000)
train_ds = train_ds.batch(batch_size)

# Evaluation pipeline: original order, batches of 100.
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))
test_ds = test_ds.batch(100)

# Define the model

In [None]:
# TODO: implement Dense and Conv2D layers myself

In [None]:
class FFNN(ks.Model):
  """Feed-forward classifier: flattens its input and applies three Dense layers.

  Args:
    output_size: Number of units of the final Dense layer. That layer has no
      activation, so the model outputs logits.
  """

  def __init__(self, output_size):
    super(FFNN, self).__init__()
    # NOTE: this attribute must not be called `layers`. `Model.layers` is a
    # reserved, read-only property, so `self.layers = [...]` raises
    # AttributeError at construction time.
    self.ff_layers = [
        ks.layers.Dense(200, activation='relu'),
        ks.layers.Dense(100, activation='relu'),
        ks.layers.Dense(output_size)]

  def call(self, x):
    """Maps a batch of inputs to a (batch, output_size) tensor of logits."""
    # tf.Tensor has no `.reshape()` method; use tf.reshape. Everything except
    # the leading batch axis is collapsed into a single feature axis.
    x = tf.reshape(x, (tf.shape(x)[0], -1))
    for layer in self.ff_layers:
      x = layer(x)
    return x

In [None]:
# Reference: https://www.tensorflow.org/tutorials/images/cnn

class CNN(ks.Model):
  """Convolutional classifier: two conv + max-pool stages, then two Dense layers.

  Args:
    output_size: Number of units of the final Dense layer. That layer has no
      activation, so the model outputs logits.
  """

  def __init__(self, output_size):
    super(CNN, self).__init__()
    conv_channels = (32, 64)
    self.cnn_layers = [
        ks.layers.Conv2D(channels, kernel_size=5, activation='relu')
        for channels in conv_channels]
    hidden = ks.layers.Dense(200, activation='relu')
    head = ks.layers.Dense(output_size)
    self.ff_layers = [hidden, head]

  def call(self, x):
    """Runs the conv stages, flattens, and applies the dense layers."""
    # Each stage is a convolution followed by a 2x2 max pool. The pool uses
    # stride 1 with VALID padding, so it trims each spatial dimension by one
    # rather than halving it.
    for conv in self.cnn_layers:
      feature_map = conv(x)
      x = tf.nn.max_pool2d(feature_map, ksize=2, strides=1, padding='VALID')
    # Collapse every non-batch axis into a single feature axis.
    batch = tf.shape(x)[0]
    x = tf.reshape(x, (batch, -1))
    for dense in self.ff_layers:
      x = dense(x)
    return x

In [None]:
# TODO: also demonstrate Sequential

# Training loop

In [None]:
# TODO: implement my own optimizer

In [None]:
learning_rate = 1e-3
# Note: batch_size is set in the cell that builds train_ds. Re-assigning it
# here would not change the batching of the already-built dataset, so it is
# intentionally not repeated.

# model = FFNN(10)
model = CNN(10)

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True,  # the model outputs logits (unnormalized log-probabilities), not probabilities
)

# Pass the learning rate explicitly so that editing `learning_rate` above
# actually takes effect (1e-3 is also Adam's default, so behavior is unchanged
# for the current value).
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Use GPU if available.
# https://www.tensorflow.org/guide/gpu
GPUs = tf.config.list_physical_devices('GPU')
device = '/GPU:0' if GPUs else '/CPU:0'
print('device =', device)

In [None]:
@tf.function
def train_step(images, labels):
  """Performs one optimizer update on a batch.

  Uses the notebook-level `model`, `loss_object` and `optimizer`.

  Returns:
    (loss, logits): the regularized loss (cross-entropy plus the L1 penalty)
    and the model outputs for this batch.
  """
  l1_weight = 1e-3
  with tf.GradientTape() as tape:
    # training=True only matters for layers that behave differently during
    # training and inference (e.g. Dropout).
    logits = model(images, training=True)
    # Fetch the variable list after the forward pass so that variables
    # created lazily on the model's first call are included.
    variables = model.trainable_variables
    # L1 regularization over every trainable variable.
    l1_penalty = sum([tf.norm(variable, 1) for variable in variables])
    loss = loss_object(labels, logits) + l1_weight * l1_penalty
  grads = tape.gradient(loss, variables)
  optimizer.apply_gradients(zip(grads, variables))

  return loss, logits

In [None]:
@tf.function
def accuracy(logits, target, normalize=True):
  """Counts how many rows of `logits` have their arg-max equal to `target`.

  Args:
    logits: 2-D tensor of per-class scores; the arg-max is taken over axis 1.
    target: 1-D tensor of integer class ids (any integer dtype).
    normalize: If True, return the fraction of correct predictions (float64);
      otherwise return the raw count (int64).

  Returns:
    A scalar tensor: fraction correct if `normalize`, else number correct.
  """
  predictions = tf.math.argmax(logits, axis=1)
  # argmax returns int64 by default, and tf.math.equal requires both operands
  # to share a dtype. Cast the labels so int32 (or other integer) labels work.
  target = tf.cast(target, predictions.dtype)
  corrects = tf.math.equal(predictions, target)
  count = tf.math.count_nonzero(corrects)
  if normalize:
    # Use the dynamic batch size: the static `logits.shape[0]` is None when
    # the batch dimension is unknown at trace time. int64 / int64 yields
    # float64, the same result dtype as dividing by a Python int.
    batch = tf.shape(logits, out_type=count.dtype)[0]
    count = count / batch
  return count

In [None]:
# Start tensorboard (optional)
# This will embed a tensorboard front-end in the output of this cell, which will display training graphs in realtime.
# See https://colab.research.google.com/github/tensorflow/tensorboard/blob/master/docs/tensorboard_in_notebooks.ipynb
%load_ext tensorboard
%tensorboard --logdir logs

In [None]:
# Summary writer for TensorBoard; events are written under 'logs', the same
# directory passed to `%tensorboard --logdir`.
tb_writer = tf.summary.create_file_writer('logs')  # TensorBoard writer
# Running count of training steps across all epochs; used as the x-axis
# (`step=`) of every logged scalar.
global_step = 0

In [None]:
num_epochs = 100

for epoch in range(num_epochs):
  # ---- Training: one pass over train_ds ----
  for step, (images, labels) in enumerate(train_ds):
    # Run the update under the configured device scope.
    with tf.device(device):
      loss_, logits_ = train_step(images, labels)

    global_step += 1

    # Only report on every 100th step of the epoch.
    if step % 100 != 0:
      continue

    loss_ = loss_.numpy()
    acc_ = accuracy(logits_, labels).numpy()
    print('  Step: %d | Train Loss: %.4f | Train Accuracy: %.2f' % (step, loss_, acc_))
    with tb_writer.as_default():
      tf.summary.scalar('train_loss', loss_, step=global_step)
      tf.summary.scalar('train_accuracy', acc_, step=global_step)

  # ---- Evaluation: mean of the per-batch loss / accuracy over test_ds ----
  with tf.device(device):
    loss_sum = 0.0
    acc_sum = 0.0
    for batch_idx, (x, y) in enumerate(test_ds):
      batch_logits_ = model(x)
      loss_sum += loss_object(y, batch_logits_)
      acc_sum += accuracy(batch_logits_, y).numpy()
    num_batches = batch_idx + 1
    loss_ = loss_sum / num_batches
    acc_ = acc_sum / num_batches

  # Save one checkpoint per epoch.
  model.save(f'./training_checkpoints/ckpt_{epoch}')

  print('')
  print('Epoch: %d | Test Loss: %.4f | Test Accuracy: %.2f' % (epoch, loss_, acc_))
  print('')
  with tb_writer.as_default():
    tf.summary.scalar('test_loss', loss_, step=global_step)
    tf.summary.scalar('test_accuracy', acc_, step=global_step)

In [None]:
# Manually save the model.
# `epoch` still holds the value from the most recent training-loop iteration,
# so this writes to the same path pattern the loop uses (e.g. after the loop
# was interrupted before its own save ran).
model.save(f'./training_checkpoints/ckpt_{epoch}')

# Load checkpoint

Reference: https://www.tensorflow.org/guide/keras/save_and_serialize

In [None]:
%ls training_checkpoints

In [None]:
# Restore the model saved for epoch 0 (path written by the training loop).
model_copy = ks.models.load_model('./training_checkpoints/ckpt_0')

In [None]:
# Compile with default arguments (no optimizer/loss/metrics are given).
# NOTE(review): presumably only needed to silence a "not compiled" warning on
# the loaded model; the evaluation below calls the model directly -- confirm.
model_copy.compile()

In [None]:
# Evaluate the restored model: mean of the per-batch loss / accuracy on test_ds.
with tf.device(device):
  loss_sum = 0.0
  acc_sum = 0.0
  for batch_idx, (x, y) in enumerate(test_ds):
    batch_logits_ = model_copy(x)
    loss_sum += loss_object(y, batch_logits_)
    acc_sum += accuracy(batch_logits_, y).numpy()
  num_batches = batch_idx + 1
  loss_ = loss_sum / num_batches
  acc_ = acc_sum / num_batches

print('Test Loss: %.4f | Test Accuracy: %.2f' % (loss_, acc_))