In [25]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import pandas as pd
import math
import cv2
import numpy as np

In [2]:
# 모델을 만들어주는 함수 정의
def get_sequencial_model(imput_shape):
    model = keras.Sequential(
        [
            layers.Input(input_shape), # 입력층 만듦 -> 은닉층으로 데이터를 보낼거임
            
            # 1st
            # 64개의 데이터 받음(필터 64개 사용). 3x3 필터크기. 패딩을 자동으로 삽입해서 크기 유지(<-> 'valid')
            layers.Conv2D(64, 3, strides=1, activation='relu', padding='same'),
            layers.Conv2D(64, 3, strides=1, activation='relu', padding='same'),
            # 가중치 2번 업데이트. 좀 더 세밀해짐.
            layers.MaxPool2D(), # 가장 큰 값을 뽑아서 크기를 줄여줌
            layers.BatchNormalization(), # 픽셀값을 정규화.
            layers.Dropout(0.5), # 과대적합 방지를 위해
            
            # 2nd
            layers.Conv2D(128, 3, strides=1, activation='relu', padding='same'), # 128개 데이터 추출
            layers.Conv2D(128, 3, strides=1, activation='relu', padding='same'),
            layers.MaxPool2D(), 
            layers.BatchNormalization(), 
            layers.Dropout(0.3),
            
            # FC
            layers.GlobalMaxPool2D(), # 다합쳐서 맥스풀링
            layers.Dense(128, activation="relu"), # Dence : 일반 레이어
            layers.Dense(1, activation='sigmoid') # 참/거짓 둘 중 하나로 값 추출되도록
        ]
    )
    return model

In [3]:
input_shape = (256, 256, 3)
model = get_sequencial_model(input_shape)

model.compile(
    optimizer = 'adam',
    loss='binary_crossentropy',
    metrics='accuracy' # 검증방법
)

In [7]:
class DataGenerator(keras.utils.Sequence):
    def __init__(self, batch_size, csv_path, fold, image_size, mode='train', shuffle=True):
        self.batch_size = batch_size
        self.csv_path = csv_path
        self.fold = fold
        self.image_size = image_size
        self.mode = mode
        self.shuffle = shuffle
        
        self.df = pd.read_csv(csv_path)
        df = pd.read_csv(csv_path)
        
        if self.mode == 'train':
            self.df = self.df[self.df['fold'] != self.fold]
        elif self.mode == 'val':
            self.df = self.df[self.df['fold'] == self.fold]
        
        self.on_epoch_end()

    def on_epoch_end(self):
        if self.shuffle:
            self.df = self.df.sample(frac=1).reset_index(drop=True)
    
    def __len__(self):
        return math.ceil(len(self.df) / self.batch_size)  

    def __getitem__(self, idx):
        start = idx * self.batch_size
        end = (idx+1) * self.batch_size
        data = self.df.iloc[start:end]
        batch_x, batch_y = self.get_data(data)
        return np.array(batch_x), np.array(batch_y)
    
    def get_data(self, data):
        batch_x = []
        batch_y = []
        
        for _, r in data.iterrows():
            file_name = r['file_name']
            image = cv2.imread(f'data/images/{file_name}.jpg')
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (self.image_size, self.image_size))
            image = image / 255.

            label = int(r['species']) - 1

            batch_x.append(image)
            batch_y.append(label)

        return batch_x, batch_y

In [11]:
csv_path = 'data/kfolds.csv'

train_generator = DataGenerator(
    batch_size=128,
    csv_path = csv_path,
    fold=1,
    image_size=256,
    mode='train',
    shuffle=True
)

valid_generator = DataGenerator(
    batch_size=128,
    csv_path = csv_path,
    fold=1,
    image_size=256,
    mode='val',
    shuffle=True
)

첫 번째 콜백 : 어떻게 멈출것인지 설정

In [14]:
# EalryStopping : Epoch을 많이 돌린 후 특정 시점에 멈추게 함
# monitor : EarlyStopping의 기준이 되는 값. 예) val_loss 더이상 감소되지 않을 경우 EarlyStopping 적용
# patience : Training이 진행됨에도 더이상 monitor되는 값의 개선이 없을 경우 몇번 epoch을 진행할 지 설정. 몇번이상 변화가 없으면 멈출지를 설정 가능
# mode : monitor되는 값이 최소가 되야 하는지, 최대가 되어야 하는지 설정
# restore_best_weights : true로 설정하면 training이 끝난 후 model의 weight를 monitor하고 있던 값이 가장
# 좋았을 경우와 weight로 복원. False라면 마지막 training이 끝난 후의 weight로 설정

# setting 객체 만들어놓기
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor = 'val_loss',
    patience = 3, # 3번 이상 좋아지지 않으면 멈출거임
    mode='min',
    restore_best_weights=False
)

두 번째 콜백: 상태가 좋지 않았을 때 어떻게 수정

In [16]:
# ReduceLROnPlateau : 모델의 개선이 없을 경우 learning rate를 조절해 모델의 개선을 유도함
# monitor : ReduceLROnPlateau의 기준이 되는 값. val_loss가 더이상 감소되지 않을 경우 ReduceLROnPlateau 적용
# factor : learning rate를 얼마나 변경시킬 것인지 정하는 값. learning rate * factor
# patience : training이 진행됨에도 더이상 monitor되는 값의 개선이 없을 경우 최적의 monitor값을 기준으로
# 몇 번의 epoch을 진행하고 learning rate를 조절할지를 값을 설정.

reduce_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor = 'val_loss',
    factor = 0.1, # 수정할 때 얼마를 더 곱해줄지를 설정
    patience = 10, # 10번 좋아지지 않으면 적용하겠다.
    mode = 'min',
    min_lr = 0.0001 # 계속 값이 작아지기 때문에 제한을 둠
)

세 번째 콜백 : 어떻게 로그를 저장할 지

In [19]:
# ModelCheckpoint : 모델의 경로 설정
# 모델 경로를 '{epoch:02d}-{val_loss:.2f}.hdf5' 라고 하면 앞의 명시한 문자열로 파일을 저장
# 예) 01-0.12f.h5
# save_weight_only : True(weight만 저장), False(모델, layer, weight 모두 저장)
# save_best_only : True(모델의 정확도가 최고값을 갱신했을 때만 저장), False(매회 저장)

filepath = '{epoch:02d}-{val_loss:.2f}.hdf5' # 예) 01-0.12f.h5
model_checkpoint = tf.keras.callbacks.ModelCheckpoint(
    filepath, 
    moniter='val_loss',
    save_weight_only=True,
    save_best_only=False,
    mode='min',
    verbose=1 # 에폭 돌때마다 설명출력해줌
)

In [26]:
history = model.fit(
    train_generator,
    validation_data = valid_generator,
    epochs=10,
    callbacks=[
        early_stopping,
        reduce_on_plateau,
        model_checkpoint
    ]
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
