In [32]:
# import dependencies
import numpy as np
import pandas as pd
from scipy.signal import find_peaks
import scipy.fft
from scipy import signal
from scipy.stats import entropy
from scipy.integrate import trapz
from scipy.ndimage import zoom

# _______New CGM Class + Merge Function_______
# comes with ALL values merged on one dataframe
# Updated 11-25-24

class CGM_Wide_DF:
    def __init__(self, n, path="physio_wfood/"):
        self.n = n
        self.path = path
        self.df = -1
        self.dex = -1
        self.acc = -1
        self.tem = -1
        self.bvp = -1
        self.eda = -1
        self.hr = -1
        self.ibi = -1
        self.flg = -1
        self.meal_df = -1
        self.meal_dt = -1
        self.markers = -1
        self.dex_windows = -1
        self.areas = -1
        self.intp_2hr_areas = -1
        self.intp_5hr_areas = -1
        self.raw_gluc_df = -1
        self.intp_2hrs_df = -1
        self.intp_5hrs_df = -1
        self.parse_file()
        self.get_meal_df()
        self.get_bmarker_dicts()
        self.assemble()
    
    def parse_file(self):
        """
        - Parses all dataframes in Subject's folder.
        - Updates the respective Class attribute for easy access after run 
        at user's discretion.
        - Ran at class instantiation.
        """
        # Gets all csv files in Subject's folder --> combines into a dictionary
        pn = '0{n}'.format(n=self.n).zfill(3)
        path = self.path+pn+'/'
        folder = [o for o in os.listdir(path) if o[:1].isalpha()]
        dfs = [pd.read_csv(path+f) for f in folder]
        self.df = dict(zip(folder, dfs))

        # Formats Dexcom dataframe (keeps only datetime and glucose columns)
        dex = self.df['Dexcom_{n}.csv'.format(n=pn)].copy()
        # Adjusts dex dataframe
        mask = [col for col in dex.columns if len(dex[dex[col].notna()])<5] + \
                ['Event Subtype', 'Source Device ID', 'Index', \
                 'Event Type', 'Transmitter Time (Long Integer)',]
        dex = dex.drop(columns=mask)
        dex = dex[dex['Timestamp (YYYY-MM-DDThh:mm:ss)'].notna()]
        self.dex = dex.rename(columns={'Timestamp (YYYY-MM-DDThh:mm:ss)': 'datetime'})
        
        def fmt_df(name):
            # support function for Class 'parse_file' method
            temp = self.df['{name}_{n}.csv'.format(name=name,n=pn)].copy()
            cols = temp.columns.tolist()[1:]
            for col in cols:
                temp = temp.rename(columns={col:col.strip()})
            temp = self.to_dt(temp)
            return temp
        
        # Retrieves Tri-Axial Accelerometry dataframe
        self.acc = fmt_df('ACC')

        # Retrieves Skin Temperature dataframe
        self.tem = fmt_df('TEMP')

        # Retrieves Blood Volume Pulse dataframe
        self.bvp = fmt_df('BVP')

        # Retrieves Electrodermal Activity dataframe
        self.eda = fmt_df('EDA')

        # Formats Heart Rate dataframe
        def is_iso_format(dt):
            """
            Checks datetime format of a series.
            - input: pandas Series object
            - returns: boolean
            """
            try:
                pd.to_datetime(dt, format='%Y-%m-%d %H:%M:%S')
                return True
            except ValueError:
                return False
        
        self.hr = fmt_df('HR')
        # Formats hr['datetime']: change from 02/13/20 --> 2020-02-13 to match standard datetime
        if not is_iso_format:
            self.hr['datetime'] = [str(pd.to_datetime(h, format = "%m/%d/%y %H:%M:%S.%f")) for h in self.hr['datetime']]

        # Formats InterBeat Interval dataframe
        self.ibi = fmt_df('IBI')
        
        # Formats Food Log dataframe
        self.flg = self.df['Food_Log_{n}.csv'.format(n=pn)].copy()
        self.fmt_times()
        self.align_dt()
    
    def to_dt(self, df):
        """
        Coverts datetime column to pd.datetime object for easier parsing/manipulation.
        - input: pandas dataframe
        - returns: updated dataframe with pd.dataetime 'datetime column'
        """
        df['datetime'] = pd.to_datetime(df['datetime'])
        df = df.sort_values(by=['datetime'])
        return df
    
    # Corrects Subject 12 Data Error: 'Not a Time' Error due to NaN value
    def fmt_times(self):
        """
        - Finds and fills any missing 'time' or 'time_of_day' values.
        - Renames 'time_begin' column to 'datetime' for ease of parsing.
        - Updates the Food Log's Class attribute.
        """
        temp = self.flg.copy()
        mask = temp.time_begin.isna()
        time = ['time' if 'time' in temp.columns.tolist() else 'time_of_day'][0]
        temp.loc[mask, 'time_begin'] = temp['date'] + ' ' + temp[time]
        temp = temp.rename(columns={'time_begin': 'datetime'})
        self.flg = temp
    
    # Corrects Subject 7 Data Error: (Wrong Date)
    def align_dt(self):
        """
        - Aligns Food Log datetime to Dexcom datetime, if discrepancies exist.
        Changes both dataframe's 'datetime' column to pd.datetime objects.
        - Updates Food Log's and Dexcom's Class attribute.
        """
        # 'cond' bool checks for food log datetime descrepencies compared to Dexcom
        dex_date = self.dex['datetime'].iloc[0][:7]
        temp = self.flg.copy()
        cond = temp.datetime.str.contains(dex_date).any()

        # Corrects for discrepancies in food log datetime; aligned food log datetime to Dexcom datetime
        if not cond:
            temp['datetime'] = pd.to_datetime(temp['datetime'])
            mask = dex['datetime'][:len(temp['datetime'])].reset_index(drop=True)
            mask = pd.to_datetime(mask)
            diff = mask - temp['datetime']
            temp['datetime'] = temp['datetime'] + pd.DateOffset(days = diff[0].round('d').days)
            temp['datetime'] = temp.datetime.dt.strftime('%Y-%m-%d %H:%M:%S')
        self.flg = self.to_dt(temp)
        self.dex = self.to_dt(self.dex)

    def to_delta(self, df):
        """
        Converts 'datetime' column to day and hour columns.
        - input: pandas dataframe.
        - returns: updated pandas dataframe.
        """
        # Converts 'datetime' to timedelta object in seconds
        df = df.copy()
        df['timedelta'] = df['datetime'] - df['datetime'].iloc[0]
        df['day'] = df['timedelta'].dt.days + 1
        # timedelta objects don't have an 'min' property --> convert seconds to min
        df['min'] = df['timedelta'].dt.seconds // 60
        df = df.drop(columns=['datetime','timedelta'])
        df = df[['subject_no','day','min'] + df.columns[1:-2].tolist()]
        return df
    
    def get_meal_df(self):
        """
        - Combined all meal entries within 1hr of each other in order for it 
        to  be easier to calculate post-prandial Glucose Area Under the Curve.
        - Updates the Food Log's Class attribute.
        - Ran at class instantiation.
        """
        log = self.flg.copy().fillna(0)
        log = log[['datetime','calorie','total_carb','dietary_fiber','sugar','protein','total_fat']]
        tol = pd.Timedelta('60 minutes')
        mask = log['datetime'] - log['datetime'].shift(1) <= tol
        mask = log.groupby(~mask).ngroup().cumsum() - 1
        df = log.groupby(mask).agg('sum', numeric_only=True).reset_index(drop=True)
        df['datetime'] = log.groupby(mask)['datetime'].first().reset_index(drop=True)
        self.meal_df = df[['datetime'] + df.columns.tolist()[:-1]]
        self.meal_dt = self.meal_df.datetime.copy()
    
    def roll_window(self, df, hrs=[False,'']):
        """
        Gets rolling windows based on Food Log datetime
        - input: pandas dataframe
        - returns: list of pandas dataframes between Food Log datetimes
        """
        meal_dt = self.meal_dt.copy()
        if hrs[0]:
            hr = pd.Timedelta(str(hrs[1]*60)+' minutes')
            windows = [df[((df.datetime >= meal_dt[i]) & (df.datetime < meal_dt[i]+hr))]\
                       for i in range(len(meal_dt)-1)]
        else:
            windows = [df[((df.datetime >= meal_dt[i]) & (df.datetime < meal_dt[i+1]))]\
                       for i in range(len(meal_dt)-1)]
        first = [df[df.datetime <= meal_dt.iloc[0]]]
        last = [df[df.datetime >= meal_dt.iloc[-1]]]
        windows = first + windows + last
        windows = [w.reset_index(drop=True) for w in windows]
        return windows

    # ACC features
    def acc_feats(self, window):
        """
        Generates features from Tri-Accelerometry dataframe.
        - input: list of pandas dataframes
        - returns: dictionary of generate features.
        """
        acc = {}
        
        # Zero-Crossing Rate
        def zero_cross(ray):
            return np.sum(np.abs(np.diff(np.sign(ray)))) / (2 * len(ray) - 2)

        # Gets the Zero-Crassing Rate of each axis and sums them
        acc_zcross_rate = np.sum([zero_cross(window[col]) \
                                    for col in window.columns.tolist()[1:]])
        acc['acc_zcross_rate'] = round(acc_zcross_rate*100,2)

        # Magnitude
        magnitude = np.sqrt(window['acc_x']**2 + window['acc_y']**2 + \
                            window['acc_z']**2)

        # Root Mean Square
        rms = np.sqrt((magnitude**2).mean())
        acc['acc_rms'] = round(rms,2)

        # Peak Counts for peaks above the magnitude standard deviation
        peaks = len(find_peaks(magnitude, prominence=magnitude.std().round())[0])
        acc['acc_peaks'] = peaks

        # Compute the Fourier Transform
        fourier = scipy.fft.fft(pd.DataFrame(magnitude))
        # Magnitude Spectrum
        mag_spec = np.abs(fourier)
        acc['acc_mag_spec_min'] = round(mag_spec.min(),2)
        acc['acc_mag_spec_max'] = round(mag_spec.max(),2)
        acc['acc_mag_spec_stdev'] = round(mag_spec.std(),2)

        # Spectral Entropy on magnitude
        freq, psd = signal.welch(magnitude, fs=100) # power spectral density (PSD)
        psd_norm = psd / np.sum(psd)
        spec_entropy = entropy(psd_norm, base=2)
        acc['acc_spec_entropy'] = round(spec_entropy,2)

        return acc

    def get_bmdata(self, marker_df, acc=False):
        """
        Generates quick statistics (mean, min, max, stdev) for each window 
        of marker dataframes. If input is Tri-Axial Accelerometry, it will run the 'acc_feats' function
        - input: pandas dataframe of marker
        - returns: list of dictionaries with quick stats by window
        """
        windows = self.roll_window(marker_df)

        bm_data = []
        if not acc:
            for i in range(len(windows)):
                ret = {}
                col = windows[i].columns.tolist()[-1]
                if len(windows[i]) != 0:
                    ret[col+'_curr'] = round(windows[i][col].values[-1],2)
                else: 
                    ret[col+'_curr'] = np.nan
                ret[col+'_min'] = round(windows[i][col].min(),2)
                ret[col+'_max'] = round(windows[i][col].max(),2)
                ret[col+'_mean'] = round(windows[i][col].mean(),2)
                ret[col+'_stdev'] = round(windows[i][col].std(),2)
                bm_data+=[ret]
        else:
            for i in range(len(windows)):
                if len(windows[i]) != 0:
                    bm_data+=[self.acc_feats(windows[i])]
                else:
                    temp = {}
                    keys = ['acc_zcross_rate','acc_rms','acc_peaks',\
                             'acc_mag_spec_min','acc_mag_spec_max',\
                             'acc_mag_spec_stdev','acc_spec_entropy']
                    for key in keys:
                        temp[key] = np.nan
                    bm_data+=[temp]

        return bm_data, windows

    def get_bmarker_dicts(self):
        """
        - Generates quick stats for each biomarker.
        - Updates 'markers' and 'areas' Class attribute.
        """
        # pre-prandial Glucose values
        dex_df = self.dex.copy()
        dex, self.dex_windows = self.get_bmdata(dex_df)
        
        # if first dexcom window is empty (no glucose values prior to meal)
        # start log at the next meal in order have get pre-prandial glucose
        # for each meal entry in final df
        if self.dex_windows[0].empty:
            self.dex_windows = self.dex_windows[1:]
            self.meal_df = self.meal_df[1:].reset_index(drop=True)
            
        if self.dex_windows[-1].empty:
            self.dex_windows = self.dex_windows[:-1]
            idx = self.meal_df.index.to_list()[-1]
            self.meal_df = self.meal_df.drop(index=idx).reset_index(drop=True)
            
        self.meal_dt = self.meal_df.datetime.copy()

        # post-prandial Glucose AUC
        self.areas = [{'ppg_auc':np.trapz(w['Glucose Value (mg/dL)'], w.index.to_series())}\
                 for w in self.dex_windows][1:]
        
        def get_intp_areas(hrs):
            # reads and labels glucose values backwards
            intp_wnd = self.roll_window(self.dex, [True,hrs])
            wnd = [w['Glucose Value (mg/dL)'] for w in intp_wnd]
            # post-prandial Glucose AUC
            intp_areas = [{'ppg_auc':np.trapz(w, w.index.to_series())}\
                     for w in wnd][1:]
            return intp_areas
        
        self.intp_2hr_areas = get_intp_areas(2)
        self.intp_5hr_areas = get_intp_areas(5)
        
        # Food Log to dictionary for ease of parsing
        flg_dict = self.meal_df.round(2).to_dict('index')
        
        # ACC
        acc_df = self.acc.copy()
        acc = self.get_bmdata(acc_df, acc=True)[0]

        # TEMP
        temp_df = self.tem.copy()
        temp = self.get_bmdata(temp_df)[0]

        # EDA
        eda_df = self.eda.copy()
        eda = self.get_bmdata(eda_df)[0]

        # HR
        hr_df = self.hr.copy()
        hr = self.get_bmdata(hr_df)[0]

        # IBI
        ibi_df = self.ibi.copy()
        ibi = self.get_bmdata(ibi_df)[0]

        self.markers = [flg_dict, acc, temp, eda, hr, ibi, dex]

    def interpolate(self, ray, hrs):
        """
        Interpolates (via scipy) input array based on input hour to get
        consistent size glucose values for each window.
        - input: 
            --> 'ray' - numpy array
            --> 'hrs' - integer
        - returns: interpolated numpy array
        """
        x = np.array(ray)
        i = hrs*12
        z = i / len(x)

        intp = np.round(zoom(x,z), 2)
        return intp

    def get_gluc_dict(self):
        """
        Generates a list of dictionaries by window with labeled timeframe 
        keys identifying how many minutes prior to meal the glucose value
        was taken.
            return: 
                --> raw_gluc_dict - dictionaries of variable length with 
                                    raw glucose values (not interpolated)
                --> intp_2hrs_dict - dictionaries of consistent length with 
                                     interpolated glucose values to fit 2 
                                     hour time frame
                --> intp_5hrs_dict - dictionaries of consistent length with 
                                     interpolated glucose values to fit 5 
                                     hour time frame
        """
        raw_gluc_dict, intp_2hrs_dict, intp_5hrs_dict = [], [], []

        i = 0
        windows = self.dex_windows.copy()
        for window in windows:
            gluc, gluc_2hr, gluc_5hr = {}, {}, {}
            
            # reads and labels glucose values backwards
            glw = window['Glucose Value (mg/dL)'].tolist()[::-1]
            glw_2hr = self.interpolate(glw,2)
            glw_5hr = self.interpolate(glw,5)

            # Current glucose already included from 'get_bmdata' function --> 
            # idx starts at 1
            for i in range(1,len(glw)):
                gluc['gluc_'+str(i*5)+'_mins_prior'] = glw[i]
            for i in range(1,len(glw_2hr)):
                gluc_2hr['gluc_'+str(i*5)+'_mins_prior'] = glw_2hr[i]
            for i in range(1,len(glw_5hr)):
                gluc_5hr['gluc_'+str(i*5)+'_mins_prior'] = glw_5hr[i]

            raw_gluc_dict += [gluc]
            intp_2hrs_dict += [gluc_2hr]
            intp_5hrs_dict += [gluc_5hr]
            
        return raw_gluc_dict, intp_2hrs_dict, intp_5hrs_dict

    def get_full_dict(self, markers):
        """
        Combines all markers in to one dictionary per window
        - input: a list of a list of marker dictionaries
        - returns: a list of dictionaries by window
        """
        full_dict = []
        for i in range(len(self.meal_dt)):
            wndw = {}
            for marker in markers:
                if not marker[i] is np.nan:
                    for key, val in marker[i].items():
                        wndw[key] = val
                else:
                    wndw['NaN'] = np.nan
            full_dict += [wndw]
        return full_dict
    
    def add_subject_no(self, full_dict):
        """
        Coverts input dictionary into dataframe and adds Subject number 
        column dataframe.
        - input: dictionary
        - returns: pandas dataframe
        """
        df = pd.DataFrame.from_dict(full_dict.copy())
        df['subject_no'] = self.n
        col = df.columns.tolist()
        df = df[[col[-1]] + col[:-1]]
        return df
        
    def assemble(self):
        """
        - Assembles
        - Updates the following Class dataframes: raw_gluc_df, intp_2hrs_df,
        and intp_5hrs_df
        """
        raw_gluc_dict, intp_2hrs_dict, intp_5hrs_dict = self.get_gluc_dict()
        
        def get_gluc_df(full_dict, areas):
            # support function for Class 'assemble' method
            markers = self.markers.copy() + [full_dict, areas]
            temp = self.get_full_dict(markers)
            df = self.add_subject_no(temp)
            
            # drop glucose columns with zeros & meal events with glucose zeros
            mask = df['ppg_auc']
            mask = mask[mask==0].index.tolist()
            for i in mask:
                df = df.drop(index=i).reset_index(drop=True)

            mask = df.columns[df.columns.str.startswith('gluc')].tolist()
            for col in mask:
                df = [df.drop(columns=col) if df[col].isin([0]).any() else df][0]

            return df
        
        self.raw_gluc_df = get_gluc_df(raw_gluc_dict, self.areas)
        self.intp_2hrs_df = get_gluc_df(intp_2hrs_dict, self.intp_2hr_areas)
        self.intp_5hrs_df = get_gluc_df(intp_5hrs_dict, self.intp_5hr_areas)

In [35]:
def save_dfs(pn,sub,path=''):
    tmp1 = sub.raw_gluc_df.copy()
    tmp2 = sub.to_delta(tmp1)
    tmp1.to_csv(path+'subject_{n}_raw_gluc_df.csv'.format(n=pn), index=False)
    tmp2.to_csv(path+'subject_{n}_raw_gluc_df_delta.csv'.format(n=pn), index=False)
    tmp1 = sub.intp_2hrs_df.copy()
    tmp2 = sub.to_delta(tmp1)
    tmp1.to_csv(path+'subject_{n}_intp_2hrs_df.csv'.format(n=pn), index=False)
    tmp2.to_csv(path+'subject_{n}_intp_2hrs_df_delta.csv'.format(n=pn), index=False)
    tmp1 = sub.intp_5hrs_df.copy()
    tmp2 = sub.to_delta(tmp1)
    tmp1.to_csv(path+'subject_{n}_intp_5hrs_df.csv'.format(n=pn), index=False)
    tmp2.to_csv(path+'subject_{n}_intp_5hrs_df_delta.csv'.format(n=pn), index=False)

In [44]:
itr = [1,2] + list(range(4,17))

In [46]:
for n in itr:
    sub = CGM_Wide_DF(n=n)
    path = 'transformer/wide_df/'
    save_dfs(n, sub, path)