In [1]:
import numpy as np
import cv2
from tensorflow import keras
import pandas as pd
import tensorflow as tf
import os
from random import shuffle

In [2]:

class DataGenerator(keras.utils.Sequence):
    """Keras ``Sequence`` yielding ``(images, labels)`` batches read from disk.

    Adapted from https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

    Images are read with OpenCV from ``datapath`` and divided by 255; labels
    are looked up by file name in the attribute CSV at ``attribute_path``.
    """

    def __init__(self, list_IDs, batch_size=32, dim=(95,95), n_channels=3,
                 datapath='/vagrant/imgs/training_data/training_data/aligned',
                 attribute_path='/vagrant/imgs/list_attr_celeba.csv',
                 label_size=40, shuffle=True):
        """Store the configuration, load the attribute table and build the first index order.

        Args:
            list_IDs: image file names (relative to ``datapath``) served by this generator.
            batch_size: number of samples per batch.
            dim: spatial shape of one image; images on disk are assumed to
                already have this shape (no resizing is done) -- TODO confirm.
            n_channels: number of colour channels per image.
            datapath: directory containing the image files.
            attribute_path: CSV file with an ``image_id`` column and the label columns.
            label_size: number of label values per image.
            shuffle: whether to reshuffle the sample order after every epoch.
        """
        # Initialise the Keras base class; it takes no required arguments and
        # newer Keras releases warn when a subclass skips this call.
        super().__init__()
        self.dim = dim
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = shuffle
        self.label_size = label_size
        self.datapath = datapath
        self.df = pd.read_csv(attribute_path)
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as a tuple ``(X, y)``."""
        # Positions (into list_IDs) that belong to this batch.
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        return self.__data_generation(list_IDs_temp)

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``self.shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def get_numpy_image(self, image_path):
        """Read one image from ``datapath`` and return it divided by 255.

        Raises:
            OSError: if OpenCV could not read the file.
        """
        full_path = os.path.join(self.datapath, image_path)
        img = cv2.imread(full_path)
        if img is None:
            # cv2.imread reports failure by returning None instead of raising,
            # which would otherwise surface as an opaque ``None / 255`` TypeError.
            raise OSError("could not read image: {}".format(full_path))
        return img / 255

    def get_label(self, image_path):
        """Return the label vector of one image with negative entries set to 0.

        A ``.png`` extension is mapped to ``.jpg`` before the lookup; only the
        extension is rewritten, never other parts of the file name. The first
        CSV column is dropped from the returned vector (assumed to be
        ``image_id`` -- TODO confirm the CSV column order).

        Raises:
            IndexError: if the attribute table has no row for the image.
        """
        stem, ext = os.path.splitext(image_path)
        if ext == '.png':
            image_path = stem + '.jpg'
        # NOTE: this is a linear scan of the attribute table for every sample.
        row = self.df.loc[self.df['image_id'] == image_path]
        if row.empty:
            raise IndexError("no attribute row found for image_id {!r}".format(image_path))
        label = np.array(row.values.tolist()[0][1:])
        label[label < 0] = 0
        return label

    def __data_generation(self, list_IDs_temp):
        """Load the given IDs into ``X`` of shape ``(n, *dim, n_channels)`` and ``Y`` of shape ``(n, label_size)``."""
        # Size the arrays by the IDs actually supplied so that no row of
        # uninitialised np.empty memory can ever be returned.
        n_samples = len(list_IDs_temp)
        X = np.empty((n_samples, *self.dim, self.n_channels))
        Y = np.empty((n_samples, self.label_size), dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            X[i,] = self.get_numpy_image(ID)
            Y[i,] = self.get_label(ID)

        return X, Y

In [3]:
class PredictionDataGenerator(DataGenerator):
    """Image-only variant of ``DataGenerator``: batches contain ``X`` but no labels.

    By default every ID is served in one single batch. Pass ``batch_size`` to
    split the data into smaller batches when the whole set does not fit in
    memory; the last batch may then be smaller than ``batch_size``.
    """

    def __init__(self, list_IDs, dim=(95,95), n_channels=3,
                 datapath='/vagrant/imgs/training_data/training_data/aligned',
                 batch_size=None):
        """Store the configuration and build the (unshuffled) index order.

        Args:
            list_IDs: image file names (relative to ``datapath``) to serve.
            dim: spatial shape of one image.
            n_channels: number of colour channels per image.
            datapath: directory containing the image files.
            batch_size: samples per batch; ``None`` (default) serves all IDs
                in a single batch.

        Raises:
            ValueError: if an explicit ``batch_size`` is smaller than 1.
        """
        # Skip DataGenerator.__init__ on purpose (it reads the attribute CSV,
        # which is not needed here) but still initialise the Keras base class.
        super(DataGenerator, self).__init__()
        if batch_size is None:
            batch_size = len(list_IDs)
        elif batch_size < 1:
            raise ValueError("batch_size must be at least 1, got {}".format(batch_size))
        self.dim = dim
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.shuffle = False
        self.datapath = datapath
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches, counting a trailing partial batch.

        Rounds up (the parent class rounds down) so that no sample is left
        without a prediction, and returns 0 for an empty ID list instead of
        dividing by zero.
        """
        n_samples = len(self.list_IDs)
        if n_samples == 0:
            return 0
        return int(np.ceil(n_samples / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as an image array ``X``."""
        # Positions (into list_IDs) that belong to this batch.
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        return self.__data_generation(list_IDs_temp)

    def __data_generation(self, list_IDs_temp):
        """Load the given IDs into an array of shape ``(n, *dim, n_channels)``."""
        # Sized by the IDs actually supplied so that a short final batch never
        # contains rows of uninitialised np.empty memory.
        X = np.empty((len(list_IDs_temp), *self.dim, self.n_channels))

        for i, ID in enumerate(list_IDs_temp):
            X[i,] = self.get_numpy_image(ID)

        return X

In [4]:

def create_partition(amount='all', datapath='/vagrant/imgs/training_data/training_data/aligned', split=(60, 20, 20),
                     seed=None):
    """Randomly split the file names in ``datapath`` into train / validation / test lists.

    Args:
        amount: ``'all'`` to use every entry, otherwise the number of
            (shuffled) entries to keep before splitting.
        datapath: directory whose entries are partitioned.
        split: percentages for (train, validation, test). Only the first two
            are used for slicing; the test list receives everything after the
            validation slice, including any rounding remainder.
        seed: optional seed for a private random generator, making the split
            reproducible. ``None`` (default) shuffles with the module-level
            ``random`` state as before.

    Returns:
        dict with keys ``"train"``, ``"validation"`` and ``"test"``, each a
        list of file names.
    """
    # os.listdir returns entries in arbitrary order; sort first so the same
    # seed always produces the same shuffle.
    directory = sorted(os.listdir(datapath))
    if seed is None:
        shuffle(directory)
    else:
        from random import Random
        Random(seed).shuffle(directory)

    if amount != 'all':
        directory = directory[:amount]

    total = len(directory)
    train_end = int(total * split[0] / 100)
    val_end = train_end + int(total * split[1] / 100)

    return {
        "train": directory[:train_end],
        "validation": directory[train_end:val_end],
        "test": directory[val_end:]
    }


In [None]:
def evaluate_model(model, data_generators, checkpoint_path, patience=20, period=5, workers=8, epochs=100, verbose=1,
                   use_multiprocessing=True):
    """Train ``model``, then evaluate it and collect predictions.

    Args:
        model: object providing ``fit_generator``, ``evaluate_generator`` and
            ``predict_generator``.
        data_generators: mapping with the keys ``'training_generator'``,
            ``'validation_generator'``, ``'test_generator'`` and
            ``'predition_generator'`` (the key is spelled exactly like this).
        checkpoint_path: path (or path template) passed to ``ModelCheckpoint``.
        patience: ``patience`` of the ``EarlyStopping`` callback, which monitors ``val_loss``.
        period: ``period`` passed to ``ModelCheckpoint``.
        workers: ``workers`` passed to ``fit_generator``.
        epochs: maximum number of training epochs.
        verbose: verbosity forwarded to the callbacks and the Keras calls.
        use_multiprocessing: forwarded to ``fit_generator``; defaults to
            ``True``, the previously hard-coded value.

    Returns:
        Tuple ``(history, result, predictions)`` from ``fit_generator``,
        ``evaluate_generator`` and ``predict_generator`` respectively.

    Raises:
        KeyError: if ``data_generators`` lacks one of the required keys.
    """
    # Resolve every generator before training starts, so a missing key fails
    # immediately instead of after the whole training run has finished.
    training_generator = data_generators['training_generator']
    validation_generator = data_generators['validation_generator']
    test_generator = data_generators['test_generator']
    prediction_generator = data_generators['predition_generator']

    early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=patience)

    # Only the weights are written to the checkpoint files.
    cp_callback = keras.callbacks.ModelCheckpoint(
        checkpoint_path, verbose=verbose, save_weights_only=True,
        period=period)

    history = model.fit_generator(generator=training_generator,
                        validation_data=validation_generator,
                        use_multiprocessing=use_multiprocessing,
                        workers=workers,
                        epochs=epochs,
                        verbose=verbose,
                        callbacks=[early_stop, cp_callback])

    result = model.evaluate_generator(generator=test_generator, verbose=verbose)
    predictions = model.predict_generator(generator=prediction_generator, verbose=verbose)
    return history, result, predictions

In [None]:
def md(directory):
    """Create ``directory`` (and any missing parents) if it does not exist yet.

    Calling it for an existing directory is a no-op.

    Raises:
        FileExistsError: if ``directory`` exists but is not a directory.
    """
    # exist_ok=True avoids the check-then-create race of a separate
    # os.path.exists() test and no longer silently accepts a plain file.
    os.makedirs(directory, exist_ok=True)

In [None]:
# 'adapted from https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly'
import numpy as np
from models import model1, model2
# Settings shared by the training, validation and test generators.
params = dict(dim=(95, 95), batch_size=512, n_channels=3, shuffle=True)

# Random train / validation / test split of the image file names.
partition = create_partition()

# One labelled generator per split, plus an image-only generator over the
# test split that is used for the prediction step.
data_generators = {}
data_generators['training_generator'] = DataGenerator(partition['train'], **params)
data_generators['validation_generator'] = DataGenerator(partition['validation'], **params)
data_generators['test_generator'] = DataGenerator(partition['test'], **params)
data_generators['predition_generator'] = PredictionDataGenerator(partition['test'])

# Build model1, make sure its checkpoint directory exists, then train,
# evaluate and predict in one go.
model = model1.create_model()
md('checkpoints/model1')
history, result, predictions = evaluate_model(
    model,
    data_generators,
    'checkpoints/model1/cp-{epoch:04d}.ckpt',
)




Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 00005: saving model to checkpoints/model1/cp-0005.ckpt

Consider using a TensorFlow optimizer from `tf.train`.
Epoch 6/100
Epoch 7/100

In [None]:
# Save the current model to 'saved_models/model1.h5' (a later cell reloads it
# with keras.models.load_model); md() creates the target directory if missing.
md('saved_models')
model.save('saved_models/model1.h5')

In [None]:
# Checkpoint whose weights the next cell loads. The name follows the
# 'cp-{epoch:04d}.ckpt' pattern used for training, so this file presumably
# only exists if training reached epoch 45 -- TODO confirm before running.
checkpoint_path = 'checkpoints/model1/cp-0045.ckpt'

In [None]:
# Load the selected checkpoint's weights into the current model and score it
# on the test generator; the bare `result` shows the value as the cell output.
model.load_weights(checkpoint_path)
result = model.evaluate_generator(generator=data_generators['test_generator'], verbose=1)
result

In [None]:
# Rebuild a fresh model1, restore the most recent checkpoint found in
# 'checkpoints/model1' and score it on the test generator.
latest = tf.train.latest_checkpoint('checkpoints/model1')
if latest is None:
    # latest_checkpoint returns None (it does not raise) when no checkpoint is
    # found; fail here with a clear message instead of inside load_weights.
    raise FileNotFoundError("no checkpoint found in 'checkpoints/model1'")
model = model1.create_model()
model.load_weights(latest)
result = model.evaluate_generator(generator=data_generators['test_generator'], verbose=1)
result

In [None]:
# Reload the model saved to 'saved_models/model1.h5' and score it on the test
# generator; the bare `result` shows the value as the cell output.
model = keras.models.load_model('saved_models/model1.h5')
result = model.evaluate_generator(generator=data_generators['test_generator'], verbose=1)
result