# Test mutual information estimators

## Preamble

In [None]:
import numpy as np
import pandas as pd
import scipy.stats as sps

In [None]:
%matplotlib inline
import matplotlib
import matplotlib.pyplot as plt

import seaborn as sns

In [None]:
font = {'family' : 'DejaVu Sans',
        'size'   : 18}

matplotlib.rc('font', **font)

In [None]:
import os
import json
import csv

from datetime import datetime

In [None]:
from pathlib import Path
path = os.path.abspath(os.path.join(os.path.abspath(os.getcwd()), "../../data/"))

In [None]:
experiments_path = path + "/mutual_information/synthetic/"

#### Importing the module

In [None]:
import mutinfo.estimators.mutual_information as mi_estimators
from mutinfo.utils.dependent_norm import multivariate_normal_from_MI

In [None]:
### SETTINGS ###
%run ./Settings.ipynb

#### Standard tests with arbitrary mapping

In [None]:
def perform_normal_test(mi, n_samples, X_dimension, Y_dimension, X_map=None, Y_map=None, verbose=0):
    # Generation.
    random_variable = multivariate_normal_from_MI(X_dimension, Y_dimension, mi)
    X_Y = random_variable.rvs(n_samples)
    X = X_Y[:, 0:X_dimension]
    Y = X_Y[:, X_dimension:X_dimension + Y_dimension]
        
    # Mapping application.
    if not X_map is None:
        X = X_map(X)
           
    if not Y_map is None:
        Y = Y_map(Y)

    # Mutual information estimation.
    mi_estimator = mi_estimators.MutualInfoEstimator(entropy_estimator_params=entropy_estimator_params)
    mi_estimator.fit(X, Y, verbose=verbose)
    
    return mi_estimator.estimate(X, Y, verbose=verbose)

## Continuous case

In [None]:
def perform_normal_tests_MI(MI, n_samples, X_dimension, Y_dimension, X_map=None, Y_map=None, verbose=0):
    """
    Estimate mutual information for different true values
    (transformed normal distribution).
    """
    n_exps = len(MI)
    
    # Mutual information estimates.
    estimated_MI = []

    # Conducting the tests.
    for n_exp in range(n_exps):
        print("\nn_exp = %d/%d\n------------\n" % (n_exp + 1, n_exps))
        mi = perform_normal_test(MI[n_exp], n_samples, X_dimension, Y_dimension,
                                 X_map, Y_map, verbose)
        estimated_MI.append(mi)
        
    return np.asarray(estimated_MI)

In [None]:
def plot_estimated_MI(MI, estimated_MI, title, Bandwidth=None, bandwidth_scale=10.0):
    estimated_MI_mean = estimated_MI[:,0]
    estimated_MI_std  = estimated_MI[:,1]
    
    fig_normal, ax_normal = plt.subplots()

    fig_normal.set_figheight(11)
    fig_normal.set_figwidth(16)

    # Grid.
    ax_normal.grid(color='#000000', alpha=0.15, linestyle='-', linewidth=1, which='major')
    ax_normal.grid(color='#000000', alpha=0.1, linestyle='-', linewidth=0.5, which='minor')

    ax_normal.set_title(title)
    ax_normal.set_xlabel("$I(X,Y)$")
    ax_normal.set_ylabel("$\\hat I(X,Y)$")
    
    ax_normal.minorticks_on()
    
    #ax_normal.set_yscale('log')
    #ax_normal.set_xscale('log')

    ax_normal.plot(MI, MI, label="$I(X,Y)$", color='red')
    ax_normal.plot(MI, estimated_MI_mean, label="$\\hat I(X,Y)$")        
    ax_normal.fill_between(MI, estimated_MI_mean + estimated_MI_std, estimated_MI_mean - estimated_MI_std, alpha=0.2)
    
    if not Bandwidth is None:
        ax_normal.plot(MI, Bandwidth * bandwidth_scale, label="bandwidth")

    ax_normal.legend(loc='upper left')

    ax_normal.set_xlim((0.0, None))
    ax_normal.set_ylim((0.0, None))

    plt.show();

### Global parameters

In [None]:
# The values of mutual information under study.
#MI = [0.0, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0, 1.5, 2.0, 3.0, 5.0, 6.0, 8.0, 10.0]
#MI = [0.0, 0.5, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0]
MI = np.linspace(0.0, 10.0, 41)
#MI = [0.0, 2.0, 5.0]
n_exps = len(MI)

# Sample size and dimensions of vectors X and Y.
n_samples = 20000
X_dimension = 4
Y_dimension = 4

### Gaussian random vector

In [None]:
# Mutual information estimate
estimated_MI = perform_normal_tests_MI(MI, n_samples, X_dimension, Y_dimension, verbose=10)

In [None]:
plot_estimated_MI(MI, estimated_MI, "Gaussian vectors")

In [None]:
save_estimated_MI(MI, estimated_MI, 'normal')

### Uniformly distributed vectors

Apply to the components of a normal random vector Gaussian cumulative distribution function.

In [None]:
from mutinfo.utils.synthetic import normal_to_uniform

In [None]:
from mutinfo.utils.matrices import get_scaling_matrix

In [None]:
def _uniform_pp():
    _X_Y = multivariate_normal_from_MI(1, 1, mutual_information=2.0).rvs(1000)
    _X = _X_Y[:, 0:1]
    _Y = _X_Y[:, 1:2]
    _X = normal_to_uniform(_X)
    _Y = normal_to_uniform(_Y)
    _X_Y = np.concatenate([_X, _Y], axis=1)
    #M = get_scaling_matrix(np.cov(_X_Y, rowvar=False))
    #_X_Y = _X_Y @ M
    print(np.cov(_X_Y, rowvar=False))

    pp = sns.pairplot(pd.DataFrame(_X_Y), height = 2.0, aspect=1.6,
                      plot_kws=dict(edgecolor="k", linewidth=0.0, alpha=0.05, size=0.01, s=0.01),
                      diag_kind="kde", diag_kws=dict(shade=True))

    fig = pp.fig
    fig.subplots_adjust(top=0.93, wspace=0.3)
    t = fig.suptitle("Pairplot", fontsize=14)
    
_uniform_pp()

In [None]:
from scipy.stats import norm

def _test(X):
    n, dim = X.shape
    X = normal_to_uniform(X)
    
    for index in range(dim):
        X[:,index] = (np.argsort(np.argsort(X[:,index])) + 1) / (n + 2)
        X[:,index] = norm.ppf(X[:,index])
        
    return X

In [None]:
# Оценки взаимной информации.
estimated_MI = perform_normal_tests_MI(MI, n_samples, X_dimension, Y_dimension,
                                       X_map=normal_to_uniform, Y_map=normal_to_uniform, verbose=10)

In [None]:
plot_estimated_MI(MI, estimated_MI, "Uniform distribution")

In [None]:
save_estimated_MI(MI, estimated_MI, 'uniform')

### Rings

We obtain uniform distributions according to the previous section.
Then we apply the following transformation:

$$
\begin{cases}
x' = [R \cdot x + r \cdot (1 - x)] \cdot \cos(2 \pi y) \\
y' = [R \cdot x + r \cdot (1 - x)] \cdot \sin(2 \pi y) \\
\end{cases}
$$

It is required to have dimension of both vectors equals $ 2 $.

In [None]:
r = 1.0
R = 2.0

def ring_mapping(X):
    """
    Gaussian vector to a ring.
    """
    
    assert len(X.shape) == 2
    assert X.shape[1] == 2
    
    X = normal_to_uniform(X)
    new_X = np.zeros_like(X)
    for index in range(X.shape[0]):
        rho = R * X[index][0] + r * (1.0 - X[index][0])
        phi = 2.0 * np.pi * X[index][1]
        
        new_X[index][0] = rho * np.cos(phi)
        new_X[index][1] = rho * np.sin(phi)
    
    return new_X

In [None]:
def _rings_pp():
    _X_Y = multivariate_normal_from_MI(2, 2, mutual_information=10.0).rvs(10000)
    _X = _X_Y[:, 0:2]
    _Y = _X_Y[:, 2:4]
    _X = ring_mapping(_X)
    _Y = ring_mapping(_Y)
    _X_Y = np.concatenate([_X, _Y], axis=1)

    pp = sns.pairplot(pd.DataFrame(_X_Y), height = 2.0, aspect=1.6,
                      plot_kws=dict(edgecolor="k", linewidth=0.0, alpha=0.05, size=0.01, s=0.01),
                      diag_kind="kde", diag_kws=dict(shade=True))

    fig = pp.fig
    fig.subplots_adjust(top=0.93, wspace=0.3)
    t = fig.suptitle("Pairplot", fontsize=14)
    
_rings_pp()

## Continuous-discrete case

In [None]:
def perform_uniform_discrete_test(n_labels, n_samples, X_dimension, X_map=None, verbose=0):
    # Generation.  
    X_random_variable = sps.uniform(scale=1.0)
    X = np.zeros(shape=(n_samples, X_dimension))
    for dim in range(X_dimension):
        X[:,dim] = X_random_variable.rvs(size=n_samples)
    
    # The discrete RV is obtained from the first component of the continuous RV.
    Y = (np.floor(X[:,0] * n_labels)).astype(int)
        
    # Применение преобразования.
    if not X_map is None:
        X = X_map(X)
        #X_Y = np.concatenate([X, Y], axis=1)

    # Entropy estimation.
    mi_estimator = mi_estimators.MutualInfoEstimator(
        Y_is_discrete=True,
        entropy_estimator_params=entropy_estimator_params
    )
    mi_estimator.fit(X, Y, verbose = verbose)
    
    return mi_estimator.estimate(X, Y, verbose=verbose)

In [None]:
def perform_uniform_discrete_test_MI(N_labels, n_samples, X_dimension, X_map=None, verbose=0):
    """
    Estimate mutual information for different true values
    (uniform distribution).
    """
    n_exps = len(N_labels)
    MI = np.array([np.log(N_labels[index]) for index in range(n_exps)])
    
    # Mutual information estimates.
    estimated_MI = []

    # Conducting the tests.
    for n_exp in range(n_exps):
        print("\nn_exp = %d/%d\n------------\n" % (n_exp + 1, n_exps))
        estimated_MI.append(perform_uniform_discrete_test(N_labels[n_exp], n_samples, X_dimension, X_map, verbose))
        
    return np.asarray(MI), np.asarray(estimated_MI)

### Global parameters

In [None]:
# Number of classes.
if method == 'KDE':
    min_in_group = 10
elif method == 'KL':
    min_in_group = 5 * k_neighbours

N_labels = [2**n for n in range(int(np.floor(np.log2(n_samples / min_in_group))))]
print(N_labels[-1])
n_exps = len(N_labels)

# Sample size and dimensions of vectors X and Y.
#X_dimension = 2

In [None]:
MI, estimated_MI = perform_uniform_discrete_test_MI(N_labels, n_samples, X_dimension, verbose=10)

In [None]:
plot_estimated_MI(MI, estimated_MI, "Uniform distribution with discrete label")

In [None]:
save_estimated_MI(MI, estimated_MI, 'uniform_discrete')

## Varying dimensionality

In [None]:
def perform_normal_tests_dim(mi, n_samples, dimensions, X_map=None, Y_map=None, verbose=0):
    """
    Estimate mutual information for different true values.
    """
    n_exps = len(dimensions)
    
    # Mutual information estimates.
    estimated_MI = []
    
    # Cunducting the tests.
    for n_exp in range(n_exps):
        print("\nn_exp = %d/%d\n------------\n" % (n_exp + 1, n_exps))
        est_mi = perform_normal_test(mi, n_samples, dimensions[n_exp], dimensions[n_exp],
                                     X_map, Y_map, verbose)
        estimated_MI.append(est_mi)
        
    return estimated_MI

In [None]:
def plot_estimated_dim(dimensions, mi, estimated_MI, title):
    estimated_MI_mean = np.array([estimated_MI[index][0] for index in range(len(estimated_MI))])
    estimated_MI_std  = np.array([estimated_MI[index][1] for index in range(len(estimated_MI))])
    
    fig_normal, ax_normal = plt.subplots()

    fig_normal.set_figheight(11)
    fig_normal.set_figwidth(16)

    # Grid.
    ax_normal.grid(color='#000000', alpha=0.15, linestyle='-', linewidth=1, which='major')
    ax_normal.grid(color='#000000', alpha=0.1, linestyle='-', linewidth=0.5, which='minor')

    ax_normal.set_title(title)
    ax_normal.set_xlabel("Размерность $ X $ и $ Y $")
    ax_normal.set_ylabel("$\\hat I(X,Y)$")
    
    ax_normal.minorticks_on()
    
    #ax_normal.set_yscale('log')
    #ax_normal.set_xscale('log')

    ax_normal.plot(dimensions, np.ones_like(dimensions) * mi, label="$I(X,Y)$", color='red')
    ax_normal.plot(dimensions, estimated_MI_mean, label="$\\hat I(X,Y)$")
    ax_normal.fill_between(dimensions, estimated_MI_mean + estimated_MI_std, estimated_MI_mean - estimated_MI_std, alpha=0.2)

    ax_normal.legend(loc='upper left')

    ax_normal.set_xlim((0.0, None))
    ax_normal.set_ylim((0.0, None))

    plt.show();

### Global parameters

In [None]:
dimensions = [1, 2, 3, 4, 5, 6, 8, 10, 12, 14, 16, 20, 25, 30, 40]
#dimensions = [1, 2, 4, 6, 8, 12, 16, 20, 30, 40]
mi = 2.0

### Gaussian random vector

In [None]:
# Mutual information estimation.
#estimated_MI = perform_normal_tests_dim(mi, n_samples, dimensions, verbose=10)

In [None]:
#plot_estimated_dim(dimensions, mi, estimated_MI, "Нормальные векторы")