In [9]:
import pandas as pd
import torch  # because we save the data as torch tensors
import os
from pathlib import Path
import time
import numpy as np
from sklearn.model_selection import train_test_split
import wfdb # needed to read the format of the raw files
from joblib import Parallel, delayed # to parallelize processing the ECGs, so that it all runs faster
# info: in parallel mode print-statements are ignored. That is why functions that are run parallel should instead return a message
import itertools # to flatten list of lists
import resampy # to resample the ECGs
from torch.utils.data import WeightedRandomSampler

import config

start_time = time.time()

# create all necessary out-folders that are needed
set_pieces_folders = [] # the foldernames where the cropped pieces of train-val-test are saved
set_df_pathnames = [] # where the metadata-files for train-val-test are stored
for set in config.SPLIT_NAMES:
    set_pieces_folders.append(os.path.join(config.PIECES_FOLDER, set + f'_pieces_length_{config.PIECE_LENGTH}'))
    set_df_pathnames.append(os.path.join(config.OUT_FOLDER, set + '_df.csv'))

folders_to_create = set_pieces_folders + [config.OUT_FOLDER, config.DATALOADER_FOLDER, config.PIECES_FOLDER, config.EXTRACTED_ECGS_FOLDER]
for folder in folders_to_create:
    if not os.path.exists(folder):
        Path(folder).mkdir(parents=True, exist_ok=True)


In [2]:
# this block took ~10min (is faster when some files were already created before)
use_parallel = True # set to false for debugging

global_avg_val = 0 # global mean of alle signals, is substracted from the crops for the xresnet. Edit: this value is very close to 0, probably does not matter at all
# - thus removed it


def extract_clean_and_resample_signal(record, channel, orignal_frequency, file_id_num):
    ECG_signal = []
    nan_counter = 0
    for signal in record.p_signal:
        # signal contains just ONE sample of each channel
        # device 0 has channels ECG1, ECG2 and NIBP
        # device 1 has channels ECG1 and NIBP
        # we ignore NIBP (Non-invasive blood-pressure measurement) and only chose one ECG channel

        # remove NaN-values from array --> otherwise resampling may have issues
        one_val = signal[channel]
        if not np.isnan(one_val):
            ECG_signal.append(one_val)
        else:
            nan_counter += 1
    # make an array out of the ECG signal
    ecg_array = np.array([ECG_signal]) # dimensions: (1, record-length)
    if config.RESAMPLE_TO > 0:
        ecg_array = resampy.resample(ecg_array, orignal_frequency, config.RESAMPLE_TO)
    ecg_array = torch.tensor(ecg_array)
    return ecg_array, nan_counter

def read_and_extract_signal(row):
    # construct the correct path
    file_id_num = int(row.iloc[0]['ID'])
    filename = str(file_id_num).zfill(4) # respect leading zeros: 186 ---> 0186
    filepathname = os.path.join(config.SOURCE_FOLDER, filename)
    tensor_file_pathname = os.path.join(config.EXTRACTED_ECGS_FOLDER, filename + '.pt')
    need_to_drop_this_row = False
    message = ''
    avg_val = 0
    nan_counter = 0

    if not os.path.exists(tensor_file_pathname): # if it already exists, there is no need to create it again
        try:
            record_content = wfdb.rdrecord(filepathname)
            device_number = row.iloc[0]['Device']
            orignal_frequency = record_content.fs
            
            # only extract one certain lead
            if (device_number == 0) or (device_number == 1):
                
                if device_number == 0: # device 0 has 2 lead ECGs, only the second is comparable to the 1 lead from device 1
                    ECG_signal_array, nan_counter = extract_clean_and_resample_signal(record_content, 1, orignal_frequency, file_id_num)
                elif device_number == 1:
                    ECG_signal_array, nan_counter = extract_clean_and_resample_signal(record_content, 0, orignal_frequency, file_id_num)               
                
                torch.save(ECG_signal_array, tensor_file_pathname) # save extracted channel as file
                avg_val = torch.mean(ECG_signal_array)
                if nan_counter != 0:
                    message = message + f'found {nan_counter} nan-values in file {filename}, age-group {row.iloc[0][config.LABEL_COLUMN_NAME]}'

            else:
                # should not happen, just in case data was modified
                message = message + f'Do not know how to handle device number {device_number}, ignoring entry'
                need_to_drop_this_row = True
        except Exception as e:
            message = message + str(e) # Did not find 0400.dat --> it is indeed missing
            need_to_drop_this_row = True

    row[config.TENSOR_FILE_COLUMN_NAME] = tensor_file_pathname
    return row, need_to_drop_this_row, message, avg_val


t1 = time.time()
# read metadata or create it anew
if not os.path.exists(config.METADATA_CLEANED_FILE):
    metadata = pd.read_csv(config.DATASET_METADATA_FILE)

    # change labels according to the bin-merge-dict
    for old_label, new_label in config.BIN_MERGE_DICT.items():
        metadata.loc[metadata[config.LABEL_COLUMN_NAME] == old_label, config.LABEL_COLUMN_NAME] = new_label

    metadata = metadata.dropna() # filters rows with NaN-values
    metadata = metadata.reset_index(drop=True) # otherwise .loc[[index] and index will yield different results
    need_to_drop_indices = []
    metadata[config.TENSOR_FILE_COLUMN_NAME] = ''

    if use_parallel:
        results = Parallel(n_jobs=os.cpu_count())(delayed(read_and_extract_signal)(metadata.iloc[[index]].copy()) for index, row in metadata.iterrows())
    else:
        results = []
        for index, row in metadata.iterrows():
            results.append(read_and_extract_signal(metadata.iloc[[index]]))

    # merge results
    row_list = []
    for row, need_to_drop, message, avg_val in results:
        if not message == '': #show all the things that could not be shown during the parallized function calls
            print(message)
        if not need_to_drop: # drop rows that showed errors
            row_list.append(row)
            global_avg_val += avg_val

    global_avg_val = global_avg_val/len(row_list)
    print(f'global average value is {global_avg_val}. Subtracting it from the tensors in the <get-crop>-step')
    new_metadata = pd.concat(row_list)
    # save the better metadata file
    new_metadata.to_csv(config.METADATA_CLEANED_FILE, index=False)
    print(f'extracting data & building new metadata file took {time.time()-t1} seconds')
    metadata = new_metadata
else:
    metadata = pd.read_csv(config.METADATA_CLEANED_FILE)
    print(f'loading existing metadata file took {time.time()-t1} seconds')


loading existing metadata file took 0.01772451400756836 seconds


In [3]:
if config.USE_OVERSAMPLING:
    # find out how unbalanced the dataset is, so that we know later how to oversample it
    # inspired by: https://towardsdatascience.com/demystifying-pytorchs-weightedrandomsampler-by-example-a68aceccb452
    class_counts = metadata[config.LABEL_COLUMN_NAME].value_counts()
    #print(class_counts)
    class_weights = 1/class_counts
    sample_weights = [class_weights[i] for i in metadata[config.LABEL_COLUMN_NAME].values]
    # each recording now has a weight
    # 19 min average per recording = 19min*60s/min, 3 second-crops --> divide by 3 to get the amount of crops needed
    global_sampler = WeightedRandomSampler(weights=sample_weights, num_samples=int(config.NUM_CLASSES*class_counts.max()*int(19*60/config.PIECE_LENGTH)), replacement=True)


In [4]:
# do train-val-test split
not_train_percentage = 1 - (config.SPLIT_ARRAY[0]/np.sum(config.SPLIT_ARRAY))
test_val_percentage = config.SPLIT_ARRAY[1]/(config.SPLIT_ARRAY[1] + config.SPLIT_ARRAY[2])

train_df, val_test_df = train_test_split(metadata, test_size=not_train_percentage, stratify=metadata[config.LABEL_COLUMN_NAME], random_state=42)
val_df, test_df = train_test_split(val_test_df, test_size=test_val_percentage, stratify=val_test_df[config.LABEL_COLUMN_NAME], random_state=42)

for idx, df in enumerate([test_df, val_df, train_df]):
    # print(df[config.LABEL_COLUMN_NAME].value_counts()) # to check distribution to see if stratify worked
    # save file
    df.to_csv(os.path.join(set_df_pathnames[idx]), index=False)

In [5]:
print(f'Runtime without dataloader and cropping (as that is not needed for xgboost): {time.time()- start_time}')

Runtime without dataloader and cropping (as that is not needed for xgboost): 0.07057976722717285


In [6]:
# take ~3minutes
# crop data to 3s-pieces, save the individual pieces because otherwise 32 GB RAm are not enough if you later try to load them at once
# info: x is later the input-data to the deep learning model. y is the correct label corresponding to x

def my_one_hot_encoder(y):
    arr = torch.zeros(config.NUM_CLASSES, dtype=torch.float32)
    arr[int(y[0])-1] = 1 # -1 as age groups range from 1 to 15, but array from 0 to 14
    return arr

def get_crop(tensor_path, y, target_folder, id): # crops one file into the pieces we want and saves these pieces with their labels in target_folder
    msg = ''
    ecg_array = torch.load(tensor_path)#-global_avg_val # read prepared ecg file
    filename = Path(tensor_path).stem
    ecg_array = np.array(ecg_array, ndmin=2) # first dim is now size 1, second dim has all the data

    # do the cropping
    cropped_data_pathnames = []
    ecg_length_in_s = int(len(ecg_array[0])/config.SAMPLING_RATE)
    for i in range(0, ecg_length_in_s-config.PIECE_LENGTH, config.PIECE_LENGTH):
        crop_pathname = os.path.join(target_folder, filename + '_label_' + str(y) + '_piece_' + str(i) + '.pt')
        if not os.path.exists(crop_pathname):
            crop_start = i*config.SAMPLING_RATE
            crop_end = (i+config.PIECE_LENGTH)*config.SAMPLING_RATE

            if config.ONE_HOT_ENCODE_BINS:
                y_encoded = my_one_hot_encoder(y)
            else:
                y_encoded = torch.tensor(y, dtype=torch.float32)

            my_crop = torch.tensor(ecg_array[:, crop_start:crop_end], dtype=torch.float32)
            torch.save([my_crop, y_encoded, id], crop_pathname)
            cropped_data_pathnames.append(crop_pathname)
        else:
            cropped_data_pathnames.append(crop_pathname)
        
    return cropped_data_pathnames, tensor_path, msg


for df_pathname, pieces_folder, set_name in zip(set_df_pathnames, set_pieces_folders, config.SPLIT_NAMES):
    df = pd.read_csv(df_pathname)
    x_of_part = df[[config.TENSOR_FILE_COLUMN_NAME]].values # the paths to the prepared ecg-files
    y_of_part = df[[config.LABEL_COLUMN_NAME]].values # the corresponding labels
    id_of_part = df[["ID"]].values # the IDs are needed to that later crops of the same person can be identified

    t1 = time.time()
    data = Parallel(n_jobs=os.cpu_count())(delayed(get_crop)(x.item(), y, pieces_folder, id) for x,y,id in zip(x_of_part,y_of_part, id_of_part))
    for data_pathnames_list, x, msg in data:
        if msg != '':
            print(msg)

    combined_data_pathnames = [data_pathnames_list for data_pathnames_list, x, msg in data if data_pathnames_list != []]    
    combined_data_pathnames = list(itertools.chain.from_iterable(combined_data_pathnames)) # flatten the list of lists
    t2 = time.time()
    print(f'Data cropping for data of {set_name}-set took {t2-t1} seconds. It produced {len(combined_data_pathnames)} crops.')


Data cropping for data of test-set took 35.71535062789917 seconds. It produced 76700 crops.
Data cropping for data of val-set took 35.23702692985535 seconds. It produced 81057 crops.
Data cropping for data of train-set took 110.7969913482666 seconds. It produced 235409 crops.


In [10]:
# create dataloader
# take ~8minutes
do_parallel = True # set to False for debugging
do_label_check = False # also sets do_parallel to False

def load_file(name):
    torch_data = torch.load(name) # torch_data[0] contains the time series, torch_data[1] the label (could be one-hot-encoded)
    # check if label is valid
    if do_label_check:
        label = torch_data[1]
        if config.ONE_HOT_ENCODE_BINS:
            if label.size() == torch.Size([config.NUM_CLASSES]):
                label_is_ok = False
                num_not_zeros = torch.count_nonzero(label)
                if 1 in label:
                    if num_not_zeros == 1:
                        label_is_ok = True
                if not label_is_ok:
                    print(f'reporting unexpected label: {label}')
            else:
                print(f'unexpected tensor format {label.size()}')
        else:
            if label.size() == torch.Size([1]):
                #check if label is between 1 and config.NUM_CLASSES
                if not (label[0] >= 1) and (label[0] <= config.NUM_CLASSES):
                    print(f'reporting unexpected label: {label[0]}')
            else:
                print(f'unexpected tensor format {label.size()}')    
    return torch_data

def get_dataloader(combined_data_pathnames,shuffle=True, sampler=None):
    # load the individual pieces
    if do_parallel and not do_label_check:
        combined_data = Parallel(n_jobs=os.cpu_count())(delayed(load_file)(pathname) for pathname in combined_data_pathnames)
    else:
        combined_data = []
        for pathname in combined_data_pathnames:
            combined_data.append(load_file(pathname))

    return torch.utils.data.DataLoader(combined_data, batch_size=config.BATCH_SIZE, shuffle=shuffle, drop_last=True, num_workers=config.NUM_WORKERS, sampler=sampler)

for df_pathname, pieces_folder, set_name in zip(set_df_pathnames, set_pieces_folders, config.SPLIT_NAMES):
    save_pathname = os.path.join(config.DATALOADER_FOLDER, set_name + '_DL.pt')
    if not os.path.exists(save_pathname):
        combined_data_pathnames = [os.path.join(pieces_folder, f) for f in os.listdir(pieces_folder)] # need full path of each file
        t1=time.time()

        if set_name == 'train': # only shuffle TRAIN set
            if config.USE_OVERSAMPLING: # sampler-option is mutually exclusive with shuffle!
                part_loader = get_dataloader(combined_data_pathnames, shuffle=False, sampler=global_sampler)
                print('oversampled training set') # only oversample train-set! (val and test-set are imbalanced but its real data)
            else:
                part_loader = get_dataloader(combined_data_pathnames, shuffle=True)
                print('shuffled training set')
        else:
            part_loader = get_dataloader(combined_data_pathnames, shuffle=False)

        # save dataloader
        torch.save(part_loader, save_pathname)
        print(f'creating dataloader for {set_name}-set all in all took {time.time()-t1} seconds')
    else:
        print(f'found existing dataloader for {set_name}-set - aboarted creation of a new one')


creating dataloader for test-set all in all took 79.43093466758728 seconds
creating dataloader for val-set all in all took 89.8095293045044 seconds
shuffled training set
creating dataloader for train-set all in all took 266.829030752182 seconds


In [8]:
# if you run the whole data_preparation_v12.ipynb file completely and for the first time it will take a while
# some datapoints:
# i5-1240p: ~16 minutes
# Ryzen 3700X: ~5 minutes
print(f'Runtime: {time.time()- start_time}')

Runtime: 181.9015564918518
