<a href="https://colab.research.google.com/github/JunSeokCheon/Estimating-user-age-for-kiosk-UI-UX-personalization/blob/master/Mobilenet_model_make.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# google drive mount code|
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import sys
import math
import itertools
from zipfile import ZipFile

import time
from datetime import datetime

# opencv 관련 코드는 tf.image로 대체해봄. 문제 생기면 알려줄 것
# import cv2

In [3]:
import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.metrics import confusion_matrix

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import utils
from tensorflow.keras import models
from tensorflow.keras import layers
from tensorflow.keras import callbacks
from tensorflow.keras import applications


In [4]:
# 테스트를 위한 seed 고정. 이후 정규 학습시에는 시드 고정 해제할 것
SEED = 2021
np.random.seed(SEED)
tf.random.set_seed(SEED)

In [5]:
# 각종 매개변수
PATIENCE = 20       # 얼리스탑에 사용된 주시 횟수
EPOCH_SIZE = 100    # epoch 횟수
BATCH_SIZE = 192    # batch 크기
IMAGE_SIZE = 200    # 이미지 크기
CHANNELS = 3
INPUT_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, CHANNELS)   # 이미지 크기에 따른 shape 크기

In [6]:
# 텐서플로우 버전 확인하는 코드
print(tf.__version__)
# GPU가 잡히는지 확인하는 코드
device_name = tf.test.gpu_device_name()
if device_name != '/device:GPU:0':
    raise SystemError('GPU device not found')
print('Found GPU at: {}'.format(device_name))

2.6.0
Found GPU at: /device:GPU:0


In [None]:
# 파일이 압축되어 있을 때 압축 해제하는 코드
file_path = "/content/drive/My Drive/age_detection/zipped_dataset/Combined_data_final.zip"
try:
    os.mkdir("combined")
    os.remove("combined")
except OSError:
    pass

with ZipFile(file_path, 'r') as myzip:
    myzip.extractall("combined")
    print('Done unzipping ', file_path)

In [None]:
!pip install matplotlib

In [None]:
# # 이미지 경로로 특정 이미지 불러오는 코드
# img_path = "/content/combined/100_0_1.jpg"

# image_string = tf.io.read_file(img_path)
# img = tf.io.decode_jpeg(image_string, channels=3)
# # img = cv2.imread(img_path)
# # img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR) # 불러온 이미지의 컬러 채널 순서 변경

# plt.axis('off')
# plt.imshow(img)
# # matplotlib.imshow(img)

In [None]:
# csv 파일을 추가하는 코드
csv_path = "/content/drive/My Drive/age_detection/input_output/images_filenames_labels.csv"

data_df = pd.read_csv(csv_path)
data_df.head()

Unnamed: 0,filename,age,target
0,/content/combined/test_image/47_3_305.jpg,47,3
1,/content/combined/test_image/27_2_1505.jpg,27,1
2,/content/combined/test_image/56_0_633.jpg,56,4
3,/content/combined/test_image/17_3_166.jpg,17,0
4,/content/combined/test_image/27_3_6726.jpg,27,1


In [None]:
# target 데이터로 이미지 연령 그룹 분포를 확인하는 코드(개수)
data_df['target'].value_counts(normalize=False)

1    18555
2    13486
3     9428
4     7620
0     7063
5     6492
Name: target, dtype: int64

In [None]:
# target 데이터로 이미지 연령 그룹 분포를 확인하는 코드(정규화)
data_df['target'].value_counts(normalize=False)

1    30534
0    25618
2     6492
Name: target, dtype: int64

In [None]:
# target별로 n개만큼 샘플링하는 코드
# 샘플링 안할 때는 코드 전체를 주석처리할 것
sample_n = 6492   # 추출할 샘플의 개수
sample_df = pd.DataFrame(columns=['filename', 'age', 'target'])
for t in data_df['target'].unique():
    temp_df = data_df.query('target==@t').sample(n=sample_n)    # target 별로 n개 만큼 샘플링
    sample_df = pd.concat([sample_df, temp_df])                 # 데이터 결합

data_df = sample_df.reset_index(drop=True)                      # 샘플링한 df로 새로 인덱싱

In [None]:
# feature와 target으로 분리
x_data = data_df['filename']    # feature는 이미지
y_data = data_df['target']      # target은 연령대

# one-hot encoding
y_data = utils.to_categorical(y_data)

In [None]:
x_train, x_test, y_train, y_test = train_test_split(x_data, y_data, test_size=0.3, random_state=SEED)

In [None]:
x_train = x_train.reset_index(drop=True)
x_test = x_test.reset_index(drop=True)

In [None]:
# 파일 경로에 해당하는 이미지를 로드하는 함수
def _load_img(filename):
  # 이미지 불러오는 코드
  image_string = tf.io.read_file(filename)
  image_decoded = tf.io.decode_jpeg(image_string, channels=CHANNELS)
  # 이미지 어규먼트 하는 코드
  image_augment = tf.image.random_flip_left_right(image_decoded)
  # image_augment = tf.image.random_flip_up_down(image_augment)
  image_augment = tf.image.random_brightness(image_augment, max_delta=0.1)
  # image_augment = tf.image.random_contrast(image_augment, lower=0.8, upper=1.2)
  # 이미지 리사이징 하는 코드
  image_resize = tf.image.resize(image_augment, (IMAGE_SIZE, IMAGE_SIZE))
  image_normalize = image_resize / 255

  return image_normalize

In [None]:
# 커스텀 데이터 로더 함수
class Dataloader(utils.Sequence):

    def __init__(self, x_set, y_set, batch_size, shuffle=False):
        self.x, self.y = x_set, y_set
        self.batch_size = batch_size
        self.shuffle=shuffle
        self.on_epoch_end()

    def __len__(self):              # 로더의 길이 계산
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, idx):     # batch 단위로 직접 묶어줘야 함
		                            # sampler의 역할(index를 batch_size만큼 sampling해줌)
        indices = self.indices[idx*self.batch_size:(idx+1)*self.batch_size]

        batch_path_x = [self.x[i] for i in indices]
        batch_x = [_load_img(path) for path in batch_path_x]
        batch_y = [self.y[i] for i in indices]

        return np.array(batch_x), np.array(batch_y)

    def on_epoch_end(self):         # epoch이 끝날때마다 실행
        self.indices = np.arange(len(self.x))
        if self.shuffle == True:
            np.random.shuffle(self.indices)

In [None]:
# 커스텀 데이터 로더 적용
loader_train = Dataloader(x_train, y_train, batch_size=BATCH_SIZE, shuffle=True)
loader_test = Dataloader(x_test, y_test, batch_size=BATCH_SIZE, shuffle=True)

In [None]:
# MobileNetV2 모델 로드
model_MobileNetV2 = applications.MobileNetV2(input_shape=INPUT_SHAPE, 
                                            include_top=False, 
                                            weights="imagenet"
                                            )
model_MobileNetV2.trainabel = True
# model_MobileNetV2.trainabel = False   # 주석 함부러 지우지 말 것

In [None]:
# MobileNetV2 기반 CNN 모델 MobileNetV2 기반 CNN 모델
model = models.Sequential()

model.add(model_MobileNetV2)
model.add(layers.GlobalAveragePooling2D())
model.add(layers.Dense(128, activation='relu'))
model.add(layers.Dropout(0.7))
model.add(layers.Dense(64, activation='relu'))
model.add(layers.Dropout(0.7))
model.add(layers.Dense(3, activation='softmax'))

model.summary()

In [None]:
# 얼리스탑 코드 일부 수정. min_delta 값을 기존(0.01)보다 더 낮춤(0.005)
earlystopping = callbacks.EarlyStopping(monitor='val_loss', min_delta=0.005, patience=PATIENCE, verbose=1, mode='min')

# 코랩 기준의 텐서보드와 체크포인트 구성 코드이므로 수정할 필요가 있음
callback_path = "/content/drive/My Drive/Age_Classification_with_Faces/input_output"

# 파일 이름에 에포크 번호와 학습 정확도를 포함
checkpoint_path = "mobilenet_test_model/cp-ep{epoch:04d}-acc{val_accuracy:.3f}.h5"
tensorboard_path = "cnn_logs/mobilenet_test_model"

# 텐서보드
tensorboard = callbacks.TensorBoard(log_dir=os.path.join(callback_path, tensorboard_path))

# 체크포인트 코드
checkpoint = callbacks.ModelCheckpoint(filepath=os.path.join(callback_path, checkpoint_path),
                             monitor='val_accuracy',
                             mode = 'max',
                             save_best_only=True,
                             save_weights_only=False,
                             verbose=1
                            )

In [None]:
# 컴파일 코드
model.compile(loss='categorical_crossentropy', optimizer= 'adam', metrics=['accuracy'])

model_history = model.fit(loader_train,
                            validation_data=loader_test,
                            callbacks=[tensorboard, checkpoint],
                            epochs=EPOCH_SIZE
                            )

In [None]:
# history 값 얻어오기
train_loss = model_history.history['loss']
train_accuracy = model_history.history['accuracy']

test_loss = model_history.history['val_loss']
test_accuracy = model_history.history['val_accuracy']

In [None]:
# loss - accuracy 그래프 그리기
fig, ax = plt.subplots(ncols=2, figsize=(15,7))

ax = ax.ravel()

ax[0].plot(train_loss, label='Train Loss', color='royalblue', marker='o', markersize=5)
ax[0].plot(test_loss, label='Test Loss', color = 'orangered', marker='o', markersize=5)

ax[0].set_xlabel('Epochs', fontsize=14)
ax[0].set_ylabel('Categorical Crossentropy', fontsize=14)

ax[0].legend(fontsize=14)
ax[0].tick_params(axis='both', labelsize=12)

ax[1].plot(train_accuracy, label='Train Accuracy', color='royalblue', marker='o', markersize=5)
ax[1].plot(test_accuracy, label='Test Accuracy', color='orangered', marker='o', markersize=5)

ax[1].set_xlabel('Epochs', fontsize=14)
ax[1].set_ylabel('Accuracy', fontsize=14)

ax[1].legend(fontsize=14)
ax[1].tick_params(axis='both', labelsize=12)

fig.suptitle(x=0.5, y=0.92, t="Lineplots showing loss and accuracy of CNN model by epochs", fontsize=16)

plt.savefig('/content/drive/My Drive/age_detection/plot_images/mobilenet_model_accuracy.png', bbox_inches='tight');

In [None]:
# 스코어 확인 코드
model_score = model.evaluate(loader_test, verbose=1)