In [2]:
from pathlib import Path
import allego_file_reader as afr

def find_xdat_datasources(root):
    """Return the datasource names of all xdat recordings found under ``root``.

    The directory tree is searched recursively for files matching
    ``*xdat.json``. Each hit is reported as its path relative to ``root``
    with the last two suffixes removed (``sub/rec.xdat.json`` -> ``sub/rec``),
    so that ``Path(root, name)`` points at the recording even when it sits in
    a sub-folder. For files directly inside ``root`` the name is just the
    bare stem.

    Parameters
    ----------
    root : str or Path
        Directory to search; a leading ``~`` is expanded.

    Returns
    -------
    list of str
        Datasource names, sorted by path so that positional indexing is
        reproducible between runs. Empty when nothing matches.
    """
    root = Path(root).expanduser()
    names = []
    for json_path in sorted(root.glob('**/*xdat.json')):
        rel = json_path.relative_to(root)
        # rel.stem drops '.json'; the second .stem drops the remaining suffix
        names.append(str(rel.parent / Path(rel.stem).stem))
    return names


data_dir = r"C:\axorus\250117-PEV test"  # data directory
p = Path(data_dir).expanduser()
all_xdat_datasource_names = find_xdat_datasources(p)

n_files = len(all_xdat_datasource_names)
print(f'detected {n_files} files')

detected 34 files


In [30]:
# Load one recording (metadata and all signals) to develop the analysis on
file_i = 0  # position of the recording within all_xdat_datasource_names
filename = all_xdat_datasource_names[file_i]
file_path = str(Path(data_dir) / filename)

meta = afr.read_allego_xdat_metadata(file_path)
# The two None arguments are passed through as-is
# (presumably an optional time range -- TODO confirm against allego_file_reader)
signals, timestamps, time_samples = afr.read_allego_xdat_all_signals(
    file_path, None, None
)

# Make the time axis start at zero, then scale it by 1000
time_samples -= time_samples[0]
time_samples = 1000 * time_samples  # convert to ms

In [67]:
# Build a per-channel lookup table from the recording metadata
import pandas as pd

biointerface_map = meta['sapiens_base']['biointerface_map']
channel_names = biointerface_map['chan_name']
sys_chan_idx = biointerface_map['sys_chan_idx']
channel_x = biointerface_map['site_ctr_x']
channel_y = biointerface_map['site_ctr_y']

# One row per channel, indexed by channel name. The table is built in a
# single constructor call instead of being grown one cell at a time, which
# is slow and forced every column to float. Integer metadata now stays
# integer (downstream code casts with int() wherever it needs an index).
channel_df = pd.DataFrame(
    {
        'sys_chan_idx': sys_chan_idx,
        'site_ctr_x': channel_x,
        'site_ctr_y': channel_y,
    },
    index=channel_names,
)

channel_df.head()

Unnamed: 0,sys_chan_idx,site_ctr_x,site_ctr_y
pri_0,0.0,20.0,20.0
pri_1,1.0,30.0,20.0
pri_2,2.0,40.0,20.0
pri_3,3.0,50.0,20.0
pri_4,4.0,20.0,30.0


In [None]:
# Load and plot the digital 1 data
import utils
import numpy as np
import pandas as pd

# Map every channel name to its row in `signals`.
# `chan_pointer` used to be read here without being defined anywhere in the
# notebook, so this cell only worked with leftover kernel state. It is now
# rebuilt from the metadata.
# NOTE(review): assumes `sys_chan_idx` is the row index into `signals` (the
# VEP cell makes the same assumption when it indexes `signals_ds`) and that
# the biointerface map lists a channel called "din_1" -- confirm.
biointerface_map = meta['sapiens_base']['biointerface_map']
chan_pointer = {
    name: int(idx)
    for name, idx in zip(biointerface_map['chan_name'],
                         biointerface_map['sys_chan_idx'])
}

# Get the digital in data
din_1_data = signals[chan_pointer["din_1"], :].flatten()

# Detect rising (+1) and falling (-1) steps.
# np.diff(x)[i] is x[i+1] - x[i], so each index below is the last sample
# *before* the level changes; the burst times inherit that one-sample offset.
din_1_steps = np.diff(din_1_data)
up_idx = np.where(din_1_steps == 1)[0]
down_idx = np.where(din_1_steps == -1)[0]

# Pair every rising edge with the falling edge that follows it. If the
# recording starts in the middle of a burst, the first falling edge has no
# onset and must be skipped, otherwise every pair would be shifted by one.
# A rising edge without a falling edge at the end of the recording is dropped.
paired_down_idx = down_idx
if up_idx.size and down_idx.size and down_idx[0] < up_idx[0]:
    paired_down_idx = down_idx[1:]
n_pairs = min(up_idx.size, paired_down_idx.size)

# Built in one go; the columns exist even when no burst was detected, so the
# .query() below keeps working on an empty table.
burst_df = pd.DataFrame(
    {
        'burst_onset': time_samples[up_idx[:n_pairs]],
        'burst_offset': time_samples[paired_down_idx[:n_pairs]],
    },
    dtype=float,
)


fig_save_dir = Path(data_dir) / 'figures' / 'develop'


# Time window to display (same unit as time_samples, i.e. ms)
t0 = 15 * 1e3
t1 = 17 * 1e3

# Boolean mask for t0 <= t < t1. Unlike taking the first/last matching index,
# this does not raise when the recording is shorter than the window, and it
# keeps the last sample before t1.
in_window = (time_samples >= t0) & (time_samples < t1)

x_plot = time_samples[in_window]
y_plot = din_1_data[in_window]

fig = utils.simple_fig()
fig.add_scatter(x=x_plot, y=y_plot,
                mode='lines', line=dict(color='black'),
                showlegend=False)

# Mark the bursts that lie completely inside the window:
# green vertical line = onset, red vertical line = offset
bdf_plot = burst_df.query(f'burst_onset > {t0} and burst_offset < {t1}')
for onset, offset in zip(bdf_plot['burst_onset'], bdf_plot['burst_offset']):
    for edge_time, color in ((onset, 'green'), (offset, 'red')):
        fig.add_scatter(x=[edge_time, edge_time], y=[0, 1],
                        mode='lines', line=dict(color=color),
                        showlegend=False)
fig.show()


In [43]:
# downsample data
# Keep every `downsample_factor`-th sample of the time axis and of all channels.
# The epoch extraction further down sizes each epoch as (t_pre + t_post)
# samples, i.e. it relies on the downsampled data having one sample per ms;
# change the factor only together with that code.
# NOTE(review): this is plain decimation without a low-pass filter, so
# content above the new Nyquist frequency aliases -- confirm this is acceptable.
downsample_factor = 30
n_data = time_samples.size
sample_idx = np.arange(0, n_data, downsample_factor)
time_samples_ds = time_samples[sample_idx]
signals_ds = signals[:, sample_idx]

(39, 1852672)

In [48]:
# Sanity check: first few values of the time axis (shifted to start at zero and
# scaled by 1000 when the file was loaded)
time_samples[:5]

array([0.        , 0.03333333, 0.06666667, 0.1       , 0.13333333])

In [69]:
# Burst-triggered average (VEP) of every channel

t_pre = 100   # ms kept before each burst onset
t_post = 200  # ms kept after each burst onset
# One sample per ms is assumed for the downsampled data, so the epoch length
# in samples equals its duration in ms.
n_samples = (t_pre + t_post)
n_bursts = burst_df.shape[0]

# The epoch windows depend only on the burst onsets, not on the channel, so
# they are located once instead of once per channel and burst.
if 'burst_onset' in burst_df:
    burst_onsets = burst_df['burst_onset'].to_numpy(dtype=float)
else:
    burst_onsets = np.empty(0)

# First downsampled sample at or after (onset - t_pre); requires a
# monotonically increasing time axis.
epoch_start = np.searchsorted(time_samples_ds, burst_onsets - t_pre, side='left')

# An epoch is usable only if it fits completely inside the recording.
# Clipped epochs used to abort the cell with a shape-mismatch ValueError.
is_complete = (
    (burst_onsets - t_pre >= time_samples_ds[0])
    & (epoch_start + n_samples <= time_samples_ds.size)
)
n_skipped = n_bursts - int(is_complete.sum())
if n_skipped:
    print(f'skipping {n_skipped} burst(s) whose epoch window extends beyond the recording')

epoch_start = epoch_start[is_complete]

# Guard the one-sample-per-ms assumption: n_samples consecutive samples must
# span (n_samples - 1) ms, otherwise the epochs would not cover
# [onset - t_pre, onset + t_post).
if epoch_start.size:
    epoch_span = time_samples_ds[epoch_start + n_samples - 1] - time_samples_ds[epoch_start]
    if not np.allclose(epoch_span, n_samples - 1, atol=0.5):
        raise ValueError(
            'time_samples_ds is not sampled at one sample per ms; '
            'epoch length in samples does not match t_pre + t_post'
        )

# (n_epochs, n_samples) matrix of sample indices, one row per usable burst
epoch_idx = epoch_start[:, None] + np.arange(n_samples)

all_vep_data = {}
for chname, chinfo in channel_df.iterrows():
    chdata = signals_ds[int(chinfo.sys_chan_idx), :]

    # single-trial epochs of this channel, averaged over bursts
    vep = chdata[epoch_idx].astype(float)
    all_vep_data[chname] = np.mean(vep, axis=0)
    

In [80]:
# Draw the averaged response of every channel in a grid of subplots

# Extent of the electrode-site coordinates.
# NOTE(review): these four values are not used by the layout below.
ec_xmin = channel_df['site_ctr_x'].min()
ec_xmax = channel_df['site_ctr_x'].max()
ec_ymin = channel_df['site_ctr_y'].min()
ec_ymax = channel_df['site_ctr_y'].max()

# Grid position of each channel: site coordinate divided by 10.
# NOTE(review): x feeds the subplot *row* and y the *column*, so the figure
# is transposed relative to the site coordinates -- confirm this is intended.
# The minimum coordinate is not subtracted either, so grid rows/columns below
# the smallest coordinate stay empty.
channel_df['plot_row'] = channel_df['site_ctr_x'] / 10
channel_df['plot_col'] = channel_df['site_ctr_y'] / 10

n_rows = int(channel_df['plot_row'].max()) + 1
n_cols = int(channel_df['plot_col'].max()) + 1

# Outer margins and gaps between panels, as fractions of the figure size
x_offset = 0.05
y_offset = 0.05
x_spacing = 0.05
y_spacing = 0.05

# Size of a single panel
x_width = (1 - 2 * x_offset - (n_cols * x_spacing)) / n_cols
y_height = (1 - 2 * y_offset - (n_rows * y_spacing)) / n_rows

# Panel domains, keyed by 1-based row number; each entry holds one
# [start, end] pair per column.
x_domains = {}
y_domains = {}

for row_i in range(n_rows):
    # lower edge of this row; rows are stacked from the top of the figure down
    y0 = 1 - y_offset - row_i * (y_spacing + y_height) - y_height
    left_edges = [x_offset + col_j * (x_spacing + x_width) for col_j in range(n_cols)]

    x_domains[row_i + 1] = [[x0, x0 + x_width] for x0 in left_edges]
    y_domains[row_i + 1] = [[y0, y0 + y_height] for _ in range(n_cols)]

fig = utils.make_figure(
    width=2, height=2,
    x_domains=x_domains,
    y_domains=y_domains,
)

# One black trace per channel, placed in its (1-based) grid cell
for chname, plot_row, plot_col in zip(channel_df.index,
                                      channel_df['plot_row'],
                                      channel_df['plot_col']):
    fig.add_scatter(y=all_vep_data[chname],
                    mode='lines', line=dict(color='black', width=1),
                    showlegend=False,
                    row=int(plot_row + 1), col=int(plot_col + 1))

fig.show()
    


In [None]:
#  Read stimulation 