In [None]:
# Imports
import os, warnings
import matplotlib.pyplot as plt
from matplotlib import gridspec

import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Notebook-wide Matplotlib defaults: auto layout, bold axis labels/titles,
# and the 'magma' colormap for images.
plt.rcParams.update({
    'figure.autolayout': True,
    'axes.labelweight': 'bold',
    'axes.labelsize': 'large',
    'axes.titleweight': 'bold',
    'axes.titlesize': 18,
    'axes.titlepad': 10,
    'image.cmap': 'magma',
})
# Silence all warnings to keep the output cells clean.
warnings.filterwarnings("ignore")

# Build the validation and training iterators from the same image directory.
# Both generators share the rescaling and the 80/20 validation split; only
# the training generator (optionally) applies augmentation.
TRAIN_IMAGE_DIR = '/data/classification/all_merged_train_images'

datagen_kwargs = {'rescale': 1./255, 'validation_split': .20}
dataflow_kwargs = {
    'target_size': (299, 299),
    'batch_size': 16,
    'interpolation': "nearest",
    'seed': 123,
}

# Validation data: rescaled only, never shuffled.
valid_datagen = ImageDataGenerator(**datagen_kwargs)
validation_generator = valid_datagen.flow_from_directory(
    TRAIN_IMAGE_DIR,
    subset="validation",
    shuffle=False,
    **dataflow_kwargs)

# Training data: random geometric augmentation when the toggle is on,
# otherwise reuse the plain validation generator object.
do_data_augmentation = True
train_datagen = (
    ImageDataGenerator(
        rotation_range=20,
        horizontal_flip=True,
        width_shift_range=0.2,
        height_shift_range=0.2,
        shear_range=0.2,
        zoom_range=[0.7, 1.3],
        **datagen_kwargs)
    if do_data_augmentation
    else valid_datagen
)
train_generator = train_datagen.flow_from_directory(
    TRAIN_IMAGE_DIR,
    subset="training",
    shuffle=True,
    **dataflow_kwargs)

In [None]:
# Draw one batch from the training generator and show its first image.
# The built-in next() goes through the iterator protocol instead of the
# class-specific .next() method, matching how the mini-batch cell below
# fetches a batch.
x, y = next(train_generator)

image = x[0]
plt.imshow(image)
plt.show()

In [None]:
# Inspect the classes and swap keys with values, so that a class index
# maps back to its class name.
# print(train_generator.class_indices)
class_dict = train_generator.class_indices
# Invert the (name -> index) pairs directly instead of assuming that the
# indices are 0..n-1 in key order.
reverse_class_dict = {index: name for name, index in class_dict.items()}
print(reverse_class_dict)

In [None]:
# Check a single mini-batch: print the image tensor shape, then the label
# tensor shape, one per line.
image_batch, labels_batch = next(iter(train_generator))
print(image_batch.shape, labels_batch.shape, sep='\n')

In [None]:
# Xception convolutional base with ImageNet weights, without the
# classification top, and with global average pooling on the output.
pretrained_base = tf.keras.applications.Xception(
    weights='imagenet',
    include_top=False,
    pooling='avg',
    input_tensor=None,
    input_shape=None,
)

In [None]:
# Print the position and name of every layer in the pretrained base.
for position, base_layer in enumerate(pretrained_base.layers):
    print(position, base_layer.name)

In [None]:
# Freeze the pretrained base so it is not retrained.
pretrained_base.trainable = False

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.layers import Dropout

def _dense_block(units, dropout_rate=None):
    """Return the layers of one Dense -> BatchNorm -> ReLU (-> Dropout) stage.

    Args:
        units: Width of the Dense layer.
        dropout_rate: Dropout rate appended after the activation, or None to
            omit the Dropout layer.

    Returns:
        A list of freshly created Keras layers, in application order.
    """
    block = [
        layers.Dense(units),
        layers.BatchNormalization(),
        layers.Activation('relu'),
    ]
    if dropout_rate is not None:
        block.append(layers.Dropout(dropout_rate))
    return block


# Classification head stacked on the pretrained base. The output width is
# taken from the training generator instead of a hard-coded 16, so the head
# always matches the number of class sub-directories that were found.
model = keras.Sequential([
    pretrained_base,
    layers.Flatten(),
    *_dense_block(2048, dropout_rate=0.1),
    *_dense_block(1024, dropout_rate=0.1),
    *_dense_block(512, dropout_rate=0.1),
    *_dense_block(256),  # last hidden stage has no Dropout
    layers.Dense(train_generator.num_classes, activation='softmax'),
])

In [None]:
# Print the layer-by-layer summary of the assembled model.
model.summary()

In [None]:
from tensorflow.keras import backend as K

def recall_m(y_true, y_pred):
    """Recall computed over all elements of the given tensors.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.

    Returns:
        Sum of the rounded, [0, 1]-clipped product ``y_true * y_pred`` divided
        by the sum of the rounded, clipped ``y_true`` (plus ``K.epsilon()`` to
        avoid division by zero).
    """
    hits = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    actual_positives = K.sum(K.round(K.clip(y_true, 0, 1)))
    return hits / (actual_positives + K.epsilon())

def precision_m(y_true, y_pred):
    """Precision computed over all elements of the given tensors.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.

    Returns:
        Sum of the rounded, [0, 1]-clipped product ``y_true * y_pred`` divided
        by the sum of the rounded, clipped ``y_pred`` (plus ``K.epsilon()`` to
        avoid division by zero).
    """
    hits = K.sum(K.round(K.clip(y_true * y_pred, 0, 1)))
    flagged_positives = K.sum(K.round(K.clip(y_pred, 0, 1)))
    return hits / (flagged_positives + K.epsilon())

def f1_m(y_true, y_pred):
    """F1 score: harmonic mean of ``precision_m`` and ``recall_m``.

    Args:
        y_true: Ground-truth tensor.
        y_pred: Prediction tensor.

    Returns:
        ``2 * (p * r) / (p + r + K.epsilon())`` where ``p`` and ``r`` are the
        precision and recall of the same inputs.
    """
    p = precision_m(y_true, y_pred)
    r = recall_m(y_true, y_pred)
    harmonic_half = (p * r) / (p + r + K.epsilon())
    return 2 * harmonic_half

# Compile with Adam and categorical cross-entropy; track accuracy plus the
# custom F1 / precision / recall metrics defined above.
model.compile(
    loss='categorical_crossentropy',
    optimizer='adam',
    metrics=['categorical_accuracy', f1_m, precision_m, recall_m],
)

In [None]:
# Save Model with CheckPoint & StopPoint
from tensorflow.keras.callbacks import ModelCheckpoint,EarlyStopping
import os
import datetime

checkpoint_path = 'training_Xception/cp.ckpt'
checkpoint_dir = os.path.dirname(checkpoint_path)

# Checkpoint callback: keep only the best model, judged by the highest
# validation F1 score.
check_point_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    monitor='val_f1_m',
    mode='max',
    save_best_only=True,
    verbose=1,
)

# Stop training once validation loss has not improved for 20 epochs.
early_stopping_callback = EarlyStopping(patience=20, monitor='val_loss')

In [None]:
# First training phase: fit the model on the training generator and
# validate on the validation generator, with checkpointing and early stopping.
history = model.fit(
    x=train_generator,
    epochs=130,
    validation_data=validation_generator,
    callbacks=[check_point_callback, early_stopping_callback],
)

In [None]:
# Reload the model stored at the checkpoint path. The custom metric functions
# must be supplied so the saved compile configuration can be restored.
model = tf.keras.models.load_model(
    "training_Xception/cp.ckpt",
    custom_objects={
        "f1_m": f1_m,
        "precision_m": precision_m,
        "recall_m": recall_m,
    },
    compile=True,
    options=None,
)

In [None]:
# Unfreeze the convolutional base for fine-tuning.
# `model` may have been replaced by load_model() above, in which case the
# old `pretrained_base` object is no longer part of it. Take the base from
# the current model (it is the first layer of the Sequential) so the flag is
# set on the network that will actually be trained.
pretrained_base = model.layers[0]
pretrained_base.trainable = True

# Alternative kept for reference: unfreeze only from 'block14_sepconv1' on.
# set_trainable = False
# for layer in pretrained_base.layers :
#     if layer.name == 'block14_sepconv1' :
#         set_trainable = True
#     if set_trainable :
#         layer.trainable = True
#     else :
#         layer.trainable = False

In [None]:
# Recompile the model for the fine-tuning phase, this time with an explicit
# Adam optimizer at learning rate 0.0002; loss and metrics are unchanged.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0002)

model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['categorical_accuracy', f1_m, precision_m, recall_m],
)

In [None]:
# Fine-tuning phase. Uses model.fit (which accepts generators directly)
# instead of the deprecated fit_generator, consistent with the first
# training cell. Note that this rebinds `history`, so the plots below show
# only this phase.
history = model.fit(
    train_generator,
    validation_data=validation_generator,
    epochs=150,
    callbacks=[check_point_callback, early_stopping_callback],
)

In [None]:
# Save the current `model` object to disk.
model.save("/data/classification/models/clothes_classfication_with_Xception_all_merged_96")

In [None]:
# Reload the model stored at the checkpoint path, again passing the custom
# metric functions needed to restore the compiled state.
model = tf.keras.models.load_model(
    "training_Xception/cp.ckpt",
    custom_objects={
        "f1_m": f1_m,
        "precision_m": precision_m,
        "recall_m": recall_m,
    },
    compile=True,
    options=None,
)

In [None]:
# Save the model reloaded from the checkpoint under the "_final" name.
model.save("/data/classification/models/clothes_classfication_with_Xception_all_merged_96_final")

In [None]:
def smooth_curve(points, factor=0.8):
    """Exponentially smooth a sequence of values.

    Args:
        points: Iterable of numeric values.
        factor: Weight given to the previous smoothed value; the current
            point gets weight ``1 - factor``.

    Returns:
        A list of the same length as ``points``. The first entry is the first
        point unchanged; each later entry is
        ``previous_smoothed * factor + point * (1 - factor)``.
    """
    smoothed = []
    for value in points:
        if not smoothed:
            # Nothing to blend with yet: seed the series with the raw value.
            smoothed.append(value)
            continue
        smoothed.append(smoothed[-1] * factor + value * (1 - factor))
    return smoothed

In [None]:
# Plot smoothed training/validation curves from the last `history` object.
loss = history.history['loss']
val_loss = history.history['val_loss']
# Stored as `train_f1_m`, not `f1_m`, so the metric *function* f1_m is not
# overwritten by a list (that would break re-running the compile and
# load_model cells, which pass f1_m as a callable).
train_f1_m = history.history['f1_m']
val_f1_m = history.history['val_f1_m']

epochs = range(1, len(loss) + 1)

plt.plot(epochs, smooth_curve(train_f1_m), 'bo', label='training f1_score')
plt.plot(epochs, smooth_curve(val_f1_m), 'b', label='validation f1_score')
plt.title('Training and validation f1_score')
plt.xlabel('epoch')
plt.ylabel('f1_score')
plt.legend()

plt.figure()

plt.plot(epochs, smooth_curve(loss), 'bo', label='training loss')
plt.plot(epochs, smooth_curve(val_loss), 'b', label='validation loss')
plt.title('Training and validation loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend()

plt.show()

In [None]:
# Code to use later, once a dedicated test dataset has been built.
# NOTE(review): "clasification" is spelled differently from the
# "/data/classification/..." paths used elsewhere in this notebook — confirm
# the directory name before running.
test_dir = "/data/clasification/train_images"

# Separate name so the training/validation `dataflow_kwargs` defined at the
# top of the notebook is not overwritten.
test_dataflow_kwargs = dict(target_size=(299, 299), batch_size=16,
                            class_mode='categorical', interpolation="nearest")

test_datagen = ImageDataGenerator(rescale=1./255)

test_generator = test_datagen.flow_from_directory(
    test_dir,
    **test_dataflow_kwargs
)

# evaluate() returns the loss followed by the four compiled metrics, i.e.
# five values. Unpack into test_* names so the metric functions f1_m,
# precision_m and recall_m are not shadowed by floats.
(test_loss,
 test_categorical_accuracy,
 test_f1_m,
 test_precision_m,
 test_recall_m) = model.evaluate(test_generator)

print('test_loss:', test_loss)
print('test_categorical_accuracy:', test_categorical_accuracy)
print('test_f1_m:', test_f1_m)
print('test_precision_m:', test_precision_m)
print('test_recall_m:', test_recall_m)

In [None]:
# Run the current model over the validation generator and keep its outputs.
# (Earlier variant, kept for reference:)
# predictions = loaded_model.predict(validation_generator)
predictions = model.predict(validation_generator)

In [None]:
# Display the value returned by model.predict above.
predictions

In [None]:
# Number of entries in `predictions`.
len(predictions)