In [1]:
import os
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from jpype import *
import tqdm
import matplotlib.pyplot as plt
from scipy.signal import convolve



In [2]:
from utils import participant_ids, raw_data_path, find_trials, import_soa_rating_data, import_emg_data, onset_detection, extract_onset_time

In [3]:
# locate jitd jar file
from pathlib import Path
import sys

sys.path.append(
    "../"
)


from TE import TransferEntropyCalculator_continuous_kraskov

In [4]:
# Find files in folders and subfolders
from pair_data_paths import build_grouped, default_base_dir
base = default_base_dir()
grouped = build_grouped(base / "NI", base / "SoA")  # list of SoAToNIs
for g in grouped:
    print(g.date, g.soa_csv, len(g.ni_files))



20250604 D:\SynologyDrive\Drive-Acer\DeepWen\deepwen\home\acercyc\Projects\Drum\data\raw_20250911\SoA\20250604\morita_0604_soa.csv 6
20250606 D:\SynologyDrive\Drive-Acer\DeepWen\deepwen\home\acercyc\Projects\Drum\data\raw_20250911\SoA\20250606\morita_0606_soa.csv 8
20250611 D:\SynologyDrive\Drive-Acer\DeepWen\deepwen\home\acercyc\Projects\Drum\data\raw_20250911\SoA\20250611\morita_0611_soa.csv 10
20250613 D:\SynologyDrive\Drive-Acer\DeepWen\deepwen\home\acercyc\Projects\Drum\data\raw_20250911\SoA\20250613\morita_0613_soa.csv 11
20250618 D:\SynologyDrive\Drive-Acer\DeepWen\deepwen\home\acercyc\Projects\Drum\data\raw_20250911\SoA\20250618\morita_0618_soa.csv 10
20250620 D:\SynologyDrive\Drive-Acer\DeepWen\deepwen\home\acercyc\Projects\Drum\data\raw_20250911\SoA\20250620\morita_0620_soa.csv 9


In [6]:
# convolue
def convolve_with_exponential_kernel(data, decay_rate, kernel_size):
    """
    Convolve a column with an exponential kernel for smoothing.

    Parameters:
    - data (np.array): The data to be smoothed.
    - decay_rate (float): The decay rate (lambda) for the exponential kernel.

    Returns:
    - np.array: The convolved data.
    """
    kernel = np.exp(-decay_rate * np.linspace(0, 1, kernel_size))
    # kernel /= kernel.sum()  # Normalize kernel to maintain scale

    data_cov = np.convolve(data, kernel, mode="same")
    return data_cov


def impulse_response_convolution(
    data, peak_amplitude=1.0, decay_rate=0.001, response_length=200
):
    """
    Applies an impulse response function with exponential decay using convolution.

    Parameters:
        time_series (numpy array): The input time series (e.g., binary impulses).
        peak_amplitude (float): The peak height of the response.
        decay_rate (float): The exponential decay rate (higher means faster decay).
        response_length (int): The length of the impulse response function.

    Returns:
        numpy array: The output signal with impulse responses applied.
    """
    # Generate the impulse response kernel
    data = np.abs(data)
    t = np.arange(response_length)
    impulse_response_kernel = peak_amplitude * np.exp(-decay_rate * t)

    # Apply convolution
    output_signal = convolve(data, impulse_response_kernel, mode="full")[: len(data)]

    # Normalize the output signal
    output_signal /= np.max(output_signal)

    return output_signal


def compute_TE_from_input_file(file_path):
    # Load data
    data = pd.read_csv(file_path)
    data = pd.read_csv(file_path, sep="\t")

    
    # Extract signals
    target = data['ACC_HIHAT[V]'].values
    source = data['Correct_Timing_Signal[V]'].values
    t = data['Time[s]'].values
    
    # Apply convolution smoothing
    decay_rate = 0.001
    response_length = 1000
    target_cov = impulse_response_convolution(
        target, peak_amplitude=1, decay_rate=decay_rate, response_length=response_length
    )

    source_cov = impulse_response_convolution(
        source, peak_amplitude=1, decay_rate=decay_rate, response_length=response_length
    )

    
    # Find signal boundaries (non-zero regions)
    onsets_array, onset_idx = onset_detection(source, threshold=0.1, minimal_interval=1000)
    idx_start = onset_idx[0]
    idx_end = onset_idx[-1]
    
    # Truncate the signals
    target_ = target_cov[idx_start:idx_end]
    source_ = source_cov[idx_start:idx_end]
    t_ = t[idx_start:idx_end]
    
    # Down sample the signals
    original_sampling_rate = 10000
    down_sampling_rate = 100  # Hz
    down_sample_point = int(original_sampling_rate / down_sampling_rate)
    target_ = target_[::down_sample_point]
    source_ = source_[::down_sample_point]
    t_ = t_[::down_sample_point]
    
    # Compute Transfer Entropy
    te_calc = TransferEntropyCalculator_continuous_kraskov()
    te_result = te_calc.compute_TE(source_, target_, isPrintEstimation=True)
    
    return te_result

# Test the function with the current file
file_path = "../../data/raw_20250911/NI/20250604/1_20250604_135334_ni.txt"
te_result = compute_TE_from_input_file(file_path)
print(f"Transfer Entropy result: {te_result}")


TE was optimised via Ragwitz criteria: k=6, k_tau=3, l=6, l_tau=6
Transfer Entropy result: 0.02901197142154155
