In [1]:
import keras.optimizers
import numpy as np, os, pathlib, matplotlib.pyplot as plt, sys, seaborn as sns
from keras.layers import Input, Dense, BatchNormalization, Dropout, Flatten, Reshape, Lambda, Concatenate
from keras.models import Model
from keras.layers import concatenate
from keras.layers import Rescaling, Reshape, Resizing, RandomZoom, RandomRotation, RandomFlip

import tensorflow as tf
from keras.metrics.metrics import binary_crossentropy
from keras.layers import LeakyReLU, Conv2D, MaxPool2D, UpSampling2D, RepeatVector, MaxPooling2D
from keras import backend as bk

In [2]:
# PARAMETERS
# Model / optimisation settings.
latent_dim = 512   # dimensionality of the latent vector z
dropout_r = 0.1    # dropout rate
lr_0 = 0.0001      # base learning rate
epoch = 10         # number of training epochs
batch_size = 240   # starting value; re-derived from the image count in a later cell

# Images are square: height, width and the shorthand `ims` share one value.
img_height = img_width = ims = 224

# Short alias for the Adam optimizer class.
Adam = keras.optimizers.Adam

In [3]:
# Run identifier assembled from the main settings (used for the TensorBoard log directory).
name = f'pets_cvae_dim{latent_dim}_epochs{epoch}_ims{ims}'

# Dataset root; the images are counted one directory level down (<sub-directory>/<file>.jpg).
data_dir = pathlib.Path('D:/DataSets/dogs_cats')
image_count = sum(1 for _ in data_dir.glob('*/*.jpg'))
print('Number of images:', image_count)

Number of images: 4800


In [4]:
# Re-derive the batch size from the dataset: 5% of the images when the configured
# batch is smaller than the dataset, otherwise half of the dataset.
batch_size = int(image_count * 0.05) if batch_size < image_count else image_count // 2

print('BATCH SIZE:', batch_size)
image_generator = tf.keras.preprocessing.image.ImageDataGenerator(validation_split=0.2, rescale=1.0 / 255)

BATCH SIZE: 240


In [5]:
# Training and validation subsets come from the same seeded 80/20 split of the directory.
train_ds = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=42,
    batch_size=batch_size,
    image_size=(ims, ims),
    labels='inferred',
    label_mode='categorical',
    color_mode="rgb"
)

valid_ds = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=42,
    batch_size=batch_size,
    image_size=(ims, ims),
    labels='inferred',
    label_mode='categorical',
    color_mode="rgb"
)

ishape = (ims, ims, 3)
class_names = train_ds.class_names  # os.listdir(os.path.join(data_dir))
num_classes = len(class_names)
print('num_classes:', num_classes, '-', class_names)


def dataset_to_arrays(ds):
    """Materialise a batched (images, labels) dataset as two row-aligned arrays.

    The dataset is iterated exactly once and both arrays are filled from the
    same batches, so row k of the labels always belongs to row k of the images.

    Args:
        ds: iterable yielding (image_batch, label_batch) pairs.

    Returns:
        (images, labels): the batches concatenated along axis 0.
    """
    img_batches, lbl_batches = [], []
    for img_batch, lbl_batch in ds:
        img_batches.append(np.asarray(img_batch))
        lbl_batches.append(np.asarray(lbl_batch))
    return np.concatenate(img_batches), np.concatenate(lbl_batches)


# Kept for any later cell that still calls them. WARNING: each call is a separate
# pass over the dataset, and the datasets above are shuffled (shuffle=True is the
# default), so combining full_imgs(ds) with full_lbls(ds) does not guarantee that
# images and labels line up. Use dataset_to_arrays() instead.
full_imgs = lambda ds: np.concatenate([x for x, y in ds])
full_lbls = lambda ds: np.concatenate([y for x, y in ds])

# Single pass per dataset keeps images and labels paired; pixels are scaled to [0, 1].
train_img, train_lbl = dataset_to_arrays(train_ds)
train_img = train_img.astype('float32') / 255.

valid_img, valid_lbl = dataset_to_arrays(valid_ds)
valid_img = valid_img.astype('float32') / 255.
print('\n\nshape train_img, train_lbl:', np.shape(train_img), np.shape(train_lbl), '\n')

Found 4800 files belonging to 2 classes.
Using 3840 files for training.
Found 4800 files belonging to 2 classes.
Using 960 files for validation.
num_classes: 2 - ['cats', 'dogs']


shape train_img, train_lbl: (3840, 224, 224, 3) (3840, 2) 



In [6]:
def add_units_to_conv2d(conv2, units):
    """Attach a per-sample vector to every spatial position of a feature map.

    `units` is repeated once per spatial cell of `conv2`, reshaped to
    (height, width, k) with k = units.shape[1], and concatenated to `conv2`
    along the default (last) axis.

    Args:
        conv2: Keras tensor whose axes 1 and 2 are the (static) spatial sizes.
        units: Keras tensor whose axis 1 is the vector length.

    Returns:
        The concatenation of `conv2` and the tiled `units`.
    """
    height, width = int(conv2.shape[1]), int(conv2.shape[2])
    n_units = int(units.shape[1])
    # (batch, k) -> (batch, height * width, k) -> (batch, height, width, k)
    tiled = RepeatVector(height * width)(units)
    tiled = Reshape((height, width, n_units))(tiled)
    return concatenate([conv2, tiled])

In [7]:
def create_cvae():
    """Build the conditional VAE together with its sub-models and a loss function.

    Reads the module-level settings ``ishape``, ``num_classes``, ``latent_dim``
    and ``ims``, and prints the summaries of the encoder and the decoder.

    Returns:
        tuple: ``(models, vae_loss)``.

        ``models`` is a dict with the keys
          * ``"encoder"``  -- [image, label] -> sampled z concatenated with the label
          * ``"z_meaner"`` -- [image, label] -> z_mean
          * ``"decoder"``  -- vector of size latent_dim + num_classes -> image
          * ``"cvae"``     -- [image, label] -> reconstructed image
          * ``"style_t"``  -- [image, source label, target label] -> image decoded
            from z_mean (encoded with the source label) and the target label

        ``vae_loss(xf, decodedf)`` adds the pixel-wise binary cross-entropy
        (scaled by the pixel count) to the KL term and rescales the sum.
    """
    models = {}

    # Encoder
    inp_img = Input(shape=ishape)
    inp_lbls = Input(shape=(num_classes,), dtype='float32')

    x = Conv2D(64, (7, 7), activation='relu', padding='same')(inp_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    # Inject the class label as extra channels after the first pooling stage.
    x = add_units_to_conv2d(x, inp_lbls)
    x = Conv2D(32, (5, 5), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)

    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(128, kernel_size=(3, 3), activation='relu', padding='same')(x)
    x = MaxPooling2D((2, 2), padding='same')(x)
    enc = Conv2D(3, (7, 7), activation='relu', padding='same')(x)
    x = Flatten()(enc)

    # Predict the log-variance rather than the standard deviation.
    z_mean = Dense(latent_dim)(x)
    z_log_var = Dense(latent_dim)(x)
    print('\n\nshape of z_mean [0], [1]:', z_mean.shape[0], z_mean.shape[1], '\n')

    # Sampling from Q with the reparametrisation trick.
    def sampling(args):
        z_means, z_log_vars = args
        # Take the batch size from the incoming tensor instead of the global
        # `batch_size`, so partial batches and predict() with another batch
        # size work as well.
        n_rows = bk.shape(z_means)[0]
        epsilon = bk.random_normal(shape=(n_rows, latent_dim), mean=0., stddev=1.0)
        return z_means + bk.exp(z_log_vars / 2) * epsilon

    l = Lambda(sampling, output_shape=(latent_dim,))([z_mean, z_log_var])
    l_z = concatenate([l, inp_lbls])

    encoder = Model([inp_img, inp_lbls], l_z, name='my_encoder')
    encoder.summary()
    z_meaner = Model([inp_img, inp_lbls], z_mean, name='Enc_z_mean')
    models["encoder"] = encoder
    models["z_meaner"] = z_meaner

    # Decoder
    z = Input(shape=(latent_dim + num_classes,))

    # A 14x14 seed followed by four 2x up-samplings gives 14 * 2**4 = 224 pixels
    # per side, i.e. this decoder only matches the input when ims == 224.
    x = Dense(7 * 2 * 7 * 2 * 8, activation='relu', name='decoder_dense_1')(z)
    x = Reshape((7 * 2, 7 * 2, 8))(x)
    x = Conv2D(32, kernel_size=(7, 7), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(128, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(128, (5, 5), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    decoded = Conv2D(3, (7, 7), activation='sigmoid', padding='same')(x)

    decoder = Model(z, decoded, name='my_decoder')
    decoder.summary()
    models["decoder"] = decoder

    cvae_out = decoder(encoder([inp_img, inp_lbls]))

    my_cvae = Model([inp_img, inp_lbls], cvae_out, name='my_cvae')
    models['cvae'] = my_cvae

    # Style transfer: encode with the source label, decode with a separate
    # target label. The target label needs its own input; without it the image
    # could only be decoded with the label it was encoded with, and a caller
    # passing [image, source label, target label] would be rejected.
    inp_lbls_out = Input(shape=(num_classes,), dtype='float32')
    out_style = decoder(concatenate([z_meaner([inp_img, inp_lbls]), inp_lbls_out]))
    models["style_t"] = Model([inp_img, inp_lbls, inp_lbls_out], out_style, name="style_transfer")

    def vae_loss(xf, decodedf):
        # NOTE(review): this closes over the symbolic z_mean / z_log_var tensors;
        # whether Keras accepts that in compile(loss=...) depends on the Keras
        # version and execution mode -- confirm before switching to this loss.
        n_pixels = ims * ims * 3
        # -1 keeps the reshape valid for any batch size.
        xf = bk.reshape(xf, shape=(-1, n_pixels))
        decodedf = bk.reshape(decodedf, shape=(-1, n_pixels))
        xent_loss = n_pixels * binary_crossentropy(xf, decodedf)
        kl_loss = -0.5 * bk.sum(1 + z_log_var - bk.square(z_mean) - bk.exp(z_log_var), axis=-1)
        return (xent_loss + kl_loss) / 2 / ims / ims / 3

    return models, vae_loss


In [8]:
# Build every sub-model; the custom loss returned alongside is stored but not
# passed to compile() below.
cvae_models, cvae_losses = create_cvae()
cvae = cvae_models['cvae']

# Compiled with the built-in binary cross-entropy and the 'adam' optimizer string
# (the `lr_0` setting and the `Adam` alias are not used here).
cvae.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    experimental_run_tf_function=True,
)



shape of z_mean [0], [1]: None 512 

Model: "my_encoder"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 input_2 (InputLayer)           [(None, 2)]          0           []                               
                                                                                                  
 conv2d (Conv2D)                (None, 224, 224, 64  9472        ['input_1[0][0]']                
                                )                                                                 
                                                  

In [9]:
from IPython.display import clear_output
from keras.callbacks import LambdaCallback, ReduceLROnPlateau, TensorBoard

In [10]:
# One independent accumulator list per class, plus a flat epoch log.
latent_distrs = [[] for _ in range(num_classes)]
figs = [[] for _ in range(num_classes)]
epochs = []

# Epoch numbers to save (presumably snapshot points): int(k ** 1.701) for
# k in 0..58, together with every epoch from 0 to 9.
save_epochs = set((np.arange(0, 59) ** 1.701).astype(int)) | set(range(10))

# How many images are shown side by side in the reconstruction preview.
n_compare = 10

# Convenience handles on two of the sub-models.
encoder_mean = cvae_models["z_meaner"]
generator = cvae_models["decoder"]

In [None]:
# TensorBoard logging, one log directory per run name.
tb = TensorBoard(log_dir=f'logs/{name}')

# Run training: the model gets (image, one-hot label) as input and the same
# image as the reconstruction target.
cvae.fit(
    [train_img, train_lbl],
    train_img,
    validation_data=([valid_img, valid_lbl], valid_img),
    epochs=epoch,
    batch_size=batch_size,
    shuffle=True,
    callbacks=[tb],
    verbose=1,
)

Epoch 1/10


In [15]:
def plot_images(*args, invert_colors=False):
    """Show several image batches as one grid: a row per batch, a column per image.

    Every image is pasted into an (ims x ims x 3) tile, where `ims` is the
    module-level image size. Only as many columns are drawn as the smallest
    batch has images.

    Args:
        *args: image batches; batch[i] must yield an image that fits an
            (ims, ims, 3) tile.
        invert_colors: when True the assembled grid is shown as 1 - grid.
    """
    # Drop singleton axes, but never the leading batch axis: a blanket squeeze()
    # would turn a one-image batch (1, H, W, 3) into a single (H, W, 3) image,
    # and shape[0] would then be read as "H images".
    batches = []
    for batch in args:
        batch = np.asarray(batch)
        unit_axes = tuple(ax for ax in range(1, batch.ndim) if batch.shape[ax] == 1)
        if unit_axes:
            batch = np.squeeze(batch, axis=unit_axes)
        batches.append(batch)

    n_rows = len(batches)
    n_f = min([batch.shape[0] for batch in batches])
    figure = np.zeros((ims * n_rows, ims * n_f, 3))

    for row, batch in enumerate(batches):
        for col in range(n_f):
            figure[row * ims: (row + 1) * ims,
                   col * ims: (col + 1) * ims, :] = batch[col]

    if invert_colors:
        figure = 1 - figure

    plt.figure(figsize=(2 * n_f, 2 * n_rows))
    plt.imshow(figure)
    plt.grid(False)
    # Hide both axes: tick values carry no meaning for an image mosaic.
    ax = plt.gca()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    plt.show()

In [18]:
from scipy.stats import norm
# Standard-normal quantiles at n probabilities evenly spaced between 5% and 95%;
# both grid axes use the same values.
n = 5
grid_x = norm.ppf(np.linspace(0.05, 0.95, n))
grid_y = grid_x.copy()

In [None]:
def draw_z_distr(z_predicted, lbl):
    """Scatter-plot the first two latent coordinates in a fixed [-5, 5] window.

    Args:
        z_predicted: 2-D array; columns 0 and 1 are used as x and y.
        lbl: accepted for call compatibility only; it does not affect the plot.
            (The previous body built an unused 10-wide one-hot vector from it,
            which raised IndexError for lbl >= 10.)
    """
    im = plt.scatter(z_predicted[:, 0], z_predicted[:, 1])
    im.axes.set_xlim(-5, 5)
    im.axes.set_ylim(-5, 5)
    plt.show()


def style_transfer(model, X, lbl_in, lbl_out, n_classes=2):
    """Run a style-transfer model on a batch, expanding integer labels to one-hot.

    Args:
        model: object with predict(); it is called with the three-element list
            [X, lbl_in, lbl_out], so it must accept images, source labels and
            target labels as separate inputs.
        X: image batch; X.shape[0] is the number of rows.
        lbl_in: source label -- either a class index (Python or NumPy integer),
            broadcast to every row as a one-hot vector, or a ready-made label array.
        lbl_out: target label, same conventions as `lbl_in`.
        n_classes: width of the generated one-hot vectors (default 2).

    Returns:
        Whatever model.predict() returns.
    """
    rows = X.shape[0]

    def _one_hot(lbl):
        # NumPy integers (e.g. the result of argmax) are class indices too.
        if isinstance(lbl, (int, np.integer)):
            encoded = np.zeros((rows, n_classes))
            encoded[:, lbl] = 1
            return encoded
        return lbl

    return model.predict([X, _one_hot(lbl_in), _one_hot(lbl_out)])

In [None]:
# Source class index for the style-transfer demo, and the list collecting one
# result batch per target class.
lbl, generated = 1, []

In [None]:
# Prototypes: the first `n` training images whose class is `lbl`.
# The labels are one-hot, so the class index is the argmax of each row;
# comparing column 0 with the class index would pick the opposite class.
prot = train_img[train_lbl.argmax(axis=1) == lbl][:n]

# Start from an empty list so re-running this cell does not keep appending rows.
generated = []
for i in range(num_classes):
    print(i)
    # Same prototypes every time: encoded as class `lbl`, decoded as class `i`.
    generated.append(style_transfer(cvae_models["style_t"], prot, lbl, i))

# The row of the source class shows the untouched prototypes.
generated[lbl] = prot
plot_images(*generated, invert_colors=False)

In [None]:
# NOTE(review): `imgs` and `imgs_lbls` are not assigned in any cell visible
# here -- confirm they are defined before this cell runs on a fresh kernel.
for info in (type(imgs), imgs.shape):
    print(info)

# Reconstruct the images and show the first `n_compare` originals above their reconstructions.
decoded = cvae.predict([imgs, imgs_lbls], batch_size=batch_size)
plot_images(imgs[:n_compare], decoded[:n_compare])