In [None]:
import tensorflow as tf
from tensorflow import keras
from keras.layers import Dense
import numpy as np

In [None]:
class Distiller(keras.Model):
    """Keras model that trains a student network to imitate a teacher.

    Adapted from the Keras knowledge-distillation example:
    https://keras.io/examples/vision/knowledge_distillation/

    Only the student's trainable variables receive gradient updates; the
    teacher is only called in inference mode. The outputs of both networks
    are divided by the temperature and passed through a softmax inside
    `train_step`, i.e. they are treated as logits.
    """

    def __init__(self, student, teacher):
        """Store the two wrapped models.

        Args:
            student: Model whose weights are optimized.
            teacher: Model that provides the soft targets.
        """
        super().__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer,
        metrics,
        student_loss_fn,
        distillation_loss_fn,
        alpha=0.1,
        temperature=3,
    ):
        """ Configure the distiller.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        super().compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        """Run one optimization step on the student.

        Args:
            data: An `(x, y)` pair of inputs and ground-truth labels.

        Returns:
            Dict mapping each compiled metric name to its current result,
            plus the `student_loss` and `distillation_loss` of this step.
        """
        # Unpack data
        x, y = data

        # Forward pass of teacher (outside the tape: no teacher gradients needed)
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)

            # Compute losses
            student_loss = self.student_loss_fn(y, student_predictions)

            # Compute scaled distillation loss from https://arxiv.org/abs/1503.02531
            # The magnitudes of the gradients produced by the soft targets scale
            # as 1/T^2, multiply them by T^2 when using both hard and soft targets.
            distillation_loss = (
                self.distillation_loss_fn(
                    tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                    tf.nn.softmax(student_predictions / self.temperature, axis=1),
                )
                * self.temperature**2
            )

            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics configured in `compile()`.
        self.compiled_metrics.update_state(y, student_predictions)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        """Evaluate the student on one batch; the teacher is not involved.

        Args:
            data: An `(x, y)` pair of inputs and ground-truth labels.

        Returns:
            Dict mapping each compiled metric name to its current result,
            plus the `student_loss` of this batch.
        """
        # Unpack the data
        x, y = data

        # Compute predictions
        y_prediction = self.student(x, training=False)

        # Calculate the loss
        student_loss = self.student_loss_fn(y, y_prediction)

        # Update the metrics.
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results

In [None]:
# Train / test dataset placeholders.
# NOTE: these integers only stand in for the real data. The model-building
# cell reads `x_train.shape[1:]`, which an int does not provide, so assign
# the actual feature and label arrays here before running the cells below.
x_train, y_train = 0, 0
x_test, y_test = 0, 0

In [None]:
# Widths of the GELU hidden layers that follow the 32-unit input projection.
HIDDEN_UNITS = (64, 128, 64, 128, 64, 128, 64, 128, 64, 128, 64)
NUM_CLASSES = 15


def build_classifier(name, input_shape, hidden_units=HIDDEN_UNITS,
                     num_classes=NUM_CLASSES):
    """Build the dense classifier used for both the teacher and the student.

    The final layer has no activation, so the model emits logits. That is
    what the rest of this notebook expects: every loss is created with
    `from_logits=True`, and `Distiller.train_step` applies its own
    temperature-scaled softmax to the model outputs.

    Args:
        name: Name given to the `keras.Sequential` model.
        input_shape: Shape of a single input sample (without the batch axis).
        hidden_units: Widths of the GELU hidden layers, in order.
        num_classes: Number of output logits.

    Returns:
        An uncompiled `keras.Sequential` model.
    """
    layers = [Dense(32, input_shape=input_shape)]
    layers += [Dense(units, activation='gelu') for units in hidden_units]
    layers.append(Dense(num_classes))
    return keras.Sequential(layers, name=name)


# Create the teacher
teacher = build_classifier("teacher", x_train.shape[1:])

# Create the student
student = build_classifier("student", x_train.shape[1:])

# Clone student for later comparison
student_scratch = keras.models.clone_model(student)

In [None]:
# Configure the teacher for ordinary supervised training on integer labels.
teacher.compile(
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    optimizer=keras.optimizers.Adam(),
)

# Fit on the training split, then report loss/accuracy on the test split.
teacher.fit(x=x_train, y=y_train, epochs=5)
teacher.evaluate(x=x_test, y=y_test)

In [None]:
def _assert_multinomial_distribution(input_tensor, axis):
  """Build the checks that `input_tensor` is a distribution along `axis`.

  Returns:
    A two-element list: a non-negativity assertion on `input_tensor`, and an
    assertion that its entries summed over `axis` are near 1.0.
  """
  totals = tf.reduce_sum(input_tensor=input_tensor, axis=axis)
  non_negative_check = tf.debugging.assert_non_negative(input_tensor)
  sums_to_one_check = tf.debugging.assert_near(
      totals,
      tf.constant(1.0),
      message='x and/or y is not a proper probability distribution')
  return [non_negative_check, sums_to_one_check]


def _assert_valid_axis(ndims, axis):
  """Assert the condition `-ndims < axis <= ndims` if `axis` is not `None`."""
  if axis and (axis < -ndims or axis >= ndims):
    raise ValueError('axis = %d not in [%d, %d)' % (axis, -ndims, ndims))


def _kl_divergence_fn(true_dist, predicted_dist):
  """Return the elementwise (unreduced) terms of KL(true_dist || predicted_dist)."""
  # Small offset so that neither logarithm is evaluated at exactly zero.
  epsilon = 1e-7
  self_term = true_dist * tf.math.log(true_dist + epsilon)
  cross_term = true_dist * tf.math.log(predicted_dist + epsilon)
  return self_term - cross_term

def jensen_shannon_divergence(
    labels,
    predictions,
    axis=-1,
    weights=1.0,
    scope=None,
    loss_collection=tf.compat.v1.GraphKeys.LOSSES,
    reduction=tf.compat.v1.losses.Reduction.SUM_BY_NONZERO_WEIGHTS):
  """Weighted Jensen-Shannon divergence between `labels` and `predictions`.

  Both inputs are cast to float32 and must be valid distributions along
  `axis` (non-negative, summing to about 1). With `m` the elementwise mean of
  the two inputs, the divergence is `0.5 * KL(labels || m) +
  0.5 * KL(predictions || m)`, summed over `axis` with that axis kept.

  Args:
    labels: Tensor holding the reference distributions.
    predictions: Tensor holding the predicted distributions; its shape must
      be compatible with that of `labels`.
    axis: Axis along which the distributions are laid out. Must not be `None`.
    weights: Forwarded to `tf.compat.v1.losses.compute_weighted_loss`.
    scope: Optional name scope; defaults to 'jensen_shannon_divergence'.
    loss_collection: Forwarded to `compute_weighted_loss`.
    reduction: Forwarded to `compute_weighted_loss`.

  Returns:
    The value returned by `tf.compat.v1.losses.compute_weighted_loss`.

  Raises:
    ValueError: If `axis` is `None`, or is rejected by `_assert_valid_axis`.
  """
  with tf.compat.v1.name_scope(scope, 'jensen_shannon_divergence',
                               (predictions, labels, weights)) as loss_scope:
    labels = tf.cast(labels, tf.dtypes.float32)
    predictions = tf.cast(predictions, tf.dtypes.float32)
    predictions.get_shape().assert_is_compatible_with(labels.get_shape())
    if axis is None:
      raise ValueError('You must specify "axis".')
    _assert_valid_axis(labels.get_shape().ndims, axis)
    distribution_checks = [
        *_assert_multinomial_distribution(labels, axis),
        *_assert_multinomial_distribution(predictions, axis),
    ]
    with tf.control_dependencies(distribution_checks):
      mixture = 0.5 * (labels + predictions)
      labels_term = 0.5 * _kl_divergence_fn(labels, mixture)
      predictions_term = 0.5 * _kl_divergence_fn(predictions, mixture)
      per_example = tf.reduce_sum(
          input_tensor=labels_term + predictions_term,
          axis=(axis,),
          keepdims=True)
      return tf.compat.v1.losses.compute_weighted_loss(
          per_example, weights, loss_scope, loss_collection,
          reduction=reduction)

In [None]:
# Wrap the teacher and the student in a Distiller and configure it.
distiller = Distiller(teacher=teacher, student=student)
distiller.compile(
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    optimizer=keras.optimizers.Adam(),
    temperature=10,
    alpha=0.1,
)

# Run the distillation: only the student's weights are updated.
distiller.fit(x=x_train, y=y_train, epochs=3)

# Score the distilled student on the held-out split.
distiller.evaluate(x=x_test, y=y_test)

In [None]:
# Baseline: the cloned student trained on the hard labels alone, for
# comparison with the distilled student.
student_scratch.compile(
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    optimizer=keras.optimizers.Adam(),
)

# Fit on the training split, then report loss/accuracy on the test split.
student_scratch.fit(x=x_train, y=y_train, epochs=3)
student_scratch.evaluate(x=x_test, y=y_test)