In [1]:
from __future__ import print_function, division
from tensorflow import keras
import tensorflow as tf

from keras.layers import *
from keras.models import Model, Sequential
from keras.optimizers import Adam, RMSprop
import keras.backend as K

from math import log2
import numpy as np
from random import randint

import matplotlib.pyplot as plt

In [2]:
# Константы
img_side = 204

# Разбиваем датасет на тренировочную группу и группу валидации
def init_data_with_batch_size(batch_size, dataset="flowers"):
    """Чтобы использовать "new_flowers" (расширенный датасет) надо запустить increasing_data.py"""
    train_data = keras.preprocessing.image_dataset_from_directory(
        dataset,
        image_size=(img_side, img_side),
        label_mode="int",  # Тут не "categorical" т.к. мы испольлзауем Embedding
        shuffle=True,
        batch_size=batch_size,
    )

    # Добавляем лейблы (т.к. у нас Cgan) и нормализуем в [-1; 1], т.к. юзаем tanh
    # (не sigmoid, а tanh, т.к. с ним градиент не затухает)
    x = np.array([i[0]/127.5 -1 for i in train_data][:-1])
    y = np.array([i[1] for i in train_data][:-1])

    return x, y

In [3]:
class CGAN():
    def __init__(self):
        # Input shape
        self.img_shape = (img_side, img_side, 3)
        self.num_classes = 5
        self.latent_dim = 32

        optimizer = Adam(4e-4, 0.5)

        # Build and compile the discriminator
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(
            loss=['binary_crossentropy'],
            optimizer=optimizer,
            metrics=['accuracy'],
        )

        # Build the generator
        self.generator = self.build_generator()

        # The generator takes noise and the target label as input
        # and generates the corresponding digit of that label
        noise = Input(shape=(self.latent_dim,))
        label = Input(shape=(1,))
        img = self.generator([noise, label])

        # For the combined model we will only train the generator
        self.discriminator.trainable = False

        # The discriminator takes generated image as input and determines validity
        # and the label of that image
        valid = self.discriminator([img, label])

        # The combined model  (stacked generator and discriminator)
        # Trains generator to fool discriminator
        self.combined = Model([noise, label], valid)
        self.combined.compile(
            loss=['binary_crossentropy'],
            optimizer=optimizer,
        )

    def build_generator(self):
        model = Sequential()

        model.add(Dense(64, input_dim=self.latent_dim))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(128))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(256))
        model.add(LeakyReLU(alpha=0.2))
        model.add(BatchNormalization(momentum=0.8))

        model.add(Dense(np.prod(self.img_shape), activation='tanh'))
        model.add(Reshape(self.img_shape))

        model.summary()

        noise = Input(shape=(self.latent_dim,))
        label = Input(shape=(1,), dtype='int32')
        label_embedding = Flatten()(Embedding(1, self.latent_dim)(label))

        model_input = multiply([noise, label_embedding])
        img = model(model_input)

        return Model([noise, label], img)

    def build_discriminator(self):
        model = Sequential()

        model.add(Dense(512, input_dim=np.prod(self.img_shape)))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.2))
        model.add(Dense(512))
        model.add(LeakyReLU(alpha=0.2))
        model.add(Dropout(0.2))
        model.add(Dense(1, activation='sigmoid'))
        model.summary()

        img = Input(shape=self.img_shape)
        label = Input(shape=(1,), dtype='int32')

        label_embedding = Flatten()(Embedding(1, np.prod(self.img_shape))(label))
        flat_img = Flatten()(img)

        model_input = multiply([flat_img, label_embedding])

        validity = model(model_input)

        return Model([img, label], validity)

    def train(self, data, batch_size, epochs, sample_interval=50):
        for epoch in range(epochs):
            # ---------------------
            #  Train Discriminator
            # ---------------------

            # Select a random half batch of images
            idx = np.random.randint(0, img_side, batch_size)
            imgs, labels = data[0][idx], data[1][idx]

            # Sample noise as generator input
            noise = np.random.normal(0, 1, (batch_size, 100))

            # Generate a half batch of new images
            gen_imgs = self.generator.predict([noise, labels])

            # Train the discriminator
            d_loss_real = self.discriminator.train_on_batch([imgs, labels], valid)
            d_loss_fake = self.discriminator.train_on_batch([gen_imgs, labels], fake)
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

            # ---------------------
            #  Train Generator
            # ---------------------

            # Condition on labels
            sampled_labels = np.random.randint(0, 10, batch_size).reshape(-1, 1)

            # Train the generator
            g_loss = self.combined.train_on_batch([noise, sampled_labels], valid)

            # If at save interval => save generated image samples
            if epoch % sample_interval == 0:
                self.sample_images(epoch)

            # Plot the progress
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))


    def sample_images(self, epoch):
        r, c = 2, 5
        noise = np.random.normal(0, 1, (r * c, 100))
        sampled_labels = np.arange(0, 10).reshape(-1, 1)

        gen_imgs = self.generator.predict([noise, sampled_labels])

        # Rescale images 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i, j].imshow(gen_imgs[cnt,:,:,0], cmap='gray')
                axs[i, j].set_title("Digit: %d" % sampled_labels[cnt])
                axs[i, j].axis('off')
                cnt += 1
        fig.savefig("images/%d.png" % epoch)
        plt.close()

cgan = CGAN()

print()
print("Generator    :", f"{cgan.generator.count_params():,}")
print("Discriminator:", f"{cgan.discriminator.count_params():,}")
print("Sum:          ", f"{cgan.generator.count_params() + cgan.discriminator.count_params():,}")

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 512)               63922688  
                                                                 
 leaky_re_lu (LeakyReLU)     (None, 512)               0         
                                                                 
 dense_1 (Dense)             (None, 512)               262656    
                                                                 
 leaky_re_lu_1 (LeakyReLU)   (None, 512)               0         
                                                                 
 dropout (Dropout)           (None, 512)               0         
                                                                 
 dense_2 (Dense)             (None, 512)               262656    
                                                                 
 leaky_re_lu_2 (LeakyReLU)   (None, 512)               0

In [4]:
data = init_data_with_batch_size(1, "flowers")

cgan.train(data, 16, 20000, sample_interval=500)

Found 2799 files belonging to 5 classes.


ValueError: in user code:

    File "C:\ProgramData\miniconda3\envs\main\lib\site-packages\keras\engine\training.py", line 2041, in predict_function  *
        return step_function(self, iterator)
    File "C:\ProgramData\miniconda3\envs\main\lib\site-packages\keras\engine\training.py", line 2027, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "C:\ProgramData\miniconda3\envs\main\lib\site-packages\keras\engine\training.py", line 2015, in run_step  **
        outputs = model.predict_step(data)
    File "C:\ProgramData\miniconda3\envs\main\lib\site-packages\keras\engine\training.py", line 1983, in predict_step
        return self(x, training=False)
    File "C:\ProgramData\miniconda3\envs\main\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "C:\ProgramData\miniconda3\envs\main\lib\site-packages\keras\engine\input_spec.py", line 295, in assert_input_compatibility
        raise ValueError(

    ValueError: Input 0 of layer "model_1" is incompatible with the layer: expected shape=(None, 32), found shape=(None, 100)


In [None]:
def show_row_images(raw_data):
    # Ограничиваемся только 32 восстановленными изображениями (чтобы считать меньше)
    data = np.array([i[0][0] for count, i in enumerate(raw_data) if count < 32])
    labels = np.array([i[1][0] for count, i in enumerate(raw_data) if count < 32])
    _, _, encoded = encoder.predict([data, labels], verbose=False)
    generated_images = decoder.predict([encoded, labels], verbose=False)

    num_images = 4

    plt.figure(figsize=(20, 11))

    for _ in range(num_images):
        random_num = randint(0, 32-1)

        # Оригинальное изображение
        plt.subplot(2, num_images, _ + 1)
        plt.imshow(data[random_num])
        plt.gray()
        plt.title("Train")
        plt.axis("off")

        # Сгенерированное изображение
        plt.subplot(2, num_images, _ + num_images + 1)
        plt.imshow(generated_images[random_num])
        plt.gray()
        plt.title("Generated")
        plt.axis("off")
    plt.tight_layout()
    plt.show()

raw_data = init_data_with_batch_size(1, "flowers")
for _ in range(3):
    show_row_images(raw_data)

In [None]:
d = init_data_with_batch_size(1, "flowers")
data = np.array([next(iter(d))[0][0] for _ in range(1000)])
labels = np.array([next(iter(d))[1][0] for _ in range(1000)])

_, _, all_hidden_data = encoder.predict([data, labels])
mean, std = np.mean(all_hidden_data), np.std(all_hidden_data)
print("mean (ideal: 0):", mean)
print("std  (ideal: 1):", std)

for _ in range(4):
    num_images = 4

    noise = np.random.normal(mean, std, [num_images*2, hidden_units])
    label = np.array([ keras.utils.to_categorical(np.random.randint(0, 5), 5)
                       for _ in range(num_images*2)])
    generated_images = np.array(decoder.predict([noise, label], verbose=False))

    plt.figure(figsize=(20, 11))

    for i in range(num_images):
        # Оригинальное изображение
        plt.subplot(2, num_images, i + 1)

        # Переводим в промежуток [0; 1]
        plt.imshow(generated_images[i + num_images])
        plt.axis("off")

        # Сгенерированное изображение
        plt.subplot(2, num_images, i + num_images + 1)
        plt.imshow(generated_images[i])
        plt.axis("off")
    plt.tight_layout()
    plt.show()

In [None]:
"""Выводим Архитектуру"""
encoder_img = tf.keras.utils.plot_model(encoder, to_file="encoder.png", show_shapes=False, show_layer_names=False,
                                        dpi=128, show_layer_activations=False)

decoder_img = tf.keras.utils.plot_model(decoder, to_file="decoder.png", show_shapes=False, show_layer_names=False,
                                        dpi=128, show_layer_activations=False)

In [None]:
cgan.save_weights("cgan")
# cgan.load_weights("cgan")