In [1]:
from pathlib import Path

from zmax_datasets.datasets.base import ZMaxRecording


# Recording folder, given relative to the notebook's working directory.
DATA_DIR = Path("../../../eye_movement")

# Print whether the folder is reachable before anything is read from it.
print(DATA_DIR.exists())

# Recording handle for subject "p1", session "s1"; print the data types it reports.
rec = ZMaxRecording(data_dir=DATA_DIR, subject_id="p1", session_id="s1")
print(rec.data_types)

True
['dX', 'dY', 'NASAL L', 'OXY_R_AC', 'dZ', 'EEG L', 'OXY_IR_DC', 'OXY_DARK_AC', 'BATT', 'EEG R', 'OXY_R_DC', 'RSSI', 'NOISE', 'NASAL R', 'LIGHT', 'BODY TEMP', 'OXY_IR_AC']


In [2]:
from slumber.utils.data import Data
import numpy as np

# Read each EEG channel, transpose the get_data() result and scale it by 1e6
# (presumably volts -> microvolts; confirm against what read_raw_data returns).
eeg_l = 1e6 * rec.read_raw_data("EEG L").get_data().T
print(eeg_l.shape)
eeg_r = 1e6 * rec.read_raw_data("EEG R").get_data().T
print(eeg_r.shape)

# Both arrays are 2-D single-column arrays, so joining along axis 1 yields
# one column per channel, left first.
stacked_eeg = np.concatenate((eeg_l, eeg_r), axis=1)

# 256 is the second positional argument of Data (reported as sample_rate in its repr).
data = Data(stacked_eeg, 256, channel_names=["F7-Fpz", "F8-Fpz"])
print(data)

Extracting EDF parameters from /Users/alisaberi/Desktop/eye_movement/EEG L.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
(51200, 1)
Extracting EDF parameters from /Users/alisaberi/Desktop/eye_movement/EEG R.edf...
EDF file detected
Setting channel info structure...
Creating raw.info structure...
(51200, 1)
Data(shape=(51200, 2), sample_rate=256, channel_names=['F7-Fpz', 'F8-Fpz'])


In [3]:
from slumber.processing.transforms import FIRFilter, Resample

# Resample the two-channel recording to 128 Hz; `data` itself is left untouched
# and the result is kept under the separate name `data_`.
resampler = Resample()
data_ = resampler(data, new_sample_rate=128)
print(data_)

Data(shape=(25600, 2), sample_rate=128, channel_names=['F7-Fpz', 'F8-Fpz'])


In [4]:
from slumber.processing.eye_movement import detect_lr_eye_movements

# Run the left/right eye-movement detector on the resampled recording,
# telling it which channel name is the left and which the right EEG.
seq = detect_lr_eye_movements(
    data_,
    left_eeg_label="F7-Fpz",
    right_eeg_label="F8-Fpz",
)
print(seq)

Setting up band-pass filter from 0.3 - 2 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.30
- Lower transition bandwidth: 0.30 Hz (-6 dB cutoff frequency: 0.15 Hz)
- Upper passband edge: 2.00 Hz
- Upper transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 3.00 Hz)
- Filter length: 1409 samples (11.008 s)



[32m2024-12-19 14:36:44.115[0m | [34m[1mDEBUG   [0m | [36mslumber.processing.eye_movement[0m:[36mdetect_lr_eye_movements[0m:[36m59[0m - [34m[1mDifference data: Data(shape=(25600, 1), sample_rate=128, channel_names=['channel_0'])[0m


Setting up band-pass filter from 0.3 - 2 Hz

FIR filter parameters
---------------------
Designing a one-pass, zero-phase, non-causal bandpass filter:
- Windowed time-domain design (firwin) method
- Hamming window with 0.0194 passband ripple and 53 dB stopband attenuation
- Lower passband edge: 0.30
- Lower transition bandwidth: 0.30 Hz (-6 dB cutoff frequency: 0.15 Hz)
- Upper passband edge: 2.00 Hz
- Upper transition bandwidth: 2.00 Hz (-6 dB cutoff frequency: 3.00 Hz)
- Filter length: 1409 samples (11.008 s)

[MovementEvent(label='LR', start_time=7.1875, end_time=7.7734375), MovementEvent(label='LRLR', start_time=31.625, end_time=35.65625), MovementEvent(label='LRLR', start_time=44.40625, end_time=46.8515625), MovementEvent(label='LRLR', start_time=52.828125, end_time=54.4140625), MovementEvent(label='LRLRL', start_time=63.9140625, end_time=68.484375), MovementEvent(label='LRLR', start_time=71.4609375, end_time=74.2890625), MovementEvent(label='LRLR', start_time=83.28125, end_time=8

In [5]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
from ipywidgets import Button, VBox, HBox, Output
from IPython.display import display

# Render plotly figures with the VS Code renderer.
pio.renderers.default = 'vscode'

samples = data.index

# Length of one browsing window, in seconds.
subset_duration = 30

# Number of whole windows in the recording.
# NOTE(review): floor division means a trailing partial window is never
# reachable from the buttons, and a recording shorter than one window gives 0.
total_subsets = int(data.duration.total_seconds() // subset_duration)

# Index of the window currently shown; updated by the button callbacks.
current_subset = 0

# Widget area that the figure is drawn into.
output = Output()

def plot_subset(subset_idx):
    """
    Draw one fixed-length window of the recording into the ``output`` widget.

    Row 1 shows the two channels of ``data``. Row 2 shows, from the resampled
    ``data_``, the left-minus-right difference and the right channel. Events
    from ``seq`` that overlap the window are drawn on row 1 as shaded
    rectangles with their label on top.

    Args:
        subset_idx: Zero-based window index. The window spans
            ``subset_idx * subset_duration`` to that value plus
            ``subset_duration`` seconds.
    """
    # Window boundaries in seconds.
    start_time = subset_idx * subset_duration
    end_time = start_time + subset_duration

    # Slice the original and the resampled recording to the window.
    subset_data = data.slice_by_time(start_time, end_time)
    subset_data_ = data_.slice_by_time(start_time, end_time)

    # Keep every event that overlaps the window. Requiring full containment
    # would hide events straddling a window boundary from every window.
    subset_seq = [
        s for s in seq
        if s.end_time > start_time and s.start_time < end_time
    ]

    # subplot_titles must be a sequence with one entry per subplot; a bare
    # string would be indexed character by character.
    fig = make_subplots(
        rows=2, cols=1,
        shared_xaxes=True,
        vertical_spacing=0.05,
        subplot_titles=(
            "Raw EEG Signals",
            "Resampled EEG: Left - Right difference and Right",
        ),
    )

    # The channel arrays were already scaled by 1e6 when `data` was built,
    # so they are plotted as-is rather than being scaled a second time.
    raw_time = subset_data.index + start_time
    raw_left = subset_data[:, "F7-Fpz"].array.squeeze()
    raw_right = subset_data[:, "F8-Fpz"].array.squeeze()
    fig.add_trace(go.Scatter(x=raw_time, y=raw_left, mode='lines', name='EEG Left'), row=1, col=1)
    fig.add_trace(go.Scatter(x=raw_time, y=raw_right, mode='lines', name='EEG Right'), row=1, col=1)

    # Row 2: difference signal and right channel of the resampled recording.
    resampled_time = subset_data_.index + start_time
    resampled_left = subset_data_[:, "F7-Fpz"].array
    resampled_right = subset_data_[:, "F8-Fpz"].array
    fig.add_trace(
        go.Scatter(
            x=resampled_time,
            y=(resampled_left - resampled_right).squeeze(),
            mode='lines',
            name='EEG Left - Right (resampled)',
        ),
        row=2, col=1,
    )
    fig.add_trace(
        go.Scatter(
            x=resampled_time,
            y=resampled_right.squeeze(),
            mode='lines',
            name='EEG Right (resampled)',
        ),
        row=2, col=1,
    )

    # Event annotations. The vertical extent is the same for every event, so
    # it is computed once, and only when there is something to draw.
    if subset_seq:
        y_low = raw_left.min()
        y_high = raw_left.max()
        for event in subset_seq:
            # Clip the rectangle to the window so a straddling event does not
            # stretch the x-axis beyond the plotted signal.
            x_begin = max(event.start_time, start_time)
            x_finish = min(event.end_time, end_time)
            fig.add_shape(
                type="rect",
                x0=x_begin, x1=x_finish,
                y0=y_low, y1=y_high,
                line=dict(color="RoyalBlue"),
                fillcolor="LightSkyBlue",
                opacity=0.5
            )
            fig.add_trace(go.Scatter(
                x=[(x_begin + x_finish) / 2], y=[y_high],
                text=[event.label], mode="text", name=f"Annotation: {event.label}",
                showlegend=False,  # one legend entry per event only adds clutter
            ))

    fig.update_layout(
        height=600,
        dragmode="zoom",
        margin=dict(l=50, r=50, t=50, b=50),
        title=f"Subset {subset_idx + 1}/{total_subsets}"
    )
    # With shared x-axes only the bottom axis shows tick labels, so the time
    # title belongs there; both y-axes get the amplitude title.
    fig.update_xaxes(title_text="Time (s)", row=2, col=1)
    fig.update_yaxes(title_text="EEG Signal Amplitude (µV)")

    # Replace whatever the output widget currently shows with the new figure.
    with output:
        output.clear_output(wait=True)
        fig.show()

# Callback functions for buttons
def next_subset_callback(b):
    """Advance to the following window and redraw; do nothing on the last one.

    Args:
        b: The clicked button, as passed by ``on_click``; not used.
    """
    global current_subset
    # Already on the final window: nothing to advance to.
    if current_subset >= total_subsets - 1:
        return
    current_subset += 1
    plot_subset(current_subset)

def prev_subset_callback(b):
    """Step back to the preceding window and redraw; do nothing on the first one.

    Args:
        b: The clicked button, as passed by ``on_click``; not used.
    """
    global current_subset
    # Already on the first window: nothing to go back to.
    if current_subset <= 0:
        return
    current_subset -= 1
    plot_subset(current_subset)

# Navigation buttons, each wired to its callback.
btn_prev = Button(description="Previous Subset")
btn_next = Button(description="Next Subset")
btn_prev.on_click(prev_subset_callback)
btn_next.on_click(next_subset_callback)

# Lay the buttons out in a row above the figure area, then draw the current window.
controls = HBox([btn_prev, btn_next])
display(VBox([controls, output]))
plot_subset(current_subset)

VBox(children=(HBox(children=(Button(description='Previous Subset', style=ButtonStyle()), Button(description='…