In [None]:
from tqdm.auto import tqdm

import os
os.environ['KMP_DUPLICATE_LIB_OK']='True' 

from pathlib import Path, PurePath
import pandas as pd
import numpy as np
import wfdb

import msgpack
import datetime

In [None]:
# Raw MIT-BIH files live in a 'mit-bih-raw' folder one level above the current
# working directory (the root of the repo when the notebook runs from a subfolder).
rawpath = PurePath(Path(os.getcwd()).parents[0], 'mit-bih-raw/')
records = rawpath / 'RECORDS'  # plain-text index: one record name per line

# Strip surrounding whitespace rather than slicing off the last character:
# slicing truncates the final record name when the file has no trailing newline.
# Blank lines are skipped so they are never treated as record names.
with open(records) as rfile:
    rlist = [line.strip() for line in rfile if line.strip()]

In [None]:
samples = []    # per record: (signal array, header fields dict) as returned by wfdb.rdsamp
good_list = []  # names of records that were extracted completely
bad_list = []   # names of records that failed to extract
qrs = []        # per record: sample indices of the R-peaks (from the .qrs annotations)
atr_label = []  # per record: rhythm labels (aux_note), e.g. '(N' normal, '(AFIB' atrial fibrillation
atr_locs = []   # per record: sample locations of those labels, plus the signal length appended at the end

for x in tqdm(rlist):  # iterate through the records listed in RECORDS
    rpath = str(rawpath / x)
    try:
        # Read and derive everything for this record into locals first. Nothing is
        # appended until all reads succeed, so samples/qrs/atr_label/atr_locs stay
        # index-aligned with good_list (later cells index them by good_list position).
        samp = wfdb.rdsamp(rpath)  # samp[0]: signal samples; samp[1]: header fields such as 'fs', 'n_sig'
        qrs_locs = np.array(wfdb.rdann(rpath, extension="qrs").sample, dtype='int')  # R-peak locations
        atr = wfdb.rdann(rpath, extension="atr")  # rhythm annotations over the whole signal
        # Append the signal length so the last annotation also has an end point.
        rhythm_locs = np.append(atr.sample, len(samp[0]))
    except Exception as exep:
        tqdm.write(f"{x}: {exep}")  # report which record failed and why
        bad_list.append(x)
        continue

    samples.append(samp)
    qrs.append(qrs_locs)
    atr_label.append(atr.aux_note)
    atr_locs.append(rhythm_locs)
    good_list.append(x)

rlist = list(good_list)  # ignore the failed records from now on (copy, not an alias)

In [None]:
# One dict per subject mapping each rhythm label to its list of [start, end]
# sample spans. Each annotation's span ends where the next annotation starts; the
# last annotation ends at the signal length appended to atr_locs.
atr_dics = []

for subject_idx, labels in enumerate(atr_label):
    locs = atr_locs[subject_idx]
    ranges_by_label = {}
    for ann_idx, label in enumerate(labels):
        ranges_by_label.setdefault(label, []).append([locs[ann_idx], locs[ann_idx + 1]])
    atr_dics.append(ranges_by_label)

In [None]:
reload_flag: bool = True  # True: rewrite every output file even if it already exists; False: keep existing files

In [None]:
dfpath = PurePath(Path(os.getcwd()).parents[0], 'mit-bih-dataframes-stepping/')
os.makedirs(dfpath, exist_ok=True)


def _span_mask(spans, n_samples):
    """Return a boolean array of length n_samples, True inside any half-open [start, end) span.

    Equivalent to ``np.arange(n_samples)`` membership in the concatenation of
    ``range(start, end)`` for every span, but without building huge index lists.
    """
    mask = np.zeros(n_samples, dtype=bool)
    for start, end in spans:
        mask[start:end] = True
    return mask


for s, record_name in enumerate(tqdm(good_list)):  # every subject we have complete data for
    outfile = dfpath / (record_name + '.parquet')
    if os.path.exists(outfile) and not reload_flag:
        continue  # already written and no rewrite requested: skip building the frame at all

    signals = np.asarray(samples[s][0])
    n_samples = len(signals)
    rhythm_spans = atr_dics[s]
    normal = _span_mask(rhythm_spans.get('(N', []), n_samples)    # sample is in a normal rhythm
    afib = _span_mask(rhythm_spans.get('(AFIB', []), n_samples)   # sample is in an AFIB rhythm

    # One row per sample: the first two signal channels, then boolean labels.
    subj = pd.DataFrame(signals[:, :2], columns=['Signal 1', 'Signal 2'])
    subj['R-Peak'] = subj.index.isin(qrs[s])  # True where the sample index is an R-peak location
    subj['Normal'] = normal
    subj['AFIB'] = afib
    # We classify AFIB specifically, so "Other" is any sample in neither list above.
    subj['Other'] = ~(normal | afib)
    subj.to_parquet(outfile)

np.savetxt(dfpath / "subject_list.csv", good_list, delimiter=",", fmt='%s')

In [None]:
# Output root for the per-subject folders written below, one level above the working directory.
extractedpath = PurePath(Path(os.getcwd()).parents[0]) / 'mit-bih-extracted-stepping'
if not os.path.exists(extractedpath):
    os.mkdir(extractedpath)  # first run only; later runs reuse the existing folder

def encoder(obj):
    """msgpack ``default`` hook used when storing headers/rhythms.

    * ``np.int64`` becomes ``{'__npint64__': True, 'as_int': <python int>}``.
    * ``datetime.time`` becomes ``{'__datetime__': True, 'as_str': 'HH:MM:SS.ffffff'}``.
    * Any other object is returned unchanged for msgpack to handle.
    """
    if isinstance(obj, np.int64):
        return {'__npint64__': True, 'as_int': int(obj)}
    if not isinstance(obj, datetime.time):
        return obj
    return {'__datetime__': True, 'as_str': obj.strftime("%H:%M:%S.%f")}

def get_rhythm_type(samp, rhythms):
    """Classify a sample index (typically an R-peak) as normal, AFIB, or other.

    Parameters
    ----------
    samp : int
        Sample index to classify.
    rhythms : dict
        Maps rhythm labels (e.g. "(N", "(AFIB") to lists of [start, end] sample spans.

    Returns
    -------
    str
        "N" if ``samp`` lies in a "(N" span, else "A" if it lies in an "(AFIB" span,
        else "O".

    Spans are half-open [start, end): ``end`` is the sample where the next rhythm
    annotation begins, so a peak exactly on that boundary belongs to the next
    rhythm, not the previous one. The per-sample Normal/AFIB columns built from the
    same spans elsewhere in this notebook also exclude ``end``.
    """
    for label, code in (("(N", "N"), ("(AFIB", "A")):  # "(N" is checked first, as before
        for start, end in rhythms.get(label, ()):
            if start <= samp < end:
                return code
    return "O"

np.savetxt(extractedpath / "subject_list.csv", good_list, delimiter=",",  fmt='%s')


def _needs_write(target):
    """True when ``target`` does not exist yet, or reload_flag asks for a rewrite."""
    return not os.path.exists(target) or reload_flag


for idx, x in enumerate(tqdm(good_list)):
    subject_dir = extractedpath / x
    if not os.path.exists(subject_dir):
        os.mkdir(subject_dir)  # one output folder per subject

    # Raw signal samples.
    signal_file = subject_dir / "signals.parquet"
    if _needs_write(signal_file):
        pd.DataFrame(np.array(samples[idx][0]), columns=["signal1", "signal2"]).to_parquet(signal_file)

    # R-peak locations, each tagged with the rhythm it falls in.
    rpeak_file = subject_dir / "rpeaks.parquet"
    if _needs_write(rpeak_file):
        peak_locs = qrs[idx]
        rhythm_spans = atr_dics[idx]
        pd.DataFrame({
            'rpeak': peak_locs,
            'rhythm': [get_rhythm_type(peak, rhythm_spans) for peak in peak_locs],
        }).to_parquet(rpeak_file)

    # Header fields and rhythm spans, serialized with msgpack via the encoder hook.
    for filename, payload in (("headers.msgpack", samples[idx][1]), ("rhythms.msgpack", atr_dics[idx])):
        target = subject_dir / filename
        if _needs_write(target):
            with open(target, 'wb') as outfile:
                outfile.write(msgpack.packb(payload, default=encoder))

In [None]:
# Reload each subject's R-peak/rhythm table written by the extraction step.
rpeak_dfs = {
    record: pd.read_parquet(extractedpath / record / 'rpeaks.parquet')
    for record in tqdm(rlist)
}

In [None]:
def extract_rmean(rrInts):
    """Return the running R-mean (exponentially weighted mean) of RR-intervals.

    The first R-mean equals the first interval; each later one is
    ``0.75 * previous_rmean + 0.25 * current_interval``.

    Parameters
    ----------
    rrInts : sequence of numbers
        RR-intervals (in samples), in time order.

    Returns
    -------
    list
        One R-mean per input interval; empty for empty input.
    """
    rmeans = []
    for index, value in enumerate(rrInts):
        if index == 0:
            rmeans.append(value)
        else:
            rmeans.append(0.75*rmeans[index-1] + 0.25*value)
    return rmeans


def subset_subject(rpeak_df, interval_length = 4, calib_length = 100, max_rr_int = 500):
    """Split one subject's R-peaks into a calibration subset plus fixed-size subsets.

    Parameters
    ----------
    rpeak_df : pandas.DataFrame
        Must have columns 'rpeak' (R-peak sample indices in time order) and
        'rhythm' (rhythm label of each peak).
    interval_length : int
        Number of RR-intervals per subset after the calibration subset. A trailing
        partial subset is dropped.
    calib_length : int
        Number of RR-intervals in the leading calibration subset (fewer if the
        subject does not have that many).
    max_rr_int : int
        RR-intervals longer than this many samples are discarded as outliers
        before subsetting (default 500, the original hard-coded threshold).

    Returns
    -------
    list of pandas.DataFrame
        Each with columns 'rhythmLabel', 'rrInt', 'rmean'. The first entry is always
        the calibration subset; the R-mean restarts in every subset. A subject with
        too few intervals for a full post-calibration subset returns only the
        calibration subset instead of raising.
    """
    rpeaks = rpeak_df['rpeak'].to_numpy()
    peakrhythms = rpeak_df['rhythm'].to_numpy()
    raw_rr_ints = np.diff(rpeaks)  # interval i spans peak i -> peak i+1
    mask = raw_rr_ints <= max_rr_int  # drop intervals longer than max_rr_int samples as outliers
    rr_ints = raw_rr_ints[mask]
    intrhythms = peakrhythms[1:][mask]  # label each interval with the rhythm of its closing peak

    calib_rr_ints = rr_ints[:calib_length]
    calib_rhythms = intrhythms[:calib_length]
    subsets = [pd.DataFrame({'rhythmLabel': calib_rhythms,
                             'rrInt': calib_rr_ints,
                             'rmean': extract_rmean(calib_rr_ints)})]

    # Whole interval_length chunks available after the calibration slice. Clamp at 0:
    # np.split raises ValueError for a section count <= 0, which happens whenever a
    # subject has fewer than calib_length + interval_length intervals.
    n_chunks = max(len(rr_ints) - calib_length, 0) // interval_length
    if n_chunks == 0:
        return subsets

    end = calib_length + n_chunks * interval_length  # cut off the trailing partial chunk
    remaining_chunks = np.split(rr_ints[calib_length:end], n_chunks)
    remaining_rhythm_chunks = np.split(intrhythms[calib_length:end], n_chunks)
    subsets.extend(
        pd.DataFrame({'rhythmLabel': rhythms,
                      'rrInt': chunk,
                      'rmean': extract_rmean(chunk)})
        for chunk, rhythms in zip(remaining_chunks, remaining_rhythm_chunks)
    )
    return subsets

In [None]:
# Split every subject's R-peak table into a calibration subset plus fixed-length subsets.
subset_dfs = {record: subset_subject(rpeak_dfs[record]) for record in tqdm(rlist)}

In [None]:
subsetpath = PurePath(Path(os.getcwd()).parents[0], 'mit-bih-time-subsets-stepping/')
os.makedirs(subsetpath, exist_ok=True)

# Collapse the three rhythm codes into the binary target used for classification.
label_map = {'N': 'Non-Afib', 'A': 'Afib', 'O': 'Non-Afib'}

for record in tqdm(rlist):  # saving all of the subsets -- might take a while
    subsets = subset_dfs[record]
    idx_list = list(range(len(subsets)))  # subset IDs 0..len(subsets)-1

    # One row of ID info per subset. rhythmLabel is the most common interval label
    # in the subset; on a tie, Series.mode() returns the modes sorted, so [0]
    # picks the alphabetically first label (e.g. 'A' over 'N').
    subset_list = pd.DataFrame({
        "subjectID": [record] * len(subsets),
        "subsetID": idx_list,
        "rhythmLabel": [subset['rhythmLabel'].mode()[0] for subset in subsets],
    })
    subset_list['mappedLabel'] = subset_list['rhythmLabel'].map(label_map)
    subset_list.to_parquet(subsetpath / (record + "_subset_list.parquet"))

    # exist_ok so the cell can be re-run (Restart & Run All) without FileExistsError.
    # NOTE: files from an earlier run with more subsets are not removed.
    recordpath = subsetpath / record
    os.makedirs(recordpath, exist_ok=True)
    for subset_id, subset in zip(idx_list, subsets):  # one parquet file per subset
        subset.to_parquet(recordpath / (str(record) + "-" + str(subset_id) + ".parquet"))