<a href="https://colab.research.google.com/github/AlexanderCardarasUCSC/EEG-Decoding-to-Images/blob/main/CSE_247_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Complete the following steps before running the cells below**

1.
Change runtime to GPU 

>Runtime > Change runtime type > GPU

2.
For each of the following link, add a Google Drive shortcut to your Drive root folder(My Drive):

>https://drive.google.com/drive/folders/1z8wcew5R7GCeu5s7SSsWw8bUcx1urVmk?usp=sharing

# Load data - PRP

In [None]:
# !pip -q install keras
!pip -q install google-cloud-storage
!pip -q install google-api-python-client oauth2client
!pip -q install tensorflow-addons
!pip -q install opencv-python

In [None]:
from apiclient import discovery
from httplib2 import Http
import oauth2client
from oauth2client import file, client, tools

obj = lambda: None
lmao = {"auth_host_name":'localhost', 'noauth_local_webserver':'store_true', 
        'auth_host_port':[8080, 8090], 'logging_level':'ERROR'}
for k, v in lmao.items():
    setattr(obj, k, v)
    
# authorization boilerplate code
SCOPES = 'https://www.googleapis.com/auth/drive.readonly'
store = file.Storage('token.json')
creds = store.get()
# The following will give you a link if token.json does not exist, the link allows the
# user to give this app permission
if not creds or creds.invalid:
    flow = client.flow_from_clientsecrets('client_secret.json', SCOPES)
    creds = tools.run_flow(flow, store, obj)

In [None]:
import io
from googleapiclient.http import MediaIoBaseDownload

DRIVE = discovery.build('drive', 'v3', http=creds.authorize(Http()))

def download_file(file_id, filename):
    # if you get the shareable link, the link contains this id, replace the file_id below
    request = DRIVE.files().get_media(fileId=file_id)
    # replace the filename and extension in the first field below
    fh = io.FileIO(filename, mode='w')
    downloader = MediaIoBaseDownload(fh, request)
    done = False
    while done is False:
        status, done = downloader.next_chunk()
        print("Download %d%%." % int(status.progress() * 100))

In [None]:
# Download Image data
!mkdir -p data/images
!wget -q https://raw.githubusercontent.com/AlexanderCardarasUCSC/EEG-Decoding-to-Images/main/image_utils.py 
download_file("1-2WFgtBn5WdZCnDlJkwaswTTrRtVNrS0", "images.npy")
download_file("1Lijm9A5Kq4EMub5jO0Bzz3c2VJLm-WH6", "image_labels.npy")

!sudo mv images.npy data/images
!sudo mv image_labels.npy data/images

# Load data - Colab

**Mount drive**

In [None]:
from google.colab import drive
drive.mount("/content/gdrive")

*55-95Hz*

In [None]:
!mkdir -p /content/data/eeg
!cp /content/gdrive/MyDrive/EEG2Image/eeg_54_95_std.pth /content/data/eeg/
eeg_path = "/content/data/eeg/eeg_54_95_std.pth"

#### Load image data

In [None]:
!mkdir -p /content/images
!wget -q https://raw.githubusercontent.com/AlexanderCardarasUCSC/EEG-Decoding-to-Images/main/image_utils.py /content
!cp /content/gdrive/MyDrive/EEG2Image/data/images.npy /content/images
!cp /content/gdrive/MyDrive/EEG2Image/data/image_labels.npy /content/images

# 28x28

In [None]:
!mkdir images

In [None]:
import glob
import imageio
import matplotlib.pyplot as plt
import numpy as np
import os
import PIL
import tensorflow as tf
from tensorflow.keras import layers
import time
from PIL import Image
import math
import image_utils
from tensorflow_addons.layers import SpectralNormalization
from tensorflow.keras.utils import plot_model
import cv2
from IPython import display

In [None]:
def discriminator_loss(real_output, fake_output):
    real_loss = cross_entropy(tf.ones_like(real_output), real_output)
    fake_loss = cross_entropy(tf.zeros_like(fake_output), fake_output)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    return cross_entropy(tf.ones_like(fake_output), fake_output)

# Notice the use of `tf.function`
# This annotation causes the function to be "compiled".
@tf.function
def train_step(images):
    noise = tf.random.normal([BATCH_SIZE, noise_dim])

    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        generated_images = generator(noise, training=True)

        real_output = discriminator(images, training=True)
        fake_output = discriminator(generated_images, training=True)

        gen_loss = generator_loss(fake_output)
        disc_loss = discriminator_loss(real_output, fake_output)
        
#         losses.append((gen_loss, disc_loss))

    gradients_of_generator = gen_tape.gradient(gen_loss, generator.trainable_variables)
    gradients_of_discriminator = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(gradients_of_generator, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(gradients_of_discriminator, discriminator.trainable_variables))

    return gen_loss, disc_loss
    
def train(dataset, epochs, losses):
    for epoch in range(epochs):
        gen_losses = []
        disc_losses = []
        start = time.time()

        for image_batch in dataset:
            gen_loss, disc_loss = train_step(image_batch)
            
            gen_losses.append(gen_loss.numpy())
            disc_losses.append(disc_loss.numpy())
        
        losses.append([sum(gen_losses)/len(gen_losses), sum(disc_losses)/len(disc_losses)])
        # Produce images for the GIF as you go
        display.clear_output(wait=True)
        generate_and_save_images(generator, epoch + 1, seed)

        # Save the model every 15 epochs
        if (epoch + 1) % 15 == 0:
            checkpoint.save(file_prefix = checkpoint_prefix)

        print ('Time for epoch {} is {} sec'.format(epoch + 1, time.time()-start))

    # Generate after the final epoch
    display.clear_output(wait=True)
    generate_and_save_images(generator, epochs, seed)

def generate_and_save_images(model, epoch, test_input):
    # Notice `training` is set to False.
    # This is so all layers run in inference mode (batchnorm).
    predictions = generator(seed, training=False)
    predictions = ((predictions + 1)/2.0*255).numpy().astype(int)

    # plot the result
    f, axs = plt.subplots(4,4,figsize=(8,8))
    # plot images
    for i in range(16):
        axs[i//4,i%4].imshow(predictions[i])
        axs[i//4,i%4].axis("off")

    plt.savefig('images/image_at_epoch_{:04d}.png'.format(epoch))
    plt.show()

    
def rgb2gray(rgb):
    return np.dot(rgb[...,:3], [0.299, 0.587, 0.144])

def resize_images(images, size=(28,28)):
    temp = []
    for img in images:
        temp.append(np.array(Image.fromarray(img).resize(size, Image.ANTIALIAS)))
    return np.asarray(temp)

def augment_images(images):
    augmented = []
    for image in images:
        flipped = tf.image.flip_left_right(image)
        
        augmented.append(image)
        augmented.append(flipped)
        
    return np.asarray(augmented)

def trim_classes(images, labels, max_classes=10, images_per_class=900):
    return images[0:images_per_class*max_classes], labels[0:images_per_class*max_classes]

# load and prepare imnet64 training images
def load_real_samples(n_classes=1):
    # load imagenet64 dataset
    images, labels = image_utils.load_images(path_root="")
    images = augment_images(images)
#     images = resize_images(images)
#     images = rgb2gray(images)
    images, labels = trim_classes(images, labels, n_classes,images_per_class=1800)
    (trainX, trainY), (_, _) = image_utils.split_data(images, labels, train=1)
    

    # convert from unsigned ints to floats
    X = trainX.astype('float32')
    
    # scale from [0,255] to [-1,1]
    X = (X - 127.5) / 127.5
    print(X.shape, trainY.shape)
    return [X, trainY]

# Display a single image using the epoch number
def display_image(epoch_no):
    return PIL.Image.open('images/image_at_epoch_{:04d}.png'.format(epoch_no))

In [None]:
def make_generator_model():
    model = tf.keras.Sequential()
    model.add(layers.Dense(16*16*256, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Reshape((16, 16, 256)))
    assert model.output_shape == (None, 16, 16, 256)  # Note: None is the batch size

    model.add(layers.Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    assert model.output_shape == (None, 16, 16, 128)

    model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())
    assert model.output_shape == (None, 32, 32, 64)

    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
    assert model.output_shape == (None, 64, 64, 3)

    return model


def make_discriminator_model():
    model = tf.keras.Sequential()

    model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same', input_shape=[64, 64, 3]))
    
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Flatten())
    model.add(layers.Dense(1))

    return model

In [None]:
losses = []
n_classes = 1
# BUFFER_SIZE = 60000
BUFFER_SIZE = 1800*n_classes
# BATCH_SIZE = 256
BATCH_SIZE = 16
EPOCHS = 1000
noise_dim = 100
num_examples_to_generate = 16

train_images = load_real_samples(n_classes=n_classes)[0]

# Batch and shuffle the data
train_dataset = tf.data.Dataset.from_tensor_slices(train_images).shuffle(BUFFER_SIZE).batch(BATCH_SIZE)

# You will reuse this seed overtime (so it's easier)
# to visualize progress in the animated GIF)
seed = tf.random.normal([num_examples_to_generate, noise_dim])

generator = make_generator_model()
# noise = tf.random.normal([1, 100])
# generated_image = generator(noise, training=False)

discriminator = make_discriminator_model()
# decision = discriminator(generated_image)

generator_optimizer = tf.keras.optimizers.Adam(1e-4)
discriminator_optimizer = tf.keras.optimizers.Adam(1e-4)

checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 generator=generator,
                                 discriminator=discriminator)

# This method returns a helper function to compute cross entropy loss
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

train(train_dataset, EPOCHS, losses)

In [None]:
anim_file = 'dcgan-64x64-1800-1-class-SN_DG.gif'

with imageio.get_writer(anim_file, mode='I') as writer:
    filenames = glob.glob('images/image*.png')
    filenames = sorted(filenames)
    for filename in filenames:
        image = imageio.imread(filename)
        writer.append_data(image)

In [None]:
pic = None
# for i in range(0,5):
for i in range(5,10):
    img = cv2.imread("images/image_at_epoch_%.4d"%((i+1)*100)+".png")[64:-64,30:-30]
    
    if pic is None:
        pic = img
    else:
        pic = np.hstack((pic,img))
    print(pic.shape)

cv2.imwrite("test.png",pic)

In [None]:
plt.title("Generator Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.plot([i for i in range(len(losses))], np.array(losses)[:,0], color="green")
plt.show()
plt.savefig("Generator_Loss.png")

In [None]:
plt.title("Discriminator Loss")
plt.xlabel("Epochs")
plt.ylabel("Loss")
plt.plot([i for i in range(len(losses))], np.array(losses)[:,1], color="blue")
plt.show()
plt.savefig("Discriminator_Loss.png")

### Spectral Norm

In [None]:
def make_generator_model():
    model = tf.keras.Sequential()
    model.add(layers.Dense(16*16*256, use_bias=False, input_shape=(100,)))
    model.add(layers.BatchNormalization())
    model.add(layers.LeakyReLU())

    model.add(layers.Reshape((16, 16, 256)))

    model.add(SpectralNormalization(layers.Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same',
                                                           use_bias=False)))
    model.add(layers.LeakyReLU())

    model.add(SpectralNormalization(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', 
                                                           use_bias=False)))
    model.add(layers.LeakyReLU())

    model.add(layers.Conv2DTranspose(3, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))

    return model

def make_discriminator_model():
    model = tf.keras.Sequential()

    model.add(SpectralNormalization(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same',
                                                  input_shape=[64, 64, 3])))
    
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(SpectralNormalization(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same')))
    model.add(layers.LeakyReLU())
    model.add(layers.Dropout(0.3))

    model.add(layers.Flatten())
    model.add(layers.Dense(1))

    return model

# SA GAN

### Jason's AC-GAN 

In [None]:
!mkdir images

In [None]:
# example of fitting an auxiliary classifier gan (ac-gan) on fashion mnsit
from numpy import zeros
from numpy import ones
from numpy import expand_dims
from numpy.random import randn
from numpy.random import randint
from tensorflow.keras.datasets.fashion_mnist import load_data
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv1D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Activation
from tensorflow.keras.layers import Concatenate
from tensorflow_addons.layers import SpectralNormalization
from tensorflow.keras.initializers import RandomNormal
from matplotlib import pyplot
import numpy as np
import matplotlib.pyplot as plt
import image_utils

**Discriminator architecture**

In [None]:
# define the standalone discriminator model
def define_discriminatorv1(in_shape=(64,64,3), n_classes=10):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    # image input
    in_image = Input(shape=in_shape)
    # downsample to 14x14
    fe = Conv2D(32, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(in_image)
    fe = LeakyReLU(alpha=0.2)(fe)
    fe = Dropout(0.5)(fe)
    # normal
    fe = Conv2D(64, (3,3), padding='same', kernel_initializer=init)(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    fe = Dropout(0.5)(fe)
    # downsample to 7x7
    fe = Conv2D(128, (3,3), strides=(2,2), padding='same', kernel_initializer=init)(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU( alpha=0.2)(fe)
    fe = Dropout(0.5)(fe)
    # normal
    fe = Conv2D(256, (3,3), padding='same', kernel_initializer=init)(fe)
    fe = BatchNormalization()(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    fe = Dropout(0.5)(fe)
    # flatten feature maps
    fe = Flatten()(fe)
    # real/fake output
    out1 = Dense(1, activation='sigmoid')(fe)
    # class label output
    out2 = Dense(n_classes, activation='softmax')(fe)
    # define model
    model = Model(in_image, [out1, out2])
    # compile model
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'sparse_categorical_crossentropy'], optimizer=opt)
    return model

# # define the standalone discriminator model
# def define_discriminator(in_shape=(64,64,3)):
#     model = Sequential()
#     # normal
#     model.add(Conv2D(128, (4,4), padding='same', input_shape=in_shape))
#     model.add(LeakyReLU(alpha=0.2))
#     # downsample
#     model.add(Conv2D(256, (4,4), strides=(2,2), padding='same'))
#     model.add(LeakyReLU(alpha=0.2))
#     # downsample
#     model.add(Conv2D(256, (4,4), strides=(2,2), padding='same'))
#     model.add(LeakyReLU(alpha=0.2))
#     # downsample
#     model.add(Conv2D(512, (4,4), strides=(2,2), padding='same'))
#     model.add(LeakyReLU(alpha=0.2))
#     # classifier
#     model.add(Flatten())
#     model.add(Dropout(0.4))
#     model.add(Dense(1, activation='sigmoid'))
#     # compile model
#     opt = Adam(lr=0.00005, beta_1=0.5)
#     model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
#     return model


# define the standalone discriminator model
def define_discriminator(in_shape, n_classes):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    
    # image input
    in_image = Input(shape=in_shape)
    
    # normal
    fe = SpectralNormalization(Conv2D(128, (4,4), padding='same', kernel_initializer=init))(in_image)
    fe = LeakyReLU(alpha=0.2)(fe)

    # downsample to 32x32
    fe = SpectralNormalization(Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init))(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    
    # downsample to 16x16
    fe = SpectralNormalization(Conv2D(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init))(fe)
    fe = LeakyReLU( alpha=0.2)(fe)
        
    # downsample to 8x8
    fe = SpectralNormalization(Conv2D(512, (4,4), strides=(2,2), padding='same', kernel_initializer=init))(fe)
    fe = LeakyReLU(alpha=0.2)(fe)
    

    # flatten feature maps
    fe = Flatten()(fe)
    fe = Dropout(0.4)(fe)
    
    # real/fake output
    out1 = Dense(1, activation='sigmoid')(fe)
    
    # class label output
    out2 = Dense(n_classes, activation='softmax')(fe)
    
    # define model
    model = Model(in_image, [out1, out2])
   
    # compile model
    opt = Adam(lr=0.00005, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'sparse_categorical_crossentropy'], optimizer=opt)
    return model

def wasserstein_loss(y_true, y_pred):
    return K.mean(y_true*y_pred)


**Generator architecture**

In [None]:
# define the standalone generator model
def define_generatorv1(latent_dim, n_classes=10):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    # label input
    in_label = Input(shape=(1,))
    # embedding for categorical input
    # li = Embedding(n_classes, 50)(in_label)
    li = Embedding(n_classes, 50)(in_label)
    # linear multiplication
    # n_nodes = 7 * 7
    n_nodes = 16 * 16 * 3
    li = Dense(n_nodes, kernel_initializer=init)(li)
    # reshape to additional channel
    # li = Reshape((7, 7, 1))(li)
    li = Reshape((16, 16, 3))(li)
    # image generator input
    in_lat = Input(shape=(latent_dim,))
    # foundation for 7x7 image
    # n_nodes = 384 * 7 * 7
    n_nodes = 384 * 16 * 16
    gen = Dense(n_nodes, kernel_initializer=init)(in_lat)
    gen = Activation('relu')(gen)
    # gen = Reshape((7, 7, 384))(gen)
    gen = Reshape((16, 16, 384))(gen)
    # merge image gen and label input
    merge = Concatenate()([gen, li])
    # upsample to 14x14
    gen = Conv2DTranspose(192, (5,5), strides=(2,2), padding='same', kernel_initializer=init)(merge)
    gen = BatchNormalization()(gen)
    gen = Activation('relu')(gen)
    # upsample to 28x28
    # gen = Conv2DTranspose(1, (5,5), strides=(2,2), padding='same', kernel_initializer=init)(gen)
    gen = Conv2DTranspose(3, (5,5), strides=(2,2), padding='same', kernel_initializer=init)(gen)
    out_layer = Activation('tanh')(gen)
    # define model
    model = Model([in_lat, in_label], out_layer)
    return model

# define the standalone generator model
def define_generator(latent_dim, n_classes):
    # weight initialization
    init = RandomNormal(stddev=0.02)
    
    # label input
    in_label = Input(shape=(1,))
    
    # embedding for categorical input
    li = Embedding(n_classes, 50)(in_label)
    
    # linear multiplication
    n_nodes = 4 * 4 * 3
    li = Dense(n_nodes, kernel_initializer=init)(li)
    
    # reshape to additional channel
    li = Reshape((4, 4, 3))(li)
    
    # image generator input
    in_lat = Input(shape=(latent_dim,)
                  )
    # foundation for 4x4 image
    n_nodes = 256 * 4*4
    lat = Dense(n_nodes, kernel_initializer=init)(in_lat)
    lat = Reshape((4, 4, 256))(lat)
    
    # merge image gen and label input
    merge = Concatenate()([lat, li])
    
    # upsample to 8x8
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(merge)
    gen = LeakyReLU(alpha=0.2)(gen)
#     gen = BatchNormalization(momentum=0.3)(gen)
#     gen = Dropout(0.5)(gen)
    
    # upsample to 16x16
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
#     gen = BatchNormalization(momentum=0.3)(gen)

    # upsample to 32x32
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
#     gen = BatchNormalization(momentum=0.3)(gen)
#     gen = Dropout(0.5)(gen)
    
    # upsample to 64x64
    gen = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same', kernel_initializer=init)(gen)
    gen = LeakyReLU(alpha=0.2)(gen)
#     gen = BatchNormalization(momentum=0.3)(gen)
        
    gen = Conv2D(3, (3,3), padding='same', kernel_initializer=init)(gen)
    out_layer = Activation('tanh')(gen)
    # define model
    model = Model([in_lat, in_label], out_layer)
    return model

**GAN architecture**

In [None]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
    # make weights in the discriminator not trainable
    for layer in d_model.layers:
        if not isinstance(layer, BatchNormalization):
            layer.trainable = False
    # connect the outputs of the generator to the inputs of the discriminator
    gan_output = d_model(g_model.output)
    # define gan model as taking noise and label and outputting real/fake and label outputs
    model = Model(g_model.input, gan_output)
    # compile model
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', 'sparse_categorical_crossentropy'], optimizer=opt)
    return model


**Helper functions**

In [None]:
def trim_classes(images, labels, max_classes=10):
    return images[0:900*max_classes], labels[0:900*max_classes]

# load images
def load_real_samples(n_classes):
    # load dataset
    images, labels = image_utils.load_images(path_root="")
    images, labels = trim_classes(images, labels, n_classes)
    (trainX, trainY), (testX, testY) = image_utils.split_data(images, labels, train=1)
    # expand to 3d, e.g. add channels
    X = expand_dims(trainX, axis=-1)
    # convert from ints to floats
    X = X.astype('float32')
    # scale from [0,255] to [-1,1]
    X = (X - 127.5) / 127.5
    print(X.shape, trainY.shape)
    return [X, trainY]
 
# select real samples
def generate_real_samples(dataset, n_samples):
    # split into images and labels
    images, labels = dataset
    # choose random instances
    ix = randint(0, images.shape[0], n_samples)
    # select images and labels
    X, labels = images[ix], labels[ix]
    # generate class labels
    y = ones((n_samples, 1))
    return [X, labels], y
 
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples, n_classes):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    z_input = x_input.reshape(n_samples, latent_dim)
    # generate labels
    labels = randint(0, n_classes, n_samples)
    return [z_input, labels]
 
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n_samples):
    # generate points in latent space
    z_input, labels_input = generate_latent_points(latent_dim, n_samples, n_classes)
    # predict outputs
    images = generator.predict([z_input, labels_input])
    # create class labels
    y = zeros((n_samples, 1))
    return [images, labels_input], y

def generate_fake_class_sample(generator, class_label, latent_dim, n_samples):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    z_input = x_input.reshape(n_samples, latent_dim)
    labels_input = np.asarray([class_label for i in range(n_samples)])
    # predict outputs
    images = generator.predict([z_input, labels_input])
    # create class labels
    y = zeros((n_samples, 1))
    return [images, labels_input], y

 
# create and save a plot of generated images
def save_plot(examples, epoch, n=5):
    # scale from [-1,1] to [0,1]
    examples = (examples + 1) / 2.0
    # plot the result
    f, axs = pyplot.subplots(n,n,figsize=((n*n)-1,(n*n)-1))
    # plot images
    for i in range(n*n):
        axs[i//n,i%n].imshow(examples[i])
        axs[i//n,i%n].axis("off")
    # save plot to file
    filename = 'images/generated_plot_e%03d.png' % (epoch+1)
    pyplot.savefig(filename)
    pyplot.close()

# generate samples and save as a plot and save the model
def summarize_performance(step, g_model, latent_dim, n_samples=100):
    # prepare fake examples
    [X, _], _ = generate_fake_samples(g_model, latent_dim, n_samples)
    save_plot(X, step)
    print('>Saved')
    
# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=10, n_batch=64, n_classes=10):
    # calculate the number of batches per training epoch
    bat_per_epo = int(dataset[0].shape[0] / n_batch)
    # calculate the number of training iterations
    n_steps = bat_per_epo * n_epochs
    # calculate the size of half a batch of samples
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(n_steps):
        # get randomly selected 'real' samples
        [X_real, labels_real], y_real = generate_real_samples(dataset, half_batch)
        # update discriminator model weights
        _,d_r1,d_r2 = d_model.train_on_batch(X_real, [y_real, labels_real])
        # generate 'fake' examples
        [X_fake, labels_fake], y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
        # update discriminator model weights
        _,d_f,d_f2 = d_model.train_on_batch(X_fake, [y_fake, labels_fake])
        # prepare points in latent space as input for the generator
        [z_input, z_labels] = generate_latent_points(latent_dim, n_batch, n_classes)
        # create inverted labels for the fake samples
        y_gan = ones((n_batch, 1))
        # update the generator via the discriminator's error
        _,g_1,g_2 = gan_model.train_on_batch([z_input, z_labels], [y_gan, z_labels])
        # summarize loss on this batch
        print('>%d, dr[%.3f,%.3f], df[%.3f,%.3f], g[%.3f,%.3f]' % ((i//bat_per_epo)+1, d_r1,d_r2, d_f,d_f2, g_1,g_2))
        # evaluate the model performance every 'epoch'
        if (i+1) % (bat_per_epo * 10) == 0:
            summarize_performance(i//(bat_per_epo), g_model, latent_dim)

**Train GAN**

In [None]:
# size of the latent space
# latent_dim = 100
latent_dim = 100
n_classes=10
n_epochs = 5000
# create the discriminator
discriminator = define_discriminator((64,64,3), n_classes)
discriminator.summary()
# create the generator
generator = define_generator(latent_dim, n_classes)
generator.summary()
# create the gan
gan_model = define_gan(generator, discriminator)
gan_model.summary()
# load image data
dataset = load_real_samples(n_classes)
# train model
train(generator, discriminator, gan_model, dataset, latent_dim, n_epochs=n_epochs, n_classes=n_classes)

In [None]:
[X, _], _ = generate_fake_class_sample(generator, class_label=0, latent_dim=latent_dim, n_samples=16)
# scale from [-1,1] to [0,1]
X = (X + 1) / 2.0

f, axs = plt.subplots(4,4,figsize=(15,15))
# plot images
for i in range(16):
    axs[i//4,i%4].imshow(X[i])
    axs[i//4,i%4].axis("off")
f.show()


In [None]:
from keras.utils.vis_utils import plot_model
plot_model(generator, to_file='generator_plot.png', show_shapes=True, show_layer_names=True)
plot_model(discriminator, to_file='discriminator_plot.png', show_shapes=True, show_layer_names=True)

# Jason RGB GAN + DC GAN + GAN Heuristics

Used

https://machinelearningmastery.com/how-to-develop-a-generative-adversarial-network-for-a-cifar-10-small-object-photographs-from-scratch/

https://machinelearningmastery.com/how-to-code-generative-adversarial-network-hacks/

https://towardsdatascience.com/gan-ways-to-improve-gan-performance-acf37f9f59b

More Details

https://arxiv.org/pdf/1511.06434.pdf

https://sthalles.github.io/advanced_gans/

**Create dir to store images**

In [None]:
!mkdir images

**GAN**

In [None]:
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import vstack
from numpy.random import randn
from numpy.random import randint
from keras.datasets.cifar10 import load_data
from keras.optimizers import Adam
from keras.models import Sequential
from keras.models import load_model
from keras.models import save_model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import UpSampling2D
from keras.layers import ReLU
from keras.layers import LeakyReLU
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.initializers import RandomNormal
import keras.backend as K
from matplotlib import pyplot
import image_utils
from PIL import Image
import numpy as np
from numpy.random import random
import tensorflow as tf
import pickle

def conv2d( x, filters, shape=(4, 4)) :
    '''
    I don't want to write lengthy parameters so I made a short hand function.
    '''
    x.add(Conv2D( filters, shape, strides=(2, 2), padding='same',
               kernel_initializer=RandomNormal(stddev=0.02)))
    #x.add(MaxPooling2D())
    x.add(BatchNormalization(momentum = 0.3))
    x.add(LeakyReLU(alpha=0.2))
    return x
    
def deconv2d( x, filters, shape=(4, 4) ) :
    '''
    Conv2DTransposed gives me checkerboard artifact...
    Select one of the 3.
    '''
    # Simpe Conv2DTranspose
    # Not good, compared to upsample + conv2d below.
    x.add(Conv2DTranspose( filters, shape, padding='same',
        strides=(2, 2), kernel_initializer=RandomNormal(stddev=0.02)))

    # simple and works
#     x.add(UpSampling2D( (2, 2) ))
#     x.add(Conv2D( filters, shape, padding='same' ))

    # Bilinear2x... Not sure if it is without bug, not tested yet.
    # Tend to make output blurry though
    #x.add(bilinear2x( x, filters ))
    #x.add(Conv2D( filters, shape, padding='same' ))

    x.add(BatchNormalization(momentum=0.3))
    x.add(LeakyReLU(alpha=0.2))
    return x
    

# define the standalone discriminator model
def define_discriminatorv1(in_shape=(64,64,3)):
    model = Sequential()
    # normal
    model.add(Conv2D(128, (4,4), padding='same', input_shape=in_shape))
    model.add(LeakyReLU(alpha=0.2))
    # downsample
    model.add(Conv2D(256, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # downsample
    model.add(Conv2D(256, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # downsample
    model.add(Conv2D(512, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # classifier
    model.add(Flatten())
    model.add(Dropout(0.4))
    model.add(Dense(1, activation='sigmoid'))
    # compile model
    opt = Adam(lr=0.00005, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

# define the standalone discriminator model
def define_discriminator(in_shape=(64,64,3)):
    model = Sequential()
    # normal
    model.add(Conv2D(8, (4,4), padding='same', input_shape=in_shape))
    model.add(LeakyReLU(alpha=0.2))
    # downsample
    conv2d(model, 16)
    # downsample
    conv2d(model, 32)
    # downsample
    conv2d(model, 64)
    # classifier
    model.add(Flatten())
    model.add(Dense(1, activation='sigmoid'))
    # compile model
    opt = Adam(lr=0.0001, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
    return model

# define the standalone generator model
def define_generatorv1(latent_dim):
    model = Sequential()
    # foundation for 4x4 image
    n_nodes = 256 * 4 * 4
    model.add(Dense(n_nodes, input_dim=latent_dim))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Reshape((4, 4, 256)))
    # upsample to 8x8
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # upsample to 16x16
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # upsample to 32x32
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # upsample to 64x64
    model.add(Conv2DTranspose(128, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # output layer
    model.add(Conv2D(3, (3,3), activation='tanh', padding='same'))
    return model

# define the standalone generator model
def define_generator(latent_dim):
    model = Sequential()
    # foundation for 4x4 image
    n_nodes = 64 * 8 * 8
    model.add(Dense(n_nodes, input_dim=latent_dim))
    model.add(LeakyReLU(alpha=0.2))
    model.add(Reshape((8, 8, 64)))
#     upsample to 8x8
#     model = deconv2d(model, 32)
    # upsample to 16x16
    model = deconv2d(model, 32)
    # upsample to 32x32
    model = deconv2d(model, 16)
    # upsample to 64x64
    model.add(Conv2DTranspose(8, (4,4), strides=(2,2), padding='same'))
    model.add(LeakyReLU(alpha=0.2))
    # output layer
    model.add(Conv2D(3, (3,3), activation='tanh', padding='same'))
    return model


# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
    # make weights in the discriminator not trainable
#     d_model.trainable = False
    # make weights in the discriminator not trainable
    for layer in d_model.layers:
        if not isinstance(layer, BatchNormalization):
            layer.trainable = False
    # connect them
    model = Sequential()
    # add generator
    model.add(g_model)
    # add the discriminator
    model.add(d_model)
    # compile model
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt)
    d_model.trainable = True
    return model

def trim_classes(images, labels, n_classes):
    return images[0:900*n_classes], labels[0:900*n_classes]

# load and prepare imnet64 training images
def load_real_samples(n_classes=1):
    # load imagenet64 dataset
    images, labels = image_utils.load_images(path_root="")
    images, labels = trim_classes(images, labels, n_classes)
    (trainX, _), (_, _) = image_utils.split_data(images, labels, train=1)

    # convert from unsigned ints to floats
    X = trainX.astype('float32')
    # scale from [0,255] to [-1,1]
    X = (X - 127.5) / 127.5
    return X

# select real samples
def generate_real_samples(dataset, n_samples):
    # choose random instances
    ix = randint(0, dataset.shape[0], n_samples)
    # retrieve selected images
    X = dataset[ix]
    # generate 'real' class labels (1)
    y = ones((n_samples, 1))

    return X, y
 
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n_samples, latent_dim)
    return x_input
 
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(g_model, latent_dim, n_samples):
    # generate points in latent space
    x_input = generate_latent_points(latent_dim, n_samples)
    # predict outputs
    X = g_model.predict(x_input)
    # create 'fake' class labels (0)
    y = zeros((n_samples, 1))
    return X, y
 
# create and save a plot of generated images
def save_plot(examples, epoch, n=6):
    # scale from [-1,1] to [0,1]
    examples = (examples + 1) / 2.0
    # plot the result
    f, axs = pyplot.subplots(n,n,figsize=((n*n)-1,(n*n)-1))
    # plot images
    for i in range(n*n):
        axs[i//n,i%n].imshow(examples[i])
        axs[i//n,i%n].axis("off")
    # save plot to file
    filename = 'images/generated_plot_e%03d.png' % (epoch+1)
    pyplot.savefig(filename)
    pyplot.close()
    
# evaluate the discriminator, plot generated images, save generator model
def summarize_performance(epoch, g_model, d_model, gan_model, dataset, latent_dim, n_samples=150):
    # prepare real samples
    X_real, y_real = generate_real_samples(dataset, n_samples)
    # evaluate discriminator on real examples
    _, acc_real = d_model.evaluate(X_real, y_real, verbose=0)
    # prepare fake examples
    x_fake, y_fake = generate_fake_samples(g_model, latent_dim, n_samples)
    # evaluate discriminator on fake examples
    _, acc_fake = d_model.evaluate(x_fake, y_fake, verbose=0)
    # summarize discriminator performance
    print('>Accuracy real: %.0f%%, fake: %.0f%%' % (acc_real*100, acc_fake*100))
    
    # save plot
    save_plot(x_fake, epoch)
#     save(gan_model, g_model, d_model)
    
    
def save(gan, generator, discriminator):
    discriminator.trainable = False
    save_model(gan, 'gan')
    discriminator.trainable = True
    save_model(generator, 'generator')
    save_model(discriminator, 'discriminator')


def load():
    discriminator = load_model('discriminator')
#     discriminator.trainiable=True
    generator = load_model('generator', compile=False)
    gan = load_model('gan',)
    discriminator.trainiable=False

#     gan.summary()
#     discriminator.summary()
#     generator.summary()

    return gan, generator, discriminator

    

# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=1000, n_batch=100, starting_epoch=0):
    bat_per_epo = int(dataset.shape[0] / n_batch)
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(starting_epoch, n_epochs):
        # enumerate batches over the training set
        for j in range(bat_per_epo):
            # get randomly selected 'real' samples
            X_real, y_real = generate_real_samples(dataset, half_batch)
            # update discriminator model weights
            d_loss1, _ = d_model.train_on_batch(X_real, y_real)
            # generate 'fake' examples
            X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            # update discriminator model weights
            d_loss2, _ = d_model.train_on_batch(X_fake, y_fake)
            # prepare points in latent space as input for the generator
            X_gan = generate_latent_points(latent_dim, n_batch)
            # create inverted labels for the fake samples
            y_gan = ones((n_batch, 1))
            d_model.trainable = False
            # update the generator via the discriminator's error
            g_loss = gan_model.train_on_batch(X_gan, y_gan)
            # summarize loss on this batch
            print('>%d, %d/%d, d1=%.3f, d2=%.3f g=%.3f' %
                (i+1, j+1, bat_per_epo, d_loss1, d_loss2, g_loss))
            d_model.trainable = True
        # evaluate the model performance, sometimes
        if (i+1) % 10 == 0:
            summarize_performance(i, g_model, d_model, gan_model, dataset, latent_dim)
            
def augment_images(images):
    augmented = []
    for image in images:
        flipped = tf.image.flip_left_right(image)
        
        augmented.append(image)
        augmented.append(flipped)
        
    return np.asarray(augmented)



def create_and_train(n_classes=1):
    # size of the latent space
    latent_dim = 100
    # create the discriminator
    d_model = define_discriminator()
#     d_model.summary()
    
    # create the generator
    g_model = define_generator(latent_dim)
#     g_model.summary()
    
    # create the gan
    gan_model = define_gan(g_model, d_model)
#     gan_model.summary()
    
    # load image data
    dataset = load_real_samples(n_classes)
    dataset = augment_images(dataset)
    # train model
    train(g_model, d_model, gan_model, dataset, latent_dim)
    

def load_and_train(model_name, n_classes, starting_epoch):
    # size of the latent space
    latent_dim = 100

    gan_model, g_model, d_model = load()
    
    # load image data
    dataset = load_real_samples(n_classes)
    dataset = augment_images(dataset)
    
    
    # train model
    train(g_model, d_model, gan_model, dataset, latent_dim, starting_epoch=starting_epoch)
    
    

In [None]:
n_classes = 1
create_and_train(n_classes)
# load_and_train("010", n_classes, 40)

**C-GAN**

In [None]:
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy import vstack
from numpy.random import randn
from numpy.random import randint
from keras.datasets.cifar10 import load_data
from keras.optimizers import Adam
from keras.models import Sequential
from keras.models import Model
from keras.models import load_model
from keras.models import save_model
from keras.layers import Input
from keras.layers import Embedding
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import UpSampling2D
from keras.layers import ReLU
from keras.layers import LeakyReLU
from keras.layers import Dropout
from keras.layers import BatchNormalization
from keras.initializers import RandomNormal
import keras.backend as K
from matplotlib import pyplot
import image_utils
from PIL import Image
import numpy as np
from numpy.random import random
import tensorflow as tf
import pickle

# define the standalone discriminator model
def define_discriminator(in_shape, n_classes):
    # input
    i_model = Input(shape=in_shape)
    # normal
    model = Conv2D(128, (4,4), input_shape=in_shape, padding='same')(i_model)
    model = LeakyReLU(alpha=0.2)(model)
    # downsample
    model = Conv2D(256, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # downsample
    model = Conv2D(256, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # downsample
    model = Conv2D(512, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # classifier
    model = Flatten()(model)
    model = Dropout(0.4)(model)
    
    # output T/F
    o_model = Dense(1, activation='sigmoid')(model)
    
    # output class label
    c_model = Dense(n_classes, activation="softmax")(model)
    
    model = Model(i_model, [o_model, c_model])
    
    # compile model
    opt = Adam(lr=0.00005, beta_1=0.5)
    model.compile(loss=['binary_crossentropy', "sparse_categorical_crossentropy"], optimizer=opt, 
                  metrics=['accuracy'])
    return model

# define the standalone generator model
def define_generator(latent_dim, n_classes):
    # input label
    in_label = Input(shape=(1,))
    # embedding for categorical input
    c_model = Embedding(n_classes, 50)(in_label)
    n_nodes = 4 * 4 * 32
    c_model = Dense(n_nodes)(c_model)
    c_model = Reshape((4, 4, 32))(c_model)
    
    # input noise
    in_latent = Input(shape=(latent_dim,))
    # foundation for 4x4 image
    n_nodes = 256 * 4 * 4
    model = Dense(n_nodes, input_dim=latent_dim)(in_latent)
    model = LeakyReLU(alpha=0.2)(model)
    model = Reshape((4, 4, 256))(model)
    # upsample to 8x8
    model = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # upsample to 16x16
    model = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # upsample to 32x32
    model = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # upsample to 64x64
    model = Conv2DTranspose(128, (4,4), strides=(2,2), padding='same')(model)
    model = LeakyReLU(alpha=0.2)(model)
    # output layer
    o_model = Conv2D(3, (3,3), activation='tanh', padding='same')(model)
    
    model = Model([in_latent, in_label], o_model)
    return model


# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
    # make weights in the discriminator not trainable
    d_model.trainable = False
    # connect the outputs of the generator to the inputs of the discriminator
    o_model = d_model(g_model.output)
    #define gan model as taking noise and label and outputting real/fake and label outputs
    model = Model(g_model.input, o_model)
    # compile model
    opt = Adam(lr=0.0002, beta_1=0.5)
    model.compile(loss=['binary_crossentropy','sparse_categorical_crossentropy'], optimizer=opt)
    d_model.trainable = True
    return model

def trim_classes(images, labels, n_classes):
    return images[0:900*n_classes], labels[0:900*n_classes]

# load and prepare imnet64 training images
def load_real_samples(n_classes=1):
    # load imagenet64 dataset
    images, labels = image_utils.load_images(path_root="")
    images, labels = trim_classes(images, labels, n_classes)
    (trainX, trainY), (_, _) = image_utils.split_data(images, labels, train=1)

    # convert from unsigned ints to floats
    X = trainX.astype('float32')
    # scale from [0,255] to [-1,1]
    X = (X - 127.5) / 127.5
    print(X.shape, trainY.shape)
    return [X, trainY]

# select real samples
def generate_real_samples(dataset, n_samples):
    images, labels = dataset
    # choose random instances
    ix = randint(0, images.shape[0], n_samples)
    # select images and labels
    X, labels = images[ix], labels[ix]
    # generate class labels
    y = ones((n_samples, 1))
    return [X, labels], y
 
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples, n_classes):
    # generate points in the latent space
    x_input = randn(latent_dim * n_samples)
    # reshape into a batch of inputs for the network
    x_input = x_input.reshape(n_samples, latent_dim)
    # generate labels
    labels = randint(0, n_classes, n_samples)
    return [x_input, labels]
 
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(g_model, latent_dim, n_samples, n_classes):
    # generate points in latent space
    x_input, labels_input = generate_latent_points(latent_dim, n_samples, n_classes)
    # predict outputs
    images = g_model.predict([x_input, labels_input])
    # create class labels
    y = zeros((n_samples, 1))
    return [images, labels_input], y

 
# create and save a plot of generated images
def save_plot(examples, epoch, n_classes, n=5):
    # scale from [-1,1] to [0,1]
    examples = (examples + 1) / 2.0
    # plot the result
    f, axs = pyplot.subplots(n,n,figsize=((n*n)-1,(n*n)-1))
    # plot images
    for i in range(n*n):
        axs[i//n,i%n].imshow(examples[i])
        axs[i//n,i%n].axis("off")
    # save plot to file
    filename = 'images/generated_plot_e%03d.png' % (epoch+1)
    pyplot.savefig(filename)
    pyplot.close()
    
# evaluate the discriminator, plot generated images, save generator model
def summarize_performance(epoch, g_model, d_model, gan_model, dataset, latent_dim, n_classes, n_samples=150):
    # prepare real samples
    [X_real, labels_real], y_real = generate_real_samples(dataset, n_samples)
    # evaluate discriminator on real examples
    _, tt, _, _, _ = d_model.evaluate(X_real, [y_real, labels_real], verbose=0)
    _ = d_model.evaluate(X_real, [y_real, labels_real], verbose=0)

    # prepare fake examples
    [X_fake, labels_fake], y_fake = generate_fake_samples(g_model, latent_dim, n_samples, n_classes)

    # evaluate discriminator on fake examples
    _, ff, _, _, _ = d_model.evaluate(X_fake, [y_fake, labels_fake], verbose=0)
    # summarize discriminator performance
    print('>Accuracy real: %.0f%%, fake: %.0f%%' % (tt*100, ff*100))
    
    # save plot
    save_plot(X_fake, epoch, n_classes)
#     save(gan_model, g_model, d_model)
    
    
def save(gan, generator, discriminator):
    discriminator.trainable = False
    save_model(gan, 'gan')
    discriminator.trainable = True
    save_model(generator, 'generator')
    save_model(discriminator, 'discriminator')


def load():
    discriminator = load_model('discriminator')
#     discriminator.trainiable=True
    generator = load_model('generator', compile=False)
    gan = load_model('gan',)
    discriminator.trainiable=False

#     gan.summary()
#     discriminator.summary()
#     generator.summary()

    return gan, generator, discriminator

    

# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_classes, n_epochs=1000, n_batch=100, starting_epoch=0):
    bat_per_epo = int(dataset[0].shape[0] / n_batch)
    half_batch = int(n_batch / 2)
    # manually enumerate epochs
    for i in range(starting_epoch, n_epochs):
        # enumerate batches over the training set
        for j in range(bat_per_epo):
            # get randomly selected 'real' samples
            [X_real, labels_real], y_real = generate_real_samples(dataset, half_batch)
            # update discriminator model weights
            dr,_,_,_,_  = d_model.train_on_batch(X_real, [y_real, labels_real])
            
            # generate 'fake' examples
            [X_fake, labels_fake], y_fake = generate_fake_samples(g_model, latent_dim, half_batch, n_classes)
            # update discriminator model weights
            df,_,_,_,_ = d_model.train_on_batch(X_fake, [y_fake, labels_fake])
        
            # prepare points in latent space as input for the generator
            X_gan = generate_latent_points(latent_dim, n_batch, n_classes)
            # create inverted labels for the fake samples
            y_gan = ones((n_batch, 1))
            d_model.trainable = False
            # update the generator via the discriminator's error
            g,_,_ = gan_model.train_on_batch(X_gan, y_gan)
            # summarize loss on this batch
            print('>%d, dr[%.3f], df[%.3f], g[%.3f]' % (i+1, dr, df, g))
            
            d_model.trainable = True
#             summarize_performance(i, g_model, d_model, gan_model, dataset, latent_dim, n_classes)
            
        # evaluate the model performance, sometimes
        if (i+1) % 5 == 0:
            summarize_performance(i, g_model, d_model, gan_model, dataset, latent_dim, n_classes)
            
def augment_images(images):
    augmented = []
    for image in images:
        flipped = tf.image.flip_left_right(image)
        
        augmented.append(image)
        augmented.append(flipped)
        
    return np.asarray(augmented)



def create_and_train(n_classes):
    # size of the latent space
    latent_dim = 100
    # create the discriminator
    d_model = define_discriminator((64,64,3), n_classes)
    d_model.summary()
    
    # create the generator
    g_model = define_generator(latent_dim, n_classes)
    g_model.summary()
    
    # create the gan
    gan_model = define_gan(g_model, d_model)
    gan_model.summary()
    
    # load image data
    dataset = load_real_samples(n_classes)
#     dataset = augment_images(dataset)
    # train model
    train(g_model, d_model, gan_model, dataset, latent_dim, n_classes)
    

def load_and_train(model_name, n_classes, starting_epoch):
    # size of the latent space
    latent_dim = 100

    gan_model, g_model, d_model = load()
    
    # load image data
    dataset = load_real_samples(n_classes)
#     dataset = augment_images(dataset)
    
    
    # train model
    train(g_model, d_model, gan_model, dataset, latent_dim, n_classes, starting_epoch=starting_epoch)
    
    

n_classes = 3
create_and_train(n_classes)
# load_and_train("010", n_classes, 40)

### ThoughtVIZ's  Baseline AC-GAN

**Download model**

In [None]:
!mkdir -p ThoughtViz/models
!wget -q https://raw.githubusercontent.com/ptirupat/ThoughtViz/master/training/models/ac_gan.py -P ThoughtViz/models

**Import libraries**

In [None]:
import os
import numpy as np
import image_utils
from PIL import Image
import math


from random import randint
from PIL import Image
from keras.optimizers import SGD, Adam
from keras.utils import to_categorical
from ThoughtViz.models import ac_gan
import matplotlib.pyplot as plt
import tensorflow as tf
import image_utils

In [None]:
import os.path
import tarfile

import numpy as np
from six.moves import urllib
import tensorflow as tf
import math
import sys
from keras.utils import to_categorical


MODEL_DIR = '/tmp/imagenet'
DATA_URL = 'http://download.tensorflow.org/models/image/imagenet/inception-2015-12-05.tgz'
softmax = None


# Call this function with list of images. Each of elements should be a
# numpy array with values ranging from 0 to 255.
def get_inception_score(images, splits=10):
  assert(type(images) == list)
  assert(type(images[0]) == np.ndarray)
  assert(len(images[0].shape) == 3)
  assert(np.max(images[0]) > 10)
  assert(np.min(images[0]) >= 0.0)
  inps = []
  for img in images:
    img = img.astype(np.float32)
    inps.append(np.expand_dims(img, 0))
  bs = 1
  with tf.compat.v1.Session() as sess:
    preds = []
    n_batches = int(math.ceil(float(len(inps)) / float(bs)))
    for i in range(n_batches):
        sys.stdout.write(".")
        sys.stdout.flush()
        inp = inps[(i * bs):min((i + 1) * bs, len(inps))]
        inp = np.concatenate(inp, 0)
        pred = sess.run(softmax, {'ExpandDims:0': inp})
        preds.append(pred)
    preds = np.concatenate(preds, 0)
    scores = []
    for i in range(splits):
      part = preds[(i * preds.shape[0] // splits):((i + 1) * preds.shape[0] // splits), :]
      kl = part * (np.log(part) - np.log(np.expand_dims(np.mean(part, 0), 0)))
      kl = np.mean(np.sum(kl, 1))
      scores.append(np.exp(kl))
    return np.mean(scores), np.std(scores)


# This function is called automatically.
def _init_inception():
  global softmax
  if not os.path.exists(MODEL_DIR):
    os.makedirs(MODEL_DIR)
  filename = DATA_URL.split('/')[-1]
  filepath = os.path.join(MODEL_DIR, filename)
  if not os.path.exists(filepath):
    def _progress(count, block_size, total_size):
      sys.stdout.write('\r>> Downloading %s %.1f%%' % (
          filename, float(count * block_size) / float(total_size) * 100.0))
      sys.stdout.flush()
    filepath, _ = urllib.request.urlretrieve(DATA_URL, filepath, _progress)
    print()
    statinfo = os.stat(filepath)
    print('Succesfully downloaded', filename, statinfo.st_size, 'bytes.')
  tarfile.open(filepath, 'r:gz').extractall(MODEL_DIR)
  with tf.compat.v1.gfile.FastGFile(os.path.join(
      MODEL_DIR, 'classify_image_graph_def.pb'), 'rb') as f:
    graph_def = tf.compat.v1.GraphDef()
    graph_def.ParseFromString(f.read())
    _ = tf.import_graph_def(graph_def, name='')
  # Works with an arbitrary minibatch size.
  with tf.compat.v1.Session() as sess:
    pool3 = sess.graph.get_tensor_by_name('pool_3:0')
    ops = pool3.graph.get_operations()
    for op_idx, op in enumerate(ops):
        for o in op.outputs:
            shape = o.get_shape()
            shape = [s for s in shape]
            new_shape = []
            for j, s in enumerate(shape):
                if s == 1 and j == 0:
                    new_shape.append(None)
                else:
                    new_shape.append(s)
            o.set_shape(tf.TensorShape(new_shape))
    w = sess.graph.get_operation_by_name("softmax/logits/MatMul").inputs[1]
    logits = tf.matmul(tf.squeeze(pool3, [1, 2]), w)
    softmax = tf.nn.softmax(logits)


if softmax is None:
  _init_inception()

**Helper Functions**

In [None]:
def trim_classes(images, labels, max_classes=10):
  return images[0:900*max_classes], labels[0:900*max_classes]

def combine_rgb_images(generated_images):
    num = generated_images.shape[0]
    width = int(math.sqrt(num))
    height = int(math.ceil(float(num)/width))
    shape = generated_images.shape[1:3]
    image = np.zeros((height*shape[0], width*shape[1], 3),
                     dtype=generated_images.dtype)
    for index, img in enumerate(generated_images):
        i = int(index/width)
        j = index % width
        image[i*shape[0]:(i+1)*shape[0], j*shape[1]:(j+1)*shape[1], :] = img[:, :, :]
    return image

def load_data(num_classes):
    images, labels = image_utils.load_images(path_root="")
    images, labels = trim_classes(images, labels, num_classes)
    (x_train, y_train), (x_test, y_test) = image_utils.split_data(images, labels)
    x_train = (x_train.astype(np.float32) - 127.5) / 127.5
    x_test = (x_test.astype(np.float32) - 127.5) / 127.5
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)
    return (x_train, y_train), (x_test, y_test)

def train_gan(input_noise_dim, batch_size, epochs, model_save_dir, output_dir, num_classes):
    
    (x_train, y_train), (x_test, y_test) = load_data(num_classes)
    print(y_train.shape)
    adam_lr = 0.001
    adam_beta_1 = 0.5

    d = ac_gan.discriminator_model_rgb((64,64), num_classes)
    d_optim = Adam(lr=adam_lr, beta_1=adam_beta_1)
    d.compile(loss=['binary_crossentropy','categorical_crossentropy'], optimizer=d_optim)
    d.trainable = True

    g = ac_gan.generator_model_rgb(input_noise_dim + num_classes)
    g_optim = Adam(lr=adam_lr, beta_1=adam_beta_1)
    g.compile(loss='categorical_crossentropy', optimizer=g_optim)

    d_on_g = ac_gan.generator_containing_discriminator(input_noise_dim + num_classes, g, d)
    d_on_g.compile(loss=['binary_crossentropy','categorical_crossentropy'], optimizer=g_optim)

    g.summary()
    d.summary()
    
    for epoch in range(epochs):
        print("Epoch is ", epoch)

        print("Number of batches", int(x_train.shape[0]/batch_size))

        for index in range(int(x_train.shape[0]/batch_size)):
            # generate noise from a normal distribution
            noise = np.random.uniform(-1, 1, (batch_size, input_noise_dim))

            random_labels = [randint(0, 9) for i in range(batch_size)]

            one_hot_vectors = [to_categorical(label, 10) for label in random_labels]

            conditioned_noise = []
            for i in range(batch_size):
                conditioned_noise.append(np.append(noise[i], one_hot_vectors[i]))
            conditioned_noise = np.array(conditioned_noise)

            # get real images and corresponding labels
            real_images = x_train[index * batch_size:(index + 1) * batch_size]
            real_labels = y_train[index * batch_size:(index + 1) * batch_size]

            # generate fake images using the generator
            generated_images = g.predict(conditioned_noise, verbose=0)

            # discriminator loss of real images
            d_loss_real = d.train_on_batch(real_images, [np.array([1] * batch_size), np.array(real_labels)])
            # discriminator loss of fake images
            d_loss_fake = d.train_on_batch(generated_images, [np.array([0] * batch_size), np.array(one_hot_vectors).reshape(batch_size, num_classes)])
            d_loss = (d_loss_fake[0] + d_loss_real[0]) * 0.5

            # save generated images at intermediate stages of training
            if index % 250 == 0:
                image = combine_rgb_images(generated_images)
                image = image * 255.0
                img_save_path = os.path.join(output_dir, str(epoch) + "_g_" + str(index) + ".png")
                Image.fromarray(image.astype(np.uint8)).save(img_save_path)

            d.trainable = False
            # generator loss
            g_loss = d_on_g.train_on_batch(conditioned_noise, [np.array([1] * batch_size), np.array(one_hot_vectors).reshape(batch_size, num_classes)])
            d.trainable = True

        # test_image_count = 50000
        test_image_count = x_train.shape[0]
        test_noise = np.random.uniform(-1, 1, (test_image_count, input_noise_dim))
        test_labels = [randint(0, 9) for i in range(test_image_count)]
        one_hot_vectors_test = [to_categorical(label, 10) for label in test_labels]

        conditioned_noise_test = []
        for i in range(test_image_count):
            conditioned_noise_test.append(np.append(test_noise[i], one_hot_vectors_test[i]))
        conditioned_noise_test = np.array(conditioned_noise_test)

        test_images = g.predict(conditioned_noise_test, verbose=0)
        test_images = test_images * 255.0
        
        if epoch % 50 == 0:        
            inception_score = get_inception_score([test_image for test_image in test_images], splits=10)

        print("Epoch %d d_loss : %f" % (epoch, d_loss))
        print("Epoch %d g_loss : %f" % (epoch, g_loss[0]))
        print("Epoch %d inception_score : %f" % (epoch, inception_score[0]))

        # save generator and discriminator models along with the weights
        g.save(os.path.join(model_save_dir, 'generator_' + str(epoch)), overwrite=True, include_optimizer=True)
        d.save(os.path.join(model_save_dir, 'discriminator_' + str(epoch)), overwrite=True, include_optimizer=True)

**Train GAN**

In [None]:
batch_size =50
run_id = 1
n_epochs = 500
input_dim = 100
n_classes=10

model_save_dir = os.path.join('./saved_models/baseline_acgan/', 'run_' + str(run_id))
if not os.path.exists(model_save_dir):
    os.makedirs(model_save_dir)

output_dir = os.path.join('./outputs/baseline_acgan/', 'run_' + str(run_id))
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

train_gan(input_dim, batch_size, n_epochs, model_save_dir, output_dir, n_classes)

In [None]:
def rgb2gray(rgb):
    return np.dot(rgb[...,:3], [0.299, 0.587, 0.144])

def resize_images(images, size=(28,28)):
  temp = []
  for img in images:
    temp.append(np.array(Image.fromarray(img).resize(size, Image.ANTIALIAS)))
  return np.asarray(temp)

def trim_classes(images, labels, max_classes=10):
  return images[0:900*max_classes], labels[0:900*max_classes]

In [None]:
images, labels = image_utils.load_images()
images = rgb2gray(images)
print(images.shape)
images = resize_images(images)
print(images.shape)
(x_train, y_train), (x_test, y_test) = image_utils.split_data(images, labels)

**Download Trained Model**

In [None]:
!mkdir -p /content/models/ThoughtViz
!cp /content/gdrive/MyDrive/EEG2Image/models/ThoughtViz/baseline\ acgan/run_1/generator_100/saved_model.pb  /content/models/ThoughtViz/
!mv /content/models/ThoughtViz/saved_model.pb /content/models/ThoughtViz/generator.pb
!cp /content/gdrive/MyDrive/EEG2Image/models/ThoughtViz/baseline\ acgan/run_1/discriminator_100/saved_model.pb  /content/models/ThoughtViz/
!mv /content/models/ThoughtViz/saved_model.pb /content/models/ThoughtViz/discriminator.pb

In [None]:
!mkdir -p /content/models/ThoughtViz
!cp -r /content/gdrive/MyDrive/EEG2Image/models/ThoughtViz/baseline\ acgan/run_1/generator_100 /content/models/ThoughtViz/
!cp -r /content/gdrive/MyDrive/EEG2Image/models/ThoughtViz/baseline\ acgan/run_1/discriminator_100 /content/models/ThoughtViz/


**Load Model**

In [None]:
import tensorflow as tf
import pickle
import random

import keras.backend as K
from PIL import Image
from keras.models import load_model
from keras.utils import to_categorical


In [None]:
K.set_learning_phase(False)

input_noise_dim = 100
batch_size = 50

noise = np.random.uniform(-1, 1, (batch_size, input_noise_dim))

random_labels = np.random.randint(0, 10, batch_size)

conditioned_noise = []
for i in range(batch_size):
    conditioned_noise.append(np.append(noise[i], to_categorical(0, 10)))
conditioned_noise = np.array(conditioned_noise)

g = load_model("/content/models/ThoughtViz/generator_100")

# generate images using the generator
generated_images = g.predict(conditioned_noise, verbose=0)

image = combine_rgb_images(generated_images)
image = image * 127.5 + 127.5
plt.imshow(image)
img = Image.fromarray(image.astype(np.uint8))
img.show()
img.save("test.png")

In [None]:
import pickle
import random

import keras.backend as K
from PIL import Image
from keras.models import load_model
from keras.utils import to_categorical

from layers.mog_layer import *
from utils.image_utils import *


class Tests():

    def test_deligan_baseline(self, generator_model):
        K.set_learning_phase(False)

        input_noise_dim = 100
        batch_size = 50

        noise = np.random.uniform(-1, 1, (batch_size, input_noise_dim))

        random_labels = np.random.randint(0, 10, batch_size)

        conditioned_noise = []
        for i in range(batch_size):
            conditioned_noise.append(np.append(noise[i], to_categorical(random_labels[i], 10)))
        conditioned_noise = np.array(conditioned_noise)

        g = load_model(generator_model, custom_objects={'MoGLayer': MoGLayer})

        # generate images using the generator
        generated_images = g.predict(conditioned_noise, verbose=0)

        image = combine_rgb_images(generated_images)
        image = image * 127.5 + 127.5
        img = Image.fromarray(image.astype(np.uint8))
        img.show()

    def test_deligan_final(self, generator_model, classifier_model, eeg_pkl_file):
        K.set_learning_phase(False)

        # load EEG data
        eeg_data = pickle.load(open(eeg_pkl_file, "rb"), encoding='bytes')
        classifier = load_model(classifier_model)

        x_test = eeg_data[b'x_test']
        y_test = eeg_data[b'y_test']
        y_test = [np.argmax(y) for y in y_test]
        layer_index = 9

        # keras way of getting the output from an intermediate layer
        get_nth_layer_output = K.function([classifier.layers[0].input], [classifier.layers[layer_index].output])

        layer_output = get_nth_layer_output([x_test])[0]

        input_noise_dim = 100
        batch_size = 50

        noise = np.random.uniform(-1, 1, (batch_size, input_noise_dim))

        random_labels = np.random.randint(0, 10, batch_size)

        eeg_feature_vectors = [layer_output[random.choice(np.where(y_test == random_label)[0])] for random_label in random_labels]

        noises, conditionings = [], []
        for i in range(batch_size):
            noises.append(noise[i])
            conditionings.append(eeg_feature_vectors[i])

        g = load_model(generator_model, custom_objects={'MoGLayer': MoGLayer})

        # generate images using the generator
        generated_images = g.predict([np.array(noises), np.array(conditionings)], verbose=0)

        image = combine_rgb_images(generated_images)
        image = image * 127.5 + 127.5
        img = Image.fromarray(image.astype(np.uint8))
        img.show()


if __name__ == '__main__':
    tests = Tests()
    #tests.test_deligan_baseline('../models/gan_models/baseline/deligan/image/generator.model')
    tests.test_deligan_final('../models/gan_models/final/image/generator.model',
                       '../models/eeg_models/image/run_final.h5',
                       '../data/eeg/image/data.pkl')