# GAN IMPLEMENTATION ON MNIST DATASET

In [86]:
# import necessary packages
import os
import torch
import torchvision
import torch.nn as nn
from torchvision import transforms
from torchvision.utils import save_image
from torch.autograd import Variable
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pylab
import h5py

from PIL import Image
import pandas as pd
import tensorflow as tf
from tensorflow import keras

import numpy as np
from numpy import expand_dims
from numpy import zeros
from numpy import ones
from numpy.random import random

from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Reshape
from tensorflow.keras.layers import Flatten
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.layers import Conv2DTranspose
from tensorflow.keras.layers import LeakyReLU
from tensorflow.keras.layers import Dropout
from tensorflow.keras.layers import Input
from tensorflow.keras.layers import Embedding
from tensorflow.keras.layers import Concatenate

from tensorflow.keras.models import Sequential
from tensorflow.keras.models import load_model
from tensorflow.keras.models import Model

from tqdm import tqdm 
from IPython import display 

In [87]:
# define the standalone discriminator model
def define_discriminator(in_shape=(200,200,1)):
	# weight initialization
	init = 'uniform'
	# define model
	model = Sequential()
	# downsample to 100x100
	model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init, input_shape=in_shape))
	model.add(LeakyReLU(alpha=0.2))
	# downsample to 50x50
	model.add(Conv2D(64, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
	model.add(LeakyReLU(alpha=0.2))
	# classifier
	model.add(Flatten())
	model.add(Dense(1, activation='sigmoid'))
	# compile model
	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])
	return model

In [88]:
def define_generator(latent_dim):
	# weight initialization
	init = 'uniform'
	# define model
	model = Sequential()
	# foundation for 50x50 image
	n_nodes = 256 * 50 * 50
	model.add(Dense(n_nodes, kernel_initializer=init, input_dim=latent_dim))
	model.add(LeakyReLU(alpha=0.2))
	model.add(Reshape((50, 50, 256)))
	# upsample to 100x100
	model.add(Conv2DTranspose(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
	model.add(LeakyReLU(alpha=0.2))
	# upsample to 200x200
	model.add(Conv2DTranspose(256, (4,4), strides=(2,2), padding='same', kernel_initializer=init))
	model.add(LeakyReLU(alpha=0.2))
	# output 28x28x1
	model.add(Conv2D(1, (7,7), activation='tanh', padding='same', kernel_initializer=init))
	return model

In [89]:
# define the combined generator and discriminator model, for updating the generator
def define_gan(generator, discriminator):
	# make weights in the discriminator not trainable
	discriminator.trainable = False
	# connect them
	model = Sequential()
	# add generator
	model.add(generator)
	# add the discriminator
	model.add(discriminator)
	# compile model
	opt = Adam(lr=0.0002, beta_1=0.5)
	model.compile(loss='binary_crossentropy', optimizer=opt)
	return model

In [90]:
# default values of standardised images
preferred_width = 200
preferred_height = 200

In [91]:
# paths to take images from and to save to (after transformation to BnW)
pathSource = "C:\\Users\\DataSci\\Desktop\\DS_Testing\\warGAN\\igTest"
pathDest = "C:\\Users\\DataSci\\Desktop\\DS_Testing\\warGAN\\BnWImages"
# list all objects in pathSource
dir_list = os.listdir(pathSource)

# setting a param for proportion of data to keep for testing
testingSize = int(len(dir_list)/5)

In [92]:
# create list, to contain all standardised images; values [0:1], instead of [0:255]
images = []

# for image in pathSource, convert them to BnW, save that to pathDest, then add standardised version to images[]
for image in dir_list:
    img = Image.open(pathSource+"\\"+image).convert('L').resize((preferred_width, preferred_height), Image.ANTIALIAS)
    # img.show()
    imgArray = np.array(img) # int array of pixels
    imgArray = imgArray.astype(float)/255 # standardise values to be 0:255
    images.append(imgArray)
    img.save(pathDest+"\\BnW"+image, "JPEG")

# diagnostic checks of images[]
print(np.shape(images))
print(images[0])
print(np.shape(images[0]))


(10, 200, 200)
[[0.4627451  0.4627451  0.4627451  ... 0.36470588 0.36470588 0.36470588]
 [0.4627451  0.4627451  0.4627451  ... 0.36470588 0.36470588 0.36470588]
 [0.46666667 0.46666667 0.46666667 ... 0.36470588 0.36470588 0.35294118]
 ...
 [0.4745098  0.47058824 0.4627451  ... 0.37647059 0.37647059 0.37647059]
 [0.46666667 0.47058824 0.45882353 ... 0.38823529 0.38039216 0.38431373]
 [0.4627451  0.47058824 0.4627451  ... 0.38431373 0.38823529 0.39215686]]
(200, 200)


In [93]:
X_testing = np.array(images[0:testingSize])
X_training = np.array(images[testingSize:len(images)])

print(np.shape(X_testing))
print(X_testing[1]) # checking it's correctly put something into the testing variable
# what we want to pass to the generator is list (X_testing[index])
print(np.shape(X_training))

(2, 200, 200)
[[0.53333333 0.53333333 0.53333333 ... 0.39607843 0.4        0.39607843]
 [0.53333333 0.53333333 0.53333333 ... 0.39607843 0.4        0.39607843]
 [0.5372549  0.5372549  0.53333333 ... 0.4        0.40392157 0.39607843]
 ...
 [0.43137255 0.41176471 0.42352941 ... 0.36470588 0.36078431 0.35686275]
 [0.41176471 0.42745098 0.42745098 ... 0.35294118 0.35294118 0.34901961]
 [0.4        0.43137255 0.41568627 ... 0.34901961 0.34901961 0.34901961]]
(8, 200, 200)


In [94]:
"""
train_data = tf.data.Dataset.from_tensor_slices((X_training))
valid_data = tf.data.Dataset.from_tensor_slices((X_testing))
"""

'\ntrain_data = tf.data.Dataset.from_tensor_slices((X_training))\nvalid_data = tf.data.Dataset.from_tensor_slices((X_testing))\n'

In [95]:
# select real samples
def generate_real_samples(dataset, n_samples):
	# choose random instances
	ix = randint(0, dataset.shape[0], n_samples)
	# select images
	X = dataset[ix]
	# generate class labels
	y = ones((n_samples, 1))
	return X, y

In [124]:
# generate points in latent space as input for the generator
def generate_latent_points(latent_dim, n_samples):
	# generate points in the latent space
	x_input = np.random.uniform(low=0.0, high=1.0, size=(latent_dim, n_samples))
	# reshape into a batch of inputs for the network
	x_input = x_input.reshape(n_samples, latent_dim)
	return x_input

In [125]:
# use the generator to generate n fake examples, with class labels
def generate_fake_samples(generator, latent_dim, n_samples):
	# generate points in latent space
	x_input = abs(generate_latent_points(latent_dim, n_samples))
	# predict outputs
	X = generator.predict(x_input)
	# create class labels
	y = zeros((n_samples, 1))
	return X, y

In [98]:
pathOut = "C:\\Users\\DataSci\\Desktop\\DS_Testing\\warGAN\\results"

In [99]:
# currently commented out
# generate samples and save as a plot and save the model
def summarize_performance(step, g_model, latent_dim, n_samples=100):
	# prepare fake examples
	X, _ = generate_fake_samples(g_model, latent_dim, n_samples)
	# plot images
	for i in range(10 * 10):
		# define subplot
		plt.subplot(10, 10, 1 + i)
		# turn off axis
		plt.axis('off')
		# plot raw pixel data
		plt.imshow(X[i, :, :, 0], cmap='gray_r')
	# save plot to file
	plt.savefig(pathOut+"results\\generated_plot_%03d.png" % (step+1))
	plt.close()
	# save the generator model
	g_model.save(pathOut+"results\\model_%03d.h5" % (step+1))

In [100]:
# currently not used
# create a line plot of loss for the gan and save to file
def plot_history(d1_hist, d2_hist, g_hist, a1_hist, a2_hist):
	# plot loss
	plt.subplot(2, 1, 1)
	plt.plot(d1_hist, label='d-real')
	plt.plot(d2_hist, label='d-fake')
	plt.plot(g_hist, label='gen')
	plt.legend()
	# plot discriminator accuracy
	plt.subplot(2, 1, 2)
	plt.plot(a1_hist, label='acc-real')
	plt.plot(a2_hist, label='acc-fake')
	plt.legend()
	plt.show()

In [126]:
# train the generator and discriminator
def train(g_model, d_model, gan_model, dataset, latent_dim, n_epochs=3, n_batch=2):
	# calculate the number of batches per epoch
	bat_per_epo = int(dataset.shape[0] / n_batch)
	# calculate the total iterations based on batch and epoch
	n_steps = bat_per_epo * n_epochs
	# calculate the number of samples in half a batch
	half_batch = int(n_batch / 2)
	# prepare lists for storing stats each iteration
	d1_hist, d2_hist, g_hist, a1_hist, a2_hist = list(), list(), list(), list(), list()
	# manually enumerate epochs
	for i in range(n_steps):
		# get randomly selected 'real' samples
		X_real, y_real = generate_real_samples(dataset, half_batch)
		# update discriminator model weights
		d_loss1, d_acc1 = d_model.train_on_batch(X_real, y_real)
		# generate 'fake' examples
		X_fake, y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
		# update discriminator model weights
		d_loss2, d_acc2 = d_model.train_on_batch(X_fake, y_fake)
		# prepare points in latent space as input for the generator
		X_gan = generate_latent_points(latent_dim, n_batch)
		# create inverted labels for the fake samples
		y_gan = ones((n_batch, 1))
		# update the generator via the discriminator's error
		g_loss = gan_model.train_on_batch(X_gan, y_gan)
		# summarize loss on this batch
		print('>%d, d1=%.3f, d2=%.3f g=%.3f, a1=%d, a2=%d' %
			(i+1, d_loss1, d_loss2, g_loss, int(100*d_acc1), int(100*d_acc2)))
		# record history
		d1_hist.append(d_loss1)
		d2_hist.append(d_loss2)
		g_hist.append(g_loss)
		a1_hist.append(d_acc1)
		a2_hist.append(d_acc2)
		"""
		# evaluate the model performance every 'epoch'
		if (i+1) % bat_per_epo == 0:
			summarize_performance(i, g_model, latent_dim)
		"""
	print("finished training")
	g_model.save('WARgenerator.h5')
	# plot_history(d1_hist, d2_hist, g_hist, a1_hist, a2_hist)

In [110]:
X_train=np.reshape(X_training,(X_training.shape[0], X_training.shape[1],X_training.shape[2],1))

In [127]:
# make folder for results
# os.mkdir("results")
# size of the latent space
latent_dim = 50
# create the discriminator
discriminator = define_discriminator()
# create the generator
generator = define_generator(latent_dim)
# create the gan
gan_model = define_gan(generator, discriminator)
# load image data
dataset = X_train
# train model
train(generator, discriminator, gan_model, dataset, latent_dim)

>1, d1=0.661, d2=0.697 g=0.713, a1=100, a2=0
>2, d1=0.155, d2=1.215 g=0.446, a1=100, a2=0
>3, d1=0.209, d2=1.610 g=0.391, a1=100, a2=0
>4, d1=0.357, d2=1.394 g=0.556, a1=100, a2=0
>5, d1=0.607, d2=0.921 g=0.800, a1=100, a2=0
>6, d1=0.805, d2=0.727 g=0.851, a1=0, a2=0
>7, d1=0.915, d2=0.821 g=0.652, a1=0, a2=0
>8, d1=0.758, d2=0.899 g=0.594, a1=0, a2=0
>9, d1=0.739, d2=0.928 g=0.584, a1=0, a2=0
>10, d1=0.646, d2=0.971 g=0.549, a1=100, a2=0
>11, d1=0.563, d2=1.164 g=0.456, a1=100, a2=0
>12, d1=0.576, d2=1.267 g=0.461, a1=100, a2=0
finished training


In [128]:
model = load_model('WARgenerator.h5', compile = False)
latent_points = generate_latent_points(latent_dim, 1)
# generate images
X = model.predict(latent_points) # has quite the think about it
print("done thinking")

done thinking


In [129]:
# getting the prediction back out
np.shape(X)
xout = X*255
print(xout) # there are negative values, thats an issue!!!
np.shape(xout)

[[[[ 43.64162  ]
   [ -3.7596905]
   [ 70.66313  ]
   ...
   [ 40.238754 ]
   [ 57.63533  ]
   [ 16.762583 ]]

  [[ 46.617905 ]
   [  8.3795805]
   [ 43.234547 ]
   ...
   [-21.79819  ]
   [ 20.747938 ]
   [ -3.0840867]]

  [[ 59.20359  ]
   [ 45.79153  ]
   [ 28.884651 ]
   ...
   [ 42.80076  ]
   [ 64.347206 ]
   [ 32.81958  ]]

  ...

  [[ 61.294662 ]
   [-33.718727 ]
   [ 28.426353 ]
   ...
   [-47.38519  ]
   [ 83.20259  ]
   [-22.731602 ]]

  [[ 40.170803 ]
   [ 42.917572 ]
   [ 43.993412 ]
   ...
   [ 29.91435  ]
   [ 22.920279 ]
   [ 22.528784 ]]

  [[ 43.3212   ]
   [ 13.647952 ]
   [ 45.321617 ]
   ...
   [ 23.408825 ]
   [ 29.392797 ]
   [ -3.8614464]]]]


(1, 200, 200, 1)

In [138]:
flat = xout.flatten()  
print(len(flat)) # should be 40,000
arr_2d = np.reshape(flat, (200, 200))

In [142]:
# convert float array to ints
array_int = abs(np.array(arr_2d, dtype='int'))
np.shape(array_int)

(200, 200)

In [None]:
# plt.imshow(xOutImg) # still kills kernel

In [28]:
# plot the result

'''
earlier we used
img.show() # PIL object, range 0:255
the prediction result needs to be possibly turned back into a 200*200 array, from a 40,000 long tensor, and have the float values multipled by 255, and maybe rounded to ints
From there, we should be able to plot the prediction
'''