In [14]:
#MNIST 데이터를 사용한 knowledge distillation

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

In [15]:
# 사용자 정의 Distiller() 클래스
# 이 클래스를 사용하여 teacher 모델의 지식을 student 모델로 넘겨준다.

class Distiller(keras.Model):
    
    # 생성 인자로 student 모델과 teacher 모델
    # teacher 모델은 사전 학습된 모델, student는 학습되지 않은 모델(layer 구조)
    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    # compile 메서드 오버라이딩
    def compile(
        self,
        optimizer, # student 가중치를 위한 keras optimizer
        metrics, # 평가를 위한 keras metric
        student_loss_fn, # student 모델의 예측값과 실제값 차이의 손실 함수
        distillation_loss_fn, # studnet 모델의 soft 예측값과 teacher 모델의 soft 예측값 차이의 손실 함수
        alpha=0.1, # studnet loss, distillation loss를 각각 alpha, 1-alpha로 계산
        temperature=3, # 확률 분포를 soft 시키기 위함
    ):
        """ Configure the distiller.

        Args:
            optimizer: Keras optimizer for the student weights
            metrics: Keras metrics for evaluation
            student_loss_fn: Loss function of difference between student
                predictions and ground-truth
            distillation_loss_fn: Loss function of difference between soft
                student predictions and soft teacher predictions
            alpha: weight to student_loss_fn and 1-alpha to distillation_loss_fn
            temperature: Temperature for softening probability distributions.
                Larger temperature gives softer distributions.
        """
        
        """ 증류기 구성
        
            1) 옵티마이저 : 학생 가중치를위한 Keras 옵티마이저
            2) 메트릭 : 평가를위한 Keras 메트릭
            3) student_loss_fn : 학생차의 손실 함수(예측값과 실제값)
            4) distillation_loss_fn : 연약한 차이의 손실 함수(소프트학생 예측 및 소프트교사 예측)
            5) alpha : student_loss_fn 및 1-alpha to distillation_loss_fn에 대한 가중치
            6) 온도 : 확률 분포를 연화시키기 위한 온도(더 큰 온도는 더 부드러운 분포를 제공)
        """
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    def train_step(self, data):
        # 데이터 언패킹
        # data 객체로 합쳐져 있던 데이터를 x, y로 언패킹
        x, y = data

        # Teacher 모델 forward pass
        teacher_predictions = self.teacher(x, training=False)

        with tf.GradientTape() as tape:
            # Student 모델 forward pass
            student_predictions = self.student(x, training=True)

            # studnet loss 계산
            student_loss = self.student_loss_fn(y, student_predictions)
            
            # distillation loss 계산
            # teacher 모델의 soft 예측값과 student 모델의 soft 예측값 차이의 손실 함수
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1),
            )
            # studnet loss, distillation loss를 각각 alpha, 1-alpha로 계산
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # gradients 계산
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # 가중치 업데이트
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # compile() 안에서 metric 업데이트
        self.compiled_metrics.update_state(y, student_predictions)

        # 성능 dictionary 리턴
        # studnet_loss, distillation_loss 보여준다.
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    def test_step(self, data):
        # 데이터 언패킹
        # data 객체로 합쳐져 있던 데이터를 x, y로 언패킹
        x, y = data

        # 예측 계산
        y_prediction = self.student(x, training=False)

        # loss 계산
        student_loss = self.student_loss_fn(y, y_prediction)

        # 메트릭 업데이트
        self.compiled_metrics.update_state(y, y_prediction)

        # 성능 dictionary 리턴
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results

In [16]:
# 처음에는 교사 모델과 사전 훈련된 교사모델보단 작은 학생 모델을 만든다.

# Teacher 모델 생성
teacher = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(256, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
        layers.Conv2D(512, (3, 3), strides=(2, 2), padding="same"),
        layers.Flatten(),
        layers.Dense(10),
    ],
    name="teacher",
)

# Student 모델 생성
student = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(16, (3, 3), strides=(2, 2), padding="same"),
        layers.LeakyReLU(alpha=0.2),
        layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
        layers.Conv2D(32, (3, 3), strides=(2, 2), padding="same"),
        layers.Flatten(),
        layers.Dense(10),
    ],
    name="student",
)

# 비교를 위한 학생 모델 복제
student_scratch = keras.models.clone_model(student)

In [17]:
# 데이터셋 준비
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# 데이터 정규화
x_train = x_train.astype("float32") / 255.0
x_train = np.reshape(x_train, (-1, 28, 28, 1))

x_test = x_test.astype("float32") / 255.0
x_test = np.reshape(x_test, (-1, 28, 28, 1))

In [18]:
# teacher 모델 훈련
# 지식 증류에서 훈련된 교사 모델이 필요하기 때문에 일반적인 방법으로 훈련한다.
teacher.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# 데이터셋을 통해 교사의 트레인 및 평가
teacher.fit(x_train, y_train, epochs=5)
teacher.evaluate(x_test, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


[0.08497295528650284, 0.9782999753952026]

In [19]:
# Distiller 초기화 및 컴파일
distiller = Distiller(student=student, teacher=teacher)
distiller.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# Distill teacher to student
distiller.fit(x_train, y_train, epochs=3)

# 테스트 데이터로 학생 모델 평가
distiller.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


0.9789000153541565

In [24]:
# 교사 모델을 증류받은 학생 모델과 비교를 위해
# 증류받지 않은 학생 모델을 일반적인 방법으로 학습

student_scratch.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

student_scratch.fit(x_train, y_train, epochs=3)
student_scratch.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.27641168236732483, 0.9225000143051147]