# Real-Time Multichannel EEG Streaming Application

![](./assets/.png)

## Prerequisites

| What? | Why? |
| --- | --- |
| [Index: Intro, Workflows, Extensions](./index.ipynb) | For context and workflow selection/feature guidance |
| [Recommended Workflow](./multichan.ipynb) | For live downsampling with in-memory Pandas DataFrame |

## Overview

For an introduction, please visit the ['Index'](./index.ipynb) page. 

This tutorial guides you through building a multichannel timeseries streaming application, visualizing either CPU usage or an EEG data file as if it was streaming live over a network interface. The application shows how to create controls like selecting the data source and starting, pausing, and stoping the data stream.

### Key Software:
- **[LSL](https://github.com/sccn/labstreaminglayer)** and **[MNE-LSL](https://mne.tools/mne-lsl/stable/index.html)**: Widely used neuroscience, Lab Streaming Layer (LSL) helps with the collection of measurement time series in research experiments. Here, it is used via the MNE-LSL python package to set up a mock live stream from a file on disk.


## Imports and Configuration


In [None]:
import abc
import time
import uuid
import mne
import holoviews as hv
import panel as pn
import param
import numpy as np
import pandas as pd
from mne_lsl.datasets import sample
from mne_lsl.player import PlayerLSL
from mne_lsl.stream import StreamLSL
from holoviews.streams import Buffer
import psutil

hv.extension('bokeh')
pn.extension()


## Creating Data Sources

We will create two data sources, and a python class to handle the management of each:
1. **CPU Usage:** Streams CPU usage percentages per core.
2. **EEG Usage:** Streams EEG data from a sample dataset.

Let's create a sort of recipe (abstract base class) to ensure that each data source class contains certain methods. We want the class to tell us its channel names, what is its sampling interval, start up the stream, generate some data, and stop the stream.

In [None]:
class DataSource(abc.ABC):
    @abc.abstractmethod
    def get_channel_names(self):
        pass
    
    @property
    @abc.abstractmethod
    def sampling_interval(self):
        pass
        
    @abc.abstractmethod
    def generate_data(self):
        pass
    
    @abc.abstractmethod
    def start(self):
        pass

    @abc.abstractmethod
    def stop(self):
        pass

### CPU Usage Data Source
The `CPU_Usage` class streams CPU usage data using the `psutil` library. This acts as a sort of simple test bench for us to stream some real data and make sure the plotting application is working. Whenever `generate_data` is called, it will return a timestamped measurement of cpu usage across my computer's cores.

In [None]:
class CPU_Usage():
    def __init__(self, sampling_interval=0.25):
        self.num_cores = psutil.cpu_count(logical=True)
        self._sampling_interval = sampling_interval
        self.streaming = False
        
        self.channel_names = [f'CPU_{i}' for i in range(self.num_cores)]

    def get_channel_names(self):
        return self.channel_names

    def get_channel_positions(self):
        return self.channel_positions if hasattr(self, 'channel_positions') else None
    
    @property
    def sampling_interval(self):
        return self._sampling_interval

    def start(self):
        self.streaming = True

    def stop(self):
        self.streaming = False

    def generate_data(self):
        if not self.streaming:
            return pd.DataFrame(columns=['time'] + self.channel_names)
        cpu_percent = psutil.cpu_percent(percpu=True)
        if cpu_percent:
            timestamp = pd.Timestamp.now()
            data = {'time': [timestamp]}
            for ch, usage in zip(self.channel_names, cpu_percent):
                data[ch] = usage
            return pd.DataFrame(data)
        else:
            return pd.DataFrame(columns=['time'] + self.channel_names)


### LSL File Stream Data Source
The `LSL_EEG_File_Stream` class streams EEG data via a mock `LSL` live stream from a saved file, using utilities from the `mne` and `mne_lsl` libraries.

This class is a bit more involved than the last and requires us to handle the stream setup (start) and teardown (stop) in a particular way, according to the [PlayerLSL](https://mne.tools/mne-lsl/stable/generated/api/mne_lsl.player.PlayerLSL.html) and [StreamLSL](https://mne.tools/mne-lsl/stable/generated/api/mne_lsl.stream.StreamLSL.html#mne_lsl.stream.StreamLSL) docs. However, the idea is the same, when `generate_data` is called, it will return timestamped dataframe of the next block of channel measurements.


In [None]:
class LSL_EEG_File_Stream():
    def __init__(self, fname, buffer_size=None, picks='eeg'):
        """
        Initialize LSL EEG File Stream with automatic chunk size optimization
        
        Parameters:
        -----------
        fname : str
            Path to the EEG file
        buffer_size : float, optional
            Buffer size in seconds. If None, automatically set to 2 seconds
        picks : str or list
            Channel selection
        """
        self.source_id = uuid.uuid4().hex
        self.fname = fname
        
        # Load the file to get sampling rate
        raw = mne.io.read_raw(fname)
        sfreq = raw.info['sfreq']
        
        # Auto-calculate optimal chunk size (aiming for ~50ms chunks)
        target_chunk_time = 0.05  # 50ms
        self.chunk_size = int(sfreq * target_chunk_time)
        
        # Set buffer size in seconds if not provided
        if buffer_size is None:
            buffer_size = 2.0  # 2 second default buffer
        
        # Initialize LSL components
        self.player = PlayerLSL(self.fname, 
                              chunk_size=self.chunk_size,
                              source_id=self.source_id)
        
        # Convert buffer_size from seconds to samples for StreamLSL
        buffer_samples = int(buffer_size * sfreq)
        self.stream = StreamLSL(bufsize=buffer_samples, 
                              source_id=self.source_id)
        
        self._sampling_interval = self.chunk_size / sfreq
        self.streaming = False
        self.picks = picks
        self.reference = "average"
        
        # Set up channel names
        if self.picks == 'eeg':
            ch_type_indices = mne.channel_indices_by_type(self.player.info)['eeg']
            self.channel_names = [self.player.ch_names[i] for i in ch_type_indices]
        else:
            self.channel_names = self.picks
            
        # Get standard 10-05 montage
        montage = mne.channels.make_standard_montage("standard_1005")
        montage_pos = montage.get_positions()['ch_pos']
        
        # Store positions for picked channels
        self.channel_positions = {}
        for ch in self.channel_names:
            if ch in montage_pos:
                pos = montage_pos[ch]
                self.channel_positions[ch] = {
                    'x': float(pos[0]),
                    'y': float(pos[1])
                }

    def get_channel_positions(self):
        """Get 2D positions for channels if available"""
        return self.channel_positions if hasattr(self, 'channel_positions') else None
    
    @property 
    def sfreq(self):
        """Get sampling frequency of the stream"""
        return self.player.info['sfreq']
    
    @property
    def chunk_time(self):
        """Get chunk duration in seconds"""
        return self.chunk_size / self.sfreq
    
    def get_channel_names(self):
        return self.channel_names

    @property
    def sampling_interval(self):
        return self._sampling_interval

    def start(self):
        if not self.streaming:
            self.player.start()
            self.stream.connect()
            self.stream.pick(self.picks)
            if self.reference:
                self.stream.set_eeg_reference(self.reference)
            self.streaming = True

    def stop(self):
        if self.streaming:
            self.stream.disconnect()
            self.player.stop()
            self.streaming = False

    def generate_data(self):
        """Generate the next chunk of data"""
        if not self.streaming:
            return pd.DataFrame(columns=['time'] + self.channel_names)
            
        # Get all new samples
        data, ts = self.stream.get_data(
            self.stream.n_new_samples / self.stream.info["sfreq"],
            picks=self.channel_names
        )
        
        if data.size > 0:
            new_data = pd.DataFrame({'time': ts})
            for i, ch in enumerate(self.channel_names):
                new_data[ch] = data[i]
            return new_data
        else:
            return pd.DataFrame(columns=['time'] + self.channel_names)

## Building the Streaming Application
The StreamingApp class below handles the user interface and streaming logic, integrating the data sources with interactive controls. Comments are included for each part of the code that might need additional context.

In [None]:
class StreamingApp(param.Parameterized):
    def __init__(self, data_sources, notebook=False):
        super().__init__()
        self.data_sources = data_sources
        self.notebook = notebook

        # Create mappings from data source names to instances
        self.data_source_names = [type(ds).__name__ for ds in data_sources]
        self.data_source_instances = {type(ds).__name__: ds for ds in data_sources}
        self.data_source = self.data_source_names[0]  # Default selection

        self.update_channel_names()
        self.buffer_length = 1000  # Adjust as needed
        self.streaming = False
        self.paused = False
        self.task = None
        self.data_generator = None

        # Initialize the buffer here after channel names are set
        self.buffer = Buffer(data=self.initial_data(), length=self.buffer_length)

        self.create_widgets()
        self.create_layout()

    def update_channel_names(self):
        data_source_instance = self.data_source_instances[self.data_source]
        self.channel_names = data_source_instance.get_channel_names()
        # Update buffer with new channel names if buffer is initialized
        if hasattr(self, 'buffer'):
            self.buffer.clear()
            self.buffer.data = self.initial_data()

    def initial_data(self):
        # No initial data
        return pd.DataFrame({'time': [], **{ch: [] for ch in self.channel_names}})

    def create_montage_plot(self, data_source=None, channel_values=None):
        """Create a montage plot if the data source has channel positions"""
        if data_source is None:
            data_source = self.data_source_instances[self.data_source]
        
        # Check if data source has channel positions
        if not hasattr(data_source, 'get_channel_positions'):
            return None
            
        positions = data_source.get_channel_positions()
        if not positions:
            return None
            
        import holoviews as hv
        
        # Create points for each channel
        points = []
        for ch, pos in positions.items():
            point = {
                'x': pos['x'],
                'y': pos['y'],
                'channel': ch,
                'value': channel_values.get(ch, 0) if channel_values else 0
            }
            points.append(point)
        
        # Create scatter plot
        scatter = hv.Points(
            points, 
            kdims=['x', 'y'], 
            vdims=['channel', 'value']
        ).opts(
            color='value',
            colorbar=True,
            cmap='RdBu_r',  # Red-Blue diverging colormap
            symmetric=True,  # Center colormap around 0
            tools=['hover'],
            size=10
        )
        
        # Add channel labels
        labels = hv.Labels({
            'x': [p['x'] for p in points],
            'y': [p['y'] for p in points],
            'text': [p['channel'] for p in points]
        })
        
        # Add head outline (circular approximation)
        theta = np.linspace(0, 2*np.pi, 100)
        head_radius = 0.12  # Approximate head radius
        circle_pts = {
            'x': head_radius * np.cos(theta),
            'y': head_radius * np.sin(theta)
        }
        head_outline = hv.Path([circle_pts])
        
        # Nose indicator
        nose_pts = {'x': [0, 0], 'y': [head_radius, head_radius*1.1]}
        nose = hv.Path([nose_pts])
        
        # Combine plots
        plot = (head_outline * nose * scatter * labels).opts(
            hv.opts.Path(color='black'),
            hv.opts.Labels(
                text_font_size='8pt',
                text_align='center',
                text_baseline='middle'
            ),
            title='Channel Positions',
            width=300,
            height=300,
            xlim=(-0.15, 0.15),
            ylim=(-0.15, 0.15),
            show_grid=False
        )
        
        return plot
    
    def create_widgets(self):
        # Dropdown widget for selecting data source
        self.data_source_widget = pn.widgets.Select(
            name='Data Source',
            options=self.data_source_names,
            value=self.data_source
        )
        # Watch for changes in the data source selector
        self.data_source_widget.param.watch(self.on_data_source_change, 'value')

        self.radio_group = pn.widgets.RadioButtonGroup(
            name='Stream Control',
            options=['Start', 'Pause', 'Stop'],
            value='Stop',
            button_type='default',
            sizing_mode='stretch_width',
            stylesheets=[ """
            :host(.solid) .bk-btn.bk-btn-default.bk-active {
                background-color: #b23c3c;
            }
            """]
        )
        self.radio_group.param.watch(self.handle_state_change, 'value')

        # Slider for buffer size
        self.buffer_size_slider = pn.widgets.IntSlider(
            name='Buffer Size (s)', start=1, end=10, step=1, value=2
        )
        self.buffer_size_slider.param.watch(self.on_buffer_size_change, 'value')

    def on_buffer_size_change(self, event):
        self.buffer_length = event.new * self.data_generator.info['sfreq']
        self.buffer.clear()
        self.buffer.data = self.initial_data()
    
    def on_data_source_change(self, event):
        self.stop_stream()
        self.data_source = event.new
        self.update_channel_names()
        # Update the plot
        self.plot_pane.object = self.bare_plot()

    def handle_state_change(self, event):
        if event.new == 'Start':
            self.start_stream()
        elif event.new == 'Pause':
            self.pause_stream()
        elif event.new == 'Stop':
            self.stop_stream()

    def create_layout(self):
        # Create time-series plot
        self.plot_pane = pn.pane.HoloViews(self.bare_plot())
        
        # Create montage plot if available
        montage_plot = self.create_montage_plot()
        self.montage_pane = pn.pane.HoloViews(montage_plot) if montage_plot else None
        
        if self.notebook:
            layout_elements = [
                self.data_source_widget,
                self.radio_group,
            ]
            
            if self.montage_pane:
                layout_elements.append(
                    pn.Row(
                        pn.Column(self.montage_pane, width=300),
                        self.plot_pane
                    )
                )
            else:
                layout_elements.append(self.plot_pane)
                
            self.layout = pn.Column(
                *layout_elements,
                align='center',
            )
        else:
            sidebar = pn.Column(
                pn.WidgetBox(
                    self.data_source_widget,
                    self.radio_group,
                ),
                align='center',
                sizing_mode='stretch_width',
            )
            self.template = pn.template.FastListTemplate(
                main=[self.plot_pane],
                sidebar=[sidebar],
                title="Multi-Channel Streaming App",
                theme="dark",
                accent="#2e008b",
            )

    def start_stream(self):
        if not self.streaming:
            self.streaming = True
            self.paused = False
            self.data_generator = self.data_source_instances[self.data_source_widget.value]
            self.data_generator.start()
            self.buffer.clear()
            # Replace the plot with the live streaming plot
            self.plot_pane.object = hv.DynamicMap(self.create_streaming_plot, streams=[self.buffer])
            # Start the periodic callback
            sampling_interval_ms = int(self.data_generator.sampling_interval * 1000)
            self.task = pn.state.add_periodic_callback(
                self.stream_data, period=sampling_interval_ms, count=None
            )
        elif self.streaming and self.paused:
            self.paused = False
            if self.task is None:
                # Restart the periodic callback if it was stopped
                sampling_interval_ms = int(self.data_generator.sampling_interval * 1000)
                self.task = pn.state.add_periodic_callback(
                    self.stream_data, period=sampling_interval_ms, count=None
                )

    def pause_stream(self):
        if self.task:
            self.task.stop()
            self.task = None
        self.paused = True

    def stop_stream(self):
        if self.streaming:
            self.streaming = False
            self.paused = False
            if self.task:
                self.task.stop()
                self.task = None
            if self.data_generator is not None:
                self.data_generator.stop()  # Stop the data source
            self.radio_group.value = 'Stop'
            self.plot_pane.object = self.bare_plot()
            self.buffer.clear()
            self.buffer.data = self.initial_data()
            self.data_generator = None

    def create_streaming_plot(self, data):
        overlays = {}
        for ch in self.channel_names:
            if ch in data.columns and not data[ch].dropna().empty:
                # Set the label to match the key
                curve = hv.Curve((data['time'], data[ch]), 'time', 'channel', label=ch).opts(
                    line_width=2,
                    subcoordinate_y=True,
                )
                overlays[ch] = curve
        if overlays:
            ndoverlay = hv.NdOverlay(overlays).apply.opts(
                # legend_position='right',
                show_legend=False,
                responsive=True,
                min_height=600,
                framewise=True,
                title='',
            )
            return ndoverlay
        else:
            return self.blank_stream_plot()

    def bare_plot(self):
        return hv.Curve([]).opts(yaxis='bare', xaxis='bare', min_height=600, responsive=True)

    def blank_stream_plot(self):
        # Create an empty NdOverlay with the expected keys and labels
        empty_curves = {ch: hv.Curve([]).relabel(ch).opts(subcoordinate_y=True) for ch in self.channel_names}
        ndoverlay = hv.NdOverlay(empty_curves).opts(
            legend_position='right',
            responsive=True,
            min_height=600,
            title=''
        )
        return ndoverlay

    def stream_data(self):
        if not self.streaming or self.paused:
            return
        with pn.io.unlocked():
            new_data_df = self.data_generator.generate_data()
            if not new_data_df.empty:
                # Update values for montage plot
                current_values = {}
                for ch in self.channel_names:
                    if ch in new_data_df.columns:
                        current_values[ch] = float(new_data_df[ch].iloc[-1])
                
                # Update montage plot if it exists
                if self.montage_pane is not None:
                    self.montage_pane.object = self.create_montage_plot(
                        channel_values=current_values
                    )
                self.buffer.send(new_data_df)

    def create_servable_app(self):
        if self.notebook:
            return self.layout
        else:
            return self.template.servable()

### Running the Application
Instantiate the data sources and create the application.

In [None]:
data_source_cpu_usage = CPU_Usage(sampling_interval=0.05)

eeg_channel_picks = ['F4', 'F7', 'FC5', 'FC2', 'T7', 'CP2', 'CP6', 'P4', 'P7', 'O1', 'O2']
data_source_eeg_usage = LSL_EEG_File_Stream(
    sample.data_path() / "sample-ant-raw.fif",
    picks=eeg_channel_picks
)

app = StreamingApp(data_sources=[data_source_eeg_usage, data_source_cpu_usage], notebook=True)
app.create_servable_app()

## Using the Application
- **Select Data Source:** Use the dropdown to select data source.
- **Control Streaming:**
- Click **Start** to begin streaming data from the selected source.
  - Click **Pause** to temporarily halt data updates without stopping the stream.
  - Click **Stop** to stop data updates and the data source itself.
- **Switching Data Sources:**
  - When you select a different data source, the app automatically stops the current data source before starting the new one.

## Conclusion
We've built a real-time multichannel streaming application that can handle different data sources, including EEG data and CPU usage and that can be extended and customized for various real-time data streaming needs.

## What Next?
- Customization: Modify the application to include additional data sources or customize the visualization options.
- Deployment: Deploy the application as a standalone web app using panel serve.
- Data Analysis: Extend the application to include data analysis features such as filtering, feature extraction, or event detection.

## Related Resources

| What? | Why? |
| --- | --- |
|[MNE-Python Docs](https://mne.tools/stable/index.html)|	For more information on EEG data handling and analysis |