In [1]:
record_names = [f'{i:03d}' for i in range(100, 235)]

In [2]:
import os
import wfdb

In [3]:
path = 'mit-bih-arrhythmia-database-1.0.0'

In [4]:
record_path = os.path.join(path, record_names[0])

In [5]:
record_path

'mit-bih-arrhythmia-database-1.0.0\\100'

In [6]:
signal = wfdb.rdrecord(record_path)

p_signal in the context of the WFDB library refers to the primary signals or the actual waveform data contained within an ECG record.

In [7]:
signal.p_signal.shape

(650000, 2)

In [8]:
annotation = wfdb.rdann(record_path, 'atr')

In [42]:
annotation.symbol[:10]

['+', 'N', 'N', 'N', 'N', 'N', 'N', 'N', 'A', 'N']

In [9]:
header = wfdb.rdheader(record_path)

In [10]:
header.__dict__

{'record_name': '100',
 'n_sig': 2,
 'fs': 360,
 'counter_freq': None,
 'base_counter': None,
 'sig_len': 650000,
 'base_time': None,
 'base_date': None,
 'comments': ['69 M 1085 1629 x1', 'Aldomet, Inderal'],
 'sig_name': ['MLII', 'V5'],
 'p_signal': None,
 'd_signal': None,
 'e_p_signal': None,
 'e_d_signal': None,
 'file_name': ['100.dat', '100.dat'],
 'fmt': ['212', '212'],
 'samps_per_frame': [1, 1],
 'skew': [None, None],
 'byte_offset': [None, None],
 'adc_gain': [200.0, 200.0],
 'baseline': [1024, 1024],
 'units': ['mV', 'mV'],
 'adc_res': [11, 11],
 'adc_zero': [1024, 1024],
 'init_value': [995, 1011],
 'checksum': [-22131, 20052],
 'block_size': [0, 0]}

In [11]:
import pandas as pd

In [12]:
signal_df = pd.DataFrame(signal.p_signal, columns=[f'signal_{i}' for i in range(signal.p_signal.shape[1])])

fs: This is the sampling frequency of the ECG signal, 

In [13]:
signal.fs

360

In [14]:
signal_df.index

RangeIndex(start=0, stop=650000, step=1)

In [15]:
signal_df['time'] = signal_df.index / signal.fs

In [16]:
signal_df.head()

Unnamed: 0,signal_0,signal_1,time
0,-0.145,-0.065,0.0
1,-0.145,-0.065,0.002778
2,-0.145,-0.065,0.005556
3,-0.145,-0.065,0.008333
4,-0.145,-0.065,0.011111


In [17]:
annotation.sample

array([    18,     77,    370, ..., 649484, 649734, 649991], dtype=int64)

In [18]:
annotation_df = pd.DataFrame({
    'time': annotation.sample / signal.fs, 
    'annotation': annotation.symbol
})

In [19]:
annotation_df['annotation'].unique()

array(['+', 'N', 'A', 'V'], dtype=object)

normal beats (N), premature ventricular contractions (V), atrial premature beats (A)

In [20]:
output_dir = 'Data'

In [21]:
signal_csv_path = os.path.join(output_dir, f'{record_names[0]}_signal.csv')
annotation_csv_path = os.path.join(output_dir, f'{record_names[0]}_annotations.csv')
signal_df.to_csv(signal_csv_path, index=False)
annotation_df.to_csv(annotation_csv_path, index=False)

In [22]:
import pandas as pd

In [23]:
signal_csv_path = pd.read_csv('Data/100_signal.csv')

In [24]:
signal_csv_path.head()

Unnamed: 0,signal_0,signal_1,time
0,-0.145,-0.065,0.0
1,-0.145,-0.065,0.002778
2,-0.145,-0.065,0.005556
3,-0.145,-0.065,0.008333
4,-0.145,-0.065,0.011111


In [25]:
header_file_path = os.path.join(path, f'{record_names[0]}.hea')

In [26]:
header_file_path

'mit-bih-arrhythmia-database-1.0.0\\100.hea'

In [27]:
with open(header_file_path) as f:
    header_lines = f.readlines()

In [28]:
print(header_lines)

['100 2 360 650000\n', '100.dat 212 200 11 1024 995 -22131 0 MLII\n', '100.dat 212 200 11 1024 1011 20052 0 V5\n', '# 69 M 1085 1629 x1\n', '# Aldomet, Inderal\n']


In [29]:
print(header_lines[0])

100 2 360 650000



In [30]:
details =[line.strip().split() for line in header_lines if line.startswith('#')]
details

[['#', '69', 'M', '1085', '1629', 'x1'], ['#', 'Aldomet,', 'Inderal']]

In [31]:
patient_info ={}

In [32]:
patient_info

{}

In [33]:
patient_info['Age'] = details[0][1]
patient_info['Gender'] = details[0][2]

In [34]:
patient_info['ECG_specs'] = ' '.join(details[0][3:])

In [35]:
patient_info['Diagnoses'] = ' '.join(details[1][1:])

In [36]:
patient_info['other'] = ' '.join(details[2:])

In [37]:
patient_info_df = pd.DataFrame([patient_info])

In [38]:
patient_info_df.head()

Unnamed: 0,Age,Gender,ECG_specs,Diagnoses,other
0,69,M,1085 1629 x1,"Aldomet, Inderal",


In [39]:
dir_path = 'mit-bih-arrhythmia-database-1.0.0'

In [40]:
def read_header(file_path):
    # path = os.path.join(file_path)
    with open(file_path) as f:
        header_lines = f.readlines()
    details =[line.strip().split() for line in header_lines if line.startswith('#')]
    patient_info ={}
    patient_info['Age'] = details[0][1]
    patient_info['Gender'] = details[0][2]
    patient_info['ECG_params'] = ' '.join(details[0][3:])
    patient_info['Diagnoses'] = ' '.join(details[1][1:])

    flatten_details = [item.replace('#', '').strip() for sublist in details[2:] for item in sublist]
    patient_info['other'] = ' '.join(flatten_details)

    for key,value in patient_info.items():
        if len(value) == 0:
            patient_info[key] = 'NaN'
    return patient_info


In [41]:
read_header('232.hea')

FileNotFoundError: [Errno 2] No such file or directory: '232.hea'

In [None]:
def convert_record_to_csv(record_name, output_dir='Data'):
    record_path = os.path.join(dir_path, record_name)
    
    signal = wfdb.rdrecord(record_path)
    annotation = wfdb.rdann(record_path, 'atr')
    
    signal_df = pd.DataFrame(signal.p_signal, columns=[f'signal_{i}' for i in range(signal.p_signal.shape[1])])
    signal_df['time'] = signal_df.index / signal.fs
    
    annotation_df = pd.DataFrame({
        'time': annotation.sample / signal.fs, 
        'annotation': annotation.symbol
    })


    signal_csv_path = os.path.join(output_dir,'Signals', f'{record_name}_signal.csv')
    annotation_csv_path = os.path.join(output_dir, 'Annotations' ,f'{record_name}_annotations.csv')
    
    signal_df.to_csv(signal_csv_path, index=False)
    annotation_df.to_csv(annotation_csv_path, index=False)
    
    header_file_path = f'{record_path}.hea'
    patient_info = read_header(header_file_path)
    patient_info_df = pd.DataFrame([patient_info])
    patient_info_csv_path = os.path.join(output_dir, 'Patients',f'{record_name}_patient_info.csv')
    patient_info_df.to_csv(patient_info_csv_path, index=False)



In [None]:
dir_path = 'mit-bih-arrhythmia-database-1.0.0'
output_dir = 'Data'
os.makedirs(output_dir, exist_ok=True)

record_names = [f'{i:03d}' for i in range(100, 235)]

for record_name in record_names:
    try:
        convert_record_to_csv(record_name, output_dir)
    except FileNotFoundError as e:
        continue
print('Done')

Done


In [None]:
test = pd.read_csv('Data/Annotations/100_annotations.csv')

In [None]:
test.columns

Index(['time', 'annotation'], dtype='object')

In [None]:
test['annotation'].unique()

array(['+', 'N', 'A', 'V'], dtype=object)