In [1]:
import os,glob, cv2
import pandas as pd

os.environ["CUDA_VISIBLE_DEVICES"] = "0"  # GPU 사용 안 함
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.keras import layers, models, losses, optimizers, metrics

In [2]:
class Distiller(tf.keras.Model):
    def __init__(
        self, student, teacher, 
        student_loss_fun, 
        distillation_loss_fun = tf.keras.losses.KLDivergence(),
        temperature = 5.0, alpha = 0.5):
        super().__init__()
        self.student = student
        self.teacher = teacher
        self.temperature = temperature
        self.alpha = alpha
        self.student_loss_fun = student_loss_fun
        self.distillation_loss_fun = distillation_loss_fun

    def compile(self, optimizer, metrics=None):
        super().compile()
        self.optimizer = optimizer
        if metrics is not None:
            self.student_metric = metrics

    def train_step(self, data):
        x, y = data
        # teacher는 훈련 안함
        teacher_predictions = self.teacher(x, training=False)
        
        with tf.GradientTape() as tape:
            student_predictions = self.student(x, training=True)
            # 일반 loss (hard label)
            student_loss = self.student_loss_fun(y, student_predictions)
            # distillation loss (soft label)
            student_soft = tf.nn.softmax(student_predictions / self.temperature)
            teacher_soft = tf.nn.softmax(teacher_predictions / self.temperature) 
            distill_loss = self.distillation_loss_fun(teacher_soft, student_soft)
            # 최종 loss (alpha 가중치)
            loss = self.alpha * student_loss + (1 - self.alpha) * distill_loss * (self.temperature ** 2)
        
        grads = tape.gradient(loss, self.student.trainable_variables)
        # 오차 역전파
        self.optimizer.apply_gradients(zip(grads, self.student.trainable_variables))
        
        self.student_metric.update_state(y, tf.nn.softmax(student_predictions))
        return {"loss": loss, "accuracy": self.student_metric.result()}
    
    def test_step(self, data):
        x, y = data
        student_predictions = self.student(x, training=False)
        student_loss = self.student_loss_fun(y, student_predictions)
        self.student_metric.update_state(y, tf.nn.softmax(student_predictions))
        return {"loss": student_loss, "accuracy": self.student_metric.result()}


In [3]:
def ratio_padding(frame, H, W, value=255):
    """
    비율을 유지하며 이미지를 HxW 크기로 padding 하는 함수
    배경색은 value (기본 흰색: 255)

    Args:
        frame (np.array): 원본 이미지 (H_f, W_f, C) 또는 (H_f, W_f)
        H (int): 출력 높이
        W (int): 출력 너비
        value (int, optional): 패딩 색상. 기본 255 (흰색)

    Returns:
        np.array: 크기 (H, W, C) 또는 (H, W)인 출력 이미지
    """
    # 출력 배열 초기화 (채널 유지)
    if len(frame.shape) == 2:  # 흑백 이미지
        out = np.full((H, W), value, dtype=frame.dtype)
    else:  # 컬러 이미지
        out = np.full((H, W, frame.shape[-1]), value, dtype=frame.dtype)

    f_H, f_W = frame.shape[:2]
    aspect_ratio = f_H / f_W

    if aspect_ratio < 1:  # 가로가 더 긴 이미지
        if aspect_ratio * W > H:
            # 높이를 기준으로 너비 조정
            new_W = int(H / aspect_ratio)
            resized = cv2.resize(frame, (new_W, H))
            start_x = (W - new_W) // 2
            out[:, start_x:start_x + new_W] = resized
        else:
            # 너비를 기준으로 높이 조정
            new_H = int(aspect_ratio * W)
            resized = cv2.resize(frame, (W, new_H))
            start_y = (H - new_H) // 2
            out[start_y:start_y + new_H, :] = resized
    else:  # 세로가 더 긴 이미지 or 정사각형에 가까운 경우
        new_W = int(H / aspect_ratio)
        resized = cv2.resize(frame, (new_W, H))
        start_x = (W - new_W) // 2
        out[:, start_x:start_x + new_W] = resized

    return out

In [4]:
class MyGenerator(tf.keras.utils.Sequence):
    def __init__(self, df, batch_size, preprocessing, aug_fun=None, is_test=False):
        self.data = df
        self.batch_size = batch_size 
        self.preprocessing = preprocessing
        self.aug_fun = aug_fun
        self.is_test = is_test
    def __len__(self):
        return np.ceil(self.data.shape[0] / self.batch_size).astype(int)
    def __getitem__(self, index):
        st = index * self.batch_size
        ed = (index + 1) * self.batch_size
        paths = self.data.values[st:ed]
        x_list = []
        y_list = []
        for file_path in paths:
            if self.is_test:
                x = self.preprocessing(file_path, self.is_test)
            else:
                x, y = self.preprocessing(file_path)
            if self.aug_fun:
                x = self.aug_fun(image= x)["image"]
                x = np.clip(x, 0, 255)
            x_list.append(x)
            if not self.is_test:
                y_list.append(y)
        bat_x = np.array(x_list)
        if self.is_test:
            return bat_x
        bat_y = np.array(y_list)
        return bat_x, bat_y
    def on_epoch_end(self):
        self.data = self.data.sample(frac = 1)

In [5]:
from scipy.special import softmax
import numpy as np
logits = np.array([5.0, 2.0, 0.5])
softmax(logits) , softmax(logits / 5.0)  
#Temperature로 나눴을 때 분포가 퍼짐
# 티쳐가 더 많은 정보를 알려 줄 수 있음

(array([0.94259941, 0.04692926, 0.01047133]),
 array([0.51140921, 0.28066732, 0.20792347]))

In [6]:
files = glob.glob("data/**/*")
df = pd.DataFrame({"path":files})

In [7]:
train_df, test_df = train_test_split(
    df, test_size=0.2, random_state=42)
valid_df, test_df = train_test_split(
    test_df, test_size=0.5, random_state=42)

In [8]:
def preprocessing(path):
    img = cv2.imread(path)
    img = ratio_padding(img, 224,224)
    y = int(path.split("/")[-1]=="Cat")
    return img, y

In [9]:
tr_gen = MyGenerator(train_df["path"], batch_size = 32, preprocessing=preprocessing)
val_gen = MyGenerator(valid_df["path"], batch_size = 32, preprocessing=preprocessing)

In [10]:
x,y = next(iter(tr_gen))

In [11]:
x.shape, y.shape

((32, 224, 224, 3), (32,))

In [12]:
reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',    # 검증 손실 기준
    factor=0.5,            # 학습률 줄이는 비율 (50%)
    patience=6,            # 개선 없을 시 3 에폭 기다림
    verbose=1,
    min_lr=1e-7
)

early_stop = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=10,            # 7 에폭 동안 개선 없으면 중단
    verbose=1,
    restore_best_weights=True
)

callbacks = [reduce_lr, early_stop]

In [13]:
inp = tf.keras.layers.Input((224,224,3))
x = tf.keras.applications.efficientnet.preprocess_input(inp)
backbone = tf.keras.applications.EfficientNetB0(
    input_shape = (224, 224, 3), include_top=False)
x = backbone(x)
x = tf.keras.layers.GlobalAveragePooling2D()(x)
out = tf.keras.layers.Dense(1,activation="sigmoid")(x)
teacher = tf.keras.Model(inp, out)

teacher.compile(
    optimizer=optimizers.Adam(),
    loss=losses.BinaryCrossentropy(),
    metrics=[metrics.BinaryAccuracy()]
)

2025-09-29 14:35:13.666594: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2025-09-29 14:35:13.677616: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2025-09-29 14:35:13.681543: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:936] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2025-09-29 14:35:13.684288: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

In [14]:
checkpoint_path = f'model/teacher.h5'
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False  
)

In [None]:
history = teacher.fit(
    tr_gen,
    validation_data= val_gen,
    epochs=100,
    callbacks=[*callbacks, checkpoint],
    use_multiprocessing=True,
    workers=4
    )
model.load_weights(checkpoint_path)

Epoch 1/100


2025-09-29 14:35:29.602195: I tensorflow/stream_executor/cuda/cuda_dnn.cc:368] Loaded cuDNN version 8201
2025-09-29 14:35:33.170734: I tensorflow/stream_executor/cuda/cuda_blas.cc:1786] TensorFloat-32 will be used for the matrix multiplication. This will only be logged once.


 39/586 [>.............................] - ETA: 2:47 - loss: 0.0449 - binary_accuracy: 0.9808   

Corrupt JPEG data: 2230 extraneous bytes before marker 0xd9


108/586 [====>.........................] - ETA: 2:26 - loss: 0.0164 - binary_accuracy: 0.9931

Corrupt JPEG data: 226 extraneous bytes before marker 0xd9




Corrupt JPEG data: 254 extraneous bytes before marker 0xd9




Corrupt JPEG data: 1403 extraneous bytes before marker 0xd9




Corrupt JPEG data: 239 extraneous bytes before marker 0xd9




Corrupt JPEG data: 65 extraneous bytes before marker 0xd9








Corrupt JPEG data: 399 extraneous bytes before marker 0xd9




Corrupt JPEG data: 162 extraneous bytes before marker 0xd9




Corrupt JPEG data: 214 extraneous bytes before marker 0xd9




Corrupt JPEG data: 99 extraneous bytes before marker 0xd9


Epoch 1: val_loss improved from inf to 0.00001, saving model to model/teacher.h5
Epoch 2/100
114/586 [====>.........................] - ETA: 2:25 - loss: 1.2140e-05 - binary_accuracy: 1.0000 

Corrupt JPEG data: 2230 extraneous bytes before marker 0xd9


124/586 [=====>........................] - ETA: 2:22 - loss: 1.2255e-05 - binary_accuracy: 1.0000

Corrupt JPEG data: 226 extraneous bytes before marker 0xd9




Corrupt JPEG data: 1403 extraneous bytes before marker 0xd9




Corrupt JPEG data: 99 extraneous bytes before marker 0xd9




Corrupt JPEG data: 162 extraneous bytes before marker 0xd9




Corrupt JPEG data: 65 extraneous bytes before marker 0xd9








Corrupt JPEG data: 214 extraneous bytes before marker 0xd9




Corrupt JPEG data: 399 extraneous bytes before marker 0xd9




Corrupt JPEG data: 239 extraneous bytes before marker 0xd9




Corrupt JPEG data: 254 extraneous bytes before marker 0xd9


Epoch 2: val_loss improved from 0.00001 to 0.00001, saving model to model/teacher.h5
Epoch 3/100
 72/586 [==>...........................] - ETA: 2:37 - loss: 5.8727e-06 - binary_accuracy: 1.0000 

Corrupt JPEG data: 239 extraneous bytes before marker 0xd9


120/586 [=====>........................] - ETA: 2:22 - loss: 5.6823e-06 - binary_accuracy: 1.0000

Corrupt JPEG data: 226 extraneous bytes before marker 0xd9




Corrupt JPEG data: 162 extraneous bytes before marker 0xd9




In [None]:
teacher.trainable = False
teacher.summary()

In [None]:
from utils.classification.basic import ClassificationBuilder
from utils.classification import vgg, res, mobile, efficient

builder = ClassificationBuilder(
    num_classes = 1, input_shape = (224,224,3),
    activation = "sigmoid")

args = {
    'input_shape': (224, 224, 3),
    'filters': [32, 64, 128, 256, 512],
    'iters': [1, 2, 2, 6, 2]
}
student = builder.build(args, mobile.get_model)
student.summary()

In [None]:
distiller = Distiller(
    student=student, teacher=teacher, 
    student_loss_fun = losses.CategoricalCrossentropy(from_logits=True),
    temperature=5.0, alpha=0.5)
distiller.compile(
    optimizer=tf.keras.optimizers.Adam(),
    metrics=tf.keras.metrics.CategoricalAccuracy()
)

checkpoint_path = f'model/student.h5'
checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_loss',
    verbose=1,
    save_best_only=True,
    save_weights_only=False  
)

distiller.fit(
    tr_gen,
    validation_data= val_gen,
    epochs=100,
    callbacks=[*callbacks, checkpoint],
)