In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats, linalg
import matplotlib.pyplot as plt
from idtxl.bivariate_pid import BivariatePID
from idtxl.data import Data

from mesostat.utils.decorators import redirect_stdout

# Append base directory
# Make the repository root importable so that the project-local `lib` package can be imported below.
import os,sys #,inspect
rootname = "pub-2020-exploratory-analysis"
#thispath = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
# The notebook relies on the current working directory lying somewhere inside the repository.
thispath = os.getcwd()
# Cut the cwd at the first occurrence of the root folder name and re-append that name.
# NOTE: str.index raises ValueError if `rootname` does not occur in the cwd.
rootpath = os.path.join(thispath[:thispath.index(rootname)], rootname)
sys.path.append(rootpath)
print("Appended root directory", rootpath)

import lib.nullmodels.null_models_3D as null3D
import lib.nullmodels.null_test as nulltest

%load_ext autoreload
%autoreload 2

## PCorr Functions

- [ ] TODO: Move to library class

In [None]:
# Keys of the dictionary returned by pcorr(); passed as the label list to the nulltest helpers below
decompLabels = ['unq_s1', 'unq_s2']

In [None]:
def fit_covariate(x, cov):
    """
    Regress the covariates out of a signal and return what is left over.

    The covariate rows are used as the columns of a least-squares design matrix
    (no intercept column is added), and the fitted linear combination is
    subtracted from ``x``.

    :param x:    1D array of samples
    :param cov:  2D array with one covariate per row, each row as long as ``x``
    :return:     array shaped like ``x`` holding the least-squares residual
    """
    # lstsq returns a tuple; its first entry holds the fitted coefficients
    lstsqResult = linalg.lstsq(cov.T, x)
    beta = lstsqResult[0]

    # Linear combination of the covariate rows that best explains x
    explained = beta.dot(cov)
    return x - explained

def partial_corr(x, y, covar, eta=1.0E-6):
    """
    Correlation between x and y after the covariates were regressed out of both.

    Both residuals receive a small gaussian perturbation drawn from the global
    numpy random state, so repeated calls give slightly different results
    unless the seed is fixed by the caller.

    :param x:      1D array of samples
    :param y:      1D array of samples
    :param covar:  2D array of covariates, one per row (passed on to fit_covariate)
    :param eta:    relative magnitude of the added noise; also the lower bound of the result
    :return:       Pearson correlation of the noisy residuals, clipped from below at ``eta``
                   (so negative correlations are reported as ``eta``)
    :raises ValueError: if the correlation evaluates to NaN
    """
    residX = fit_covariate(x, covar)
    residY = fit_covariate(y, covar)

    # Noise scale is relative to the spread of the ORIGINAL signals;
    # a constant signal (zero spread) falls back to the absolute scale eta
    spreadX = np.std(x)
    spreadY = np.std(y)
    if spreadX > 0:
        jitterX = spreadX * eta
    else:
        jitterX = eta
    if spreadY > 0:
        jitterY = spreadY * eta
    else:
        jitterY = eta

    # Tiny noise washes out effects of very low relative magnitude.
    # Draw order (x first, then y) matters for reproducibility under a fixed seed
    residX = residX + np.random.normal(0, jitterX, x.shape)
    residY = residY + np.random.normal(0, jitterY, y.shape)

    corrMatrix = np.corrcoef(residX, residY)
    corrXY = corrMatrix[0, 1]
    if np.isnan(corrXY):
        raise ValueError("Sth Went wrong")

    # Everything below eta (including negative values) is cropped to eta
    return np.clip(corrXY, eta, None)

def pcorr(x, y, z):
    """
    Partial correlations of each source with the target, controlling for the other source.

    :param x:  1D array, first source
    :param y:  1D array, second source
    :param z:  1D array, target
    :return:   dict with 'unq_s1' (x vs z given y) and 'unq_s2' (y vs z given x)
    """
    # Each conditioning variable is wrapped into a single-row covariate matrix
    corrXZgivenY = partial_corr(x, z, np.array([y]))
    corrYZgivenX = partial_corr(y, z, np.array([x]))
    return {'unq_s1': corrXZgivenY, 'unq_s2': corrYZgivenX}

In [None]:
# Mapping name -> data-generating function, as provided by the project's null-model library.
# The cells below iterate over it and call each function as func(nData, p1, p2, p3).
contFuncDict = null3D.cont_method_dict()

In [None]:
# Sanity check on a degenerate case: all three variables are identical copies.
# Regressing y (== x) out of x and z should leave (numerically) nothing, so the
# value shown is expected to be driven by the noise that partial_corr injects.
x = np.random.normal(0, 1, 10000)
y = x.copy()
z = x.copy()
partial_corr(x, z, np.array([y]))

### Testing binning-dependence

In [None]:
# Optional thresholds forwarded to the nulltest plotting helpers as `valThrDict`.
# None also selects the plain file-name suffix ('' instead of '_withThr') in the cells below.
valThrDict = None
# valThrDict = {'unq_s1': None, 'unq_s2': None}

In [None]:
# Number of samples requested from every data-generating function
nData = 10000

# Named parameter triplets; each triplet is unpacked as the three arguments
# that follow nData in the data-generating function call.
# NOTE(review): presumably these are noise levels for X, Y and Z - confirm against null_models_3D
taskDict = {
    'yolo': np.array([0,0,0]),
    'norand': np.array([0,0,0.5]),
    'randx': np.array([0.5,0,0.5]),
    'rand': np.array([0.5,0.5,0.5])
}

for taskName, params in taskDict.items():
    print(taskName)
    # NOTE(review): rezDict is assigned but never used in this cell
    rezDict = {}

    # Do continuous tests
    for funcName, func in contFuncDict.items():
        print('-', funcName)
        
        # Zero-argument data generator and the metric applied to each generated (x, y, z).
        # Both lambdas are consumed within this iteration, so late binding of
        # `func` and `params` is not an issue here.
        f_data   = lambda: func(nData, *params)
        f_metric = lambda x, y, z: pcorr(x,y,z)

        # Same test run twice: once as is, once with haveShuffle=True
        rezDF   = nulltest.run_tests(f_data, f_metric, decompLabels, nTest=100)
        rezDFsh = nulltest.run_tests(f_data, f_metric, decompLabels, nTest=100, haveShuffle=True)

        nulltest.plot_test_summary(rezDF, rezDFsh, suptitle=funcName, haveEff=False, valThrDict=valThrDict)
        # One PNG per (function, task) pair, written to the current working directory
        suffix = '' if valThrDict is None else '_withThr'
        plt.savefig(funcName + '_pcorr_summary_'+taskName+suffix+'.png', dpi=200)
        plt.show()

### Effect of variance

Continuous

In [None]:
# Metric handed to all nulltest helpers below: takes (x, y, z) and returns the pcorr() dictionary
f_metric_cont = lambda x, y, z: pcorr(x,y,z)

In [None]:
# Do continuous tests
nData = 10000

alphaStratDict = {
    'PureSrc': lambda alpha: [0,0,alpha],
    'ImpureX': lambda alpha: [alpha,0,alpha],
    'Impure' : lambda alpha: [alpha,alpha,alpha],
}

thrMetricDictDict = {
    'H0_orig' : None,
    'H0_adj' : {'unq_s1': 0.718, 'unq_s2': 0.718}
}


for fName, f_data in contFuncDict.items():
    for alphaStratName, alphaFunc in alphaStratDict.items():
        
        f_data_eff = lambda alpha: f_data(nData, *alphaFunc(alpha))
        
        for h0type, thrMetricDict in thrMetricDictDict.items():
            print(fName, alphaStratName, h0type)

            nulltest.run_plot_param_effect(f_data_eff, f_metric_cont, decompLabels,
                                           nStep=1001, nSkipTest=100, nTest=200, alphaRange=(0, 1),
                                           thrMetricDict=thrMetricDict, plotAlphaSq=False, fontsize=12)

            suffix = 'n_' + str(nData) + '_' + alphaStratName + '_' + h0type

            plt.savefig(fName + '_pcorr_scatter_vareff_'+suffix+'.png', dpi=300)
            plt.show()

In [None]:
# Effect of the parameter alpha (all three generator parameters set to alpha) at fixed data size
nData=10000
for fName, f_data in contFuncDict.items():
    print(fName)
    
    # Pass the generator parameters positionally, as every other cell of this notebook does.
    # The previous keyword form (sigX/sigY/sigZ) disagreed with the keyword names used by
    # the data-size cell (aX/aY/aZ) for the very same functions, so one of the two had to
    # fail; since all three values are equal the positional form is order-independent.
    f_data_eff = lambda alpha: f_data(nData, alpha, alpha, alpha)
    nulltest.run_plot_param_effect_test(f_data_eff, f_metric_cont, decompLabels,
                                        nStep=10, nTest=400, alphaRange=(0, 2), valThrDict=valThrDict)
    
    # One PNG per data-generating function, written to the current working directory
    suffix = '' if valThrDict is None else '_withThr'
    plt.savefig(fName + '_pcorr_vareff_n'+str(nData)+suffix+'.png', dpi=200)
    plt.show()

### Effect of data size

In [None]:
# Fixed parameter value used by all strategies of this cell
alpha=0.25

# Disabled alternative: one threshold per data size (1000, 2000, ..., 10000)
# thrLst = [0.7412463126326689,
#  0.7335060658593113,
#  0.7273009797907714,
#  0.7251461790704873,
#  0.723556141666278,
#  0.7210860589307895,
#  0.7204293389502892,
#  0.7198486488329752,
#  0.7189879974592189,
#  0.7183190663516441]

# thrDict = dict(zip(1000 * np.arange(1, 11), thrLst))

# NOTE: unlike the variance-effect cell, the values here are plain parameter lists, not functions
alphaStratDict = {
    'PureSrc': [0,0,alpha],
    'ImpureX': [alpha,0,alpha],
    'Impure' : [alpha,alpha,alpha],
}

# Two settings for the `thrMetricDict` argument: none, or a fixed value per component.
# NOTE(review): 0.725 is presumably a single representative of the commented-out
# per-data-size thresholds above - confirm
thrMetricDictDict = {
    'H0_orig' : None,
    'H0_adj' : {'unq_s1': 0.725, 'unq_s2': 0.725}
}


for fName, f_data in contFuncDict.items():
    # `alphaFunc` holds a list of three parameters here (the name is kept from the previous cell)
    for alphaStratName, alphaFunc in alphaStratDict.items():
        # Data generator parameterised by the number of samples only; consumed within this iteration
        f_data_eff = lambda n: f_data(n, *alphaFunc)

        for h0type, thrMetricDict in thrMetricDictDict.items():
            print(fName, alphaStratName, h0type)

            nulltest.run_plot_data_effect(f_data_eff, f_metric_cont, decompLabels,
                                          nStep=101, nSkipTest=10, nTest=200, pVal=0.01,
                                          thrMetricDict=thrMetricDict, fontsize=12)

            # One PNG per (function, strategy, threshold setting), written to the cwd
            suffix = 'alpha_' + str(alpha) + '_' + alphaStratName + '_' + h0type

            plt.savefig(fName + '_pcorr_scatter_nEff_'+suffix+'.png', dpi=300)
            plt.show()

In [None]:
# Effect of the data size at a fixed parameter value (all three generator parameters set to alpha)
alpha=0.5
for fName, f_data in contFuncDict.items():
    print(fName)

    # Pass the generator parameters positionally, as the other cells of this notebook do;
    # the keyword names used before (aX/aY/aZ) disagreed with those of the
    # variance-effect cell (sigX/sigY/sigZ) for the very same functions.
    f_data_eff = lambda n: f_data(n, alpha, alpha, alpha)
    nulltest.run_plot_data_effect_test(f_data_eff, f_metric_cont, decompLabels,
                                       nStep=10, nTest=400, valThrDict=valThrDict)
    
    # The file name used to reference an undefined variable `sig` (NameError on a fresh
    # kernel); the parameter of this cell is `alpha`. The '_sig' tag is kept in the name
    # so that previously produced files keep matching.
    suffix = '' if valThrDict is None else '_withThr'
    plt.savefig(fName + '_pcorr_nEff_sig'+str(alpha)+suffix+'.png', dpi=200)
    plt.show()

### Test relationship of unique and redundancy for fixed data size

#### 2. Finding max synergy parameters - GridSearch1D

In [None]:
from mesostat.visualization.mpl_colors import base_colors_rgb

In [None]:
# Colour list from the 'tableau' palette; entries 0 and 1 are used for the two curves of the 1D scans below
tableauColors = base_colors_rgb(key='tableau')

### Redundant Model

In [None]:
# 1D family of data generators: all three parameters of the redundant model are tied to one alpha
f_data_1D = lambda nData, alpha: null3D.cont_red_noisy(nData, alpha, alpha, alpha)
# Data sizes 1000, 2000, ..., 10000
nDataLst = 1000 * np.arange(1, 11)
# Per-data-size results, filled in the loop below
alphaMaxLst = []
thrAdjLst = []
thrRandLst = []

# NOTE: the loop variable overwrites the notebook-level `nData` set in earlier cells
for nData in nDataLst:
    print(nData)
    # NOTE(review): presumably returns the alpha at which the scanned metric is maximal
    # and the threshold derived there - confirm against nulltest.run_plot_1D_scan
    alphaMax, thr = nulltest.run_plot_1D_scan(f_data_1D, f_metric_cont, 'unq_s2', 'unq_s1',
                                              varLimits=(0, 1), nData=nData, nStep=100, nTest=100,
                                              colorA = tableauColors[1], colorB = tableauColors[0])
    
    plt.savefig('redCont_pcorr_1Dscan_unq_n_'+str(nData)+'.png', dpi=200)
    plt.show()
    
    # Get also shuffle distribution at this alpha
    # (the lambda is consumed immediately, so it sees the alphaMax of this iteration)
    datagen_func_noparam = lambda nData: f_data_1D(nData, alphaMax)
    randValues = nulltest.sample_decomp(datagen_func_noparam, f_metric_cont, 'unq_s1',
                                        nData=nData, nSample=10000, haveShuffle=True)
    
    alphaMaxLst += [alphaMax]
    thrAdjLst += [thr]
    # Shuffle-based threshold: 0.99 quantile of the sampled values
    thrRandLst += [np.quantile(randValues, 0.99)]
    
# Summary figure: both thresholds as a function of data size
plt.figure()
# plt.plot(nDataLst, alphaMaxLst, label='param')
plt.plot(nDataLst, thrAdjLst, label='adjusted', color='purple')
plt.plot(nDataLst, thrRandLst, label='shuffle')
plt.legend()
plt.ylim([0, None])
plt.savefig('redCont_pcorr_1Dscan_unq_summary.png', dpi=200)
plt.show()

### Synergistic Model

In [None]:
# 1D family of data generators: all three parameters of the noisy XOR model are tied to one alpha.
# This cell repeats the redundant-model scan with a different generator and file prefix.
f_data_1D = lambda nData, alpha: null3D.cont_xor_noisy(nData, alpha, alpha, alpha)
# Data sizes 1000, 2000, ..., 10000
nDataLst = 1000 * np.arange(1, 11)
# Per-data-size results, filled in the loop below
alphaMaxLst = []
thrAdjLst = []
thrRandLst = []

# NOTE: the loop variable overwrites the notebook-level `nData` set in earlier cells
for nData in nDataLst:
    print(nData)
    # NOTE(review): presumably returns the alpha at which the scanned metric is maximal
    # and the threshold derived there - confirm against nulltest.run_plot_1D_scan
    alphaMax, thr = nulltest.run_plot_1D_scan(f_data_1D, f_metric_cont, 'unq_s2', 'unq_s1',
                                              varLimits=(0, 1), nData=nData, nStep=100, nTest=100,
                                              colorA = tableauColors[1], colorB = tableauColors[0])
    
    plt.savefig('synCont_pcorr_1Dscan_unq_n_'+str(nData)+'.png', dpi=200)
    plt.show()
    
    
    # Get also shuffle distribution at this alpha
    # (the lambda is consumed immediately, so it sees the alphaMax of this iteration)
    datagen_func_noparam = lambda nData: f_data_1D(nData, alphaMax)
    randValues = nulltest.sample_decomp(datagen_func_noparam, f_metric_cont, 'unq_s1',
                                        nData=nData, nSample=10000, haveShuffle=True)
    
    alphaMaxLst += [alphaMax]
    thrAdjLst += [thr]
    # Shuffle-based threshold: 0.99 quantile of the sampled values
    thrRandLst += [np.quantile(randValues, 0.99)]
    
# Summary figure: both thresholds as a function of data size
plt.figure()
# plt.plot(nDataLst, alphaMaxLst, label='param')
plt.plot(nDataLst, thrAdjLst, label='adjusted', color='purple')
plt.plot(nDataLst, thrRandLst, label='shuffle')
plt.legend()
plt.ylim([0, None])
plt.savefig('synCont_pcorr_1Dscan_unq_summary.png', dpi=200)
plt.show()