# Signal

notebook for exploring emg signal quality

there are three ways to select an npz to analyze

1. specifying the local path
2. using the upload widget 
3. running from the [Runner Notebook](runner.ipynb)

In [None]:
import json
import logging

import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from IPython.display import display
from ipywidgets import Layout, interact_manual

import cionic
from cionic import tools, triggers

%matplotlib widget
%load_ext autoreload
%autoreload 2

## Parameters


These parameters will be overridden when this notebook is used with the runner

```
npzpath     path on your local filesystem to find the recording or path
            or path on the local filesystem to store the recording if download set
            
download    xid of collection to download

tokenpath   path to auth token if download is set

```

In [None]:
# param

npzpath = None
download = None
tokenpath = None
datapath = None

In [None]:
if datapath:
    cionic.auth(tokenpath=tokenpath)
    collection_data = cionic.get_cionic(datapath)
    print(json.dumps(collection_data.get("meta", {}).get("sensor_stats"), indent=4))

In [None]:
if download:
    cionic.auth(tokenpath=tokenpath)
    cionic.download_npz(npzpath, download)

try:
    npz = np.load(npzpath)
except FileNotFoundError:
    logging.error(f"npzpath [{npzpath}] not found")

if 'position' in npz['segments'].dtype.names:
    print(f"Positions: {set(npz['segments']['position'])}")

if 'stream' in npz['segments'].dtype.names:
    print(f"Stream: {set(npz['segments']['stream'])}")

## Interface

The following elements can be edited via the interface

1. which stream to examine
2. which filter to run on the signal
3. the cutoff frequency of that filter
4. window size for rms
5. fft or cdf


In [None]:
emg_filter_butter = {
    'filter': tools.butter_highpass_filter,
    'filter_order': 5,
}

emg_filter_fir = {
    'filter': tools.fir_filter,
    'taps_n': 63,
}

emg_filter_none = {'filter': None}

filters = {'off': emg_filter_none, 'butter': emg_filter_butter, 'fir': emg_filter_fir}

streams = [
    x
    for x in set(npz['segments']['stream'])
    if not x.startswith('shtp_')  # shtp_ streams should be redundant
    and x not in ['frsp', 'regs', 'charge']  # register streams
]
regs_dict = tools.stream_regs(npz)
data_dict, times, components = tools.stream_data(npz, streams=streams, degrees=False)

# Add joints
joint_dict, joint_component_list = tools.get_stream_data_joints(npz, streams=streams)
data_dict.update(joint_dict)
components.extend(joint_component_list)

# Add stims
stim_streams = triggers.get_action_muscles(npz)
components.extend(stim_streams)

boundaries = cionic.load_boundary_times(npz)
# construct dictionary keyed on "<label> <segment>"
gwlabels = {f"{b['label']} {b['segment']}": b for b in boundaries}
lastlabel = 'ALL'
dataLabels = list(gwlabels.keys())
dataLabels.sort(key=lambda name: int(name.split()[1]))
labels = [lastlabel] + dataLabels

w = '400px'
sliderLayout = Layout(width=w)
dropdownLayout = Layout(width=w, height='200px')

sp = widgets.SelectMultiple(
    options=components, description="streams", layout=dropdownLayout
)
rs = widgets.FloatRangeSlider(
    min=times[0],
    max=times[1],
    value=times,
    continuous_update=False,
    description="times",
    layout=sliderLayout,
)
fd = widgets.Dropdown(options=filters.keys(), description="filter")
rt = widgets.BoundedIntText(value=300, min=50, max=200000, step=50)
ct = widgets.BoundedIntText(value=10, min=0, max=150, step=5)
a1 = widgets.BoundedIntText(
    value=1, min=0, max=100000, step=1000, description="axis 1 scale"
)
a2 = widgets.BoundedIntText(
    value=10000, min=0, max=100000, step=1000, description="axis 2 scale"
)
title = widgets.Text(
    value='', placeholder='Enter title here', description='plot title:', disabled=False
)
xlabel = widgets.Text(
    value='',
    placeholder='Enter xlabel here',
    description='x-axis label:',
    disabled=False,
)
ylabel = widgets.Text(
    value='',
    placeholder='Enter ylabel here',
    description='y-axis label:',
    disabled=False,
)
color = widgets.Text(
    value='',
    placeholder='This will override plot colors.',
    description='plot color:',
    disabled=False,
)
ylim_min = widgets.FloatText(value=0, description='ylim min', disabled=False)
ylim_max = widgets.FloatText(value=0, description='ylim max', disabled=False)
plot_same = widgets.Checkbox(value=False, description='Same Plot?')
ncols = widgets.IntText(value=1, description='# columns', disabled=False)
legend_loc = widgets.Dropdown(
    options=[
        'best',
        'upper right',
        'upper left',
        'lower right',
        'lower left',
        'upper center',
        'lower center',
    ],
    description="legend location",
)
plot_sharex = widgets.Checkbox(value=True, description='Share x?')
show_filter_plots = widgets.Checkbox(value=False, description='Show RMS/filter plots')


@interact_manual
def show_emg(
    stream=sp,
    times=rs,
    label=labels,
    style=['-', 'o'],
    filter_plots_toggle=show_filter_plots,
    filt=fd,
    cutoff=ct,
    rms=rt,
    fft=['db', 'cdf'],
    a1_scale=a1,
    a2_scale=a2,
    plot_title=title,
    plot_xlabel=xlabel,
    plot_ylabel=ylabel,
    plot_color=color,
    plot_min=ylim_min,
    plot_max=ylim_max,
    legend_location=legend_loc,
    plot_on_same=plot_same,
    sharex=plot_sharex,
    plot_ncols=ncols,
):

    global lastlabel
    if label != lastlabel:
        gwtimes = gwlabels.get(label, {'start_s': rs.min, 'end_s': rs.max})
        times = (gwtimes['start_s'], gwtimes['end_s'])
        rs.value = times
        lastlabel = label

    # close all the plots
    plt.close('all')

    # adjust filter parameters
    fil = filters[filt]
    fil['cutoff_freq'] = cutoff

    # Override xaxis and yaxis labels
    # NOTE: Not to be used with "Same Plot?" checked - this will not work
    # NOTE: length of these lists MUST be the same as the number of streams selected
    # plot_xlabel = [
    #     'This is the first xlabel',
    #     'This is the second',
    #     'This is the third'
    #     ]
    # plot_ylabel = [
    #     'This is the first ylabel',
    #     'This is the second',
    #     'This is the third'
    # ]

    # pull out the stim triggers to plot as fills
    (actions, stream) = triggers.check_for_action(stream)
    stims = triggers.compute_stims(npz, actions, times=times)

    # plot filtered signal, rms, and fft
    signals, legends = tools.compute_signals(
        data_dict, regs_dict, stream, times, fil, rms, fft, [a1_scale, a2_scale]
    )
    tools.configurable_plot(
        signals['sig'],
        leg_contents=legends['sig'],
        title=plot_title,
        xlabel=plot_xlabel,
        ylabel=plot_ylabel,
        ylim=[ylim_min.value, ylim_max.value],
        color=plot_color,
        style=style,
        ncols=plot_ncols,
        same_plot=plot_on_same,
        legend_loc=legend_location,
        sharex=sharex,
        shades=stims,
    )
    if filter_plots_toggle:
        tools.simple_plot(
            signals['rms'],
            leg_contents=legends['rms'],
            title='RMS Signal - Filter: %s, Window: %.0d' % (filt, rms),
        )
        tools.simple_plot(
            signals['fft'],
            y_column='Hz',
            leg_contents=legends['fft'],
            title='FFT -  Filter: %s, FFT: %s' % (filt, fft),
        )

In [None]:
# print out segments table
pd.DataFrame(npz['segments'])

In [None]:
# print out any saved impedance values
tools.stream_impedances(npz)

In [None]:
# print out regs
for device, regs in regs_dict.items():
    print(device)
    display(regs)