In [1]:
import os

import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn import preprocessing

from models import CVAEGAN
from utils import generate_ring, generate_linear, gaussian_mixture

In [2]:
def compute_loss(model, x_cond):
    """Compute the six scalar loss terms of the CVAE-GAN for one batch.

    Args:
        model: object exposing ``sampling(x_cond)``, ``discriminate(x)`` and
            ``classify(x)`` as they are called below.
        x_cond: pair ``(x, cond)``; ``x_cond[0]`` is the batch of real samples
            and ``x_cond[1]`` the class targets used for the classifier loss.

    Returns:
        Tuple ``(kl_loss, d_loss, gd_loss, c_loss, gc_loss, g_loss)`` of scalar
        tensors (every term ends in an un-axed ``tf.reduce_mean``).
    """
    x_real, labels = x_cond[0], x_cond[1]
    z_mean, z_logvar, x_f = model.sampling(x_cond)

    # KL loss: 0.5 * sum(mu^2 + s^2 - log(s^2) - 1) per sample, averaged over the batch.
    # NOTE(review): `z_logvar` is squared and used as the variance here, i.e. it is
    # treated as a standard deviation despite its name -- confirm against model.sampling.
    z_var = tf.square(z_logvar)
    kl_per_sample = 0.5 * tf.reduce_sum(
        tf.square(z_mean) + z_var - tf.math.log(1e-8 + z_var) - 1, 1)
    kl_loss = tf.reduce_mean(kl_per_sample)

    # D loss: real samples are labelled 1, generated samples 0.
    # NOTE(review): from_logits=True interprets the first output of model.discriminate
    # as raw logits although it is named `prob_*` -- confirm no sigmoid is applied there.
    prob_real, f_D_x_r = model.discriminate(x_real)
    prob_fake, f_D_x_f = model.discriminate(x_f)
    real_mask = tf.ones_like(prob_real)
    fake_mask = tf.zeros_like(prob_fake)
    r_loss = tf.keras.losses.binary_crossentropy(real_mask, prob_real, from_logits=True)
    f_loss = tf.keras.losses.binary_crossentropy(fake_mask, prob_fake, from_logits=True)
    d_loss = tf.reduce_mean(r_loss + f_loss)

    # GD loss: match the batch-mean discriminator features of real and generated data.
    avg_f_D_x_r = tf.reduce_mean(f_D_x_r, axis=0)
    avg_f_D_x_f = tf.reduce_mean(f_D_x_f, axis=0)
    gd_loss = 0.5 * tf.reduce_mean(tf.square(avg_f_D_x_r - avg_f_D_x_f))

    # C loss: cross-entropy of the classifier output on real samples.
    # Keras expects (y_true, y_pred): the targets go first, the prediction second.
    c_r, f_C_x_r = model.classify(x_real)
    _, f_C_x_f = model.classify(x_f)
    c_loss = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(labels, c_r))

    # GC loss: match the batch-mean classifier features of real and generated data.
    avg_f_C_x_r = tf.reduce_mean(f_C_x_r, axis=0)
    avg_f_C_x_f = tf.reduce_mean(f_C_x_f, axis=0)
    gc_loss = 0.5 * tf.reduce_mean(tf.square(avg_f_C_x_r - avg_f_C_x_f))

    # G loss: element-wise reconstruction error in data space plus pairwise
    # feature matching in discriminator and classifier feature space.
    g_loss_x = tf.reduce_mean(tf.square(x_real - x_f))
    g_loss_f_d = tf.reduce_mean(tf.square(f_D_x_r - f_D_x_f))
    g_loss_f_c = tf.reduce_mean(tf.square(f_C_x_r - f_C_x_f))
    g_loss = g_loss_x + g_loss_f_d + g_loss_f_c

    return kl_loss, d_loss, gd_loss, c_loss, gc_loss, g_loss

# @tf.function
def train_step(model, x_cond, optimizer):
    """Run one optimisation step on all four sub-networks of the model.

    Args:
        model: object exposing ``encoder``, ``discriminator``, ``generator`` and
            ``classifier`` attributes, each with ``trainable_variables``, and
            accepted by ``compute_loss``.
        x_cond: one ``(x, cond)`` batch, forwarded unchanged to ``compute_loss``.
        optimizer: optimizer whose ``apply_gradients`` is called once with the
            gradients of every sub-network.

    Returns:
        None. The model variables are updated in place.
    """
    # One persistent tape records the forward pass once and can then be
    # differentiated several times, instead of recording it in four tapes.
    with tf.GradientTape(persistent=True) as tape:
        kl_loss, d_loss, gd_loss, c_loss, gc_loss, g_loss = compute_loss(model, x_cond)
        enc_loss = kl_loss + g_loss
        gen_loss = g_loss + gd_loss + gc_loss

    # Each sub-network is trained on its own objective.
    objectives = (
        (enc_loss, model.encoder.trainable_variables),
        (d_loss, model.discriminator.trainable_variables),
        (gen_loss, model.generator.trainable_variables),
        (c_loss, model.classifier.trainable_variables),
    )

    # All gradients are taken before any variable is updated, so every update
    # is based on the same pre-step weights.
    grads_and_vars = []
    for loss, variables in objectives:
        grads_and_vars.extend(zip(tape.gradient(loss, variables), variables))
    del tape  # a persistent tape holds its resources until it is released

    # A single call keeps the optimizer's step counter at one increment per
    # training step and avoids the "optimizer cannot recognize variable" error
    # that newer Keras optimizers raise when one optimizer is applied to
    # different variable sets in separate calls.
    optimizer.apply_gradients(grads_and_vars)

In [3]:
def show_results_with_dataset(model, epoch, x_cond):
    """Scatter real vs. generated samples per class and save the plot as a PNG.

    Args:
        model: object exposing ``sampling(x_cond)``; its third return value is
            plotted as the generated samples.
        epoch: integer used in the plot caption and in the output file name.
        x_cond: pair ``(x, cond)``; the first two columns of ``x`` are plotted,
            and column 0 of ``cond`` decides the class (``1`` -> "A", ``0`` -> "B").

    Side effects:
        Draws on matplotlib figure number 2, writes
        ``./results/cvaegan/<epoch:04d>.png`` (creating the directory if needed)
        and clears the current axes afterwards.
    """
    _, _, x_f = model.sampling(x_cond)

    # Row indices of the two classes; [-1] flattens the (k, 1) result of tf.where.
    a_idx = tf.reshape(tf.where(x_cond[1][:,0] == 1.), [-1])
    b_idx = tf.reshape(tf.where(x_cond[1][:,0] == 0.), [-1])

    a_x = tf.experimental.numpy.take(x_cond[0], a_idx, axis=0)
    b_x = tf.experimental.numpy.take(x_cond[0], b_idx, axis=0)
    a_x_f = tf.experimental.numpy.take(x_f, a_idx, axis=0)
    b_x_f = tf.experimental.numpy.take(x_f, b_idx, axis=0)

    plt.figure(2)
    plt.scatter(a_x[:,0], a_x[:,1], alpha=.5, color='indianred', label='real_A')
    plt.scatter(b_x[:,0], b_x[:,1], alpha=.5, color='cornflowerblue', label='real_B')
    plt.scatter(a_x_f[:,0], a_x_f[:,1], alpha=.5, color='maroon', label='fake_A')
    plt.scatter(b_x_f[:,0], b_x_f[:,1], alpha=.5, color='navy', label='fake_B')
    plt.xlim(xmin=-0.2, xmax=1.2)
    plt.ylim(ymin=-0.2, ymax=1.2)
    plt.legend(loc='upper left')
    plt.text(0.9, 1.1, 'Epoch(s):'+str(epoch))

    # savefig does not create missing folders; make sure the target exists so a
    # fresh checkout does not fail on the first epoch.
    os.makedirs('./results/cvaegan', exist_ok=True)
    plt.savefig('./results/cvaegan/{:04d}.png'.format(epoch))
    # plt.show()
    plt.cla()  # the same figure is reused by the next call, so wipe its axes

In [4]:
# Dataset sizes. The train/test split below slices by position, so the two
# parts have to add up to the number of generated samples.
datasize = 5000
trainsize, testsize = 4800, 200
assert trainsize + testsize == datasize, "trainsize + testsize must equal datasize"

# Columns 0-1 are used as the sample coordinates, column 2 (cast to int) as the
# class label.
dataset = gaussian_mixture(datasize, condition=True)
x, cond = dataset[:,:2], dataset[:,2].astype(int)

# Scale x to [0, 1] per feature (MinMaxScaler default range).
scaler = preprocessing.MinMaxScaler()
scaled_x = scaler.fit_transform(x)

# One-Hot encoding for cond: row `k` of the identity matrix encodes label `k`,
# which assumes the labels are the integers 0..n_classes-1.
n_classes = np.unique(cond, axis=0).shape[0]
cond = np.eye(n_classes, dtype='float32')[cond]

# Positional split on the scaled data. To train on the unscaled data instead,
# slice `x` rather than `scaled_x` here.
train_x, train_cond = scaled_x[:trainsize], cond[:trainsize]
test_x, test_cond = scaled_x[trainsize:], cond[trainsize:]

batch_size = 32
train_dataset = (tf.data.Dataset.from_tensor_slices((train_x, train_cond)).shuffle(trainsize).batch(batch_size))
# The whole test split is delivered as one batch, so its batch size follows
# `testsize` instead of repeating the literal.
test_dataset = (tf.data.Dataset.from_tensor_slices((test_x, test_cond)).shuffle(testsize).batch(testsize))

In [5]:
# Model and optimizer.
z_dim = 10
model = CVAEGAN(z_dim=z_dim)
opt = tf.keras.optimizers.Adam(1e-5)

# Fixed evaluation sample: the first batch of the test dataset, kept as the
# (x, cond) tuple the dataset yields. The tuple is used as-is; slicing it
# would select tuple members, not samples.
test_x_cond_batch = next(iter(test_dataset.take(1)))
test_x_cond_sample = test_x_cond_batch

In [6]:
# Training schedule: epochs are numbered from `first_epoch` up to, but not
# including, `stop_epoch`.
first_epoch, stop_epoch = 1, 200

for epoch in range(first_epoch, stop_epoch):
    # One full pass over the shuffled training batches.
    for train_data in train_dataset:
        train_step(model, train_data, opt)

    # After each epoch, plot the fixed test sample against the model's output
    # for it; the figure is saved under the epoch number.
    # (For numeric monitoring, compute_loss(model, test_x_cond_sample) returns
    # the individual loss terms.)
    show_results_with_dataset(model, epoch, test_x_cond_sample)

No handles with labels found to put in legend.


KeyboardInterrupt: 