# Преамбула

In [None]:
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds

tfds.disable_progress_bar()
tf.enable_v2_behavior()

print(tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
tf.config.experimental.list_physical_devices()

In [None]:
#tf.debugging.set_log_device_placement(True)

# Place tensors on the CPU
#with tf.device('/GPU:0'):
#    a = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
#    b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
#    c = tf.matmul(a, b)
    
#print(c)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import copy

In [None]:
from sklearn.neighbors import KernelDensity
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV

In [None]:
from joblib import Parallel, delayed

In [None]:
def normalize_img(image, label):
    """Нормализация изображений: `uint8` -> `float32`."""
    return tf.cast(image, tf.float32) / 255.0, label

In [None]:
def crop_pixels(x):
    """Обрезание значений пикселей нормированного изображения."""
    return min(1.0, max(0.0, x))

In [None]:
def imshow_array(array):
    """Отображение массива нормированных пикселей."""
    plt.axis('off')
    plt.imshow((255.0 * array).astype(np.uint8), cmap=plt.get_cmap("gray"), vmin=0, vmax=255)

In [None]:
def dataset_Y_to_X(X, Y):
    """Поменять у датасета пары (X, Y) на (X, X) (нужно, например, для обучения автоэнкодера)."""
    return X, X

In [None]:
def similarity_loss(y_true, y_pred):
    """Функция потерь, которая показала результаты лучше, чем MAE."""
    delta = tf.keras.backend.abs(y_true - y_pred)
    squared = tf.keras.backend.square(y_true - y_pred)
    return tf.keras.backend.mean(delta - 0.5 * squared, axis=-1)

In [None]:
mnist_shape = (28, 28, 1)

In [None]:
(ds_full, _), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

In [None]:
ds_full = ds_full.map(
    normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
#ds_full = ds_full.batch(1)
#ds_full = ds_full.cache()
#ds_full = ds_full.prefetch(tf.data.experimental.AUTOTUNE)

# Классификатор

## Получение датасета MNIST и создание тренировочных и тестовых выборок

In [None]:
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

### Тренировочный набор

In [None]:
ds_train = ds_train.map(
    normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_train = ds_train.cache()
ds_train = ds_train.shuffle(ds_info.splits['train'].num_examples)
ds_train = ds_train.batch(128)
ds_train = ds_train.prefetch(tf.data.experimental.AUTOTUNE)

### Тестовый набор

In [None]:
ds_test = ds_test.map(
    normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_test = ds_test.batch(128)
ds_test = ds_test.cache()
ds_test = ds_test.prefetch(tf.data.experimental.AUTOTUNE)

## Модель

In [None]:
def define_classifier(shape_input):
    # Инициализация весов.
    init = tf.keras.initializers.RandomNormal(stddev = 0.02)

    # Входные данные генератора / выборки.
    input_layer = tf.keras.layers.Input(shape_input)
    next_layer = input_layer

    # 1 блок слоёв.
    next_layer = tf.keras.layers.GaussianNoise(0.1)(input_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 4, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Activation(tf.keras.activations.tanh)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.2)(next_layer)
    #next_layer = tf.keras.layers.SpatialDropout2D(0.1)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)

    output_layer_1 = tf.keras.layers.Activation(tf.keras.activations.sigmoid)(next_layer)

    # 2 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(input_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 8, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Activation(tf.keras.activations.tanh)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)
    #next_layer = tf.keras.layers.SpatialDropout2D(0.05)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    output_layer_2 = tf.keras.layers.Activation(tf.keras.activations.sigmoid)(next_layer)

    # 3 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(input_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 16, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Activation(tf.keras.activations.tanh)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.2)(next_layer)
    #next_layer = tf.keras.layers.SpatialDropout2D(0.2)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    output_layer_3 = tf.keras.layers.Activation(tf.keras.activations.sigmoid)(next_layer)

    # Вывод.
    next_layer = tf.keras.layers.Flatten()(next_layer)
    next_layer = tf.keras.layers.Dense(10)(next_layer)
    output_layer = tf.keras.layers.Activation('softmax')(next_layer)

    # Модель.
    model = tf.keras.models.Model(input_layer, output_layer)
    debug_model = tf.keras.models.Model([input_layer], [output_layer_1, output_layer_2, output_layer_3])

    # Компиляция модели.
    opt = tf.keras.optimizers.Adam(lr = 0.001)
    model.compile(loss = 'sparse_categorical_crossentropy', optimizer = opt, loss_weights = [1.0], metrics=['accuracy'])
    return model, debug_model

In [None]:
classifier, debug_classifier = define_classifier(mnist_shape)
# Сводка по модели.
classifier.summary()
# Отрисовка модели.
#tf.keras.utils.plot_model(classifier, show_shapes = True, show_layer_names = True)

In [None]:
classifier.fit(
    ds_train,
    epochs=6,
    validation_data=ds_test,
)

In [None]:
classifier.save('./models/classifier/classifier.h5')
debug_classifier.save('./models/classifier/debug_classifier.h5')

### Визуальная проверка

In [None]:
mnist_example = ds_full.take(3)
for sample in mnist_example:
    image, label = sample[0].numpy(), sample[1].numpy()
    imshow_array(image[:, :, 0])
    plt.show()
    print("Label: %d" % label)

    # Предсказание.
    print(classifier.predict(np.array([image]))[0])

    # Отрисовка слоёв.
    debug = debug_classifier.predict(np.array([image]))
    for output in debug:
        for element in output:
            element = np.swapaxes(element, 0, 2)
            element = np.swapaxes(element, 1, 2)

            i = 0
            n = len(element)
            for filter in element:
                plt.subplot(1, n, 1 + i)
                imshow_array(filter)
                i += 1

            plt.show()

# Kernel density

#### Полный датасет без разбиения по комплектам

In [None]:
samples = list(ds_full.take(-1))
images = [sample[0].numpy().reshape(28,28) for sample in samples]
labels = [sample[1].numpy() for sample in samples]

images_vectors = [image.reshape(28*28) for image in images]

#### PCA-преобразование

In [None]:
pca_images = PCA(n_components=10, whiten=False)
images_vectors_pca = pca_images.fit_transform(images_vectors)

#### Визуальная проверка PCA-преобразования

In [None]:
test = pca_images.inverse_transform(images_vectors_pca[0])
test = test.reshape(28,28)
test = np.vectorize(crop_pixels)(test)

imshow_array(images[0])
plt.show()
imshow_array(test)
plt.show()

### Инициализация и обучение модели KDE

#### Оптимизация ширины

In [None]:
#params = {'bandwidth': np.logspace(-1, 0, 20)}
#grid = GridSearchCV(KernelDensity(), params)
#grid.fit(lowdim_vectors)

In [None]:
kde_images = KernelDensity(bandwidth=0.25, kernel='gaussian')
kde_images.fit(images_vectors_pca)
#kde_images = grid.best_estimator_
print(kde_images.get_params())

In [None]:
for i in range(4):
    test = kde_images.sample()#(random_state=i)
    test = pca_images.inverse_transform(test)
    test = test.reshape(28,28)
    test = np.vectorize(crop_pixels)(test)

    imshow_array(test)
    plt.show()

# Автоэнкодер

In [None]:
(ds_train_images, ds_test_images), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True
)

In [None]:
ds_train_images = ds_train_images.map(
    normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_train_images = ds_train_images.map(
    dataset_Y_to_X, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_train_images = ds_train_images.cache()
ds_train_images = ds_train_images.shuffle(ds_info.splits['train'].num_examples)
ds_train_images = ds_train_images.batch(128)
ds_train_images = ds_train_images.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
ds_test_images = ds_test_images.map(
    normalize_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_test_images = ds_test_images.map(
    dataset_Y_to_X, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_test_images = ds_test_images.batch(128)
ds_test_images = ds_test_images.cache()
ds_test_images = ds_test_images.prefetch(tf.data.experimental.AUTOTUNE)

In [None]:
def define_autoencoder(shape_input, dimension):
    # Инициализация весов.
    init = tf.keras.initializers.RandomNormal(stddev = 0.02)

    # Входные данные генератора / выборки.
    input_layer = tf.keras.layers.Input(shape_input)
    next_layer = input_layer

    # 1 блок слоёв.
    next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 64, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.2)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    # 2 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 128, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    # 3 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 256, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.2)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    # Бутылочное горлышко.
    next_layer = tf.keras.layers.Flatten()(next_layer)
    next_layer = tf.keras.layers.Dense(dimension)(next_layer)
    bottleneck = tf.keras.layers.Activation('sigmoid')(next_layer)

    # Модель кодировщика.
    encoder = tf.keras.Model(input_layer, bottleneck)

    # Начало модели декодировщика.
    input_code_layer = tf.keras.layers.Input((dimension))
    next_layer = input_code_layer

    # 3 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Dense(4*4*256)(next_layer)
    next_layer = tf.keras.layers.Reshape((4, 4, 256))(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.2)(next_layer)

    # 2 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.UpSampling2D(size=(2, 2))(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 128, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.Cropping2D(cropping=((0, 1), (0, 1)))(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # 1 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.UpSampling2D(size=(2, 2))(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 64, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # 0 блок слоёв.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.UpSampling2D(size=(2, 2))(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 1, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.Activation('sigmoid')(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    output_layer = next_layer

    # Модель.
    decoder = tf.keras.models.Model(input_code_layer, output_layer) # Декодировщик.
    autoencoder = tf.keras.Sequential([encoder, decoder])

    # Компиляция модели.
    opt = tf.keras.optimizers.Adam(lr = 1e-3)
    autoencoder.compile(loss = similarity_loss, optimizer = opt, loss_weights = [1.0])
    return encoder, decoder, autoencoder

In [None]:
codes_dim = 16
encoder, decoder, autoencoder = define_autoencoder(mnist_shape, codes_dim)

In [None]:
# Сводка по модели.
encoder.summary()
# Отрисовка модели.
#tf.keras.utils.plot_model(encoder, show_shapes = True, show_layer_names = True)

In [None]:
# Сводка по модели.
decoder.summary()
# Отрисовка модели.
#tf.keras.utils.plot_model(decoder, show_shapes = True, show_layer_names = True)

In [None]:
#encoder = tf.keras.models.load_model('encoder.h5')
#decoder = tf.keras.models.load_model('decoder.h5')
#autoencoder = autoencoder = tf.keras.Sequential([encoder, decoder])

In [None]:
autoencoder.fit(
    ds_train_images,
    epochs=100,
    validation_data=ds_test_images,
)

In [None]:
autoencoder.save('./models/autoencoder/autoencoder_' + str(codes_dim) + '.h5')
encoder.save('./models/autoencoder/encoder_' + str(codes_dim) + '.h5')
decoder.save('./models/autoencoder/decoder_' + str(codes_dim) + '.h5')

### Визуальная проверка

In [None]:
mnist_example = ds_full.take(50)
for sample in mnist_example:
    image, label = sample[0].numpy(), sample[1].numpy()

    plt.subplot(1, 2, 1)
    imshow_array(image[:, :, 0])

    # Предсказание.
    plt.subplot(1, 2, 2)
    imshow_array(autoencoder.predict(np.array([image]))[0][:, :, 0])
    plt.show()

### Получение кодов всех изображений датасета

In [None]:
samples = list(ds_full.take(-1))
images = np.array([sample[0].numpy().reshape(28,28,1) for sample in samples])
codes = np.array(encoder.predict(images))

In [None]:
pca_codes_dim = codes_dim
pca_codes = PCA(n_components=pca_codes_dim, whiten=True)
codes_pca = np.array(pca_codes.fit_transform(codes))

In [None]:
# Проверка центрирования.
codes_mean = sum(codes) / len(codes)
codes_pca_mean = sum(codes_pca) / len(codes_pca)
print(codes_mean)
print(codes_pca_mean)

In [None]:
# Проверка вторых моментов.
def standard_deviation(samples, mean, dimension):
    stdev = np.array([0.0 for i in range(dimension)])
    
    for sample in samples:
        for i in range(0, dimension):
            stdev[i] += (sample[i] - mean[i])**2
            
    stdev /= (len(samples) - 1)
    stdev = np.sqrt(stdev)
    return(stdev)

codes_stdev = standard_deviation(codes, codes_mean, codes_dim)
codes_pca_stdev = standard_deviation(codes_pca, codes_pca_mean, pca_codes_dim)
print(codes_stdev)
print(codes_pca_stdev)

### KDE для кодов

In [None]:
params = {'bandwidth': np.logspace(np.log10(0.3), np.log10(0.7), 6)}
grid = GridSearchCV(KernelDensity(rtol = 0.01), params, n_jobs=8, verbose=10, cv=5)
grid.fit(codes_pca)

In [None]:
#kde_codes = KernelDensity(bandwidth=0.46, kernel='gaussian')
#kde_codes.fit(codes_pca)
kde_codes = grid.best_estimator_
kde_codes.set_params(rtol = 0.0)
print(kde_codes.get_params())

In [None]:
for i in range(40):
    test = kde_codes.sample()#(random_state=i)
    test = pca_codes.inverse_transform(test)
    
    imshow_array(decoder.predict(test)[0][:, :, 0])
    plt.show()

## Подсчёт энтропии

In [None]:
def entropy_monte_carlo(kde, N, random_state = 42):
    samples  = kde.sample(N, random_state)
    log_prob = np.array(kde.score_samples(samples))
    
    average = -sum(log_prob) / N
    standard_deviation = 0.0
    for i in range(N):
        standard_deviation += (log_prob[i] - average)**2
        
    standard_deviation = np.sqrt(standard_deviation / (N * (N - 1)))
        
    return average, standard_deviation

In [None]:
entropy, entropy_error = entropy_monte_carlo(kde_codes, 10000)
entropy_error *= 3.3 # Коэффициент Стьюдента.
print("H: %f, errH: %f" % (entropy, entropy_error))

In [None]:
def entropy_leave_one_out(kde, samples, random_state = 42):
    N = len(samples)
    log_prob = np.array([0.0 for i in range(N)])
    for i in range(N):
        lvo_samples = samples
        np.delete(lvo_samples, i)
        kde.fit(lvo_samples)
        
        log_prob[i] = kde.score_samples([samples[i]])[0]
        print("i: %d / %d" % (i+1, N), end="\r")
    
    average = -sum(log_prob) / N
    standard_deviation = 0.0
    for i in range(N):
        print("i: %d / %d" % (i+1, N), end="\r")
        standard_deviation += (log_prob[i] - average)**2
        
    standard_deviation = np.sqrt(standard_deviation / (N * (N - 1)))
        
    return average, standard_deviation

In [None]:
entropy, entropy_error = entropy_leave_one_out(copy.copy(kde_codes), codes_pca)
entropy_error *= 3.3 # Коэффициент Стьюдента.
print("H: %f, errH: %f" % (entropy, entropy_error))

In [None]:
def _lvo_step(bandwidth, samples, i):
    lvo_samples = samples
    np.delete(lvo_samples, i)
    
    kde = KernelDensity(bandwidth=bandwidth, kernel='gaussian')
    kde.fit(lvo_samples)
    return kde.score_samples([samples[i]])[0]

In [None]:
def entropy_leave_one_out_parallel(bandwidth, samples, random_state = 42):
    N = len(samples)
    log_prob = np.array(Parallel(n_jobs=8, verbose=10, batch_size=2)(delayed(_lvo_step)(bandwidth, samples, i) for i in range(N)))
    
    average = -sum(log_prob) / N
    standard_deviation = 0.0
    for i in range(N):
        #print("i: %d / %d" % (i+1, N), end="\r")
        standard_deviation += (log_prob[i] - average)**2
        
    standard_deviation = np.sqrt(standard_deviation / (N * (N - 1)))
        
    return average, standard_deviation

In [None]:
entropy, entropy_error = entropy_leave_one_out_parallel(kde_codes.get_params()['bandwidth'], codes_pca)
entropy_error *= 3.3 # Коэффициент Стьюдента.
print("H: %f, errH: %f" % (entropy, entropy_error))

In [None]:
def entropy_multivariate_normal(dimension, stdev):
    return 0.5 * dimension * (1 + np.log(2 * np.pi) + 2*np.log(stdev))

In [None]:
entropy_multivariate_normal(pca_codes_dim, kde_codes.get_params()['bandwidth'])