### Cauchy-Schwarz Divergence Transfer Entropy Code Demo

This code demo accompanies our paper *Cauchy-Schwarz Divergence Transfer Entropy. It provides a complete framework for calculating Cauchy-Schwarz Divergence Transfer Entropy (CS-TE) and includes a simple illustrative example.

#### 1. CS-TE Core Function Implementation

In [1]:
import numpy as np

# RBF kernel
def rbf_dot(x1, x2, sigma):
    G = np.sum(x1 ** 2, axis=1)
    H = np.sum(x2 ** 2, axis=1)
    Q = np.outer(G, np.ones(len(H))) + np.outer(np.ones(len(G)), H) - 2 * np.dot(x1, x2.T)
    return np.exp(-Q / (2 * sigma ** 2))

# Compute median
def comp_med(x):
    d, n = x.shape
    G = np.sum(x ** 2, axis=0)
    T = np.tile(G, (n, 1))
    dist2 = T - 2 * np.dot(x.T, x) + T.T
    dist2 = dist2 - np.tril(dist2)
    R = dist2.flatten()
    R = R[R > 0]
    
    if len(R) == 0:
        print("Warning: R is empty, returning NaN")
        return np.nan

    return np.sqrt(0.5 * np.median(R))

# Core Fuction
def conditional_CS_CMI(x, y, z, sigma1, sigma2, sigma3):
    K = rbf_dot(x, x, sigma1)
    L = rbf_dot(y, y, sigma2)
    M = rbf_dot(z, z, sigma3)

    dim = K.shape[0]

    H1 = K * M
    H2 = L * M
    H3 = K * H2

    self_term1 = 0
    for t in range(dim):
        self_term1 += np.sum(H3[t, :]) * (np.sum(M[t, :])) ** 2

    self_term2 = 0
    for t in range(dim):
        self_term2 += (np.sum(H1[t, :]) ** 2) * (np.sum(H2[t, :]) ** 2) / np.sum(H3[t, :])

    cross_term1 = 0
    for t in range(dim):
        cross_term1 += np.sum(M[t, :]) * np.sum(H1[t, :]) * np.sum(H2[t, :])

    if cross_term1 <= 0 or self_term1 <= 0 or self_term2 <= 0:
        print("Warning: Invalid values encountered in log calculation, returning NaN")
        return np.nan

    conditional_divergence = -2 * np.log(cross_term1) + np.log(self_term1) + np.log(self_term2)
    return conditional_divergence

# Computation function
def compute_causality(X, Y):
    N = len(X)
    order = 2

    # Causal from X to Y
    y = Y[order:N]
    x1 = np.zeros((N - order, order))
    x2 = np.zeros((N - order, order))

    for i in range(N - order):
        x1[i, :] = Y[i:i + order]
        x2[i, :] = X[i:i + order]

    y = y[:, np.newaxis]
    med1 = comp_med(np.hstack([x1, x2, y]))

    if not np.isnan(med1):
        eta = 0.1
        sigma1 = med1 * eta
        sigma2 = med1 * eta
        sigma3 = med1 * eta

        Causal_X_to_Y = conditional_CS_CMI(x2, y, x1, sigma1, sigma2, sigma3)
        print(f"Causal_X_to_Y: {Causal_X_to_Y}")
    else:
        print("Invalid median for X->Y, skipping.")

    # Causal from Y to X
    y = X[order:N]
    x1 = np.zeros((N - order, order))
    x2 = np.zeros((N - order, order))

    for i in range(N - order):
        x1[i, :] = X[i:i + order]
        x2[i, :] = Y[i:i + order]

    y = y[:, np.newaxis]
    med1 = comp_med(np.hstack([x1, x2, y]))

    if not np.isnan(med1):
        sigma1 = med1 * eta
        sigma2 = med1 * eta
        sigma3 = med1 * eta

        Causal_Y_to_X = conditional_CS_CMI(x2, y, x1, sigma1, sigma2, sigma3)
        print(f"Causal_Y_to_X: {Causal_Y_to_X}")
    else:
        print("Invalid median for Y->X, skipping.")

#### 2. Calculation Example

In [2]:
# Generate random sequences X and Y of length 256
def generate_data():
    X = np.random.randn(256)
    Y = np.random.randn(256)
    return X, Y

# Data generation
X, Y = generate_data()

In [3]:
# Compute
compute_causality(X, Y)

Causal_X_to_Y: 0.00012487361297885968
Causal_Y_to_X: 3.532956547047661e-05


#### 3. Usage Guide

**Note:**

1. This code demo serves as an example only. The data used for computation is randomly generated and does not adhere to VAR (Vector Autoregressive) or other established models.
2. The kernel size plays a crucial role in the overall computation. Adjustments may be needed based on the characteristics of the data. You can adjust the `eta` parameter in the `compute_causality` function to modify this aspect of the calculation.
