AE with conditions

In [None]:
import os
import scipy.misc as scm
import numpy as np

def make_project_dir(project_dir):
    if not os.path.exists(project_dir):
        os.makedirs(project_dir)
        os.makedirs(os.path.join(project_dir, 'models'))
        os.makedirs(os.path.join(project_dir, 'result'))
        os.makedirs(os.path.join(project_dir, 'result_test'))


def get_image(img_path):
    img = scm.imread(img_path)/255. - 0.5
    img = img[..., ::-1]  # rgb to bgr
    return img

def get_label(path, size):
    label = int(path[-5])
    one_hot = np.zeros(size)
    one_hot[ label ] = 1.
    return one_hot

def inverse_image(img):
    img = (img + 0.5) * 255.
    img[img > 255] = 255
    img[img < 0] = 0
    img = img[..., ::-1] # bgr to rgb
    return img

def pair_expressions(paths):
    subject_exprs = []
    subject_pairs = []
    all_pairs = []
    last_subject = 0

    # Pair all expression of a subject
    for path in paths:
        subject = int(path[-9:-6])

        if subject != last_subject and last_subject != 0:
            subject_pairs = [(x, y) for x in subject_exprs for y in subject_exprs]
            all_pairs.extend(subject_pairs)
            subject_exprs = []

        subject_exprs.append(path)
        last_subject = subject

    # Last subject
    subject_pairs = [(x, y) for x in subject_exprs for y in subject_exprs]
    all_pairs.extend(subject_pairs)
    return all_pairs

def get_shape_c(tensor): # static shape
    return tensor.get_shape().as_list()


In [None]:
import tensorflow as tf
import numpy as np


def conv2d(x, filter_shape, bias=True, stride=1, padding="SAME", name="conv2d"):
    kw, kh, nin, nout = filter_shape
    pad_size = (kw - 1) / 2

    if padding == "VALID":
        x = tf.pad(x, [[0, 0], [pad_size, pad_size], [pad_size, pad_size], [0, 0]], "SYMMETRIC")

    initializer = tf.random_normal_initializer(0., 0.02)
    with tf.variable_scope(name):
        weight = tf.get_variable("weight", shape=filter_shape, initializer=initializer)
        x = tf.nn.conv2d(x, weight, [1, stride, stride, 1], padding=padding)

        if bias:
            b = tf.get_variable("bias", shape=filter_shape[-1], initializer=tf.constant_initializer(0.))
            x = tf.nn.bias_add(x, b)
    return x


def fc(x, output_shape, bias=True, name='fc'):
    shape = x.get_shape().as_list()
    dim = np.prod(shape[1:])
    x = tf.reshape(x, [-1, dim])
    input_shape = dim

    initializer = tf.random_normal_initializer(0., 0.02)
    with tf.variable_scope(name):
        weight = tf.get_variable("weight", shape=[input_shape, output_shape], initializer=initializer)
        x = tf.matmul(x, weight)

        if bias:
            b = tf.get_variable("bias", shape=[output_shape], initializer=tf.constant_initializer(0.))
            x = tf.nn.bias_add(x, b)
    return x


def pool(x, r=2, s=1):
    return tf.nn.avg_pool(x, ksize=[1, r, r, 1], strides=[1, s, s, 1], padding="SAME")


def l1_loss(x, y):
    return tf.reduce_mean(tf.abs(x - y))


def resize_nn(x, size):
    return tf.image.resize_nearest_neighbor(x, size=(int(size), int(size)))

In [None]:
import glob
import time
import numpy as np
import tensorflow as tf

class op_base:
    def __init__(self, sess, project_name):
        self.sess = sess

        # Train
        self.flag = True #args.flag
        self.gpu_number = 0 #args.gpu_number
        self.project = project_name #"test_began" #args.project

        # Train Data
        self.data_dir = "./Face_data/Faces_with_expression_label/dataset_64x64" #args.data_dir #./Data
        self.dataset = "expr" #args.dataset  # celeba
        self.data_size = 64 #args.data_size  # 64 or 128
        self.data_opt = "crop" #args.data_opt  # raw or crop
        self.data_label_vector_size = 7 #size of one-hot-encoded label vector

        # Train Iteration
        self.niter = 5000 #50 #args.niter
        self.niter_snapshot = 500 #args.nsnapshot
        self.max_to_keep = 5 #args.max_to_keep

        # Train Parameter
        self.batch_size = 16 #args.batch_size
        self.learning_rate = 1e-4 #args.learning_rate
        self.mm = 0.5 #args.momentum
        self.mm2 = 0.999 #args.momentum2
        self.lamda = 0.001 #args.lamda
        self.gamma = 0.5 #args.gamma
        self.filter_number = 64 #args.filter_number
        self.input_size = 64 #args.input_size
        self.embedding = 64 #args.embedding

        # Result Dir & File
        self.project_dir = 'assets/{0}_{1}_{2}_{3}/'.format(self.project, self.dataset, self.data_opt, self.data_size)
        self.ckpt_dir = os.path.join(self.project_dir, 'models')
        self.model_name = "{0}.model".format(self.project)
        self.ckpt_model_name = os.path.join(self.ckpt_dir, self.model_name)

        # etc.
        if not os.path.exists('assets'):
            os.makedirs('assets')
        make_project_dir(self.project_dir)

    def load(self, sess, saver, ckpt_dir):
        ckpt = tf.train.get_checkpoint_state(ckpt_dir)
        ckpt_name = os.path.basename(ckpt.model_checkpoint_path)
        saver.restore(sess, os.path.join(ckpt_dir, ckpt_name))

In [None]:
import glob
import time
import datetime


class Operator(op_base):
    def __init__(self, sess, project_name):
        op_base.__init__(self, sess, project_name)
        self.build_model()

    def build_model(self):
        # Input placeholder
        self.x = tf.placeholder(tf.float32, shape=[None, self.data_size, self.data_size, 3], name='x')
        self.y = tf.placeholder(tf.float32, shape=[None, self.data_size, self.data_size, 3], name='y')
        self.c = tf.placeholder(tf.float32, shape=[None, self.data_label_vector_size], name='c')
        self.lr = tf.placeholder(tf.float32, name='lr')

        # AE
        self.ae = self.decoder(self.encoder(self.x), self.c)
        self.ae_test = self.decoder(self.encoder(self.x, reuse=True), self.c, reuse=True)

        # Loss
        self.ae_loss = l1_loss(self.y, self.ae)
        #self.test_loss = l1_loss(self.y, self.ae_test)

        # Variables
        ae_vars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, "ae_")

        # Optimizer
        self.opt_ae = tf.train.AdamOptimizer(self.lr, self.mm).minimize(self.ae_loss, var_list=ae_vars)

        # initializer
        self.sess.run(tf.global_variables_initializer())

        # tf saver
        self.saver = tf.train.Saver(max_to_keep=(self.max_to_keep))

        try:
            self.load(self.sess, self.saver, self.ckpt_dir)
        except:
            # save full graph
            self.saver.save(self.sess, self.ckpt_model_name, write_meta_graph=True)

        # Summary
        if self.flag:
            tf.summary.scalar('loss/loss', self.ae_loss)
            #tf.summary.scalar('loss/test_loss', self.test_loss)
            self.merged = tf.summary.merge_all()
            self.writer = tf.summary.FileWriter(self.project_dir, self.sess.graph)

    def train(self, train_flag):
        # load data
        data_path = self.data_dir
        #test_data_path = "./Face_data/Faces_with_expression_label/dataset_64x64" # expressions

        if os.path.exists(data_path + '.npy'):
            data = np.load(data_path + '.npy')
        else:
            data = sorted(glob.glob(os.path.join(data_path, "*.*")))
            np.save(data_path + '.npy', data)
            
#         if os.path.exists(test_data_path + '.npy'):
#             test_data = np.load(test_data_path + '.npy')
#         else:
#             test_data = sorted(glob.glob(os.path.join(test_data_path, "*.*")))
#             np.save(test_data_path + '.npy', test_data)

        print('Shuffle ....')
        random_order = np.random.permutation(len(data))
        data = [data[i] for i in random_order[:]]
        print('Shuffle Done')

        # initial parameter
        start_time = time.time()
        lr = np.float32(self.learning_rate)
        self.count = 0

        for epoch in range(self.niter):
            batch_idxs = len(data) // self.batch_size
            
#             # Get a random batch of test data for the whole epoch
#             test_random_order = np.random.permutation(len(test_data))
#             test_data = [test_data[i] for i in test_random_order[:]]
#             test_files = test_data[0:self.batch_size]
#             test_data_batch = [get_image(test_file[0]) for test_file in test_files]

            for idx in range(0, batch_idxs):
                self.count += 1

                batch_files = data[idx * self.batch_size: (idx + 1) * self.batch_size]
                batch_inputs = [get_image(batch_file[0]) for batch_file in batch_files]
                batch_targets = [get_image(batch_file[1]) for batch_file in batch_files]
                batch_target_labels = [get_label(batch_file[1], self.data_label_vector_size) for batch_file in batch_files]

                # opt & feed list 
                ae_opt = [self.opt_ae, self.ae_loss, self.merged]
                feed_dict = {self.x: batch_inputs, self.y: batch_targets, self.c: batch_target_labels, self.lr: lr}

                # run tensorflow
                _, loss_ae, summary = self.sess.run(ae_opt, feed_dict=feed_dict)
                
#                 # test & feed list 
#                 ae_test = [self.test_loss, self.ae_test, self.merged]
#                 feed_dict = {self.x: test_data_batch, self.y: test_data_batch}

#                 # run tensorflow
#                 test_loss, _, summary = self.sess.run(ae_test, feed_dict=feed_dict)
        
                if self.count % 100 == 1:
                    print("Epoch: [%2d] [%4d/%4d] time: %4.4f, "
                          "loss: %.4f"
                          % (epoch, idx, batch_idxs, time.time() - start_time,
                             loss_ae))

                # write train summary
                self.writer.add_summary(summary, self.count)

                # Test during Training
                if self.count % self.niter_snapshot == (self.niter_snapshot - 1):
                    # save & test
                    self.saver.save(self.sess, self.ckpt_model_name, global_step=self.count, write_meta_graph=False)
                    #self.test_expr(train_flag)
                    self.test_celebra(train_flag)

    def test_celebra(self, train_flag=True):
        print('Test Sample Generation...')
        # generate output
        img_num = 8*8
        output_f = int(np.sqrt(img_num))
        in_img_num = output_f
        img_size = self.data_size
        gen_img_num = img_num - output_f
        label_size = self.data_label_vector_size
        
        # load data test
        data_path = "./Face_data/Celeba/dataset_64x64" # expressions

        if os.path.exists(data_path + '.npy'):
            data = np.load(data_path + '.npy')
        else:
            data = sorted(glob.glob(os.path.join(data_path, "*.*")))
            np.save(data_path + '.npy', data)

        # shuffle test data
        random_order = np.random.permutation(len(data))
        data = [data[i] for i in random_order[:]]

        im_output_gen = np.zeros([img_size * output_f, img_size * output_f, 3])

        test_files = data[0: output_f]
        test_data = [get_image(test_file) for test_file in test_files]
        test_data = np.repeat(test_data, [label_size]*in_img_num, axis=0)
        test_data_o = [scm.imread(test_file) for test_file in test_files]
        
        # get one-hot labels
        int_labels = list(range(label_size))
        one_hot = np.zeros((label_size, label_size))
        one_hot[np.arange(label_size), int_labels] = 1
        target_labels = np.tile(one_hot, (output_f, 1))
        
        
        output_gen = (self.sess.run(self.ae_test, feed_dict={self.x: test_data, 
                                                             self.c: target_labels}))  # generator output

        output_gen = [inverse_image(output_gen[i]) for i in range(gen_img_num)]

        for i in range(output_f):
            for j in range(output_f):
                if j == 0:
                    im_output_gen[i * img_size:(i + 1) * img_size, j * img_size:(j + 1) * img_size, :] \
                        = test_data_o[i]
                else:
                    im_output_gen[i * img_size:(i + 1) * img_size, j * img_size:(j + 1) * img_size, :] \
                        = output_gen[(j-1) + (i * int(output_f-1))]

        # output save
        scm.imsave(self.project_dir + '/result/' + str(self.count) + '_celebra_output.bmp', im_output_gen)
        
    def test_expr(self, train_flag=True):
        print('Test Sample Generation...')
        # generate output
        img_num =  36 #self.batch_size
        display_img_num = int(img_num / 3)
        img_size = self.data_size

        output_f = int(np.sqrt(img_num))
        im_output_gen = np.zeros([img_size * output_f, img_size * output_f, 3])
        
        # load data
        data_path = self.data_dir

        if os.path.exists(data_path + '.npy'):
            data = np.load(data_path + '.npy')
        else:
            data = sorted(glob.glob(os.path.join(data_path, "*.*")))
            data = pair_expressions(data)
            np.save(data_path + '.npy', data)

        # Test data shuffle
        random_order = np.random.permutation(len(data))
        data = [data[i] for i in random_order[:]]
        
        batch_files = data[0: display_img_num]
        test_inputs = [get_image(batch_file[0]) for batch_file in batch_files]
        test_inputs_o = [scm.imread((batch_file[0])) for batch_file in batch_files]
        test_targets = [scm.imread((batch_file[1])) for batch_file in batch_files]
        test_target_labels = [get_label(batch_file[1], self.data_label_vector_size) for batch_file in batch_files]

        output_gen = (self.sess.run(self.ae_test, feed_dict={self.x: test_inputs, 
                                                             self.c: test_target_labels}))  # generator output

        output_gen = [inverse_image(output_gen[i]) for i in range(display_img_num)]

        for i in range(output_f): # row
            for j in range(output_f): # col
                if j % 3 == 0: # input img
                    im_output_gen[i * img_size:(i + 1) * img_size, j * img_size:(j + 1) * img_size, :] \
                        = test_inputs_o[int(j / 3) + (i * int(output_f / 3))]
                elif j % 3 == 1: # output img
                    im_output_gen[i * img_size:(i + 1) * img_size, j * img_size:(j + 1) * img_size, :] \
                        = output_gen[int(j / 3) + (i * int(output_f / 3))]
                else: # target img
                    im_output_gen[i * img_size:(i + 1) * img_size, j * img_size:(j + 1) * img_size, :] \
                        = test_targets[int(j / 3) + (i * int(output_f / 3))]
                   

        labels = np.argmax(test_target_labels, axis=1)
        label_string = ''.join(str(int(l)) for l in labels)
        # output save
        scm.imsave(self.project_dir + '/result/' + str(self.count) + '_' + label_string 
                   + '_output.bmp', im_output_gen)
        

In [None]:

class AE(Operator):
    def __init__(self, sess, project_name):
        Operator.__init__(self, sess, project_name)


    def encoder(self, x, reuse=None):
        with tf.variable_scope('ae_') as scope:
            if reuse:
                scope.reuse_variables()

            f = self.filter_number
            h = self.embedding
            p = "SAME"

            x = conv2d(x, [3, 3, 3, f], stride=1,  padding=p,name='conv1_enc_a')
            x = tf.nn.elu(x)

            x = conv2d(x, [3, 3, f, f], stride=1,  padding=p,name='conv2_enc_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, f, f], stride=1,  padding=p,name='conv2_enc_b')
            x = tf.nn.elu(x)

            x = conv2d(x, [1, 1, f, 2 * f], stride=1,  padding=p,name='conv3_enc_0')
            x = pool(x, r=2, s=2)
            x = conv2d(x, [3, 3, 2 * f, 2 * f], stride=1,  padding=p,name='conv3_enc_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, 2 * f, 2 * f], stride=1,  padding=p,name='conv3_enc_b')
            x = tf.nn.elu(x)

            x = conv2d(x, [1, 1, 2 * f, 3 * f], stride=1,  padding=p,name='conv4_enc_0')
            x = pool(x, r=2, s=2)
            x = conv2d(x, [3, 3, 3 * f, 3 * f], stride=1,  padding=p,name='conv4_enc_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, 3 * f, 3 * f], stride=1,  padding=p,name='conv4_enc_b')
            x = tf.nn.elu(x)

            x = conv2d(x, [1, 1, 3 * f, 4 * f], stride=1,  padding=p,name='conv5_enc_0')
            x = pool(x, r=2, s=2)
            x = conv2d(x, [3, 3, 4 * f, 4 * f], stride=1,  padding=p,name='conv5_enc_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, 4 * f, 4 * f], stride=1,  padding=p,name='conv5_enc_b')
            x = tf.nn.elu(x)

            if self.data_size == 128:
                x = conv2d(x, [1, 1, 4 * f, 5 * f], stride=1,  padding=p,name='conv6_enc_0')
                x = pool(x, r=2, s=2)
                x = conv2d(x, [3, 3, 5 * f, 5 * f], stride=1,  padding=p,name='conv6_enc_a')
                x = tf.nn.elu(x)
                x = conv2d(x, [3, 3, 5 * f, 5 * f], stride=1,  padding=p,name='conv6_enc_b')
                x = tf.nn.elu(x)

            x = fc(x, h, name='enc_fc')
        return x

    def decoder(self, x, c, reuse=None):
        with tf.variable_scope('ae_') as scope:
            if reuse:
                scope.reuse_variables()

            w = self.data_size
            f = self.filter_number
            p = "SAME"
            
            x = tf.concat([x, c], axis=1) # adding label information

            x = fc(x, 8 * 8 * f, name='fc')
            x = tf.reshape(x, [-1, 8, 8, f])

            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv1_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv1_b')
            x = tf.nn.elu(x)

            if self.data_size == 128:
                x = resize_nn(x, w / 8)
                x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv2_a')
                x = tf.nn.elu(x)
                x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv2_b')
                x = tf.nn.elu(x)

                x = resize_nn(x, w / 4)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv3_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv3_b')
            x = tf.nn.elu(x)

            x = resize_nn(x, w / 2)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv4_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv4_b')
            x = tf.nn.elu(x)

            x = resize_nn(x, w)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv5_a')
            x = tf.nn.elu(x)
            x = conv2d(x, [3, 3, f, f], stride=1, padding=p, name='conv5_b')
            x = tf.nn.elu(x)

            x = conv2d(x, [3, 3, f, 3], stride=1, padding=p, name='conv6_a')
        return x

In [None]:
import distutils.util
import os
import tensorflow as tf

''' config settings '''

project_name = "ConditionalAE_Face_2"
train_flag = True

'''-----------------'''

gpu_number = "0"
os.environ["CUDA_VISIBLE_DEVICES"] = "0" #args.gpu_number

with tf.device('/gpu:{0}'.format(gpu_number)):
    gpu_options = tf.GPUOptions(per_process_gpu_memory_fraction=0.90)
    config = tf.ConfigProto(allow_soft_placement=True, gpu_options=gpu_options)

    with tf.Session(config=config) as sess:
        model = AE(sess, project_name)

        # TRAIN / TEST
        if train_flag:
            model.train(train_flag)
        else:
            model.test(train_flag)