In [14]:
import numpy as np
from scipy.signal import hilbert

class AntipodalConnectivity:
    def __init__(self, eeg_data, window_length, sensors):
        """
        Initializes the AntipodalConnectivity with the given data, window length, and sensors.

        Parameters:
        eeg_data (ndarray): 2D numpy array with shape (num_sensors, num_columns)
        window_length (int): The length of each segment window
        sensors (list): List of sensor names corresponding to the rows of eeg_data
        """
        self.eeg_data = eeg_data
        self.window_length = window_length
        self.sensors = sensors
        self.num_sensors, self.num_columns = eeg_data.shape
        self.processed_data = {}
        self.energy_data = {}

    def chordal_distance_energy(self, real_part, imag_part):
        """
        Calculates the chordal distance energy (Ec) from the real and imaginary parts of the analytic signal.

        Parameters:
        real_part (ndarray): Real part of the analytic signal
        imag_part (ndarray): Imaginary part of the analytic signal

        Returns:
        ec (ndarray): Chordal distance energy for each time step
        """

        absSig = np.sqrt(real_part ** 2 + imag_part ** 2)
        ec = -np.log(np.sqrt(1 / (1 + absSig ** 2)))

        return ec

    def process_sensor_data(self, sensor_data):
        """
        Processes the data for a single sensor by segmenting, applying Hilbert Transform, stacking, and normalizing.

        Parameters:
        sensor_data (ndarray): 1D numpy array with sensor data

        Returns:
        sensor_processed (list): List of processed data for each window
        sensor_energy (list): List of chordal distance energy for each window
        """
        num_windows = self.num_columns // self.window_length
        sensor_processed = []
        sensor_energy = []

        for w in range(num_windows):
            start = w * self.window_length
            end = start + self.window_length
            window_data = sensor_data[start:end]

            # Apply Hilbert Transform
            analytic_signal = hilbert(window_data)
            real_part = np.real(analytic_signal)
            imag_part = np.imag(analytic_signal)

            # Stack temporal data as a vector (real and imaginary parts)
            stacked_data = np.hstack((real_part, imag_part))

            # Normalize the data over the window length
            mean = np.mean(stacked_data)
            std = np.std(stacked_data)
            normalized_data = (stacked_data - mean) / std

            # Calculate chordal distance energy for each time step
            ec = self.chordal_distance_energy(real_part, imag_part)
            sensor_energy.append(ec)

            sensor_processed.append(normalized_data)

        return sensor_processed, sensor_energy

    def process_all_sensors(self):
        """
        Processes all sensors and stores the results in instance variables.
        """
        for i, sensor in enumerate(self.sensors):
            sensor_data = self.eeg_data[i]
            processed, energy = self.process_sensor_data(sensor_data)
            self.processed_data[sensor] = np.array(processed)
            self.energy_data[sensor] = np.hstack(energy)

    def compare_energies(self):
        """
        Compares chordal distance energies across sensors and finds matches.

        Returns:
        matches (list): List of tuples containing sensor names and time where chordal distance energies match
        """
        sensor_time_array = []
        time_steps = self.energy_data[self.sensors[0]].shape[0]
        matched_pairs = set()

        for t in range(time_steps):
            #Compare each channel (electrode) whose energy values are equal with 5-digit precision (You can change the precision)
            energies_at_t = {sensor: np.round(self.energy_data[sensor][t], 5) for sensor in self.sensors}
            for sensor1 in self.sensors:
                for sensor2 in self.sensors:
                    if sensor1 != sensor2 and energies_at_t[sensor1] == energies_at_t[sensor2]:
                        pair = tuple(sorted([sensor1, sensor2]))
                        if pair not in matched_pairs:
                            matched_pairs.add(pair)
                            sensor_time_array.append((sensor1, sensor2, t))

        return sensor_time_array

    def run(self):
        """
        Runs the full processing and comparison pipeline.

        Returns:
        matches (list): List of tuples containing sensor names and time where chordal distance energies match
        """
        self.process_all_sensors()
        return self.compare_energies()



In [15]:
# Example usage:
# Assuming eeg_data is a 2D numpy array with shape (num_sensors, num_columns)
# and sensors is a list of sensor names
eeg_data = np.random.rand(10, 10000)  # Example EEG data
sensors = ['Fz', 'Cz', 'Pz', 'Oz', 'Fp1', 'Fp2', 'P1', 'P2', 'T1', 'T2']  # Example sensor names
window_length = 128  # Example window length

processor = AntipodalConnectivity(eeg_data, window_length, sensors)
sensor_time_matches = processor.run()

# Print the matches
for match in sensor_time_matches:
    print(f"Sensors {match[0]} and {match[1]} have matching energies at time {match[2]}")

Sensors Pz and T1 have matching energies at time 427
Sensors Cz and Oz have matching energies at time 1344
Sensors Fp1 and P1 have matching energies at time 1966
Sensors Fz and Fp2 have matching energies at time 2086
Sensors Cz and Fp2 have matching energies at time 4355
Sensors Fp2 and P2 have matching energies at time 4955
Sensors Pz and Oz have matching energies at time 6545
Sensors Oz and P2 have matching energies at time 8939
Sensors Cz and P1 have matching energies at time 9581
