In [None]:
import tensorflow as tf
import keras
from keras.callbacks import Callback, ModelCheckpoint
from keras.engine.network import Network
from keras.layers import Dense, Dropout, Input
from keras.layers.pooling import GlobalAveragePooling2D
from keras.models import Model
from keras_vggface.vggface import VGGFace

import math
import numpy as np
import os
import random
from datetime import datetime

In [None]:
# 変数設定
EXPAND_SIZE = 448
CROP_SIZE = 400
INPUT_SIZE = 224
NUM_CLASS = 21

In [None]:
# 変数設定
LOGDIR = 'log/data_%s/' % datetime.now().strftime("%Y-%m-%d_%H%M")
flags = tf.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string('trainDir', LOGDIR, 'DIrectory to put the training data')
flags.DEFINE_integer('maxEpoch', 100, 'Number of epochs to run trainer')
flags.DEFINE_integer('batchSize', 64, 'train data size of subset')
flags.DEFINE_integer('threadNum', 1, 'num of threads')
flags.DEFINE_float('learningLate', 0.001, 'Initial learning rate')
flags.DEFINE_string('f', '', 'kernel') # エラー回避

In [None]:
# TFRecordまわりの設定
trainTFrecordFileName = "train_tf_file_%sx%s.tfrecords" % (INPUT_SIZE, INPUT_SIZE)
testTFrecordFileName = "test_tf_file_%sx%s.tfrecords" % (INPUT_SIZE, INPUT_SIZE)
numRecordsTrain = sum([1 for record  in tf.python_io.tf_record_iterator(trainTFrecordFileName)])
numRecordsTest = sum([1 for record  in tf.python_io.tf_record_iterator(testTFrecordFileName)])
stepsPerEpochTrain = (numRecordsTrain - 1) // FLAGS.batchSize + 1
stepsPerEpochTest = (numRecordsTest - 1) // FLAGS.batchSize + 1

In [None]:
def _parse_function(exampleProto):
    features={
            'label': tf.FixedLenFeature((), tf.int64, default_value=0),
            'image': tf.FixedLenFeature((), tf.string, default_value="")
        }
    parsedFeatures = tf.parse_single_example(exampleProto, features)  # データ構造を解析
     
    return parsedFeatures["image"], parsedFeatures["label"]
 
def read_image_train(argImages, argLabels):      
    images = tf.decode_raw(argImages, tf.uint8)
    images = tf.cast(images, tf.float32)
    images = tf.reshape(images, [INPUT_SIZE, INPUT_SIZE, 3])
    images = tf.image.resize_images(images, [EXPAND_SIZE, EXPAND_SIZE], method=tf.image.ResizeMethod.BICUBIC)
    
    ### 水増し
    # 切り取り
    cropsize = random.randint(CROP_SIZE, CROP_SIZE + (EXPAND_SIZE - CROP_SIZE) / 2)
    framesize = CROP_SIZE + (cropsize - CROP_SIZE) * 2
    images = tf.image.resize_image_with_crop_or_pad(images, framesize, framesize)
    images = tf.random_crop(images, [cropsize, cropsize, 3])
    
    # 左右反転
    images = tf.image.random_flip_left_right(images)
    
    # サイズもとに戻す
    images = tf.image.resize_images(images, [INPUT_SIZE, INPUT_SIZE], method=tf.image.ResizeMethod.BICUBIC)
    
    images = images / 255
    images = tf.reshape(images, [INPUT_SIZE, INPUT_SIZE, 3])
    
    labels = tf.cast(argLabels, tf.int32)
    labels = tf.one_hot(labels, NUM_CLASS)
     
    return images, labels

def read_image_test(argImages, argLabels):     
    images = tf.decode_raw(argImages, tf.uint8)
    images = tf.cast(images, tf.float32)
    images = images / 255  # 画像データを、0～1の範囲に変換する
    images = tf.reshape(images, [INPUT_SIZE, INPUT_SIZE, 3])
    
    labels = tf.cast(argLabels, tf.int32)
    labels = tf.one_hot(labels, NUM_CLASS)
     
    return images, labels

In [None]:
# 学習中に検証を実施するためのコールバック
class EvaluateInputTensor(Callback):
    def __init__(self, model, steps, metrics_prefix="val", verbose=1):
        super(EvaluateInputTensor, self).__init__()
        self.val_model = model
        self.num_steps = steps
        self.verbose = verbose
        self.metrics_prefix = metrics_prefix

    def on_epoch_end(self, epoch, logs={}):
        # 重みパラメータは学習用と評価用で共有されているので、
        # そのまま評価用モデルで評価すればよい
        results = self.val_model.evaluate(
            steps=int(self.num_steps),
            verbose=self.verbose)
        # 評価結果を出力
        if self.verbose >= 0:
            metrics_str = ""
            for result, name in zip(results, self.val_model.metrics_names):
                metric_name = self.metrics_prefix + "_" + name
                logs[metric_name] = result
                if self.verbose > 0:
                    metrics_str += metric_name + ": " + str(result) + " "
            metrics_str += "\n"
            print(metrics_str)

In [None]:
# TensorBoardにtrain testのaccuracy と lossを書き込む
class TrainValTensorBoard(keras.callbacks.TensorBoard):
    def __init__(self, logDir, summaryWriter, summaryOp, **kwargs):
        # Make the original `TensorBoard` log to a subdirectory 'training'
        training_logDir = os.path.join(logDir, 'training')
        super(TrainValTensorBoard, self).__init__(training_logDir, **kwargs)

        # Log the validation metrics to a separate subdirectory
        self.val_logDir = os.path.join(logDir, 'validation')
        
        self.writer = summaryWriter
        self.op = summaryOp

    def set_model(self, model):
        # Setup writer for validation metrics
        self.val_writer = tf.summary.FileWriter(self.val_logDir)
        super(TrainValTensorBoard, self).set_model(model)

    def on_epoch_end(self, epoch, logs=None):
        # Pop the validation logs and handle them separately with
        # `self.val_writer`. Also rename the keys so that they can
        # be plotted on the same figure with the training metrics
        logs = logs or {}
        val_logs = {k.replace('val_', ''): v for k, v in logs.items() if k.startswith('val_')}
        for name, value in val_logs.items():
            summary = tf.Summary()
            summary_value = summary.value.add()
            summary_value.simple_value = value.item()
            summary_value.tag = name
            self.val_writer.add_summary(summary, epoch)
        self.val_writer.flush()

        # Pass the remaining logs to `TensorBoard.on_epoch_end`
        logs = {k: v for k, v in logs.items() if not k.startswith('val_')}
        super(TrainValTensorBoard, self).on_epoch_end(epoch, logs)
        
        # tf.image 書き込み用
        with tf.Session() as session:
            self.writer.add_summary(session.run(self.op), epoch)
        self.writer.flush()

    def on_train_end(self, logs=None):
        super(TrainValTensorBoard, self).on_train_end(logs)
        self.val_writer.close()

In [None]:
# 入力層
trainDataset = tf.data.TFRecordDataset(trainTFrecordFileName)
trainDataset = trainDataset.map(_parse_function, FLAGS.threadNum)  # レコードを解析し、テンソルに変換
trainDataset = trainDataset.map(read_image_train, FLAGS.threadNum)  # データの形式、形状を変更
trainDataset = trainDataset.shuffle(numRecordsTrain)
trainDataset = trainDataset.batch(FLAGS.batchSize)  # 連続するレコードをバッチに結合
trainDataset = trainDataset.repeat(-1)  # 無限に繰り返す
trainIterator = trainDataset.make_one_shot_iterator()
trainImages, trainLabels = trainIterator.get_next()  # イテレータの次の要素を取得

testDataset = tf.data.TFRecordDataset(testTFrecordFileName)
testDataset = testDataset.map(_parse_function, FLAGS.threadNum)  # レコードを解析し、テンソルに変換
testDataset = testDataset.map(read_image_test, FLAGS.threadNum)  # データの形式、形状を変更
testDataset = testDataset.batch(FLAGS.batchSize)  # 連続するレコードをバッチに結合
testDataset = testDataset.repeat(-1)  # 無限に繰り返す
testIterator = testDataset.make_one_shot_iterator()
testImages, testLabels = testIterator.get_next()   

tf.summary.image('traing_image', trainImages, max_outputs = 100)

summaryWriter = tf.summary.FileWriter(FLAGS.trainDir)
summaryOp =  tf.summary.merge_all()

In [None]:
# チェックポイントコールバックの指定
checkPointPath = FLAGS.trainDir + "weights_epoch-{epoch:02d}_loss-{loss:.4f}_val_loss-{val_loss:.4f}.hdf5"
modelCheckPoint = tf.keras.callbacks.ModelCheckpoint(
                    filepath=checkPointPath,
                    monitor='val_loss',
                    verbose=1,
                    save_best_only=True,
                    mode='min',
                    period=5)

In [None]:
inputShape = (INPUT_SIZE, INPUT_SIZE, 3)
inputTensor = Input(shape=inputShape)

# モデル構築 Resnet50 face
ResNet50Model = VGGFace(model='resnet50', weights='vggface', include_top=False, input_tensor=inputTensor, input_shape=inputShape)
flat = GlobalAveragePooling2D()(ResNet50Model.output)
dropout = Dropout(0.3)(flat)
output = Dense(NUM_CLASS, activation='softmax')(dropout)

In [None]:
# 重みを共有するネットワークの作成
commonNetwork = Network(inputTensor, output)

# train test それぞれのinput
trainInputLayer = Input(tensor=trainImages)
testInputLayer = Input(tensor=testImages)

# output
trainOutput = commonNetwork(trainInputLayer)
testOutput = commonNetwork(testInputLayer)

# train test モデル
trainModel = Model(inputs=trainInputLayer, outputs=trainOutput)
testModel = Model(inputs=testInputLayer, outputs=testOutput)

In [None]:
# 指定した層以降は学習する
trainStartLayerName = 'activation_40'

setTrainable = False
for layer in trainModel.get_layer("network_1").layers:
    if layer.name == trainStartLayerName:
        setTrainable = True
    layer.trainable = setTrainable

# layerの重みを学習するか確認のためのprint
for index, layer in enumerate(trainModel.get_layer("network_1").layers):
    print(index, layer.name, layer.trainable)

In [None]:
# コンパイル optimizerは好み？
trainModel.compile(optimizer='nadam',
                   loss='categorical_crossentropy',
                   metrics=['accuracy'],
                   target_tensors=[trainLabels])
testModel.compile(optimizer='nadam',
                   loss='categorical_crossentropy',
                   metrics=['accuracy'],
                   target_tensors=[testLabels])

In [None]:
epochHistory = trainModel.fit(
                epochs=FLAGS.maxEpoch, 
                verbose=1,
                steps_per_epoch=stepsPerEpochTrain,
                callbacks=[
                EvaluateInputTensor(testModel, steps=stepsPerEpochTest),
                modelCheckPoint,
                TrainValTensorBoard(logDir=FLAGS.trainDir, summaryWriter=summaryWriter, summaryOp=summaryOp,
                                    write_graph=True, write_images=True)])