# DCGAN implementation by tensorflow

In [1]:
import tensorflow as tf
import os
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt

# utility func

In [2]:
# save metrics
def save_metrics(metrics, epoch=None):
    # make directory if there is not
    path = "metrics_DC"
    if not os.path.isdir(path):
        os.makedirs(path)

    # save metrics
    plt.figure(figsize=(10,8))
    plt.plot(metrics["d_loss"], label="discriminative loss", color="b")
    plt.legend()
    plt.savefig(os.path.join(path, "dloss" + str(epoch) + ".png"))
    plt.close()

    plt.figure(figsize=(10,8))
    plt.plot(metrics["g_loss"], label="generative loss", color="r")
    plt.legend()
    plt.savefig(os.path.join(path, "g_loss" + str(epoch) + ".png"))
    plt.close()

    plt.figure(figsize=(10,8))
    plt.plot(metrics["g_loss"], label="generative loss", color="r")
    plt.plot(metrics["d_loss"], label="discriminative loss", color="b")
    plt.legend()
    plt.savefig(os.path.join(path, "both_loss" + str(epoch) + ".png"))
    plt.close()

In [3]:
# plot images
def save_imgs(images, plot_dim=(10,6), size=(6,10), epoch=None):
    # make directory if there is not
    path = "generated_figures_DC"
    if not os.path.isdir(path):
        os.makedirs(path)

    num_examples = plot_dim[0]*plot_dim[1]
    num_examples = 60
    fig = plt.figure(figsize=size)

    for i in range(num_examples):
        plt.subplot(plot_dim[0], plot_dim[1], i+1)
        img = images[i, :]
        img = img.reshape((32, 32, 3))
        plt.tight_layout()
        plt.imshow(img)
        plt.axis("off")
    plt.subplots_adjust(wspace=0.1, hspace=0.1)
    plt.savefig(os.path.join(path, str(epoch) + ".png"))
    plt.close()

In [4]:
def save_imgs_(imgs, path, filename):
    if not os.path.isdir(path):
        os.mkdir(path)

    # print(imgs.shape)

    for i in range(imgs.shape[0]):
        img = imgs[i,:]
        img = img.reshape((32,32,3))
        # print(type(img))
        # pil_img = Image.fromarray(img)
        pil_img = Image.fromarray(img.astype('uint8'))
        pil_img.save(os.path.join(path, str(filename + i) + ".jpg"))

In [5]:
def save_imgs_2(imgs, path, filename):
    if not os.path.isdir(path):
        os.mkdir(path)

    # print(imgs.shape)

    for i in range(imgs.shape[0]):
        img = imgs[i,:]
        img = img.reshape((32,32,3))
        # print(type(img))
        # pil_img = Image.fromarray(img)
        # pil_img = Image.fromarray(img.astype('uint8'))
        # pil_img.save(os.path.join(path, str(filename + i) + ".jpg"))
    
        fig = plt.figure()
        fig.set_size_inches(1, 1, forward=False)
        ax = plt.Axes(fig, [0., 0., 1., 1.])
        ax.set_axis_off()
        fig.add_axes(ax)
        ax.imshow(img)
        plt.savefig(os.path.join(path, str(filename + i) + ".jpg"), dpi = 32)
        plt.close()

In [6]:
# training
import pickle
import numpy as np
import os

def unpickle(file):
    fo = open(file, 'rb')
    #print(file)
    dict = pickle.load(fo, encoding='latin1')
    fo.close()
    return dict

def one_hot_vec(label):
    vec = np.zeros(10)
    vec[label] = 1
    return vec

def load_data():
    x_all = []
    y_all = []
    for i in range (5):
        d = unpickle("cifar-10-batches-py/data_batch_" + str(i+1))
        x_ = d['data']
        y_ = d['labels']
        x_all.append(x_)
        y_all.append(y_)

    d = unpickle('cifar-10-batches-py/test_batch')
    x_all.append(d['data'])
    y_all.append(d['labels'])

    x = -0.5 + (np.concatenate(x_all) / np.float32(255))
    y = np.concatenate(y_all)
    x = np.dstack((x[:, :1024], x[:, 1024:2048], x[:, 2048:]))
    x = x.reshape((x.shape[0], 32, 32, 3))

    #pixel_mean = np.mean(x[0:50000],axis=0)
    #x -= pixel_mean
    y = np.array(list(map(one_hot_vec, y)))
    X_train = x[0:50000,:,:,:]
    Y_train = y[0:50000]
    #X_test = x[50000:,:,:,:]
    #Y_test = y[50000:]

    #return (X_train, Y_train, X_test, Y_test)
    return X_train, Y_train

# model

In [7]:
class Generator:
    def __init__(self):
        self.reuse = False
        self.initializer = tf.contrib.layers.xavier_initializer()
        self.X_dim = 32*32*3
        self.z_dim = 128
        self.y_dim = 10
        self.DIM = 128

    def __call__(self, z, y, training=False):
        with tf.variable_scope('g', reuse=self.reuse):
            #print(z.get_shape())
            #print(y.get_shape())
            inputs = tf.concat([z, y], 1)
            inputs = tf.reshape(inputs, [-1, self.z_dim + self.y_dim])
            #fc1 = z
            fc1 = tf.layers.dense(inputs, 2*2*1024, kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # noise???
            fc1 = tf.layers.batch_normalization(fc1)
            fc1 = tf.nn.relu(fc1)
            fc1 = tf.reshape(fc1, [-1, 2, 2, 1024])

            conv1 = tf.layers.conv2d_transpose(fc1, 512, [4,4],[2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 4x4
            conv1 = tf.layers.batch_normalization(conv1)
            conv1 = tf.nn.relu(conv1)  

            conv2 = tf.layers.conv2d_transpose(conv1, 256, [4,4],[2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 8x8
            conv2 = tf.layers.batch_normalization(conv2)
            conv2 = tf.nn.relu(conv2)

            conv3 = tf.layers.conv2d_transpose(conv2, 128, [4,4],[2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 16x16
            conv3 = tf.layers.batch_normalization(conv3)
            conv3 = tf.nn.relu(conv3)

            conv4 = tf.layers.conv2d_transpose(conv3, 3, [4,4],[2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 32x32
            conv4 = tf.layers.batch_normalization(conv4)
            conv4 = tf.nn.tanh(conv4)
            conv4 = tf.reshape(conv4, [-1, 32*32*3])
            print(conv4.get_shape())
            outputs = conv4
        self.reuse = True
        self.variables = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='g')
        return outputs

In [8]:
class Discriminator:
    def __init__(self):
        self.reuse = False
        self.X_dim = 32*32*3
        self.initializer = tf.contrib.layers.xavier_initializer()

    def __call__(self, x, y, training=False, name=''):
        def leaky_relu(x, leak=0.2, name='outputs'):
            return tf.maximum(x, x * leak, name=name)

        with tf.name_scope('d' + name), tf.variable_scope('d', reuse=self.reuse):
            x = tf.reshape(x, [-1, 32, 32, 3])
            y = tf.reshape(y, [-1, 1, 1, 10])
            y = tf.tile(y, [1, 16, 16, 1])
            
            #print(x.get_shape())
            #print(y.get_shape())

            conv1 = tf.layers.conv2d(x, 64, [4,4], [2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 16x16x64
            conv1 = leaky_relu(conv1)

            print(conv1.get_shape())

            conv1 = tf.concat([conv1, y], 3)

            conv2 = tf.layers.conv2d(conv1, 128, [4,4], [1,1], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 16x16
            conv2 = tf.layers.batch_normalization(conv2)
            conv2 = leaky_relu(conv2)

            conv3 = tf.layers.conv2d(conv2, 256, [4,4], [2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 8x8
            conv3 = tf.layers.batch_normalization(conv3)
            conv3 = leaky_relu(conv3)

            conv4 = tf.layers.conv2d(conv3, 512, [4,4], [1,1], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 8x8
            conv4 = tf.layers.batch_normalization(conv4)
            conv4 = leaky_relu(conv4)

            conv5 = tf.layers.conv2d(conv4, 512, [4,4], [2,2], padding="SAME", kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02)) # 4x4
            conv5 = tf.contrib.layers.flatten(conv5)
            outputs = tf.layers.dense(conv5, 1, kernel_initializer=tf.truncated_normal_initializer(0.0, 0.02))
            #outputs = tf.nn.sigmoid(outputs)

        self.reuse = True
        self.variables = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope='d')
        return outputs

In [9]:
# f(x) = ||h(x) - h(x_)|| - ||h(x)||
class Critic(object):
    def __init__(self, h):
        self.h = h
    def __call__(self, x, x_):
        return tf.norm(self.h(x) - self.h(x_), axis=1) - tf.norm(self.h(x), axis=1)

# f(x) = ||h(x) - h(x_)||
class calc_Norm(object):
    def __init__(self, h):
        self.h = h
    def __call__(self, x, x_):
        return tf.norm(self.h(x) -self.h(x_))

In [10]:
class GAN:
    def __init__(self):
        self.batch_size = 100
        img_size = 32
        num_class = 10
        self.z_dim = 128

        self.epochs = 1000000
        self.epoch_saveMetrics = 500
        self.epoch_saveSampleImg = 500
        self.epoch_saveParamter = 10000
        self.losses = {"d_loss":[], "g_loss":[]}

        self.x, self.y = load_data()

        self.X_tr = tf.placeholder(tf.float32, shape=[None, img_size, img_size, 3])
        self.Y_tr = tf.placeholder(tf.float32, shape=[None, num_class])
        self.Z1 = tf.placeholder(tf.float32, [None, self.z_dim])
        
        self.g = Generator()
        self.d = Discriminator()
        self.Xg = self.g(self.Z1, self.Y_tr)

    def loss_WGANgp(self):
        Xg = tf.reshape(self.Xg, [self.batch_size, -1]) 
        Xr = tf.reshape(self.X_tr, [self.batch_size, -1])
        epsilon = tf.random_uniform([], 0.0, 1.0) # 怪しい
        Xhat = epsilon * Xr + (1-epsilon) * Xg   
        dhat = self.d(Xhat, self.Y_tr)
        ddx = tf.gradients(dhat, Xhat)[0]
        #print((ddx.get_shape().as_list()))
        ddx = tf.norm(ddx, axis=1)
        ddx = tf.reduce_mean(tf.square(ddx - 1.0) * 10)      
  
        L_d = self.d(Xg, self.Y_tr) - self.d(Xr, self.Y_tr) + ddx
        L_g = -self.d(Xg, self.Y_tr)
        return L_g, L_d
    
    def loss_DC(self):
        Dr = self.d(self.X_tr, self.Y_tr)
        Dg = self.d(self.Xg, self.Y_tr)

        g_cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=Dg, labels=tf.ones_like(Dg)))
        d_cost = tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=Dg, labels=tf.zeros_like(Dg)))
        d_cost += tf.reduce_mean(tf.nn.sigmoid_cross_entropy_with_logits(logits=Dr, labels=tf.ones_like(Dr)))
        d_cost /= 2.

        return g_cost, d_cost

    def train(self):
        # Optimizer
        learning_rate = 1e-4
        # learning_rate = 2e-4
        beta1 = 0.5

        self.L_g, self.L_d = self.loss_DC()

        d_opt = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=beta1)
        d_train_op = d_opt.minimize(self.L_d, var_list=self.d.variables)
        g_opt = tf.train.AdamOptimizer(learning_rate=learning_rate, beta1=beta1)
        g_train_op = g_opt.minimize(self.L_g, var_list=self.g.variables)

        saver = tf.train.Saver()
        #%debug

        config = tf.ConfigProto(
            gpu_options=tf.GPUOptions(
                visible_device_list="0"
            )
        )
        
        with tf.Session(config=config) as sess:
            #sess.run(tf.global_variables_initializer())
            epoch_pre = 20001
            path = "model_DC"
            saver.restore(sess, path+"/dcgan_model" + str(epoch_pre) + ".ckpt")
            z_test = np.random.uniform(-1, 1, size=[60, self.z_dim])
            y_tmp = np.repeat(np.arange(10), 6)
            y_test = np.array(list(map(one_hot_vec, y_tmp)))
            print(y_test.shape)
            print(y_test)

            for epoch in range(self.epochs):              
                for _ in range(1):
                    # 訓練データを抜粋
                    z1 = np.random.uniform(-1, 1, size=[self.batch_size, self.z_dim])

                    rand_id = np.random.randint(0, self.x.shape[0], size=self.batch_size)
                    X_mb = self.x[rand_id]
                    Y_mb = self.y[rand_id]

                    # X_mb = X_mb/255
                    X_mb = X_mb.astype(np.float32)

                    # Y_mb = Y_mb/255
                    Y_mb = Y_mb.astype(np.float32)
                    
                    _, d_loss_value = sess.run([d_train_op, self.L_d], feed_dict={self.X_tr: X_mb, self.Y_tr: Y_mb, self.Z1:z1})

                # train G
                _, g_loss_value = sess.run([g_train_op, self.L_g], feed_dict={self.X_tr: X_mb, self.Y_tr: Y_mb, self.Z1: z1})

                # generate Sample Imgs
                #sampleImgsOfX2Y, sampleImgsOfY2X = sess.run([self.X2Y, self.Y2X], feed_dict={self.X_tr: X_mb, self.Y_tr: Y_mb})

                # 結果をappend
                self.losses["d_loss"].append(np.sum(d_loss_value))
                self.losses["g_loss"].append(np.sum(g_loss_value))
                
                if epoch % 100 == 0:
                    print("epoch:" + str(epoch+epoch_pre))

                # lossの可視化
                if epoch % self.epoch_saveMetrics == 1:
                    save_metrics(self.losses, epoch)

                # 画像の変換テスト
                if epoch % self.epoch_saveSampleImg == 0:
                    img = sess.run(self.Xg, feed_dict={self.Z1: z_test, self.Y_tr: y_test})
                    img += 0.5
                    save_imgs(img, epoch=str(epoch+epoch_pre))
                # parameterのsave
                if epoch % self.epoch_saveParamter == 1:
                    path = "model_DC"
                    if not os.path.isdir(path):
                        os.makedirs(path)

                    saver.save(sess, path+"/dcgan_model" + str(epoch+epoch_pre) + ".ckpt")
       


    def test(self):
        config = tf.ConfigProto(
            gpu_options=tf.GPUOptions(
                visible_device_list="0"
            )
        )

        saver = tf.train.Saver()
        #%debug
        
        with tf.Session(config=config) as sess:
            #sess.run(tf.global_variables_initializer())
            epoch_pre = 30002
            path = "model_DC"
            saver.restore(sess, path+"/dcgan_model" + str(epoch_pre) + ".ckpt")
            #z_test = np.random.uniform(-1, 1, size=[60, self.z_dim])
            #y_tmp = np.repeat(np.arange(10), 6)
            #y_test = np.array(list(map(one_hot_vec, y_tmp)))
            #print(y_test.shape)
            #print(y_test)

            num_imgs_per_class = 100000
            # クラスごとに画像生成
            for class_ in range(10):
                # i 番目のクラス
                i = 0
                y_class = np.repeat(class_, self.batch_size)
                y_class = np.array(list(map(one_hot_vec, y_class)))

                while(i < num_imgs_per_class):

                    z1 = np.random.uniform(-1, 1, size=[self.batch_size, self.z_dim])
                    Y_mb = y_class
                    Y_mb = Y_mb.astype(np.float32)
                    
                    imgs = sess.run(self.Xg, feed_dict={self.Z1: z1, self.Y_tr:Y_mb})
                    imgs += 0.5
                    # save_imgs_(imgs, str(class_), i)
                    save_imgs_2(imgs, str(class_), i)
                    i += self.batch_size

    def sample_images(self, row=5, col=12, inputs=None, epoch=None):
        images = self.g(inputs, training=True)
        return images

In [11]:
gan = GAN()
#gan.train()
gan.test()

(?, 3072)


