In [1]:
import numpy as np
from scipy.linalg import lstsq
from scipy.stats import pearsonr

In [18]:
def shift(x, by=0):
    """
    Shift variable by a number of time steps.
    
    Parameters
    ----------
    by : int
        Number of steps to shift by.
    
    Returns
    -------
    numpy.array
        Shifted series
    """
    x_shift = np.empty_like(x)
    if by == 0:
        return x
    elif by > 0:
        x_shift[:by] = np.nan
        x_shift[by:] = x[:-by]
    else:
        x_shift[by:] = np.nan
        x_shift[:by] = x[-by:]
    return x_shift

def partial_xcorr(x, y, max_lag=10, standardize=True):
    """
    Computes partial cross correlation between x and y using linear regression.
    
    Parameters
    ----------
    x : numpy.array
        First variable to compute correlation for.
    y : numpy.array
        Second variable to compute correlation for.
    max_lag : Optional[int]
        Number of time lags to compute correlations for, defaults to 10.
    standardize : Optional[bool]
        Standardize / scale x and y variables, defaults to True
        
    Returns
    -------
    list
        List of correlation coefficients for each lag.
    """

    # Standardize x and y and stack into matrix
    if standardize:
        x = (x - np.mean(x)) / np.std(x)
        y = (y - np.mean(y)) / np.std(y)
    
    # Initialize feature matrix
    nlags = max_lag + 1
    X = np.zeros((len(x), nlags))
    X[:, 0] = x

    # Initialize correlations, first is y ~ x
    xcorr = np.zeros(nlags, dtype=np.float)
    xcorr[0] = pearsonr(x, y)[0]
    
    # Process lags
    for lag in range(1, nlags):
        # Add lag to matrix
        X[:, lag] = shift(x, lag)

        # Trim NaNs from y (yt), current lag (l) and previous lags (Z)
        yt = y[lag:]
        l = X[lag:, lag:lag+1]
        Z = X[lag:, 0:lag]
    
        # Coefficients and residuals for y ~ Z
        beta_l = lstsq(Z, yt)[0]
        resid_l = yt - Z.dot(beta_l)
        
        # Coefficient and residuals for l ~ Z
        beta_Z = lstsq(Z, l)[0]
        resid_Z = l - Z.dot(beta_Z)
        
        # Compute correlation between residuals
        xcorr[lag] = pearsonr(resid_l, resid_Z.ravel())[0]

    return xcorr


In [24]:
# Set coefficients
betas = np.array([40, 0, 0, 0, 50, 0, 45, 0, 0])

In [25]:
# Draw x values
x = np.random.normal(0, 5, 150).astype(float)

In [26]:
# Construct matrix of lags
lags = len(betas) - 1
X = np.zeros((len(x), lags + 1))
for lag in range(0, lags + 1):
    X[:, lag] = shift(x, lag)
X = X[lags:]

In [27]:
# Generate y and trim x
y = X.dot(betas)
x = x[lags:]

In [28]:
# Show partial correlations
# Note: high correlations should match high coefficients
np.round(partial_xcorr(x, y, max_lag=lags), 2)

array([ 0.58, -0.13, -0.12, -0.09,  0.7 , -0.09,  1.  ,  0.  ,  0.  ])