In [58]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from tensorflow.keras.datasets import mnist

import numpy as np
import matplotlib.pyplot as plt

# Display the physical devices TensorFlow reports (the cell output lists them).
tf.config.list_physical_devices()

[PhysicalDevice(name='/physical_device:CPU:0', device_type='CPU'),
 PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]

In [59]:
def get_minst_model():
    """Build the dense classifier used for the MNIST experiments below.

    The network takes a flattened 28*28 input, applies a 512-unit ReLU
    layer, dropout at rate 0.5, and a 10-unit softmax output layer.

    Returns:
        An uncompiled `keras.Model` connecting the input to the softmax output.
    """
    pixels = keras.Input(shape=(28*28,))
    hidden = layers.Dense(512, activation="relu")(pixels)
    hidden = layers.Dropout(0.5)(hidden)
    class_probabilities = layers.Dense(10, activation="softmax")(hidden)
    return keras.Model(pixels, class_probabilities)

In [60]:
# Load MNIST, flatten every image to a 28*28 vector, and rescale by 1/255.
# The number of rows is taken from the arrays themselves instead of being
# hard-coded, so the cell does not silently depend on the exact dataset size.
(images, labels), (test_images, test_labels) = mnist.load_data()
images = images.reshape((len(images), 28*28)).astype("float32") / 255
test_images = test_images.reshape((len(test_images), 28*28)).astype("float32") / 255

# Hold out the first NUM_VAL_SAMPLES training examples for validation;
# everything after them is used for training.
NUM_VAL_SAMPLES = 10000
train_images, val_images = images[NUM_VAL_SAMPLES:], images[:NUM_VAL_SAMPLES]
train_labels, val_labels = labels[NUM_VAL_SAMPLES:], labels[:NUM_VAL_SAMPLES]

In [61]:
# Early sketch of a manual training step, kept for reference only; the
# working version is defined in a later cell of this notebook.
# optimizer = keras.optimizers.RMSprop()
# loss_fn = keras.losses.sparse_categorical_crossentropy

# def train_step(inputs, targets):
#     with tf.GradientTape() as tape:
#         predictions = model(inputs, training=True)
#         loss = loss_fn(targets, predictions)
#     gradients = tape.gradient(loss, model.trainable_weights)
#     optimizer.apply_gradients(zip(gradients, model.trainable_weights))

**Low-level usage of metrics**

In [62]:
# Three samples whose highest-scoring class matches the integer label.
targets = [0, 1, 2]
predictions = [
    [1, 0, 0],
    [0, 1, 0],
    [0, 0, 1],
]

# Feed the batch to the metric with update_state(), then read the
# accumulated value back with result().
metric = keras.metrics.SparseCategoricalAccuracy()
metric.update_state(targets, predictions)
current_result = metric.result()
print(f"result: {current_result:.2f}")

result: 1.00


In [63]:
# Track the running mean of a handful of scalars, one update per value.
mean_tracker = keras.metrics.Mean()
values = list(range(5))
for value in values:
    mean_tracker.update_state(value)

print(f"Mean of values = {mean_tracker.result():.2f}")

Mean of values = 2.00


**Complete training & evaluation loop**

In [64]:
# Objects shared by the hand-written train/test steps defined below.

# Loss function and a running mean used to report it.
loss_fn = keras.losses.SparseCategoricalCrossentropy()
loss_tracking_metric = keras.metrics.Mean()

# Metrics updated on every batch.
metrics = [keras.metrics.SparseCategoricalAccuracy()]

# Model and the optimizer that updates its weights.
optimizer = keras.optimizers.RMSprop()
model = get_minst_model()

In [65]:
def train_step(inputs, targets):
    """Run one optimization step on a batch and report the running metrics.

    Uses the module-level `model`, `loss_fn`, `optimizer`, `metrics` and
    `loss_tracking_metric`.

    Args:
        inputs: Batch of model inputs.
        targets: Labels for the batch, passed to the loss and the metrics.

    Returns:
        A dict mapping each metric's name to its current result, plus a
        "loss" entry holding the loss tracker's current result.
    """
    # Forward pass in training mode, recorded so gradients can be taken.
    with tf.GradientTape() as tape:
        batch_predictions = model(inputs, training=True)
        batch_loss = loss_fn(targets, batch_predictions)

    grads = tape.gradient(batch_loss, model.trainable_weights)
    optimizer.apply_gradients(zip(grads, model.trainable_weights))

    # Fold this batch into every tracker before reading any result.
    for tracked in metrics:
        tracked.update_state(targets, batch_predictions)
    loss_tracking_metric.update_state(batch_loss)

    logs = {tracked.name: tracked.result() for tracked in metrics}
    logs["loss"] = loss_tracking_metric.result()
    return logs

def reset_metrics():
    """Reset the state of every entry in `metrics` and of the loss tracker."""
    for tracker in [*metrics, loss_tracking_metric]:
        tracker.reset_state()

In [66]:
epochs = 3

# Batches of 32 (image, label) pairs drawn from the training split.
training_dataset = (
    tf.data.Dataset
    .from_tensor_slices((train_images, train_labels))
    .batch(32)
)

for epoch in range(epochs):
    # Each epoch reports its own statistics, so clear the trackers first.
    reset_metrics()
    for inputs_batch, target_batch in training_dataset:
        logs = train_step(inputs_batch, target_batch)

    # `logs` now holds the values returned by the last step of the epoch.
    print(f"Results at the end of epoch {epoch}:")
    for key, value in logs.items():
        print(f"{key}: {value:.4f}")


Results at the end of epoch 0:
sparse_categorical_accuracy: 0.9137
loss: 0.2906
Results at the end of epoch 1:
sparse_categorical_accuracy: 0.9543
loss: 0.1659
Results at the end of epoch 2:
sparse_categorical_accuracy: 0.9624
loss: 0.1416


In [67]:
def test_step(inputs, targets):
    """Evaluate one batch and report the running validation metrics.

    Uses the module-level `model`, `loss_fn`, `metrics` and
    `loss_tracking_metric`; no weights are updated.

    Args:
        inputs: Batch of model inputs.
        targets: Labels for the batch, passed to the loss and the metrics.

    Returns:
        A dict with one "val_"-prefixed entry per metric and a "val_loss"
        entry holding the loss tracker's current result.
    """
    # Forward pass in inference mode.
    batch_predictions = model(inputs, training=False)
    batch_loss = loss_fn(targets, batch_predictions)

    # Fold this batch into every tracker before reading any result.
    for tracked in metrics:
        tracked.update_state(targets, batch_predictions)
    loss_tracking_metric.update_state(batch_loss)

    logs = {"val_" + tracked.name: tracked.result() for tracked in metrics}
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

In [68]:
# Batches of 32 (image, label) pairs drawn from the validation split.
val_dataset = (
    tf.data.Dataset
    .from_tensor_slices((val_images, val_labels))
    .batch(32)
)

# The trackers are shared with training, so clear them before evaluating.
reset_metrics()
for inputs_batch, target_batch in val_dataset:
    logs = test_step(inputs_batch, target_batch)

print("Evaluation results:")
for key, value in logs.items():
    print(f"{key}: {value:.4f}")

Evaluation results:
val_sparse_categorical_accuracy: 0.9686
val_loss: 0.1349


**Making custom train & eval loops fast with tf.function**

In [69]:
@tf.function
def test_step(inputs, targets):
    """Evaluate one batch and report the running validation metrics.

    Same logic as the earlier `test_step`, redefined here under the
    `tf.function` decorator. Uses the module-level `model`, `loss_fn`,
    `metrics` and `loss_tracking_metric`; no weights are updated.

    Args:
        inputs: Batch of model inputs.
        targets: Labels for the batch, passed to the loss and the metrics.

    Returns:
        A dict with one "val_"-prefixed entry per metric and a "val_loss"
        entry holding the loss tracker's current result.
    """
    # Forward pass in inference mode.
    batch_predictions = model(inputs, training=False)
    batch_loss = loss_fn(targets, batch_predictions)

    # Fold this batch into every tracker before reading any result.
    for tracked in metrics:
        tracked.update_state(targets, batch_predictions)
    loss_tracking_metric.update_state(batch_loss)

    logs = {"val_" + tracked.name: tracked.result() for tracked in metrics}
    logs["val_loss"] = loss_tracking_metric.result()
    return logs

# Batches of 32 (image, label) pairs drawn from the validation split.
val_dataset = (
    tf.data.Dataset
    .from_tensor_slices((val_images, val_labels))
    .batch(32)
)

# Clear the shared trackers so this run reports only its own statistics.
reset_metrics()
for inputs_batch, target_batch in val_dataset:
    logs = test_step(inputs_batch, target_batch)

print("Evaluation results:")
for key, value in logs.items():
    print(f"{key}: {value:.4f}")

Evaluation results:
val_sparse_categorical_accuracy: 0.9686
val_loss: 0.1349


**Middle ground: Somewhere in between fit() & a full custom loop**

In [70]:
# Module-level loss tracker (named "loss") and loss function used by the
# custom train_step defined in this cell.
loss_tracker = keras.metrics.Mean(name="loss")
loss_fn = keras.losses.SparseCategoricalCrossentropy()

# @tf.function
# No need for the tf.function decorator when overriding train_step(); it's done internally.
class CustomModel(keras.Model):
    """Keras model with a hand-written training step that tracks only the loss.

    The step relies on the module-level `loss_fn` and `loss_tracker` rather
    than on anything passed to `compile()` other than the optimizer.
    """

    def train_step(self, data):
        """Run one optimization step on a batch.

        Args:
            data: An `(inputs, targets)` pair.

        Returns:
            A dict with a single "loss" entry holding the loss tracker's
            current result.
        """
        batch_inputs, batch_targets = data

        # Forward pass in training mode, recorded so gradients can be taken.
        with tf.GradientTape() as tape:
            batch_predictions = self(batch_inputs, training=True)
            batch_loss = loss_fn(batch_targets, batch_predictions)

        weights = self.trainable_weights
        grads = tape.gradient(batch_loss, weights)
        self.optimizer.apply_gradients(zip(grads, weights))

        loss_tracker.update_state(batch_loss)
        return {"loss": loss_tracker.result()}

    @property
    def metrics(self):
        """Return a list containing only the module-level loss tracker."""
        return [loss_tracker]

In [71]:
# Same architecture as get_minst_model(), but wrapped in CustomModel so that
# fit() runs the overridden train_step().
inputs = keras.Input(shape=(28*28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

# The accuracy metric will not be seen while training: this train_step()
# override only reports the loss, so the metric would have to be handled
# inside train_step() itself.
model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Alternative compile call that passes only the optimizer:
# model.compile(optimizer=keras.optimizers.RMSprop())
model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7fbe39823910>

In [72]:
class CustomModel(keras.Model):
    """Keras model with a custom training step driven by `compile()` settings.

    The loss and metrics are the ones passed to `compile()`, accessed through
    `self.compiled_loss` and `self.compiled_metrics`, so the class does not
    depend on any module-level loss function.
    """

    # Overrides keras.Model.train_step.
    def train_step(self, data):
        """Run one optimization step on a batch.

        Args:
            data: An `(inputs, targets)` pair.

        Returns:
            A dict mapping the name of every entry in `self.metrics` to its
            current result.
        """
        inputs, targets = data
        with tf.GradientTape() as tape:
            predictions = self(inputs, training=True)
            # Use the loss configured in compile() rather than a global
            # loss function, so the compiled loss is actually honored.
            loss = self.compiled_loss(targets, predictions)
        gradients = tape.gradient(loss, self.trainable_weights)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_weights))
        self.compiled_metrics.update_state(targets, predictions)

        return {m.name: m.result() for m in self.metrics}

In [73]:
# Rebuild the network with the redefined CustomModel.
inputs = keras.Input(shape=(28*28,))
features = layers.Dense(512, activation="relu")(inputs)
features = layers.Dropout(0.5)(features)
outputs = layers.Dense(10, activation="softmax")(features)
model = CustomModel(inputs, outputs)

# This train_step() updates the compiled metrics, so the accuracy metric
# passed here is included in the values it returns.
model.compile(
    optimizer=keras.optimizers.RMSprop(),
    loss=keras.losses.SparseCategoricalCrossentropy(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

model.fit(train_images, train_labels, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x7fbe3904b0a0>

In [74]:
# NOTE(review): this import sits in the last cell; the other imports live in
# the first cell of the notebook.
from numba import cuda
# Select GPU 0 through Numba and then close it. Presumably this releases the
# GPU held by this kernel once the notebook is finished; confirm against
# the Numba CUDA documentation.
cuda.select_device(0)
cuda.close()