# Real-Time Multichannel EEG Streaming Application

![](./assets/.png)

## Prerequisites

| What? | Why? |
| --- | --- |
| [Index: Intro, Workflows, Extensions](./index.ipynb) | For context and workflow selection/feature guidance |
| [Recommended Workflow](./multichan.ipynb) | For live downsampling with in-memory Pandas DataFrame |

## Overview

For an introduction, please visit the ['Index'](./index.ipynb) page. 

This tutorial guides you through building a multichannel timeseries streaming application that visualizes either CPU usage or an EEG data file as if it were streaming live over a network interface. The application also shows how to build controls for selecting the data source and for starting, pausing, and stopping the data stream.

### Key Software:
- **[LSL](https://github.com/sccn/labstreaminglayer)** and **[MNE-LSL](https://mne.tools/mne-lsl/stable/index.html)**: The Lab Streaming Layer (LSL), widely used in neuroscience, is a system for collecting measurement time series in research experiments. Here, it is used via the MNE-LSL Python package to set up a mock live stream from a file on disk.


## Imports and Configuration


In [None]:
import abc
import time
import uuid
import mne
import holoviews as hv
import panel as pn
import param
import numpy as np
import pandas as pd
from bokeh.palettes import Category20
from mne_lsl.datasets import sample
from mne_lsl.player import PlayerLSL
from mne_lsl.stream import StreamLSL
from holoviews.streams import Buffer
import psutil

hv.extension('bokeh')
pn.extension()


## Creating Data Sources

We will create two data sources, each managed by its own Python class:
1. **CPU Usage:** Streams CPU usage percentages per core.
2. **EEG Data:** Streams EEG data from a sample dataset.

Let's first define a recipe (an abstract base class) that every data source class must follow. It requires each class to report its channel names, channel positions, and sampling interval, and to be able to start the stream, generate data, and stop the stream.

In [None]:
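class DataSource(abc.ABC):
    """Interface that every data source used by the app must implement."""

    @abc.abstractmethod
    def get_channel_names(self):
        """Return the list of channel names."""

    @property
    @abc.abstractmethod
    def sampling_interval(self):
        """Seconds between successive calls to generate_data."""

    @abc.abstractmethod
    def get_channel_positions(self):
        """Return per-channel position records, or None if not available."""

    @abc.abstractmethod
    def generate_data(self):
        """Return a DataFrame with a 'time' column plus one column per channel."""

    @abc.abstractmethod
    def start(self):
        """Begin producing data."""

    @abc.abstractmethod
    def stop(self):
        """Stop producing data and release resources."""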
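
As a quick check of what the abstract base class buys us, the sketch below uses a hypothetical two-method cut of the same contract (`MiniSource`, `Incomplete`, and `Complete` are illustration-only names). Python refuses to instantiate a subclass that leaves any abstract method unimplemented, raising `TypeError`. The check only applies to classes that actually subclass the ABC; a class that merely defines methods with the same names is not checked.

```python
import abc


class MiniSource(abc.ABC):
    """Hypothetical two-method cut of the DataSource contract."""

    @property
    @abc.abstractmethod
    def sampling_interval(self):
        ...

    @abc.abstractmethod
    def generate_data(self):
        ...


class Incomplete(MiniSource):
    # Implements the property but forgets generate_data
    @property
    def sampling_interval(self):
        return 0.25


class Complete(Incomplete):
    # Fills in the last abstract method, so instantiation is allowed
    def generate_data(self):
        return []


def can_instantiate(cls):
    """Return True if cls() succeeds, False if abc refuses it."""
    try:
        cls()
    except TypeError:
        return False
    return True


print(can_instantiate(Incomplete))  # False
print(can_instantiate(Complete))    # True
```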

### CPU Usage Data Source
The `CPU_Usage` class streams CPU usage data using the `psutil` library. It acts as a simple test bench: it streams real data so we can check that the plotting application works. Whenever `generate_data` is called, it returns a timestamped measurement of CPU usage for each logical core on the machine.

In [None]:
class CPU_Usage(DataSource):
    def __init__(self, sampling_interval=0.25, buffer_size=5):
        self.num_cores = psutil.cpu_count(logical=True)
        self._sampling_interval = sampling_interval
        self.streaming = False
        self._channel_positions = None
        self.buffer_size = buffer_size
        
        self.channel_names = [f'CPU_{i}' for i in range(self.num_cores)]

    @property
    def sampling_interval(self):
        return self._sampling_interval
    
    def get_channel_names(self):
        return self.channel_names

    def get_channel_positions(self):
        # CPU cores have no spatial layout, so this is None
        return self._channel_positions
    
    def start(self):
        self.streaming = True

    def stop(self):
        self.streaming = False

    def generate_data(self):
        if not self.streaming:
            return pd.DataFrame(columns=['time'] + self.channel_names)
        cpu_percent = psutil.cpu_percent(percpu=True)
        if cpu_percent:
            timestamp = time.time()  # float seconds, matching the numeric LSL timestamps
            data = {'time': [timestamp]}
            for ch, usage in zip(self.channel_names, cpu_percent):
                data[ch] = usage
            return pd.DataFrame(data)
        else:
            return pd.DataFrame(columns=['time'] + self.channel_names)
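
The record that `generate_data` returns can be sketched without `psutil` or pandas. `cpu_row` is a hypothetical helper and the readings are made-up numbers standing in for `psutil.cpu_percent(percpu=True)`; the function returns a plain dict of columns, which `pd.DataFrame(...)` would turn into the same one-row frame the class builds.

```python
import time


def cpu_row(channel_names, readings, timestamp=None):
    """Build one column-oriented record: a 'time' column plus one column per core.

    `readings` stands in for psutil.cpu_percent(percpu=True). An empty list
    yields empty columns, mirroring the empty-DataFrame branch of generate_data.
    """
    if not readings:
        return {'time': [], **{ch: [] for ch in channel_names}}
    if timestamp is None:
        timestamp = time.time()
    row = {'time': [timestamp]}
    for ch, usage in zip(channel_names, readings):
        row[ch] = [usage]
    return row


names = [f'CPU_{i}' for i in range(4)]
print(cpu_row(names, [12.5, 3.0, 40.0, 7.5], timestamp=100.0))
# {'time': [100.0], 'CPU_0': [12.5], 'CPU_1': [3.0], 'CPU_2': [40.0], 'CPU_3': [7.5]}
```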


### LSL File Stream Data Source
The `LSL_EEG_File_Stream` class streams EEG data via a mock `LSL` live stream from a saved file, using utilities from the `mne` and `mne_lsl` libraries.

This class is more involved than the previous one because the stream setup (`start`) and teardown (`stop`) must follow the order described in the [PlayerLSL](https://mne.tools/mne-lsl/stable/generated/api/mne_lsl.player.PlayerLSL.html) and [StreamLSL](https://mne.tools/mne-lsl/stable/generated/api/mne_lsl.stream.StreamLSL.html#mne_lsl.stream.StreamLSL) docs. The idea is the same, however: when `generate_data` is called, it returns a timestamped DataFrame containing the next block of channel measurements.
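
The ordering that the class follows in `start` and `stop` can be isolated with stand-in objects. `FakePlayer` and `FakeStream` are hypothetical test doubles, not part of MNE-LSL; they only record the calls made on them. The sketch shows the producer being started before the consumer connects, the player being stopped again if the connection fails, and teardown running in reverse order.

```python
class FakePlayer:
    """Hypothetical stand-in for PlayerLSL that records calls."""

    def __init__(self, log):
        self.log = log

    def start(self):
        self.log.append('player.start')

    def stop(self):
        self.log.append('player.stop')


class FakeStream:
    """Hypothetical stand-in for StreamLSL; can be told to fail on connect."""

    def __init__(self, log, fail=False):
        self.log = log
        self.fail = fail

    def connect(self, timeout):
        self.log.append('stream.connect')
        if self.fail:
            raise RuntimeError('no stream found')

    def disconnect(self):
        self.log.append('stream.disconnect')


def start(player, stream):
    # Start the producer first so the consumer has a stream to find
    player.start()
    try:
        stream.connect(timeout=5.0)
    except RuntimeError:
        # Undo the half-finished setup before propagating the error
        player.stop()
        raise


def stop(player, stream):
    # Tear down in the reverse order of setup
    stream.disconnect()
    player.stop()


log = []
player, stream = FakePlayer(log), FakeStream(log)
start(player, stream)
stop(player, stream)
print(log)  # ['player.start', 'stream.connect', 'stream.disconnect', 'player.stop']
```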


In [16]:
class LSL_EEG_File_Stream(DataSource):
    def __init__(self, fname, picks='eeg'):
        self.source_id = uuid.uuid4().hex
        self.fname = fname
        self.name = f"MNE-LSL-{self.source_id}"  # Explicit stream name
        
        self.chunk_size = 20

        # Initialize LSL components
        self.player = PlayerLSL(
            self.fname, 
            chunk_size=self.chunk_size,
            source_id=self.source_id,
            name=self.name
        )
        
        self.stream = StreamLSL(
            bufsize=2,
            name=self.name,
            source_id=self.source_id,
        )
        
        # Set sampling interval for updates
        self._sampling_interval = 0.02  # 20ms update rate
        
        self.streaming = False
        self.picks = picks
        self.reference = "average"
        
        if self.picks == 'eeg':
            ch_type_indices = mne.channel_indices_by_type(self.player.info)['eeg']
            self.channel_names = [self.player.ch_names[i] for i in ch_type_indices]
        else:
            self.channel_names = self.picks
            
        # Get the standard 10-05 montage
        montage = mne.channels.make_standard_montage("standard_1005")
        positions = montage.get_positions()['ch_pos']
        
        # Filter positions for the current channels
        self.channel_positions = []
        for ch in self.channel_names:
            if ch in positions:
                pos = positions[ch]
                self.channel_positions.append({
                    'xpos': pos[0],
                    'ypos': pos[1],
                    'ch': ch,
                })
    
    def get_channel_names(self):
        return self.channel_names
    
    def get_channel_positions(self):
        return self.channel_positions
        
    @property
    def sampling_interval(self):
        return self._sampling_interval
        
    def start(self):
        if not self.streaming:
            self.player.start()
            # Wait briefly for the stream to be available
            time.sleep(0.1)
            try:
                self.stream.connect(timeout=5.0)
            except RuntimeError:
                print("Failed to connect to LSL stream")
                self.player.stop()
                raise
            self.stream.pick(self.picks)
            if self.reference:
                self.stream.set_eeg_reference(self.reference)
            self.streaming = True
    
    def stop(self):
        if self.streaming:
            self.stream.disconnect()
            self.player.stop()
            self.streaming = False
    
    def generate_data(self):
        if not self.streaming:
            return pd.DataFrame(columns=['time'] + self.channel_names)
        
        # Get whatever new data is available
        data, ts = self.stream.get_data(
            self.stream.n_new_samples / self.stream.info["sfreq"],
            picks=self.channel_names
        )
        
        if data.size > 0:
            new_data = pd.DataFrame({'time': ts})
            for i, ch in enumerate(self.channel_names):
                new_data[ch] = data[i]
            return new_data
        else:
            return pd.DataFrame(columns=['time'] + self.channel_names)
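
Two details of `generate_data` are easier to see in isolation: the window length passed to `get_data` (the number of unread samples divided by the sampling rate, so the window covers exactly the new samples) and the reshaping of the returned `(n_channels, n_samples)` array into one column per channel. The sketch below assumes that layout, as the per-row indexing `data[i]` in the class implies, and uses nested lists and a hypothetical 500 Hz sampling rate instead of a live stream; `window_seconds` and `to_columns` are illustration-only helpers.

```python
def window_seconds(n_new_samples, sfreq):
    """Window length (s) that covers exactly the samples not yet read."""
    return n_new_samples / sfreq


def to_columns(data, timestamps, channel_names):
    """Turn a (n_channels, n_samples) nested list into column-oriented records.

    Row i of `data` belongs to channel_names[i]; `timestamps` has one entry
    per sample.
    """
    if len(data) != len(channel_names):
        raise ValueError('one data row per channel is expected')
    columns = {'time': list(timestamps)}
    for ch, samples in zip(channel_names, data):
        columns[ch] = list(samples)
    return columns


# Hypothetical numbers: 20 new samples at a 500 Hz sampling rate
print(window_seconds(20, 500.0))  # 0.04

data = [[1.0, 2.0, 3.0],      # channel 'F4'
        [-1.0, -2.0, -3.0]]   # channel 'O1'
print(to_columns(data, [10.000, 10.002, 10.004], ['F4', 'O1']))
```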

## Building the Streaming Application
The StreamingApp class below handles the user interface and streaming logic, integrating the data sources with interactive controls. Comments are included for each part of the code that might need additional context.
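
One detail of `update_channel_names` is worth isolating. Bokeh's `Category20` is a dict of palettes keyed by size, with keys from 3 to 20 only, so the requested size must be clamped to that range before the lookup (otherwise, for example, a machine with 32 logical cores would raise `KeyError`), and the modulo in the mapping then wraps colors around. The sketch below uses a hypothetical stand-in dict with the same key range instead of importing Bokeh; `make_stand_in_palettes` and `color_mapping` are illustration-only names.

```python
def make_stand_in_palettes():
    """Hypothetical stand-in for bokeh's Category20: a dict keyed 3..20,
    where key n maps to a list of n color names."""
    return {n: [f'color{i}' for i in range(n)] for n in range(3, 21)}


def color_mapping(channel_names, palettes):
    # Clamp the requested size into the range of keys the dict actually has
    size = min(max(palettes), max(min(palettes), len(channel_names)))
    palette = palettes[size]
    # Wrap around with modulo when there are more channels than colors
    return {ch: palette[i % len(palette)] for i, ch in enumerate(channel_names)}


palettes = make_stand_in_palettes()
cpu_names = [f'CPU_{i}' for i in range(32)]
mapping = color_mapping(cpu_names, palettes)
print(len(mapping), len(set(mapping.values())))  # 32 20
```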

In [18]:
class StreamingApp(param.Parameterized):
    def __init__(self, data_sources, notebook=False, buffer_length_samples=10000):
        super().__init__()
        self.data_sources = data_sources
        self.notebook = notebook
        self.initial_time = None  # track the first timestamp

        # Create mappings from data source names to instances
        self.data_source_names = [type(ds).__name__ for ds in data_sources]
        self.data_source_instances = {type(ds).__name__: ds for ds in data_sources}
        self.data_source = self.data_source_names[0]  # Default selection

        self.update_channel_names()
        self.buffer_length = buffer_length_samples
        # Initialize the buffer here after channel names are set
        self.buffer = Buffer(data=self.initial_data(), length=self.buffer_length)
        
        self.streaming = False
        self.paused = False
        self.task = None
        self.data_generator = None


        self.create_widgets()
        self.create_layout()

    def update_channel_names(self):
        print('update_channel_names')
        data_source_instance = self.data_source_instances[self.data_source]
        self.channel_names = data_source_instance.get_channel_names()
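        # Create a consistent color mapping. Category20 only provides palettes
        # of 3 to 20 colors, so clamp the size; the modulo below wraps around.
        palette = Category20[min(20, max(3, len(self.channel_names)))]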
        self.color_mapping = {ch: palette[i % len(palette)] for i, ch in enumerate(self.channel_names)}
        self.current_sampling_interval = data_source_instance.sampling_interval
        # Update buffer with new channel names if buffer is initialized
        if hasattr(self, 'buffer'):
            self.buffer.clear()
            self.buffer.data = self.initial_data()

    def initial_data(self):
        return pd.DataFrame({'time': [], **{ch: [] for ch in self.channel_names}})

    def create_widgets(self):
        # Dropdown widget for selecting data source
        self.data_source_widget = pn.widgets.Select(
            name='Data Source',
            options=self.data_source_names,
            value=self.data_source
        )
        # Watch for changes in the data source selector
        self.data_source_widget.param.watch(self.on_data_source_change, 'value')

        self.radio_group = pn.widgets.RadioButtonGroup(
            name='Stream Control',
            options=['Start', 'Pause', 'Stop'],
            value='Stop',
            button_type='default',
            sizing_mode='stretch_width',
            stylesheets=[ """
            :host(.solid) .bk-btn.bk-btn-default.bk-active {
                background-color: #b23c3c;
            }
            """]
        )
        self.radio_group.param.watch(self.handle_state_change, 'value')

    def on_data_source_change(self, event):
        print('on_data_source_change')
        self.stop_stream()
        self.data_source = event.new
        self.update_channel_names()
        # Update the plot and montage
        self.plot_pane.object = self.bare_plot()
        self.montage_pane.object = pn.pane.Placeholder()

    def handle_state_change(self, event):
        if event.new == 'Start':
            self.start_stream()
        elif event.new == 'Pause':
            self.pause_stream()
        elif event.new == 'Stop':
            self.stop_stream()

    def create_layout(self):
        print('create_layout')
        # Create the montage pane
        self.montage_pane = pn.pane.Placeholder()
        # Start with a blank bare plot in main
        self.plot_pane = pn.pane.HoloViews(self.bare_plot())
        if self.notebook:
            self.layout = pn.Row(
                pn.Column(
                    self.data_source_widget,
                    self.radio_group,
                    self.montage_pane,
                    width=300,
                ),
                self.plot_pane,
                align='start',
            )
        else:
            sidebar = pn.Column(
                pn.WidgetBox(
                    self.data_source_widget,
                    self.radio_group,
                ),
                self.montage_pane,
                sizing_mode='stretch_width',
            )
            self.template = pn.template.FastListTemplate(
                main=[self.plot_pane],
                sidebar=[sidebar],
                title="Multi-Channel Streaming App",
                theme="dark",
                accent="#2e008b",
            )

    def start_stream(self):
        if not self.streaming:
            with self.plot_pane.param.update(loading=True):
                self.streaming = True
                self.paused = False
                self.data_generator = self.data_source_instances[self.data_source_widget.value]
                self.montage_pane.object = self.create_montage_plot()
                self.data_generator.start()
                self.buffer.clear()
                self.initial_time = None  # Reset initial time when starting new stream
                print('starting stream')
                self.plot_pane.object = hv.DynamicMap(self.create_streaming_plot, streams=[self.buffer])
                sampling_interval_ms = int(self.data_generator.sampling_interval * 1000)
                self.task = pn.state.add_periodic_callback(
                    self.stream_data, period=sampling_interval_ms, count=None
                )
        elif self.streaming and self.paused:
            self.paused = False
            if self.task is None:
                sampling_interval_ms = int(self.data_generator.sampling_interval * 1000)
                self.task = pn.state.add_periodic_callback(
                    self.stream_data, period=sampling_interval_ms, count=None
                )

    def pause_stream(self):
        print('pause stream')
        if self.task:
            self.task.stop()
            self.task = None
        self.paused = True

    def stop_stream(self):
        print('stop stream')
        if self.streaming:
            self.streaming = False
            self.paused = False
            if self.task:
                self.task.stop()
                self.task = None
            if self.data_generator is not None:
                self.data_generator.stop()
            self.radio_group.value = 'Stop'
            self.plot_pane.object = self.bare_plot()
            self.buffer.clear()
            self.buffer.data = self.initial_data()
            self.data_generator = None
            self.initial_time = None  # Reset initial time

    def create_streaming_plot(self, data):
        # print('create_streaming_plot')
        overlays = {}
        
        if not data.empty and 'time' in data.columns:
            # Capture initial time only once when streaming starts
            if self.initial_time is None:
                self.initial_time = data['time'].iloc[0]
            
            # Use stored initial time for all subsequent calculations
            data = data.copy()  # Create copy to avoid modifying original
            data['time'] = (data['time'] - self.initial_time).round(2)
        
        for ch in self.channel_names:
            if ch in data.columns and not data[ch].dropna().empty:
                curve = hv.Curve((data['time'], data[ch]), 'time', 'channel', label=ch).opts(
                    line_width=2,
                    color=self.color_mapping[ch],
                    subcoordinate_y=True,
                )
                overlays[ch] = curve
                
        if overlays:
            ndoverlay = hv.NdOverlay(overlays).apply.opts(
                show_legend=False,
                responsive=True,
                min_height=600,
                framewise=True,
                title='',
                xlabel='Time (s)',
                ylabel='Amplitude',
            )
            return ndoverlay
        else:
            return self.blank_stream_plot()

    def bare_plot(self, min_height=600):
        print('bare')
        return hv.Curve([]).opts(yaxis='bare', xaxis='bare', min_height=min_height, responsive=True)

    def blank_stream_plot(self):
        print('blank')
        # Create an empty NdOverlay with the expected keys and labels
        empty_curves = {ch: hv.Curve([]).relabel(ch).opts(subcoordinate_y=True) for ch in self.channel_names}
        ndoverlay = hv.NdOverlay(empty_curves).opts(
            legend_position='right',
            responsive=True,
            min_height=600,
            title=''
        )
        return ndoverlay

    def stream_data(self):
        # print('stream data')
        if not self.streaming or self.paused:
            return
        with pn.io.unlocked():
            new_data_df = self.data_generator.generate_data()
            if not new_data_df.empty:
                self.buffer.send(new_data_df)

    def create_montage_plot(self):
        if hasattr(self.data_generator, 'channel_positions') and self.data_generator.channel_positions:
            df = pd.DataFrame(self.data_generator.channel_positions)
            df['clr'] = df['ch'].apply(lambda x: self.color_mapping.get(x, 'grey'))
            points = hv.Points(df, ['xpos', 'ypos'], vdims=['ch', 'clr']).opts(
                color='clr', size=20, alpha=.5, tools=['hover'], marker='circle')
            labels = hv.Labels(df, ['xpos', 'ypos'], 'ch').opts(text_color='black', text_font_size='8pt')
            plot = (points * labels).opts(
                xaxis=None, yaxis=None, axiswise=True, height=300, responsive=True, shared_axes=False, title='Channel Position')
            return pn.pane.HoloViews(plot)
        else:
            return pn.pane.Placeholder()

    def create_servable_app(self):
        if self.notebook:
            return self.layout
        else:
            return self.template.servable()
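
`Buffer(..., length=buffer_length_samples)` retains only the most recent rows, so how much history the plot shows depends on how fast rows arrive. The sketch below works that out for the default of 10,000 rows: the CPU source adds one row every 0.25 s, while for the EEG source a hypothetical 500 Hz sampling rate is assumed, since the file's actual rate is not stated here. A `collections.deque` with `maxlen` is used only as an analogy for the rolling window (it is not how `Buffer` is implemented), and the hypothetical `relative_times` helper mirrors the shift-and-round applied to the `time` column in `create_streaming_plot`.

```python
from collections import deque


def history_seconds(buffer_length, rows_per_second):
    """Seconds of data a length-limited buffer holds before old rows drop off."""
    return buffer_length / rows_per_second


print(history_seconds(10000, 1 / 0.25))  # 2500.0  (CPU: one row per 0.25 s)
print(history_seconds(10000, 500.0))     # 20.0    (EEG at an assumed 500 Hz)

# Rolling-window analogy: a tiny buffer of 5 rows keeps only the newest 5
buf = deque(maxlen=5)
for t in range(8):
    buf.append(t)
print(list(buf))  # [3, 4, 5, 6, 7]


def relative_times(times, t0):
    # Shift so the first sample is at 0 s, then round to 2 decimals
    return [round(t - t0, 2) for t in times]
```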


### Running the Application
Instantiate the data sources and create the application.

In [None]:
data_source_cpu_usage = CPU_Usage()

# Display a subset of EEG channels with 'picks'
data_source_eeg_usage = LSL_EEG_File_Stream(
    sample.data_path() / "sample-ant-raw.fif",
    picks=['F4', 'F7', 'FC5', 'FC2', 'T7', 'CP2', 'CP6', 'P4', 'P7', 'O1', 'O2']
)

app = StreamingApp(data_sources=[data_source_eeg_usage, data_source_cpu_usage], notebook=False)
app.create_servable_app()

## Using the Application
- **Select Data Source:** Use the dropdown to choose a data source.
- **Control Streaming:**
  - Click **Start** to begin streaming data from the selected source.
  - Click **Pause** to temporarily halt data updates without stopping the stream.
  - Click **Stop** to stop data updates and the data source itself.
- **Switching Data Sources:**
  - When you select a different data source, the app stops the current stream. Click **Start** to begin streaming from the newly selected source.
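
The button behavior described above can be summarized as a small state machine. `press` is a hypothetical pure function, abstracted from `start_stream`, `pause_stream`, and `stop_stream`; it is not part of the app, and it returns the new state together with whether the plot buffer is cleared.

```python
def press(state, button):
    """Return (new_state, buffer_cleared) for one button press.

    States: 'stopped', 'running', 'paused'.
    """
    if button == 'Start':
        # A fresh start clears the buffer; resuming from pause keeps it
        return 'running', state == 'stopped'
    if button == 'Pause':
        # Pause only suspends polling; the data source keeps running
        return ('paused' if state == 'running' else state), False
    if button == 'Stop':
        # Stop halts polling and the source, and clears the buffer
        return 'stopped', state != 'stopped'
    raise ValueError(f'unknown button {button!r}')


state = 'stopped'
for button in ['Start', 'Pause', 'Start', 'Stop']:
    state, cleared = press(state, button)
    print(button, '->', state, '(buffer cleared)' if cleared else '')
```

Keeping resume separate from a fresh start is what lets **Pause** preserve the plotted history.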

## Conclusion
We've built a real-time multichannel streaming application that handles different data sources, including EEG data and CPU usage, and that can be extended and customized for other real-time streaming needs.

## What Next?
- Customization: Modify the application to include additional data sources or customize the visualization options.
- Deployment: Deploy the application as a standalone web app using `panel serve`.
- Data Analysis: Extend the application to include data analysis features such as filtering, feature extraction, or event detection.
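
As a starting point for a new source, the sketch below generates phase-shifted sine waves and follows the same method names as the sources above. `SineSource` is hypothetical and, to stay self-contained, returns a plain dict of columns; to plug it into the app you would subclass `DataSource` and return `pd.DataFrame(...)` of that dict. The optional `now` argument exists only to make the output deterministic in tests.

```python
import math
import time


class SineSource:
    """Hypothetical synthetic source using the same method names as the data
    sources above, but returning plain dicts of columns instead of DataFrames."""

    def __init__(self, n_channels=4, sampling_interval=0.05, freq_hz=1.0):
        self.channel_names = [f'SIN_{i}' for i in range(n_channels)]
        self._sampling_interval = sampling_interval
        self.freq_hz = freq_hz
        self.streaming = False
        self._t0 = None

    @property
    def sampling_interval(self):
        return self._sampling_interval

    def get_channel_names(self):
        return self.channel_names

    def get_channel_positions(self):
        return None  # synthetic channels have no spatial layout

    def start(self):
        self.streaming = True
        self._t0 = time.time()

    def stop(self):
        self.streaming = False

    def generate_data(self, now=None):
        if not self.streaming:
            return {'time': [], **{ch: [] for ch in self.channel_names}}
        # Time in seconds since start()
        t = (time.time() if now is None else now) - self._t0
        row = {'time': [t]}
        for i, ch in enumerate(self.channel_names):
            # Give each channel a phase offset so the traces are distinguishable
            phase = i * math.pi / len(self.channel_names)
            row[ch] = [math.sin(2 * math.pi * self.freq_hz * t + phase)]
        return row


src = SineSource(n_channels=2)
src.start()
print(src.generate_data())
src.stop()
```

Because `StreamingApp` labels each source with its class name, this one would appear as `SineSource` in the dropdown.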

## Related Resources

| What? | Why? |
| --- | --- |
| [MNE-Python Docs](https://mne.tools/stable/index.html) | For more information on EEG data handling and analysis |