In [None]:
import os
import mne
import pyxdf
import numpy as np
%matplotlib qt
import matplotlib.pyplot as plt
matplotlib.style.use('default')

In [None]:
labels = ['L1', 'L2', 'L3', 'L4', 'L5', 'L6', 'L7', 'L8', 'L9', 'L10',
          'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9', 'R10']
x_coords = [-72, -81, -99, -108, -109, -109, -108, -99, -81, -72,
            72, 81, 99, 108, 109, 109, 108, 99, 81, 72]
y_coords = [0.67778, 0.73333, 0.73333, 0.67778, 0.63889, 0.60556, 0.56667, 0.51111, 0.51111, 0.56667,
            0.67778, 0.73333, 0.73333, 0.67778, 0.63889, 0.60556, 0.56667, 0.51111, 0.51111, 0.56667]
lpa_x = (x_coords[0] + x_coords[9])/2
lpa_y = (y_coords[0] + y_coords[9])/2

rpa_x = (x_coords[10] + x_coords[19])/2
rpa_y = (y_coords[10] + y_coords[19])/2

naison_x = (x_coords[2] + x_coords[11]+9)/2
naison_y = (y_coords[2] + y_coords[11])/2
print("Naison coordinates: ", naison_x, naison_y)
plt.figure(figsize=(8, 6))
scatter = plt.scatter(x_coords, y_coords, c='blue', marker='o')
plt.scatter(lpa_x, lpa_y, c='red', marker='x', label='LPA')
plt.scatter(rpa_x, rpa_y, c='green', marker='x', label='RPA')
plt.scatter(naison_x, naison_y, c='purple', marker='^', label='Nasion')
plt.annotate('LPA', xy=(lpa_x, lpa_y), xytext=(0, 5),
             textcoords="offset points",
             fontsize=8, ha='center')
plt.annotate('RPA', xy=(rpa_x, rpa_y), xytext=(0, 5),
                textcoords="offset points",
                fontsize=8, ha='center')
plt.annotate('Nasion', xy=(naison_x, naison_y), xytext=(0, 5),
                textcoords="offset points",
                fontsize=8, ha='center')

# Adding labels to each point
for i, label in enumerate(labels):
    plt.annotate(label,  # The text to display
                 (x_coords[i], y_coords[i]),  # The position to place the text
                 textcoords="offset points",  # How to position the text
                 xytext=(0, 5),  # Offset text by 5 points above
                 ha='center',  # Horizontal alignment
                 fontsize=8)  # Font size

# Adding a legend, title and axes labels
plt.legend()
plt.title('Electrode Positions')
plt.xlabel('X Coordinate')
plt.ylabel('Y Coordinate')

In [None]:
from mne.channels import make_dig_montage

# Updated electrode data
labels = ['L1', 'L2', 'L3', 'L4', 'L5', 'L6', 'L7', 'L8', 'L9', 'L10',
          'R1', 'R2', 'R3', 'R4', 'R5', 'R6', 'R7', 'R8', 'R9', 'R10']
x_coords = [-72, -81, -99, -108, -109, -109, -108, -99, -81, -72,
            72, 81, 99, 108, 109, 109, 108, 99, 81, 72]
y_coords = [0.67778, 0.73333, 0.73333, 0.67778, 0.63889, 0.60556, 0.56667, 0.51111, 0.51111, 0.56667,
            0.67778, 0.73333, 0.73333, 0.67778, 0.63889, 0.60556, 0.56667, 0.51111, 0.51111, 0.56667]
electrode_data = list(zip(x_coords, y_coords, labels))

# Constants for 0.095 m radius
K = 0.0475 / 37  # ≈ 0.001283783784 m/unit
M = 0.0475 / (0.73333 - 0.622225)  # ≈ 0.427524775 m/unit
Z_OFFSET = 0.622225


def convert_to_mne_coordinates(electrode_data):
    """Convert 2D ear electrode coordinates to 3D MNE fiducial coordinates in meters."""
    coords_3d = {}
    for x_2d, z_2d, name in electrode_data:
        if name.startswith('L'):  # Left ear
            x_mne = -0.095
            y_mne = K * (x_2d + 72)
        else:  # Right ear
            x_mne = 0.095
            y_mne = K * (x_2d - 72)
        # Corrected to match image (higher y_coords = higher z)
        z_mne = M * (z_2d - Z_OFFSET)
        coords_3d[name] = [x_mne, y_mne, z_mne]
    return coords_3d


def create_mne_montage(coords_3d):
    """Create an MNE DigMontage with electrodes and fiducials."""
    fiducials = {
        'LPA': [-0.095, 0.0, 0.0],
        'RPA': [0.095, 0.0, 0.0],
        'NAS': [0.0, 0.095, 0.0]
    }
    ch_pos = coords_3d.copy()
    ch_pos.update(fiducials)

    montage = make_dig_montage(
        ch_pos={k: np.array(v) for k, v in coords_3d.items()},
        lpa=np.array(fiducials['LPA']),
        rpa=np.array(fiducials['RPA']),
        nasion=np.array(fiducials['NAS']),
        coord_frame='mri'
    )
    return montage


coords_3d = convert_to_mne_coordinates(electrode_data)

# print("\n>>> montage = create_mne_montage(coords_3d)")
montage = create_mne_montage(coords_3d)
# print("Montage created with the following channel positions:")
montage.get_positions()

In [49]:
montages = mne.channels.get_builtin_montages(descriptions=True)
easycap_montage = mne.channels.make_standard_montage(montages[1][0])
print(easycap_montage)
easycap_montage.get_positions()

# easycap_montage.plot(kind="topomap")

<DigMontage | 0 extras (headshape), 0 HPIs, 3 fiducials, 94 channels>


{'ch_pos': OrderedDict([('Fp1', array([-0.0294367,  0.0839171, -0.00699  ])),
              ('Fpz', array([ 0.0001123,  0.088247 , -0.001713 ])),
              ('Fp2', array([ 0.0298723,  0.0848959, -0.00708  ])),
              ('AF9', array([-0.0489708,  0.0640872, -0.047683 ])),
              ('AF7', array([-0.0548397,  0.0685722, -0.01059  ])),
              ('AF5', array([-0.0454307,  0.0728622,  0.005978 ])),
              ('AF3', array([-0.0337007,  0.0768371,  0.021227 ])),
              ('AF1', array([-0.0184717,  0.0799041,  0.032752 ])),
              ('AFz', array([0.0002313, 0.080771 , 0.035417 ])),
              ('AF2', array([0.0198203, 0.0803019, 0.032764 ])),
              ('AF4', array([0.0357123, 0.0777259, 0.021956 ])),
              ('AF6', array([0.0465843, 0.0738078, 0.006034 ])),
              ('AF8', array([ 0.0557433,  0.0696568, -0.010755 ])),
              ('AF10', array([ 0.0504352,  0.0638698, -0.048005 ])),
              ('F9', array([-0.0701019,  0.041652

In [None]:
def closest_points_vector(eeg_timestamps, marker_timestamps):
    # Get the insertion indices for each marker timestamp
    indices = np.searchsorted(eeg_timestamps, marker_timestamps)

    # Preallocate the output array as a copy of indices.
    closest_eeg_indices = indices.copy()

    # Create a mask for markers where the insertion index equals 0 (marker before first EEG timestamp)
    mask_begin = (indices == 0)
    # For these, the closest EEG index is 0 (they cannot use a previous value)
    closest_eeg_indices[mask_begin] = 0

    # Create a mask for markers where the insertion index equals the length of the EEG timestamps
    mask_end = (indices == len(eeg_timestamps))
    # For these markers, set the closest EEG index to the last index
    closest_eeg_indices[mask_end] = len(eeg_timestamps) - 1

    # Create a mask for the "middle" markers, i.e., not at the very beginning or end
    mask_middle = (indices > 0) & (indices < len(eeg_timestamps))

    # For markers in the middle, compute the distance to the previous and next EEG timestamps:
    prev_times = eeg_timestamps[indices[mask_middle] - 1]
    next_times = eeg_timestamps[indices[mask_middle]]
    marker_times_middle = marker_timestamps[mask_middle]

    # Calculate the differences
    diff_prev = marker_times_middle - prev_times
    diff_next = next_times - marker_times_middle

    # For each marker in the middle, choose the index of the EEG timestamp that is closer:
    # If the distance to the previous timestamp is less or equal than the distance to the next,
    # then we pick indices[mask_middle]-1; otherwise, we pick indices[mask_middle].
    closest_eeg_indices[mask_middle] = np.where(diff_prev <= diff_next,
                                                indices[mask_middle] - 1,
                                                indices[mask_middle])

    return closest_eeg_indices

def create_mappings(event_names, prefix):
    marker_dict = {p: i for i, p in enumerate(
        np.unique(event_names))}
    id_binding = {v: k for k, v in marker_dict.items()}
    category_mapping = {
        p: {k: v for k, v in marker_dict.items() if k.startswith(p)} for p in prefix
    }
    return marker_dict, id_binding, category_mapping

def create_events(time_points, event_mapping, event_names):
    label_id_func = np.vectorize(event_mapping.get)
    events = np.zeros((len(time_points), 3), dtype=int)
    events[:, 0] = time_points
    events[:, 2] = label_id_func(event_names)
    return events

In [None]:
montages

## EEG Observations
Missing EEG: 129059



In [None]:
subs = [797337,129059, 617834, 822866, 991031]
EXP_ROOT = "exp_data"
INPUT_ROOT = "input"
OUTPUT_ROOT = "output"
sub_id = subs[0]
DATA_FILE = os.path.join(
    EXP_ROOT, f"sub-{sub_id}", f"sub-{sub_id}_task-hearing_run-001.xdf")
RAW_VIDEO = os.path.join(INPUT_ROOT, f"{sub_id}.avi")
# DATA_FILE = os.path.join(EXP_ROOT,"elizabeth.xdf")
data, header = pyxdf.load_xdf(DATA_FILE)
print([stream['info']['type'][0] for stream in data])

In [None]:
marker_stream = next(
    stream for stream in data if stream['info']['type'][0] == 'Markers')
# video_stream = next(
#     stream for stream in data if stream['info']['type'][0] == 'Video')
# ppg_stream = next(
#     stream for stream in data if stream['info']['type'][0] == 'PPG')
# eeg_stream = next(
#     stream for stream in data if stream['info']['type'][0] == 'EEG')
eeg_stream = data[3]

In [None]:
marker_timestamps = marker_stream['time_stamps']
marker_data = np.array(marker_stream['time_series']).squeeze()
eeg_data = eeg_stream['time_series'] * 1e-6  # convert to volts
eeg_timestamps = eeg_stream['time_stamps']
eeg_insert_points = closest_points_vector(eeg_timestamps, marker_timestamps)

In [None]:
bindings = ['pmt','hlt','let','ast']
marker_dict, id_binding, category_mapping = create_mappings(marker_data, bindings)
events = create_events(eeg_insert_points, marker_dict, marker_data)

In [None]:
ch_labels = ['L1', 'L2', 'L4', 'L5', 'L7', 'L8', 'L9', 'L10',
             'R1', 'R2', 'R4', 'R5', 'R7', 'R8', 'R9', 'R10']
# ch_labels = ['Fp1', 'Fp2', 'C3', 'C4', 'P7', 'P8', 'O1',
#              'O2', 'F7', 'F8', 'F3', 'F4', 'T7', 'T8', 'P3', 'P4']
sampling_rate = 125

eeg_data = eeg_stream['time_series'].T * 1e-6
info = mne.create_info(
    ch_names=ch_labels, sfreq=sampling_rate, ch_types='eeg')
raw = mne.io.RawArray(eeg_data, info)
# raw.filter(l_freq=1, h_freq=None)

flalt_voltage = 0.1
_, bads = mne.preprocessing.annotate_amplitude(
    raw, flat=dict(eeg=flalt_voltage*1e-6))
raw.info['bads'] = bads
print(f"Bad channels: {bads}")
# raw.interpolate_bads()
annot = mne.annotations_from_events(
    events, raw.info['sfreq'], id_binding)
raw.set_annotations(annot)

# raw.plot_psd(fmax=62)

raw = raw.notch_filter(60)
bandpass = {'low': 1, 'high': 40}
raw = raw.filter(l_freq=bandpass['low'], h_freq=bandpass['high'])
# _, bads = mne.preprocessing.annotate_amplitude(raw, flat=dict(eeg=1e-6))
# raw.info['bads'] = ['R1', 'R2', 'R4', 'R5', 'R7', 'R8', 'R9', 'R10']
# raw.info['bads'] = bads
# print(f"Bad channels: {bads}")

raw.plot(n_channels=16, scalings='auto', bad_color='red')
# raw.plot_psd(fmax=62,dB=True)

In [None]:
raw.set_montage(montage)
raw.plot_sensors(show_names=True,)

In [None]:
# Compute and plot the PSD
raw.compute_psd(fmin=1, fmax=60).plot()

In [None]:
print(raw.get_data().max())

In [None]:
import numpy as np
import mne

# Approximate positions for 8 electrodes per ear
ch_pos = {}
radius = 0.015  # Radius around the ear (~15 mm)
angles = np.linspace(0, 2 * np.pi, 9)[:-1]  # 8 angles (0° to 315°)

# Left ear around LPA
lpa = np.array([-0.081, -0.010, 0])  # Slightly behind LPA
for i, angle in enumerate(angles):
    ch_pos[f'L{i+1}'] = [
        lpa[0],  # x: fixed at LPA's x
        lpa[1] + radius * np.cos(angle),  # y: varies in circle
        lpa[2] + radius * np.sin(angle)   # z: varies in circle
    ]

# Right ear around RPA
rpa = np.array([0.081, -0.010, 0])
for i, angle in enumerate(angles):
    ch_pos[f'R{i+1}'] = [
        rpa[0],  # x: fixed at RPA's x
        rpa[1] + radius * np.cos(angle),  # y: varies in circle
        rpa[2] + radius * np.sin(angle)   # z: varies in circle
    ]

# Create the montage
montage = mne.channels.make_dig_montage(ch_pos=ch_pos, coord_frame='head')

# Assign channel names to match the montage
raw.rename_channels({raw.ch_names[i]: list(ch_pos.keys())[i] for i in range(16)})

# Set the montage
raw.set_montage(montage)

# Visualize the sensor positions
raw.plot_sensors()