In [None]:
# example of semi-supervised gan for mnist
import numpy as np
import os
import time

from numpy.random import randn, randint
import matplotlib.pyplot as plt

In [None]:
from keras.datasets.mnist import load_data
from keras.optimizers import Adam
from keras.models import Model
from keras.layers import Input, Dense, Reshape, Flatten, Conv2D, Conv2DTranspose
from keras.layers import LeakyReLU, Dropout, Lambda, Activation

from keras import backend as K

In [None]:
from utils import verifyDir
from utils.networks import normalize, unnormalize, plot_data

In [None]:
# os.environ["CUDA_VISIBLE_DEVICES"]="1"

### Parameters

In [None]:
# --- input geometry ---
x_height, x_width = 28, 28
num_channels = 1
input_shape = (x_height, x_width, num_channels)
num_classes = 10

# --- optimisation ---
# learning rate
learning_rate = 2e-5
# optimizer; this single Adam instance is handed to every compile() call in the notebook
optimizer_grad = Adam(lr=learning_rate, beta_1=0.5)
# size of the latent space
latent_dim = 100

# --- training schedule ---
epochs = 30
batch_size = 128

# --- labeled subset ---
# fraction of each class whose labels are used for the supervised update
labeled_rate = 1/600
# nominal number of labeled examples (60000 is assumed to be the MNIST training-set size).
# Stored as a rounded integer: the raw float product can land just below the
# intended count, and a later int() truncation would then drop it by one.
labeled_samples = int(round(60000 * labeled_rate))

In [None]:
# Output directory for this run: the training CSV, the plotted image grids and the
# saved models are all written below it. The suffix is the nominal labeled-sample count.
LOG_PATH = f"Logs/SSGAN_MNIST/Classifier_{int(labeled_samples)}/"

In [None]:
# NOTE(review): verifyDir is a project-local helper from utils; presumably it
# creates LOG_PATH when it does not exist yet — confirm against utils.
verifyDir(LOG_PATH)

### Dataset

In [None]:
# load the MNIST images and labels
def load_real_samples():
    """Load MNIST and return it as ``[images, labels]`` pairs.

    A trailing channel axis is appended to both image arrays, and each is
    passed through the project-local ``normalize`` helper.
    NOTE(review): ``normalize`` presumably rescales pixels to the range of the
    generator's tanh output — confirm in utils.networks.

    Returns:
        Two lists: ``[X_train, y_train]`` and ``[X_test, y_test]``.
    """
    (train_images, train_labels), (test_images, test_labels) = load_data()
    # (N, H, W) -> (N, H, W, 1); the test split is normalized first, then the train split
    test_images = normalize(test_images[..., np.newaxis])
    train_images = normalize(train_images[..., np.newaxis])
    print(train_images.shape, train_labels.shape, test_images.shape, test_labels.shape)
    return [train_images, train_labels], [test_images, test_labels]

### Discriminator

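Custom activation for the unsupervised model: it turns the class logits $l_k$ into the probability that the input is real, $D(x) = \frac{Z(x)}{Z(x) + 1}$ with $Z(x) = \sum_k \exp(l_k)$.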

In [None]:
# custom activation function
def custom_activation(output):
    """Map class logits to the probability that the input is real.

    Computes ``Z / (Z + 1)`` with ``Z = sum(exp(output))`` over the last axis.
    It is evaluated as ``sigmoid(log Z)``, which is algebraically the same
    value, with ``log Z`` obtained through a max-shifted log-sum-exp. Taking
    ``exp`` of the raw logits overflows to ``inf`` for large logits, and
    ``inf / (inf + 1)`` is ``nan``; the shifted form stays finite.

    Args:
        output: backend tensor of logits; the reduction runs over its last axis.

    Returns:
        Tensor with the last axis reduced to size 1 (kept), values in [0, 1].
    """
    # subtract the per-row maximum so every exponent is <= 0 before exp()
    row_max = K.max(output, axis=-1, keepdims=True)
    log_z = row_max + K.log(K.sum(K.exp(output - row_max), axis=-1, keepdims=True))
    # Z / (Z + 1) == 1 / (1 + exp(-log Z)) == sigmoid(log Z)
    return K.sigmoid(log_z)

Defines an unsupervised output and a supervised output.  
Both share the same weights up to and including the last Dense layer; only the final activation differs.

In [None]:
# define the standalone supervised and unsupervised discriminator models
def define_discriminator(in_shape=(28,28,1), n_classes=10):
    """Build and compile the two discriminator models on one shared trunk.

    Both models take the same image input and reuse the same layers up to and
    including the final ``Dense(n_classes)``; they differ only in the
    activation applied to those logits.

    Args:
        in_shape: shape of one input image (height, width, channels).
        n_classes: number of class logits produced by the last Dense layer.

    Returns:
        ``(unsupervised_model, supervised_model)``: the first ends in
        ``custom_activation`` and is compiled with binary cross-entropy; the
        second ends in softmax and is compiled with sparse categorical
        cross-entropy and an accuracy metric. Both use the module-level
        ``optimizer_grad``.
    """
    image_in = Input(shape=in_shape)

    # three identical stride-2 convolution stages, each followed by LeakyReLU
    features = image_in
    for _ in range(3):
        features = Conv2D(128, (3,3), strides=(2,2), padding='same')(features)
        features = LeakyReLU(alpha=0.2)(features)

    # flatten, regularize, then project to one logit per class
    features = Flatten()(features)
    features = Dropout(0.4)(features)
    logits = Dense(n_classes)(features)

    # supervised head: class probabilities
    class_probs = Activation('softmax')(logits)
    supervised_model = Model(image_in, class_probs)
    supervised_model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer_grad, metrics=['accuracy'])

    # unsupervised head: single real/fake score derived from the same logits
    real_score = Lambda(custom_activation)(logits)
    unsupervised_model = Model(image_in, real_score)
    unsupervised_model.compile(loss='binary_crossentropy', optimizer=optimizer_grad)

    return unsupervised_model, supervised_model

### Generator

In [None]:
# define the standalone generator model
def define_generator(latent_dim):
    """Build the generator that maps a latent vector to a one-channel image.

    A Dense layer produces a 7x7x128 feature map, two stride-2 transposed
    convolutions upsample it to 14x14 and then 28x28, and a final 7x7
    convolution with tanh yields the single output channel.

    Args:
        latent_dim: length of the latent input vector.

    Returns:
        The generator ``Model``. It is returned uncompiled.
    """
    latent_in = Input(shape=(latent_dim,))

    # foundation for a 7x7 image with 128 feature maps
    base_size, base_maps = 7, 128
    x = Dense(base_maps * base_size * base_size)(latent_in)
    x = LeakyReLU(alpha=0.2)(x)
    x = Reshape((base_size, base_size, base_maps))(x)

    # two upsampling stages: 7x7 -> 14x14 -> 28x28
    for _ in range(2):
        x = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(x)
        x = LeakyReLU(alpha=0.2)(x)

    # squash to a single channel with tanh
    image_out = Conv2D(1, (7,7), activation='tanh', padding='same')(x)
    return Model(latent_in, image_out)

### Semi-Supervised GAN

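We mark the unsupervised model as not trainable inside the combined GAN model, so that a generator update does not also change the discriminator. The discriminator weights are still trained through the standalone supervised and unsupervised models (which share their weights), because those models were compiled before the flag was changed.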

In [None]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(generator_model, unsupervised_model):
    """Stack the generator and the unsupervised discriminator into one model.

    Side effect: sets ``unsupervised_model.trainable = False`` on the model
    that is passed in, before the combined model is compiled.

    Args:
        generator_model: model mapping latent points to images.
        unsupervised_model: real/fake discriminator applied to those images.

    Returns:
        A model from the generator's input to the discriminator's score,
        compiled with binary cross-entropy and the module-level
        ``optimizer_grad``.
    """
    # freeze the discriminator for the combined model
    unsupervised_model.trainable = False
    # feed the generated image straight into the discriminator
    latent_in = generator_model.input
    real_score = unsupervised_model(generator_model.output)
    combined = Model(latent_in, real_score)
    combined.compile(loss='binary_crossentropy', optimizer=optimizer_grad)
    return combined

### Selecting a Labeled Subset

In [None]:
# select a supervised subset of the dataset, keeping the same fraction of every class
def select_supervised_samples(dataset, percent_samples=1.0, n_classes=10):
    """Draw a random labeled subset with the same fraction taken from each class.

    For every class ``0 .. n_classes - 1``, ``int(count * percent_samples)``
    images are drawn, where ``count`` is the number of images of that class.
    Images are drawn without replacement, so the subset contains no duplicates
    and ``percent_samples=1.0`` returns every image exactly once. Only when
    more images are requested than a class holds (``percent_samples > 1``)
    does the draw fall back to sampling with replacement.

    Args:
        dataset: pair ``(X, y)`` of NumPy arrays; ``y`` holds integer class ids
            aligned with the first axis of ``X``.
        percent_samples: fraction of each class to keep.
        n_classes: number of class ids to scan, starting at 0.

    Returns:
        ``(X_subset, y_subset)`` as NumPy arrays, grouped by class id.

    Raises:
        ValueError: if ``percent_samples`` yields a negative sample count.
    """
    X, y = dataset
    X_parts, y_parts = [], []
    for class_id in range(n_classes):
        # all images of this class
        class_images = X[y == class_id]
        n_available = len(class_images)
        n_take = int(n_available * percent_samples)
        if n_take < 0:
            raise ValueError("percent_samples must be non-negative, got %r" % (percent_samples,))
        if n_take > 0:
            # distinct indices unless the caller asks for more than exist
            picked = np.random.choice(n_available, size=n_take, replace=n_take > n_available)
        else:
            picked = np.arange(0)
        X_parts.append(class_images[picked])
        y_parts.append(np.full(n_take, class_id, dtype=int))
    if not X_parts:
        # n_classes <= 0: nothing was scanned
        return np.asarray([]), np.asarray([])
    return np.concatenate(X_parts), np.concatenate(y_parts)

### Getting Real Samples

In [None]:
# draw a random batch of real samples
def generate_real_samples(dataset, n_samples=100):
    """Pick ``n_samples`` random rows (with replacement) from a labeled dataset.

    Args:
        dataset: pair ``(images, labels)`` of arrays indexed along axis 0.
        n_samples: number of rows to draw.

    Returns:
        ``([X, labels], y)``: the selected images with their class labels,
        and an ``(n_samples, 1)`` array of ones used as the "real" target.
    """
    all_images, all_labels = dataset
    # uniform random row indices in [0, number of images)
    picks = randint(0, all_images.shape[0], n_samples)
    real_targets = np.ones((n_samples, 1))
    return [all_images[picks], all_labels[picks]], real_targets

### Getting Fake Samples

In [None]:
# sample latent vectors to feed the generator
def generate_latent_points(latent_dim, n_samples=100):
    """Sample a batch of standard-normal latent vectors.

    Args:
        latent_dim: length of each latent vector.
        n_samples: number of vectors in the batch.

    Returns:
        Array of shape ``(n_samples, latent_dim)``.
    """
    # draw all values as one flat vector, then fold it into rows
    flat_noise = randn(n_samples * latent_dim)
    return flat_noise.reshape((n_samples, latent_dim))

In [None]:
# run the generator on fresh latent points to obtain fake examples
def generate_fake_samples(generator, latent_dim, n_samples=100):
    """Generate ``n_samples`` fake images together with their "fake" targets.

    Args:
        generator: object whose ``predict`` maps latent points to images.
        latent_dim: length of each latent vector.
        n_samples: number of images to generate.

    Returns:
        ``(images, y)``: the generator's predictions and an
        ``(n_samples, 1)`` array of zeros used as the "fake" target.
    """
    latent_batch = generate_latent_points(latent_dim, n_samples)
    fake_images = generator.predict(latent_batch)
    fake_targets = np.zeros((n_samples, 1))
    return fake_images, fake_targets

### Training

In [None]:
# train the generator and discriminator
def train(generator_model, unsupervised_model, supervised_model, gan_model, dataset, dataset_test, latent_dim, n_epochs=20, n_batch=100, percent_samples=1.0):
    """Run the semi-supervised GAN training loop and log every step.

    Each step performs, in order: one supervised update on a half batch drawn
    from the labeled subset, two unsupervised discriminator updates (a half
    batch of real images, then a half batch of generated ones), and one
    generator update through ``gan_model`` on a full batch of latent points.
    After every step the classifier is evaluated on the complete training and
    test sets and one row is appended to ``{LOG_PATH}SSL_GAN.csv``.

    On step 1 a grid of test images is plotted. On step 1 and on every 100th
    step a grid of generated images is plotted and the generator and the
    supervised model are saved under ``LOG_PATH``.

    Args:
        generator_model: generator; used for prediction and saved periodically.
        unsupervised_model: real/fake discriminator trained on real and fake batches.
        supervised_model: classifier trained on the labeled subset and evaluated each step.
        gan_model: combined model used to update the generator.
        dataset: pair ``[X_train, y_train]``.
        dataset_test: pair ``[X_test, y_test]``.
        latent_dim: length of the latent vectors fed to the generator.
        n_epochs: number of passes used to derive the total step count.
        n_batch: full batch size; the discriminator updates use half of it.
        percent_samples: per-class fraction of ``dataset`` used as labeled data.

    Raises:
        ValueError: if ``percent_samples`` selects no labeled samples.
    """
    # select supervised dataset
    X_sup, y_sup = select_supervised_samples(dataset, percent_samples=percent_samples)
    print(X_sup.shape, y_sup.shape)
    if len(X_sup) == 0:
        # fail here with a clear message instead of inside the first random draw
        raise ValueError("percent_samples=%r selects no labeled samples" % (percent_samples,))

    X_train, y_train = dataset
    X_test, y_test = dataset_test

    # calculate the number of batches per training epoch
    bat_per_epo = int(X_train.shape[0] / n_batch)
    # calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    # calculate the size of half a batch of samples
    half_batch = int(n_batch / 2)
    print('n_epochs=%d, n_batch=%d, 1/2=%d, b/e=%d, steps=%d' % (n_epochs, n_batch, half_batch, bat_per_epo, n_steps))

    # the context manager closes the log even when a training step raises
    with open(f"{LOG_PATH}SSL_GAN.csv", "w") as f_history:
        f_history.write("step,generator_loss,unsupervised_real_loss,unsupervised_fake_loss,supervised_loss,supervised_acc,train_loss,test_loss,train_acc,test_acc\n")
        for step in range(1, n_steps + 1):
            # update supervised discriminator (c)
            [Xsup_real, ysup_real], _ = generate_real_samples([X_sup, y_sup], half_batch)
            c_loss, c_acc = supervised_model.train_on_batch(Xsup_real, ysup_real)

            # update unsupervised discriminator (d) on real, then on fake images
            [X_real, _], y_real = generate_real_samples(dataset, half_batch)
            d_loss1 = unsupervised_model.train_on_batch(X_real, y_real)

            X_fake, y_fake = generate_fake_samples(generator_model, latent_dim, half_batch)
            d_loss2 = unsupervised_model.train_on_batch(X_fake, y_fake)

            # update generator (g): latent points labeled as "real"
            X_gan, y_gan = generate_latent_points(latent_dim, n_batch), np.ones((n_batch, 1))
            g_loss = gan_model.train_on_batch(X_gan, y_gan)

            # NOTE: both full sets are evaluated on every step because each CSV
            # row records these values; this is the costly part of a step.
            loss_train, acc_train = supervised_model.evaluate(X_train, y_train, verbose=0)
            loss_test, acc_test = supervised_model.evaluate(X_test, y_test, verbose=0)

            # Log
            print('step: %d | Train: G_Loss: %.3f, ' \
                  'D_unsup_loss_real: %.3f, D_unsup_loss_fake: %.3f, ' \
                  'D_sup_loss: %.3f, D_sup_acc: %.2f ' \
                  'Train acc: %.3f Test acc: %.3f ' %(step, g_loss,
                                                    d_loss1, d_loss2,
                                                    c_loss, c_acc*100,
                                                    acc_train*100, acc_test*100))
            f_history.write(f"{step},{g_loss},{d_loss1},{d_loss2},{c_loss},{c_acc*100},{loss_train},{loss_test},{acc_train*100},{acc_test*100}\n")

            if step == 1:
                plot_data(X_test, 0, "test", grid_size = [10, 10], OUT_PATH=LOG_PATH, gray=True)

            # snapshot the model performance every so often
            if step % 100 == 0 or step == 1:
                # prepare fake examples and plot them
                X_generated, _ = generate_fake_samples(generator_model, latent_dim, n_samples=100)
                plot_data(X_generated, step, "generated", grid_size = [10, 10], OUT_PATH=LOG_PATH, gray=True)

                # no weights have changed since the evaluation above, so its
                # results are reused instead of evaluating both sets again
                print('Train Classifier Accuracy: %.3f%%\n' % (acc_train * 100))
                print('Test Classifier Accuracy: %.3f%%\n' % (acc_test * 100))

                # save the generator model
                filename2 = f'{LOG_PATH}generator_model_{step}.h5'
                generator_model.save(filename2)
                # save the classifier model
                filename3 = f'{LOG_PATH}supervised_model_{step}.h5'
                supervised_model.save(filename3)
                print('>Saving models Generator: %s and Supervised: %s' % (filename2, filename3))

In [None]:
# build both discriminator heads (they share one trunk)
unsupervised_model, supervised_model = define_discriminator(
    in_shape=input_shape,
    n_classes=num_classes,
)
# build the generator
generator_model = define_generator(latent_dim)
# stack generator + frozen discriminator for the generator update
gan_model = define_gan(generator_model, unsupervised_model)
# load the MNIST train / test splits
dataset, dataset_test = load_real_samples()
# run the training loop with the settings from the parameters cell
train(
    generator_model,
    unsupervised_model,
    supervised_model,
    gan_model,
    dataset,
    dataset_test,
    latent_dim,
    n_epochs=epochs,
    n_batch=batch_size,
    percent_samples=labeled_rate,
)

### Testing

In [None]:
# Runs the loader again and rebinds the same two global names that the
# training cell assigned; the evaluation cells below read them.
dataset, dataset_test = load_real_samples()

In [None]:
# Accuracy of the supervised classifier on the full training split.
# evaluate() returns the loss first and the accuracy metric second; the loss is discarded.
X_train, y_train = dataset
_, acc = supervised_model.evaluate(X_train, y_train, verbose=0)
print('Train Classifier Accuracy: %.3f%%\n' % (acc * 100))

In [None]:
# Accuracy of the supervised classifier on the held-out test split.
# evaluate() returns the loss first and the accuracy metric second; the loss is discarded.
X_test, y_test = dataset_test
_, acc = supervised_model.evaluate(X_test, y_test, verbose=0)
print('Test Classifier Accuracy: %.3f%%\n' % (acc * 100))