In [88]:
import os
import math
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import layers, callbacks
from keras.models import Model
from keras.regularizers import l2
from keras.optimizers import SGD
from keras.applications import ResNet50V2
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import LearningRateScheduler, EarlyStopping
from keras.layers import Input
from sklearn.model_selection import train_test_split
from PIL import Image, UnidentifiedImageError

In [89]:
#사용자 셋팅
data_dir = 'TrashBox_train_set'
saved_model_dir = 'fine_tuned_saved_model'
saved_model_file = 'model\ResNet50V2_fine_tuned.h5'
image_exts = ['.jpg', '.jpeg', '.png']

In [3]:
# ResNet50V2 모델 수정 및 완전 연결층 추가
def create_teacher_model():
    input = layers.Input(shape=(224, 224, 3))
    base_model = ResNet50V2(input_tensor=input, include_top=False, weights='imagenet')
    bm_output = base_model.output

    # 모델 정규화 및 완전 연결층 추가
    x = layers.GlobalAveragePooling2D()(bm_output)
    x = layers.Dense(1024)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Activation('relu')(x)
    x = layers.Dropout(0.5)(x)
    x = layers.Dense(7, activation="softmax")(x)
    
    fine_tuned_model = Model(input, x)
    return fine_tuned_model

# 학습률 스케줄러
def step_decay(epoch):
    initial_lrate = 0.1
    drop = 0.5
    epochs_drop = 10.0
    lrate = initial_lrate * math.pow(drop, math.floor((1+epoch)/epochs_drop))
    return lrate

# 이미지 파일 경로 및 라벨 정보를 담은 데이터 프레임 생성
def create_dataframe(data_dir, exts):
    data = []
    for subdir, dirs, files in os.walk(data_dir):
        for file in files:
            ext = os.path.splitext(file)[-1].lower()
            if ext in exts:
                file_path = os.path.join(subdir, file)
                try:
                    with Image.open(file_path) as im:
                        im.verify()  # 이미지 파일 유효성 검사
                except (IOError, ValueError, UnidentifiedImageError):
                    print(f"Invalid image file '{file_path}', skipped.")
                    os.remove(file_path)
                    continue
                label = os.path.split(subdir)[-1].lower()
                data.append((file_path, label))
    return pd.DataFrame(data, columns=['filepath', 'label'])
image_files = []
labels = []

for root, dirs, files in os.walk(data_dir):
    for file in files:
        if file.endswith('.jpg') or file.endswith('.jpeg') or file.endswith('.png'):
            # 이미지 파일 경로와 라벨 정보 저장
            img_path = os.path.join(root, file)
            label = os.path.basename(os.path.dirname(img_path))
            image_files.append(img_path)
            labels.append(label)

# 데이터 프레임 생성
data = create_dataframe(data_dir, image_exts)

# 학습 데이터 증가(ImageDataGenerator)
train_datagen = ImageDataGenerator(rescale=1./255,
                                   rotation_range=15,
                                   shear_range=0.2,
                                   zoom_range=0.2,
                                   horizontal_flip=True,
                                   fill_mode='nearest',
                                   validation_split=0.2)
train_data, valid_data = train_test_split(data, test_size=0.2, random_state=42, stratify=data['label'])
# 훈련 및 검증 데이터 생성
train_generator = train_datagen.flow_from_dataframe(dataframe=train_data,
                                                    x_col='filepath',
                                                    y_col='label',
                                                    target_size=(224, 224),
                                                    batch_size=32,
                                                    class_mode='categorical',
                                                    subset='training')

validation_generator = train_datagen.flow_from_dataframe(dataframe=valid_data,
                                                         x_col='filepath',
                                                         y_col='label',
                                                         target_size=(224, 224),
                                                         batch_size=32,
                                                         class_mode='categorical',
                                                         subset='validation')

# 모델 생성
teacher_model = create_teacher_model()
teacher_model.summary()

# 기존 ResNet50V2 층 동결
for layer in teacher_model.layers[:-1]:
    layer.trainable = False

# 최적화 알고리즘 변경(SGD) 및 학습률 스케줄러 콜백 추가
opt = SGD(lr=0.1, decay=0.0, momentum=0.9, nesterov=False)
lrate = LearningRateScheduler(step_decay)
early_stopping = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)
callbacks_list = [lrate, early_stopping]
teacher_model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

# 새로운 완전 연결층(Dense layer) 학습
history = teacher_model.fit(train_generator, validation_data=validation_generator, epochs=30, verbose=1, callbacks=callbacks_list)

# 동결을 풀기 전 학습률을 낮추어 미세조정을 효과적으로 수행
opt = SGD(lr=0.001, decay=1e-6, momentum=0.9, nesterov=False)
teacher_model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])

# 기존 ResNet50V2 층의 동결 해제
for layer in teacher_model.layers[-20:]:
    layer.trainable = True

# 학습률 스케줄러 콜백 추가
lrate = LearningRateScheduler(step_decay)
callbacks_list = [lrate, early_stopping]

# 미세 조정(fine-tuning) 실행
history = teacher_model.fit(train_generator, validation_data=validation_generator, epochs=30, verbose=1, callbacks=callbacks_list)


Found 9139 validated image filenames belonging to 7 classes.
Found 571 validated image filenames belonging to 7 classes.
Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 conv1_pad (ZeroPadding2D)      (None, 230, 230, 3)  0           ['input_1[0][0]']                
                                                                                                  
 conv1_conv (Conv2D)            (None, 112, 112, 64  9472        ['conv1_pad[0][0]']              
                                )                                       

  super(SGD, self).__init__(name, **kwargs)


Epoch 1/30




Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30
Epoch 1/30
Epoch 2/30
Epoch 3/30
Epoch 4/30
Epoch 5/30
Epoch 6/30
Epoch 7/30
Epoch 8/30
Epoch 9/30
Epoch 10/30
Epoch 11/30
Epoch 12/30
Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
Epoch 22/30
Epoch 23/30
Epoch 24/30
Epoch 25/30
Epoch 26/30
Epoch 27/30
Epoch 28/30
Epoch 29/30
Epoch 30/30


In [90]:
#Student 모델 생성 (기존 코드의 create_fine_tune_model 함수 이용)
def create_student_model():
    input_layer = Input(shape=(224, 224, 3))

    x = layers.Conv2D(16, (3 ,3), activation='relu')(input_layer)
    x = layers.MaxPooling2D((2 ,2))(x)

    x = layers.Conv2D(32 , (3 ,3), activation='relu')(x)
    x = layers.MaxPooling2D((2 ,2))(x)

    x = layers.Flatten()(x)

    x = layers.Dense(64, activation='relu')(x)

    output_layer = layers.Dense(7, activation='softmax')(x)  # 분류 클래스 개수에 맞게 출력 레이어를 설정합니다.

    model = Model(inputs=input_layer , outputs=output_layer)
        
    return model
#student 모델 생성
student_model = create_student_model()
print("student model", student_model.summary())

student model None


In [91]:
# 1. Define the teacher model (pre-trained or trained with larger network)
teacher = teacher_model  # Replace with your teacher model

# 2. Define the student model (smaller than the teacher)
student = student_model  # Replace with your student model

class Distiller(tf.keras.Model):
    def __init__(self, student, teacher):
        super().__init__()
        self.teacher = teacher
        self.student = student
        self.loss_tracker = tf.keras.metrics.Mean(name="distillation loss")  
        
    def compile(self, optimizer, metrics, distillation_loss_fn, temperature=3):
        super().compile(optimizer=optimizer, metrics=metrics)
        self.distillation_loss_fn = distillation_loss_fn
        self.temperature = temperature
        
    def train_step(self, data):
        tf.keras.utils.disable_interactive_logging()
        x, _ = data
        # Forward pass of teacher
        teacher_predictions = self.teacher(x, training=False)
        
        with tf.GradientTape() as tape:
            # Forward pass of student
            student_predictions = self.student(x, training=True)
            
            # Compute loss
            distillation_loss = self.distillation_loss_fn(
                tf.nn.softmax(teacher_predictions, axis=1),
                tf.nn.softmax(student_predictions, axis=1),
            )
            
        # Compute gradients
        trainable_vars = self.student.trainable_variables
        gradients = tape.gradient(distillation_loss, trainable_vars)
        
        # Update weights
        self.optimizer.apply_gradients(zip(gradients, trainable_vars))
        
        # Report progress
        self.loss_tracker.update_state(distillation_loss)
        tf.keras.utils.enable_interactive_logging()
        return {"distillation loss": self.loss_tracker.result()}
        
optimizer = tf.keras.optimizers.Adam()
distiller = Distiller(student=student, teacher=teacher)
distiller.compile(optimizer, metrics=[tf.keras.metrics.SparseCategoricalAccuracy()], distillation_loss_fn=tf.keras.losses.KLDivergence(), temperature=3)
batch_size = 32
history = distiller.fit(train_generator, steps_per_epoch=len(train_data) // batch_size, epochs=30, validation_data=validation_generator, validation_steps=len(validation_generator) // batch_size, verbose=1)

student = distiller.student
student.compile(metrics=["accuracy"])
_, accuracy = student.evaluate(validation_generator)
print(f"Validation accuracy: {accuracy * 100:.2f}%")
# student 모델 저장
student.save("model\student_model.h5")

StopIteration: 