In [24]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.datasets import mnist
import tensorflow_datasets as tfds

## Data

In [3]:
(x_train, y_train), (x_test, y_test) = mnist.load_data()

def mould(x):
  return x.reshape(-1, 28, 28, 1).astype("float32")/255.0

x_train, x_test = mould(x_train), mould(x_test)

##  Model

In [22]:
model = keras.Sequential(
    [
     layers.Input(shape = (28,28,1)),
     layers.Conv2D(64, 3, padding='same', activation = 'relu'),
     layers.Conv2D(128, 3, padding='same', activation = 'relu'),
     layers.Flatten(),
     layers.Dense(10, activation = 'softmax')
    ]
)

class CustomFit(keras.Model):
  def __init__(self, model):
    super().__init__()
    self.model = model
  
  def compile(self, optimizer, loss):
    super().compile()
    self.optimizer = optimizer
    self.loss = loss
    # self.metrics = metrics # CANT DO THIS

  def train_step(self, data):
    x, y = data

    # Forward
    with tf.GradientTape() as tape:
      # Record all operations for backwards
      y_pred = self.model(x, training = True)
      loss = self.loss(y, y_pred)
    
    # Get the gradients
    training_vars = self.trainable_variables
    gradients = tape.gradient(loss, training_vars)

    # Step
    self.optimizer.apply_gradients(zip(gradients, training_vars))
    
    # Metrics
    acc_metric.update_state(y, y_pred)
    # self.compiled_metrics.update_state(y, y_pred)

    return {"loss": loss, "accuracy":acc_metric.result()}
  
  def test_step(self, data):
    x, y = data
    
    y_pred = self.model(x, training=False)

    loss = self.loss(y,y_pred)
    acc_metric.update_state(y, y_pred)

    return {"loss":loss, "accuracy":acc_metric.result()}


acc_metric = keras.metrics.SparseCategoricalAccuracy(name = "accuracy")
model = CustomFit(model)
model.compile(
    loss = keras.losses.SparseCategoricalCrossentropy(from_logits = False),
    optimizer = keras.optimizers.Adam(4e-3)
)
model.fit(x_train, y_train, batch_size = 32, epochs = 2)

Epoch 1/2
Epoch 2/2


<keras.callbacks.History at 0x7f930a442f50>

In [23]:
model.evaluate(x_test, y_test, batch_size = 32)



[0.9755692481994629, 0.0005250590620562434]

## Everything on our own

In [25]:
(ds_train, ds_test), ds_info = tfds.load(
  "mnist",
  split = ["train", "test"],
  shuffle_files = True,
  as_supervised = True,
  with_info = True    
)

def normalize_image(img, label):
  return tf.cast(img, tf.float32)/255.0, label

AUTOTUNE = tf.data.experimental.AUTOTUNE
BATCH_SIZE = 128

ds_train = ds_train.map(normalize_image, num_parallel_calls = AUTOTUNE).cache().shuffle(ds_info.splits["train"].num_examples).batch(BATCH_SIZE).prefetch(AUTOTUNE)
ds_test = ds_test.map(normalize_image, num_parallel_calls = AUTOTUNE).batch(BATCH_SIZE).prefetch(AUTOTUNE)

[1mDownloading and preparing dataset mnist/3.0.1 (download: 11.06 MiB, generated: 21.00 MiB, total: 32.06 MiB) to /root/tensorflow_datasets/mnist/3.0.1...[0m


local data directory. If you'd instead prefer to read directly from our public
GCS bucket (recommended if you're running on GCP), you can instead pass
`try_gcs=True` to `tfds.load` or set `data_dir=gs://tfds-data/datasets`.



Dl Completed...:   0%|          | 0/4 [00:00<?, ? file/s]


[1mDataset mnist downloaded and prepared to /root/tensorflow_datasets/mnist/3.0.1. Subsequent calls will reuse this data.[0m


In [27]:
model = keras.Sequential([
                          keras.Input(shape=(28,28,1)),
                          layers.Conv2D(32,3,activation='relu'),
                          layers.Flatten(),
                          layers.Dense(10, activation = 'softmax')
])

In [30]:
epochs = 5
optimizer = keras.optimizers.Adam()
loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=False)

acc_metric = keras.metrics.SparseCategoricalAccuracy()
val_acc = keras.metrics.SparseCategoricalAccurcay()

ds_val = None

In [32]:
# Training Loop:
max_val_acc = 0
for epoch in range(epochs):
  print(f"\nStarting epoch {epoch+1} / {epochs}")
  for batch_idx, (x_b, y_b) in enumerate(ds_train):

    # Forward with backward calc
    with tf.GradientTape() as tape:
      y_pred = model(x_b, training = True)
      loss = loss_fn(y_b, y_pred)
    
    # Gradient calc
    gradients = tape.gradient(loss, model.trainable_weights)
    
    # optimizer.step()
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))
    
    # Calc metric
    acc_metric.update_state(y_b, y_pred)
  
  train_acc = acc_metric.result()
  print(f"Acc: {train_acc}")
  acc_metric.reset_states()

  for (x,y) in enumerate(ds_val):

    y_pred = model(x)
    val_acc.update_state(y_pred, y)
  
  val_accuracy = val_acc.result()
  print(f"Validation Acc: {val_accuracy}")
  if val_accuracy > max_val_acc:
    max_val_acc = val_accuracy
    model.save_weights("path")
  val_acc.reset_states()
  

# Testing

for batch_idx, (x_b, y_b) in enumerate(ds_test):

  y_pred = model(x_b, training = False)
  acc_metric.update_state(y_b, y_pred)

test_accuracy = acc_metric.result()
print(f"Acc: {test_accuracy}")
acc_metric.reset_states()


Starting epoch 1 / 5
Acc: 0.9249833226203918

Starting epoch 2 / 5
Acc: 0.9740833044052124

Starting epoch 3 / 5
Acc: 0.9818000197410583

Starting epoch 4 / 5
Acc: 0.9857500195503235

Starting epoch 5 / 5
Acc: 0.9881500005722046
Acc: 0.9815000295639038
