In [None]:
import os
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

In [None]:
from util_results import Results

In [None]:
from te_datasim.jointprocess import MVJointProcessSimulator
from te_datasim.lineargaussian import MVLinearGaussianSimulator

In [None]:
from reference_cmigan import TE_cmigan
# Training settings handed to every TE_cmigan call in this notebook
# (as its `epochs=` and `batch_size=` keyword arguments respectively).
EPOCHS, BATCH_SIZE = 2_500, 1_000

# Basic Validity

In [None]:
# REPLICATES: how many times each experiment loop is repeated (the replicate
# index is also used as the simulation seed).
# SAMPLE_SIZE: value passed as `time=` to the simulators in the basic-validity runs.
REPLICATES, SAMPLE_SIZE = 5, 10_000

## Linear Gaussian

In [None]:
# Coupling values under test: nine evenly spaced points from 0 to 1 inclusive.
lg_lambda_range = list(np.linspace(0.0, 1.0, num=9))

# One 1-dimensional linear-Gaussian simulator per coupling value.
lg_generator_lst = [
    MVLinearGaussianSimulator(n_dim=1, coupling=coupling)
    for coupling in lg_lambda_range
]

# Analytic transfer entropy of each simulator in both directions, kept in the
# same order as lg_lambda_range.
lg_TE_X2Y_ref_lst = [sim.analytic_transfer_entropy('X', 'Y') for sim in lg_generator_lst]
lg_TE_Y2X_ref_lst = [sim.analytic_transfer_entropy('Y', 'X') for sim in lg_generator_lst]

In [None]:
# Basic-validity experiment on the linear Gaussian process: for every replicate
# and every coupling value, simulate one series and estimate transfer entropy
# in both directions with TE_cmigan.
lg_results_TE_X2Y = Results(columns=['method', 'coupling'])
lg_results_TE_Y2X = Results(columns=['method', 'coupling'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for lam, generator in zip(lg_lambda_range, lg_generator_lst):
        print("# Coupling = ", lam, "#")
        # Simulate data; the replicate index is the seed, so all coupling
        # values within one replicate are simulated with the same seed.
        X, Y = generator.simulate(time=SAMPLE_SIZE, seed=r)
        # Estimate X -> Y
        TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
        lg_results_TE_X2Y.write(method='cmigan', coupling=lam, value=TE_X2Y)
        # Estimate Y -> X (same data, arguments swapped)
        TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
        lg_results_TE_Y2X.write(method='cmigan', coupling=lam, value=TE_Y2X)

# Persist one CSV per direction.
lg_results_TE_X2Y.df.to_csv('results/cmigan/lg_results_TE_X2Y_bv.csv', index=False)
lg_results_TE_Y2X.df.to_csv('results/cmigan/lg_results_TE_Y2X_bv.csv', index=False)

## Joint Process

In [None]:
# Specify the range of lambda values to test
jp_lambda_range = list(np.linspace(-3, 3, 9, endpoint=True))

# Initialize the list of generators with one for each lambda value
jp_generator_lst = [MVJointProcessSimulator(n_dim=1, lam=lam) for lam in jp_lambda_range]

# get the reference values
jp_TE_X2Y_ref_lst = [generator.analytic_transfer_entropy('X', 'Y') for generator in jp_generator_lst]
jp_TE_Y2X_ref_lst = [generator.analytic_transfer_entropy('Y', 'X') for generator in jp_generator_lst]

In [None]:
# Basic-validity experiment on the joint process: for every replicate and every
# lambda value, simulate one series and estimate transfer entropy in both
# directions with TE_cmigan.
jp_results_TE_X2Y = Results(columns=['method', 'coupling'])
jp_results_TE_Y2X = Results(columns=['method', 'coupling'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for lam, generator in zip(jp_lambda_range, jp_generator_lst):
        print("# Coupling = ", lam, "#")
        # Simulate data; the replicate index is the seed, so all lambda values
        # within one replicate are simulated with the same seed.
        X, Y = generator.simulate(time=SAMPLE_SIZE, seed=r)
        # Estimate X -> Y
        TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
        jp_results_TE_X2Y.write(method='cmigan', coupling=lam, value=TE_X2Y)
        # Estimate Y -> X (same data, arguments swapped)
        TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
        jp_results_TE_Y2X.write(method='cmigan', coupling=lam, value=TE_Y2X)

# Persist one CSV per direction.
jp_results_TE_X2Y.df.to_csv('results/cmigan/jp_results_TE_X2Y_bv.csv', index=False)
jp_results_TE_Y2X.df.to_csv('results/cmigan/jp_results_TE_Y2X_bv.csv', index=False)

# Sample size scaling

In [None]:
# Sample sizes swept by the scaling experiment (each is passed as `time=` to simulate).
sample_sizes = [500, 1_000, 5_000, 10_000, 50_000, 100_000]

# A single fixed 1-dimensional simulator per process type, reused for every sample size.
lg_generator = MVLinearGaussianSimulator(
    n_dim=1,
    coupling=0.5,
)
jp_generator = MVJointProcessSimulator(
    n_dim=1,
    lam=0.0,
)

In [None]:
# Sample-size scaling on the linear Gaussian process: for every replicate and
# every sample size, simulate one series from the fixed lg_generator and
# estimate transfer entropy in both directions with TE_cmigan.
lg_results_TE_X2Y = Results(columns=['method', 'sample_size'])
lg_results_TE_Y2X = Results(columns=['method', 'sample_size'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for samples in sample_sizes:
        print("# Samples = ", samples, "#")
        # Simulate data; the replicate index is the seed, so every sample size
        # within one replicate is simulated with the same seed.
        X, Y = lg_generator.simulate(time=samples, seed=r)
        # Estimate X -> Y
        TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
        lg_results_TE_X2Y.write(method='cmigan', sample_size=samples, value=TE_X2Y)
        # Estimate Y -> X (same data, arguments swapped)
        TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
        lg_results_TE_Y2X.write(method='cmigan', sample_size=samples, value=TE_Y2X)

# Persist one CSV per direction.
lg_results_TE_X2Y.df.to_csv('results/cmigan/lg_results_TE_X2Y_ss.csv', index=False)
lg_results_TE_Y2X.df.to_csv('results/cmigan/lg_results_TE_Y2X_ss.csv', index=False)

In [None]:
# Sample-size scaling on the joint process: for every replicate and every
# sample size, simulate one series from the fixed jp_generator and estimate
# transfer entropy in both directions with TE_cmigan.
jp_results_TE_X2Y = Results(columns=['method', 'sample_size'])
jp_results_TE_Y2X = Results(columns=['method', 'sample_size'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for samples in sample_sizes:
        print("# Samples = ", samples, "#")
        # Simulate data; the replicate index is the seed, so every sample size
        # within one replicate is simulated with the same seed.
        X, Y = jp_generator.simulate(time=samples, seed=r)
        # Estimate X -> Y
        TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
        jp_results_TE_X2Y.write(method='cmigan', sample_size=samples, value=TE_X2Y)
        # Estimate Y -> X (same data, arguments swapped)
        TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
        jp_results_TE_Y2X.write(method='cmigan', sample_size=samples, value=TE_Y2X)

# Persist one CSV per direction.
jp_results_TE_X2Y.df.to_csv('results/cmigan/jp_results_TE_X2Y_ss.csv', index=False)
jp_results_TE_Y2X.df.to_csv('results/cmigan/jp_results_TE_Y2X_ss.csv', index=False)

# Dimensionality Scaling with redundant dimensions

In [None]:
# Dimensionalities (1 through 10) and sample sizes swept by the
# dimensionality-scaling runs with redundant dimensions.
dim_range = list(range(1, 11))
sample_sizes = [10_000, 100_000]

## Linear Gaussian

In [None]:
# Initialize the list of generators with one for each dimension
lg_generator_lst = [MVLinearGaussianSimulator(n_dim=dim, n_redundant_dim=dim-1) for dim in dim_range]
# Get the reference values
lg_TE_X2Y_ref_lst = [generator.analytic_transfer_entropy('X', 'Y') for generator in lg_generator_lst]
lg_TE_Y2X_ref_lst = [generator.analytic_transfer_entropy('Y', 'X') for generator in lg_generator_lst]

In [None]:
# Dimensionality scaling with redundant dimensions, linear Gaussian process:
# for every replicate, dimensionality and sample size, simulate one series and
# estimate transfer entropy in both directions with TE_cmigan.
lg_results_TE_X2Y = Results(columns=['method', 'n_dim', 'sample_size'])
lg_results_TE_Y2X = Results(columns=['method', 'n_dim', 'sample_size'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for dim, generator in zip(dim_range, lg_generator_lst):
        print("## Dim = ", dim, "#")
        for samples in sample_sizes:
            print("# Sample size = ", samples, "#")
            # Simulate data; the replicate index is the seed, so every
            # (dim, sample size) pair within one replicate shares that seed.
            X, Y = generator.simulate(time=samples, seed=r)
            # Estimate X -> Y
            TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
            lg_results_TE_X2Y.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_X2Y)
            # Estimate Y -> X (same data, arguments swapped)
            TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
            lg_results_TE_Y2X.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_Y2X)

# Persist one CSV per direction.
lg_results_TE_X2Y.df.to_csv('results/cmigan/lg_results_TE_X2Y_dimred.csv', index=False)
lg_results_TE_Y2X.df.to_csv('results/cmigan/lg_results_TE_Y2X_dimred.csv', index=False)

## Joint Process

In [None]:
# Initialize the list of generators with one for each dimension
jp_generator_lst = [MVJointProcessSimulator(n_dim=dim, n_redundant_dim=dim-1, lam=0.0) for dim in dim_range]
# Get the reference values
jp_TE_X2Y_ref_lst = [generator.analytic_transfer_entropy('X', 'Y') for generator in jp_generator_lst]
jp_TE_Y2X_ref_lst = [generator.analytic_transfer_entropy('Y', 'X') for generator in jp_generator_lst]

In [None]:
# Dimensionality scaling with redundant dimensions, joint process: for every
# replicate, dimensionality and sample size, simulate one series and estimate
# transfer entropy in both directions with TE_cmigan.
jp_results_TE_X2Y = Results(columns=['method', 'n_dim', 'sample_size'])
jp_results_TE_Y2X = Results(columns=['method', 'n_dim', 'sample_size'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for dim, generator in zip(dim_range, jp_generator_lst):
        print("## Dim = ", dim, "#")
        for samples in sample_sizes:
            print("# Sample size = ", samples, "#")
            # Simulate data; the replicate index is the seed, so every
            # (dim, sample size) pair within one replicate shares that seed.
            X, Y = generator.simulate(time=samples, seed=r)
            # Estimate X -> Y
            TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
            jp_results_TE_X2Y.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_X2Y)
            # Estimate Y -> X (same data, arguments swapped)
            TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
            jp_results_TE_Y2X.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_Y2X)

# Persist one CSV per direction.
jp_results_TE_X2Y.df.to_csv('results/cmigan/jp_results_TE_X2Y_dimred.csv', index=False)
jp_results_TE_Y2X.df.to_csv('results/cmigan/jp_results_TE_Y2X_dimred.csv', index=False)

# Dimensionality Scaling without redundant dimensions

In [None]:
# Dimensionalities (1 through 10) and sample sizes swept by the
# dimensionality-scaling runs without redundant dimensions.
dim_range = list(range(1, 11))
sample_sizes = [10_000, 100_000]

## Linear Gaussian

In [None]:
# Initialize the list of generators with one for each dimension
lg_generator_lst = [MVLinearGaussianSimulator(n_dim=dim) for dim in dim_range]
# Get the reference values
lg_TE_X2Y_ref_lst = [generator.analytic_transfer_entropy('X', 'Y') for generator in lg_generator_lst]
lg_TE_Y2X_ref_lst = [generator.analytic_transfer_entropy('Y', 'X') for generator in lg_generator_lst]

In [None]:
# Dimensionality scaling without redundant dimensions, linear Gaussian process:
# for every replicate, dimensionality and sample size, simulate one series and
# estimate transfer entropy in both directions with TE_cmigan.
lg_results_TE_X2Y = Results(columns=['method', 'n_dim', 'sample_size'])
lg_results_TE_Y2X = Results(columns=['method', 'n_dim', 'sample_size'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for dim, generator in zip(dim_range, lg_generator_lst):
        print("## Dim = ", dim, "#")
        for samples in sample_sizes:
            print("# Sample size = ", samples, "#")
            # Simulate data; the replicate index is the seed, so every
            # (dim, sample size) pair within one replicate shares that seed.
            X, Y = generator.simulate(time=samples, seed=r)
            # Estimate X -> Y
            TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
            lg_results_TE_X2Y.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_X2Y)
            # Estimate Y -> X (same data, arguments swapped)
            TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
            lg_results_TE_Y2X.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_Y2X)

# Persist one CSV per direction.
lg_results_TE_X2Y.df.to_csv('results/cmigan/lg_results_TE_X2Y_dim.csv', index=False)
lg_results_TE_Y2X.df.to_csv('results/cmigan/lg_results_TE_Y2X_dim.csv', index=False)

## Joint Process

In [None]:
# Initialize the list of generators with one for each dimension
jp_generator_lst = [MVJointProcessSimulator(n_dim=dim, lam=0.0) for dim in dim_range]
# Get the reference values
jp_TE_X2Y_ref_lst = [generator.analytic_transfer_entropy('X', 'Y') for generator in jp_generator_lst]
jp_TE_Y2X_ref_lst = [generator.analytic_transfer_entropy('Y', 'X') for generator in jp_generator_lst]

In [None]:
# Dimensionality scaling without redundant dimensions, joint process: for every
# replicate, dimensionality and sample size, simulate one series and estimate
# transfer entropy in both directions with TE_cmigan.
jp_results_TE_X2Y = Results(columns=['method', 'n_dim', 'sample_size'])
jp_results_TE_Y2X = Results(columns=['method', 'n_dim', 'sample_size'])

# Create the output folder *before* the long estimation loop. to_csv does not
# create missing parent directories (assuming Results.df is a pandas
# DataFrame), so a missing folder would otherwise only raise after every
# estimate had already been computed.
os.makedirs('results/cmigan', exist_ok=True)

for r in range(REPLICATES):
    print(f"\n### REPLICATE {r+1}/{REPLICATES} ###\n")
    for dim, generator in zip(dim_range, jp_generator_lst):
        print("## Dim = ", dim, "#")
        for samples in sample_sizes:
            print("# Sample size = ", samples, "#")
            # Simulate data; the replicate index is the seed, so every
            # (dim, sample size) pair within one replicate shares that seed.
            X, Y = generator.simulate(time=samples, seed=r)
            # Estimate X -> Y
            TE_X2Y = TE_cmigan(X, Y, epochs=EPOCHS, batch_size=BATCH_SIZE)
            jp_results_TE_X2Y.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_X2Y)
            # Estimate Y -> X (same data, arguments swapped)
            TE_Y2X = TE_cmigan(Y, X, epochs=EPOCHS, batch_size=BATCH_SIZE)
            jp_results_TE_Y2X.write(method='cmigan', n_dim=dim, sample_size=samples, value=TE_Y2X)

# Persist one CSV per direction.
jp_results_TE_X2Y.df.to_csv('results/cmigan/jp_results_TE_X2Y_dim.csv', index=False)
jp_results_TE_Y2X.df.to_csv('results/cmigan/jp_results_TE_Y2X_dim.csv', index=False)