In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.image import imread
import matplotlib.gridspec as gridspec
from random import sample

from keras.layers import Input
from keras.models import Model
from keras.layers import Input, Activation, Add, Dropout,Reshape, Concatenate,PReLU,Embedding,multiply
from keras.layers.core import Dense,Activation,Flatten
from keras.layers import UpSampling2D, Lambda

from keras.models import Model,Sequential
from keras.layers.convolutional import Conv2D,UpSampling2D,Conv2DTranspose
from keras.layers.pooling import AvgPool2D
from keras.layers.normalization import BatchNormalization
from keras.layers.advanced_activations import LeakyReLU
from keras.optimizers import SGD, Adam, RMSprop
from keras import backend as K
import tensorflow as tf
import pandas as pd
from tqdm import tqdm_notebook
import matplotlib.pyplot as plt
from keras.preprocessing.image import load_img

In [None]:
class ACGAN():
    """
    一共12类。定义了带Resnet的G和D，Loss Function是按照官方Keras的ACGAN选的。
    之前有一次loss到了nan是因为最大类别数写错了。
    现在的版本是在mnist上面的版本。
    """
    def __init__(self):
        # Input shape
        self.img_rows = 96
        self.img_cols = 96
        self.channels = 3
        self.img_shape = (self.img_rows, self.img_cols,self.channels)
        self.num_classes = 12
        self.latent_dim = 100

        optimizer1 = Adam(0.0002, 0.5)
        optimizer2 = Adam(0.0002, 0.5)
        losses = ['binary_crossentropy', 'sparse_categorical_crossentropy']
        #losses = ['binary_crossentropy']
        loss_weight=[1.,0.5]
              
        # Build and compile the discriminator
        img_input = Input(shape=(self.img_rows,self.img_cols,self.channels))
        self.discriminator,valid, target_label = self.get_discriminator(img_input)
        self.discriminator.compile(loss=losses,
            optimizer=optimizer2,
            metrics=['accuracy'])      


        # Build the generator
        noise = Input(shape=(100,))
        label = Input(shape=(1,))


        # The generator takes noise and the target label as input
        # and generates the corresponding digit of that label
        self.generator = self.get_generator(noise,label)
        img = self.generator([noise,label])
        valid, target_label = self.discriminator(img)


        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated image as input and determines validity
        # and the label of that image
       
        # The combined model  (stacked generator and discriminator)
        # Trains the generator to fool the discriminator
        self.combined = Model([noise,label],[valid, target_label])
        self.combined.compile(loss=losses,loss_weights=loss_weight,
            optimizer=optimizer1)
        
    def SubpixelConv2D(self,name, scale=2):
        """
        Keras layer to do subpixel convolution.
        NOTE: Tensorflow backend only. Uses tf.depth_to_space

        :param scale: upsampling scale compared to input_shape. Default=2
        :return:
        """

        def subpixel_shape(input_shape):
            dims = [input_shape[0],
                    None if input_shape[1] is None else input_shape[1] * scale,
                    None if input_shape[2] is None else input_shape[2] * scale,
                    int(input_shape[3] / (scale ** 2))]
            output_shape = tuple(dims)
            return output_shape

        def subpixel(x):
            return tf.depth_to_space(x, scale)

        return Lambda(subpixel, output_shape=subpixel_shape, name=name)

    def get_generator(self,input_layer, condition_layer):
        def residual_block(input):
            x = Conv2D(64, kernel_size=3, strides=1, padding='same')(input)
            x = BatchNormalization(momentum=0.8)(x)
            x = PReLU(shared_axes=[1,2])(x)            
            x = Conv2D(64, kernel_size=3, strides=1, padding='same')(x)
            x = BatchNormalization(momentum=0.8)(x)
            x = Add()([x, input])
            return x

        def upsample(x, number):
            x = Conv2D(256, kernel_size=3, strides=1, padding='same', name='upSampleConv2d_'+str(number))(x)
            x = self.SubpixelConv2D('upSampleSubPixel_'+str(number), 2)(x)
            x = PReLU(shared_axes=[1,2], name='upSamplePReLU_'+str(number))(x)
            return x

        
        
        label_embedding = Flatten()(Embedding(self.num_classes, 100)(condition_layer))
        
        model_input = Concatenate()([input_layer,label_embedding])

        hid = Dense(512 * 12 * 12, activation='relu')(model_input)    
        hid = BatchNormalization(momentum=0.9)(hid)
        hid = LeakyReLU(alpha=0.1)(hid)
        x_start = Reshape((12, 12, 512))(hid)

        x_start = Conv2D(64, kernel_size=9, strides=1, padding='same')(x_start)
        x_start = PReLU(shared_axes=[1,2])(x_start)
        r = residual_block(x_start)
        for _ in range(9):
            r = residual_block(r)
        x = Conv2D(64, kernel_size=3, strides=1, padding='same')(r)
        x = BatchNormalization(momentum=0.8)(x)
        x = Add()([x, x_start])
        
        gen = UpSampling2D(size=(2,2))(x)
        gen = Conv2D(256, (3,3), padding='same')(gen)
        gen = BatchNormalization(momentum = 0.5)(gen)
        gen = LeakyReLU(0.2)(gen)   
        
        gen = UpSampling2D(size=(2,2))(gen)
        gen = Conv2D(256, (3,3), padding='same')(gen)
        gen = BatchNormalization(momentum = 0.5)(gen)
        gen = LeakyReLU(0.2)(gen) 
        
        gen = UpSampling2D(size=(2,2))(gen)
        gen = Conv2D(256, (3,3), padding='same')(gen)
        gen = BatchNormalization(momentum = 0.5)(gen)
        gen = LeakyReLU(0.2)(gen) 

        #x = upsample(x, 1)
        #x = upsample(x, 2)
        #x = upsample(x, 3)
        hr_output = Conv2D(
                3, 
                kernel_size=9, 
                strides=1, 
                padding='same', 
                activation='tanh'
            )(gen)
        model = Model(inputs=[input_layer, condition_layer], outputs=hr_output)  
        model.summary()
        return model

    def get_discriminator(self,input_layer):
            def conv2d_block(input, filters, strides=1, bn=True):
                d = Conv2D(filters, kernel_size=3, strides=strides, padding='same')(input)
                d = LeakyReLU(alpha=0.2)(d)
                if bn:
                    d = BatchNormalization(momentum=0.8)(d)
                return d
            def residual_block(input,filters=64):
                x = Conv2D(filters, kernel_size=3, strides=1, padding='same')(input)
                x = BatchNormalization(momentum=0.8)(x)
                x = PReLU(shared_axes=[1,2])(x)            
                x = Conv2D(filters, kernel_size=3, strides=1, padding='same')(x)
                x = BatchNormalization(momentum=0.8)(x)
                x = Add()([x, input])
                return x

            filters = 32
            x = Conv2D(filters, kernel_size=3, strides=2, padding='same')(input_layer)
            x = LeakyReLU(alpha=0.2)(x)
            x = residual_block(x,filters)
            #x = residual_block(x,filters)
            x = Conv2D(filters*2, kernel_size=3, strides=2, padding='same')(x)
            x = LeakyReLU(alpha=0.2)(x)
            #x = residual_block(x, filters*2)
            x = residual_block(x, filters*2)
            x = Conv2D(filters*4, kernel_size=3, strides=2, padding='same')(x)
            x = LeakyReLU(alpha=0.2)(x)
            #x = residual_block(x, filters*4)
            x = residual_block(x, filters*4)
            x = Conv2D(filters*8, kernel_size=3, strides=2, padding='same')(x)
            x = LeakyReLU(alpha=0.2)(x)
            #x = residual_block(x, filters*8)
            x = residual_block(x, filters*8)
            x = Conv2D(filters*16, kernel_size=3, strides=2, padding='same')(x)
            x = LeakyReLU(alpha=0.2)(x)
            #x = residual_block(x, filters*8)
            x = residual_block(x, filters*16)
            x = Conv2D(filters*32, kernel_size=3, strides=2, padding='same')(x)
            x = LeakyReLU(alpha=0.2)(x)
            x = Flatten()(x)
            x1 = Dense(1, activation='sigmoid')(x)
            x2 = Dense(13, activation='sigmoid')(x)
            
            model = Model(inputs=input_layer, outputs=[x1,x2])
            model.summary()
            return model,x1,x2
    def train(self,X_train, y_train, epochs, batch_size=128, sample_interval=50):

        # Load the dataset
        #(X_train, y_train), (_, _) = mnist.load_data()

        # Configure inputs
        #X_train = (X_train.astype(np.float32) - 127.5) / 127.5
        #X_train = np.expand_dims(X_train, axis=3)
        #y_train = y_train.reshape(-1, 1)
        #print('y_train',y_train)
        
        # Adversarial ground truths
        #self.generator.load_weights('./saved_model/generator_weights.hdf5')
        #self.discriminator.load_weights('./saved_model/discriminator_weights.hdf5')
        valid = np.ones((batch_size, 1))
        fake = np.zeros((batch_size, 1))

        for epoch in range(epochs):
            for mini_batch in range(X_train.shape[0]//batch_size-1):
                # ---------------------
                #  Train Discriminator
                # ---------------------

                # Select a random batch of images
                #idx = np.random.randint(0, X_train.shape[0], batch_size)
                idx = np.array([_ for _ in range(batch_size*mini_batch,batch_size*(mini_batch+1))])
                imgs = X_train[idx]

                # Sample noise as generator input
                noise = np.random.normal(0, 1, (batch_size, 100))
                #print('imgs',np.shape(imgs))
                # The labels of the digits that the generator tries to create an
                # image representation of
                sampled_labels = np.random.randint(0, self.num_classes, (batch_size, 1))
                # Generate a half batch of new images
                gen_imgs = self.generator.predict([noise, sampled_labels])
                #print('G',np.shape(gen_imgs))
                # Image labels. 0-11 if image is valid or 12 if it is generated (fake)
                img_labels = y_train[idx]
                fake_labels = self.num_classes * np.ones(img_labels.shape)
                #print('D',(self.discriminator.predict(gen_imgs)))
                # Train the discriminator
                if mini_batch % 5 == 0:
                    d_loss_real = self.discriminator.train_on_batch(imgs, [valid, img_labels])
                    #print('real_loss',d_loss_real)
                    d_loss_fake = self.discriminator.train_on_batch(gen_imgs, [fake, fake_labels])
                    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

                # ---------------------
                #  Train Generator
                # ---------------------

                # Train the generator
                g_loss = self.combined.train_on_batch([noise, sampled_labels], [valid, sampled_labels])

                # Plot the progress
                print ("Epoch%d,MiniBatch:%d [D loss: %f, acc.: %.2f%%, op_acc: %.2f%%] [G loss: %f]" % (epoch,mini_batch, d_loss[0], 100*d_loss[3], 100*d_loss[4], g_loss[0]))

                # If at save interval => save generated image samples
                if mini_batch % sample_interval == 0 or mini_batch == (X_train.shape[0]//batch_size-1):
                    self.save_model()
                    self.sample_images(epoch,mini_batch)
            

    def sample_images(self, epoch,mini_batch):
        r, c = 2, 6
        noise = np.random.normal(0, 1, (r * c, 100))
        sampled_labels = np.array([_ for _ in range(r*c)])
        gen_imgs = self.generator.predict([noise, sampled_labels])
        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt,:,:,:])
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig("images/flip_%d_%d.png" % (epoch,mini_batch))
        plt.close()
        

    def save_model(self):
        def save(model, model_name):
            model_path = "saved_model/%s.json" % model_name
            weights_path = "saved_model/%s_weights.hdf5" % model_name
            options = {"file_arch": model_path,
                        "file_weight": weights_path}
            json_string = model.to_json()
            open(options['file_arch'], 'w').write(json_string)
            model.save_weights(options['file_weight'])

        save(self.generator, "generator")
        save(self.discriminator, "discriminator")

In [None]:
acgan = ACGAN()

In [None]:
tags = pd.read_csv('./tags_hair_num.csv')
X_train = np.array([(np.array(load_img("./faces/{}.jpg".format(ids))) / 127.5 - 1) for ids in tqdm_notebook(tags.idx)])
y_train = np.array(tags.hair)


y_train = y_train.reshape(-1, 1)
np.shape(X_train)

In [None]:
X_train = np.array([np.fliplr(x) for x in X_train])

In [None]:
        from keras.datasets import mnist
        # Load the dataset
        (X_train, y_train), (_, _) = mnist.load_data()

        # Configure inputs
        X_train = (X_train.astype(np.float32) - 127.5) / 127.5
        X_train = np.expand_dims(X_train, axis=3)
        y_train = y_train.reshape(-1, 1)
        #print('y_train',y_train)

In [None]:
max(y_train)

In [None]:
acgan.train(X_train, y_train,epochs=50, batch_size=32, sample_interval=200)