In [86]:
import os
from PIL import Image
import glob
import time

import cv2 as cv
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow import keras

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from keras import backend
from tensorflow.keras.utils import to_categorical

In [12]:
# custom activation function
def custom_activation(output):
    """Collapse class logits into one "real" score for the unsupervised head.

    With ``Z = sum(exp(output))`` taken over the last axis, the value returned
    is ``Z / (Z + 1)``.  It is evaluated as ``sigmoid(log Z)``, which is the
    same quantity algebraically (``1 / (1 + 1/Z)``) but never materialises
    ``exp(output)``: the direct form overflows float32 once a logit exceeds
    roughly 88 and then yields ``inf / inf = NaN``.

    Args:
        output: tensor of logits; the reduction runs over its last axis.

    Returns:
        Tensor with the same leading axes and a last axis of size 1, with
        values in [0, 1].
    """
    # log(sum(exp(output))) computed without overflowing on large logits
    log_z = backend.logsumexp(output, axis=-1, keepdims=True)
    # Z / (Z + 1) == sigmoid(log Z)
    return backend.sigmoid(log_z)

In [118]:
# define the standalone supervised and unsupervised discriminator models
def define_discriminator(in_shape=(96, 96, 3), n_classes=10, dropout_rate=0.2):
    """Build the classifier and the real/fake discriminator on one shared trunk.

    The trunk is five Conv2D(3x3, relu) -> BatchNorm -> MaxPool(2x2) -> Dropout
    stages with 32, 64, 128, 256 and 512 filters, followed by Flatten, a
    64-unit relu Dense layer with BatchNorm and Dropout, and a final Dense
    layer producing ``n_classes`` logits.  Both returned models are built on
    those same layer instances, so they share weights up to the logits.

    Args:
        in_shape: shape of one input image.
        n_classes: number of logits / supervised classes.
        dropout_rate: rate used by every Dropout layer.

    Returns:
        ``(d_model, c_model)``: ``d_model`` feeds the logits through
        ``custom_activation`` and is compiled with binary cross-entropy;
        ``c_model`` applies softmax and is compiled with categorical
        cross-entropy plus an accuracy metric.
    """
    in_image = layers.Input(shape=in_shape)

    # Convolutional trunk: one downsampling stage per filter count
    features = in_image
    for n_filters in (32, 64, 128, 256, 512):
        features = layers.Conv2D(n_filters, (3, 3), activation="relu")(features)
        features = layers.BatchNormalization()(features)
        features = layers.MaxPooling2D((2, 2))(features)
        features = layers.Dropout(rate=dropout_rate)(features)

    # Dense head ending in raw class logits
    features = layers.Flatten()(features)
    features = layers.Dense(64, activation="relu")(features)
    features = layers.BatchNormalization()(features)
    features = layers.Dropout(rate=dropout_rate)(features)
    logits = layers.Dense(n_classes)(features)

    # Supervised model: softmax over the logits
    c_model = Model(in_image, layers.Activation('softmax')(logits))
    c_model.compile(
        loss='categorical_crossentropy',
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
        metrics=['accuracy'],
    )

    # Unsupervised model: the same logits squashed to a single score
    d_model = Model(in_image, layers.Lambda(custom_activation)(logits))
    d_model.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
    )

    return d_model, c_model

In [108]:
# define the standalone generator model
def define_generator(latent_dim):
    """Build the (uncompiled) generator mapping latent vectors to images.

    A Dense layer is reshaped to a 6x6x128 volume, then four stride-2
    Conv2DTranspose stages double the spatial size each time
    (6 -> 12 -> 24 -> 48 -> 96), and a 7x7 Conv2D with tanh emits 3 channels.

    Args:
        latent_dim: length of the latent input vector.

    Returns:
        A Keras ``Model`` taking ``(latent_dim,)`` inputs.
    """
    base_size, base_channels = 6, 128

    in_lat = layers.Input(shape=(latent_dim,))

    # Project the latent vector and fold it into a 6x6 feature volume
    gen = layers.Dense(base_channels * base_size * base_size)(in_lat)
    gen = layers.LeakyReLU(alpha=0.2)(gen)
    gen = layers.Reshape((base_size, base_size, base_channels))(gen)

    # Four x2 upsampling stages: 6 -> 12 -> 24 -> 48 -> 96
    for _ in range(4):
        gen = layers.Conv2DTranspose(128, (4, 4), strides=(2, 2), padding="same")(gen)
        gen = layers.LeakyReLU(alpha=0.2)(gen)

    # NOTE(review): tanh emits values in [-1, 1], while normalize_images scales
    # real images to [0, 1] — confirm this range mismatch is intended.
    out_layer = layers.Conv2D(3, (7, 7), activation='tanh', padding="same")(gen)

    return Model(in_lat, out_layer)

In [120]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
    """Stack the discriminator on top of the generator for generator updates.

    ``d_model.trainable`` is set to False before the combined model is built
    and compiled.  Note that this mutates the ``d_model`` object passed in.

    Args:
        g_model: generator model (latent vector -> image).
        d_model: unsupervised discriminator model (image -> score).

    Returns:
        Combined model (latent vector -> discriminator score) compiled with
        binary cross-entropy and Adam(learning_rate=0.0002, beta_1=0.5).
    """
    # Freeze the discriminator inside the stacked model
    d_model.trainable = False

    # Generator output feeds straight into the discriminator
    stacked = Model(g_model.input, d_model(g_model.output))
    stacked.compile(
        loss='binary_crossentropy',
        optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
    )
    return stacked

In [16]:
def _load_labeled_split(split_dir, n_classes):
    """Load one labelled split laid out as ``<split_dir>/<class folder>/<image>``."""
    images = []
    labels = []
    # glob returns entries in arbitrary order; sorting makes the class index of
    # a folder name deterministic and therefore identical across splits.
    # Non-directory entries are skipped so a stray file cannot shift indices.
    class_dirs = sorted(
        entry for entry in glob.glob(os.path.join(split_dir, "*")) if os.path.isdir(entry)
    )
    for class_idx, class_dir in enumerate(class_dirs):
        for path in sorted(glob.glob(os.path.join(class_dir, "*"))):
            # The context manager closes the file once the pixels are copied out
            with Image.open(path) as img:
                images.append(np.asarray(img))
            labels.append(class_idx)
    return np.asarray(images), to_categorical(labels, n_classes)


def load_dataset(dataset_dir="./dataset", n_classes=10):
    """Load the labelled train and test splits with one-hot labels.

    Expects ``<dataset_dir>/train/<class>/<image>`` and
    ``<dataset_dir>/test/<class>/<image>``.  A class's integer label is the
    position of its folder name in sorted order, so both splits agree on the
    mapping as long as they use the same folder names.

    Args:
        dataset_dir: root directory holding the ``train`` and ``test`` folders.
        n_classes: width of the one-hot label vectors.

    Returns:
        ``(train_images, test_images, train_labels, test_labels)``; images are
        stacked with ``np.asarray`` and labels come from ``to_categorical``.
    """
    train_images, train_labels = _load_labeled_split(os.path.join(dataset_dir, "train"), n_classes)
    test_images, test_labels = _load_labeled_split(os.path.join(dataset_dir, "test"), n_classes)
    return train_images, test_images, train_labels, test_labels

In [33]:
def load_unlabeled_dataset(unlabeled_dir="./dataset/unlabeled"):
    """Load every image found directly inside the unlabeled-data directory.

    Files are read in sorted path order so repeated runs produce the same
    array, and each file is closed as soon as its pixels have been copied.

    Args:
        unlabeled_dir: directory whose entries are all opened as images.

    Returns:
        ``np.ndarray`` stacking one array per image, in sorted path order.
    """
    images = []
    for path in sorted(glob.glob(os.path.join(unlabeled_dir, "*"))):
        # Close the file handle deterministically instead of relying on GC
        with Image.open(path) as img:
            images.append(np.asarray(img))
    return np.array(images)

In [18]:
def normalize_images(img):
    """Min-max rescale an image array to the range [0, 1] as 32-bit floats.

    Delegates to ``cv.normalize`` with ``NORM_MINMAX``; the minimum and
    maximum are taken from the array that is passed in.

    Args:
        img: array of pixel values.

    Returns:
        The rescaled array produced by ``cv.normalize`` (dtype ``CV_32F``).
    """
    return cv.normalize(
        img,
        None,
        alpha=0,
        beta=1.0,
        norm_type=cv.NORM_MINMAX,
        dtype=cv.CV_32F,
    )

In [62]:
# select real samples
def generate_real_samples(dataset, n_samples):
    """Draw a random batch (with replacement) of real images.

    Args:
        dataset: ``(images, labels)`` pair; ``labels`` may be ``None`` for
            unlabeled data.
        n_samples: number of rows to draw.

    Returns:
        ``(x, labels, y)``: the selected images, the matching labels (or
        ``None`` when no labels were supplied), and an ``(n_samples, 1)``
        array of ones used as the "real" target.
    """
    images, labels = dataset
    # Indices are drawn uniformly from [0, number of images)
    picked = np.random.randint(0, images.shape[0], n_samples)
    picked_labels = None if labels is None else labels[picked]
    real_targets = np.ones((n_samples, 1))
    return images[picked], picked_labels, real_targets

In [66]:
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
    """Sample a batch of standard-normal latent vectors.

    Args:
        latent_dim: length of each latent vector.
        n_samples: number of vectors to draw.

    Returns:
        Array of shape ``(n_samples, latent_dim)`` drawn with
        ``np.random.randn``.
    """
    # Drawing directly in 2-D fills the array in row-major order, which gives
    # the same values as drawing a flat vector and reshaping it.
    return np.random.randn(n_samples, latent_dim)

In [68]:
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n_samples):
    """Produce a batch of generated images with "fake" targets.

    Args:
        generator: object exposing ``predict`` (the generator model).
        latent_dim: length of each latent vector.
        n_samples: number of images to generate.

    Returns:
        ``(images, y)``: the output of ``generator.predict`` on freshly
        sampled latent points, and an ``(n_samples, 1)`` array of zeros.
    """
    latent_batch = generate_latent_points(latent_dim, n_samples)
    fake_images = generator.predict(latent_batch)
    # Zeros mark these samples as fake for the discriminator
    return fake_images, np.zeros((n_samples, 1))

In [111]:
# train the generator and discriminator
def train(g_model, d_model, c_model, gan_model, x_train, y_train, x_test, y_test, unlabeled_data, latent_dim, n_epochs=10, n_batch=64, save_dir="./model_weights/GAN/run3"):
    """Run the semi-supervised GAN training loop.

    Each batch step performs, in order: one supervised update of ``c_model``
    on ``n_batch`` labelled samples, two updates of ``d_model`` (half a batch
    of real unlabeled images, half a batch of generated ones), and one
    generator update through ``gan_model`` with "real" targets.  After each
    epoch ``c_model`` is evaluated on the full train and test sets, the
    metrics are printed, and the model is saved to ``{save_dir}/model_{epoch}``
    (0-based epoch index).

    Args:
        g_model: generator model.
        d_model: unsupervised (real/fake) discriminator.
        c_model: supervised classifier; ``train_on_batch``/``evaluate`` are
            expected to return ``(loss, accuracy)``.
        gan_model: stacked generator + discriminator used to update the generator.
        x_train, y_train: labelled training images and one-hot labels.
        x_test, y_test: labelled test images and one-hot labels.
        unlabeled_data: unlabeled images; its length sets the batches per epoch.
        latent_dim: length of the generator's latent vectors.
        n_epochs: number of passes.
        n_batch: batch size for the classifier and generator updates.
        save_dir: directory prefix for the per-epoch classifier checkpoints.

    Returns:
        None.
    """
    # Batches per epoch are driven by the size of the unlabeled pool
    bat_per_epo = unlabeled_data.shape[0] // n_batch
    # The discriminator sees half a batch of real and half a batch of fake images
    half_batch = n_batch // 2

    for i in range(n_epochs):
        for j in range(bat_per_epo):
            start_t = time.time()

            # update supervised discriminator (c)
            x_batch, y_batch, _ = generate_real_samples((x_train, y_train), n_batch)
            c_loss_batch, c_acc_batch = c_model.train_on_batch(x_batch, y_batch)

            # update unsupervised discriminator (d) on real, then fake images
            x_real, _, y_real = generate_real_samples((unlabeled_data, None), half_batch)
            d_loss_real = d_model.train_on_batch(x_real, y_real)
            x_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            d_loss_fake = d_model.train_on_batch(x_fake, y_fake)

            # update generator (g): latent points labelled as "real"
            x_gan = generate_latent_points(latent_dim, n_batch)
            y_gan = np.ones((n_batch, 1))
            g_loss = gan_model.train_on_batch(x_gan, y_gan)

            tpb = time.time() - start_t
            # 1-based counter so the final batch reads N / N; '\r' keeps it on one line
            print(
                f"batch {j + 1} / {bat_per_epo}: "
                f"c_loss {c_loss_batch:.3f}, c_acc {c_acc_batch:.3f}, "
                f"d_loss {d_loss_real:.3f}/{d_loss_fake:.3f}, g_loss {g_loss:.3f}, "
                f"tpb - {tpb:.2f}",
                end='\r',
            )

        # Evaluate the classifier once per epoch and actually report the result
        c_loss, c_acc = c_model.evaluate(x_train, y_train)
        c_loss_test, c_acc_test = c_model.evaluate(x_test, y_test)
        print(
            f">{i+1} train: loss={c_loss:.4f}, acc={c_acc:.4f} | "
            f"test: loss={c_loss_test:.4f}, acc={c_acc_test:.4f}"
        )
        c_model.save(f"{save_dir}/model_{i}")

In [34]:
# Load images
# load_dataset returns (train_images, test_images, train_labels, test_labels),
# so the unpacking order below is images first, then the one-hot labels.
x_train, x_test, y_train, y_test = load_dataset()
unlabeled_data = load_unlabeled_dataset()
# Normalize data
# Each array is min-max scaled to [0, 1] on its own by normalize_images, i.e.
# the min/max come from the array being passed in, not from a shared reference.
x_train = normalize_images(x_train)
x_test = normalize_images(x_test)
unlabeled_data = normalize_images(unlabeled_data)

In [121]:
# Size of the latent space
latent_dim = 512
# Create the discriminator models
# dropout_rate=0.35 overrides define_discriminator's default of 0.2; the two
# returned models share the same underlying layers.
d_model, c_model = define_discriminator(dropout_rate=0.35)
# Create the generator
g_model = define_generator(latent_dim)
# Create the gan
# define_gan sets d_model.trainable = False on the object passed in, so this
# must run after d_model has been built and compiled above.
gan_model = define_gan(g_model, d_model)

In [None]:
# Train model
# n_batch is left at train()'s default of 64; train() saves c_model after
# every epoch, so an interrupted run still leaves per-epoch checkpoints.
train(g_model, d_model, c_model, gan_model, x_train, y_train, x_test, y_test, unlabeled_data, latent_dim, n_epochs=50)

>1
INFO:tensorflow:Assets written to: ./model_weights/GAN/model_0\assets
>2
INFO:tensorflow:Assets written to: ./model_weights/GAN/model_1\assets
>3
INFO:tensorflow:Assets written to: ./model_weights/GAN/model_2\assets
>4
INFO:tensorflow:Assets written to: ./model_weights/GAN/model_3\assets
>5
INFO:tensorflow:Assets written to: ./model_weights/GAN/model_4\assets
batch 724 / 1562: tpb - 0.36

In [114]:
# Evaluate the supervised classifier on the held-out test set; c_model was
# compiled with metrics=['accuracy'], so the result is [loss, accuracy].
# NOTE(review): this cell's execution count (114) is lower than the
# model-building cell's (121), so the recorded output may come from an
# earlier model — confirm by re-running the notebook top to bottom.
c_model.evaluate(x_test, y_test)



[2.8632307052612305, 0.4623333215713501]