## 빅데이터기획분석론 논문리뷰과제 (구현체실습)
선정 논문 : Distilling the Knowledge in a Neural Network <br>
실험 환경 : CoLab<br>
정보융합학부 고해지 <br>
2021.12.19.

# Knowledge Distillation

**Description:** Implementation of classical Knowledge Distillation.<br>
**Reference:** "Distilling the Knowledge in a Neural Network" (https://arxiv.org/abs/1503.02531) <br>
**source code:** https://keras.io/examples/vision/knowledge_distillation/#distill-teacher-to-student

## Setup
필요한 라이브러리 import

In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np


## `Distiller()` 클래스 정의

사용자 정의 클래스 `Distiller()`
  `keras.Model`을 상속하여 `train_step`, `test_step`, `compile()` override하여 사용. <br> <br>

**[distiller를 사용하기위해 필요한 사항들]**

- A trained teacher model <br>
  훈련된 교사모델 <- large model
- A student model to train <br> 
  훈련할 학생모델 <- small model
- A student loss function on the difference between student predictions and ground-truth<br> 
  학생모델에 대한 loss 함수 : 학생모델의 예측값과 실제정답의 차
- A distillation loss function, along with a `temperature`, on the difference between the soft student predictions and the soft teacher labels <br> 
  distillation loss 함수 : 학생모델이 예측한 soft 값과 교사모델이 예측한 soft 값(logit)
- An `alpha` factor to weight the student and distillation loss <br> 
  `alpha` : 학생 loss와 distillation loss에서의 가중치
- An optimizer for the student and (optional) metrics to evaluate performance <br> 
  학생모델을 위한 최적화 도구, 성능을 평가하기 위한 metrics

<br>

**[`train_step`]** <br>
교사모델과 학생모델에 대해 전진패스(backward pass) 수행 (학습) <br>
alpha를 이용하여 loss 계산 (loss = student_loss + distillation_loss) <br>
학생모델의 weight에 대한 기울기(gradient) 계산하기위해 후진패스(backward pass) <br>

**[`test_step`]** <br>
학생 모델 평가 


In [2]:
# Distiller class 정의 
class Distiller(keras.Model):
    def __init__(self, student, teacher):
        super(Distiller, self).__init__()
        self.teacher = teacher
        self.student = student

    def compile(
        self,
        optimizer, # 학생 가중치를 위한 keras optimizer
        metrics, # 평가를 위한 keras metrics
        student_loss_fn, #loss: 학생모델 예측-실제정답
        distillation_loss_fn, #loss: 학생모델의 soft 예측값-교사모델의 soft예측값
        alpha=0.1, #loss에 주는 가중치
        temperature=3, #확률분포를 더 부드럽게 하기 위해 사용
    ):
        
        super(Distiller, self).compile(optimizer=optimizer, metrics=metrics)
        self.student_loss_fn = student_loss_fn
        self.distillation_loss_fn = distillation_loss_fn
        self.alpha = alpha
        self.temperature = temperature

    # 학습 단계
    def train_step(self, data):
        # Unpack data 
        x, y = data

        # Forward pass of teacher (교사모델의 Forward pass)
        teacher_predictions = self.teacher(x, training=False) #학습된 교사모델 사용하므로 False

        with tf.GradientTape() as tape:
            # Forward pass of student (학생모델의 Forward pass)
            student_predictions = self.student(x, training=True)

            # Compute losses (loss 계산)
            ## 학생모델 loss
            student_loss = self.student_loss_fn(y, student_predictions)
            ## 증류 loss
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions / self.temperature, axis=1),
                tf.nn.softmax(student_predictions / self.temperature, axis=1),
            )
            ## alpha 가중치를 적용하여 최종 loss 계산
            loss = self.alpha * student_loss + (1 - self.alpha) * distillation_loss

        # Compute gradients (기울기 계산)
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(loss, trainable_vars)

        # Update weights (가중치 업데이트)
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))

        # Update the metrics configured in `compile()`. 
        self.compiled_metrics.update_state(y, student_predictions)

        # Return a dict of performance (수행에 대한 결과를 dictionary로 반환)
        results = {m.name: m.result() for m in self.metrics}
        results.update(
            {"student_loss": student_loss, "distillation_loss": distillation_loss}
        )
        return results

    # 테스트 단계 ; 학생모델 평가
    def test_step(self, data):
        # Unpack the data
        x, y = data

        # Compute predictions (예측값 계산)
        y_prediction = self.student(x, training=False)

        # Calculate the loss (loss 계산)
        student_loss = self.student_loss_fn(y, y_prediction)

        # Update the metrics. (성능지표 업데이트)
        self.compiled_metrics.update_state(y, y_prediction)

        # Return a dict of performance (수행에 대한 결과를 dictionary로 반환)
        results = {m.name: m.result() for m in self.metrics}
        results.update({"student_loss": student_loss})
        return results


## 학생모델(student model) 및 교사모델(teacher model) 생성
교사모델을 만들고, 더 작은 학생모델을 만든다.  <br>
두 모델 모두 컨볼루션 신경망(CNN)이다.<br>
아래의 코드에서는 Sequential()을 사용했지만, 어느 Keras model이든 사용가능하다.<br>


In [3]:
# 교사모델(teacher model) 생성
teacher = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(256, (3, 3), strides=(2, 2), padding="same"), # 256개의 convolution filter 사용
        layers.LeakyReLU(alpha=0.2),
        layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
        layers.Conv2D(512, (3, 3), strides=(2, 2), padding="same"), # 512개의 convolution filter 사용
        layers.Flatten(),
        layers.Dense(10),
    ],
    name="teacher",
)

# 학생모델(student model) 생성
student = keras.Sequential(
    [
        keras.Input(shape=(28, 28, 1)),
        layers.Conv2D(16, (3, 3), strides=(2, 2), padding="same"), # 16개의 convolution filter 사용 (256개인 teacher model보다 smaller)
        layers.LeakyReLU(alpha=0.2),
        layers.MaxPooling2D(pool_size=(2, 2), strides=(1, 1), padding="same"),
        layers.Conv2D(32, (3, 3), strides=(2, 2), padding="same"), # 32개의 convolution filter 사용 (512개인 teacher model보다 smaller)
        layers.Flatten(),
        layers.Dense(10),
    ],
    name="student",
)


# 나중에 비교를 위해 학생모델(student model) 복제
student_scratch = keras.models.clone_model(student)

## DataSet 준비

교사모델을 학습시키고, 교사모델을 distilling하는데 사용하는 데이터셋 : **MNIST** https://keras.io/api/datasets/mnist/) <br>
(다른 dataset도 동일한 절차로 적절한 모델을 선택하여 진행하면 됨 ex.CIFAR-10) <br>

학생모델과 교사모델 모두 training set으로 학습되고, test set으로 평가된다. 


In [4]:
# train dataset, test dataset 준비
batch_size = 64
(x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()

# train data 정규화 (Nomalization)
x_train = x_train.astype("float32") / 255.0
x_train = np.reshape(x_train, (-1, 28, 28, 1))

# test data 정규화 (Nomalization)
x_test = x_test.astype("float32") / 255.0
x_test = np.reshape(x_test, (-1, 28, 28, 1))


Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz


## 교사모델(teacher model) 학습
지식증류에서, 교사모델이 학습되고 고정되었다고 가정함 <br>
--> 일반적인 방법으로 교사모델 학습시킴

In [5]:
# 일반적인 방식으로 교사모델(teacher model)학습
teacher.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# 학습 및 data로 교사모델(teacher model)의 성능 확인
teacher.fit(x_train, y_train, epochs=5)
teacher.evaluate(x_test, y_test)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


[0.10237742960453033, 0.9729999899864197]

## 교사모델(teacher model)에서 학생모델(student model)로 증류(distilling)
교사모델 : 이미 학습이 완료된 상태 <br>
`Distiller(student, teacher)` 인스턴스를  초기화 및 컴파일`compile()` 진행<br>
(loss함수, 하이퍼파라미터, optimizer 설정)

In [6]:
# Distiller 인스턴스 초기화 및 컴파일
distiller = Distiller(student=student, teacher=teacher) # student model과 teacher model 설정

distiller.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# 선생모델에서 학생모델로 증류 진행 (distill)
distiller.fit(x_train, y_train, epochs=3)

# test dataset에 대하여 평가
distiller.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.9760000109672546, 0.046132225543260574]

**Experiment : Temperature = 1**

In [9]:
# Distiller 인스턴스 초기화 및 컴파일
distiller_temp1 = Distiller(student=student, teacher=teacher) # student model과 teacher model 설정

distiller_temp1.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=1,
)

# 선생모델에서 학생모델로 증류 진행 (distill)
distiller_temp1.fit(x_train, y_train, epochs=3)

# test dataset에 대하여 평가
distiller_temp1.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.9753999710083008, 0.010427047498524189]

**Experiment : Temperature = 5**

In [10]:
# Distiller 인스턴스 초기화 및 컴파일
distiller_temp5 = Distiller(student=student, teacher=teacher) # student model과 teacher model 설정

distiller_temp5.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=5,
)

# 선생모델에서 학생모델로 증류 진행 (distill)
distiller_temp5.fit(x_train, y_train, epochs=3)

# test dataset에 대하여 평가
distiller_temp5.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.9787999987602234, 0.00665842043235898]

**Experiment : alpha=0.7**

In [12]:
# Distiller 인스턴스 초기화 및 컴파일
distiller_alpha7 = Distiller(student=student, teacher=teacher) # student model과 teacher model 설정

distiller_alpha7.compile(
    optimizer=keras.optimizers.Adam(),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
    student_loss_fn=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    distillation_loss_fn=keras.losses.KLDivergence(),
    alpha=0.1,
    temperature=10,
)

# 선생모델에서 학생모델로 증류 진행 (distill)
distiller_alpha7.fit(x_train, y_train, epochs=3)

# test dataset에 대하여 평가
distiller_alpha7.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.9799000024795532, 0.0017649412620812654]

## 일반적인 방식으로 학생모델(student model)학습 (비교를 위해)
지식증류에의해 얻어지는 성능을 평가하기위해 <br>
동일한 학생모델을 교사모델없이 일반적인 방식으로 학습시킨다.


In [7]:
# Train student as doen usually
student_scratch.compile(
    optimizer=keras.optimizers.Adam(),
    loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True),
    metrics=[keras.metrics.SparseCategoricalAccuracy()],
)

# Train and evaluate student trained from scratch.
student_scratch.fit(x_train, y_train, epochs=3)
student_scratch.evaluate(x_test, y_test)

Epoch 1/3
Epoch 2/3
Epoch 3/3


[0.0697457566857338, 0.977400004863739]