In [1]:
import time
import datetime
import numpy as np
import pandas as pd
from pylsl import StreamInlet, resolve_stream, proc_ALL, local_clock

class LSLDataCollector:
    def __init__(self, exg_stream_name="filtered_exg", exg_stream_type="EXG", mp_stream_name="FingerPercentages", mp_stream_type="Markers", initial_max_seconds=1024):
        """
        Initialize the data collector by connecting to the specified LSL stream and setting up buffers.

        Parameters
        ----------
        exg_stream_name : str
            The name of the LSL EXG stream to connect to.
        exg_stream_type : str
            The type of the LSL EXG stream to connect to.
        mp_stream_name : str
            The name of the LSL MP stream to connect to.
        mp_stream_type : str
            The type of the LSL MP stream to connect to.
        initial_max_seconds : int
            The initial maximum seconds of data to store. The buffers will dynamically resize as needed.
        """

        # Resolve the EXG LSL stream
        print(f"Resolving stream name='{exg_stream_name}', type='{exg_stream_type}'...")
        exg_streams = resolve_stream('name', exg_stream_name)
        if not exg_streams:
            raise RuntimeError(f"No EXG stream found with name '{exg_stream_name}'. Please ensure the stream is active.")
        
        # Resolve the MP LSL stream
        print(f"Resolving stream name='{mp_stream_name}', type='{mp_stream_type}'...")
        mp_streams = resolve_stream('name', mp_stream_name)
        if not mp_streams:
            raise RuntimeError(f"No MP stream found with name '{mp_stream_name}'. Please ensure the stream is active.")
        
        self.exg_inlet = StreamInlet(exg_streams[0], processing_flags=proc_ALL)
        self.mp_inlet = StreamInlet(mp_streams[0], processing_flags=proc_ALL)

        self.is_recording = False
        self.start()

        # Get stream info
        self.exg_info = self.exg_inlet.info()
        self.num_exg_channels = self.exg_info.channel_count()
        self.exg_sampling_rate = self.exg_info.nominal_srate()
        if self.exg_sampling_rate <= 0:
            raise ValueError("The EXG stream's nominal sampling rate is not set or is 0.")
        
        self.mp_info = self.mp_inlet.info()
        self.num_mp_channels = self.mp_info.channel_count()
        self.mp_sampling_rate = self.mp_info.nominal_srate()
        if self.mp_sampling_rate <= 0:
            raise ValueError("The MP stream's nominal sampling rate is not set or is 0.")

        # Attempt to get MP labels (if defined)
        try:
            self.mp_labels = self.mp_info.desc().child("channels").child("label").str()
        except:
            self.mp_labels = "unknown"

        print(f"Connected to EXG stream: {self.exg_info.desc()}")
        print(f"Number of EXG channels: {self.num_exg_channels}")
        print(f"EXG Sampling rate: {self.exg_sampling_rate} Hz")
        
        print(f"Connected to MP stream: {self.mp_info.desc()}")
        print(f"Number of MP channels: {self.num_mp_channels}")
        print(f"MP Labels: {self.mp_labels}")
        print(f"MP Sampling rate: {self.mp_sampling_rate} Hz")
        
        self.initial_exg_capacity = int(initial_max_seconds * self.exg_sampling_rate)
        self.initial_mp_capacity = int(initial_max_seconds * self.mp_sampling_rate)

        # Initialize buffers
        self._initialize_buffers(self.initial_exg_capacity, self.initial_mp_capacity)

        # Initialize sizes
        self.exg_size = 0
        self.mp_size = 0


    def _initialize_buffers(self, exg_capacity, mp_capacity):
        """
        Initialize the data and timestamp buffers with the specified capacity.
        """
        self.exg_buffer = np.empty((exg_capacity, self.num_exg_channels), dtype=np.float32)
        self.exg_timestamp_buffer = np.empty(exg_capacity, dtype=np.float64)
        self.exg_timestamp_dict = {}
        self.exg_capacity = exg_capacity

        self.mp_buffer = np.empty((mp_capacity, self.num_mp_channels), dtype=np.float32)
        self.mp_timestamp_buffer = np.empty(mp_capacity, dtype=np.float64)
        self.mp_timestamp_dict = {}
        self.mp_capacity = mp_capacity

    def get_exg(self, index_start, index_end=None):
        """
        Get the EXG data samples at the specified indices.

        Parameters
        ----------
        index_start : int
            The start index of the sample to retrieve.
        
        index_end : int or None
            The end index of the sample to retrieve. If None, only the sample at index_start is returned.

        Returns
        -------
        (timestamp_array, data_array) : tuple of np.ndarrays
            The timestamps and corresponding EXG data samples.
        """
        
        index_start = self.exg_size + index_start if index_start < 0 else index_start
        
        if index_end is None:
            return (self.exg_timestamp_buffer[index_start], self.exg_buffer[index_start])
        
        index_end = self.exg_size + index_end if index_end < 0 else index_end
        
        return (self.exg_timestamp_buffer[index_start:index_end], self.exg_buffer[index_start:index_end])
    
    def get_mp(self, index_start, index_end=None):
        """
        Get the MP data samples at the specified indices.

        Parameters
        ----------
        index_start : int
            The start index of the sample to retrieve.
        
        index_end : int or None
            The end index of the sample to retrieve. If None, only the sample at index_start is returned.

        Returns
        -------
        (timestamp_array, data_array) : tuple of np.ndarrays
            The timestamps and corresponding MP data samples.
        """
        index_start = self.mp_size + index_start if index_start < 0 else index_start
        
        if index_end is None:
            return (self.mp_timestamp_buffer[index_start], self.mp_buffer[index_start])
        
        index_end = self.mp_size + index_end if index_end < 0 else index_end
        
        return (self.mp_timestamp_buffer[index_start:index_end], self.mp_buffer[index_start:index_end])

    def __len__(self):
        """
        Return the number of EXG samples collected so far.
        """
        return self.exg_size

    def _resize_exg_buffers(self, new_capacity):
        """
        Resize the EXG data and timestamp buffers to a new capacity.
        """
        new_exg_buffer = np.empty((new_capacity, self.num_exg_channels), dtype=np.float32)
        new_exg_buffer[:self.exg_size] = self.exg_buffer[:self.exg_size]
        self.exg_buffer = new_exg_buffer

        new_exg_timestamp_buffer = np.empty(new_capacity, dtype=np.float64)
        new_exg_timestamp_buffer[:self.exg_size] = self.exg_timestamp_buffer[:self.exg_size]
        self.exg_timestamp_buffer = new_exg_timestamp_buffer

        self.exg_capacity = new_capacity

    def _resize_mp_buffers(self, new_capacity):
        """
        Resize the MP data and timestamp buffers to a new capacity.
        """
        new_mp_buffer = np.empty((new_capacity, self.num_mp_channels) , dtype=np.float32)
        new_mp_buffer[:self.mp_size] = self.mp_buffer[:self.mp_size]
        self.mp_buffer = new_mp_buffer

        new_mp_timestamp_buffer = np.empty(new_capacity, dtype=np.float64)
        new_mp_timestamp_buffer[:self.mp_size] = self.mp_timestamp_buffer[:self.mp_size]
        self.mp_timestamp_buffer = new_mp_timestamp_buffer

        self.mp_capacity = new_capacity

    def start(self):
        """
        Start recording by setting the start time and opening the streams.
        """
        if self.is_recording:
            print("Data collection is already started.")
            return
        self.is_recording = True
        self.start_time = datetime.datetime.now()
        self.start_lsl_time = local_clock()
        print(f"Data collection started at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')}.")
        self.exg_inlet.open_stream()
        self.mp_inlet.open_stream()
        self.exg_size = 0
        self.mp_size = 0

    def stop(self):
        """
        Stop recording.
        """
        if not self.is_recording:
            print("Data collection is not running.")
            return
        self.exg_inlet.close_stream()
        self.mp_inlet.close_stream()
        self.is_recording = False
        print("Data collection stopped.")
        
    def round_time(self, timestamps, sampling_rate):
        # Convert timestamps to a NumPy array if needed
        timestamps = np.array(timestamps, dtype=np.float64)
        return (timestamps * sampling_rate).astype(int) / sampling_rate

    def update(self):
        """
        Fetch new data from the LSL streams and append it to the buffers.
        This method should be called periodically.
        """
        if not self.is_recording:
            return
        
        # Fetch data from MP stream
        mp_data, mp_timestamp = self.mp_inlet.pull_chunk()
        
        # Fetch data from EXG stream
        exg_data, exg_timestamp = self.exg_inlet.pull_chunk()
        
        # Handle EXG data
        if exg_data:
            exg_data = np.array(exg_data, dtype=np.float32)
            num_exg_samples = exg_data.shape[0]
            if self.exg_size + num_exg_samples > self.exg_capacity:
                # Resize the EXG buffers if needed
                new_exg_capacity = max(self.exg_capacity * 2, self.exg_size + num_exg_samples)
                self._resize_exg_buffers(new_exg_capacity)
            self.exg_buffer[self.exg_size:self.exg_size + num_exg_samples] = exg_data
            rounded_exg_timestamp = self.round_time(exg_timestamp, self.exg_sampling_rate)
            self.exg_timestamp_buffer[self.exg_size:self.exg_size + num_exg_samples] = rounded_exg_timestamp
            self.exg_timestamp_dict.update(zip(rounded_exg_timestamp, range(self.exg_size, self.exg_size + num_exg_samples)))
            self.exg_size += num_exg_samples
        
        # Handle MP data
        if mp_data:
            mp_data = np.array(mp_data, dtype=np.float32)
            num_mp_samples = mp_data.shape[0]
            if self.mp_size + num_mp_samples > self.mp_capacity:
                # Resize the MP buffers if needed
                new_mp_capacity = max(self.mp_capacity * 2, self.mp_size + num_mp_samples)
                self._resize_mp_buffers(new_mp_capacity)
            self.mp_buffer[self.mp_size:self.mp_size + num_mp_samples] = mp_data
            rounded_mp_timestamp = self.round_time(mp_timestamp, self.mp_sampling_rate)
            self.mp_timestamp_buffer[self.mp_size:self.mp_size + num_mp_samples] = rounded_mp_timestamp
            self.mp_timestamp_dict.update(zip(rounded_mp_timestamp, range(self.mp_size, self.mp_size + num_mp_samples)))
            self.mp_size += num_mp_samples

    def save_data(self, filename=None):
        """
        Save the collected data to a file.

        Parameters
        ----------
        filename : str
            The name of the file to save the data to. If None, the current timestamp is used.
        """
        if filename is None:
            filename = f"lsl_data_{datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.npz"
        
        np.savez(filename, 
                 exg_data=self.exg_buffer[:self.exg_size], 
                 exg_timestamp=self.exg_timestamp_buffer[:self.exg_size], 
                 mp_data=self.mp_buffer[:self.mp_size], 
                 mp_timestamp=self.mp_timestamp_buffer[:self.mp_size])
        print(f"Data saved to {filename}")

    def clear(self):
        """
        Clear the collected data buffers and reset them to their initial capacity.
        """
        self._initialize_buffers(self.initial_exg_capacity, self.initial_mp_capacity)
        self.exg_size = 0
        self.mp_size = 0
        self.start_time = None
        print("Data buffers cleared and reset to initial capacity.")

In [2]:
data_collection = LSLDataCollector()

Resolving stream name='filtered_exg', type='EXG'...


2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:102   INFO| 	IPv4 addr: 7f000001
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:105   INFO| 	IPv6 addr: ::1
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:91    INFO| netif 'lo0' (status: 1, multicast: 32768, broadcast: 0)
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:105   INFO| 	IPv6 addr: fe80::1%lo0
2024-12-11 18:33:20.927 (   0.614s) [          102C67]      netinterfaces.cpp:91    I

Resolving stream name='FingerPercentages', type='Markers'...
Data collection started at 2024-12-11 18:33:48.
Connected to EXG stream: <pylsl.pylsl.XMLElement object at 0x11f4f70a0>
Number of EXG channels: 16
EXG Sampling rate: 250.0 Hz
Connected to MP stream: <pylsl.pylsl.XMLElement object at 0x11f4f70a0>
Number of MP channels: 5
MP Labels: unknown
MP Sampling rate: 250.0 Hz


2024-12-11 18:33:48.019 (  27.705s) [          102C67]             common.cpp:66    INFO| git:v1.16.2-34-g7e61a2ef/branch:master/build:Release/compiler:AppleClang-15.0.0.15000309/link:SHARED


In [14]:
last_time = time.perf_counter()
while(data_collection.is_recording):
    data_collection.update()
    print(data_collection.exg_size, data_collection.mp_size)
    # Print the most recent timestamps
    if data_collection.exg_size > 0:
        print(f"EXG Timestamps: {data_collection.get_exg(-1)}")
    if data_collection.mp_size > 0:
        print(f"MP Timestamps: {data_collection.get_mp(-1)}")
    cur_time = time.perf_counter()
    time_diff = cur_time - last_time
    if time_diff < 1:  
        time.sleep(1-time_diff)
    last_time = time.perf_counter()

1024 1024
EXG Timestamps: (141429.74, array([ -0.58904576,  19.282387  , -31.024773  , -31.02538   ,
        35.007187  ,  65.9172    ,  -7.14829   , -45.98456   ,
       -12.508161  ,  -2.4513226 ,  20.688032  , -14.215396  ,
        36.041218  ,  28.612534  , -12.637799  , -52.533928  ],
      dtype=float32))
MP Timestamps: (141429.732, array([0.9382957 , 0.9303272 , 0.89518523, 0.8794692 , 0.87368536],
      dtype=float32))
2048 2048
EXG Timestamps: (141433.836, array([  0.47026366,  19.067413  ,  38.40021   ,  -6.6247716 ,
       -53.23405   ,  11.910244  , 116.43437   ,  -1.9015212 ,
       -86.68646   , -21.034857  , -28.568745  , -31.750013  ,
        32.207367  , 118.296135  ,  56.295868  , -94.399414  ],
      dtype=float32))
MP Timestamps: (141433.824, array([nan, nan, nan, nan, nan], dtype=float32))
3072 3072
EXG Timestamps: (141437.936, array([ -0.33365914,  18.979315  , -40.52543   ,  25.186394  ,
        66.58502   , -28.818924  , -97.22816   ,  47.417965  ,
        71.81

KeyboardInterrupt: 

In [146]:
data_collection.update()

exg_keys = set(data_collection.exg_timestamp_dict.keys())
mp_keys = set(data_collection.mp_timestamp_dict.keys())

diff_exg = exg_keys.difference(mp_keys)
diff_mp = mp_keys.difference(exg_keys)

print(data_collection.mp_size)
print(len(diff_exg))
print(len(diff_mp))

70428
37
10
