In [1]:
import tensorflow as tf
import scipy.misc
import sys
import numpy as np
import os

# Batch normalization

When changing the parameters of a model during the process of learning the distribition functions of each hidden layer are also changing. For that reason each layer needs to adapt itself to those changes avoiding the noise they produce.To Batch-Normalize a network is the process that smooth what have been told first. BN is applied on the input of each neuron making that the input to each activacion function has mean equal to 0 and variance equal to 1. The formula used in this function is the next one:
    X = x - E[x] / sqrt(Var[x] + eps)

In case of dimension this depends on the activation that it is happening during the process. It could be dimension 2 or 4 deppending on the process's step.

In [2]:
def batchnormalization(X, eps=1e-8, W=None, b=None):
    
    if X.get_shape().ndims == 4:
        mean = tf.reduce_mean(X, [0,1,2])
        standar_desviation = tf.reduce_mean(tf.square(X-mean), [0,1,2])
        X = (X - mean) / tf.sqrt(standar_desviation + eps)
        
        if W is not None and b is not None:
            W = tf.reshape(W, [1,1,1,-1])
            b = tf.reshape(b, [1,1,1,-1])
            X = X*W + b
    
    elif X.get_shape().ndims == 2:
        mean = tf.reduce_mean(X, 0)
        standar_desviation = tf.reduce_mean(tf.square(X-mean), 0)
        X = (X - mean) / tf.sqrt(standar_desviation + eps)
        
        if W is not None and b is not None:
            W = tf.reshape(W, [1,-1])
            b = tf.reshape(b, [1,-1])
            X = X*W + b
    
    return X
        

# Leaky Relu

The Rectifier (Rectified Linear Unit) is an activation defined as max(0,x). This is also known as a ramp function.
The Leaky Relu activation is a variant from the Relu and it is defined as max(x,alpha*x). This Leaky Relu function has been probed to work well with images avoiding the problem of dying ReLU.

In [3]:
def leakyRelu(X, alpha=0.2):
    return tf.maximum(X,tf.multiply(X, alpha))

# BCE

Calculate the cross entropy between y and y'. This value is going to be used by the optimizer

In [4]:
def bce(x, z):
    x = tf.clip_by_value(x, 1e-7, 1. - 1e-7)
    return tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits = x, labels = z))

# GENERATOR AND DISCRIMINATOR FUNCTIONS

This two methods basically consists on the two differents multilayers that are going to be used in a GAN network, both of them are going to use weights initialized with a random normal desviation of 0,02. We have used relu as the activation function for the generator and leakyRelu for the discriminator. In each step we concat the Y labels, as Y or as yb, and they act like the bias in this network. We have only used two conv (conv2d and conv2d_transpose) to simplify the results and to reduce computational time. 

In [5]:
def MultilayerPerceptronGenerator(Z, Y, batch_size):
     
    kernel_W1 = [int(Z.get_shape()[1] + Y.get_shape()[1]), dim_W1]
    kernel_W2 = [dim_W1 + int(Y.get_shape()[1]), dim_W2*7*7]
    kernel_W3 = [5, 5, dim_W3, dim_W2 +  int(Y.get_shape()[1])]
    kernel_W4 = [5, 5, dim_channel, dim_W3 +  int(Y.get_shape()[1])]
    
    gen_W1 = tf.get_variable("gen_W1", kernel_W1, initializer=tf.random_normal_initializer(stddev=0.02))
    gen_W2 = tf.get_variable("gen_W2", kernel_W2, initializer=tf.random_normal_initializer(stddev=0.02))
    gen_W3 = tf.get_variable("gen_W3", kernel_W3, initializer=tf.random_normal_initializer(stddev=0.02))
    gen_W4 = tf.get_variable("gen_W4", kernel_W4, initializer=tf.random_normal_initializer(stddev=0.02))
    
    yb = tf.reshape(Y, [batch_size, 1, 1, int(Y.get_shape()[1])])
    Z = tf.concat([Z, Y], axis=1) 
    op1 = tf.nn.relu(batchnormalization(tf.matmul(Z, gen_W1)))
    op1 = tf.concat([op1, Y], axis=1)
    op2 = tf.nn.relu(batchnormalization(tf.matmul(op1, gen_W2)))
    op2 = tf.reshape(op2, [batch_size, 7, 7, dim_W2])
    op2 = tf.concat([op2, yb*tf.ones([batch_size, 7, 7, int(Y.get_shape()[1])])], axis = 3)
    
    op3 = tf.nn.conv2d_transpose(op2, gen_W3, output_shape=[batch_size, 14, 14, dim_W3], strides=[1,2,2,1])
    op3 = tf.nn.relu(batchnormalization(op3))
    op3 = tf.concat([op3, yb*tf.ones([batch_size, 14, 14, Y.get_shape()[1]])], axis = 3)
    op4 = tf.nn.conv2d_transpose(op3, gen_W4, output_shape=[batch_size, 28, 28, dim_channel], strides=[1,2,2,1])
    
    
    return op4

In [6]:
def MultilayerPerceptronDiscriminator(image, Y, batch_size):
    
    kernel_W1 = [5, 5, dim_channel + int(dim_y), dim_W3]
    kernel_W2 = [5, 5, dim_W3 + int(dim_y), dim_W2]
    kernel_W3 = [dim_W2*7*7 + int(dim_y), dim_W1]
    kernel_W4 = [dim_W1 + int(dim_y), 1]
    
    dis_W1 = tf.get_variable("dis_W1", kernel_W1, initializer=tf.random_normal_initializer(stddev=0.02))
    dis_W2 = tf.get_variable("dis_W2", kernel_W2, initializer=tf.random_normal_initializer(stddev=0.02))
    dis_W3 = tf.get_variable("dis_W3", kernel_W3, initializer=tf.random_normal_initializer(stddev=0.02))
    dis_W4 = tf.get_variable("dis_W4", kernel_W4, initializer=tf.random_normal_initializer(stddev=0.02))

    yb = tf.reshape(Y, tf.stack([batch_size, 1, 1, int(Y.get_shape()[1])]))
    X = tf.concat([image, yb*tf.ones([batch_size, 28, 28, int(Y.get_shape()[1])])], axis = 3)
    op1 = leakyRelu( tf.nn.conv2d( X, dis_W1, strides=[1, 2, 2, 1], padding='SAME'))
    op1 = tf.concat([op1, yb*tf.ones([batch_size, 14, 14, int(Y.get_shape()[1])])], axis = 3)
    op2 = leakyRelu( tf.nn.conv2d( op1, dis_W2, strides=[1, 2, 2, 1], padding='SAME'))
    op2 = tf.reshape(op2, [batch_size, -1])
    op2 = tf.concat([op2, Y], axis = 1)
    op3 = leakyRelu(batchnormalization(tf.matmul(op2, dis_W3)))
    op3 = tf.concat([op3, Y], axis = 1)
    
    p = tf.nn.sigmoid(tf.matmul(op3, dis_W4))
    return p, op3

# Model

The model is going to connect the generator and discriminator and calculate the different variables to optimize during our training. This variables are calculated with the BCE functions that calculate the cross entropy between the results from the discriminator (with the real or generated image) and the labels.

In [7]:
def createModel(batch_size):
    
    Z = tf.placeholder(tf.float32, [batch_size, dim_z])
    Y = tf.placeholder(tf.float32, [batch_size, dim_y])

    image_real = tf.placeholder(tf.float32, [batch_size] + image_shape)

    op4_generated = MultilayerPerceptronGenerator(Z,Y, batch_size)
    image_generate = tf.nn.sigmoid(op4_generated)

    with tf.variable_scope("discriminator_variables") as scope:
        p_real, raw_real = MultilayerPerceptronDiscriminator(image_real, Y, batch_size)
        scope.reuse_variables()
        p_gen, raw_gen = MultilayerPerceptronDiscriminator(image_generate, Y, batch_size)


    dis_cost_real = bce(raw_real, tf.ones_like(raw_real))
    dis_cost_gen = bce(raw_gen, tf.zeros_like(raw_gen))
    dis_cost = dis_cost_real + dis_cost_gen

    gen_cost = bce (raw_gen, tf.ones_like(raw_gen))

    return Z, Y, image_real, dis_cost, gen_cost, p_real, p_gen
    

# Optimizer

The AdamOptimizer is used with a learning rate of 0.0002 and a beta of 0.5. These parameters determine how fast change the weights and the bias. This function computes both, the optimizer from the generator and from the discriminator.

In [8]:
def optimizer_function(d_cost_tf, g_cost_tf, dis_vars, gen_vars):
    train_op_dis = tf.train.AdamOptimizer(learning_rate, beta1=0.5).minimize(d_cost_tf, var_list=dis_vars)
    train_op_gen = tf.train.AdamOptimizer(learning_rate, beta1=0.5).minimize(g_cost_tf, var_list=gen_vars)
    
    return train_op_dis, train_op_gen

# Sample Generator

This is the sample generator that is going to be used for extracting a sample during the training. This sample allows us to see how the generator is rectifing and creating more accurate image as the training progresses.

In [9]:
def sample_creator(dimension):
    
    Z = tf.placeholder(tf.float32, [dimension, dim_z])
    Y = tf.placeholder(tf.float32, [dimension, dim_y])
    
    op4 = MultilayerPerceptronGenerator(Z,Y,dimension)
    image = tf.nn.sigmoid(op4)
    
    return Z,Y,image
    

# Aux functions 

In [10]:
def OneHot(X, n=None, negative_class=0.):
    X = np.asarray(X).flatten()
    if n is None:
        n = np.max(X) + 1
    Xoh = np.ones((len(X), n)) * negative_class
    Xoh[np.arange(len(X)), X] = 1.
    return Xoh

In [11]:

def save_visualization(X, nh_nw, save_path='tmp/sample.jpg'):
    h,w = X.shape[1], X.shape[2]
    img = np.zeros((h * nh_nw[0], w * nh_nw[1], 3))
    for n,x in enumerate(X):
        j = n // nh_nw[1]
        i = n % nh_nw[1]
        img[j*h:j*h+h, i*w:i*w+w, :] = x
        
        
    scipy.misc.imsave(save_path, img)

# Load the DATA

In [12]:

sys.path.append('..')

data_dir = 'data/'
def mnist():
    fd = open(os.path.join(data_dir,'train-images.idx3-ubyte'))
    loaded = np.fromfile(file=fd,dtype=np.uint8)
    trX = loaded[16:].reshape((60000,28*28)).astype(float)

    fd = open(os.path.join(data_dir,'train-labels.idx1-ubyte'))
    loaded = np.fromfile(file=fd,dtype=np.uint8)
    trY = loaded[8:].reshape((60000))

    fd = open(os.path.join(data_dir,'t10k-images.idx3-ubyte'))
    loaded = np.fromfile(file=fd,dtype=np.uint8)
    teX = loaded[16:].reshape((10000,28*28)).astype(float)

    fd = open(os.path.join(data_dir,'t10k-labels.idx1-ubyte'))
    loaded = np.fromfile(file=fd,dtype=np.uint8)
    teY = loaded[8:].reshape((10000))

    trY = np.asarray(trY)
    teY = np.asarray(teY)

    return trX, teX, trY, teY

def mnist_with_valid_set():
    trX, teX, trY, teY = mnist()

    train_inds = np.arange(len(trX))
    np.random.shuffle(train_inds)
    trX = trX[train_inds]
    trY = trY[train_inds]
    vaX = trX[50000:]
    vaY = trY[50000:]
    trX = trX[:50000]
    trY = trY[:50000]

    return trX, vaX, teX, trY, vaY, teY

In [13]:
train_data, validation_data, test_data, train_label, validation_label, test_label = mnist_with_valid_set()
print("Train set of : " + str(train_data.shape))
print("Train label of : " + str(train_label.shape))
print("Test set of : " + str(test_data.shape))
print("Test label of : " + str(test_label.shape))
print("Validation set of : " + str(validation_data.shape))
print("Validation label of : " + str(validation_label.shape))

Train set of : (50000, 784)
Train label of : (50000,)
Test set of : (10000, 784)
Test label of : (10000,)
Validation set of : (10000, 784)
Validation label of : (10000,)


# Training Part

In [14]:
n_epochs = 100
learning_rate = 0.0002
batch_size = 128
image_shape = [28,28,1]
dim_z = 100
dim_W1 = 1024
dim_W2 = 128
dim_W3 = 64
dim_channel = 1
dim_y = 10
visualize_dimension=196

In [15]:
with tf.variable_scope("training_part") as scope:
    Z_tf, Y_tf, image_tf, d_cost_tf, g_cost_tf, p_real, p_gen = createModel(batch_size)
    session = tf.InteractiveSession()
    saver = tf.train.Saver(max_to_keep=10)

    scope.reuse_variables()
    Z_sample, Y_sample, image_sample = sample_creator(visualize_dimension)

dis_vars = filter(lambda x: x.name.startswith(scope.name+'/dis'), tf.global_variables())
gen_vars = filter(lambda x: x.name.startswith(scope.name+'/gen'), tf.global_variables())
dis_vars = [i for i in dis_vars]
gen_vars = [i for i in gen_vars]

In [16]:
train_op_dis, train_op_gen = optimizer_function(d_cost_tf, g_cost_tf, dis_vars, gen_vars)


In [17]:
tf.global_variables_initializer().run()

In [18]:
Z_np_sample = np.random.uniform(-1, 1, size=(visualize_dimension, dim_z))
Y_np_sample = OneHot(np.random.randint(10, size=[visualize_dimension]))
iterations = 0
k = 2

In [19]:
#Information variables of the training process
sample_creation = 200 #Iteration where a sample is going to be created
show_information = 25 #Iteration where the information is going to be showed

In [22]:
print("Starting the training process")
for epoch in range(n_epochs):
    index = np.arange(len(train_label))
    np.random.shuffle(index)
    train_data = train_data[index]
    train_label = train_label[index]

    for start, end in zip(
            range(0, len(train_label), batch_size),
            range(batch_size, len(train_label), batch_size)
            ):

        Xs = train_data[start:end].reshape( [-1, 28, 28, 1]) / 255.
        Ys = OneHot(train_label[start:end])
        Zs = np.random.uniform(-1, 1, size=[batch_size, dim_z]).astype(np.float32)

        if np.mod( iterations, k ) != 0:
            _, gen_loss_val = session.run([train_op_gen, g_cost_tf],feed_dict={Z_tf:Zs,Y_tf:Ys})
            discrim_loss_val, p_real_val, p_gen_val = session.run([d_cost_tf,p_real,p_gen],feed_dict={Z_tf:Zs, image_tf:Xs, Y_tf:Ys})

        else:
            _, discrim_loss_val = session.run([train_op_dis, d_cost_tf],feed_dict={Z_tf:Zs,Y_tf:Ys,image_tf:Xs})
            gen_loss_val, p_real_val, p_gen_val = session.run([g_cost_tf, p_real, p_gen],feed_dict={Z_tf:Zs, image_tf:Xs, Y_tf:Ys})

        if np.mod(iterations, show_information) == 0:
            print("========== Showing information =========")
            print("iteration:", iterations)
            print("gen loss:", gen_loss_val)
            print("discrim loss:", discrim_loss_val)
            print("Average P(real)=", p_real_val.mean())
            print("Average P(gen)=", p_gen_val.mean())

        if np.mod(iterations, sample_creation) == 0:
            generated_sample = session.run(image_sample,feed_dict={Z_sample:Z_np_sample,Y_sample:Y_np_sample})
            generated_samples = (generated_sample + 1.)/2.
            save_visualization(generated_samples, (14,14), save_path='image/sample_%04d.jpg' % int(iterations/sample_creation))

        iterations += 1

Starting the training process
iteration: 0
gen loss: 0.59991
discrim loss: 1.43471
Average P(real)= 0.488061
Average P(gen)= 0.486245


FileNotFoundError: [Errno 2] No such file or directory: 'image/sample_0000.jpg'