<a href="https://colab.research.google.com/github/AmineHamzaoui/Financial-Market-Forcesating/blob/master/DTE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from sklearn.metrics import mutual_info_score
from sklearn.preprocessing import KBinsDiscretizer

In [None]:
import pandas as pd

# Read the merged macro/stock table; 'Date_x' is parsed as datetimes
df = pd.read_csv('/content/Macro_with_stocks.csv', parse_dates=['Date_x'])

# Keep the rows of one ticker, ordered chronologically, as an independent copy
aapl = df.loc[df['Stock'] == 'AAPL'].sort_values('Date_x').copy()

# Extract the two series; both come from the same rows of `aapl`
X = aapl['Daily_Return'].values
Y = aapl['Interest Rate (Federal Funds)'].values[:X.shape[0]]  # macro variable, cut to X's length

In [None]:
import numpy as np
from sklearn.preprocessing import KBinsDiscretizer

def delay_transfer_entropy(X, Y, delay=1, k=1, n_bins=10):
    """
    Computes Delay Transfer Entropy from Y to X with a given delay.

    For every time index i the estimate compares how well X[i] is predicted
    by its own immediate past X[i-k:i] alone versus that past together with
    the k source samples ending at Y[i-delay]:

        DTE = H(X_now | X_past) - H(X_now | X_past, Y_delayed)

    Parameters:
    - X: array-like, target variable (e.g. stock return)
    - Y: array-like, source variable (e.g. macroeconomic factor)
    - delay: int >= 0, lag of the most recent source sample used, so that
      delay=1 is ordinary transfer entropy and delay=0 uses the source
      window ending at the current sample
    - k: int >= 1, number of past values (embedding)
    - n_bins: int, number of bins for discretization

    Returns:
    - DTE value (float), in bits

    Raises:
    - ValueError: if X and Y differ in length, if delay < 0 or k < 1, or if
      the series are too short for the requested delay and embedding
    """
    X = np.asarray(X)
    Y = np.asarray(Y)
    if len(X) != len(Y):
        raise ValueError("X and Y must have the same length")
    if k < 1:
        raise ValueError("k must be at least 1")
    if delay < 0:
        raise ValueError("delay must be non-negative")

    n = len(X)
    # First index for which both history windows lie fully inside the data.
    start = k + max(delay - 1, 0)
    if start >= n:
        raise ValueError("series too short for the requested delay and k")
    indices = np.arange(start, n)

    # Target history is always the k samples immediately before X[i]; the
    # delay shifts only the source window, whose last sample is Y[i - delay].
    X_past = np.stack([X[i - k:i] for i in indices])
    Y_past = np.stack([Y[i - delay - k + 1:i - delay + 1] for i in indices])
    X_now = X[indices]

    # Discretize values (the encoder is refitted on each array, so every
    # column gets its own equal-width bins)
    enc = KBinsDiscretizer(n_bins=n_bins, encode='ordinal', strategy='uniform')
    X_past_disc = enc.fit_transform(X_past)
    Y_past_disc = enc.fit_transform(Y_past)
    X_now_disc = enc.fit_transform(X_now.reshape(-1, 1)).flatten()

    # Convert each row of bin indices to a single string symbol
    def to_str(data):
        return ['_'.join(map(str, row.astype(int))) for row in data]

    x_past = to_str(X_past_disc)
    y_past = to_str(Y_past_disc)
    xy_past = [f"{xp}|{yp}" for xp, yp in zip(x_past, y_past)]
    x_now = X_now_disc.astype(int).astype(str)

    # Compute DTE = H(X_now | X_past) - H(X_now | X_past, Y_past)
    H_x_given_xpast = conditional_entropy(x_now, x_past)
    H_x_given_xy = conditional_entropy(x_now, xy_past)

    return H_x_given_xpast - H_x_given_xy

def conditional_entropy(x, cond):
    """
    Return H(X | C) using the chain rule H(X | C) = H(X, C) - H(C).

    Parameters:
    - x: sequence of symbols for the variable of interest
    - cond: sequence of symbols for the conditioning variable, paired
      element-wise with x

    Returns:
    - Conditional entropy as computed by `entropy` on the paired and
      conditioning symbols
    """
    # One joint symbol per observation, formed as "<x>|<cond>"
    paired = ["{}|{}".format(sym, given) for sym, given in zip(x, cond)]
    h_joint = entropy(paired)
    h_cond = entropy(cond)
    return h_joint - h_cond

def entropy(data):
    """
    Compute Shannon entropy (in bits) from a list of symbols.

    Parameters:
    - data: array-like of symbols (e.g. strings) accepted by np.unique

    Returns:
    - Plug-in entropy estimate from the empirical symbol frequencies;
      0.0 for an empty or constant sequence
    """
    _, counts = np.unique(data, return_counts=True)
    probs = counts / counts.sum()
    # np.unique only reports symbols that actually occur, so every
    # probability is strictly positive and log2 needs no epsilon; adding one
    # would bias the result and make constant sequences slightly negative.
    # The trailing "+ 0.0" turns a -0.0 result into 0.0.
    return -np.sum(probs * np.log2(probs)) + 0.0


In [None]:
# Synthetic check: X is a noisy copy of Y shifted by three samples
np.random.seed(0)
n = 1000
Y = np.random.randn(n)
# np.roll is circular, so the first 3 values of X come from the end of Y
X = 0.1 * np.random.randn(n) + np.roll(Y, 3)

# Estimate DTE at the same lag that was used to generate X
dte_val = delay_transfer_entropy(X, Y, delay=3, k=1, n_bins=5)
print(f"Delay Transfer Entropy from Y → X: {dte_val:.4f}")

Delay Transfer Entropy from Y → X: 0.0504
