# RCS simulation tutorial

This tutorial takes you through one example implementation of the rcssim package. It uses a dataset collected using the Medtronic RC+S in a benchtop setting with controlled inputs acting as the LFP signals. The device was programmed to perform adaptive stimulation to provide a walkthrough for all stages of the processing pipeline, from time-domain data all the way to stimulation outputs.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import time
from ast import literal_eval
from tkinter import Tk, filedialog

import os

os.chdir('D:/TannerDixon/git/weill-neurohub/rcs-simulation')
from rcssim import rcs_sim as rcs

## Load data

First let's load in the data and display it. When the file explorer opens, please input the directory containing the sample data and settings files.

In [None]:
# Open file dialog for selecting neural data (.csv)
root = Tk()
root.withdraw()
root.call('wm', 'attributes', '.', '-topmost', True)

# print('Select left hemisphere .csv file.')
# left_file_name = filedialog.askopenfilename(multiple=False)

print('Select folder containing data and settings files.')
data_folder = filedialog.askdirectory()

root.destroy()
# left_neural_data = pd.read_csv(left_file_name)
data_meas = pd.read_csv(data_folder + '/dataset_ld7.csv')
settings = pd.read_csv(data_folder + '/dataset_ld7_config.csv')
amp_gains = np.genfromtxt(data_folder + '/amp_gains.csv', 
                          delimiter=',').astype(int)

settings['band_edges_hz'] = settings['band_edges_hz'].apply(literal_eval)
settings['subtract_vec'] = settings['subtract_vec'].apply(literal_eval)
settings['multiply_vec'] = settings['multiply_vec'].apply(literal_eval)
settings['update_rate'] = settings['update_rate'].apply(literal_eval)
settings['weights'] = settings['weights'].apply(literal_eval)
settings['dual_threshold'] = settings['dual_threshold'].apply(literal_eval)
settings['threshold'] = settings['threshold'].apply(literal_eval)
settings['blank_duration'] = settings['blank_duration'].apply(literal_eval)
settings['onset'] = settings['onset'].apply(literal_eval)
settings['termination'] = settings['termination'].apply(literal_eval)
settings['blank_both'] = settings['blank_both'].apply(literal_eval)
settings['target_amp'] = settings['target_amp'].apply(literal_eval)

In [None]:
print('Measured data')
data_meas

In [None]:
print('Settings')
settings

## TD &rarr; PB

Next, let's use the rcssim code to estimate the two channels of Power Band outputs that were recorded on the device. As you can see in the settings above, the two Power Bands were collected using different Time-Domain channels, but both were tracking 30Hz input signals.

This process typically follows three main steps:
1. Convert the Time-Domain outputs of the device from mV to the internal RC+S unit representation using `transform_mv_to_rcs()`
2. Compute the full-spectrum FFT outputs given all the FFT parameters and the function `td_to_fft()`
3. Specify the frequency range of your intended Power Bands for computing power signals from the FFT outputs using `fft_to_pb()`. Not that this includes more than just summing across the required FFT output bins.

In [None]:
data_sim = data_meas[['timestamp', 'td1', 'td2']].copy()
data_sim['pb1'] = np.nan*np.ones(np.shape(data_sim['td1']))
data_sim['pb2'] = np.nan*np.ones(np.shape(data_sim['td1']))

t_start = time.time()
hann_win = rcs.create_hann_window(settings.fft_size[0], percent=100)

# Compute power band for the first time-domain channel
data_td = rcs.transform_mv_to_rcs(data_sim['td1'].values, amp_gains[0])
data_fft, t_pb = rcs.td_to_fft(data_td, 
                               data_sim['timestamp'].values, settings.fs_td[0], 
                               settings.fft_size[0], settings.interval[0], 
                               hann_win, output_in_mv=False)
data_pb = rcs.fft_to_pb(data_fft, settings.fs_td[0], settings.fft_size[0], 
                        settings.bit_shift[0], 
                        band_edges_hz=settings.band_edges_hz[0][0][1], 
                        input_is_mv=False)
pb_sample_mask = np.isin(data_sim.timestamp, t_pb)
data_sim.loc[pb_sample_mask,'pb1'] = data_pb

# Compute power band for the second time-domain channel
data_td = rcs.transform_mv_to_rcs(data_sim['td2'].values, amp_gains[1])
data_fft, t_pb = rcs.td_to_fft(data_td, 
                               data_sim['timestamp'].values, settings.fs_td[0], 
                               settings.fft_size[0], settings.interval[0], 
                               hann_win, output_in_mv=False)
data_pb = rcs.fft_to_pb(data_fft, settings.fs_td[0], settings.fft_size[0], 
                        settings.bit_shift[0], 
                        band_edges_hz=settings.band_edges_hz[0][1][1], 
                        input_is_mv=False)
pb_sample_mask = np.isin(data_sim.timestamp, t_pb)
data_sim.loc[pb_sample_mask,'pb2'] = data_pb

t_end = time.time()
print('Time elapsed: ' + str(t_end-t_start))

Now let's plot our results. The "Simulated" signals are those that we computed offline from the Time-Domain data, and the "Measured" signals are those that were output by the device's online computations.

In [None]:
%matplotlib widget
fig, ax = plt.subplots(2,1, figsize=(6,3), sharex='col', sharey=False)

ax[0].plot(data_meas.loc[~np.isnan(data_meas.pb1), 'timestamp'], 
           data_meas.loc[~np.isnan(data_meas.pb1), 'pb1'], label='Measured')
ax[0].plot(data_sim.loc[~np.isnan(data_sim.pb1), 'timestamp'], 
           data_sim.loc[~np.isnan(data_sim.pb1), 'pb1'], label='Simulated')
ax[0].legend(bbox_to_anchor=(1.02, 0.6))

ax[1].plot(data_meas.loc[~np.isnan(data_meas.pb1), 'timestamp'], 
           data_meas.loc[~np.isnan(data_meas.pb1), 'pb2'], label='Measured')
ax[1].plot(data_sim.loc[~np.isnan(data_sim.pb1), 'timestamp'], 
           data_sim.loc[~np.isnan(data_sim.pb1), 'pb2'], label='Simulated')

ax[0].grid()
ax[1].grid()
ax[1].set_xlabel('Time [sec]')
ax[0].set_ylabel('PB1 Output \n [RCS units]')
ax[1].set_ylabel('PB2 Output \n [RCS units]')

plt.tight_layout()

Pretty good!

## PB &rarr; LD &rarr; State &rarr; Stim

Now let's move on to the detector and stimulation output portions of the device's operations. Several sub-processes will be combined in the next code cell. Here is an outline of what's going on under the hood, but please look in at the actual code as well:

1. Power band outputs are uniquely combined to generate the two LD signals using `pb_to_ld()`
2. The two LD signals are subjected to logical operations (thresholded, hold times, etc) to assign the moment-by-moment state using `ld_to_state()`. The two LD's are semi-independent (depending on parameters) and each occupies one of three different states. These are then combined to create a single LD state that can take nine (3x3) different state values.
3. The state determines the stimulation outputs according to a state table with target stimulation amplitudes for each state and specified stimulation ramp rates. This is computed using `state_to_stim()`.

In [None]:
data_sim['ld1'] = np.nan*np.ones(np.shape(data_sim['td1']))
data_sim['ld2'] = np.nan*np.ones(np.shape(data_sim['td1']))
data_sim['state'] = np.nan*np.ones(np.shape(data_sim['td1']))
data_sim['stim'] = np.nan*np.ones(np.shape(data_sim['td1']))

t_start = time.time()

# Compute continuous-valued LD output
ld_output, t_ld, update_tbl = rcs.pb_to_ld(
                        data_sim.loc[~np.isnan(data_sim['pb1']), 
                                      ['pb1', 'pb2']].values, 
                        data_sim.loc[~np.isnan(data_sim['pb1']), 
                                      'timestamp'].values, 
                        update_rate=settings.update_rate[0], 
                        weights=settings.weights[0],
                        subtract_vec=settings.subtract_vec[0], 
                        multiply_vec=settings.multiply_vec[0])

# Compute the state changes
state, t_state, ld_output = rcs.ld_to_state(ld_output, update_tbl, 
                                   data_sim.loc[~np.isnan(data_sim['pb1']), 
                                                 'timestamp'].values, 
                                   update_rate=settings.update_rate[0], 
                                   dual_threshold=settings.dual_threshold[0], 
                                   threshold=settings.threshold[0], 
                                   onset_duration=settings.onset[0], 
                                   termination_duration=settings.termination[0], 
                                   blank_duration=settings.blank_duration[0], 
                                   blank_both=settings.blank_both[0])
# t_state, idx = np.unique(t_state, return_index=True)
# state = state[idx]

# Compute the stim changes
rise_time = rcs.transform_ramp_rate_int_to_mas(settings.rise_time[0])
fall_time = rcs.transform_ramp_rate_int_to_mas(settings.fall_time[0])
stim, t_stim = rcs.state_to_stim(state, t_state, 
                                 target_amp=settings.target_amp[0], 
                                 rise_time=rise_time, 
                                 fall_time=fall_time)
# t_stim = np.around(np.around(t_stim/0.002)*0.002, 3)
# t_stim, idx = np.unique(t_stim, return_index=True)
# stim = stim[idx]


t_end = time.time()
print('Time elapsed: ' + str(t_end-t_start))


# # Log the simulated data
# ld1_sample_mask = np.isin(data_sim.timestamp.values, t_ld[0])
# data_sim.loc[ld1_sample_mask,'ld1'] = ld_output[0]

# ld2_sample_mask = np.isin(data_sim.timestamp.values, t_ld[1])
# data_sim.loc[ld2_sample_mask,'ld2'] = ld_output[1]

# state_sample_mask = np.isin(data_sim.timestamp, t_state)
# data_sim.loc[state_sample_mask,'state_iso'] = state

# stim_sample_mask = np.isin(data_sim.timestamp, t_stim)
# data_sim.loc[stim_sample_mask,'stim_iso'] = stim

In [None]:
# %matplotlib widget
fig, ax = plt.subplots(4,1, figsize=(6,6), sharex='col', sharey=False)

ax[0].plot(data_meas.loc[~np.isnan(data_meas.ld1), 'timestamp'], 
           data_meas.loc[~np.isnan(data_meas.ld1), 'ld1'], label='Measured')
# ax[0].plot(data_sim.loc[~np.isnan(data_sim.ld1), 'timestamp'], 
#            data_sim.loc[~np.isnan(data_sim.ld1), 'ld1'], 
#            label='Simulated')
ax[0].plot(t_ld[0], ld_output[0], 
           label='Simulated')
ax[0].axhline(settings.threshold[0][0][0], color='r')
ax[0].axhline(settings.threshold[0][0][1], color='r')
ax[0].legend(bbox_to_anchor=(1.02, 0.6))

ax[1].plot(data_meas.loc[~np.isnan(data_meas.ld2), 'timestamp'], 
           data_meas.loc[~np.isnan(data_meas.ld2), 'ld2'], label='Measured')
# ax[1].plot(data_sim.loc[~np.isnan(data_sim.ld2), 'timestamp'], 
#            data_sim.loc[~np.isnan(data_sim.ld2), 'ld2'], 
#            label='Simulated')
ax[1].plot(t_ld[1], ld_output[1], 
           label='Simulated')
ax[1].axhline(settings.threshold[0][1][0], color='r')
ax[1].axhline(settings.threshold[0][1][1], color='r')

ax[2].plot(data_meas.loc[~np.isnan(data_meas.state), 'timestamp'], 
           data_meas.loc[~np.isnan(data_meas.state), 'state'], label='Measured')
# ax[2].plot(data_sim.loc[~np.isnan(data_sim.state_iso), 'timestamp'], 
#            data_sim.loc[~np.isnan(data_sim.state_iso), 'state_iso'], 
#            label='Simulated')
ax[2].plot(t_state, state, 
           label='Simulated')
ax[2].set_yticks(np.arange(9))
ax[2].set_yticklabels(['0'] + ['']*7 + ['8'])

ax[3].plot(data_meas.loc[~np.isnan(data_meas.stim), 'timestamp'], 
           data_meas.loc[~np.isnan(data_meas.stim), 'stim'], label='Measured')
# ax[3].plot(data_sim.loc[~np.isnan(data_sim.stim_iso), 'timestamp'], 
#            data_sim.loc[~np.isnan(data_sim.stim_iso), 'stim_iso'], 
#            label='Simulated')
ax[3].plot(t_stim, stim, 
           label='Simulated')

for i in range(4):
    ax[i].grid()
ax[3].set_xlabel('Time [sec]')
ax[0].set_ylabel('LD1 Output \n [a.u]')
ax[1].set_ylabel('LD2 Output \n [a.u.]')
ax[2].set_ylabel('LD State')
ax[3].set_ylabel('Stimulation \n amplitude [mA]')

plt.tight_layout()

And there we have it! The complete signal processing and logic operations of the Medtronic RC+S, at work in an in silico simulation. This obviates the need for recollecting datasets just to know what would have happened under different programming configurations. It is particularly useful for embedding within data-driven approaches for optimizing device programming.

For further investigation of the package, please feel free to look at the testing notebooks. These include a thorough set of unit testing for each of the device functions. This will help you better understand just how accurate the offline simulation can be in different use cases, and has some added information about certain limitations.

We hope that this tool will be useful to you and ultimately empower research for improving the lives of real people. Please feel free to contact the authors and/or contribute to the code if you find any areas for improvement.