In [1]:
# Mount Google Drive into the Colab runtime at /content/gdrive.
# The first run prompts for an authorization code (see the recorded output).
from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/gdrive


In [4]:
# Colab magic selecting TensorFlow 2.x. Per the recorded output it has no effect
# once TensorFlow is already loaded, so run it in a fresh runtime before any
# TensorFlow import.
%tensorflow_version 2.x

TensorFlow is already loaded. Please restart the runtime to change versions.


In [0]:
import os
import pickle
import random
import time
import numpy as np
import pandas as pd
import tensorflow as tf

In [6]:
# Sanity check: list the contents of the CUB_200_2011 directory on the mounted Drive.
os.listdir('/content/gdrive/My Drive/CUB Dataset/CUB_200_2011/')

['parts',
 'images.txt',
 'train_test_split.txt',
 'image_class_labels.txt',
 'classes.txt',
 'bounding_boxes.txt',
 'attributes',
 'images',
 'README.txt']

# Model

In [0]:
import tensorflow.keras.backend as K
from tensorflow.keras import Input, Model
from tensorflow.keras.layers import LeakyReLU, BatchNormalization, ReLU, Activation
from tensorflow.keras.layers import UpSampling2D, Conv2D, Concatenate, Dense, concatenate
from tensorflow.keras.layers import Flatten, Lambda, Reshape, ZeroPadding2D, add
from tensorflow.keras.optimizers import Adam

In [0]:
def build_ca_model():
    """Build a standalone conditioning-augmentation network.

    The network is a single Dense(256) layer followed by LeakyReLU(0.2).

    Returns:
        A Keras ``Model`` taking one input of shape (1024,) and producing one
        output of shape (256,).
    """
    embedding_in = Input(shape=(1024,))
    projected = Dense(256)(embedding_in)
    activated = LeakyReLU(alpha=0.2)(projected)
    return Model(inputs=[embedding_in], outputs=[activated])

In [0]:
def build_embedding_compressor_model():
    """Build the network that compresses an embedding from 1024 to 128 units.

    The network is a single Dense(128) layer followed by ReLU.

    Returns:
        A Keras ``Model`` taking one input of shape (1024,) and producing one
        output of shape (128,).
    """
    embedding_in = Input(shape=(1024,))
    projected = Dense(128)(embedding_in)
    compressed = ReLU()(projected)
    return Model(inputs=[embedding_in], outputs=[compressed])

In [0]:
def generate_c(x):
    """Sample a conditioning vector with the reparameterization trick.

    Args:
        x: 2-D tensor whose columns [:128] hold the mean and whose columns
            [128:] hold the log of the standard deviation.

    Returns:
        Tensor ``mean + exp(log_sigma) * epsilon`` with the same shape as the
        mean slice, where ``epsilon`` is standard-normal noise.
    """
    mean = x[:, :128]
    log_sigma = x[:, 128:]
    stddev = tf.math.exp(log_sigma)
    # Draw noise with the full (batch, dim) shape of `mean`. Sampling only
    # (dim,) values would broadcast one identical noise vector to every
    # example in the batch.
    epsilon = K.random_normal(shape=tf.shape(mean))
    return mean + stddev * epsilon

In [0]:
def ConvBlock(x, num_kernels, kernel_size=(4,4), strides=2, activation=True):
    """Apply Conv2D -> BatchNormalization, optionally followed by LeakyReLU(0.2).

    Args:
        x: Input tensor.
        num_kernels: Number of convolution filters.
        kernel_size: Convolution kernel size.
        strides: Convolution stride.
        activation: When truthy, append a LeakyReLU(alpha=0.2) activation.

    Returns:
        The output tensor of the block.
    """
    conv_layer = Conv2D(
        num_kernels,
        kernel_size=kernel_size,
        padding='same',
        strides=strides,
        use_bias=False,
        kernel_initializer='he_uniform',
    )
    norm_layer = BatchNormalization(gamma_initializer='ones', beta_initializer='zeros')

    features = norm_layer(conv_layer(x))
    if not activation:
        return features
    return LeakyReLU(alpha=0.2)(features)

In [0]:
def UpSamplingBlock(x, num_kernels):
    """Double the spatial size, then apply a 3x3 Conv2D -> BatchNormalization -> ReLU.

    Args:
        x: Input tensor.
        num_kernels: Number of filters of the 3x3 convolution.

    Returns:
        The output tensor of the block.
    """
    stages = (
        UpSampling2D(size=(2,2)),
        Conv2D(num_kernels, kernel_size=(3,3), padding='same', strides=1,
               use_bias=False, kernel_initializer='he_uniform'),
        BatchNormalization(gamma_initializer='ones', beta_initializer='zeros'),
        ReLU(),
    )
    out = x
    for layer in stages:
        out = layer(out)
    return out

### Stage 1



In [0]:
def build_stage1_generator():
    """Build the Stage-I generator.

    Inputs:
        * an embedding of shape (1024,)
        * a noise vector of shape (100,)

    The embedding goes through Dense(256) + LeakyReLU(0.2); ``generate_c``
    samples a conditioning vector from that result. The conditioning vector
    is concatenated with the noise, projected to a 4x4x1024 tensor and
    upsampled four times (x2 each) before a final 3-channel tanh convolution.

    Returns:
        A Keras ``Model`` with outputs ``[image, ca]``: ``image`` has shape
        (64, 64, 3) and ``ca`` is the 256-unit activation fed to ``generate_c``.
    """
    embedding_input = Input(shape=(1024,))
    ca = Dense(256)(embedding_input)
    ca = LeakyReLU(alpha=0.2)(ca)

    # Obtain the conditioned text
    c = Lambda(generate_c)(ca)

    noise_input = Input(shape=(100,))

    concat = Concatenate(axis=1)([c, noise_input])

    x = Dense(16384, use_bias=False)(concat)
    x = ReLU()(x)
    # 16384 = 4 * 4 * 1024. No `input_shape` is needed on an inner layer.
    x = Reshape((4, 4, 1024))(x)

    x = UpSamplingBlock(x, 512)
    x = UpSamplingBlock(x, 256)
    x = UpSamplingBlock(x, 128)
    x = UpSamplingBlock(x, 64)

    x = Conv2D(3, kernel_size=3, padding='same', strides=1, use_bias=False, kernel_initializer='he_uniform')(x)
    x = Activation('tanh')(x)

    # The original referenced an undefined `input_layer1` here (NameError);
    # the model inputs are the embedding and the noise vector defined above.
    model = Model(inputs=[embedding_input, noise_input], outputs=[x, ca])
    return model

In [0]:
def build_stage1_discriminator():
    """Build the Stage-I discriminator.

    Inputs:
        * an image of shape (64, 64, 3)
        * a compressed, spatially replicated embedding of shape (4, 4, 128)

    The image is downsampled by four stride-2 convolutions to a 4x4x512
    feature map, concatenated with the embedding along the channel axis,
    mixed by a 1x1 convolution and reduced to a single sigmoid output.

    Returns:
        A Keras ``Model`` mapping ``[image, embedding]`` to a (1,) sigmoid score.
    """
    input_layer1 = Input(shape=(64, 64, 3))

    x = Conv2D(64, kernel_size=(4,4), strides=2, padding='same', use_bias=False, kernel_initializer='he_uniform')(input_layer1)
    x = LeakyReLU(alpha=0.2)(x)

    x = ConvBlock(x, 128)
    x = ConvBlock(x, 256)
    x = ConvBlock(x, 512)

    # Obtain the compressed and spatially replicated text embedding
    input_layer2 = Input(shape=(4, 4, 128))
    concat = concatenate([x, input_layer2])

    # Keep chaining on `x1`. The original applied the next two layers to `x`,
    # which discarded the concatenation and left the embedding input unused.
    x1 = Conv2D(512, kernel_size=(1,1), padding='same', strides=1, use_bias=False, kernel_initializer='he_uniform')(concat)
    x1 = BatchNormalization(gamma_initializer='ones', beta_initializer='zeros')(x1)
    x1 = LeakyReLU(alpha=0.2)(x1)

    # Flatten and add a FC layer to predict.
    x1 = Flatten()(x1)
    x1 = Dense(1)(x1)
    x1 = Activation('sigmoid')(x1)

    model = Model(inputs=[input_layer1, input_layer2], outputs=[x1])
    return model

In [0]:
def build_stage1_adversarial_model(gen_model, dis_model):
    """Chain the Stage-I generator and discriminator into one model.

    Side effect: sets ``dis_model.trainable = False``.

    Args:
        gen_model: Model called on ``[embedding, noise]`` that returns an
            image tensor and a second tensor.
        dis_model: Model called on ``[image, compressed_embedding]``.

    Returns:
        A Keras ``Model`` with inputs of shapes (1024,), (100,) and
        (4, 4, 128), and outputs ``[discriminator_score, second_generator_output]``.
    """
    embedding_in = Input(shape=(1024,))
    noise_in = Input(shape=(100,))
    compressed_in = Input(shape=(4, 4, 128))

    generated, mean_logsigma = gen_model([embedding_in, noise_in])

    # Freeze the discriminator before wiring it into the combined model.
    dis_model.trainable = False
    score = dis_model([generated, compressed_in])

    return Model(
        inputs=[embedding_in, noise_in, compressed_in],
        outputs=[score, mean_logsigma],
    )

### Stage 2

In [0]:
def residual_block(input):
    """Apply a two-convolution residual block with 512 filters.

    The branch is Conv2D -> BatchNormalization -> ReLU -> Conv2D ->
    BatchNormalization. Its output is added to ``input`` and passed through
    a final ReLU.

    Args:
        input: Input tensor; it is added element-wise to the 512-channel branch.

    Returns:
        The output tensor of the block.
    """
    conv_kwargs = dict(kernel_size=(3,3), padding='same', use_bias=False,
                       kernel_initializer='he_uniform')
    norm_kwargs = dict(gamma_initializer='ones', beta_initializer='zeros')

    branch = Conv2D(512, **conv_kwargs)(input)
    branch = BatchNormalization(**norm_kwargs)(branch)
    branch = ReLU()(branch)

    branch = Conv2D(512, **conv_kwargs)(branch)
    branch = BatchNormalization(**norm_kwargs)(branch)

    # Skip connection followed by the closing activation.
    merged = add([branch, input])
    return ReLU()(merged)

In [0]:
def joint_block(inputs):
    """Tile a conditioning vector over a 16x16 grid and append it to a feature map.

    Args:
        inputs: Sequence ``[c, x]``. ``c`` gets two axes inserted at position 1
            and is tiled 16x16 over them; ``x`` is the feature map it is
            concatenated with.

    Returns:
        ``c`` (tiled) and ``x`` concatenated along axis 3.
    """
    condition, feature_map = inputs[0], inputs[1]

    # Insert two singleton axes at position 1: (batch, d) -> (batch, 1, 1, d).
    for _ in range(2):
        condition = K.expand_dims(condition, axis=1)

    tiled = K.tile(condition, [1, 16, 16, 1])
    return K.concatenate([tiled, feature_map], axis=3)

In [0]:
def build_stage2_generator():
    """Build the Stage-II generator.

    Inputs:
        * an embedding of shape (1024,)
        * a low-resolution image of shape (64, 64, 3)

    The image is encoded down to a 16x16 feature map and joined with a
    conditioning vector sampled by ``generate_c``. The result passes through
    four residual blocks and four upsampling blocks, then a 3-channel tanh
    convolution.

    Returns:
        A Keras ``Model`` with outputs ``[image, mean_logsigma]``, where
        ``mean_logsigma`` is the 256-unit activation fed to ``generate_c``.
    """
    def _padded_conv(tensor, filters, kernel, stride, normalize=True):
        # Zero-pad by 1, convolve without bias, optionally batch-normalize, then ReLU.
        tensor = ZeroPadding2D(padding=(1, 1))(tensor)
        tensor = Conv2D(filters, kernel_size=kernel, strides=stride, use_bias=False)(tensor)
        if normalize:
            tensor = BatchNormalization()(tensor)
        return ReLU()(tensor)

    # 1. CA Augmentation Network
    embedding_in = Input(shape=(1024,))
    lr_images_in = Input(shape=(64, 64, 3))

    projected = Dense(256)(embedding_in)
    mean_logsigma = LeakyReLU(alpha=0.2)(projected)
    condition = Lambda(generate_c)(mean_logsigma)

    # 2. Image Encoder
    encoded = _padded_conv(lr_images_in, 128, (3, 3), 1, normalize=False)
    encoded = _padded_conv(encoded, 256, (4, 4), 2)
    encoded = _padded_conv(encoded, 512, (4, 4), 2)

    # 3. Joint
    joined = Lambda(joint_block)([condition, encoded])
    features = _padded_conv(joined, 512, (3, 3), 1)

    # 4. Residual blocks
    for _ in range(4):
        features = residual_block(features)

    # 5. Upsampling blocks
    for width in (512, 256, 128, 64):
        features = UpSamplingBlock(features, width)

    image = Conv2D(3, kernel_size=3, padding="same", strides=1, use_bias=False)(features)
    image = Activation('tanh')(image)

    return Model(inputs=[embedding_in, lr_images_in], outputs=[image, mean_logsigma])

In [0]:
def build_stage2_discriminator():
    """Build the Stage-II discriminator.

    Inputs:
        * an image of shape (256, 256, 3)
        * a compressed, spatially replicated embedding of shape (4, 4, 128)

    The image is downsampled by six stride-2 convolutions and refined by 1x1
    convolutions and a residual branch. The result is concatenated with the
    embedding and reduced to a single sigmoid output.

    Returns:
        A Keras ``Model`` mapping ``[image, embedding]`` to a (1,) sigmoid score.
    """
    image_in = Input(shape=(256, 256, 3))

    features = Conv2D(64, kernel_size=(4,4), padding='same', strides=2, use_bias=False,
                      kernel_initializer='he_uniform')(image_in)
    features = LeakyReLU(alpha=0.2)(features)

    # Stride-2 downsampling stack.
    for width in (128, 256, 512, 1024, 2048):
        features = ConvBlock(features, width)

    # 1x1 convolutions bring the channel count back to 512.
    features = ConvBlock(features, 1024, (1,1), 1)
    features = ConvBlock(features, 512, (1,1), 1, False)

    # Residual branch; its output is added back to `features`.
    branch = ConvBlock(features, 128, (1,1), 1)
    branch = ConvBlock(branch, 128, (3,3), 1)
    branch = ConvBlock(branch, 512, (3,3), 1, False)

    merged = add([features, branch])
    merged = LeakyReLU(alpha=0.2)(merged)

    # Concatenate compressed and spatially replicated embedding
    embedding_in = Input(shape=(4, 4, 128))
    joint = concatenate([merged, embedding_in])

    head = Conv2D(512, kernel_size=(1,1), strides=1, padding='same', kernel_initializer='he_uniform')(joint)
    head = BatchNormalization(gamma_initializer='ones', beta_initializer='zeros')(head)
    head = LeakyReLU(alpha=0.2)(head)

    # Flatten and add a FC layer
    head = Flatten()(head)
    head = Dense(1)(head)
    head = Activation('sigmoid')(head)

    return Model(inputs=[image_in, embedding_in], outputs=[head])

In [0]:
def build_stage2_adversarial_model(gen_model2, dis_model, gen_model1):
    """Chain Stage-I generator, Stage-II generator and discriminator into one model.

    Side effects: sets ``trainable = False`` on both ``gen_model1`` and
    ``dis_model``.

    Args:
        gen_model2: Model called on ``[embedding, low_res_image]``.
        dis_model: Model called on ``[high_res_image, compressed_embedding]``.
        gen_model1: Model called on ``[embedding, noise]`` to produce the
            low-resolution image.

    Returns:
        A Keras ``Model`` with inputs of shapes (1024,), (100,) and
        (4, 4, 128), and outputs ``[discriminator_score, second_output_of_gen_model2]``.
    """
    embedding_in = Input(shape=(1024, ))
    noise_in = Input(shape=(100, ))
    compressed_in = Input(shape=(4, 4, 128))

    # Freeze both the Stage-I generator and the discriminator.
    gen_model1.trainable = False
    dis_model.trainable = False

    low_res, _unused_mean_logsigma = gen_model1([embedding_in, noise_in])
    high_res, mean_logsigma = gen_model2([embedding_in, low_res])
    score = dis_model([high_res, compressed_in])

    return Model(
        inputs=[embedding_in, noise_in, compressed_in],
        outputs=[score, mean_logsigma],
    )

# Train

In [0]:
# Width of the compressed embedding: the training loops reshape the compressor
# output to (-1, 1, 1, condition_dim) before tiling it.
condition_dim = 128
# Length of the Gaussian noise vector sampled per example in the training loops.
z_dim = 100

In [0]:
def KL_loss(y_true, y_pred):
    """KL divergence between N(mean, sigma^2) and N(0, 1), averaged over all elements.

    Args:
        y_true: Ignored; present only to satisfy the Keras loss signature.
        y_pred: 2-D tensor whose columns [:128] hold the mean and whose
            columns [128:] hold log(sigma). This is the same layout that
            ``generate_c`` splits.

    Returns:
        Scalar tensor: mean of ``-log(sigma) + 0.5 * (sigma^2 + mean^2 - 1)``.
    """
    mean = y_pred[:, :128]
    # Second half of the vector. The original sliced [:, :128] again, which
    # made log-sigma an alias of the mean.
    logsigma = y_pred[:, 128:]
    loss = -logsigma + .5 * (-1 + K.exp(2. * logsigma) + K.square(mean))
    return K.mean(loss)

In [0]:
def custom_generator_loss(y_true, y_pred):
    """Return the element-wise binary cross-entropy between targets and predictions.

    Args:
        y_true: Target tensor.
        y_pred: Prediction tensor.

    Returns:
        The tensor produced by ``K.binary_crossentropy(y_true, y_pred)``.
    """
    bce = K.binary_crossentropy(y_true, y_pred)
    return bce

### Stage 1

In [0]:
# Stage-I training configuration.
# NOTE(review): image_size is not referenced elsewhere in the visible notebook;
# presumably the Stage-I image side length — TODO confirm.
image_size = 64
batch_size_stage1 = 64  # examples per training step in train_stage1
stage1_generator_lr = 0.0002  # Adam learning rate for the generator optimizer
stage1_discriminator_lr = 0.0002  # Adam learning rate for the discriminator optimizer
# NOTE(review): not read by the visible train_stage1; no learning-rate decay is applied there.
stage1_lr_decay_step = 600
epochs_stage1 = 1000  # number of passes over the training data in train_stage1

In [0]:
def train_stage1(X_train, y_train, embeddings_train, X_test, y_test, embeddings_test):
    """Train the Stage-I generator/discriminator pair and save their weights.

    Each step trains the discriminator on three batches (real images with
    matching embeddings, generated images, real images with shifted
    embeddings) and then trains the generator through the adversarial model.
    Weights are written to ``stage1_gen.h5`` and ``stage1_dis.h5`` after the
    last epoch.

    Args:
        X_train: Training images, sliced along axis 0 and rescaled with
            ``(x - 127.5) / 127.5`` (presumably pixel values in [0, 255] —
            TODO confirm).
        y_train: Unused.
        embeddings_train: Embeddings aligned with ``X_train`` along axis 0.
        X_test: Unused.
        y_test: Unused.
        embeddings_test: Unused.

    Returns:
        None.
    """
    # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
    dis_optimizer = Adam(learning_rate=stage1_discriminator_lr, beta_1=0.5, beta_2=0.999)
    gen_optimizer = Adam(learning_rate=stage1_generator_lr, beta_1=0.5, beta_2=0.999)

    stage1_dis = build_stage1_discriminator()
    stage1_dis.compile(loss='binary_crossentropy', optimizer=dis_optimizer)

    stage1_gen = build_stage1_generator()
    stage1_gen.compile(loss="mse", optimizer=gen_optimizer)

    # NOTE(review): this compressor is only ever used for prediction, so its
    # randomly initialised weights are never updated. Confirm that a fixed
    # random projection of the embedding is intended.
    embedding_compressor_model = build_embedding_compressor_model()
    embedding_compressor_model.compile(loss="binary_crossentropy", optimizer="adam")

    adversarial_model = build_stage1_adversarial_model(gen_model=stage1_gen, dis_model=stage1_dis)
    adversarial_model.compile(loss=['binary_crossentropy', KL_loss], loss_weights=[1, 2.0], optimizer=gen_optimizer, metrics=None)

    # One-sided label smoothing: real targets are 0.9, fake targets stay at 0.
    real_labels = np.ones((batch_size_stage1, 1), dtype=float) * 0.9
    fake_labels = np.zeros((batch_size_stage1, 1), dtype=float)
    # Placeholder target for the KL output; KL_loss ignores y_true. Built once
    # as a NumPy array instead of allocating backend tensors on every step.
    kl_placeholder = np.ones((batch_size_stage1, 256), dtype=float) * 0.9

    # Only full batches are used; a trailing partial batch is dropped.
    number_of_batches = int(X_train.shape[0] / batch_size_stage1)

    for epoch in range(epochs_stage1):

        gen_losses = []
        dis_losses = []
        for index in range(number_of_batches):
            start = index * batch_size_stage1
            stop = start + batch_size_stage1

            z_noise = np.random.normal(0, 1, size=(batch_size_stage1, z_dim))
            image_batch = X_train[start:stop]
            embedding_batch = embeddings_train[start:stop]
            image_batch = (image_batch - 127.5) / 127.5

            fake_images, _ = stage1_gen.predict([embedding_batch, z_noise], verbose=0)

            # Replicate the compressed embedding over a 4x4 grid for the discriminator.
            compressed_embedding = embedding_compressor_model.predict_on_batch(embedding_batch)
            compressed_embedding = np.reshape(compressed_embedding, (-1, 1, 1, condition_dim))
            compressed_embedding = np.tile(compressed_embedding, (1, 4, 4, 1))

            dis_loss_real = stage1_dis.train_on_batch([image_batch, compressed_embedding], real_labels)
            dis_loss_fake = stage1_dis.train_on_batch([fake_images, compressed_embedding], fake_labels)
            # Mismatched pairs: each image is paired with the next example's embedding.
            dis_loss_wrong = stage1_dis.train_on_batch([image_batch[:(batch_size_stage1 - 1)], compressed_embedding[1:]], fake_labels[1:])

            d_loss = 0.5 * np.add(dis_loss_real, 0.5 * np.add(dis_loss_wrong, dis_loss_fake))

            g_loss = adversarial_model.train_on_batch([embedding_batch, z_noise, compressed_embedding], [real_labels, kl_placeholder])

            dis_losses.append(d_loss)
            gen_losses.append(g_loss)

        # Report the epoch averages so the collected losses are actually visible.
        if dis_losses:
            print("Epoch {}/{} - d_loss: {} - g_loss: {}".format(
                epoch + 1, epochs_stage1, np.mean(dis_losses), np.mean(gen_losses, axis=0)))

    stage1_gen.save_weights("stage1_gen.h5")
    stage1_dis.save_weights("stage1_dis.h5")

### Stage 2

In [0]:
# Stage-II training configuration.
# NOTE(review): hr_image_size and lr_image_size are not referenced elsewhere in
# the visible notebook; presumably the high-/low-resolution image sizes — TODO confirm.
hr_image_size = (256, 256)
lr_image_size = (64, 64)
batch_size_stage2 = 4  # examples per training step in train_stage2
stage2_generator_lr = 0.0002  # Adam learning rate for the generator optimizer
stage2_discriminator_lr = 0.0002  # Adam learning rate for the discriminator optimizer
# NOTE(review): not read by the visible train_stage2; no learning-rate decay is applied there.
stage2_lr_decay_step = 600
epochs_stage2 = 2  # number of passes over the training data in train_stage2

In [0]:
def train_stage2(X_hr_train, y_hr_train, embeddings_train, X_hr_test, y_hr_test, embeddings_test, X_lr_train, y_lr_train, X_lr_test, y_lr_test):
    """Train the Stage-II generator/discriminator pair and save their weights.

    The Stage-I generator is rebuilt and its weights are loaded from
    ``stage1_gen.h5``; it supplies the low-resolution images that the
    Stage-II generator refines. Each step trains the discriminator on three
    batches (real images with matching embeddings, generated images, real
    images with shifted embeddings) and then trains the Stage-II generator
    through the adversarial model. Weights are written to ``stage2_gen.h5``
    and ``stage2_dis.h5`` after the last epoch.

    Args:
        X_hr_train: High-resolution training images, sliced along axis 0 and
            rescaled with ``(x - 127.5) / 127.5`` (presumably pixel values in
            [0, 255] — TODO confirm).
        embeddings_train: Embeddings aligned with ``X_hr_train`` along axis 0.
        y_hr_train, X_hr_test, y_hr_test, embeddings_test, X_lr_train,
        y_lr_train, X_lr_test, y_lr_test: Unused.

    Returns:
        None.
    """
    # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
    dis_optimizer = Adam(learning_rate=stage2_discriminator_lr, beta_1=0.5, beta_2=0.999)
    gen_optimizer = Adam(learning_rate=stage2_generator_lr, beta_1=0.5, beta_2=0.999)

    stage2_dis = build_stage2_discriminator()
    stage2_dis.compile(loss='binary_crossentropy', optimizer=dis_optimizer)

    stage1_gen = build_stage1_generator()
    stage1_gen.compile(loss="binary_crossentropy", optimizer=gen_optimizer)

    stage1_gen.load_weights("stage1_gen.h5")

    stage2_gen = build_stage2_generator()
    stage2_gen.compile(loss="binary_crossentropy", optimizer=gen_optimizer)

    # NOTE(review): this compressor is only ever used for prediction, so its
    # randomly initialised weights are never updated. Confirm that a fixed
    # random projection of the embedding is intended.
    embedding_compressor_model = build_embedding_compressor_model()
    embedding_compressor_model.compile(loss='binary_crossentropy', optimizer='adam')

    adversarial_model = build_stage2_adversarial_model(stage2_gen, stage2_dis, stage1_gen)
    adversarial_model.compile(loss=['binary_crossentropy', KL_loss], loss_weights=[1.0, 2.0], optimizer=gen_optimizer, metrics=None)

    # One-sided label smoothing: real targets are 0.9, fake targets stay at 0.
    real_labels = np.ones((batch_size_stage2, 1), dtype=float) * 0.9
    fake_labels = np.zeros((batch_size_stage2, 1), dtype=float)
    # Placeholder target for the KL output; KL_loss ignores y_true. Built once
    # as a NumPy array instead of allocating backend tensors on every step.
    kl_placeholder = np.ones((batch_size_stage2, 256), dtype=float) * 0.9

    # Only full batches are used; a trailing partial batch is dropped.
    number_of_batches = int(X_hr_train.shape[0] / batch_size_stage2)

    for epoch in range(epochs_stage2):

        gen_losses = []
        dis_losses = []

        for index in range(number_of_batches):
            start = index * batch_size_stage2
            stop = start + batch_size_stage2

            z_noise = np.random.normal(0, 1, size=(batch_size_stage2, z_dim))
            X_hr_train_batch = X_hr_train[start:stop]
            embedding_batch = embeddings_train[start:stop]
            X_hr_train_batch = (X_hr_train_batch - 127.5) / 127.5

            # Stage-I produces the low-resolution images that Stage-II refines.
            lr_fake_images, _ = stage1_gen.predict([embedding_batch, z_noise], verbose=0)
            hr_fake_images, _ = stage2_gen.predict([embedding_batch, lr_fake_images], verbose=0)

            # Replicate the compressed embedding over a 4x4 grid for the discriminator.
            compressed_embedding = embedding_compressor_model.predict_on_batch(embedding_batch)
            compressed_embedding = np.reshape(compressed_embedding, (-1, 1, 1, condition_dim))
            compressed_embedding = np.tile(compressed_embedding, (1, 4, 4, 1))

            dis_loss_real = stage2_dis.train_on_batch([X_hr_train_batch, compressed_embedding], real_labels)
            dis_loss_fake = stage2_dis.train_on_batch([hr_fake_images, compressed_embedding], fake_labels)
            # Mismatched pairs: each image is paired with the next example's embedding.
            dis_loss_wrong = stage2_dis.train_on_batch([X_hr_train_batch[:(batch_size_stage2 - 1)], compressed_embedding[1:]], fake_labels[1:])
            d_loss = 0.5 * np.add(dis_loss_real, 0.5 * np.add(dis_loss_wrong, dis_loss_fake))

            g_loss = adversarial_model.train_on_batch([embedding_batch, z_noise, compressed_embedding], [real_labels, kl_placeholder])

            dis_losses.append(d_loss)
            gen_losses.append(g_loss)

        # Report the epoch averages so the collected losses are actually visible.
        if dis_losses:
            print("Epoch {}/{} - d_loss: {} - g_loss: {}".format(
                epoch + 1, epochs_stage2, np.mean(dis_losses), np.mean(gen_losses, axis=0)))

    stage2_gen.save_weights("stage2_gen.h5")
    stage2_dis.save_weights("stage2_dis.h5")