Skip to content

Commit

Permalink
Feature/conditional granger (NeuralEnsemble#359)
Browse files Browse the repository at this point in the history
Co-authored-by: ackurth <a.kurth@fz-juelich.de>
  • Loading branch information
rjurkus and ackurth committed Nov 12, 2020
1 parent 295c6bd commit fa5e8eb
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 2 deletions.
77 changes: 75 additions & 2 deletions elephant/causality/granger.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
:toctree: toctree/causality/
pairwise_granger
conditional_granger
References
Expand All @@ -71,7 +72,8 @@

__all__ = (
"Causality",
"pairwise_granger"
"pairwise_granger",
"conditional_granger"
)


Expand Down Expand Up @@ -348,7 +350,6 @@ def _optimal_vector_arm(signals, dimension, max_order,
raise ValueError("The specified information criterion is not"
"available. Please use 'aic' or 'bic'.")


if temp_ic < optimal_ic:
optimal_ic = temp_ic
optimal_order = order
Expand Down Expand Up @@ -526,3 +527,75 @@ def pairwise_granger(signals, max_order, information_criterion='aic'):
directional_causality_y_x=directional_causality_y_x_round.item(),
instantaneous_causality=instantaneous_causality_round.item(),
total_interdependence=total_interdependence_round.item())


def conditional_granger(signals, max_order, information_criterion='aic'):
r"""
Determine conditional Granger Causality of the second time series on the
first time series, given the third time series. In other words, for time
series X_t, Y_t and Z_t, this function tests if Y_t influences X_t via Z_t.
Parameters
----------
signals : (N, 3) np.ndarray or neo.AnalogSignal
A matrix with three time series (second dimension) that have N time
points (first dimension). The time series to be conditioned on is the
third.
max_order : int
Maximal order of autoregressive model.
information_criterion : {'aic', 'bic'}, optional
A function to compute the information criterion:
`bic` for Bayesian information_criterion,
`aic` for Akaike information criterion,
Default: 'aic'.
Returns
-------
conditional_causality_xy_z_round : float
The value of conditional causality of Y_t on X_t given Z_t. Zero value
indicates that causality of Y_t on X_t is solely dependent on Z_t.
Raises
------
ValueError
If the provided signal does not have a shape of Nx3.
Notes
-----
The formulas used in this implementation follows
:cite:`granger-Ding06_0608035`. Specifically, the Eq 35.
"""
if isinstance(signals, AnalogSignal):
signals = signals.magnitude

if not (signals.ndim == 2 and signals.shape[1] == 3):
raise ValueError("The input 'signals' must be of dimensions Nx3.")

# transpose (N,3) -> (3,N) for mathematical convenience
signals = signals.T

# signal_x, signal_y and signal_z are (1, N) arrays
signal_x, signal_y, signal_z = np.expand_dims(signals, axis=1)

signals_xz = np.vstack([signal_x, signal_z])

coeffs_xz, cov_xz, p_1 = _optimal_vector_arm(
signals_xz, dimension=2, max_order=max_order,
information_criterion=information_criterion)
coeffs_xyz, cov_xyz, p_2 = _optimal_vector_arm(
signals, dimension=3, max_order=max_order,
information_criterion=information_criterion)

conditional_causality_xy_z = np.log(cov_xz[0, 0]) - np.log(cov_xyz[0, 0])

# Round conditional GC according to following scheme:
# Note that standard error scales as 1/sqrt(sample_size)
# Calculate significant figures according to standard error
length = np.size(signal_x)
asymptotic_std_error = 1/np.sqrt(length)
est_sig_figures = int((-1)*np.around(np.log10(asymptotic_std_error)))

conditional_causality_xy_z_round = np.around(conditional_causality_xy_z,
est_sig_figures)

return conditional_causality_xy_z_round
85 changes: 85 additions & 0 deletions elephant/test/test_causality.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,90 @@ def test_ground_truth_vector_autoregressive_model(self):
decimal=4)


class ConditionalGrangerTestCase(unittest.TestCase):

@classmethod
def setUpClass(cls):
np.random.seed(1)
cls.ground_truth = cls._generate_ground_truth()

@staticmethod
def _generate_ground_truth(length_2d=30000, causality_type="indirect"):
"""
Recreated from Example 2 section 5.2 of :cite:'granger-Ding06-0608035'.
The following should generate three signals in one of the two ways:
1. "indirect" would generate data which contains no direct
causal influence from Y to X, but mediated through Z
(i.e. Y -> Z -> X).
2. "both" would generate data which contains both direct and indirect
causal influences from Y to X.
"""
if causality_type == "indirect":
y_t_lag_2 = 0
elif causality_type == "both":
y_t_lag_2 = 0.2
else:
raise ValueError("causality_type should be either 'indirect' or "
"'both'")

order = 2
signal = np.zeros((3, length_2d + order))

weights_1 = np.array([[0.8, 0, 0.4],
[0, 0.9, 0],
[0., 0.5, 0.5]])

weights_2 = np.array([[-0.5, y_t_lag_2, 0.],
[0., -0.8, 0],
[0, 0, -0.2]])

weights = np.stack((weights_1, weights_2))

noise_covariance = np.array([[0.3, 0.0, 0.0],
[0.0, 1., 0.0],
[0.0, 0.0, 0.2]])

for i in range(length_2d):
for lag in range(order):
signal[:, i + order] += np.dot(weights[lag],
signal[:, i + 1 - lag])
rnd_var = np.random.multivariate_normal([0, 0, 0],
noise_covariance)
signal[:, i + order] += rnd_var

signal = signal[:, 2:]

# Return signals as Nx3
return signal.T

def setUp(self):
# Generate a smaller random dataset for tests other than ground truth,
# using a different seed than in the ground truth - the convergence
# should not depend on the seed.
np.random.seed(10)
self.signal = self._generate_ground_truth(length_2d=1000)

# Generate a small dataset for containing both direct and indirect
# causality.
self.non_zero_signal = self._generate_ground_truth(
length_2d=1000, causality_type="both")
# Estimate Granger causality
self.conditional_causality = elephant.causality.granger.\
conditional_granger(self.signal, max_order=10,
information_criterion='bic')

def test_result_is_float(self):
self.assertIsInstance(self.conditional_causality, float)

def test_ground_truth_zero_value_conditional_causality(self):
self.assertEqual(elephant.causality.granger.conditional_granger(
self.ground_truth, 10, 'bic'), 0.0)

def test_non_zero_conditional_causality(self):
self.assertGreater(elephant.causality.granger.conditional_granger(
self.non_zero_signal, 10, 'bic'), 0.0)


if __name__ == '__main__':
unittest.main()

0 comments on commit fa5e8eb

Please sign in to comment.