# Mutual Information (MNIST)

Эксперименты с оценкой энтропии для данных рукописных цифр.

# Преамбула

## Библиотеки

### Tensorflow

In [None]:
import tensorflow.compat.v2 as tf
import tensorflow_datasets as tfds
import tensorflow_addons as tfa

tfds.disable_progress_bar()
tf.enable_v2_behavior()

print(tf.__version__)
print("Num GPUs Available: ", len(tf.config.experimental.list_physical_devices('GPU')))
tf.config.experimental.list_physical_devices()

### Math, Numpy, Scipy, Pandas

In [None]:
import math
import numpy as np
import scipy as sp
import scipy.stats as sps
import scipy.linalg as spl
import pandas as pd

### Matplotlib, Seaborn

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

### Sklearn

In [None]:
# Деревья.
from sklearn.neighbors import KernelDensity
from sklearn.neighbors import BallTree
from sklearn.neighbors import KDTree

# Метрика.
from sklearn.metrics import pairwise_distances_argmin_min

# Метод главных компонент.
from sklearn.decomposition import PCA

# Выбор модели по кросс-валидации (поиск по сетке).
from sklearn.model_selection import GridSearchCV

### Joblib

In [None]:
from joblib import Parallel, delayed

# Number of parallel workers; passed as `n_jobs` to the bandwidth grid searches below.
n_jobs = 16

### OS, shutil, Json, CSV, copy

In [None]:
import os
import shutil
import json
import csv
import copy

## Вспомогательное

In [None]:
# Experiment metadata; filled in along the way and dumped to `info.json`.
info = {}

In [None]:
def normalize_uint8(data, label):
    """Cast `data` to `float32` and divide it by 255; `label` is passed through unchanged."""
    as_float = tf.cast(data, tf.float32)
    return as_float / 255.0, label

In [None]:
def imshow_array(array):
    """Draw `array` as a grayscale image without axes.

    The values are multiplied by 255 and cast to `uint8` before plotting, and
    the colour scale is fixed to the 0..255 range.
    """
    pixels = (255.0 * array).astype(np.uint8)
    gray = plt.get_cmap("gray")
    plt.axis('off')
    plt.imshow(pixels, cmap=gray, vmin=0, vmax=255)

In [None]:
def dataset_Y_to_X(X, Y):
    """Turn an `(X, Y)` pair into `(X, X)`.

    `Y` is ignored. Useful as a `Dataset.map` function when the target of the
    model is its own input, e.g. when training an autoencoder.
    """
    pair = (X, X)
    return pair

In [None]:
def concave_loss(y_true, y_pred):
    """Concave loss `|d| - 0.5 * d**2` with `d = y_true - y_pred`, averaged over the last axis.

    The original author notes that it gives sharper images during training.
    """
    K = tf.keras.backend
    absolute_error = K.abs(y_true - y_pred)
    squared_error = K.square(y_true - y_pred)
    return K.mean(absolute_error - 0.5 * squared_error, axis=-1)

## Google Drive

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

## Путь к папке с данными

In [None]:
# Root data folder: `<current working directory>/data/`.
#path = "/content/drive/My Drive/Information_v2/"
path = "{}/data/".format(os.path.abspath(os.getcwd()))

In [None]:
# Folder of this experiment and the sub-folder for trained models.
# Both strings end with a slash, so file names can be appended directly.
experiments_path = path + "mutual_information/MNIST/"
models_path = experiments_path + "models/"

In [None]:
# Input shape handed to the model builders below.
mnist_shape = (28, 28, 1)

### Полный набор данных

In [None]:
# Load the MNIST train/test splits as (image, label) pairs together with the
# dataset metadata.
(ds_full_train, ds_full_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

In [None]:
# Scale pixel values from uint8 to float32 in [0, 1] (see `normalize_uint8`).
ds_full_train = ds_full_train.map(normalize_uint8, num_parallel_calls=tf.data.experimental.AUTOTUNE)
ds_full_test  = ds_full_test.map(normalize_uint8, num_parallel_calls=tf.data.experimental.AUTOTUNE)

In [None]:
# Materialise the datasets in memory as arrays of (image, label) samples.
# NOTE(review): each sample is a pair of tensors of different shapes, so this
# presumably relies on NumPy building an object array of shape (N, 2); newer
# NumPy versions may require `dtype=object` for that - TODO confirm.
ds_train = ds_full_train.take(60000)
ds_train = np.array([sample for sample in ds_train])

# `take(60000)` is only an upper bound: it yields the whole split when the
# split holds fewer elements.
ds_test  = ds_full_test.take(60000)
ds_test  = np.array([sample for sample in ds_test])

In [None]:
# First element of every sample: the image.
ds_train_X = ds_train[:,0]
ds_test_X  = ds_test[:,0]

In [None]:
# Second element of every sample: the label.
ds_train_Y = ds_train[:,1]
ds_test_Y = ds_test[:,1]

## Классификатор изображений

### Тренировочные и тестовые наборы

In [None]:
# Batch size of the classifier's training and test pipelines.
cl_batch_size = 2048

In [None]:
# Separate copy of the MNIST splits used to train and validate the classifier.
(ds_cl_train, ds_cl_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

In [None]:
# Classifier training pipeline: normalise -> cache -> shuffle over the whole
# split -> batch -> prefetch.
ds_cl_train = (
    ds_cl_train
    .map(normalize_uint8, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .cache()
    .shuffle(ds_info.splits['train'].num_examples)
    .batch(cl_batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [None]:
# Classifier test pipeline: normalise -> batch -> cache -> prefetch (no shuffling).
ds_cl_test = (
    ds_cl_test
    .map(normalize_uint8, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(cl_batch_size)
    .cache()
    .prefetch(tf.data.experimental.AUTOTUNE)
)

### Классификатор

In [None]:
def convolutional_classifier(shape_input):
    """Build and compile a small convolutional classifier with 10 output classes.

    Three blocks of spectrally-normalised 3x3 convolutions (16, 8 and 4
    filters) are each followed by LeakyReLU, dropout, 2x2 max pooling and
    additive Gaussian noise; blocks 2 and 3 also use batch normalisation.
    The result is flattened and fed to a spectrally-normalised 10-unit dense
    layer with a softmax activation.

    Layers that are later looked up by name are called `SN_k` (spectral
    normalisation), `BN_k` (batch normalisation), `DO_k` (dropout) and
    `AGN_k` (additive Gaussian noise).

    Args:
        shape_input: Shape of a single input sample, passed to
            `tf.keras.layers.Input`.

    Returns:
        A tuple `(model, debug_model)`. `model` is the compiled classifier.
        `debug_model` shares the same layers and returns the outputs of the
        three convolutional blocks; it is not compiled.
    """
    # Weight initialisation.
    init = tf.keras.initializers.RandomNormal(stddev = 0.02)

    # Input: generator output / dataset samples.
    input_layer = tf.keras.layers.Input(shape_input)
    next_layer = input_layer
    next_layer = tf.keras.layers.GaussianNoise(1e-2, name='AGN_0')(next_layer)

    # Block 1.
    # The wrapped layer is passed positionally *before* the `name` keyword:
    # a positional argument after a keyword argument is a SyntaxError.
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Conv2D(
        filters = 16, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init),
        name='SN_1')(next_layer)

    #next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1, name='DO_1')(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)
    next_layer = tf.keras.layers.GaussianNoise(1e-2, name='AGN_1')(next_layer)

    output_layer_1 = next_layer

    # Block 2.
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Conv2D(
        filters = 8, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init),
        name='SN_2')(next_layer)

    next_layer = tf.keras.layers.BatchNormalization(name='BN_2')(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1, name='DO_2')(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)
    next_layer = tf.keras.layers.GaussianNoise(1e-2, name='AGN_2')(next_layer)

    output_layer_2 = next_layer

    # Block 3.
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Conv2D(
        filters = 4, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init),
        name='SN_3')(next_layer)
    next_layer = tf.keras.layers.BatchNormalization(name='BN_3')(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1, name='DO_3')(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)
    next_layer = tf.keras.layers.GaussianNoise(1e-2, name='AGN_3')(next_layer)

    output_layer_3 = next_layer

    # Block 4 (disabled).
    #next_layer = tf.keras.layers.GaussianNoise(0.05, name='AGN_4')(next_layer)
    #next_layer = tf.keras.layers.Conv2D(filters = 16, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    #next_layer = tf.keras.layers.BatchNormalization(name='BA_3')(next_layer)
    #next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)
    #next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    #output_layer_4 = tf.keras.layers.Activation(tf.keras.activations.sigmoid)(next_layer)

    # Output head.
    next_layer = tf.keras.layers.Flatten()(next_layer)
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Dense(10))(next_layer)
    output_layer = tf.keras.layers.Activation('softmax')(next_layer)

    # Models: the classifier itself and a view onto the intermediate blocks.
    model = tf.keras.models.Model(input_layer, output_layer)
    debug_model = tf.keras.models.Model([input_layer], [output_layer_1, output_layer_2, output_layer_3])

    # Compilation. `learning_rate` replaces the deprecated `lr` alias and
    # matches the other optimizers created in this file.
    opt = tf.keras.optimizers.Adam(learning_rate = 1e-3)
    model.compile(loss = 'sparse_categorical_crossentropy', optimizer = opt, loss_weights = [1.0], metrics=['accuracy'])
    return model, debug_model

In [None]:
# Загрузка модели.
#classifier = tf.keras.models.load_model(models_path + "/classifier/classifier.h5")
#debug_classifier = tf.keras.models.load_model(models_path + "/classifier/debug_classifier.h5")

In [None]:
# Build the classifier and the model exposing its intermediate block outputs.
classifier, debug_classifier = convolutional_classifier(mnist_shape)
# Model summary.
classifier.summary()
# Model diagram.
#tf.keras.utils.plot_model(classifier, show_shapes = True, show_layer_names = True)

In [None]:
# Train the classifier; the datasets are already batched (`cl_batch_size`).
classifier.fit(
    ds_cl_train,
    epochs=300,
    validation_data=ds_cl_test
)

In [None]:
# Save both models. The target folder is created first: writing an HDF5 file
# into a missing directory fails.
os.makedirs(models_path + "/classifier/", exist_ok=True)
classifier.save(models_path + "/classifier/classifier.h5")
debug_classifier.save(models_path + "/classifier/debug_classifier.h5")

## Автокодировщик для изображений

### Тренировочные и тестовые наборы

In [None]:
# Batch size of the image autoencoder's training and test pipelines.
ae_batch_size = 2048

In [None]:
# Separate copy of the MNIST splits used to train and validate the image autoencoder.
(ds_ae_train, ds_ae_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

In [None]:
# Autoencoder training pipeline: normalise -> replace the label by the image
# -> cache -> shuffle over the whole split -> batch -> prefetch.
ds_ae_train = (
    ds_ae_train
    .map(normalize_uint8, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .map(dataset_Y_to_X, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .cache()
    .shuffle(ds_info.splits['train'].num_examples)
    .batch(ae_batch_size)
    .prefetch(tf.data.experimental.AUTOTUNE)
)

In [None]:
# Autoencoder test pipeline: normalise -> replace the label by the image
# -> batch -> cache -> prefetch (no shuffling).
ds_ae_test = (
    ds_ae_test
    .map(normalize_uint8, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .map(dataset_Y_to_X, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(ae_batch_size)
    .cache()
    .prefetch(tf.data.experimental.AUTOTUNE)
)

### Автокодировщик

In [None]:
# CODE DIMENSION.
# #
# #

codes_dim_X = 10 # MNIST images: size of the image autoencoder's bottleneck.

# #
# #

In [None]:
def cnn_autoencoder(shape_input, dimension):
    """Build a convolutional autoencoder with a `tanh` bottleneck.

    The encoder applies three convolutional blocks (12, 18 and 27 filters),
    each followed by 2x2 max pooling with 'same' padding, then flattens and
    projects to `dimension` values squashed by `tanh`.

    The decoder starts from a hard-coded 4x4x27 tensor and upsamples
    4 -> 8 -> (crop) 7 -> 14 -> 28, ending with a single sigmoid channel, so
    its output is always 28x28x1 regardless of `shape_input`.

    Args:
        shape_input: Shape of a single input sample, passed to
            `tf.keras.layers.Input`.
        dimension: Size of the bottleneck code.

    Returns:
        A tuple `(encoder, decoder, autoencoder)`. `autoencoder` is a
        `Sequential` of the two other models and is the only one compiled
        (with `concave_loss` and Adam).
    """
    # Weight initialisation.
    init = tf.keras.initializers.RandomNormal(stddev = 1.0)

    # Input: generator output / dataset samples.
    input_layer = tf.keras.layers.Input(shape_input)
    next_layer = input_layer

    # Block 1.
    next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 12, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    # Block 2.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 18, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    # Block 3.
    next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 27, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)
    next_layer = tf.keras.layers.MaxPooling2D(pool_size = (2, 2), padding = 'same')(next_layer)

    # Bottleneck.
    next_layer = tf.keras.layers.Flatten()(next_layer)
    next_layer = tf.keras.layers.Dense(dimension)(next_layer)
    bottleneck = tf.keras.layers.Activation('tanh')(next_layer)

    # Encoder model.
    encoder = tf.keras.Model(input_layer, bottleneck)

    # Start of the decoder. `(dimension,)` is a real 1-tuple; `(dimension)`
    # would be a bare int, which not every Keras version accepts as a shape.
    input_code_layer = tf.keras.layers.Input((dimension,))
    next_layer = input_code_layer

    # Block 3.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.Dense(4*4*27)(next_layer)
    next_layer = tf.keras.layers.Reshape((4, 4, 27))(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.2)(next_layer)

    # Block 2. The crop turns the upsampled 8x8 maps into 7x7 so that two
    # further upsamplings end at 28x28.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.UpSampling2D(size=(2, 2))(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 18, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.Cropping2D(cropping=((0, 1), (0, 1)))(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # Block 1.
    next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.UpSampling2D(size=(2, 2))(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 12, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # Block 0.
    #next_layer = tf.keras.layers.GaussianNoise(0.1)(next_layer)
    next_layer = tf.keras.layers.UpSampling2D(size=(2, 2))(next_layer)
    next_layer = tf.keras.layers.Conv2D(filters = 1, kernel_size = (3, 3), strides = (1, 1), padding = 'same', kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.BatchNormalization()(next_layer)
    next_layer = tf.keras.layers.Activation('sigmoid')(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    output_layer = next_layer

    # Models.
    decoder = tf.keras.models.Model(input_code_layer, output_layer) # Decoder.
    autoencoder = tf.keras.Sequential([encoder, decoder])

    # Compilation. `learning_rate` replaces the deprecated `lr` alias and
    # matches the other optimizers created in this file.
    opt = tf.keras.optimizers.Adam(learning_rate = 5e-3)
    autoencoder.compile(loss = concave_loss, optimizer = opt, loss_weights = [1.0])
    return encoder, decoder, autoencoder

In [None]:
#encoder_X = tf.keras.models.load_model(models_path + "autoencoder/encoder_X.h5")
#decoder_X = tf.keras.models.load_model(models_path + "autoencoder/decoder_X.h5")
#autoencoder_X = tf.keras.Sequential([encoder_X, decoder_X])
#autoencoder_X.compile(loss = concave_loss, optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3), loss_weights = [1.0])

In [None]:
# Image autoencoder with a `codes_dim_X`-dimensional bottleneck.
encoder_X, decoder_X, autoencoder_X = cnn_autoencoder(mnist_shape, codes_dim_X)

In [None]:
# First training stage.
# `ds_ae_train` / `ds_ae_test` are tf.data datasets that are already batched
# with `ae_batch_size`, so `batch_size` must not be passed to `fit`: Keras
# rejects the argument for dataset inputs.
autoencoder_X.fit(
    ds_ae_train,
    epochs=300,
    validation_data=ds_ae_test
)

In [None]:
# Recompile with a fresh Adam optimizer at a lower learning rate for the second
# training stage. `learning_rate` replaces the deprecated `lr` alias.
autoencoder_X.compile(loss = concave_loss, optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3), loss_weights = [1.0])

In [None]:
# Second training stage.
# The datasets are already batched with `ae_batch_size`, so `batch_size` must
# not be passed to `fit`: Keras rejects the argument for dataset inputs.
autoencoder_X.fit(
    ds_ae_train,
    epochs=100,
    validation_data=ds_ae_test
)

In [None]:
# Save the models. The target folder is created first: writing an HDF5 file
# into a missing directory fails.
os.makedirs(models_path + "/autoencoder/", exist_ok=True)
autoencoder_X.save(models_path + "/autoencoder/autoencoder_X.h5")
encoder_X.save(models_path + "/autoencoder/encoder_X.h5")
decoder_X.save(models_path + "/autoencoder/decoder_X.h5")

# Оценка взаимной информации

In [None]:
# 1-based number of the classifier block under study; `layer_index - 1`
# indexes the outputs of `debug_classifier`.
layer_index = 3

### Перестройка исследуемой модели без выпадения и нормализации

In [None]:
# Disable dropout by setting the rate of each dropout layer to zero.
# NOTE(review): whether changing `rate` on an already-built layer takes effect
# depends on the Keras version - TODO confirm.
debug_classifier.get_layer('DO_1').rate = 0.0
debug_classifier.get_layer('DO_2').rate = 0.0
debug_classifier.get_layer('DO_3').rate = 0.0

# Freeze batch normalisation.
debug_classifier.get_layer('BN_2').trainable = False
debug_classifier.get_layer('BN_3').trainable = False

# Freeze spectral normalisation.
debug_classifier.get_layer('SN_1').trainable = False
debug_classifier.get_layer('SN_2').trainable = False
debug_classifier.get_layer('SN_3').trainable = False

In [None]:
# Clone the model: `clone_model` rebuilds the architecture with newly
# initialised weights; the trained weights are copied over in the next cell.
old_classifier = classifier
classifier = tf.keras.models.clone_model(classifier)

In [None]:
# Copy the trained weights into the cloned model.
classifier.set_weights(old_classifier.get_weights())

In [None]:
# Show a few training digits together with the classifier's prediction and
# the feature maps produced by each monitored block.
mnist_example = ds_full_train.take(3)
for sample in mnist_example:
    image, label = sample[0].numpy(), sample[1].numpy()
    imshow_array(image[:, :, 0])
    plt.show()
    print("Label: %d" % label)

    # Prediction.
    print(classifier.predict(np.array([image]))[0])

    # Feature maps of the monitored blocks.
    # NOTE(review): the model is called with training=True here, whereas the
    # layer values used for the estimates are extracted with training=False -
    # confirm that this is intended.
    debug = debug_classifier(np.array([image]), training=True)
    for output in debug:
        for element in output:
            # Move the last axis to the front so that iterating yields one
            # 2-D map per filter (same result as the two `swapaxes` calls).
            feature_maps = np.transpose(np.asarray(element), (2, 0, 1))
            n_maps = len(feature_maps)

            # `feature_map` instead of `filter`: the loop variable lives at
            # notebook scope and must not shadow the builtin.
            for index, feature_map in enumerate(feature_maps):
                plt.subplot(1, n_maps, 1 + index)
                imshow_array(feature_map)

            plt.show()

### Получение значений слоя

In [None]:
# Evaluate the studied block in inference mode. The images are stacked into a
# single tensor and processed in 10 equal chunks, whose outputs are then
# concatenated back along the first axis.
_splitted = tf.split(tf.stack(ds_train_X), 10)
_layer_predicted_train = tf.concat([debug_classifier(_splitted[i], training=False)[layer_index - 1] for i in range(10)], 0)

_splitted = tf.split(tf.stack(ds_test_X), 10)
_layer_predicted_test = tf.concat([debug_classifier(_splitted[i], training=False)[layer_index - 1] for i in range(10)], 0)

In [None]:
# Flatten every sample's layer output into one row. A single reshape gives the
# same array as flattening row by row, without a Python loop over all samples.
ds_train_L = _layer_predicted_train.numpy().reshape(_layer_predicted_train.shape[0], -1)
ds_test_L  = _layer_predicted_test.numpy().reshape(_layer_predicted_test.shape[0], -1)

### Путь к результатам

In [None]:
# Results folder for the studied layer, e.g. ".../layer_3/".
dataset_path = experiments_path + "layer_" + str(layer_index) + "/"

## Автокодировщик

Сжатие данных предлагается делать автокодировщиком.
Для архитектуры специфицируется только формат входных данных, а также размерность внутреннего представления (кодов).

In [None]:
# CODE DIMENSION.
# #
# #

codes_dim_L = 4  # Layer: size of the layer autoencoder's bottleneck.

# #
# #

In [None]:
# Number of training epochs of the first training stage of the layer autoencoder.
autoencoders_epochs = 2000

In [None]:
# Folder for the layer autoencoder's models and `info.json`; created if missing.
full_path = dataset_path + "autoencoders/"
os.makedirs(full_path, exist_ok=True)

In [None]:
# Record the number of epochs in the experiment metadata.
info['autoencoders_epochs'] = autoencoders_epochs

### Автокодировщик для слоя

In [None]:
def dense_autoencoder(shape_input, dimension):
    """Build a fully-connected autoencoder with a `tanh` bottleneck.

    The encoder adds Gaussian noise to the input and applies two
    spectrally-normalised dense layers (512 and 256 units) with LeakyReLU and
    dropout, followed by a spectrally-normalised projection to `dimension`
    values squashed by `tanh`. The decoder mirrors it with plain dense layers
    (256, 512) and a linear output of `shape_input[0]` units.

    Args:
        shape_input: Shape of a single input sample; the input is expected to
            be a vector, since only `shape_input[0]` is used for the output size.
        dimension: Size of the bottleneck code.

    Returns:
        A tuple `(encoder, decoder, autoencoder)`. `autoencoder` is a
        `Sequential` of the two other models and is the only one compiled
        (MSE loss, Adam).
    """
    # Weight initialisation.
    init = tf.keras.initializers.RandomNormal(stddev = 0.02)

    # Input: generator output / dataset samples.
    input_layer = tf.keras.layers.Input(shape_input)
    next_layer = input_layer
    next_layer = tf.keras.layers.GaussianNoise(0.02)(next_layer)

    # Block 1.
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Dense(512, kernel_initializer = init),
                                                  power_iterations = 3)(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # Block 2.
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Dense(256, kernel_initializer = init),
                                                  power_iterations = 3)(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # Block 3 (disabled).
    #next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Dense(128, kernel_initializer = init),
    #                                              power_iterations = 3)(next_layer)
    #next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # Block 4 (disabled).
    #next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Dense(32, kernel_initializer = init),
    #                                              power_iterations = 3)(next_layer)
    #next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)
    #next_layer = tf.keras.layers.Dropout(0.1)(next_layer)

    # Bottleneck.
    next_layer = tfa.layers.SpectralNormalization(tf.keras.layers.Dense(dimension),
                                                  power_iterations = 3)(next_layer)
    bottleneck = tf.keras.layers.Activation('tanh', name='bottleneck')(next_layer)

    # Encoder model.
    encoder = tf.keras.Model(input_layer, bottleneck)

    # Start of the decoder. `(dimension,)` is a real 1-tuple; `(dimension)`
    # would be a bare int, which not every Keras version accepts as a shape.
    input_code_L = tf.keras.layers.Input((dimension,))
    next_layer = input_code_L

    # Block 4 (disabled).
    #next_layer = tf.keras.layers.Dense(32, kernel_initializer = init)(next_layer)
    #next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)

    # Block 3 (disabled).
    #next_layer = tf.keras.layers.Dense(128, kernel_initializer = init)(next_layer)
    #next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)

    # Block 2.
    next_layer = tf.keras.layers.Dense(256, kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)

    # Block 1.
    next_layer = tf.keras.layers.Dense(512, kernel_initializer = init)(next_layer)
    next_layer = tf.keras.layers.LeakyReLU(alpha=0.2)(next_layer)

    # Block 0.
    next_layer = tf.keras.layers.Dense(shape_input[0])(next_layer) # The input is assumed to be a vector anyway.
    #next_layer = tf.keras.layers.Activation('tanh')(next_layer)

    output_layer = next_layer

    # Models.
    decoder = tf.keras.models.Model(input_code_L, output_layer) # Decoder.
    autoencoder = tf.keras.Sequential([encoder, decoder])

    # Compilation.
    opt = tf.keras.optimizers.Adam(learning_rate = 2e-3)
    autoencoder.compile(loss = 'mse', optimizer = opt)

    return encoder, decoder, autoencoder

### Загрузка модели

In [None]:
#encoder_L = tf.keras.models.load_model(full_path + "encoder_L.h5")
#decoder_L = tf.keras.models.load_model(full_path + "decoder_L.h5")
#autoencoder_L = tf.keras.Sequential([encoder_L, decoder_L])
#autoencoder_L.compile(loss = 'mse', optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3), loss_weights = [1.0])

#with open(full_path + 'info.json', 'r') as fp:
#    info = json.load(fp)

In [None]:
# Layer autoencoder: the input size is the length of one flattened layer output.
encoder_L, decoder_L, autoencoder_L = dense_autoencoder((ds_train_L.shape[1],), codes_dim_L)

#### Обучение 1

In [None]:
# First training stage: reconstruct the layer outputs, using batches of one
# tenth of the training set.
autoencoder_L.fit(
    ds_train_L,
    ds_train_L,
    epochs=autoencoders_epochs,
    validation_data=(ds_test_L, ds_test_L),
    batch_size=ds_train_L.shape[0] // 10)

In [None]:
# Recompile with a fresh Adam optimizer at a lower learning rate for the second stage.
autoencoder_L.compile(loss = 'mse', optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3))

In [None]:
# Second training stage: full-batch training (one batch holds the whole training set).
autoencoder_L.fit(
    ds_train_L,
    ds_train_L,
    epochs=1000,
    validation_data=(ds_test_L, ds_test_L),
    batch_size=ds_train_L.shape[0])

In [None]:
# Save the models into `full_path` (created earlier with `os.makedirs`).
autoencoder_L.save(full_path + "autoencoder_L.h5")
encoder_L.save(full_path + "encoder_L.h5")
decoder_L.save(full_path + "decoder_L.h5")

In [None]:
# Save the experiment metadata next to the models.
with open(full_path + 'info.json', 'w') as fp:
    json.dump(info, fp, indent=4)

### Получение кодов всех элементов набора данных

In [None]:
# Classifier input: encode the training images in 10 equal chunks (inference
# mode) and concatenate the codes. The result is a TensorFlow tensor.
_splitted = tf.split(tf.stack(ds_train_X), 10)
codes_X = tf.concat([encoder_X(_splitted[i], training=False) for i in range(10)], 0)

In [None]:
# Layer output: encode the flattened layer values of the training set.
codes_L = np.array(encoder_L.predict(ds_train_L))

In [None]:
# Joint dataset: the code of the classifier input and the code of the layer
# output, concatenated per sample, and the dimension of the joint code.
codes_X_L = np.concatenate((codes_X, codes_L), 1)
codes_X_layerdim = codes_dim_L + codes_dim_X

In [None]:
# Sanity check: distance between a stored layer value and a freshly computed one.
# The output is selected with `layer_index - 1`, exactly as when the stored
# values were extracted, instead of a hard-coded index that is only right for
# layer 3.
print(np.linalg.norm(ds_train_L[0] -
                     debug_classifier(np.expand_dims(ds_train_X[0], 0), training=False)[layer_index - 1].numpy().flatten()))

# NOTE(review): this compares sample 1 with the output for sample 0 -
# presumably a deliberate mismatching control; confirm that it is not a
# copy-paste slip for `ds_train_X[1]`.
print(np.linalg.norm(ds_train_L[1] -
                     debug_classifier(np.expand_dims(ds_train_X[0], 0), training=False)[layer_index - 1].numpy().flatten()))

In [None]:
# Whitening PCA of the individual codes; all components are kept, so this
# only rotates and rescales the codes.
PCA_codes_X = PCA(n_components=codes_dim_X, whiten=True)
codes_pca_X = np.array(PCA_codes_X.fit_transform(codes_X))

PCA_codes_L = PCA(n_components=codes_dim_L, whiten=True)
codes_pca_L = np.array(PCA_codes_L.fit_transform(codes_L))

In [None]:
# Whitening PCA of the joint codes (all components kept).
PCA_codes_X_L = PCA(n_components=codes_X_layerdim, whiten=True)
codes_pca_X_L = np.array(PCA_codes_X_L.fit_transform(codes_X_L))

In [None]:
# Pairwise scatter plots (KDE on the diagonal) of the first 1000 whitened joint codes.
# NOTE(review): `shade=` and `PairGrid.fig` are deprecated in recent seaborn
# releases (`fill=` / `.figure`) - confirm against the installed version.
pp = sns.pairplot(pd.DataFrame(codes_pca_X_L[0:1000]), height = 2.0, aspect=1.6,
                      plot_kws=dict(edgecolor="k", linewidth=0.0, alpha=0.05, size=0.01, s=0.01),
                      diag_kind="kde", diag_kws=dict(shade=True))

fig = pp.fig
fig.subplots_adjust(top=0.93, wspace=0.3)
t = fig.suptitle('Pairwise Plots', fontsize=14)

### KDE для кодов

In [None]:
def smart_gridsearch(begin, end, data, resolution = 7, rel_x_epsilon = 0.01, rtol = 0.001, n_jobs = 2, cv = 5):
    """Iteratively refine a log-spaced grid search for the Gaussian KDE bandwidth.

    Each round cross-validates `resolution` bandwidths, log-spaced between
    `begin` and `end`. If the best one lies on the edge of the grid, the
    interval is moved past that edge; otherwise it is narrowed to the two
    neighbours of the best point. The search stops once an interior optimum
    is bracketed by an interval shorter than `rel_x_epsilon` times the best
    bandwidth.

    Args:
        begin: Lower end of the initial bandwidth interval.
        end: Upper end of the initial bandwidth interval.
        data: Samples passed to `GridSearchCV.fit`.
        resolution: Number of grid points per round.
        rel_x_epsilon: Relative width of the final bracketing interval.
        rtol: `rtol` of the `KernelDensity` estimators being scored.
        n_jobs: Number of parallel jobs of the grid search.
        cv: Cross-validation setting of the grid search.

    Returns:
        The fitted `GridSearchCV` object of the last round.
    """
    last = resolution - 1

    while True:
        bandwidths = np.logspace(np.log10(begin), np.log10(end), resolution)
        print("Поиск по сетке: ", bandwidths)

        search = GridSearchCV(
            KernelDensity(rtol = rtol, kernel='gaussian'),
            {'bandwidth': bandwidths},
            n_jobs = n_jobs,
            verbose = 10,
            cv = cv,
        )
        search.fit(data)
        best = search.best_index_

        # Optimum on the lower edge: extend the interval downwards.
        if best == 0:
            begin, end = begin * (begin / end), bandwidths[1]
            continue

        # Optimum on the upper edge: extend the interval upwards.
        if best == last:
            begin, end = bandwidths[-2], end * (end / bandwidths[-2])
            continue

        # Interior optimum: narrow the interval to its two neighbours.
        begin, end = bandwidths[best - 1], bandwidths[best + 1]
        if end - begin < rel_x_epsilon * bandwidths[best]:
            return search

In [None]:
# Bandwidth selection for the codes of the classifier input. After the search
# the estimator's `rtol` is set to 0.0 (the search itself used a non-zero one).
KDE_codes_X = smart_gridsearch(0.3, 0.6, codes_pca_X, n_jobs = n_jobs).best_estimator_
KDE_codes_X.set_params(rtol = 0.0)
print(KDE_codes_X.get_params())

In [None]:
# Bandwidth selection for the codes of the layer output; `rtol` is set to 0.0 afterwards.
KDE_codes_L = smart_gridsearch(0.01, 0.2, codes_pca_L, n_jobs = n_jobs).best_estimator_
KDE_codes_L.set_params(rtol = 0.0)
print(KDE_codes_L.get_params())

In [None]:
# Bandwidth selection for the joint codes; `rtol` is set to 0.0 afterwards.
KDE_codes_X_L = smart_gridsearch(0.3, 0.6, codes_pca_X_L, n_jobs = n_jobs).best_estimator_
KDE_codes_X_L.set_params(rtol = 0.0)
# Disabled alternative (uses names from an older version of the notebook):
# take the larger of the two marginal bandwidths.
#KDE_codes_12 = KernelDensity(rtol = 0.0, bandwidth = max(KDE_codes_1.get_params()['bandwidth'], KDE_codes_2.get_params()['bandwidth']))
print(KDE_codes_X_L.get_params())

In [None]:
# Record the selected bandwidths in the experiment metadata.
info['bandwidth_X'] = KDE_codes_X.get_params()['bandwidth']
info['bandwidth_L'] = KDE_codes_L.get_params()['bandwidth']
info['bandwidth_X_L'] = KDE_codes_X_L.get_params()['bandwidth']

# Save the metadata (overwrites the earlier `info.json`).
with open(full_path + 'info.json', 'w') as fp:
    json.dump(info, fp, indent=4)

## Подсчёт взаимной информации

### $ I(X, L) $

Взаимная информация между входом и слоем:

$$
I(X, L) = H(X) + H(L) - H(X,L)
$$

In [None]:
def _loo_step(bandwidth, samples, i):
    """Return the leave-one-out log-density of `samples[i]`.

    A Gaussian KDE with the given `bandwidth` is fitted to all samples except
    the `i`-th one and then evaluated at the `i`-th sample.

    Args:
        bandwidth: Bandwidth of the Gaussian kernel.
        samples: Samples, one per row.
        i: Index of the sample that is left out and scored.

    Returns:
        The log-density of `samples[i]` under the leave-one-out estimate.
    """
    # `np.delete` returns a new array (its result must be kept), and without
    # `axis=0` it would flatten the input instead of dropping the i-th row.
    loo_samples = np.delete(samples, i, axis=0)

    kde = KernelDensity(bandwidth=bandwidth, kernel='gaussian')
    kde.fit(loo_samples)
    return kde.score_samples([samples[i]])[0]

In [None]:
def entropy_leave_one_out_parallel(path, bandwidth, samples, n_jobs = 2, first_N = None, parts = 10, recover_saved = False):
    """
    Estimate entropy with a leave-one-out KDE, evaluated in parallel.

    The log-density of each of the first N samples is computed by `_loo_step`.
    The work is split into `parts` consecutive chunks; every finished chunk is
    written to `<path>LOO_PARTS/<index>.csv` so that an interrupted run can be
    resumed. The temporary directory is removed on success.

    Args:
        path: directory prefix (expected to end with a path separator) under
            which the temporary `LOO_PARTS/` directory is created.
        bandwidth: KDE bandwidth forwarded to `_loo_step`.
        samples: array of samples, one sample per row.
        n_jobs: number of joblib workers.
        first_N: if given, only the first `first_N` samples are evaluated
            (values larger than `len(samples)` are clamped).
        parts: number of chunks the evaluation is split into.
        recover_saved: if True, reuse the chunk files `0.csv`, `1.csv`, ...
            already present in the temporary directory.

    Returns:
        `(average, standard_deviation)`: the mean of `-log p` over the
        evaluated samples and the standard error of that mean (NaN when only
        one sample is evaluated).

    Raises:
        ValueError: if there are no samples to evaluate.
        RuntimeError: if recovered chunks do not add up to N values.
    """

    # If first_N is given, the entropy is estimated on the first first_N samples only.
    N = len(samples) if first_N is None else min(first_N, len(samples))
    if N < 1:
        raise ValueError("entropy_leave_one_out_parallel: no samples to evaluate")

    # Temporary directory used to checkpoint progress.
    parts_path = path + "LOO_PARTS/"
    os.makedirs(parts_path, exist_ok=True)

    # Ceiling division: the last chunk may be shorter, but no sample is dropped.
    N_per_part = -(-N // parts)
    log_probs = []

    # Restore progress if requested. Chunks are loaded by index and only as a
    # contiguous prefix, so that `log_probs[k]` always holds chunk `k`.
    recovered_parts = 0
    if recover_saved:
        while recovered_parts < parts:
            part_file = parts_path + str(recovered_parts) + ".csv"
            if not os.path.isfile(part_file):
                break
            # A single-value file loads as a 0-d array; keep every chunk 1-D.
            log_probs.append(np.atleast_1d(np.loadtxt(part_file)))
            recovered_parts += 1

    print("Восстановлено блоков данных: %d" % recovered_parts)

    # Log-density at every sample of the remaining chunks.
    for part in range(recovered_parts, parts):
        start = min(part * N_per_part, N)
        stop = min(start + N_per_part, N)
        log_probs.append(
            np.array(
                Parallel(n_jobs = n_jobs, verbose = 10, batch_size = 8)(
                    delayed(_loo_step)(bandwidth, samples, i) for i in range(start, stop)
                )
            )
        )
        np.savetxt(parts_path + str(part) + ".csv", log_probs[part], delimiter="\n")

    # Merge the chunks into a single array.
    log_prob = np.concatenate(log_probs)
    if log_prob.size != N:
        raise RuntimeError(
            "entropy_leave_one_out_parallel: expected %d log-probabilities, got %d; "
            "recovered chunks do not match this call, rerun with recover_saved=False"
            % (N, log_prob.size)
        )

    # Entropy estimate: mean of -log p.
    average = -math.fsum(log_prob) / N

    # Standard error of the mean. The per-sample values being averaged are
    # -log p, so the deviations are taken between -log p and `average`.
    squared_deviations = (-log_prob - average) ** 2
    if N > 1:
        standard_deviation = np.sqrt(math.fsum(squared_deviations) / (N * (N - 1)))
    else:
        standard_deviation = float("nan")

    # Remove the temporary files.
    shutil.rmtree(parts_path)

    return average, standard_deviation

$$
H(X)
$$

In [None]:
# Input data: leave-one-out entropy of `codes_pca_X` with the bandwidth selected above.
latent_entropy_X, latent_entropy_error_X = entropy_leave_one_out_parallel(
    path=full_path,
    bandwidth=KDE_codes_X.get_params()['bandwidth'],
    samples=codes_pca_X,
    n_jobs=n_jobs,
    first_N=60000,
    recover_saved=False,
)

print("LH_X: %f, errLH_X: %f" % (latent_entropy_X, latent_entropy_error_X))

$$
H(L)
$$

In [None]:
# Layer: leave-one-out entropy of `codes_pca_L` with the bandwidth selected above.
latent_entropy_L, latent_entropy_error_L = entropy_leave_one_out_parallel(
    path=full_path,
    bandwidth=KDE_codes_L.get_params()['bandwidth'],
    samples=codes_pca_L,
    n_jobs=n_jobs,
    first_N=60000,
    recover_saved=False,
)

print("LH_L: %f, errLH_L: %f" % (latent_entropy_L, latent_entropy_error_L))

$$
H(X,L)
$$

In [None]:
# Joint distribution: leave-one-out entropy of `codes_pca_X_L` with the bandwidth selected above.
latent_entropy_X_L, latent_entropy_error_X_L = entropy_leave_one_out_parallel(
    path=full_path,
    bandwidth=KDE_codes_X_L.get_params()['bandwidth'],
    samples=codes_pca_X_L,
    n_jobs=n_jobs,
    first_N=60000,
    recover_saved=False,
)

print("LH_X_L: %f, errLH_X_L: %f" % (latent_entropy_X_L, latent_entropy_error_X_L))

In [None]:
# Record the latent-space entropy estimates and their errors.
info.update({
    'latent_entropy_X': latent_entropy_X,
    'latent_entropy_error_X': latent_entropy_error_X,
    'latent_entropy_L': latent_entropy_L,
    'latent_entropy_error_L': latent_entropy_error_L,
    'latent_entropy_X_L': latent_entropy_X_L,
    'latent_entropy_error_X_L': latent_entropy_error_X_L,
})

# Write the summary to disk.
with open(full_path + 'info.json', 'w') as fp:
    json.dump(info, fp, indent=4)

In [None]:
# Stretch factor of each inverse PCA transform: |det| of the matrix obtained by
# mapping the identity matrix back and subtracting the image of the zero matrix
# (the subtraction cancels the additive offset of the inverse transform).
PCA_codes_defc_X = np.abs(
    np.linalg.det(
        PCA_codes_X.inverse_transform(np.eye(codes_dim_X))
        - PCA_codes_X.inverse_transform(np.zeros((codes_dim_X, codes_dim_X)))
    )
)

PCA_codes_defc_L = np.abs(
    np.linalg.det(
        PCA_codes_L.inverse_transform(np.eye(codes_dim_L))
        - PCA_codes_L.inverse_transform(np.zeros((codes_dim_L, codes_dim_L)))
    )
)

PCA_codes_defc_X_L = np.abs(
    np.linalg.det(
        PCA_codes_X_L.inverse_transform(np.eye(codes_X_layerdim))
        - PCA_codes_X_L.inverse_transform(np.zeros((codes_X_layerdim, codes_X_layerdim)))
    )
)

In [None]:
# Entropy shift caused by each inverse transform: the logarithm of its stretch factor.
PCA_codes_transform_entropy_X = np.log(PCA_codes_defc_X)
print("PCA_TH_X: %f" % PCA_codes_transform_entropy_X)

PCA_codes_transform_entropy_L = np.log(PCA_codes_defc_L)
print("PCA_TH_L: %f" % PCA_codes_transform_entropy_L)

PCA_codes_transform_entropy_X_L = np.log(PCA_codes_defc_X_L)
print("PCA_TH_X_L: %f" % PCA_codes_transform_entropy_X_L)

In [None]:
# Record the entropy shifts of the inverse PCA transforms.
info.update({
    'PCA_codes_transform_entropy_X': PCA_codes_transform_entropy_X,
    'PCA_codes_transform_entropy_L': PCA_codes_transform_entropy_L,
    'PCA_codes_transform_entropy_X_L': PCA_codes_transform_entropy_X_L,
})

# Write the summary to disk.
with open(full_path + 'info.json', 'w') as fp:
    json.dump(info, fp, indent=4)

In [None]:
# Final entropy estimates: latent-space entropy plus the entropy shift of the
# inverse PCA transform. The error is carried over from the latent estimate.
entropy_X = latent_entropy_X + PCA_codes_transform_entropy_X
entropy_L = latent_entropy_L + PCA_codes_transform_entropy_L
entropy_X_L = latent_entropy_X_L + PCA_codes_transform_entropy_X_L

entropy_error_X = latent_entropy_error_X
entropy_error_L = latent_entropy_error_L
entropy_error_X_L = latent_entropy_error_X_L

print(
    "H_X: %f, errH_X: %f\nH_L: %f, errH_L %f\nH_X_L: %f, errH_X_L: %f" % (
        entropy_X, entropy_error_X,
        entropy_L, entropy_error_L,
        entropy_X_L, entropy_error_X_L,
    )
)

In [None]:
# I(X, L) = H(X) + H(L) - H(X, L); the three error estimates are summed.
mutual_information_X_L = (entropy_X + entropy_L) - entropy_X_L
mutual_information_error_X_L = (entropy_error_X + entropy_error_L) + entropy_error_X_L

print("MI: %f, errMI: %f" % (mutual_information_X_L, mutual_information_error_X_L))

In [None]:
# Record the final entropies, the mutual information I(X, L) and their errors.
info.update({
    'entropy_X': entropy_X,
    'entropy_error_X': entropy_error_X,
    'entropy_L': entropy_L,
    'entropy_error_L': entropy_error_L,
    'entropy_X_L': entropy_X_L,
    'entropy_error_X_L': entropy_error_X_L,
    'mutual_information_X_L': mutual_information_X_L,
    'mutual_information_error_X_L': mutual_information_error_X_L,
})

# Write the summary to disk.
with open(full_path + 'info.json', 'w') as fp:
    json.dump(info, fp, indent=4)

### $ I(L, Y) $

Взаимная информация между слоем и меткой:

$$
I(L, Y) = H(L) - H(L \mid Y)
$$

$$
H(L \mid Y) = \sum_{y \in \text{Im} \, Y} p_Y(y) \cdot \left[ - \int\limits_{\text{Im} \, L} \rho_L(l \mid y) \ln \left( \rho_L(l \mid y) \right) \, dl \right]
$$

In [None]:
# Empirical label distribution: count the occurrences of each label in
# `ds_train_Y`, then normalise by the number of training samples.

n_labels = 10
P_Y = np.zeros(n_labels)
n_train = ds_train_Y.shape[0]
for i in range(n_train):
    P_Y[ds_train_Y[i]] += 1

P_Y /= n_train

In [None]:
# Show the empirical label distribution.
P_Y

In [None]:
# Per-label estimates of H(L | Y = y) and of their errors.
entropy_L_mid_Y_array = np.zeros(n_labels)
entropy_L_mid_Y_error_array = np.zeros(n_labels)

for y in range(n_labels):
    print("Расчёт для метки %d" % y)
    
    # Layer codes of the samples whose label equals y.
    codes_L_mid_Y = np.array([codes_L[i] for i in range(codes_L.shape[0]) if ds_train_Y[i] == y])
    
    # Whitening PCA fitted on this label's codes only.
    PCA_codes_L_mid_Y = PCA(n_components=codes_dim_L, whiten=True)
    codes_pca_L_mid_Y = np.array(PCA_codes_L_mid_Y.fit_transform(codes_L_mid_Y))
    
    # KDE bandwidth selection for the class-conditional codes.
    KDE_codes_L_mid_Y = smart_gridsearch(0.01, 0.2, codes_pca_L_mid_Y, n_jobs = n_jobs).best_estimator_
    KDE_codes_L_mid_Y.set_params(rtol = 0.0)
    print(KDE_codes_L_mid_Y.get_params())
    
    # Leave-one-out entropy of the class-conditional codes. The samples must be
    # this label's codes (not the joint (X, L) codes), and all of them are used:
    # `first_N = None` because a class holds far fewer samples than the full set;
    # `parts = 1` so the class size need not be a multiple of the part count.
    le_L_mid_X, le_error_L_mid_X = entropy_leave_one_out_parallel(full_path,
                                                                  KDE_codes_L_mid_Y.get_params()['bandwidth'],
                                                                  codes_pca_L_mid_Y,
                                                                  n_jobs = n_jobs,
                                                                  first_N = None,
                                                                  parts = 1,
                                                                  recover_saved = False)

    print("LH_L_mid_Y(%d): %f, errLH_L_mid_Y(%d): %f" % (y, le_L_mid_X, y, le_error_L_mid_X))
    
    # Entropy shift caused by the inverse PCA transform (log of its stretch factor).
    PCA_codes_defc_L_mid_Y = np.abs(
        np.linalg.det( PCA_codes_L_mid_Y.inverse_transform(np.eye(codes_dim_L)) -
        PCA_codes_L_mid_Y.inverse_transform(np.zeros((codes_dim_L, codes_dim_L))) )
    )
    PCA_codes_transform_entropy_L_mid_Y = np.log(PCA_codes_defc_L_mid_Y)
    
    # Final class-conditional entropy and its error.
    entropy_L_mid_Y_array[y] = le_L_mid_X + PCA_codes_transform_entropy_L_mid_Y
    entropy_L_mid_Y_error_array[y] = le_error_L_mid_X

In [None]:
# H(L | Y): per-label entropies (and their errors) weighted by the label probabilities.
entropy_L_mid_Y = P_Y @ entropy_L_mid_Y_array
entropy_L_mid_Y_error = P_Y @ entropy_L_mid_Y_error_array

print(
    "H_L_mid_Y: %f, errH_L_mid_Y: %f" % (
        entropy_L_mid_Y,
        entropy_L_mid_Y_error,
    )
)

In [None]:
# I(L, Y) = H(L) - H(L | Y); the two error estimates are summed.
mutual_information_L_Y = entropy_L - entropy_L_mid_Y
mutual_information_error_L_Y = entropy_error_L + entropy_L_mid_Y_error

print(
    "MI: %f, errMI: %f" % (
        mutual_information_L_Y,
        mutual_information_error_L_Y,
    )
)

In [None]:
# Record the conditional entropy H(L | Y), the mutual information I(L, Y) and their errors.
info.update({
    'entropy_L_mid_Y': entropy_L_mid_Y,
    'entropy_L_mid_Y_error': entropy_L_mid_Y_error,
    'mutual_information_L_Y': mutual_information_L_Y,
    'mutual_information_error_L_Y': mutual_information_error_L_Y,
})

# Write the summary to disk.
with open(full_path + 'info.json', 'w') as fp:
    json.dump(info, fp, indent=4)