In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import polars as pl
import json
from polars import selectors as cs
from utils import *
import os
import duckdb



import rcs_cataloging as rcc # --> probably need to build first

In [3]:
def get_td_data(td_filepath):
    with open(td_filepath, 'r') as file:
        data = json.load(file)
    return data[0]['TimeDomainData']

In [13]:
def td_postprocessing(df_td):
    df_td = df_td.with_columns(
    [
        pl.col('samplerate').replace({0: 250, 1: 500, 2: 1000}),
        pl.col('key0').list.len().alias('packetsizes'),
    ]
).explode(pl.col('^key.*$')).with_columns(
        pl.when( (pl.col('systemTick').shift(-1) - pl.col('systemTick')) == 0).then(pl.lit(None)).otherwise(pl.col('systemTick')).alias('systemTick'),
        #pl.when( (pl.col('samplerate').shift(-1) - pl.col('samplerate')) == 0).then(pl.lit(None)).otherwise(pl.col('samplerate')).alias('samplerate'),
        pl.when( (pl.col('PacketRxUnixTime').shift(-1) - pl.col('PacketRxUnixTime')) == 0).then(pl.lit(None)).otherwise(pl.col('PacketRxUnixTime')).alias('PacketRxUnixTime'),
        pl.when( (pl.col('dataTypeSequence').shift(-1) - pl.col('dataTypeSequence')) == 0).then(pl.lit(None)).otherwise(pl.col('dataTypeSequence')).alias('dataTypeSequence'),
    ).with_columns(
        pl.when(pl.col('systemTick').is_not_null()).then('PacketGenTime').otherwise(pl.lit(None)).alias('PacketGenTime'),
        pl.when(pl.col('systemTick').is_not_null()).then('timestamp').otherwise(pl.lit(None)).alias('timestamp'),
        pl.when(pl.col('systemTick').is_not_null()).then('packetsizes').otherwise(pl.lit(None)).alias('packetsizes'),
    )
    return assignTime(df_td).rename({'samplerate': 'SampleRateInHz'})

In [5]:
def get_accel_data(accel_filepath):
    with open(accel_filepath, 'r') as file:
        data = json.load(file)
    return data[0]['AccelData']

In [6]:
def accel_postprocessing(df_accel):
    df_accel = df_accel.with_columns(
        pl.col('samplerate').replace({0: 65.104}),
        pl.col('XSamples').list.len().alias('packetsizes'),
    ).explode(pl.col('^*Samples.*$')).with_columns(
        pl.when( (pl.col('systemTick').shift(-1) - pl.col('systemTick')) == 0).then(pl.lit(None)).otherwise(pl.col('systemTick')).alias('systemTick'),
        #pl.when( (pl.col('samplerate').shift(-1) - pl.col('samplerate')) == 0).then(pl.lit(None)).otherwise(pl.col('samplerate')).alias('samplerate'),
        pl.when( (pl.col('PacketRxUnixTime').shift(-1) - pl.col('PacketRxUnixTime')) == 0).then(pl.lit(None)).otherwise(pl.col('PacketRxUnixTime')).alias('PacketRxUnixTime'),
        pl.when( (pl.col('dataTypeSequence').shift(-1) - pl.col('dataTypeSequence')) == 0).then(pl.lit(None)).otherwise(pl.col('dataTypeSequence')).alias('dataTypeSequence'),
    ).with_columns(
        pl.when(pl.col('systemTick').is_not_null()).then('PacketGenTime').otherwise(pl.lit(None)).alias('PacketGenTime'),
        pl.when(pl.col('systemTick').is_not_null()).then('timestamp').otherwise(pl.lit(None)).alias('timestamp'),
        pl.when(pl.col('systemTick').is_not_null()).then('packetsizes').otherwise(pl.lit(None)).alias('packetsizes'),
    )
    
    return assignTime(df_accel).rename({'XSamples': 'accel_XSamples', 'YSamples': 'accel_YSamples', 'ZSamples': 'accel_ZSamples', 'samplerate': 'accel_RateInHz'})    

In [18]:
def epoch_time_series(df, epoch_size='1s'):
    df = df.group_by_dynamic('localTime', every=epoch_size, period=epoch_size, start_by='datapoint').agg(
            [
                pl.col('localTime').alias('localTime_vec'),
                pl.col('DerivedTime'),
                pl.col('channel_0'),
                pl.col('channel_1'),
                pl.col('channel_2'),
                pl.col('channel_3'),
                pl.col('localTime_vec').count().alias('epoch_length'),
                pl.col('SampleRateInHz'),
                pl.col('accel_XSamples'),
                pl.col('accel_YSamples'),
                pl.col('accel_ZSamples'),
                # Include accel sample rate
                pl.col('timestamp').mode(),
                pl.col('PacketGenTime').mode(),
                pl.col('PacketRxUnixTime').mode().alias('HostUnixTime'),
            ]
        ).filter(
            # Remove epochs with more than 1 sample rate
            pl.col('SampleRateInHz').list.len() < 2,
            # Remove incomplete epochs
            pl.col('epoch_length').is_in([125, 250, 500])
        ).with_columns(
            pl.col('SampleRateInHz').list.first().alias('SampleRateInHz'),
        )
        
    return df

In [19]:
# Double check how this looks when in adaptive and when in groups A-C
def add_stimulation_parameters(df, settings, adapt):
    if adapt is not None:
        df = df.join_asof(adapt.select(
                'StimRateInHz', 'AmplitudeInMilliamps'
            ), 
            on='HostUnixTime', how='left').join_asof(settings.select(
                'PulseWidthInMicroseconds'
            ),
            on='HostUnixTime', how='left')
    else:
        df = df.join_asof(settings.select(
                'StimRateInHz', 'AmplitudeInMilliamps', 'PulseWidthInMicroseconds'
            ), 
            on='HostUnixTime', how='left')
    
    return df.with_columns(
        pl.col('StimRateInHz').backward_fill(),
        pl.col('AmplitudeInMilliamps').backward_fill(),
        pl.col('PulseWidthInMicroseconds').backward_fill(),
    )
    

In [20]:
def add_session_tables(directory, epoch_size='500ms'):
    assert epoch_size == '500ms', 'Only 500ms epoch size is supported at the moment, because of filtering on epoch size.'
    
    # Cycle sessions
    for root, dirs, files in os.walk(directory):
        for subdir in dirs:
            subdirectory_path = os.path.join(root, subdir)
            # Get Device directory
            for subroot, subdirs, subfiles in os.walk(subdirectory_path):
                for subsubdir in subdirs:
                    if 'Device' in subsubdir:
                        device_subdirectory_path = os.path.join(subroot, subsubdir)
                        # Cycle through the files in the device subdirectory, get corresponding tables
                        
                        # Time Domain Data
                        td_data = get_td_data(os.path.join(device_subdirectory_path, 'RawDataTD.json'))
                        df_td = rcc.loop_and_table_td_data(td_data)
                        df_td = td_postprocessing(df_td)
                        
                        # Accelerometery Data
                        accel_data = get_accel_data(os.path.join(device_subdirectory_path, 'RawDataAccel.json'))
                        df_accel = rcc.loop_and_table_accel_data(accel_data)
                        df_accel = accel_postprocessing(df_accel)
                        
                        # Aggregate TD and Accel data
                        accel_samplerate = df_accel.select(pl.col('accel_RateInHz').unique()).item()
                        df = df_td.sort('DerivedTime').join_asof(
                            df_accel.sort('DerivedTime').select([
                                pl.col('DerivedTime'),
                                pl.col('accel_RateInHz'),
                                pl.col('^.*Samples.*$')
                            ]),
                            on='DerivedTime',
                            strategy='nearest',
                            tolerance=1000/accel_samplerate,
                        ).with_columns(
                            pl.from_epoch(pl.col('DerivedTime'), time_unit='ms').dt.convert_time_zone('America/Los_Angeles').alias('localTime'),
                        )
                        
                        # Epoch Time Series data
                        df = epoch_time_series(df, epoch_size=epoch_size)
                        
                        # Stim Settings
                        settings = process_device_settings(json.load(open(os.path.join(device_subdirectory_path, 'Settings.json'))))
                        # Clean up settings table
                        cols = settings.columns[2:]
                        settings_col = cs.by_name(*cols)
                        settings.filter(~pl.all_horizontal(settings_col.is_null())).rename({'RateInHz': 'StimRateInHz'})
                        
                        # Event log
                        event_log = parse_event_log(json.load(open(os.path.join(device_subdirectory_path, 'EventLog.json'))))
                        
                        # Adaptive Table
                        df_adaptive = parse_adaptive_log(json.load(open(os.path.join(device_subdirectory_path, 'AdaptiveLog.json'))))
                        
                        # Add stimulation parameters (Hz, Amp, pulse width) to the epoch table
                        df = add_stimulation_parameters(df, settings, df_adaptive)
                        
                        return df, settings, event_log, df_adaptive
    return None

In [21]:
root_path = '/media/dropbox_hdd/Starr Lab Dropbox/RC+S Patient Un-Synced Data/RCS02 Un-Synced Data/SummitData/SummitContinuousBilateralStreaming/RCS02L'

In [22]:
df, settings, event_log, df_adaptive = add_session_tables(root_path)

ColumnNotFoundError: channel_0

Error originated just after this operation:
DF ["index", "timestamp", "PacketGenTime", "PacketRxUnixTime"]; PROJECT */20 COLUMNS; SELECTION: "None"

In [8]:
rcs_nums = ['0' + str(i) for i in range(1, 10)] + [str(i) for i in range(10, 21)]
side = ['L', 'R']

['01',
 '02',
 '03',
 '04',
 '05',
 '06',
 '07',
 '08',
 '09',
 '10',
 '11',
 '12',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19']