In [None]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from collections import defaultdict
from idtxl.bivariate_pid import BivariatePID
from idtxl.data import Data

In [None]:
def shuffle(x):
    """Return a randomly permuted copy of ``x``, leaving ``x`` itself untouched.

    The permutation is drawn from NumPy's global random state through
    ``np.random.shuffle``, which reorders the copy in place.
    """
    permuted = x.copy()
    np.random.shuffle(permuted)
    return permuted

def bin_data_1D(data, nBins):
    """Discretise ``data`` into ``nBins`` quantile (equal-population) bins.

    Parameters
    ----------
    data : array-like of numbers
        Values to discretise. Bin edges are the quantiles of this same data.
    nBins : int
        Number of bins.

    Returns
    -------
    numpy.ndarray of int
        Bin index for every element of ``data``, in the range ``0 .. nBins-1``.
    """
    edges = np.quantile(data, np.linspace(0, 1, nBins + 1))
    # Digitise against the interior edges only. Everything below the first
    # interior edge is bin 0 and everything at or above the last one is bin
    # nBins-1, so the maximum always lands in the top bin. Nudging the outer
    # edge by a fixed absolute epsilon instead is silently lost to float
    # rounding for large magnitudes and pushes the maximum into bin nBins.
    return np.digitize(data, edges[1:-1], right=False)

def pid_bin(x,y,z, nBins=4):
    """Bin ``x``, ``y`` and ``z`` separately, then run ``pid`` on the result.

    Each variable is discretised on its own with ``bin_data_1D`` using
    ``nBins`` bins; the three binned rows are stacked in (x, y, z) order and
    handed to ``pid``, whose return value is passed through unchanged.
    """
    binnedRows = [bin_data_1D(variable, nBins) for variable in (x, y, z)]
    return pid(np.array(binnedRows))
    
def pid(dataPS):
    """Run IDTxl's bivariate PID on ``dataPS`` and return the target's result.

    Row 1 of ``dataPS`` is used as the target and rows 0 and 2 as the two
    sources. The estimator is 'TartuPID' with zero lag on both sources, and
    the data is passed to IDTxl without normalisation.

    NOTE(review): ``dim_order='ps'`` presumably means (processes, samples);
    confirm against the IDTxl ``Data`` documentation.
    """
    wrapped = Data(dataPS, dim_order='ps', normalise=False)
    estimator = BivariatePID()
    outcome = estimator.analyse_single_target(
        settings={'pid_estimator': 'TartuPID', 'lags_pid': [0, 0]},
        data=wrapped,
        target=1,
        sources=[0,2],
    )
    return outcome.get_single_target(1)

def make_test(datagen_func, nBins=4, nTest=100):
    """Repeatedly estimate PID on generated data and on a shuffled-target control.

    For each of ``nTest`` repetitions, ``datagen_func()`` is called to obtain
    ``x, y, z``. PID is then computed twice: once with the original ``y``
    (kind 'True') and once with a shuffled copy of ``y`` (kind 'Sh').

    Parameters
    ----------
    datagen_func : callable
        Called with no arguments; must return three sequences ``x, y, z``.
    nBins : int or None
        If ``None`` the data is passed to ``pid`` as is; otherwise it is first
        discretised into ``nBins`` bins via ``pid_bin``.
    nTest : int
        Number of repetitions.

    Returns
    -------
    pandas.DataFrame
        Long-form table with columns 'kind' ('True' / 'Sh'), 'Method' (the PID
        atom name) and 'Value', holding ``nTest`` rows per (kind, atom) pair.
        Rows carry a unique running index.
    """
    atomKeys = ['unq_s1', 'unq_s2', 'syn_s1_s2', 'shd_s1_s2']
    kinds = ['True', 'Sh']
    rezDict = {kind: {k: [] for k in atomKeys} for kind in kinds}

    for iTest in range(nTest):
        x,y,z = datagen_func()

        for kind in kinds:
            # The control destroys the relation between the target and the
            # sources by shuffling the target only.
            yEff = y if kind == 'True' else shuffle(y)

            if nBins is None:
                rez = pid(np.array([x,yEff,z]))
            else:
                rez = pid_bin(x,yEff,z, nBins=nBins)

            for k in atomKeys:
                rezDict[kind][k].append(rez[k])

    # Build every per-(kind, atom) frame first and concatenate once.
    # DataFrame.append was removed in pandas 2.0 and re-copied the growing
    # frame on every call.
    frames = [
        pd.DataFrame({'kind': [kind]*nTest, 'Method': [k]*nTest, 'Value': v})
        for kind in kinds
        for k, v in rezDict[kind].items()
    ]
    return pd.concat(frames, ignore_index=True)

def plot_test(df, suptitle=None, logEff=False):
    """Draw a three-panel summary figure for a result table.

    Panels, left to right:
      1. Violin plot of 'Value' per 'Method', split by 'kind'.
      2. Violin plot of the effect size of the 'True' values per method,
         computed as (value - mean of 'Sh' values) / std of 'Sh' values.
      3. Bar plot of the fraction of 'True' runs whose effect size is above 2.

    Parameters
    ----------
    df : pandas.DataFrame
        Long-form table with columns 'kind' ('True' / 'Sh'), 'Method', 'Value'.
    suptitle : str or None
        Optional figure-level title.
    logEff : bool
        If True, the effect-size panel uses a logarithmic y axis.
    """
    # Effect sizes above this many shuffled standard deviations count as significant.
    sigThreshold = 2

    fig, ax = plt.subplots(ncols=3, figsize=(12,4))
    if suptitle is not None:
        fig.suptitle(suptitle)

    ax[0].set_title('Metric Value')
    ax[1].set_title('Effect Size')
    ax[2].set_title('Fraction Significant')

    # NOTE(review): newer seaborn releases appear to rename `scale` to
    # `density_norm`; confirm against the installed seaborn version.
    sns.violinplot(ax=ax[0], x="Method", y="Value", hue="kind", data=df, scale='width')

    methods = sorted(set(df['Method']))

    # Calculate effect sizes
    effFrames = []
    for method in methods:
        dfMethod = df[df['Method'] == method]
        dfMethodTrue = dfMethod[dfMethod['kind'] == 'True']
        dfMethodRand = dfMethod[dfMethod['kind'] == 'Sh']

        muRand = np.mean(dfMethodRand['Value'])
        stdRand = np.std(dfMethodRand['Value'])

        # If the shuffled values have zero spread, this division yields inf/nan.
        dfMethodEff = dfMethodTrue.copy()
        dfMethodEff['Value'] = (dfMethodEff['Value'] - muRand) / stdRand

        effFrames.append(dfMethodEff)

    # Concatenate once; DataFrame.append was removed in pandas 2.0.
    if effFrames:
        dfEffSize = pd.concat(effFrames, ignore_index=True)
    else:
        dfEffSize = pd.DataFrame(columns=df.columns)

    sns.violinplot(ax=ax[1], x="Method", y="Value", data=dfEffSize, scale='width')
    if logEff:
        ax[1].set_yscale('log')

    # Calculate fraction significant
    sigDict = {}
    for method in methods:
        dfEffMethod = dfEffSize[dfEffSize['Method'] == method]
        sigDict[method] = [np.mean(dfEffMethod['Value'] > sigThreshold)]

    sns.barplot(ax=ax[2], data=pd.DataFrame(sigDict))
    ax[2].set_ylim([0,1])

## Noisy Redundant Scenario

We want to check if white noise added to a purely redundant scenario results in correct identification of redundancy

$$X = T + \nu_X$$
$$Y = T + \nu_Y$$
$$Z = T + \nu_Z$$

where $Y$ is the target of $X$ and $Z$, and

$$T \sim \mathcal{N}(0, 1)$$
$$\nu_X, \nu_Y, \nu_Z \sim \mathcal{N}(0, \sigma)$$

and $\sigma$ is a free parameter, denoting the Noise-To-Signal ratio. So the signal should be a mixture of redundant signal and white noise.

Since the signal is continuous, we bin it using different bin counts.

In [None]:
def gen_data_red_noisy(n=1000, sigErrX=1, sigErrY=1, sigErrZ=1):
    """Sample the noisy redundant model: three noisy copies of one shared signal.

    A shared standard-normal signal of length ``n`` is drawn first. Each of
    x, y, z is that signal plus independent zero-mean Gaussian noise with
    standard deviation ``sigErrX``, ``sigErrY``, ``sigErrZ`` respectively.

    Returns the tuple ``(x, y, z)``.
    """
    shared = np.random.normal(0, 1, n)
    # The comprehension draws the noise in x, y, z order.
    noisyCopies = [
        shared + np.random.normal(0, sigma, n)
        for sigma in (sigErrX, sigErrY, sigErrZ)
    ]
    return tuple(noisyCopies)

### Testing binning-dependence

In [None]:
# Sweep the number of bins (2..5) on the noisy redundant model, keeping one
# result table per bin count. The generator does not depend on the bin count,
# so it is defined once outside the loop.
def gen_data_eff():
    return gen_data_red_noisy(n=10000, sigErrX=1, sigErrY=1, sigErrZ=1)

rezDFDict = {}
for nBins in range(2, 6):
    rezDFDict[nBins] = make_test(gen_data_eff, nBins=nBins, nTest=100)

In [None]:
# One summary figure per bin count, titled with that bin count.
for nBins in rezDFDict:
    rezDF = rezDFDict[nBins]
    plot_test(rezDF, suptitle='nBins = ' + str(nBins))

### Test relationship of synergy and redundancy for fixed data size

In [None]:
# Collect (redundancy, synergy) pairs over 20000 runs of the noisy redundant
# model. Each run draws its three noise levels uniformly from [0, 2).
rezSynLst, rezRedLst = [], []

for nTest in range(20000):
    # Progress marker every 1000 runs.
    if not nTest % 1000:
        print(nTest)

    sigmas = np.random.uniform(0, 2, 3)
    x, y, z = gen_data_red_noisy(
        n=1000, sigErrX=sigmas[0], sigErrY=sigmas[1], sigErrZ=sigmas[2]
    )
    rez = pid_bin(x,y,z, nBins=4)

    rezSynLst.append(rez['syn_s1_s2'])
    rezRedLst.append(rez['shd_s1_s2'])

In [None]:
# Scatter of synergy against redundancy, one dot per run.
fig, ax = plt.subplots()
ax.plot(rezRedLst, rezSynLst, '.')
ax.set_xlabel('Redundancy')
ax.set_ylabel('Synergy')
ax.set_title('Synergy-Redundancy relationship for noisy redundant model')
plt.show()

## Noisy Unique Scenario

We want to check if white noise added to a purely unique information scenario results in correct identification of unique information.

$$X = T + \nu_X$$
$$Y = T + \nu_Y$$
$$Z = \nu_Z$$

where $Y$ is the target of $X$ and $Z$, and

$$T \sim \mathcal{N}(0, 1)$$
$$\nu_X, \nu_Y, \nu_Z \sim \mathcal{N}(0, \sigma)$$

and $\sigma$ is a free parameter, denoting the Noise-To-Signal ratio. So $X$ should carry unique information about $Y$ mixed with white noise, while $Z$ is pure white noise that carries no information about $Y$.

Since the signal is continuous, we bin it using different bin counts.

In [None]:
def gen_data_unq_noisy(n=1000, sigErr=1):
    """Sample the noisy unique model: x and y share a signal, z is pure noise.

    A shared standard-normal signal of length ``n`` is drawn first. ``x`` and
    ``y`` are that signal plus independent zero-mean Gaussian noise of
    standard deviation ``sigErr``; ``z`` is such noise alone.

    Returns the tuple ``(x, y, z)``.
    """
    def _noise():
        return np.random.normal(0, sigErr, n)

    shared = np.random.normal(0, 1, n)
    # Noise is drawn in x, y, z order.
    x = shared + _noise()
    y = shared + _noise()
    z = _noise()
    return x, y, z

In [None]:
# Sweep the number of bins (2..5) on the noisy unique model, keeping one
# result table per bin count. The generator does not depend on the bin count,
# so it is defined once outside the loop.
def gen_data_eff():
    return gen_data_unq_noisy(n=10000, sigErr=1)

rezDFDict = {}
for nBins in range(2, 6):
    rezDFDict[nBins] = make_test(gen_data_eff, nBins=nBins, nTest=100)

In [None]:
# One summary figure per bin count, titled with that bin count.
for nBins in rezDFDict:
    rezDF = rezDFDict[nBins]
    plot_test(rezDF, suptitle='nBins = ' + str(nBins))

## Noisy Redundant Scenario - Discrete Case

It is important to test if false positives are caused by binning, or are an intrinsic property of the noise in the covariate. Here I propose a discretized noisy redundancy model. Instead of added noise, each variable has a random chance to produce the redundant outcome or a purely random outcome.

$$X \sim A_X T + (1 - A_X) \nu_X $$
$$Y \sim A_Y T + (1 - A_Y) \nu_Y $$
$$Z \sim A_Z T + (1 - A_Z) \nu_Z $$

where

$$T, \nu_X, \nu_Y, \nu_Z \sim Ber(0.5) $$
$$A_X \sim Ber(\alpha_X)$$
$$A_Y \sim Ber(\alpha_Y)$$
$$A_Z \sim Ber(\alpha_Z)$$

and $\alpha_X, \alpha_Y, \alpha_Z \in [0, 1]$ are flexible.

So, $\alpha = 0$ means purely noisy signal, and $\alpha=1$ means purely redundant signal.

In [None]:
def bernoulli(n, p):
    """Draw ``n`` values that are 1 where a uniform sample falls below ``p``, else 0.

    Samples come from ``np.random.uniform(0, 1, n)``; the boolean comparison
    with ``p`` is returned as an integer array.
    """
    uniformDraws = np.random.uniform(0, 1, n)
    isSuccess = uniformDraws < p
    return isSuccess.astype(int)

def gen_discrete_random(nSample, alphaX=0.5, alphaY=0.5, alphaZ=0.5):
    """Sample the discrete noisy redundant model.

    A shared fair binary signal and three independent fair binary noise
    signals are drawn, each of length ``nSample``. Every sample of x (resp.
    y, z) takes the shared signal where a selector drawn with
    ``bernoulli(nSample, alphaX)`` (resp. ``alphaY``, ``alphaZ``) is 1, and its
    own noise signal where the selector is 0.

    Returns the tuple ``(x, y, z)``.
    """
    # Draw order: shared signal, then the three noise signals, then the three
    # selectors.
    shared = bernoulli(nSample, 0.5)
    noiseX, noiseY, noiseZ = [bernoulli(nSample, 0.5) for _ in range(3)]
    pickX, pickY, pickZ = [
        bernoulli(nSample, alpha) for alpha in (alphaX, alphaY, alphaZ)
    ]

    # With 0/1 selectors, picking element-wise equals a*T + (1 - a)*nu.
    x = np.where(pickX == 1, shared, noiseX)
    y = np.where(pickY == 1, shared, noiseY)
    z = np.where(pickZ == 1, shared, noiseZ)
    return x, y, z

In [None]:
# Sweep a common alpha for all three variables over 10 evenly spaced values
# in [0, 1], keeping one result table per alpha.
alphaLst = np.linspace(0, 1, 10)

rezDFDict = {}
for alpha in alphaLst:
    # Bind the current alpha as a default so the generator does not depend on
    # the loop variable after this iteration.
    gen_data_eff = lambda a=alpha: gen_discrete_random(nSample=10000, alphaX=a, alphaY=a, alphaZ=a)
    rezDFDict[alpha] = make_test(gen_data_eff, nBins=None, nTest=100)

In [None]:
# One summary figure per alpha, with a logarithmic effect-size axis.
for pAlpha in rezDFDict:
    rezDF = rezDFDict[pAlpha]
    plot_test(rezDF, suptitle='alpha = ' + str(pAlpha), logEff=True)

### Testing Asymptotic behaviour

In [None]:
# Sweep the sample count over 10 log-spaced values from 10^2 to 10^5
# (truncated to integers) at fixed alpha = 0.9 for all three variables.
nSampleLst = (10**np.linspace(2, 5, 10)).astype(int)

rezDFDict = {}
for nSample in nSampleLst:
    # Bind the current sample count as a default so the generator does not
    # depend on the loop variable after this iteration.
    gen_data_eff = lambda m=nSample: gen_discrete_random(nSample=m, alphaX=0.9, alphaY=0.9, alphaZ=0.9)
    rezDFDict[nSample] = make_test(gen_data_eff, nBins=None, nTest=100)

In [None]:
# For each kind ('True', then 'Sh'): plot mean +/- std of every PID atom
# against the sample count on log-log axes.
for kind in ['True', 'Sh']:
    muDict = defaultdict(list)
    stdDict = defaultdict(list)

    for df in rezDFDict.values():
        dfKind = df[df['kind'] == kind]
        for method in sorted(set(df['Method'])):
            methodValues = dfKind[dfKind['Method'] == method]['Value']
            muDict[method].append(np.mean(methodValues))
            stdDict[method].append(np.std(methodValues))

    fig, ax = plt.subplots()
    for method, muLst in muDict.items():
        ax.errorbar(nSampleLst, muLst, stdDict[method], label=method)

    ax.set_xscale('log')
    ax.set_yscale('log')
    ax.legend()
    ax.set_title(kind)
    plt.show()

### Test relationship of synergy and redundancy for fixed data size

In [None]:
# Collect (redundancy, synergy) pairs over 20000 runs of the discrete model.
# Each run draws its three alphas uniformly from [0.6, 1).
rezSynLst, rezRedLst = [], []

for nTest in range(20000):
    alphas = np.random.uniform(0.6, 1, 3)
    x, y, z = gen_discrete_random(
        nSample=1000, alphaX=alphas[0], alphaY=alphas[1], alphaZ=alphas[2]
    )
    rez = pid(np.array([x,y,z]))
    rezSynLst.append(rez['syn_s1_s2'])
    rezRedLst.append(rez['shd_s1_s2'])

In [None]:
# Scatter of synergy against redundancy, one dot per run.
fig, ax = plt.subplots()
ax.plot(rezRedLst, rezSynLst, '.')
ax.set_xlabel('Redundancy')
ax.set_ylabel('Synergy')
ax.set_title('Synergy-Redundancy relationship for noisy redundant model')
plt.show()