In [None]:
# Porting in Python of the matlab script from
# Barnett and A. K. Seth, <http://www.sciencedirect.com/science/article/pii/S0165027013003701
# The MVGC Multivariate Granger Causality Toolbox: A New Approach to Granger-causalInference>,
# J. Neurosci. Methods_ 223, 2014 [ <matlab:open('mvgc_preprint.pdf') preprint> ].
# NOT the gradient descent approach as in Crimi et al. Neuroimage 2021
# https://www.sciencedirect.com/science/article/pii/S1053811921005644

import numpy as np
from scipy.stats import f
   
# Convert Time-series into autocovariance matrix
def tsdata_to_autocov(X, q):
    """
    Calculate sample autocovariance sequence from time series data for all possible time delay
    :param X:   multi-trial time series data
    :param q:   number of lags
    :return G:  sample autocovariance sequence
    """
    if len(X.shape) == 2:
        X = np.expand_dims(X, axis=2)
        [n, m, N] = np.shape(X)
    else:
        [n, m, N] = np.shape(X)

    G = np.zeros((n, n, (q+1)))

    for k in range(q+1):
        M = N * (m-k)
        Xk = np.asmatrix(X[:, k:m, :].reshape((n, M)))
        Xm = np.asmatrix(X[:, 0:m-k, :].reshape((n, M)).conj().T)
        G[:, :, k] = Xk * Xm / M - 1.0
    # G.shape = (n, n, q+1)
    return G

# Convert Autocovariance matrix into Autoregressor matrix and residual
def autocov_to_var(G):
    """
    Calculate VAR parameters from autocovariance sequence
    :param G:           autocovariance sequence
    :return [A, SIG]:   A - VAR coefficients matrix, SIG - residuals covariance matrix
    """
    [n, _, q1] = G.shape
    q = q1 - 1
    qn = q * n
    G0 = np.asmatrix(G[:, :, 0])                                            # covariance
    GF = np.asmatrix(G[:, :, 1:].reshape((n, qn)).T)                        # forward autoconv seq
    GB = np.asmatrix(G[:, :, 1:].transpose(0, 2, 1).reshape(qn, n))         # backward autoconv seq

    AF = np.asmatrix(np.zeros((n, qn)))                                     # forward  coefficients
    AB = np.asmatrix(np.zeros((n, qn)))                                     # backward coefficients

    # initialise recursion
    k = 1                                                                   # model order
    r = q - k
    kf = np.arange(0, k*n)                                                  # forward indices
    kb = np.arange(r*n, qn)                                                 # backward indices

    G0I = G0.I

    AF[:, kf] = GB[kb, :] * G0I
    AB[:, kb] = GF[kf, :] * G0I

    for k in np.arange(2, q+1):
        GBx = GB[(r-1)*n: r*n, :]
        DF = GBx - AF[:, kf] * GB[kb, :]
        VB = G0 - AB[:, kb] * GB[kb, :]
        AAF = DF * VB.I

        GFx = GF[(k-1)*n: k*n, :]
        DB = GFx - AB[:, kb] * GF[kf, :]
        VF = G0 - AF[:, kf] * GF[kf, :]
        AAB = DB * VF.I

        AFPREV = AF[:, kf]
        ABPREV = AB[:, kb]
        r = q - k

        kf = np.arange(0, k*n)
        kb = np.arange(r*n, qn)

        AF[:, kf] = np.hstack([AFPREV - AAF * ABPREV, AAF])
        AB[:, kb] = np.hstack([AAB, ABPREV - AAB * AFPREV])

    SIG = G0 - AF * GF
    AF = np.asarray(AF).reshape((n, n, q))
    return [AF, SIG]

# Compute F-statistics as a ratio of reduced-regressor model and full-regressors model from the autoregressors
def autocov_to_mvgc_cond(G, x, y):
    """
    Calculate conditional time-domain MVGC (multivariate Granger causality)
    from the variable |Y| (specified by the vector of indices |y|)
    to the variable |X| (specified by the vector of indices |x|),
    conditional on all other variables |Z| represented in |G|, for
    a stationary VAR process with autocovariance sequence |G|.
    See L. Barnett and A. K. Seth for details.
    
    :param G:   autocovariance sequence
    :param x:   vector of indices of target (causee) multi-variable
    :param y:   vector of indices of source (causal) multi-variable
    :return F:  Granger causality
    """
    n = G.shape[0]
    z = np.arange(n)
    z = np.delete(z, [np.hstack((x, y))])
    # indices of other variables (to condition out)
    xz = np.hstack((x, z))
    xzy = np.hstack((xz, y))
    F = 0

    # full regression
    ixgrid1 = np.ix_(xzy, xzy)

    [AF, SIG] = autocov_to_var(G[ixgrid1])
    # reduced regression
    ixgrid2 = np.ix_(xz, xz)

    [AF, SIGR] = autocov_to_var(G[ixgrid2])

    ixgrid3 = np.ix_(x, x)
    ixgrid4 = np.ix_(x, x)

    if x[0] == (n-1):
      ixgrid4 = np.ix_(x-1, x-1)
 
    #F-statistics computed as likelihood ratio statistic (see Barnett and Seth )
    F = np.log(np.linalg.det(SIGR[ixgrid4])) - np.log(np.linalg.det(SIG[ixgrid3]))

    dfn = len(SIGR[ixgrid4])  #define degrees of freedom numerator 
    dfd = len(SIG[ixgrid3])  #define degrees of freedom denominator 
    p_value = 1 - f.cdf(F, dfn, dfd) #find p-value of F test statistic 

    return F, p_value
 

if __name__ == "__main__":
    # Time series from brain region, each row is brain region 
    X = np.random.rand(3, 25) #here 5 brain region and time series long 25 timepoints

    maxlag = 1
    G = tsdata_to_autocov(X, maxlag)
 
    #ADDD FOR loop for all brain regions
    # For each brain region to test
      # For each brain region to remove
    F, p_value  = autocov_to_mvgc_cond(G, np.array([2]), np.array([0]))
    #GC[i,j] = p_value
    print(p_value)

    #Filter element in the matrix where p_value < 0.05
    threshold = 0.05
    #filtered_matrix = np.copy(GC)
    #filtered_matrix[filtered_matrix >= threshold] = 0

1.0
