In [22]:
import os
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from keras.layers import Input, concatenate
from keras.models import Model, Sequential
from keras.layers.core import Reshape, Dense, Dropout, Flatten
from keras.layers.advanced_activations import LeakyReLU
from keras.layers.convolutional import Conv2D, UpSampling2D
from keras.datasets import mnist
from keras.utils import np_utils
from keras.optimizers import Adam
from keras import backend as K
from keras import initializers
%matplotlib inline

In [None]:
np.random.seed(1000)

# dimention of input random noise
randomDim = 100
# dimention of input label
labelDim = 10

# Load MNIST data
# 0~255 -> -1~1
# (Batch, 1, H, W)
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = (X_train.astype(np.float32) - 127.5)/127.5
X_train = X_train[:, np.newaxis, :, :]

# Optimizer
adam = Adam(lr=0.0002, beta_1=0.5)

# Generator
generator = Sequential()
# (110, , )
generator.add(Dense(128*7*7, input_dim=randomDim+labelDim, kernel_initializer=initializers.RandomNormal(stddev=0.02)))
generator.add(LeakyReLU(0.2))
generator.add(Reshape((128, 7, 7)))
# (128, 7, 7)
generator.add(UpSampling2D(size=(2, 2), data_format='channels_first'))
# (128, 14, 14)
generator.add(Conv2D(64, kernel_size=(5, 5), padding='same', data_format='channels_first'))
generator.add(LeakyReLU(0.2))
# (64, 14, 14)
generator.add(UpSampling2D(size=(2, 2), data_format='channels_first'))
# (64, 28, 28)
generator.add(Conv2D(1, kernel_size=(5, 5), padding='same', activation='tanh', data_format='channels_first'))
# (1, 28, 28)
generator.compile(loss='binary_crossentropy', optimizer=adam)

# Discriminator
discriminator = Sequential()
# (1, 28, 28)
discriminator.add(Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding='same', input_shape=(1, 28, 28), kernel_initializer=initializers.RandomNormal(stddev=0.02), data_format='channels_first'))
discriminator.add(LeakyReLU(0.2))
discriminator.add(Dropout(0.3))
# (64, 14, 14)
discriminator.add(Conv2D(128, kernel_size=(5, 5), strides=(2, 2), padding='same', data_format='channels_first'))
discriminator.add(LeakyReLU(0.2))
discriminator.add(Dropout(0.3))
# (128, 7, 7)
discriminator.add(Flatten())
# (128*7*7, , )
discriminator.add(Dense(1, activation='sigmoid'))
# (1, , )
discriminator.compile(loss='binary_crossentropy', optimizer=adam)

# Combined network
discriminator.trainable = False
ganInput = Input(shape=(randomDim,))
x = generator(ganInput)
ganOutput = discriminator(x)
gan = Model(inputs=ganInput, outputs=ganOutput)
gan.compile(loss='binary_crossentropy', optimizer=adam)

dLosses = []
gLosses = []

In [26]:
np.random.seed(1000)

# dimention of input random noise
randomDim = 100
# dimention of input label
labelDim = 10

# Load MNIST data
# 0~255 -> -1~1
# (Batch, 1, H, W)
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = (X_train.astype(np.float32) - 127.5)/127.5
X_train = X_train[:, np.newaxis, :, :]
y_train = np_utils.to_categorical(y_train, nb_classes)

# Optimizer
adam = Adam(lr=0.0002, beta_1=0.5)

# Generator
def Generator():
    z = Input(shape=(randomDim,))
    y = Input(shape=(labelDim,))
    x = concatenate([z, y])
    x = Dense(128*7*7, input_dim=randomDim+labelDim, kernel_initializer=initializers.RandomNormal(stddev=0.02))(x)
    x = LeakyReLU(0.2)(x)
    x = Reshape((128, 7, 7))(x)
    x = UpSampling2D(size=(2, 2), data_format='channels_first')(x)
    x = Conv2D(64, kernel_size=(5, 5), padding='same', data_format='channels_first')(x)
    x = LeakyReLU(0.2)(x)
    x = UpSampling2D(size=(2, 2), data_format='channels_first')(x)
    x = Conv2D(1, kernel_size=(5, 5), padding='same', activation='tanh', data_format='channels_first')(x)
    generator = Model(inputs=[z, y], outputs=x)
    return generator

# Discriminator
def Discriminator():
    genimgs = Input(shape=(1, 28, 28))
    y = Input(shape=(labelDim,))
    w = Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding='same', input_shape=(1, 28, 28), kernel_initializer=initializers.RandomNormal(stddev=0.02), data_format='channels_first')(genimgs)
    w = LeakyReLU(0.2)(w)
    w = Dropout(0.3)(w)
    w = Conv2D(128, kernel_size=(5, 5), strides=(2, 2), padding='same', data_format='channels_first')(w)
    w = LeakyReLU(0.2)(w)
    w = Dropout(0.3)(w)
    w = Flatten()(w)
    w = concatenate([w, y])
    w = Dense(1, activation='sigmoid')(w)
    discriminator = Model(inputs=[genimgs, y], outputs=w)
    return discriminator

# Combined network
def ConditionalGAN():
    z = Input(shape=(randomDim,))
    y = Input(shape=(labelDim,))
    generator = Generator()
    generatedimages = generator([z, y])
    
    discriminator = Discriminator()
    discriminator.trainable = False
    w = discriminator([generatedimages, y])
    
    cgan = Model(inputs=[z, y], outputs=w)
    return cgan

dLosses = []
gLosses = []

In [16]:
def plotGeneratedImages(epoch, examples=100, dim=(10, 10), figsize=(10, 10)):
    noise = np.random.normal(0, 1, size=[examples, randomDim])
    generatedImages = generator.predict(noise)

    plt.figure(figsize=figsize)
    for i in range(generatedImages.shape[0]):
        plt.subplot(dim[0], dim[1], i+1)
        plt.imshow(generatedImages[i, 0], interpolation='nearest', cmap='gray_r')
        plt.axis('off')
    plt.tight_layout()
    plt.savefig('images/generated_image_epoch_%d.png' % epoch)

In [17]:
def saveModels(epoch):
    generator.save('models/cgan_generator_epoch_%d.h5' % epoch)
    discriminator.save('models/cgan_discriminator_epoch_%d.h5' % epoch)

In [47]:
def train(epochs=1, batchSize=128):
    batchCount = X_train.shape[0] // batchSize
    print('Epochs:', epochs)
    print('Batch size:', batchSize)
    print('Batches per epoch:', batchCount)
    
    generator = Generator()
    generator.compile(loss='binary_crossentropy', optimizer=adam)
    discriminator = Discriminator()
    discriminator.compile(loss='binary_crossentropy', optimizer=adam)
    cgan = ConditionalGAN()
    cgan.compile(loss='binary_crossentropy', optimizer=adam)

    for e in range(1, epochs+1):
        print('-'*15, 'Epoch %d' % e, '-'*15)        
        for _ in tqdm(range(batchCount)):
            # Get a random set of input noise and images
            z = np.random.normal(0, 1, size=[batchSize, randomDim])
            index = np.random.randint(0, X_train.shape[0], size=batchSize)
            imageBatch = X_train[index]
            labelBatch = y_train[index]

            # Generate fake MNIST images
            generatedImages = generator.predict([z, labelBatch])
            y = np.concatenate([labelBatch, labelBatch])
            X = np.concatenate([imageBatch, generatedImages])
            
            # Labels for generated and real data
            yDis = np.zeros(2*batchSize)
            # One-sided label smoothing
            yDis[:batchSize] = 0.9

            # Train discriminator
            discriminator.trainable = True
            dloss = discriminator.train_on_batch([X, y], yDis)

            # Train generator
            z = np.random.normal(0, 1, size=[batchSize, randomDim])
            yGen = np.ones(batchSize)
            discriminator.trainable = False
            gloss = gan.train_on_batch([z, labelBatch], yGen)

        # Store loss of most recent batch from this epoch
        dLosses.append(dloss)
        gLosses.append(gloss)

        if e == 1 or e % 5 == 0:
            plotGeneratedImages(e)
            saveModels(e)

    # Plot losses from every epoch
#     plotLoss(e)

In [48]:
train()

Epochs: 1
Batch size: 128
Batches per epoch: 468


  0%|          | 0/468 [00:00<?, ?it/s]

--------------- Epoch 1 ---------------


  3%|▎         | 12/468 [00:58<37:17,  4.91s/it] 

KeyboardInterrupt: 

In [24]:
print(y_train[0])
nb_classes = 10
y_train = np_utils.to_categorical(y_train, nb_classes)
print(y_train[0])

[ 0.  0.  0.  0.  0.  1.  0.  0.  0.  0.]
[ 1.  0.  0.  0.  0.  0.  0.  0.  0.  0.]


In [36]:
print(np.random.randint(0, 1, 2))

[0 0]
