<a href="https://colab.research.google.com/github/AdamPeetz/PlaneGAN/blob/main/MSDS696_GAN_V3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# example of a dcgan on cifar10
from numpy import zeros
from numpy import ones
from numpy.random import randn
from numpy.random import randint
from keras.datasets.cifar10 import load_data
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import Reshape
from keras.layers import Flatten
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import LeakyReLU
from keras.layers import Dropout
from matplotlib import pyplot
import tensorflow as tf
import cv2
import numpy as np
from tqdm import tqdm
import pathlib

import os, shutil 
from google.colab import drive 
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
# define the standalone discriminator model
def define_discriminator(in_shape=(32,32,3)):
	model = Sequential()
	# normal
	model.add(Conv2D(64, (3,3), padding='same', input_shape=in_shape))
	model.add(LeakyReLU(alpha=0.2))
	# downsample to 16x16
	model.add(Conv2D(128, (3,3), strides=(2,2), padding='same'))
	model.add(LeakyReLU(alpha=0.2))
	# downsample to 8x8
	model.add(Conv2D(128, (3,3), strides=(2,2), padding='same'))
	model.add(LeakyReLU(alpha=0.2))
	# downsample to 4x4
	model.add(Conv2D(256, (3,3), strides=(2,2), padding='same'))
	model.add(LeakyReLU(alpha=0.2))
	# classifier
	model.add(Flatten())
	model.add(Dropout(0.4))
	model.add(Dense(1, activation='sigmoid'))
	# compile model
	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
	return model


In [None]:
# define the standalone generator model
def define_generator(latent_dim):
	model = Sequential()
	# foundation for 4x4 image
	n_nodes = 256 * 4 * 4
	model.add(Dense(n_nodes, input_dim=latent_dim))
	model.add(LeakyReLU(alpha=0.2))
	model.add(Reshape((4, 4, 256)))
	# upsample to 8x8
	model.add(Conv2DTranspose(256, (3,3), strides=(2,2), padding='same'))
	model.add(LeakyReLU(alpha=0.2))
	# upsample to 16x16
	model.add(Conv2DTranspose(256, (3,3), strides=(2,2), padding='same'))
	model.add(LeakyReLU(alpha=0.2))
	# upsample to 32x32
	model.add(Conv2DTranspose(256, (3,3), strides=(2,2), padding='same'))
	model.add(LeakyReLU(alpha=0.2))
	# output layer
	model.add(Conv2D(3, (3,3), activation='tanh', padding='same'))
	return model

In [None]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(g_model, d_model):
	# make weights in the discriminator not trainable
	d_model.trainable = False
	# connect them
	model = Sequential()
	# add generator
	model.add(g_model)
	# add the discriminator
	model.add(d_model)
	# compile model
	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss='binary_crossentropy', optimizer=opt)
	return model

In [None]:
# load and prepare cifar10 training images
def load_real_samples():
	# load cifar10 dataset
	(trainX, _), (_, _) = load_data()
	# convert from unsigned ints to floats
	X = trainX.astype('float32')
	# scale from [0,255] to [-1,1]
	X = (X - 127.5) / 127.5
	return X

#load and prepare custom image dataset
def create_real_img_dataset(image_path, resize_dim):
		dataset = []
		for file in tqdm(os.listdir(image_path)):
				image = cv2.imread(os.path.join(image_path,file))
				X = image.astype('float32')
				X = X[:, :, [2, 1, 0]]
				X = (X - 127.5) / 127.5
				X = tf.image.resize(X, [resize_dim, resize_dim])
				dataset.append(X)
		dataset = np.array(dataset)
		return dataset

#load a custom image dataset with 1 level of directory
# tensorflow documentation
def load_custom_image_set(dataroot,resizedim,directorylevels):
    #image parse function
    def parse_image(filename):
        parts = tf.strings.split(filename, os.sep)
        label = parts[-2]
        image = tf.io.read_file(filename)
        image = tf.io.decode_jpeg(image)
        image = tf.image.convert_image_dtype(image, tf.float32)
 #       image = tf.image.resize(image, [resizedim,resizedim])
 #       image = (image - 127.5) / 127.5
        return image, label
    
    data_root = pathlib.Path(dataroot)
    if directorylevels == 1:
      list_ds = tf.data.Dataset.list_files(str(data_root/'*/*'))
    if directorylevels == 2:
      list_ds = tf.data.Dataset.list_files(str(data_root/'*/*/*'))
    images_ds = list_ds.map(parse_image)
    #https://stackoverflow.com/questions/70535683/extract-data-from-tensorflow-dataset-e-g-to-numpy
    images = np.asarray(list(images_ds.map(lambda x, y: x)))
    return images

# select real samples
def generate_real_samples(dataset, n_samples):
	# choose random instances
	ix = randint(0, dataset.shape[0], n_samples)
	# retrieve selected images
	X = dataset[ix]
	# generate 'real' class labels (1)
	y = ones((n_samples, 1))
	return X, y

# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
	# generate points in the latent space
	x_input = randn(latent_dim * n_samples)
	# reshape into a batch of inputs for the network
	x_input = x_input.reshape(n_samples, latent_dim)
	return x_input

# modified with suggestion from Das Shuvo (2020)
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(g_model, latent_dim, n_samples):
	# generate points in latent space
	x_input = generate_latent_points(latent_dim, n_samples)
	# predict outputs
	input_tensor = tf.convert_to_tensor(x_input)
	X = g_model(input_tensor)
	X = X.numpy()
	# create 'fake' class labels (0)
	y = zeros((n_samples, 1))
	return X, y

In [None]:
# create and save a plot of generated images
def save_plot(examples, epoch, n=7):
	# scale from [-1,1] to [0,1]
	examples = (examples + 1) / 2.0
#	example = (examples + 1) * 255
	# plot images
	for i in range(n * n):
		# define subplot
		pyplot.subplot(n, n, 1 + i)
		# turn off axis
		pyplot.axis('off')
		# plot raw pixel data
		pyplot.imshow(examples[i])
	# save plot to file
	filename = '/content/gdrive/My Drive/planegan/sample_output/generated_plot_e%03d.png' % (epoch+1)
	pyplot.savefig(filename)
	pyplot.close()
 
# create a line plot of loss for the gan and save to file
def plot_history(d1_hist, d2_hist, g_hist, a1_hist, a2_hist):
	# plot loss
	pyplot.subplot(2, 1, 1)
	pyplot.plot(d1_hist, label='d-real')
	pyplot.plot(d2_hist, label='d-fake')
	pyplot.plot(g_hist, label='gen')
	pyplot.legend()
	# plot discriminator accuracy
	pyplot.subplot(2, 1, 2)
	pyplot.plot(a1_hist, label='acc-real')
	pyplot.plot(a2_hist, label='acc-fake')
	pyplot.legend()
	# save plot to file
	pyplot.savefig('/content/gdrive/My Drive/planegan/saved_models/plot_line_plot_loss.png')
	pyplot.close()


In [None]:
# evaluate the discriminator, plot generated images, save generator model
def summarize_performance(epoch, g_model, d_model, dataset, latent_dim, n_samples=150):
	# prepare real samples
	X_real, y_real = generate_real_samples(dataset, n_samples)
	# evaluate discriminator on real examples
	_, acc_real = d_model.evaluate(X_real, y_real, verbose=0)
	# prepare fake examples
	x_fake, y_fake = generate_fake_samples(g_model, latent_dim, n_samples)
	# evaluate discriminator on fake examples
	_, acc_fake = d_model.evaluate(x_fake, y_fake, verbose=0)
	# summarize discriminator performance
	print('>Accuracy real: %.0f%%, fake: %.0f%%' % (acc_real*100, acc_fake*100))
	# save plot
	save_plot(x_fake, epoch)
	# save the generator model tile file
	filename = '/content/gdrive/My Drive/planegan/saved_models/generator_model_%03d.h5' % (epoch+1)
	g_model.save(filename)

# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=300, n_batch=128):
	bat_per_epo = int(dataset.shape[0] / n_batch)
	# define half batches for training
	half_batch = int(n_batch / 2)
  # prepare lists for storing stats each iteration
	d1_hist, d2_hist, g_hist, a1_hist, a2_hist = list(), list(), list(), list(), list()
	# manually enumerate epochs
	for i in range(n_epochs):
		# enumerate batches over the training set
		for j in range(bat_per_epo):
			# get randomly selected 'real' samples
			X_real, y_real = generate_real_samples(dataset, half_batch)
			# update discriminator model weights
			d_loss1, d_acc1 = d_model.train_on_batch(X_real, y_real)
			# generate 'fake' examples
			X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
			# update discriminator model weights
			d_loss2, d_acc2 = d_model.train_on_batch(X_fake, y_fake)
			# prepare points in latent space as input for the generator
			X_gan = generate_latent_points(latent_dim, n_batch)
			# create inverted labels for the fake samples
			y_gan = ones((n_batch, 1))
			# update the generator via the discriminator's error
			g_loss = gan_model.train_on_batch(X_gan, y_gan)
	 	  # record training metrics
			d1_hist.append(d_loss1)
			d2_hist.append(d_loss2)
			g_hist.append(g_loss)
			a1_hist.append(d_acc1)
			a2_hist.append(d_acc2)
		# summarize loss
			if (j+1) % 5 == 0:
				print('>%d, %d/%d, d1=%.3f, d2=%.3f g=%.3f' % (i+1, j+1, bat_per_epo, d_loss1, d_loss2, g_loss))
				# record training metrics
				d1_hist.append(d_loss1)
				d2_hist.append(d_loss2)
				g_hist.append(g_loss)
				a1_hist.append(d_acc1)
				a2_hist.append(d_acc2)
		# evaluate the model performance, sometimes
		if (i+1) % 10 == 0:
			summarize_performance(i, g_model, d_model, dataset, latent_dim)
	# save plot to directory
	plot_history(d1_hist, d2_hist, g_hist, a1_hist, a2_hist)


In [None]:
# size of the latent space
latent_dim = 100
# create the discriminator
d_model = define_discriminator()
# create the generator
g_model = define_generator(latent_dim)
# create the gan
gan_model = define_gan(g_model, d_model)
# load image data
image_path = "/content/gdrive/My Drive/CIFAR10/"
resizedim = 32
directorylevels = 1
dataset = load_custom_image_set(image_path,resizedim,directorylevels)
# train model
# train(g_model, d_model, gan_model, dataset, latent_dim)

  super().__init__(name, **kwargs)


In [None]:
dataset_1 = (dataset -0.5)

In [None]:
dataset_1[1][1]

array([[-0.25294116,  0.07254905, -0.09215683],
       [-0.24117646,  0.05686277, -0.0960784 ],
       [-0.37843138, -0.12352937, -0.25686273],
       [-0.18235293,  0.06470591, -0.05686271],
       [-0.30392155, -0.0333333 , -0.15490195],
       [-0.15490195,  0.1156863 ,  0.00196081],
       [-0.22941175,  0.02156866, -0.08431369],
       [-0.3117647 , -0.06470585, -0.17058823],
       [-0.272549  , -0.0215686 , -0.12745097],
       [-0.29999998, -0.04117644, -0.15098038],
       [-0.26862743,  0.00980395, -0.1078431 ],
       [-0.11568624,  0.15882355,  0.04901963],
       [-0.07254899,  0.19803923,  0.08823532],
       [-0.26862743, -0.03725487, -0.12352937],
       [-0.30784312, -0.13529411, -0.20196077],
       [-0.29999998, -0.15882352, -0.21372548],
       [-0.32352942, -0.19019607, -0.24509802],
       [-0.4254902 , -0.2960784 , -0.3392157 ],
       [-0.43333334, -0.27647057, -0.30784312],
       [-0.42941177, -0.27647057, -0.28823528],
       [-0.41764706, -0.2843137 , -0.276

In [None]:
train(g_model, d_model, gan_model, dataset_1, latent_dim)

>1, 5/390, d1=0.027, d2=0.949 g=0.781
>1, 10/390, d1=0.752, d2=0.741 g=0.706
>1, 15/390, d1=0.465, d2=0.889 g=0.639
>1, 20/390, d1=0.791, d2=0.602 g=1.003
>1, 25/390, d1=0.720, d2=0.718 g=0.687
>1, 30/390, d1=0.586, d2=0.762 g=0.653
>1, 35/390, d1=0.680, d2=0.660 g=0.877
>1, 40/390, d1=0.719, d2=0.740 g=0.699
>1, 45/390, d1=0.715, d2=0.667 g=0.778
>1, 50/390, d1=0.694, d2=0.711 g=0.698
>1, 55/390, d1=0.685, d2=0.727 g=0.705
>1, 60/390, d1=0.710, d2=0.658 g=0.782
>1, 65/390, d1=0.721, d2=0.689 g=0.720
>1, 70/390, d1=0.723, d2=0.661 g=0.759
>1, 75/390, d1=0.728, d2=0.679 g=0.726
>1, 80/390, d1=0.705, d2=0.696 g=0.703
>1, 85/390, d1=0.687, d2=0.705 g=0.695
>1, 90/390, d1=0.668, d2=0.703 g=0.709
>1, 95/390, d1=0.678, d2=0.698 g=0.736
>1, 100/390, d1=0.704, d2=0.691 g=0.718
>1, 105/390, d1=0.632, d2=0.691 g=0.733
>1, 110/390, d1=0.718, d2=0.643 g=0.810
>1, 115/390, d1=0.763, d2=0.623 g=0.799
>1, 120/390, d1=0.726, d2=0.661 g=0.754
>1, 125/390, d1=0.717, d2=0.703 g=0.712
>1, 130/390, d1=0.71

In [None]:
`# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
	# generate points in the latent space
	x_input = randn(latent_dim * n_samples)
	# reshape into a batch of inputs for the network
	x_input = x_input.reshape(n_samples, latent_dim)
	return x_input

# create and save a plot of generated images
def save_plot(examples, n):
	# plot images
	for i in range(n * n):
		# define subplot
		pyplot.subplot(n, n, 1 + i)
		# turn off axis
		pyplot.axis('off')
		# plot raw pixel data
		pyplot.imshow(examples[i, :, :])
	pyplot.show()

# load model
model = gan_model
# generate images
latent_points = generate_latent_points(100, 100)
# generate images
X = model.predict(latent_points)
# scale from [-1,1] to [0,1]
X = (X + 1) / 2.0
# plot the result
save_plot(X, 10)

SyntaxError: ignored

# References

Brownlee, Jason. (2019). Generative Adversarial Networks with Python. Ebook. Machine Learning Mastery. 

Das Shuvo, Falguni. (2020). Repeatedly calling model.predict(...) results in memory leak. GitHub. Retrieved 3/15/2023 from https://github.com/keras-team/keras/issues/13118