In [None]:
# ===================================================================== #
# pmap/share/ptsd.py -- shared routines for physiologic time series data
#                 extraction
#
# History
#  - 9/10/2020 created, jpb
#  - 3/1/2022 reorganized into ptsd dir, split into util/saver, jpb
#  - 3/15/2022 move internal routines to util_int.py, jpb
#  - 9/15/2022 fix zarr_save_waveform for ZARR 2.11.0
#      [1] clobber=True arg to normalize_store_arg replaced w/mode='w'
#  - 10/31/2022 (Happy Halloween!) moved into pmap/share/ptsd.py
# ===================================================================== #

In [None]:
from pathlib import Path
import numpy as np
import pandas as pd

from dateutil import tz

import zarr
import numcodecs
from zarr.creation import normalize_store_arg, open_array
from zarr.hierarchy import group as _create_group

In [None]:
def zarr_save_waveform(store, df):
    """
    zarr_save_waveform -- save waveform dataframe DF into zarr STORE

    Creates a zarr group in STORE (overwriting existing contents) holding
     - 'info'    -- 1-element object array (JSON codec) with
                    {'fmt': <format>, 'cols': <column names>}
     - 'data_v2' -- df.to_numpy(), for the current 'v2' format
    The legacy 'v1' layout ('ts' = first two columns, 'data' = the rest)
    is still implemented but currently disabled: fmt is fixed to 'v2'.

    Args
     - store -- zarr store, or a path (str or os.PathLike) that zarr's
                normalize_store_arg turns into a store
     - df    -- dataframe to save
    """
    import os  # local import: only used for the os.PathLike check below

    # When given a path we create the store ourselves and must close it;
    # closing is what gets a zip store's records written. Checking only str
    # missed pathlib.Path arguments, leaving such stores unclosed.
    may_need_closing = isinstance(store, (str, os.PathLike))
    store = normalize_store_arg(store, mode='w')       # [1] for ZARR 2.11.0
    # store = normalize_store_arg(store, clobber=True) # pre ZARR 2.11.0

    fmt = 'v2'
    # v1 auto-detection, currently disabled:
    #if df.columns[0] == 'ts' and df.columns[1] == 'dt':
    #    fmt = 'v1'

    info = {'fmt': fmt}

    if fmt == 'v1':
        # first two columns (ts, dt) are stored separately in 'ts'
        info['cols'] = df.columns.to_list()[2:]
    elif fmt == 'v2':
        info['cols'] = df.columns.to_list()
    else:
        assert False

    try:
        grp = _create_group(store, overwrite=True)

        o = grp.empty('info', shape=1, overwrite=True, dtype='object',
                      object_codec=numcodecs.JSON())
        o[0] = info
        if fmt == 'v1':
            df_ts   = df.iloc[:, 0:2]
            df_data = df.iloc[:, 2:]
            grp.create_dataset('ts', data=df_ts.to_numpy(), overwrite=True)
            grp.create_dataset('data', data=df_data.to_numpy(), overwrite=True)
        elif fmt == 'v2':
            grp.create_dataset('data_v2', data=df.to_numpy(), overwrite=True)
        else:
            assert False

    finally:
        if may_need_closing and hasattr(store, 'close'):
            # needed to ensure zip file records are written
            store.close()

In [None]:
def zarr_load_waveform(store):
    """
    zarr_load_waveform -- load a waveform dataframe written by
                          zarr_save_waveform

    Args
     - store -- zarr store or path, passed straight to zarr.load

    Returns
     - dataframe whose column names come from the stored 'info' record;
       for the legacy 'v1' layout the 'ts' array supplies the first two
       columns, which are named 'ts' and 'dt'
    """
    contents = zarr.load(store)

    info = contents['info'][0]
    layout = info['fmt']

    if layout == 'v2':
        out = pd.DataFrame(contents['data_v2'])
        out.columns = info['cols']
        return out

    if layout == 'v1':
        # time columns and signal columns were stored as separate arrays;
        # glue them back together side by side
        left = pd.DataFrame(contents['ts'])
        right = pd.DataFrame(contents['data'])
        out = pd.concat([left, right], axis=1, ignore_index=True)
        out.columns = ['ts', 'dt'] + info['cols']
        return out

    assert False, f"load_waveform format {info['fmt']} unrecognized"

In [None]:
def record_dir(save_dir, csn):
    """
    record_dir -- per-record subdirectory of SAVE_DIR for encounter CSN

    Records are bucketed by the last three digits of the CSN, zero padded,
    e.g. record_dir('/data', 12345) -> Path('/data/345').
    """
    bucket = '%03d' % (csn % 1000)
    return Path(save_dir).joinpath(bucket)

In [None]:
# load PTSD records for CSN from DEVICE

def load_record(ptsd_rec, csn, device, start=None, stop=None,
                rate = None,
                inst = None,
                scale=None,
                signal_contains = None,
                data_dir = None,
                tz_local = None,
                add_dt = True,
                csn_col = None,
                load=True, verbose=False):
    """
    load_record -- load PTSD records for CSN from DEVICE

    Selects the rows of PTSD_REC for encounter CSN on DEVICE (optionally
    restricted by time window, rate and instance), loads each referenced
    file in start_time order, and concatenates them into one dataframe.
    Files are read from
        <data_dir>/<subdir>/<csn % 1000, 3 digits>/<filename>.<fmt>
    where fmt 'zarr' maps to the '.zip' extension.

    Args:
     - ptsd_rec -- ptsd_record dataframe; needs the CSN column (see
                   csn_col), 'device', 'start_time', 'end_time',
                   'filename', 'fmt', 'record_date' (and 'rate' /
                   'inst_id' when those filters are used)
     - csn -- encounter CSN to select
     - device -- device to select; names ending in 'VITAL' or 'WAVE'
                 are read from 'vitals-sb', all others from 'vitals-tsdb'
     - start -- keep records with end_time > start (optional)
     - stop -- keep records with start_time < stop (optional)
     - rate -- keep records with this rate (optional)
     - inst -- keep records with this inst_id (optional)
     - *scale - timescale of the files' 'dts' column: 1 for sec,
                1000 for ms. Default 1000 for VITAL/WAVE devices, else 1.
     - signal_contains -- if given, keep only signal columns whose name
                          contains this substring ('dts' is always kept)
     - *data_dir -- data root (default /projects/ACCM-PMAP/data)
     - tz_local -- timezone in which 'record_date' is interpreted
                   (default America/New_York)
     - add_dt -- add a 'dt' datetime column computed from 'ts' with
                 to_dt; an existing 'dt' column is renamed 'orig_dt'
     - csn_col -- name of the CSN column; auto-detected if None
     - load -- if False, only select (and with verbose, list) the files
     - verbose -- print each file path and whether it exists

    Returns
     - dataframe with 'ts' (seconds since the Epoch), the signal columns,
       and a 'frame' column identifying the loaded file each row came
       from; None if nothing was loaded. When consecutive files are more
       than 15 s apart a single row with ts = previous last ts + 1 (all
       other columns NaN) is inserted between them.
    """
    if csn_col is None:
        if 'pat_enc_csn_sid' in ptsd_rec.columns:
            csn_col = 'pat_enc_csn_sid'
        elif 'pat_enc_csn_id' in ptsd_rec.columns:
            csn_col = 'pat_enc_csn_id'
        else:
            assert False, 'unable to find CSN col in ptsd_rec'

    if data_dir is None:
        p_root = Path('/projects/ACCM-PMAP/data')
    else:
        p_root = Path(data_dir)

    if tz_local is None:
        tz_local = tz.gettz('America/New_York')

    # The data subdir depends only on the device, so choose it
    # unconditionally; previously it was set only when scale was defaulted,
    # so an explicit scale= raised UnboundLocalError on the first record.
    is_sb = device.endswith(('VITAL', 'WAVE'))
    subdir = 'vitals-sb' if is_sb else 'vitals-tsdb'
    if scale is None:
        scale = 1000.0 if is_sb else 1.0   # sb: ms, tsdb: sec

    m = (ptsd_rec[csn_col] == csn) & (ptsd_rec.device == device)
    if start is not None:
        m &= ptsd_rec.end_time > start      # record ends after window start
    if stop is not None:
        m &= ptsd_rec.start_time < stop     # record starts before window stop
    if rate is not None:
        m &= ptsd_rec.rate == rate
    if inst is not None:
        m &= ptsd_rec.inst_id == inst

    recs = ptsd_rec[m].sort_values('start_time')

    dfs = []
    last_ts = None
    # Iterate the columns directly rather than recs.loc[i, col]: with
    # duplicate index labels .loc returns a Series instead of a scalar.
    for filename, fmt, rec_dt in zip(recs['filename'], recs['fmt'],
                                     recs['record_date']):
        if fmt == 'zarr':
            fmt = 'zip'

        p_file = p_root / subdir / f'{csn%1000:03}' / f'{filename}.{fmt}'

        if verbose:
            print(f'{device} {filename}.{fmt}: {p_file.exists()}')

        if not load:
            continue

        if fmt == 'feather':
            sub_df = pd.read_feather(p_file)
        elif fmt == 'zip':
            sub_df = zarr_load_waveform(str(p_file))
        else:
            assert False, f"fmt {fmt} unrecognized"

        if signal_contains is not None:
            # keep matching signals plus the 'dts' time column
            sub_df = sub_df[[sig for sig in sub_df.columns
                             if signal_contains in sig or sig == 'dts']]

        if len(sub_df) == 0:
            # an empty file has no first/last timestamp; skip it rather
            # than failing on the row lookups below
            continue

        # record_date is wall-clock time in tz_local; 'dts' holds offsets
        # from it in units of 1/scale seconds
        rec_ts0 = pd.to_datetime(rec_dt).replace(tzinfo=tz_local).timestamp()
        # assign() returns a new frame, so this never writes into a view
        # of the column-filtered frame (no SettingWithCopyWarning)
        sub_df = (sub_df.assign(dts=sub_df['dts'] / scale + rec_ts0)
                        .rename(columns={'dts': 'ts'}))
        sub_df['frame'] = len(dfs)

        # positional lookups work regardless of the frame's index labels
        first_ts = sub_df['ts'].iloc[0]
        if last_ts is not None and first_ts - last_ts > 15:
            # > 15 s between files: insert a one-row marker (other columns
            # NaN) to mark the discontinuity
            print('insert gap')
            dfs.append(pd.DataFrame({'ts': [last_ts + 1]}))

        dfs.append(sub_df)
        last_ts = sub_df['ts'].iloc[-1]

    if not dfs:
        return None

    df = pd.concat(dfs, axis=0, ignore_index=True)

    if add_dt:
        if 'dt' in df:
            # preserve any 'dt' column coming from the files
            df = df.rename(columns={'dt': 'orig_dt'})
        df['dt'] = to_dt(df['ts'])

    return df

In [None]:
# convert to datetime

def to_dt(ts, tz_local=None):
    """
    to_dt -- convert single/series of timestamps (Epoch times) to datetime

    Args
     - ts -- timestamp (unix Epoch). SECONDS since 1970-1-1 00:00 UTC.
             Can be scalar (single float/int), pd.Series of float/int
             (including Series subclasses), or another array-like.
     - tz_local -- target timezone, anything pandas tz_convert accepts
             (tzinfo object or name such as 'UTC');
             default America/New_York

    Returns
     - tz-naive datetime giving wall-clock time in tz_local:
       pd.Timestamp for a scalar, pd.Series (same index) for a Series,
       DatetimeIndex for other array-likes
    """
    if tz_local is None:
        tz_local = tz.gettz('America/New_York')

    # datetime, tz=UTC
    t_utc = pd.to_datetime(ts, unit='s', utc=True)

    if isinstance(ts, pd.Series):
        # Series values need the .dt accessor: a bare .tz_convert on a
        # Series acts on its index, not its values. isinstance (rather than
        # type(ts) is pd.Series) also routes Series subclasses here.
        return t_utc.dt.tz_convert(tz_local).dt.tz_localize(None)

    # scalar Timestamp or DatetimeIndex: convert to local, then drop the
    # timezone (tz=None, aka naive)
    return t_utc.tz_convert(tz_local).tz_localize(None)