# This notebook generates the spectrogram images from raw ecg signals

Import libraries

In [1]:
import wfdb
import os
import numpy as np
from scipy import signal
from utils import *
from scipy.fft import fftshift
import matplotlib.pyplot as plt
from sklearn.preprocessing import normalize

Useful constants

In [2]:
NB_S_PER_MIN = 6000
SAMPLING_FREQUENCY = 100
LOWER_BOUND_FILTER = 1
UPPER_BOUND_FILTER = 48
NB_S_PER_HALF_MIN = 3000

train_files = [
    "a01",
    "c01",
    "b01",
    "a02",
    "c02",
    "b02",
    "a03",
    "c03",
    "b03",
    "a04",
    "c04",
    "b04",
    "a05",
    "c05",
    "a06",
    "c06",
    "a07",
    "c07",
    "a08",
    "c08",
    "a09",
    "a10",
    "a11",
    "a12",
    "a13",
    "a14",
    "a15",
    "a16",
]


test_files = ["b05", "c09", "c10", "a17", "a18", "a19", "a20"]

# assign directory
directory = "./apnea-ecg-database-1.0.0/"

out_directory_train = "./Spectrogram_Hann/Train/"
out_directory_test = "./Spectrogram_Hann/Test/"

This function generates the spectrogram images in format ".png" to intended directory with a csv file that maps filenames to annotation(apnea or not)

In [6]:
# return signals segments in forms of array with the associated label
def create_spect_images(source_directory, files, out_directory, csv_path):


    splitted_out_directory = out_directory.split('/')
    os.mkdir(splitted_out_directory[1])
    os.mkdir( splitted_out_directory[1] + '/' + splitted_out_directory[2] )
    spc_labels_file = open(out_directory + csv_path, "a")
    spc_labels_file.write("image,label")

    plt.ioff()
    for filename in files:
        i = 0
        path = source_directory + filename

        print(filename)
        record = wfdb.rdrecord(path)
        apnea = wfdb.rdann(path, "apn")
        ecg = record.p_signal
        ecg = ecg.reshape(len(ecg))
        ecg = bandpass_signal(ecg, LOWER_BOUND_FILTER, UPPER_BOUND_FILTER)

        length = min(
            len(apnea.symbol) - 1, int((len(ecg) - NB_S_PER_HALF_MIN) / NB_S_PER_MIN)
        )

        for j in range(length):
            label = apnea.symbol[j + 1]
            segment = ecg[
                NB_S_PER_HALF_MIN
                + (NB_S_PER_MIN * j) : NB_S_PER_HALF_MIN
                + (NB_S_PER_MIN * (j + 1))
            ]
            # print(segment)

            # save spectrogram plot
            spc_file_name = "spc_" + filename + "_" + str(i) + ".png"
            f, t, Sxx = signal.spectrogram(
                segment, SAMPLING_FREQUENCY, window=("hann"), nperseg=128, noverlap=64
            )
            plt.pcolormesh(t, f, Sxx, shading="gouraud")
            # x-axis: frequency [Hz]
            # y-axis: time [sec]
            plt.axis("off")
            plt.savefig(
                out_directory + spc_file_name, bbox_inches="tight", pad_inches=0.0
            )
            plt.close()
            # add line to csv
            spc_labels_file.write(spc_file_name + "," + label + "\n")
            # increment 1
            i += 1

    spc_labels_file.close()

In [5]:
create_spect_images(
    directory, train_files, out_directory_train, "spc_train_labels.csv"
)
create_spect_images(directory, test_files, out_directory_test, "spc_test_labels.csv")

a01


KeyboardInterrupt: 