# Split dataset for anomaly detection

0. Download the dataset from PhysioNet: https://physionet.org/content/challenge-2017/1.0.0/

In [3]:
import os
import random
import shutil
import pandas as pd
# Seed Python's global RNG so the shuffled split further down is reproducible.
random.seed(42)

# os.path.expanduser("~") resolves the home directory even when the HOME
# environment variable is unset; os.getenv("HOME") would return None there and
# the string concatenation below would raise a TypeError.
home_dir = os.path.expanduser("~")
physionet_dir = '/physionet.org/files/challenge-2017/1.0.0/training/'
# Keep the trailing slash: later cells append file names by plain concatenation.
raw_data_dir = home_dir + physionet_dir

# Output root for the split dataset.
splited_data_dir = './data/'
os.makedirs(splited_data_dir, exist_ok=True)

1.  Loop through the subfolders to find all the .mat and .hea files

The dataset provides four RECORDS files, one per class, each listing the names of that class's .mat and .hea files. We can use these RECORDS files to locate all the .mat and .hea files.

In [2]:
# Read the RECORDS-<class> index file of every class
records_list = ['normal', 'af', 'noisy', 'other']
records_dic = {}

for item in records_list:
    index_path = raw_data_dir + f'RECORDS-{item}'
    with open(index_path) as index_file:
        # one record name per line; strip the newline and surrounding whitespace
        records = [line.strip() for line in index_file]

    # keep the record names of this class and report how many were found
    records_dic[item] = records
    print(f'{item} records: {len(records)}')


normal records: 5076
af records: 758
noisy records: 279
other records: 2415


2. Split the dataset into training and testing sets

In [3]:
# Training set: a random 80% of the records listed in RECORDS-normal.
# Testing set: the remaining 20% of RECORDS-normal plus 100% of the records in
# RECORDS-af, RECORDS-noisy and RECORDS-other.
# Labels are collected in parallel lists so a reference csv can be written for
# the training and testing sets.

TRAIN_FRACTION = 0.8
SPLIT_SEED = 42

# A dedicated, freshly seeded generator makes this cell idempotent: re-running
# it reproduces the same split instead of continuing from whatever state the
# global random module happens to be in.
split_rng = random.Random(SPLIT_SEED)

training_set = []
testing_set = []
training_label = []
testing_label = []

for item in records_list:
    if item == 'normal':
        # Shuffle a copy so records_dic keeps the order read from the RECORDS file.
        records = list(records_dic[item])
        split_rng.shuffle(records)
        n_train = int(len(records) * TRAIN_FRACTION)
        training_set += records[:n_train]
        training_label += [item] * n_train
        testing_set += records[n_train:]
        testing_label += [item] * (len(records) - n_train)
    else:
        # Every non-normal record goes to the testing set.
        records = records_dic[item]
        testing_set += records
        testing_label += [item] * len(records)

# Each record must have exactly one label.
assert len(training_set) == len(training_label)
assert len(testing_set) == len(testing_label)

print(f"Training set: {len(training_set)}")
print(f"Testing set: {len(testing_set)}")


Training set: 4060
Testing set: 4468


In [4]:
# start from a clean output directory
shutil.rmtree(splited_data_dir, ignore_errors=True)

# copy the .mat and .hea file of every record into the folder of its split
for subset_dir, subset_records in (('training/', training_set),
                                   ('testing/', testing_set)):
    target_dir = splited_data_dir + subset_dir
    os.makedirs(target_dir, exist_ok=True)
    for item in subset_records:
        for extension in ('.mat', '.hea'):
            shutil.copy(raw_data_dir + item + extension, target_dir)

In [5]:
# Save the labels of each split into a csv file.
# The record names are reduced to their basename (the original code dropped the
# first 4 characters, presumably a '<subfolder>/' prefix; verify against the
# RECORDS files). shutil.copy names a file copied into a directory after the
# basename of its source path, so using the basename keeps the csv in step with
# the copied files without assuming a fixed prefix length.

training_df = pd.DataFrame({
    'filename': [os.path.basename(x) for x in training_set],
    'label': training_label,
})
testing_df = pd.DataFrame({
    'filename': [os.path.basename(x) for x in testing_set],
    'label': testing_label,
})
training_df.to_csv(splited_data_dir + 'training.csv', index=False)
testing_df.to_csv(splited_data_dir + 'testing.csv', index=False)
print("Done")


Done


In [4]:
# zip the split dataset folder into data.zip; the root directory is taken from
# splited_data_dir so the archive always matches the configured output folder
shutil.make_archive('data', 'zip', splited_data_dir)

'/Users/zhipenghe/GitHub/PhysioNet-CinC-2017/data.zip'