# EEG Viewer with synthetic data pipeline
![status](https://img.shields.io/badge/status-in%20progress-orange)



<div style="text-align: center;">
    <img src="./assets/230524_eeg-viewer.png" alt="eeg viewer preview" width="450"/>
</div>

## Summary

This workflow is intended to demonstrate the visualization of a set of 1D EEG timeseries with HoloViz and Bokeh tools.

For details specific to this workflow, such as goals, specifications, and bottlenecks, please see this workflow's [readme](./readme_eeg-viewer.md).

For a summary of EEG research, data, and software, see [neuro/wiki/EEG-notes](https://github.com/holoviz-topics/neuro/wiki/EEG-notes).

## Imports and config

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Requirements</p>
    <p>This workflow notebook requires the <a href="./environment.yml">environment</a> specified in this workflow directory.</p>
</div>


In [None]:
%load_ext autoreload
%autoreload 2
import numpy as np; np.random.seed(0)
import pandas as pd
from scipy.stats import zscore

# Viz
# import colorcet as cc
import holoviews as hv; hv.extension('bokeh')
from holoviews.plotting.links import RangeToolLink
# from holoviews.operation.datashader import rasterize
from holoviews import Dataset
from bokeh.models import HoverTool, WheelZoomTool
import panel as pn; pn.extension(template='material')

### Generate random synthetic data

<div class="admonition alert alert-warning">
    <p class="admonition-title" style="font-weight:bold">Run a single option</p>
    <p>... from the following data generation options</p>
</div>

In [None]:
n_channels = 25
n_seconds = 15
fs = 250  # Sampling frequency

generate_random = False
generate_sin = False
generate_realistic = True

#### Option 1: Random cumsum data. Useful for bug reporting

In [None]:
if generate_random:
    import numpy as np; np.random.seed(0)
    
    # n_channels = 25
    # n_seconds = 30
    # fs = 512
    
    total_samples = fs*n_seconds
    time = np.linspace(0, n_seconds, total_samples)
    data = np.random.randn(n_channels, total_samples).cumsum(axis=1)
    channels = [f"EEG {i}" for i in range(n_channels)]
    print(f'shape: {data.shape} (n_channels, samples) ')

#### Option 2: Increasing sin data. Useful for quick demo and debugging minimap

In [None]:
if generate_sin: 
    # n_channels = 25
    # n_seconds = 30
    # fs = 512  # Sampling frequency
    
    init_freq = 1  # Initial frequency in Hz
    freq_inc = 20/n_channels  # Frequency increment
    amplitude = 1
    
    total_samples = n_seconds * fs
    time = np.linspace(0, n_seconds, total_samples)
    channels = [f'EEG {i}' for i in range(n_channels)]
    
    data = np.array([amplitude * np.sin(2 * np.pi * (init_freq + i * freq_inc) * time)
                     for i in range(n_channels)])
    print(f'shape: {data.shape} (n_channels, samples) ')

#### Semi-realistic data. Useful for demo

The `generate_eeg_powerlaw` function synthesizes EEG data as high-pass filtered pink noise power law time series by default. The function returns a 2D numpy array of synthetic EEG data (in microvolts) shaped as (number of channels, total samples), a 1D time array (in seconds), and a list of channel names. Parameters such as the high-pass filter factor (in Hz) and an amplitude scaling factor allow customization of the generated data.

In [None]:
if generate_realistic:
    from neurodatagen.eeg import generate_eeg_powerlaw
    # from hvneuro import download_file
    
    data, time, channels = generate_eeg_powerlaw(n_channels, n_seconds, fs, channel_prefix='EEG', blink_scale=.009)
    print(f'shape: {data.shape} (n_channels, samples) ')

## Generate range annotations

In [None]:
import pandas as pd
import colorcet as cc
import string

def create_range_annotations(n_total_seconds: int, n_categories: int, 
                             n_total_annotations: int, duration: int = 1) -> pd.DataFrame:

    start_times = np.sort(np.random.randint(0, n_total_seconds - duration, n_total_annotations))
    
    # Ensure the annotations are non-overlapping
    for i in range(1, len(start_times)):
        if start_times[i] < start_times[i-1] + duration:
            start_times[i] = start_times[i-1] + duration
    end_times = start_times + duration
    categories = np.random.choice(list(string.ascii_uppercase)[:n_categories], n_total_annotations)
    
    df = pd.DataFrame({
        'start': start_times,
        'end': end_times,
        'category': categories
    })
    df['category'] = df['category'].astype('category')
    
    unique_categories = df['category'].cat.categories
    color_map = dict(zip(unique_categories, cc.glasbey[:len(unique_categories)]))
    df['color'] = df['category'].map(color_map)
    df['color'] = df['color'].astype('category')
    
    return df

n_categories = 3
n_total_annotations = 5
annotations_df = create_range_annotations(n_seconds, n_categories, n_total_annotations)
annotations_df.sample(5)


### Visualize synthetic data. Approach: Subcoords, HoloViz

This approach makes use HoloViews API that adds axis offset automation and a (hopefully) simpler API to Bokeh's experimental 'subplot' functionality, as described and demonstrated separately.

Pros:
- Allows for independent dynamic y-axis scaling of individual traces
- Simpler on the front end to deal with offsets; just set `subcoordinate_y = True`.

Cons:
- Multiple levels of being experimental, so still has some bugs to work out
- Requires HoloViews >=1.18 and 1.18 is not yet supporting Bokeh 3.3

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Requires HoloViews>=1.18</p>
    <p>... for initial `subcoordinate_y` support. Later versions will include important improvements.</p>
</div>


In [None]:
xzoom_out_extent = 2
start_t_disp = 4.5 #time[0] # start time of initially displayed window 
max_t_disp = xzoom_out_extent # max time in seconds to initially display
max_ch_disp = 20  # max channels to initially display
max_y_disp = np.min((max_ch_disp - 1.5, n_channels - 1.5))
subcoord_btm = -0.5 # auto lower xlim of first subcoord
clim_mul = 1 # color limit multiplier.. adjusts the levels on the minimap

annotation_elements = [hv.VSpan(row['start'], row['end']).opts(fill_color=row['color'], alpha=0.2, line_alpha=0) 
                       for _, row in annotations_df.iterrows()]
annotations_overlay = hv.Overlay(annotation_elements)

hover = HoverTool(tooltips=[
    ("Channel", "@channel"),
    ("Time", "$x s"),
    ("Amplitude", "$y µV")])

channel_curves = []
for channel, channel_data in zip(channels, data):
    ds = Dataset((time, channel_data, channel), ["Time", "Amplitude", "channel"])
    curve = hv.Curve(ds, "Time", ["Amplitude", "channel"], label=f'{channel}')
    curve.opts(color="black", line_width=1, subcoordinate_y=True, tools=[hover])
    channel_curves.append(curve)

eeg_viewer = (hv.Overlay(channel_curves, kdims="Channel") * annotations_overlay)
eeg_viewer = eeg_viewer.opts(
    xlabel="Time (s)", ylabel="Channel", show_legend=False,
    padding=0, aspect=1.5, responsive=True, shared_axes=False,
     #ylim does not work with subcoordinate_y
    # xlim=(start_t_disp, start_t_disp+max_t_disp), ylim=(subcoord_btm, subcoord_btm+max_y_disp),
    backend_opts={
        "y_range.start": subcoord_btm, # required as long as ylim doesn't work
        "y_range.end": subcoord_btm + max_y_disp, # required as long as ylim doesn't work
        "x_range.start": start_t_disp,
        "x_range.end": start_t_disp + max_t_disp,
        "x_range.bounds": (time.min(), time.max()), # absolute outer limits on pan/zoom
        "y_range.bounds": (0, len(channels)),
        "x_range.max_interval": xzoom_out_extent
    })

y_positions = range(len(channels))
yticks = [(i, ich) for i, ich in enumerate(channels)]
z_data = zscore(data, axis=1) # np.zeros_like(data)
# Does not currently work with rasterize on the minimap image
minimap = hv.Image((time, y_positions, z_data), ["Time (s)", "Channel"], "Amplitude (uV)")
minimap = minimap.opts(
    cmap="RdBu_r", colorbar=False, xlabel='',
    alpha=.3, yticks=[yticks[0], yticks[-1]],
    toolbar='disable', # needed to prevent zoom and pan behavior on image
    height=120, responsive=True, default_tools=[],
    clim=(-z_data.std()*clim_mul, z_data.std()*clim_mul))

RangeToolLink(minimap, eeg_viewer, axes=["x", "y"],
              boundsx=(start_t_disp, start_t_disp + max_t_disp), #required for reset behavior
              boundsy=(subcoord_btm, subcoord_btm + max_y_disp) #required for reset behavior
             )

eeg_app = (eeg_viewer + minimap * annotations_overlay).opts().cols(1)
eeg_app = pn.Column((eeg_viewer + minimap * annotations_overlay).cols(1), min_height=650)
eeg_app

## Alternate Approaches

### Visualize synthetic data. Approach: Offset, HoloViz

This approach makes a copy of the data with an **offset** to position the timeseries traces vertically stacked. The original copy of the data is used to provide accurate y-axis hover information over each trace.

Pros:
- Simpler on the backend, as your just plotting multiple timeseries on the same coordinate axis
- More amenable to existing datashader rendering
- Uses high-level HoloViz

Cons:
- More complicated on the front-end, as you have to manually create and control the offset data
- Makes a copy of the data
- Does not allow for independent dynamic y-axis scaling of individual traces without rerendering the whole canvas

In [None]:
if False:
    max_ch_disp = 15  # max channels to initially display
    max_t_disp = 10 # max time in seconds to initially display
    spacing = 5.5  # Spacing between channels
    clim_mul = 3.2 # color range multiplier for minimap. lower will saturate more.
    
    annotation_elements = [hv.VSpan(row['start'], row['end']).opts(fill_color=row['color'], alpha=0.2, line_alpha=0) 
                           for _, row in annotations_df.iterrows()]
    annotations_overlay = hv.Overlay(annotation_elements)
    
    # Create a hv.Curve element per chan
    channel_curves = []
    max_data = data.max()
     
    hover = HoverTool(tooltips=[
        ("Channel", "@channel"),
        ("Time", "$x s"),
        ("Amplitude", "@original_amplitude µV")])
    
    offset = np.std(data) * spacing
    for i, channel_data in enumerate(data):
        offset_data = channel_data + (i * offset)
        max_data = max(offset_data.max(), max_data) # update max
        ds = Dataset((time, offset_data, channel_data, channels[i]), ["Time", "Amplitude", "original_amplitude", "channel"])
        channel_curves.append(
            hv.Curve(ds, "Time", ["Amplitude", "original_amplitude", "channel"]).opts(
                color="black", line_width=1,
                tools=[hover, 'xwheel_zoom'], shared_axes=False))
    
    yticks = [(i * offset, ich) for i, ich in enumerate(channels)]
    
    # set maintain focus to False to allow independence for zoom out against a single hardbound
    # def set_maintain_focus(plot, element):
    #     wheel_zoom = plot.state.select(type=WheelZoomTool)
    #     if wheel_zoom:
    #         wheel_zoom[0].maintain_focus = False
            
    # Create an overlay of curves
    eeg_viewer = (annotations_overlay * hv.Overlay(channel_curves, kdims="Channel")).opts(
        padding=0, xlabel="Time (s)", ylabel="Channel", #default_tools=['hover', 'pan', 'box_zoom', 'save', 'reset'],
        yticks=yticks, show_legend=False, aspect=1.5, responsive=True,
        shared_axes=False, backend_opts={
            "x_range.bounds": (time.min(), time.max()),
            "y_range.bounds": (data.min(), max_data)})
    
    # Minimap
    y_positions, _ = zip(*yticks) # use positions of yticks for yaxis of minimap image
    z_data = zscore(data, axis=1)
    
    # fix rasterize
    minimap = hv.Image((time, y_positions, z_data), ["Time (s)", "Channel"], "Amplitude (uV)")
    minimap = minimap.opts(
        cmap="RdBu_r", colorbar=False, xlabel='', alpha=.8, yticks=[yticks[0], yticks[-1]],
        height=100, responsive=True, default_tools=[''], shared_axes=False, clim=(-z_data.std()*clim_mul, z_data.std()*clim_mul))
        
    # Create RangeToolLink between the minimap and the main EEG viewer 
    # (quirk: apply to just one eeg trace and it will apply to all. see HoloViews #4472)
    max_y_disp = np.max(data[max_ch_disp-1,:] + ((max_ch_disp-1) * offset))
    RangeToolLink(minimap, list(eeg_viewer.values())[0], axes=["x", "y"],
                  boundsx=(None, max_t_disp),
                  boundsy=(None, max_y_disp))
    
    # eeg_app = pn.Column((eeg_viewer + minimap * annotation).cols(1), min_height=650).servable(target='main', title='EEG Viewer with HoloViz and Bokeh')
    eeg_app = (eeg_viewer + minimap * annotations_overlay).cols(1)
    eeg_app

### Visualize synthetic data. Approach: Subcoords, Bokeh

This approach makes use of Bokeh's experimental 'subplot' functionality, which allows for nesting subcoordinate systems so that each timeseries is on its own y-axis.

Pros:
- Allows for independent dynamic y-axis scaling of individual traces
- Theoretically simpler on the front end (however, this is only really true if the offset of the subcoordinate axes are automatically handled, which is not yet true the Subcoords-Bokeh approach, but it is true of the Subcoords-HoloViews approach)

Cons:
- More complicated on the front and back-end, as you have to handle the renderers and zoom tool level manually
- Pure, lower-level Bokeh (maybe this is a pro for some)
- Requires Bokeh >=3.3

<div class="admonition alert alert-info">
    <p class="admonition-title" style="font-weight:bold">Requires Bokeh>=3.3</p>
    <p>...primarily for subplot zooming, but also fixes to minimap box annotation handling</p>
</div>


In [None]:
if False: 
    from scipy.stats import zscore
    
    from bokeh.core.properties import field
    from bokeh.io import show, output_notebook
    from bokeh.layouts import column, row
    from bokeh.models import (ColumnDataSource, CustomJS, Div, FactorRange, HoverTool,
                              Range1d, Switch, WheelZoomTool, ZoomInTool, ZoomOutTool, 
                              RangeTool)
    from bokeh.palettes import Category10
    from bokeh.plotting import figure
    from bokeh.models import FixedTicker
    
    output_notebook()
    
    # n_channels = 10
    # n_seconds = 100
    max_ch_disp = n_channels/2  # max channels to initially display
    max_t_disp = n_seconds/2 # max time in seconds to initially display
    # fs = 512 # Hz
    # total_samples = fs*n_seconds
    time = np.linspace(0, n_seconds, total_samples)
    data = np.random.randn(n_channels, total_samples).cumsum(axis=1)
    # channels = [f"EEG {i}" for i in range(n_channels)]
    
    hover = HoverTool(tooltips=[
        ("Channel", "$name"),
        ("Time", "$x s"),
        ("Amplitude", "$y µV"),
    ])
    
    x_range = Range1d(start=time.min(), end=time.max())
    y_range = Range1d(start=-0.5, end=len(channels) - 1 + 0.5)
    p = figure(x_range=x_range, y_range=y_range, height=500, width=800,
               x_axis_label='Time (s)',
               lod_threshold=None, tools="pan, reset")
    
    source = ColumnDataSource(data=dict(time=time))
    renderers = []
    
    for i, channel in enumerate(channels):
        
        xy = p.subplot(
            x_source=p.x_range,
            y_source=Range1d(start=data[i].min(), end=data[i].max()),
            x_target=p.x_range,
            y_target=Range1d(start=i - 0.5, end=i + 0.5),
        )
    
        source.data[channel] = data[i]
        line = xy.line(field("time"), field(channel), color='black', source=source, name=channel)
        renderers.append(line)
    
    ticks = list(range(len(channels)))
    p.yaxis.ticker = FixedTicker(ticks=ticks)
    p.yaxis.major_label_overrides = {i: f"EEG {i}" for i in ticks}
    
    level = 1
    
    ywheel_zoom = WheelZoomTool(renderers=renderers, level=level, dimensions="height")
    xwheel_zoom = WheelZoomTool(renderers=renderers, level=level, dimensions="width")
    yzoom_in = ZoomInTool(renderers=renderers, level=level, dimensions="height")
    yzoom_out = ZoomOutTool(renderers=renderers, level=level, dimensions="height")
    
    p.add_tools(ywheel_zoom, xwheel_zoom, yzoom_in, yzoom_out, hover)
    p.toolbar.active_scroll = ywheel_zoom
    
    z_data = zscore(data, axis=1)
    
    range_tool = RangeTool(x_range=p.x_range, y_range=p.y_range)
    range_tool.x_range.update(start=0, end=max_t_disp)
    range_tool.y_range.update(start=0, end=max_ch_disp)
    range_tool.overlay.fill_alpha = .8
    
    select = figure(height=120, width=800, tools="", toolbar_location=None, y_axis_type=None,
                    x_range=(time.min(), time.max()),
                    y_range=(-0.5, len(channels) - 1 + 0.5))
    select.image(image=[z_data], x=0, y=0, dw=n_seconds, dh=n_channels, palette="Sunset11")
    select.add_tools(range_tool)
    
    show(column(p, select))