<a href="https://colab.research.google.com/github/VuHuyBui/convnext-mnist-with-flax/blob/main/convnext_mnist.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install flax -q


In [1]:
import jax 
from jax import numpy as jnp, random    # JAX NumPy

from flax import linen as nn           # The Linen API
from flax.training import train_state  # Useful dataclass to keep train state

import numpy as np                     # Ordinary NumPy
import optax                           # Optimizers
import tensorflow_datasets as tfds
from tqdm import tqdm

import functools
from typing import Sequence, Tuple, Any

## 2. Define our *ConvNext* with *Flax*

In [2]:
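class ConvNextBlock(nn.Module):
    """Depthwise conv -> LayerNorm -> 1x1 expand -> GELU -> 1x1 project, plus residual."""
    filters: int
    kernel_size: Tuple[int, int] = (7, 7)
    strides: Tuple[int, int] = (1, 1)
    act: Any = jnp.add  # combines the residual with the block output
    dtype: Any = jnp.float32

    @nn.compact
    def __call__(self, x):
        res = x
        # Depthwise convolution: one filter group per channel.
        x = nn.Conv(self.filters, self.kernel_size, strides=self.strides, feature_group_count=self.filters)(x)
        x = nn.LayerNorm(dtype=self.dtype)(x)
        # Pointwise (1x1) expansion to 4x the channel width.
        x = nn.Conv(self.filters * 4, kernel_size=(1, 1), strides=self.strides)(x)
        x = nn.gelu(x)
        # Pointwise (1x1) projection back to `filters` channels, as in the ConvNeXt block design.
        x = nn.Conv(self.filters, kernel_size=(1, 1), strides=(1, 1))(x)

        return self.act(res, x)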

class ConvNextDownsamplingBlock(nn.Module):
    filters: int
    kernel_size: Tuple[int, int] = (2, 2)
    strides: Tuple[int, int] = (2, 2)
    @nn.compact
    def __call__(self, x):
        x = nn.LayerNorm()(x)
        x = nn.Conv(self.filters, strides=self.strides, kernel_size=self.kernel_size)(x)
        return x

class ConvNext(nn.Module):
    stage_size: Sequence[int]
    channels: Sequence[int]
    dtype: Any = jnp.float32

    @nn.compact
    def __call__(self, x):
        x = nn.Conv(self.channels[0], kernel_size=(4, 4), strides=(4, 4))(x)
        x = nn.LayerNorm(dtype=self.dtype)(x)
        for i, channel in enumerate(self.channels):
            for _ in range(self.stage_size[i]):
                x = ConvNextBlock(channel)(x)
            # Downsample between stages, but not after the last one.
            if i < len(self.channels) - 1:
                x = ConvNextDownsamplingBlock(self.channels[i+1])(x)
        x = nn.LayerNorm(dtype=self.dtype)(x)
        return x


class ClassificationHead(nn.Module):
    num_class: int
    @nn.compact
    def __call__(self, x):
        x = x.reshape((x.shape[0], -1))  # flatten
        x = nn.Dense(features=256)(x)
        x = nn.relu(x)
        x = nn.Dense(features=self.num_class)(x)  # one logit per class
        return x

class ConvNextForClassification(nn.Module):
    num_class: int
    stage_size: Sequence[int] = (1, 1, 3, 1)
    channels: Sequence[int] = (96, 192, 384, 768)
    dtype: Any = jnp.float32
    @nn.compact
    def __call__(self, x):
        x = ConvNext(stage_size=self.stage_size, channels=self.channels, dtype=self.dtype)(x)
        x = ClassificationHead(num_class=self.num_class)(x)
        return x
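
To see how many features reach the head's flatten step on MNIST, the spatial sizes can be traced by hand. The sketch below assumes Flax's default `'SAME'` padding for `nn.Conv`, where the output size along each axis is `ceil(input / stride)`. The helper names `same_out` and `trace_spatial_sizes` are hypothetical and exist only for this illustration.

```python
import math

def same_out(size: int, stride: int) -> int:
    """Output length of a 'SAME'-padded convolution along one axis."""
    return math.ceil(size / stride)

def trace_spatial_sizes(input_size: int = 28, num_stages: int = 4) -> list:
    """Spatial size seen by each stage of the ConvNext defined above."""
    sizes = [same_out(input_size, 4)]          # stem: 4x4 kernel, stride 4
    for _ in range(num_stages - 1):            # one downsampling block between stages
        sizes.append(same_out(sizes[-1], 2))   # 2x2 kernel, stride 2
    return sizes

channels = (96, 192, 384, 768)
sizes = trace_spatial_sizes()
for size, ch in zip(sizes, channels):
    print(f"stage feature map: {size}x{size}x{ch}")

# Number of features reaching the classification head after flattening.
flat_features = sizes[-1] * sizes[-1] * channels[-1]
print(flat_features)
```

On 28x28 inputs the last stage operates on a single spatial position, so most of the spatial processing happens in the earlier stages.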


## 3. Define loss

We use `optax.softmax_cross_entropy()`. This function expects both `logits` and `labels` to have shape `[batch, num_classes]`. Because the labels are read from TFDS as integer class indices, we first convert them to a one-hot encoding.

Optax's loss function returns a vector of shape `[batch]`, so we take its mean to obtain a single scalar value ready for optimization.

In [3]:
def cross_entropy_loss(*, logits, labels):
  labels_onehot = jax.nn.one_hot(labels, num_classes=10)
  return optax.softmax_cross_entropy(logits=logits, labels=labels_onehot).mean()
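
The shapes involved can be checked on a toy batch. The sketch below is an illustration, not the notebook's loss: it uses a hand-written `softmax_cross_entropy` helper (hypothetical, written here to avoid depending on Optax) that follows the standard definition, the negative sum of `labels * log_softmax(logits)` over the class axis. With all-zero logits over 10 classes, every example should have loss `log(10)`.

```python
import jax
import jax.numpy as jnp

def softmax_cross_entropy(logits, labels_onehot):
    """Per-example cross-entropy: -sum(labels * log_softmax(logits)) over classes."""
    log_probs = jax.nn.log_softmax(logits, axis=-1)
    return -jnp.sum(labels_onehot * log_probs, axis=-1)

logits = jnp.zeros((2, 10))                 # uniform prediction for 2 examples
labels = jnp.array([3, 7])                  # integer class indices, as read from TFDS
labels_onehot = jax.nn.one_hot(labels, num_classes=10)

per_example = softmax_cross_entropy(logits, labels_onehot)  # shape [batch]
loss = per_example.mean()                                   # scalar for optimization
print(labels_onehot.shape, per_example.shape, float(loss))
```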

## 4. Metric computation

For loss and accuracy metrics, create a separate function:

In [4]:
def compute_metrics(*, logits, labels):
  loss = cross_entropy_loss(logits=logits, labels=labels)
  accuracy = jnp.mean(jnp.argmax(logits, -1) == labels)
  metrics = {
      'loss': loss,
      'accuracy': accuracy,
  }
  return metrics

## 5. Loading data

Define a function that loads and prepares the MNIST dataset and converts the
samples to floating-point numbers.

In [5]:
def get_datasets():
  """Load MNIST train and test datasets into memory."""
  ds_builder = tfds.builder('mnist')
  ds_builder.download_and_prepare()
  train_ds = tfds.as_numpy(ds_builder.as_dataset(split='train', batch_size=-1))
  test_ds = tfds.as_numpy(ds_builder.as_dataset(split='test', batch_size=-1))
  train_ds['image'] = jnp.float32(train_ds['image']) / 255.
  test_ds['image'] = jnp.float32(test_ds['image']) / 255.
  return train_ds, test_ds

## 6. Create train state

A common pattern in Flax is to create a single dataclass that represents the
entire training state, including step number, parameters, and optimizer state.

Adding the optimizer and model to this state also has the advantage that we only
need to pass around a single argument to functions like `train_step()` (see below).

Because this is such a common pattern, Flax provides the class
[flax.training.train_state.TrainState](https://flax.readthedocs.io/en/latest/flax.training.html#train-state)
that serves most basic use cases. Usually one would subclass it to add more data
to be tracked, but in this example we can use it without any modifications.

In [6]:
def create_train_state(rng, learning_rate, momentum):
  """Creates initial `TrainState`."""
  cnn = ConvNextForClassification(10)
  params = cnn.init(rng, jnp.ones([1, 28, 28, 1]))['params']
  tx = optax.sgd(learning_rate, momentum)
  return train_state.TrainState.create(
      apply_fn=cnn.apply, params=params, tx=tx)

## 7. Training step

A function that:

- Evaluates the neural network given the parameters and a batch of input images
  with the
  [Module.apply](https://flax.readthedocs.io/en/latest/flax.linen.html#flax.linen.Module.apply)
  method.
- Computes the loss with `cross_entropy_loss`.
- Evaluates the loss function and its gradient using
  [jax.value_and_grad](https://jax.readthedocs.io/en/latest/jax.html#jax.value_and_grad).
- Applies a
  [pytree](https://jax.readthedocs.io/en/latest/pytrees.html#pytrees-and-jax-functions)
  of gradients to the optimizer to update the model's parameters.
- Computes the metrics using `compute_metrics` (defined earlier).

Use JAX's [@jit](https://jax.readthedocs.io/en/latest/jax.html#jax.jit)
decorator to trace the entire `train_step` function and just-in-time compile
it with [XLA](https://www.tensorflow.org/xla) into fused device operations
that run faster and more efficiently on hardware accelerators.

In [7]:
@jax.jit
def train_step(state, batch):
  """Train for a single step."""
  def loss_fn(params):
    logits = ConvNextForClassification(10).apply({'params': params}, batch['image'])
    loss = cross_entropy_loss(logits=logits, labels=batch['label'])
    return loss, logits
  grad_fn = jax.value_and_grad(loss_fn, has_aux=True)
  (_, logits), grads = grad_fn(state.params)
  state = state.apply_gradients(grads=grads)
  metrics = compute_metrics(logits=logits, labels=batch['label'])
  return state, metrics
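
The `has_aux=True` pattern used in `train_step` can be seen in isolation on a toy linear model. In this sketch, `loss_fn`, `sgd_step`, and `LEARNING_RATE` are hypothetical names for illustration: the loss function returns `(loss, aux)`, and `jax.value_and_grad` then returns `((loss, aux), grads)`, differentiating only the first output.

```python
import jax
import jax.numpy as jnp

LEARNING_RATE = 0.1  # illustrative value

def loss_fn(w, x, y):
    pred = x @ w
    loss = jnp.mean((pred - y) ** 2)
    return loss, pred            # pred is the auxiliary output

@jax.jit
def sgd_step(w, x, y):
    """One plain gradient-descent step, traced and compiled by jit."""
    (loss, pred), grad = jax.value_and_grad(loss_fn, has_aux=True)(w, x, y)
    return w - LEARNING_RATE * grad, loss, pred

w = jnp.array([1.0, 2.0])
x = jnp.eye(2)
y = jnp.zeros(2)
new_w, loss, pred = sgd_step(w, x, y)
print(new_w, float(loss), pred)
```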

## 8. Evaluation step

Create a function that evaluates the model on the test set with
[Module.apply](https://flax.readthedocs.io/en/latest/flax.linen.html#flax.linen.Module.apply).

In [8]:
@jax.jit
def eval_step(params, batch):
  logits = ConvNextForClassification(10).apply({'params': params}, batch['image'])
  return compute_metrics(logits=logits, labels=batch['label'])

## 9. Train function

Define a training function that:

- Shuffles the training data before each epoch using
  [jax.random.permutation](https://jax.readthedocs.io/en/latest/_autosummary/jax.random.permutation.html)
  that takes a PRNGKey as a parameter (check the
  [JAX - the sharp bits](https://jax.readthedocs.io/en/latest/notebooks/Common_Gotchas_in_JAX.html#JAX-PRNG)).
- Runs an optimization step for each batch.
- Retrieves the training metrics from the device with `jax.device_get` and
  averages them over all batches in the epoch.
- Prints the epoch's mean training loss and accuracy and returns the updated
  train state.

In [9]:
def train_epoch(state, train_ds, batch_size, epoch, rng):
  """Train for a single epoch."""
  train_ds_size = len(train_ds['image'])
  steps_per_epoch = train_ds_size // batch_size

  perms = jax.random.permutation(rng, train_ds_size)
  perms = perms[:steps_per_epoch * batch_size]  # skip incomplete batch
  perms = perms.reshape((steps_per_epoch, batch_size))
  batch_metrics = []
  for perm in perms:
    batch = {k: v[perm, ...] for k, v in train_ds.items()}
    state, metrics = train_step(state, batch)
    batch_metrics.append(metrics)

  # compute mean of metrics across each batch in epoch.
  batch_metrics_np = jax.device_get(batch_metrics)
  epoch_metrics_np = {
      k: np.mean([metrics[k] for metrics in batch_metrics_np])
      for k in batch_metrics_np[0]}

  print('train epoch: %d, loss: %.4f, accuracy: %.2f' % (
      epoch, epoch_metrics_np['loss'], epoch_metrics_np['accuracy'] * 100))

  return state
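
The shuffling and averaging logic can be checked on a toy dataset. This sketch uses made-up sizes (10 examples, batch size 4) and made-up per-batch metrics. It converts the permutation to NumPy before indexing, which is a choice made for this example only.

```python
import jax
import numpy as np

train_ds_size, batch_size = 10, 4
steps_per_epoch = train_ds_size // batch_size          # 2 full batches

rng = jax.random.PRNGKey(0)
perms = jax.random.permutation(rng, train_ds_size)
perms = perms[:steps_per_epoch * batch_size]           # skip incomplete batch
perms = np.asarray(perms).reshape((steps_per_epoch, batch_size))

# Toy dataset with the same dict-of-arrays layout as train_ds.
toy_ds = {'image': np.arange(train_ds_size) * 10.0,
          'label': np.arange(train_ds_size)}
batches = [{k: v[perm, ...] for k, v in toy_ds.items()} for perm in perms]

# Average made-up per-batch metrics the same way train_epoch does.
batch_metrics = [{'loss': 1.0, 'accuracy': 0.5}, {'loss': 3.0, 'accuracy': 1.0}]
epoch_metrics = {k: np.mean([m[k] for m in batch_metrics])
                 for k in batch_metrics[0]}
print(perms.shape, epoch_metrics)
```

Two of the ten examples are dropped in this epoch. Because a fresh key is used for each epoch, a different pair is dropped each time.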

## 10. Eval function

Create a model evaluation function that:

- Retrieves the evaluation metrics from the device with `jax.device_get`.
- Converts the metrics
  [data stored](https://flax.readthedocs.io/en/latest/design_notes/linen_design_principles.html#how-are-parameters-represented-and-how-do-we-handle-general-differentiable-algorithms-that-update-stateful-variables)
  in a JAX
  [pytree](https://jax.readthedocs.io/en/latest/pytrees.html#pytrees-and-jax-functions)
  to Python scalars with `jax.tree_util.tree_map`.

In [10]:
def eval_model(params, test_ds):
  metrics = eval_step(params, test_ds)
  metrics = jax.device_get(metrics)
  summary = jax.tree_util.tree_map(lambda x: x.item(), metrics)
  return summary['loss'], summary['accuracy']
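
The conversion step can be illustrated on a hand-made metrics dictionary. The values below are made up for the example: `jax.device_get` copies the arrays to the host, and `tree_map` applies `.item()` to every leaf of the pytree.

```python
import jax
import jax.numpy as jnp

# Made-up metrics standing in for the output of eval_step.
metrics = {'loss': jnp.array(0.25), 'accuracy': jnp.array(0.5)}

host_metrics = jax.device_get(metrics)   # arrays now live on the host
summary = jax.tree_util.tree_map(lambda x: x.item(), host_metrics)
print(summary)
```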

## 11. Download data

In [11]:
train_ds, test_ds = get_datasets()

## 12. Seed randomness

- Get one
  [PRNGKey](https://jax.readthedocs.io/en/latest/_autosummary/jax.random.PRNGKey.html#jax.random.PRNGKey)
  and
  [split](https://jax.readthedocs.io/en/latest/_autosummary/jax.random.split.html#jax.random.split)
  it to get a second key that you'll use for parameter initialization. (Learn
  more about
  [PRNG chains](https://flax.readthedocs.io/en/latest/design_notes/linen_design_principles.html#how-are-parameters-represented-and-how-do-we-handle-general-differentiable-algorithms-that-update-stateful-variables)
  and
  [JAX PRNG design](https://github.com/google/jax/blob/main/design_notes/prng.md).)

In [12]:
rng = jax.random.PRNGKey(0)
rng, init_rng = jax.random.split(rng)
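
The reason for splitting keys is that JAX random functions are deterministic in their key. The short sketch below, with illustrative variable names, shows that reusing a key repeats the same numbers, while a key obtained from `jax.random.split` gives a different sample.

```python
import jax
import jax.numpy as jnp

key = jax.random.PRNGKey(0)
a = jax.random.normal(key, (3,))
b = jax.random.normal(key, (3,))          # same key, same numbers

key, subkey = jax.random.split(key)       # derive a fresh key before drawing again
c = jax.random.normal(subkey, (3,))

print(bool(jnp.all(a == b)), bool(jnp.all(a == c)))
```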

## 13. Initialize train state

Remember that `create_train_state` initializes both the model parameters and the
optimizer and puts both into the training state dataclass that is returned.

In [13]:
learning_rate = 0.1
momentum = 0.9

In [14]:
state = create_train_state(init_rng, learning_rate, momentum)
del init_rng  # Must not be used anymore.

## 14. Train and evaluate

Once training and evaluation finish after 10 epochs, the output should show that the model achieves approximately 99% accuracy.

In [15]:
num_epochs = 10
batch_size = 1

In [None]:
for epoch in range(1, num_epochs + 1):
  # Use a separate PRNG key to permute image data during shuffling
  rng, input_rng = jax.random.split(rng)
  # Run one epoch of optimization steps over the shuffled training set
  state = train_epoch(state, train_ds, batch_size, epoch, input_rng)
  # Evaluate on the test set after each training epoch
  test_loss, test_accuracy = eval_model(state.params, test_ds)
  print(' test epoch: %d, loss: %.2f, accuracy: %.2f' % (
      epoch, test_loss, test_accuracy * 100))