### Variables

In [4]:
# https://dpb.bitbucket.io/reloading-modified-code-when-using-the-ipython-interactive-shell.html
%load_ext autoreload
%autoreload 1 # from now on, always reload those modules marked with %aimport before executing any Python code

In [5]:
import os

from collections import defaultdict


from PIL import Image
from six.moves import range

import matplotlib
import matplotlib.pyplot as plt

import datetime

import numpy as np
from scipy.stats import truncnorm

import keras.backend as K

from keras.models import Model

from keras.layers import Input, Dense, Reshape, Flatten, Dropout
from keras.layers import Activation, ZeroPadding2D

from keras.layers.convolutional import UpSampling2D, Conv2D, Conv2DTranspose
from keras.layers.pooling import MaxPooling2D

from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU

from keras.regularizers import l2
from keras.optimizers import Adam

from keras.utils import plot_model
from keras.utils.generic_utils import Progbar

from sklearn.metrics import confusion_matrix
from keras.preprocessing.image import ImageDataGenerator, array_to_img, img_to_array, load_img

Using TensorFlow backend.


In [6]:
# ---------------------------------------------------------------------------
# Data locations
# NOTE(review): these are absolute, machine-specific Windows paths; the
# notebook only runs unchanged on a machine with this directory layout.
# ---------------------------------------------------------------------------
train_directory = "C:\\Users\\Master\\Project\\Validator\\Python\\images\\docs\\train"
validation_directory = "C:\\Users\\Master\\Project\\Validator\\Python\\images\\docs\\valid"

# ---------------------------------------------------------------------------
# Network geometry (passed to the model builder functions below)
# ---------------------------------------------------------------------------
input_shape = (64, 64, 1)          # discriminator input: height, width, channels
latent_shape = (100,)              # shape of the generator's latent vector
n_filters = [512, 256, 128, 64]    # filter counts; the discriminator walks this list in reverse
filter_size = 5                    # convolution kernel size
strides = 2                        # convolution stride

# ---------------------------------------------------------------------------
# Adam optimiser settings for the generator side and the discriminator
# ---------------------------------------------------------------------------
gen_lr = 2e-4
gen_beta1 = 0.8
dis_lr = 2e-4
dis_beta1 = 0.8

# ---------------------------------------------------------------------------
# Run bookkeeping
# ---------------------------------------------------------------------------
model_name = "my_model"
output_dirname = 'output'
stats_step_interval = 150
epochs = 1

### Use generator to extract images

In [7]:
# Plain generators: no augmentation and no rescaling is configured here.
# Pixel values are rescaled to [-1, 1] later, where the batches are consumed.
train_datagen = ImageDataGenerator()

test_datagen = ImageDataGenerator()

# The spatial size is derived from `input_shape` (height, width, channels)
# instead of repeating the literal tuple, so the images always match the
# shape the discriminator is built with.
train_generator = train_datagen.flow_from_directory(
    directory   = train_directory,          # this is the target directory
    target_size = input_shape[:-1],         # (height, width) taken from the model input shape
    batch_size  = 64,
    color_mode  = "grayscale",              # We use a grayscale dataset
    classes     = ["docs"],                 # only this sub-directory is read for training
    class_mode  = None                      # We do not need to get any label => Everything is healthy
)

# this is a similar generator, for validation data
validation_generator = test_datagen.flow_from_directory(
    directory   = validation_directory,     # this is the target directory
    target_size = input_shape[:-1],         # (height, width) taken from the model input shape
    batch_size  = 64,
    color_mode  = "grayscale",              # We use a grayscale dataset
    classes     = ["docs", "other"],        # {'docs': 0, 'other': 1} => Needs to be enforced
    class_mode  = 'binary'                  # We want to have binary labels for validation
)


Found 208 images belonging to 1 classes.
Found 55 images belonging to 2 classes.


In [8]:
def init_discriminator(input_shape, n_filters, filter_size, strides, dis_lr=2e-4,  dis_beta1=0.5, batch_norm_momentum=0.6):
    """Build and compile the discriminator network.

    The network is a stack of strided convolution stages followed by a single
    sigmoid unit. One stage is created per entry of ``n_filters``, walking the
    list in reverse order; each stage is
    Conv2D -> Dropout(0.5) -> BatchNormalization -> LeakyReLU(0.2).

    Args:
        input_shape: Shape of one input image, passed to the Keras ``Input``.
        n_filters: Sequence of filter counts (consumed last-to-first).
        filter_size: Kernel size used by every convolution.
        strides: Stride used by every convolution.
        dis_lr: Learning rate of the Adam optimiser.
        dis_beta1: ``beta_1`` of the Adam optimiser.
        batch_norm_momentum: Momentum of every BatchNormalization layer.

    Returns:
        A Keras ``Model`` compiled with binary cross-entropy loss and an
        accuracy metric.
    """
    # discriminator input of size related to image size.
    disc_input = Input(shape=input_shape, name='discriminator_input')

    # Layer 2-5 - Conv Layers.
    # Starting from the input tensor lets every stage share one code path
    # (no special case for the first stage) and keeps the function working
    # when `n_filters` is empty.
    cnn = disc_input
    for nb_filters in reversed(n_filters):
        cnn = Conv2D(
            filters=nb_filters,
            kernel_size=filter_size,
            strides=strides,
            padding="same",
            kernel_regularizer=l2(.01))(cnn)
        cnn = Dropout(0.5)(cnn)
        cnn = BatchNormalization(momentum=batch_norm_momentum)(cnn)
        cnn = LeakyReLU(alpha=0.2)(cnn)

    # transform 3D Matrix to a 1D Vector
    cnn = Flatten()(cnn)

    # Layer 6 - Output Layer
    cnn = Dense(1, kernel_regularizer=l2(.01))(cnn)
    disc_output = Activation('sigmoid', name='discriminator_output')(cnn)

    # Model definition with Functional API
    discriminator = Model(disc_input, disc_output)
    optimizer = Adam(lr=dis_lr, beta_1=dis_beta1)
    discriminator.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

    return discriminator

In [9]:
# Instantiate the discriminator from the shared configuration values
# (batch_norm_momentum keeps the function's default).
discriminator_model = init_discriminator(
    input_shape=input_shape,
    n_filters=n_filters,
    filter_size=filter_size,
    strides=strides,
    dis_lr=dis_lr,
    dis_beta1=dis_beta1)

In [10]:
def init_generator(latent_shape, n_filters, filter_size, strides, gen_lr=2e-4, gen_beta1=0.5, batch_norm_momentum = 0.8):
    """Build and compile the generator network.

    A dense layer projects the latent vector to a 4x4x512 tensor, which is
    then upsampled once per entry of ``n_filters`` and finally reduced to a
    single channel with a tanh activation.

    Each stage enlarges the feature map by a factor of ``strides`` (through
    the transposed convolution), so the output is
    ``4 * strides ** len(n_filters)`` pixels on each side. With ``strides=2``
    and four filter counts that is 64x64.

    The previous version applied both a strided ``Conv2DTranspose`` and an
    ``UpSampling2D((2, 2))`` in every stage, i.e. it upsampled twice per
    stage; with ``strides=2`` and four stages it produced 1024x1024 images.
    Only one upsampling step per stage is kept here.

    Args:
        latent_shape: Shape of the latent vector, passed to the Keras ``Input``.
        n_filters: Sequence of filter counts, one upsampling stage per entry.
        filter_size: Kernel size used by every convolution.
        strides: Stride of the transposed convolutions (the upsampling factor
            of each stage).
        gen_lr: Learning rate of the Adam optimiser.
        gen_beta1: ``beta_1`` of the Adam optimiser.
        batch_norm_momentum: Momentum of every BatchNormalization layer.

    Returns:
        A Keras ``Model`` compiled with binary cross-entropy loss.
    """
    # generator input of latent space vector Z, typically a 1D vector
    gen_input = Input(shape=latent_shape, name='generator_input')

    # layer 1 - high dimensional dense layer, reshaped to a 4x4 feature map
    cnn = Dense(512 * 4 * 4)(gen_input)
    cnn = BatchNormalization(momentum=batch_norm_momentum)(cnn)
    cnn = LeakyReLU(alpha=0.2)(cnn)
    cnn = Reshape((4, 4, 512))(cnn)

    # layer 2-5 - upsampling stages
    for nb_filters in n_filters:
        # The strided transposed convolution is the only upsampling step.
        cnn = Conv2DTranspose(
            filters=nb_filters,
            kernel_size=filter_size,
            strides=strides,
            padding='same')(cnn)

        # Stride-1 convolution at the new resolution (size is unchanged).
        cnn = Conv2D(
            filters=nb_filters,
            kernel_size=filter_size,
            padding='same')(cnn)

        cnn = BatchNormalization(momentum=batch_norm_momentum)(cnn)
        cnn = LeakyReLU(alpha=0.2)(cnn)

    # layer 6 - Final convolution layer: one output channel, values in [-1, 1]
    cnn = Conv2D(1, kernel_size=filter_size, padding='same')(cnn)
    gen_output = Activation('tanh', name='generator_output')(cnn)

    generator = Model(gen_input, gen_output)
    # Use the optimiser settings supplied by the caller; they were previously
    # ignored in favour of hard-coded values.
    optimizer = Adam(lr=gen_lr, beta_1=gen_beta1)
    generator.compile(loss='binary_crossentropy', optimizer=optimizer)
    return generator

In [11]:
# TODO (original note): make sure the correct configuration variables are used here.
generator_model = init_generator(
    latent_shape=latent_shape,
    n_filters=n_filters,
    filter_size=filter_size,
    strides=strides,
    gen_lr=gen_lr,
    gen_beta1=gen_beta1)

In [12]:
def generate_sample_z(batch_size, latent_shape):
    """Draw a batch of latent vectors from a truncated normal distribution.

    The distribution has mean 0 and standard deviation 0.4 and is truncated
    to the interval [-1, 1].

    Args:
        batch_size: Number of latent vectors to draw.
        latent_shape: Tuple whose first entry is the latent vector length.

    Returns:
        Array of shape ``(batch_size, latent_shape[0])``.
    """
    mean, std = 0, 0.4
    low, high = -1, 1

    # truncnorm expects its clipping bounds in units of standard deviations
    # around the mean.
    a = (low - mean) / std
    b = (high - mean) / std

    n_latent = latent_shape[0]
    return truncnorm.rvs(a, b, loc=mean, scale=std, size=(batch_size, n_latent))

In [13]:
def generate_samples(generator_model, batch_size, latent_shape, verbose=0):
    """Generate a batch of images with the generator.

    Args:
        generator_model: Model whose ``predict`` maps latent vectors to images.
        batch_size: Number of images to generate.
        latent_shape: Shape of the latent vector (see ``generate_sample_z``).
        verbose: Verbosity flag forwarded to ``predict``.

    Returns:
        Whatever ``generator_model.predict`` returns for the sampled batch.
    """
    # Sample the latent batch first, then let G turn it into images.
    latent_batch = generate_sample_z(batch_size, latent_shape)
    generated = generator_model.predict(latent_batch, verbose=verbose)
    return generated

In [14]:
def init_fullgan(
            generator_model,
            discriminator_model,
            input_shape=(64, 64, 1),
            latent_shape=(100,),
            n_filters=[512, 256, 128, 64],
            filter_size=5,
            strides=2,
            model_name="",
            outdir="output",
            stats_step_interval=100,
            gen_lr=None,
            gen_beta1=None):
    """Build and compile the stacked model: latent vector -> G -> D -> score.

    Side effects:
        * prints the model name,
        * creates ``outdir`` if it does not exist,
        * sets ``discriminator_model.trainable = False`` so that only the
          generator is meant to be updated through the stacked model.

    Args:
        generator_model: Generator model, called on the latent input.
        discriminator_model: Discriminator model, called on the generator output.
        input_shape: Unused by this function; kept for interface compatibility.
        latent_shape: Shape of the latent input of the stacked model.
        n_filters: Unused by this function; kept for interface compatibility.
        filter_size: Unused by this function; kept for interface compatibility.
        strides: Unused by this function; kept for interface compatibility.
        model_name: Name printed at construction time.
        outdir: Directory created if missing.
        stats_step_interval: Unused by this function; kept for interface
            compatibility.
        gen_lr: Learning rate of the Adam optimiser. When ``None``, the
            module-level ``gen_lr`` is used (2e-4 if that is not defined).
        gen_beta1: ``beta_1`` of the Adam optimiser. When ``None``, the
            module-level ``gen_beta1`` is used (0.5 if that is not defined).

    Returns:
        A Keras ``Model`` compiled with binary cross-entropy loss and an
        accuracy metric.
    """
    print("Model Name: %s" %  model_name)

    if not os.path.exists(outdir):
        os.makedirs(outdir)

    # The optimiser settings used to be read implicitly from module-level
    # variables. They are explicit arguments now; the module-level values
    # remain the fallback so existing calls behave exactly as before.
    if gen_lr is None:
        gen_lr = globals().get("gen_lr", 2e-4)
    if gen_beta1 is None:
        gen_beta1 = globals().get("gen_beta1", 0.5)

    gan_input = Input(shape=latent_shape)

    # link the input of the GAN to the Generator
    generated = generator_model(gan_input)

    # For the combined model we will only train the generator => We do not want to backpropagate D while training G
    discriminator_model.trainable = False

    # we retrieve the output of the GAN
    gan_output = discriminator_model(generated)

    # we construct a model out of it.
    fullgan = Model(gan_input, gan_output)
    optimizer = Adam(lr=gen_lr, beta_1=gen_beta1)
    fullgan.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])
    return fullgan
    

In [15]:
# TODO (original note): make sure the correct parameters are used here.
fullgan_model = init_fullgan(
    generator_model=generator_model,
    discriminator_model=discriminator_model,
    input_shape=input_shape,
    latent_shape=latent_shape,
    n_filters=n_filters,
    filter_size=filter_size,
    strides=strides,
    model_name=model_name,
    outdir=output_dirname,
    stats_step_interval=stats_step_interval)

Model Name: my_model


In [16]:
def train(discriminator_model, generator_model,  train_generator, validation_generator,
          d_acc_threshold=0.0, verbose=0):
    """Alternate generator and discriminator updates over the training images.

    For every step one batch of real images is pulled from ``train_generator``
    and rescaled to [-1, 1]. The generator is then updated twice through the
    stacked model, and - if the averaged generator accuracy reaches
    ``d_acc_threshold`` - the discriminator is updated once on generated
    images (target 0) and once on the real batch (target 1).

    Besides its arguments the function reads these module-level names:
    ``epochs``, ``latent_shape``, ``fullgan_model``, ``generate_sample_z`` and
    ``generate_samples``.

    Args:
        discriminator_model: Compiled discriminator.
        generator_model: Generator used to produce the fake images.
        train_generator: Iterator with ``batch_size``, ``samples`` and
            ``next()``; ``next()`` returns one batch of images.
        validation_generator: Currently unused.
        d_acc_threshold: Minimum averaged generator accuracy required before
            the discriminator is trained in a step. The default of 0.0 trains
            the discriminator in every step; raise it to hold the
            discriminator back.
        verbose: Verbosity flag forwarded to ``generate_samples``.

    Returns:
        None. Running means of loss and accuracy are printed after each step.
    """
    batch_size = train_generator.batch_size

    # Only whole batches are counted as steps.
    n_steps = int(train_generator.samples / batch_size)

    for epoch in range(epochs):
        print('Epoch {} of {}\n'.format(epoch + 1, epochs))

        progress_bar = Progbar(target=n_steps)

        epoch_gen_loss = []
        epoch_disc_loss = []

        epoch_gen_acc = []
        epoch_disc_acc = []

        for step in range(n_steps):

            progress_bar.update(step + 1, force=True)

            # rescaling images: pixel_values in [-1, 1]
            image_batch = (train_generator.next().astype(np.float32) - 127.5) / 127.5

            # ---------------- generator step ----------------
            # NOTE(review): the models are already compiled at this point;
            # confirm that toggling `trainable` here has the intended effect
            # in the Keras version in use.
            discriminator_model.trainable = False
            for layer in discriminator_model.layers:
                layer.trainable = False

            g_loss = 0.0
            g_acc = 0.0

            # We train G twice, because D is trained twice at each step
            for _ in range(2):
                noise = generate_sample_z(batch_size, latent_shape)

                # Target 1: G is rewarded when D scores its images as real.
                loss, acc = fullgan_model.train_on_batch(noise, np.ones(noise.shape[0]))

                g_loss += loss
                g_acc += acc

            # Average over the two generator updates.
            g_loss *= 0.5
            g_acc *= 0.5

            # ---------------- discriminator step ----------------
            if g_acc >= d_acc_threshold:  # Preventing D to become too strong

                discriminator_model.trainable = True
                for layer in discriminator_model.layers:
                    layer.trainable = True

                generated_images = generate_samples(
                    generator_model=generator_model,
                    batch_size=batch_size,
                    latent_shape=latent_shape,
                    verbose=verbose)

                d_loss_fake, d_acc_fake = discriminator_model.train_on_batch(
                    generated_images,
                    np.zeros(generated_images.shape[0]))

                # The real half of the update uses the batch pulled above.
                d_loss_real, d_acc_real = discriminator_model.train_on_batch(
                    image_batch,
                    np.ones(image_batch.shape[0]))

                epoch_disc_loss.append(0.5 * (d_loss_real + d_loss_fake))
                epoch_disc_acc.append(0.5 * (d_acc_real + d_acc_fake))

            epoch_gen_loss.append(g_loss)
            epoch_gen_acc.append(g_acc)

            # Running means over the steps of this epoch. The discriminator
            # lists stay empty while D is being skipped; report NaN then
            # instead of taking the mean of an empty list.
            generator_train_loss = np.mean(epoch_gen_loss)
            generator_train_acc = np.mean(epoch_gen_acc)
            if epoch_disc_loss:
                discriminator_train_loss = np.mean(epoch_disc_loss)
                discriminator_train_acc = np.mean(epoch_disc_acc)
            else:
                discriminator_train_loss = float('nan')
                discriminator_train_acc = float('nan')

            print("generator_train_loss {}".format(generator_train_loss))
            print("discriminator_train_loss {}".format(discriminator_train_loss))
            print("generator_train_acc {}".format(generator_train_acc))
            print("discriminator_train_acc {}".format(discriminator_train_acc))

In [None]:
# Run the adversarial training loop on the image generators defined above.
train(
    discriminator_model=discriminator_model,
    generator_model=generator_model,
    train_generator=train_generator,
    validation_generator=validation_generator)

Epoch 1 of 1


In [3]:
def _safe_ratio(numerator, denominator):
    """Return numerator / denominator as a float, or 0.0 for a zero denominator."""
    if denominator == 0:
        return 0.0
    # float() keeps this a true division under Python 2 as well.
    return float(numerator) / denominator


def detect_defects(discriminator_model, generator_model,  validation_generator, verbose=1):
    """Score the discriminator on the labelled validation images.

    Every image is rescaled to [-1, 1] and passed through the discriminator;
    an output >= 0.5 is counted as prediction 1, anything else as 0. The
    predictions are compared with the labels delivered by the generator and
    summarised by four ratios taken from the confusion matrix.

    NOTE(review): the confusion matrix treats label 1 as the positive
    ("non-defect") class. The discriminator is trained with target 1 for the
    training images, so the validation generator should assign label 1 to
    that same class - verify this against the generator's class mapping,
    otherwise the four metrics are reported for the opposite class.

    Args:
        discriminator_model: Model whose ``predict`` returns one score per image.
        generator_model: Currently unused.
        validation_generator: Iterator with ``samples``, ``batch_size`` and
            ``next()``; ``next()`` returns ``(images, labels)``.
        verbose: Any value other than 0 shows a progress bar.

    Returns:
        Tuple ``(precision, recall, specificity, negative_predictive_value)``.
        A ratio whose denominator is zero is reported as 0.0; with no
        validation samples at all, every ratio is 0.0.
    """
    total_samples = validation_generator.samples
    batch_size = validation_generator.batch_size

    # float() keeps the batch count correct under Python 2 integer division.
    n_batches = int(np.ceil(total_samples / float(batch_size)))

    progress_bar = Progbar(target=total_samples) if verbose != 0 else None

    # Collect per-batch pieces and join them once at the end instead of
    # re-allocating the full array on every iteration.
    label_chunks = []
    score_chunks = []

    for _ in range(n_batches):

        image_batch, lbls = validation_generator.next()
        label_chunks.append(lbls.reshape(lbls.shape[0]))

        image_batch = (image_batch.astype(np.float32) - 127.5) / 127.5

        scores = discriminator_model.predict(
            x=image_batch,
            batch_size=image_batch.shape[0],
            verbose=0
        )
        score_chunks.append(scores.reshape(scores.shape[0]))

        if progress_bar is not None:
            progress_bar.add(image_batch.shape[0])

    if not score_chunks:
        # Nothing to evaluate: every ratio has a zero denominator.
        return 0.0, 0.0, 0.0, 0.0

    labels = np.concatenate(label_chunks).astype(int)
    predictions = (np.concatenate(score_chunks) >= 0.5).astype(int)

    # Pinning labels=[0, 1] guarantees a 2x2 matrix even when only one class
    # occurs in the data, so the four-way unpacking below cannot fail.
    tn, fp, fn, tp = confusion_matrix(labels, predictions, labels=[0, 1]).ravel()

    #################### NON DEFECT SITUATIONS ####################

    # Probability of Detecting a Non-Defect: (tp / (tp + fn))
    recall = _safe_ratio(tp, tp + fn)

    # Probability of Correctly Detecting a Non-Defect: (tp / (tp + fp))
    precision = _safe_ratio(tp, tp + fp)

    ###################### DEFECT SITUATIONS ######################

    # Probability of Detecting a Defect: (tn / (tn + fp))
    specificity = _safe_ratio(tn, tn + fp)

    # Probability of Correctly Detecting a Defect: (tn / (tn + fn))
    negative_predictive_value = _safe_ratio(tn, tn + fn)

    return precision, recall, specificity, negative_predictive_value


In [None]:
# Evaluate the discriminator on the validation set and report the four
# confusion-matrix ratios returned by detect_defects.
precision, recall, specificity, negative_predictive_value = detect_defects(
    discriminator_model=discriminator_model,
    generator_model=generator_model,
    validation_generator=validation_generator)

# Blank line to separate the report from the progress bar output.
print()
print("Probability of Detecting a Non-Defect             => Recall      (tp / (tp + fn)):", recall)
print("Probability of Correctly Detecting a Non-Defect   => Precision   (tp / (tp + fp)):", precision)
print("Probability of Detecting a Defect                 => Specificity (tn / (tn + fp)):", specificity)
print("Probability of Correctly Detecting a Defect       => NPV         (tn / (tn + fn)):", negative_predictive_value)