<a href="https://colab.research.google.com/github/dolDolSee/TF2.0/blob/main/cat_dog_finetuning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

def make_catndog_dataframe():
    paths = []
    dataset_gubuns = []
    label_gubuns = []
    # os.walk()를 이용하여 특정 디렉토리 밑에 있는 모든 하위 디렉토리를 모두 조사. 
    # cat-and-dog 하위 디렉토리 밑에 jpg 확장자를 가진 파일이 모두 이미지 파일임
    # cat-and-dog 밑으로 /train/, /test/ 하위 디렉토리 존재(학습, 테스트 용 이미지 파일들을 가짐)

    for dirname, _, filenames in os.walk('/kaggle/input/cat-and-dog'):
        for filename in filenames:
            # 이미지 파일이 아닌 파일도 해당 디렉토리에 있음.
            if '.jpg' in filename:
                # 파일의 절대 경로를 file_path 변수에 할당. 
                file_path = dirname+'/'+ filename
                paths.append(file_path)
                # 파일의 절대 경로에 training_set, test_set가 포함되어 있으면 데이터 세트 구분을 'train'과 'test'로 분류. 
                if '/training_set/' in file_path:
                    dataset_gubuns.append('train')  
                elif '/test_set/' in file_path:
                    dataset_gubuns.append('test')
                else: dataset_gubuns.append('N/A')

                # 파일의 절대 경로에 dogs가 있을 경우 해당 파일은 dog 이미지 파일이고, cats일 경우는 cat 이미지 파일임. 
                if 'dogs' in file_path:
                    label_gubuns.append('DOG')
                elif 'cats' in file_path:
                    label_gubuns.append('CAT')
                else: label_gubuns.append('N/A')
    
    data_df = pd.DataFrame({'path':paths, 'dataset':dataset_gubuns, 'label':label_gubuns})
    return data_df

In [None]:
data_df = make_catndog_dataframe()
data_df.head()

In [None]:
from tensorflow.keras.utils import Sequence
import sklearn
import cv2

BATCH_SIZE = 64
IMAGE_SIZE=160

class CnD_Dataset(Sequence):
    def __init__(self, image_filenames, labels, batch_size=BATCH_SIZE, augmentor=None,shuffle=False, pre_func = None):
        self.image_filenames = image_filenames
        self.labels = labels
        self.batch_size = batch_size
        self.augmentor = augmentor
        self.pre_func = pre_func
        self.shuffle = shuffle
        if self.shuffle:
            pass
    
    # Sequence를 상속받은 Dataset은 batch_size 단위로 입력된 데이터를 처리함. 
    # __len__()은 전체 데이터 건수가 주어졌을 때 batch_size단위로 몇번 데이터를 반환하는지 나타남
    def __len__(self):
        return int(np.ceil(len(self.image_filenames)/BATCH_SIZE))
    
    # batch_size 단위로 image_array, label_array 데이터를 가져와서 변환한 뒤 다시 반환함
    # 인자로 몇번째 batch 인지를 나타내는 index를 입력하면 해당 순서에 해당하는 batch_size 만큼의 데이타를 가공하여 반환
    # batch_size 갯수만큼 변환된 image_array와 label_array 반환.
    def __getitem__(self, index):
        image_name_batch = self.image_filenames[index*self.batch_size:(index+1)*self.batch_size]
        if self.labels is not None:
            label_batch = self.labels[index*self.batch_size:(index+1)*self.batch_size]
        
        # 만일 객체 생성 인자로 albumentation으로 만든 augmentor가 주어진다면 아래와 같이 augmentor를 이용하여 image 변환
        # albumentations은 개별 image만 변환할 수 있으므로 batch_size만큼 할당된 image_name_batch를 한 건씩 iteration하면서 변환 수행. 
        # image_batch 배열은 float32 로 설정.
        image_batch = np.zeros((image_name_batch.shape[0],IMAGE_SIZE, IMAGE_SIZE,3), dtype='float32')
        
        for image_index in range(image_name_batch.shape[0]):
            image = cv2.cvtColor(cv2.imread(image_name_batch[image_index]), cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
            if self.augmentor is not None:
                image = self.augmentor(image=image)['image']
                
            image_batch[image_index]=image
        return image_batch, label_batch
    
    # epoch가 한번 수행이 완료 될 때마다 모델의 fit()에서 호출됨. 
    def on_epoch_end(self):
        if(self.shuffle):
            
            self.image_filenames, self.labels = sklearn.utils.shuffle(self.image_filenames, self.labels)
        else:
            pass

In [None]:
from sklearn.model_selection import train_test_split

def get_train_valid_test(data_df):
    train_df = data_df[data_df["dataset"]=="train"]
    test_df = data_df[data_df['dataset']=='test']
    
    train_path = train_df['path'].values
    train_label = pd.factorize(train_df['label'])[0]
    
    test_path = test_df['path'].values
    test_label = pd.factorize(test_df['label'])[0]
    
    tr_path, val_path, tr_label, val_label = train_test_split(train_path, train_label, test_size=0.5, random_state=2021)
    print('학습용 path shape:', tr_path.shape, '검증용 path shape:', val_path.shape, 
      '학습용 label shape:', tr_label.shape, '검증용 label shape:', val_label.shape)
    return tr_path, tr_label, val_path, val_label, test_path, test_label
    
    

In [None]:
import tensorflow as tf
def create_model(model_name = "mobilenet", verbose=False):
    input_tensor = tf.keras.layers.Input(shape=(IMAGE_SIZE, IMAGE_SIZE,3))
    if model_name == 'vgg16':
        base_model = tf.keras.applications.vgg16.VGG16(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == 'resnet50':
        base_model = tf.keras.applications.ResNet50V2(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name =="xception":
        base_model = tf.keras.applications.Xception(input_tensor=input_tensor, include_top=False, weights='imagenet')
    elif model_name == "mobilenet":
        base_model = tf.keras.applications.MobileNetV2(input_tensor=input_tensor, include_top=False, weights='imagenet')
    
    bm_output = base_model.output
    
    x = tf.keras.layers.GlobalAveragePooling2D()(bm_output)
    if model_name !="vgg16":
        x = tf.keras.layers.Dropout(0.5)(x)
    x = tf.keras.layers.Dense(50, activation='relu', name='fc1')(x)
    output = tf.keras.layers.Dense(1, activation='sigmoid', name='output')(x)
    
    model = tf.keras.models.Model(inputs=input_tensor, outputs=output)
    
    if verbose:
        model.summary()
    return model
        

In [None]:
from tensorflow.keras.applications.mobilenet import preprocess_input as mobile_preprocess_input

def train_model(data_df, model_name, augmentor, preprocessing_func):
    tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)
    
    tr_ds = CnD_Dataset(tr_path, tr_label, batch_size=BATCH_SIZE, augmentor=augmentor, shuffle=True, pre_func=preprocessing_func)
    val_ds = CnD_Dataset(val_path, val_label, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=preprocessing_func)
    
    model= create_model(model_name=model_name)
    model.compile(optimizer=tf.keras.optimizers.Adam(0.0001), loss='binary_crossentropy', metrics=['accuracy'])
    N_EPOCHS = 20
    history = model.fit(tr_ds,epochs=N_EPOCHS, validation_data = val_ds, verbose=1)
    return model, history

In [None]:
# 학습/검증/테스트로 쪼개질 데이터를 전체 데이터의 30%로 설정. 
input_df, _ = train_test_split(data_df, test_size=0.7, random_state=2021)

mobile_model, mobile_history = train_model(input_df, 'mobilenet', None, mobile_preprocess_input)

In [None]:
test_df = data_df[data_df['dataset']=='test']
test_path = test_df['path'].values
test_label = pd.factorize(test_df['label'])[0]

test_ds = CnD_Dataset(test_path, test_label, batch_size=BATCH_SIZE, augmentor=None, shuffle=False, pre_func=mobile_preprocess_input)
mobile_model.evaluate(test_ds)

In [None]:
model = create_model(model_name='mobilenet')
model.summary()

In [None]:
model.layers

In [None]:
model.layers[-4:]

In [None]:
for layer in model.layers:
    print(layer.name, 'trainable:', layer.trainable)

In [None]:
for layer in model.layers[:-4]:
    layer.trainable=False
    print(layer.name, 'trainable:', layer.trainable)

In [None]:
print('\n### final 4 layers ### ')
for layer in model.layers[-4:]:
    print(layer.name, 'trainable:', layer.trainable)

In [None]:
from tensorflow.keras import layers

def train_model_fine_tune(data_df, model_name, augmentor, preprocessing_func):
    # 학습/검증/테스트용 이미지 파일 절대경로와 Label encoding 된 데이터 세트 반환
    tr_path, tr_label, val_path, val_label, test_path, test_label = get_train_valid_test(data_df)
    
    # 학습과 검증용 Sequence Dataset 생성. 
    tr_ds = CnD_Dataset(tr_path, tr_label, batch_size=BATCH_SIZE, augmentor=augmentor, 
                          shuffle=True, pre_func=preprocessing_func)
    val_ds = CnD_Dataset(val_path, val_label, batch_size=BATCH_SIZE, augmentor=None, 
                           shuffle=False, pre_func=preprocessing_func)
    
    # 입력된 model_name에 따라 모델 생성. 
    model = create_model(model_name=model_name)
    # 최종 output 출력을 softmax에서 sigmoid로 변환되었으므로 binary_crossentropy로 변환 
    model.compile(optimizer=tf.keras.optimizers.Adam(0.0001), loss='binary_crossentropy', metrics=['accuracy'])
    
    
    
    
    
    # feature extractor layer들을 freeze
    for layer in model.layers[:-4]:
        layer.trainable = False
    
    
    
    
    
    FIRST_EPOCHS = 10
    SECOND_EPOCHS = 10
    # 1단계 fine tuning 학습 수행. 
    history = model.fit(tr_ds, epochs=FIRST_EPOCHS, steps_per_epoch=int(np.ceil(tr_path.shape[0]/BATCH_SIZE)), 
                       validation_data=val_ds, validation_steps=int(np.ceil(val_path.shape[0]/BATCH_SIZE)),
                       verbose=1)
    # 전체 layer들을 unfreeze, 단 batch normalization layer는 그대로 freeze
    for layer in model.layers:
        if not isinstance(layer, layers.BatchNormalization):
            layer.trainable = True
    # 2단계는 learning rate를 기존 보다 1/10 감소    
    model.compile(optimizer=tf.keras.optimizers.Adam(0.00001), loss='binary_crossentropy', metrics=['accuracy'])    
    history = model.fit(tr_ds, epochs=SECOND_EPOCHS, steps_per_epoch=int(np.ceil(tr_path.shape[0]/BATCH_SIZE)), 
                       validation_data=val_ds, validation_steps=int(np.ceil(val_path.shape[0]/BATCH_SIZE)),
                       verbose=1)
    
    return model, history

In [None]:
mobile_model_tuned, mobile_tuned_history = train_model_fine_tune(input_df, 'mobilenet', None, mobile_preprocess_input)

In [None]:
mobile_model_tuned.evaluate(test_ds)