In [6]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import gzip
import numpy as np
import tensorflow as tf
from typing import Tuple
import time

Up until now, we've been running our code in "[eager execution](https://www.tensorflow.org/guide/eager)" mode, which is enabled by default. In this mode, the flow of code execution happens in the order we're accustomed to, and we can add breakpoints and inspect the values of our tensors and variables as usual.

In contrast, when in "[graph execution](https://www.tensorflow.org/guide/intro_to_graphs)" mode, the code execution flows a bit differently: during the first pass through the code, a graph is created containing information about the operations and tensors in that code. Then in subsequent passes, the graph is used instead of the Python code. One consequence of this flow is that our code is not debuggable in the usual manner. We gain two major advantages though:
- The graph can be deployed to environments that don't have Python, such as embedded devices. 
- The graph can take advantage of several performance optimizations, such as running parts of the code in parallel.

In order to get the best of both worlds, we use eager execution mode during the development phase, and then switch to graph execution mode once we're done debugging the model. To switch from eager to graph execution, we can add the `@tf.function` decorator to the function containing our model operations.

Let's look at the training code again, but this time with the `@tf.function` decorator applied to the `fit_one_batch` function, which is where we have all the model operations.

In [7]:
labels_map = {
    0: 'T-Shirt',
    1: 'Trouser',
    2: 'Pullover',
    3: 'Dress',
    4: 'Coat',
    5: 'Sandal',
    6: 'Shirt',
    7: 'Sneaker',
    8: 'Bag',
    9: 'Ankle Boot',
  }


def read_images(path: str, image_size: int, num_items: int) -> np.ndarray:
  f = gzip.open(path,'r')
  buf = f.read(image_size * image_size * num_items)
  data = np.frombuffer(buf, dtype=np.uint8).astype(np.float32)
  data = data.reshape(num_items, image_size, image_size)
  return data


def read_labels(path: str, num_items: int) -> np.ndarray:
  f = gzip.open(path,'r')
  f.read(8)
  buf = f.read(num_items)
  data = np.frombuffer(buf, dtype=np.uint8).astype(np.int64)
  data = data.reshape(num_items)
  return data


def get_data(batch_size: int) -> Tuple[tf.data.Dataset, tf.data.Dataset]:
  image_size = 28
  num_train = 60000
  num_test = 10000

  training_images = read_images('data/FashionMNIST/raw/train-images-idx3-ubyte.gz', image_size, num_train)
  test_images = read_images('data/FashionMNIST/raw/t10k-images-idx3-ubyte.gz', image_size, num_test)
  training_labels = read_labels('data/FashionMNIST/raw/train-labels-idx1-ubyte.gz', num_train)
  test_labels = read_labels('data/FashionMNIST/raw/t10k-labels-idx1-ubyte.gz', num_test)

  # (training_images, training_labels), (test_images, test_labels) = tf.keras.datasets.fashion_mnist.load_data()

  train_dataset = tf.data.Dataset.from_tensor_slices((training_images, training_labels))
  test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels))

  train_dataset = train_dataset.map(lambda image, label: (float(image) / 255.0, label))
  test_dataset = test_dataset.map(lambda image, label: (float(image) / 255.0, label))

  train_dataset = train_dataset.batch(batch_size).shuffle(500)
  test_dataset = test_dataset.batch(batch_size).shuffle(500)

  return (train_dataset, test_dataset)


def get_model() -> tf.keras.Model:
  model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(512, activation='relu'),
    tf.keras.layers.Dense(10)
  ])
  return model


@tf.function
def fit_one_batch(X: tf.Tensor, y: tf.Tensor, model: tf.keras.Model, loss_fn: tf.keras.losses.Loss, 
optimizer: tf.keras.optimizers.Optimizer) -> Tuple[tf.Tensor, tf.Tensor]:
  with tf.GradientTape() as tape:
    y_prime = model(X, training=True)
    loss = loss_fn(y, y_prime)

  grads = tape.gradient(loss, model.trainable_variables)
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  return (y_prime, loss)


def fit(dataset: tf.data.Dataset, model: tf.keras.Model, loss_fn: tf.keras.losses.Loss, 
optimizer: tf.optimizers.Optimizer) -> None:
  batch_count = len(dataset)
  loss_sum = 0
  correct_item_count = 0
  current_item_count = 0
  print_every = 100

  for batch_index, (X, y) in enumerate(dataset):
    (y_prime, loss) = fit_one_batch(X, y, model, loss_fn, optimizer)

    y = tf.cast(y, tf.int64)
    correct_item_count += (tf.math.argmax(y_prime, axis=1) == y).numpy().sum()

    batch_loss = loss.numpy()
    loss_sum += batch_loss
    current_item_count += len(X)

    if ((batch_index + 1) % print_every == 0) or ((batch_index + 1) == batch_count):
      batch_accuracy = correct_item_count / current_item_count * 100
      print(f'[Batch {batch_index + 1:>3d} - {current_item_count:>5d} items] accuracy: {batch_accuracy:>0.1f}%, loss: {batch_loss:>7f}')


learning_rate = 0.1
batch_size = 64
epochs = 2

(train_dataset, test_dataset) = get_data(batch_size)

model = get_model()

loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
optimizer = tf.optimizers.SGD(learning_rate)

print('\nFitting:')
t_begin = time.time()
for epoch in range(epochs):
  print(f'\nEpoch {epoch + 1}\n-------------------------------')
  fit(train_dataset, model, loss_fn, optimizer)
t_elapsed = time.time() - t_begin
print(f'\nTime per epoch: {t_elapsed / epochs :>.3f} sec' )


Fitting:

Epoch 1
-------------------------------
[Batch 100 -  6400 items] accuracy: 67.2%, loss: 0.806799
[Batch 200 - 12800 items] accuracy: 72.3%, loss: 0.624786
[Batch 300 - 19200 items] accuracy: 74.7%, loss: 0.638929
[Batch 400 - 25600 items] accuracy: 76.1%, loss: 0.397606
[Batch 500 - 32000 items] accuracy: 77.2%, loss: 0.702730
[Batch 600 - 38368 items] accuracy: 78.0%, loss: 0.773377
[Batch 700 - 44768 items] accuracy: 78.9%, loss: 0.342358
[Batch 800 - 51168 items] accuracy: 79.4%, loss: 0.341566
[Batch 900 - 57568 items] accuracy: 79.9%, loss: 0.350611
[Batch 938 - 60000 items] accuracy: 80.1%, loss: 0.431731

Epoch 2
-------------------------------
[Batch 100 -  6400 items] accuracy: 85.6%, loss: 0.529302
[Batch 200 - 12800 items] accuracy: 85.1%, loss: 0.458260
[Batch 300 - 19200 items] accuracy: 84.9%, loss: 0.528943
[Batch 400 - 25600 items] accuracy: 85.1%, loss: 0.699493
[Batch 500 - 32000 items] accuracy: 85.2%, loss: 0.638139
[Batch 600 - 38400 items] accuracy: 85

Notice that we also add a timer, and print the time it takes to train. You can comment and uncomment the `@tf.function` decorator, and notice the difference between the elapsed times. On my machine, eager execution takes more than twice the amount of time to train, compared to graph execution.

Now that we've trained our model, we're ready to test it, which we can do by running a single pass forward through the network. The function `evaluate_one_batch` contains the code that does this: we simply need to call the `model` to get a prediction, followed by the loss function `loss_fn` to get a score for how the predicted labels `y_prime` compare to the actual labels `y`. Notice that we don't add a `tf.GradientTape()` this time &mdash; that's because, since we don't do a backward pass during testing, we don't need to calculate derivatives for gradient descent. Notice also that we added a `@tf.function` decorator once we were done with development and debugging, to get a performance boost.  

In [8]:
@tf.function
def evaluate_one_batch(X: tf.Tensor, y: tf.Tensor, model: tf.keras.Model, 
loss_fn: tf.keras.losses.Loss) -> Tuple[tf.Tensor, tf.Tensor]:
  y_prime = model(X, training=False)
  loss = loss_fn(y, y_prime)

  return (y_prime, loss)

The `evaluate` function calls the `evaluate_one_batch` function for the entire dataset, once per mini-batch. The important code in the function below is just the `for` loop and the call to `evaluate_one_batch` within it &mdash; the rest is just boilerplate code to print progress during execution.

In [9]:
def evaluate(dataset: tf.data.Dataset, model: tf.keras.Model, 
loss_fn: tf.keras.losses.Loss) -> Tuple[float, float]:
  batch_count = len(dataset)
  loss_sum = 0
  correct_item_count = 0
  current_item_count = 0

  for (X, y) in dataset:
    (y_prime, loss) = evaluate_one_batch(X, y, model, loss_fn)

    correct_item_count += (tf.math.argmax(y_prime, axis=1).numpy() == y.numpy()).sum()
    loss_sum += loss.numpy()
    current_item_count += len(X)

  average_loss = loss_sum / batch_count
  accuracy = correct_item_count / current_item_count
  return (average_loss, accuracy)

And finally, we print the test loss and accuracy, and save the learned model parameters.

In [10]:
print('\nEvaluating:')
(test_loss, test_accuracy) = evaluate(test_dataset, model, loss_fn)
print(f'Test accuracy: {test_accuracy * 100:>0.1f}%, test loss: {test_loss:>8f}')

model.save_weights('outputs/weights')


Evaluating:
Test accuracy: 84.9%, test loss: 0.419172


Hopefully the test loss and accuracy you obtained are similar to the training loss and accuracy you obtained earlier. In this case they should be, but if that's not the case in your future projects, you may need to adjust your data or model. 