In [None]:
import os
import random
import numpy as np
import imageio

from keras.layers import Input, Dense, Conv2D, BatchNormalization, MaxPooling2D, UpSampling2D, Flatten
from keras.layers import BatchNormalization, Activation, Conv2D, Conv2DTranspose, Dropout, Reshape, Concatenate
from keras.layers.advanced_activations import LeakyReLU
from keras.models import Model
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.preprocessing import image
import keras.backend as K
import h5py

import matplotlib.pyplot as plt
%matplotlib inline  

In [None]:
## Load dataset

In [None]:
def load_dataset(path):
    files = os.listdir(path)
    x = []
    for file in files: # number of files to go through
        im = imageio.imread(os.path.join(path, file))
        x.append(im)
    x = np.asarray(x)
    return x

In [None]:
path_train_orig = './Food11_256/training_256'
path_val_orig = './Food11_256/validation_256'

x_train = load_dataset(path_train_orig)
x_val = load_dataset(path_val_orig)

print('original dataset:', np.shape(x_train), np.shape(x_val))

In [None]:
path_train_date = './Food11_256/training_256_datestamp'
path_val_date = './Food11_256/validation_256_datestamp'

x_train_date = load_dataset(path_train_date)
x_val_date = load_dataset(path_val_date)

print('datestamped dataset:', np.shape(x_train_date), np.shape(x_val_date))

In [None]:
x_train = x_train.astype('float32') / 255.0
x_val  = x_val.astype('float32') / 255.0
x_train_date = x_train_date.astype('float32') / 255.0
x_val_date  = x_val_date.astype('float32') / 255.0

In [None]:
## Convolutional autoencoder with discriminator

In [None]:
def get_autoencoder(input_layer):

    x = Conv2D(3, (3, 3), padding='same')(input_layer)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(48, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    encoded = MaxPooling2D((2, 2), padding='same')(x)

    x = Conv2D(48, (3, 3), padding='same')(encoded)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(96, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(3, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)
    decoded = Activation('sigmoid')(x)

    model = Model(input_layer, decoded)
    
    #model.summary()

    return model

In [None]:
def get_discriminator(input_layer):

    hid = Conv2D(16, kernel_size=3, strides=1, padding='same')(input_layer)
    hid = BatchNormalization(momentum=0.9)(hid)
    hid = LeakyReLU(alpha=0.1)(hid)

    hid = Conv2D(16, kernel_size=2, strides=2, padding='same')(hid)
    hid = BatchNormalization(momentum=0.9)(hid)
    hid = LeakyReLU(alpha=0.1)(hid)

    hid = Conv2D(16, kernel_size=2, strides=2, padding='same')(hid)
    hid = BatchNormalization(momentum=0.9)(hid)
    hid = LeakyReLU(alpha=0.1)(hid)

    hid = Conv2D(16, kernel_size=2, strides=2, padding='same')(hid)
    hid = BatchNormalization(momentum=0.9)(hid)
    hid = LeakyReLU(alpha=0.1)(hid)

    hid = Conv2D(16, kernel_size=2, strides=2, padding='same')(hid)
    hid = BatchNormalization(momentum=0.9)(hid)
    hid = LeakyReLU(alpha=0.1)(hid)
    
    hid = Flatten()(hid)
    hid = Dropout(0.4)(hid)
    classifier = Dense(1, activation='sigmoid')(hid)

    model = Model(input_layer, classifier)

    model.summary()

    return model

In [None]:
in_shape = (x_train.shape[1:])
input_img = Input(shape=in_shape)

discriminator = get_discriminator(input_img)
discriminator.compile(optimizer=Adam(0.0002, 0.5), loss='binary_crossentropy', metrics=['accuracy'])
discriminator.trainable = False

autoencoder = get_autoencoder(input_img)
x = autoencoder(input_img)
output_img = discriminator(autoencoder(input_img))
gan = Model(input_img, output_img)

gan.summary()
gan.compile(optimizer=Adam(0.0002, 0.5), loss='binary_crossentropy')

In [None]:
x_training = np.concatenate((x_train, x_val))
x_training_date = np.concatenate((x_train_date, x_val_date))

In [None]:
epochs = 100
batch_size = 8
num_batches = int(x_training.shape[0]/batch_size)

for epoch in range(epochs):

    cum_d_loss = 0.
    cum_g_loss = 0.

    for batch_idx in range(num_batches): #num_batches
        images = x_training[batch_idx*batch_size : (batch_idx+1)*batch_size]
        images_date = x_training_date[batch_idx*batch_size : (batch_idx+1)*batch_size]
        generated_images = autoencoder.predict(images_date)
        #show_samples(images_date,generated_images, 8)

        # real data
        true_labels = np.zeros((batch_size, 1)) + np.random.uniform(low=0.0, high=0.1, size=(batch_size, 1))
        d_loss_true = discriminator.train_on_batch(images, true_labels)

        # generated data
        ae_labels = np.ones((batch_size, 1)) - np.random.uniform(low=0.0, high=0.1, size=(batch_size, 1))
        d_loss_ae = discriminator.train_on_batch(generated_images, ae_labels)

        d_loss = 0.5 * np.add(d_loss_true, d_loss_ae)
        cum_d_loss += d_loss
        
        # Train generator
        g_loss = gan.train_on_batch(images_date, np.zeros((batch_size, 1)))
        cum_g_loss += g_loss

    print('  Epoch: {}, Generator Loss: {}, Discriminator Loss: {}'.format(epoch+1, cum_g_loss/num_batches, cum_d_loss/num_batches))

In [None]:
## Show original and reconstructed image

In [None]:
path_test = './Food11_256/evaluation_256_datestamp'
x_test = load_dataset(path_test)
x_test = x_test.astype('float32') / 255.0
print(np.shape(x_test))

In [None]:
# show original image and reconstructed image
def show_samples(orig, dec, num=9):
    n = num
    plt.figure(figsize=(23, 5))

    for i in range(n):
        # display original
        ax = plt.subplot(2, n, i+1)
        plt.imshow(orig[i].reshape(in_shape))
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # display reconstruction
        ax = plt.subplot(2, n, i +1 + n)
        plt.imshow(dec[i].reshape(in_shape))
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    plt.show()

In [None]:
def random_subset( iterator, K ):
    result = []
    N = 0

    for item in iterator:
        N += 1
        if len( result ) < K:
            result.append( item )
        else:
            s = int(random.random() * N)
            if s < K:
                result[ s ] = item

    return result

In [None]:
n = 9
samples = np.asarray(random_subset(x_test, n))
predictions = autoencoder.predict(samples)
show_samples(samples,predictions,n)