In [None]:
# IMPORT LIBRARIES

from PIL import Image
from sklearn.utils import shuffle
import numpy as np
import tensorflow as tf
import random


In [None]:
# # HELPER FUNCTION FOR EVALUATION

# def display_model_prediction(prediction_model, latent_dim, id=0):
#     # compare false positive and false negative rates
#     # true_lpoints = generate_latent_points_from_images([id], ext_format=lambda x:f"{x}.jpg", folder_name="mytests", latent_dim=latent_dim)
#     false_lpoints = generate_random_latent_points(latent_dim, 1)
    
#     # prediction_t = np.asarray((prediction_model.predict(true_lpoints)*127.5)+127.5).reshape(png_shape)
#     prediction_f = np.asarray((prediction_model.predict(false_lpoints)*127.5)+127.5).reshape(png_shape)
    
#     # true_points = np.array(Image.open(f"images/{random.randint(0,1024):04}.png")).reshape(expanded_png_shape)
#     # probability_t = discriminator.predict(prediction_t.reshape(expanded_png_shape))
#     probability_f = discriminator.predict(prediction_f.reshape(expanded_png_shape))
#     print(probability_f)

#     # display and return true image
#     image = tf.keras.utils.array_to_img(prediction_f)
#     return image

In [None]:
# TRAINING DATA SETUP

def load_real_data(folder_name="afremov", dataset_sz=10, ext_format=lambda x:f"{x}.jpg", base_sz=128):
    all_images = []
    for i in range(dataset_sz):
        # load image
        img = Image.open(f"data/afremov/{folder_name}/{ext_format(i)}")

        # scale training image to dimension of testing image
        img = img.resize((base_sz, base_sz))

        # add to dataset
        img_data = tf.keras.utils.img_to_array(img, dtype="uint8")
        img_data = (img_data - 127.5) / 127.5
        all_images.append(img_data)
    result = tf.convert_to_tensor(all_images)
    return result


In [None]:
# NOISE GENERATION

def generate_random_latent_points(latent_dim, n):
    # generate a random array of size latent_dim (compressed 24x24)
    x_input = np.random.randn(latent_dim * n)
    x_input = x_input.reshape((n, latent_dim))
    return x_input   


In [None]:
# TRAINING SAMPLES

def generate_real_samples(dataset, n):
    indices = random.sample(range(len(dataset)), n)
    new_x = tf.gather(dataset, indices=indices)
    new_y = tf.ones((n, 1))
    return new_x, new_y

def generate_fake_samples(generator, latent_dim, n):
    x_input = generate_random_latent_points(latent_dim, n)
    new_x = generator.predict(x_input)
    new_y = tf.zeros((n, 1))
    return new_x, new_y

In [None]:
from keras.layers import BatchNormalization
from keras.layers import Conv2D
from keras.layers import Conv2DTranspose
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Flatten
from keras.layers import LeakyReLU
from keras.layers import MaxPooling2D
from keras.layers import Reshape
from keras.models import Sequential
from keras.optimizers import Adam
from keras.regularizers import L2
from keras.initializers import RandomNormal

# MODEL DEFINITION

def discriminator_model(in_shape):
  model = Sequential()
  num_features = in_shape[0]

  # convolution
  model.add(Conv2D(filters=num_features,
                   kernel_size=3,
                   strides=1,
                   padding='same',
                   kernel_initializer=RandomNormal(stddev=0.02),
                   input_shape=in_shape,
                   use_bias=True))
  model.add(LeakyReLU(alpha=0.2))
  model.add(MaxPooling2D(2,2))
    
  # convolution again
  model.add(Conv2D(filters=num_features,
                   kernel_size=3,
                   kernel_initializer=RandomNormal(stddev=0.02),
                   padding='same',
                   use_bias=True))
  model.add(LeakyReLU(alpha=0.2))
  model.add(MaxPooling2D(2,2))

  # convolution one more
  model.add(Conv2D(filters=num_features*2,
                   kernel_size=3,
                   kernel_initializer=RandomNormal(stddev=0.02),
                   kernel_regularizer=L2(l=0.0001),
                   padding='same',
                   use_bias=True))
  model.add(LeakyReLU(alpha=0.2))
  model.add(MaxPooling2D(2,2))

  # classifier
  model.add(Flatten())
  model.add(Dense(num_features*4,activation='relu', use_bias=True, kernel_regularizer=L2(l=0.0001)))
  model.add(Dropout(0.3))
  model.add(Dense(3, activation='softmax', use_bias=True))
  model.add(Dense(1, activation='sigmoid'))
  # compile model
  model.compile(loss='binary_crossentropy',
                optimizer=Adam(learning_rate=0.0002, beta_1=0.5),
                metrics=['accuracy'])
  return model


def generator_model(pic_dim, num_channels, latent_dim):
  model = Sequential()
  num_features = pic_dim
  n_nodes = (pic_dim**2) * num_features

  # transform latent points to small image (8x8)
  model.add(Dense(n_nodes,
                  input_dim=latent_dim))
  model.add(LeakyReLU(alpha=0.2))
  model.add(Reshape((pic_dim, pic_dim, num_features)))

  # upsample to 16x16
  model.add(Conv2DTranspose(filters=num_features,
                            kernel_size=4,
                            strides=2,
                            kernel_initializer=RandomNormal(stddev=0.02),
                            padding='same'))
  model.add(LeakyReLU(alpha=0.2))

  # # upsample to 32x32
  model.add(Conv2DTranspose(filters=num_features,
                            kernel_size=4,
                            strides=2,
                            kernel_initializer=RandomNormal(stddev=0.02),
                            kernel_regularizer=L2(l=0.0001),
                            padding='same'))
  model.add(LeakyReLU(alpha=0.2))

  # upsample to 64x64
  model.add(Conv2DTranspose(filters=num_features,
                            kernel_size=4,
                            strides=2,
                            kernel_initializer=RandomNormal(stddev=0.02),
                            kernel_regularizer=L2(l=0.0001),
                            padding='same'))
  model.add(LeakyReLU(alpha=0.2))

  # upsample to 128x128
  model.add(Conv2DTranspose(filters=num_features,
                            kernel_size=4,
                            strides=2,
                            kernel_initializer=RandomNormal(stddev=0.02),
                            kernel_regularizer=L2(l=0.0001),
                            padding='same'))
  model.add(LeakyReLU(alpha=0.2))

  # finish with tanh activation for each channel
  model.add(Conv2D(filters=num_channels,
                   kernel_size=8,
                   kernel_initializer=RandomNormal(stddev=0.02),
                   activation='tanh',
                   padding='same'))
  return model


def define_gan(gen, disc):
  # lock weights for the discriminator
  disc.trainable = False

  # create joint network
  model = Sequential()
  model.add(gen)
  model.add(disc)
  model.compile(loss='binary_crossentropy',
                optimizer=Adam(learning_rate=0.0002, beta_1=0.5))
  return model

In [None]:
# TRAIN MODELS

def train(g_model, d_model, gan_model, dataset, latent_dim, epochs=100, n_batch=32):
    batch_per_epoch = dataset.shape[0]//n_batch
    if batch_per_epoch == 0:
        raise AssertionError("n_batch must be smaller than dataset size")
        
    for i in range(epochs):
        for _ in range(batch_per_epoch):
            # train discriminator to recognize real samples
            x_real, y_real = generate_real_samples(dataset, n_batch)
            x_fake, y_fake = generate_fake_samples(g_model, latent_dim, n_batch)
            x_total, y_total = shuffle(np.concatenate([x_real, x_fake]), np.concatenate([y_real, y_fake]))
            d_model.trainable = True
            d_model.train_on_batch(x_total, y_total)

            # train generator to fool discriminator
            x_gan = generate_random_latent_points(latent_dim=latent_dim, n=n_batch)
            y_gan = tf.ones((n_batch, 1)) # the "false" one
            d_model.trainable = False
            gan_model.train_on_batch(x_gan, y_gan)
            
            
        if i%(epochs/100) == (epochs/100)-1:
            prediction = np.asarray((g_model.predict(generate_random_latent_points(latent_dim, 1))*127.5)+127.5).reshape(png_shape)
            img = tf.keras.utils.array_to_img(prediction)
            img.save(f"data/dcgan_result/iter_{i+1}.png")
            print(f"{int((i+1)/epochs * 100)}% complete")
    return g_model



In [None]:
# CREATE MODELS

# setup
latent_dim = 1024
num_channels = 3
pix_width = 128
low_res = 8
png_shape = (pix_width, pix_width, num_channels)
expanded_png_shape = (1, pix_width, pix_width, num_channels)

# models
discriminator = discriminator_model(png_shape)
# print(discriminator.summary())
generator = generator_model(low_res, num_channels, latent_dim)
# print(generator.summary())
gan_model = define_gan(generator, discriminator)

In [None]:
# train
dataset = load_real_data(dataset_sz=60)
model = train(generator, discriminator, gan_model, dataset, latent_dim, epochs=1000, n_batch=32)