This script converts files from other formats to EDF files.
The script takes three arguments:
 - files: The path to the files to be converted. Can be a file or a folder. If a folder is given, the script searches it for files that can be converted.
 - output: The path to the output folder.
 - split: A boolean for whether to split the files into <24 hour, evenly sized chunks. E.g. a 60 hour file gets split into three 20 hour files.

In [None]:
import argparse
import pandas as pd
import pyedflib
import struct
import numpy as np
import wfdb
import soundfile as sf
import os

def _str_to_bool(value):
    """Parse a command-line string into a bool.

    ``type=bool`` cannot be used for this: argparse would call ``bool()`` on
    the raw string, and every non-empty string (including "False") is truthy.

    Args:
        value: The raw string given on the command line.

    Returns:
        True or False depending on the spelled-out value.

    Raises:
        argparse.ArgumentTypeError: If the value is not a recognised boolean.
    """
    normalized = value.strip().lower()
    if normalized in ("1", "true", "t", "yes", "y", "on"):
        return True
    # The empty string is kept falsy, as it was with ``bool("")``.
    if normalized in ("", "0", "false", "f", "no", "n", "off"):
        return False
    raise argparse.ArgumentTypeError(
        f"expected a boolean value, got {value!r}")


parser = argparse.ArgumentParser(description='Convert to EDF.')
parser.add_argument('--files', type=str, required=True, help='path to files')
parser.add_argument('--output', type=str, required=True,
                    help="path to output edfs")
# nargs='?' with const=True lets a bare ``--split`` enable splitting while
# still accepting an explicit value such as ``--split False``.
parser.add_argument('--split', type=_str_to_bool, nargs='?', const=True,
                    default=False,
                    help="Whether to split into <24hr segments")
args = parser.parse_args()

All functions take a filepath as input and create a dict with:
 - sample_rate: The sample rate of the ecg
 - tracings: An m x n array, where m is the number of channels and n is the length of the ecg
 - lead_names: A list of strings that has the name of each channel.
 - dimensions: The dimensions of the ecg. Usually either mV or uV.

Most functions have hardcoded values that may need to be changed depending on how the given files are formatted.

Reads .csv files.
Assumes that a "Time (s)" column is given and uses that to calculate sample rate
Assumes that the remaining columns are leads
The dimension of each lead is read from the end of its column name, which must end in "(uV)" or "(mV)".

In [None]:
def read_csv(path):
    """Read a .csv file whose first column is "Time (s)" and whose other columns are leads.

    The sample rate is derived from the first two time stamps. Every lead
    column name must end in "(uV)" or "(mV)"; that suffix gives the dimension.

    Args:
        path: Path to the .csv file.

    Returns:
        A dict with "sample_rate" (int), "tracings" (channels x samples
        array), "lead_names" (list of column names) and "dimensions" (list of
        "uV"/"mV" strings).

    Raises:
        ValueError: If a lead column name does not end in "(uV)" or "(mV)".
    """
    df = pd.read_csv(path)
    lead_columns = list(df.columns[1:])

    time_step = df.loc[1, "Time (s)"] - df.loc[0, "Time (s)"]
    # Round instead of truncating: float error in the time stamps can put
    # 1 / step just below the true rate (e.g. 9.999... for 10 Hz), and plain
    # int() would then report a rate that is one too low.
    sample_rate = int(round(1 / float(time_step)))

    edf = {
        "sample_rate": sample_rate,
        "tracings": df[lead_columns].to_numpy().T,
        "lead_names": lead_columns,
        # "(uV)"[-3:-1] == "uV": strip the surrounding parentheses.
        "dimensions": [col[-3:-1] if col.endswith(("(uV)", "(mV)")) else None
                       for col in lead_columns]
    }
    if None in edf["dimensions"]:
        raise ValueError(f"Unable to recognize dimension in {path}")

    return edf

Reads .csv files that contain a single "value" column. The sample rate is taken to be 128 Hz and the dimension mV.

In [None]:
def read_csv_2(path, *, sample_rate=128, column="value", dimension="mV"):
    """Read a single-lead .csv file.

    The file format carries no sample rate or unit, so they are supplied by
    the caller; the defaults match the files this reader was written for.

    Args:
        path: Path to the .csv file.
        sample_rate: Sample rate to report for the signal.
        column: Name of the column that holds the samples.
        dimension: Physical dimension to report for the signal.

    Returns:
        A dict with "sample_rate", "tracings" (1 x samples array),
        "lead_names" and "dimensions".
    """
    df = pd.read_csv(path)
    # The dimension is always supplied here, so no "unrecognized dimension"
    # check is needed (the previous check could never trigger).
    return {
        "sample_rate": sample_rate,
        "tracings": df[[column]].to_numpy().T,
        "lead_names": [column],
        "dimensions": [dimension]
    }

Reads .csv files.
Dimensions is just '?' because the ppg values were given without units

In [None]:
def read_ppg_csv(path):
    """Read a PPG .csv file with a "ppg_green" column.

    The values are divided by 100 and reported at 50 Hz. The dimension is
    "?" because the source values come without units.

    Args:
        path: Path to the .csv file.

    Returns:
        A dict with "sample_rate", "tracings" (1 x samples array),
        "lead_names" and "dimensions".
    """
    df = pd.read_csv(path)
    edf = {
        "sample_rate": 50,
        "tracings": np.array(df["ppg_green"])[np.newaxis] / 100,
        "lead_names": ["ppg_green"],
        "dimensions": ["?"]
    }

    # Diagnostic output: the signal followed by its value range. The second
    # figure is the minimum (it previously repeated the maximum).
    print(edf["tracings"])
    print(np.nanmax(edf["tracings"]))
    print(np.nanmin(edf["tracings"]))

    return edf

Reads .dat files.
path is a path to a folder. Each file in the folder is a .dat file that has the values for a channel.
Some values are nan.

In [None]:
def read_dat(path):
    """Read a folder of raw .dat files, one channel per file.

    Each file is interpreted as little-endian signed 16-bit samples, which are
    divided by 80.0. Samples equal to the 16-bit pattern 0x8000 (-32768 as a
    signed value) are treated as missing and become NaN.

    Args:
        path: Path to a folder in which every file holds one channel.

    Returns:
        A dict with "sample_rate", "tracings" (channels x samples array),
        "lead_names" (channel indices as strings) and "dimensions".

    Raises:
        ValueError: If a file's size is not a multiple of two bytes.
    """
    tracings = []
    # os.listdir returns entries in arbitrary order; sort so that the channel
    # order (and the lead names derived from it) is reproducible.
    for filename in sorted(os.listdir(path)):
        with open(os.path.join(path, filename), "rb") as file:
            buffer = file.read()

        raw = np.frombuffer(buffer, dtype="<i2")
        tracing = raw / 80.0
        # The sentinel must be matched on the raw integers: a signed 16-bit
        # sample holding the 0x8000 bit pattern reads as -32768, and after
        # scaling it could never compare equal to 0x8000.
        tracing[raw == -0x8000] = np.nan
        tracings.append(tracing)

    edf = {
        "sample_rate": 180,
        "tracings": np.array(tracings),
        "lead_names": list(map(str, range(len(tracings)))),
        "dimensions": ["mV"] * len(tracings)
    }

    return edf

Reads MIT format files.
To read an MIT file with wfdb the file extension needs to be removed.

In [None]:
def read_mit(path):
    """Read an MIT-format record with wfdb.

    A trailing ".dat" is removed from the path before it is handed to
    ``wfdb.rdsamp``.

    Args:
        path: Path to the record, with or without the ".dat" extension.

    Returns:
        A dict with "sample_rate", "tracings" (the transposed sample array),
        "lead_names" and "dimensions", taken from the record header fields
        "fs", "sig_name" and "units".
    """
    extension = ".dat"
    record = path[:-len(extension)] if path.endswith(extension) else path

    signals, fields = wfdb.rdsamp(record)

    return {
        "sample_rate": fields["fs"],
        "tracings": signals.T,
        "lead_names": fields["sig_name"],
        "dimensions": fields["units"]
    }

Reads .wav files.
This is hard coded to ignore the first signal, because the .wav file given had a non-ecg signal.
tracings are scaled to be in mV units.

In [None]:
def read_wav(path):
    """Read a .wav file with soundfile and scale it for EDF output.

    Only the first row of the transposed sample array is kept, and it is
    multiplied by 1e5 before being labelled as mV.

    Args:
        path: Path to the .wav file.

    Returns:
        A dict with "sample_rate", "tracings", "lead_names" (row indices as
        strings) and "dimensions".
    """
    samples, samplerate = sf.read(path, dtype='float32')

    # NOTE(review): this slice KEEPS the first row and drops the rest, while
    # the description accompanying this reader says the first signal is the
    # one to ignore (which would be [1:]) - confirm which is intended.
    tracings = samples.T[:1]
    tracings *= 1e5

    n_channels = len(tracings)
    return {
        "sample_rate": samplerate,
        "tracings": tracings,
        "lead_names": [str(index) for index in range(n_channels)],
        "dimensions": ["mV"] * n_channels
    }

Reads .parquet files.
Assumes a "time" column is given, and uses it to calculate the sample rate.

In [None]:
def read_parquet(path):
    """Read a .parquet file whose first column is "time" and whose other columns are leads.

    The sample rate is derived from the difference between the first two
    entries of the "time" column.

    Args:
        path: Path to the .parquet file.

    Returns:
        A dict with "sample_rate" (float, in Hz), "tracings" (channels x
        samples array), "lead_names" (list of column names) and "dimensions".
    """
    df = pd.read_parquet(path)
    lead_names = list(df.columns[1:])
    tracings = df[lead_names].to_numpy().T

    sample_interval = df.loc[1, "time"] - df.loc[0, "time"]
    # Use the whole interval expressed in microseconds. The ``.microseconds``
    # attribute is only the sub-second component of the Timedelta, so an
    # interval of 1 s or more gave a wrong rate or a division by zero.
    interval_us = sample_interval / pd.Timedelta(microseconds=1)

    edf = {
        "sample_rate": 1e6 / interval_us,
        "tracings": tracings,
        "lead_names": lead_names,
        "dimensions": ["mV"] * len(tracings)
    }

    return edf

Reads .txt files.
Each line of the file contains a single integer sample value.

In [None]:
def read_ascii(path):
    """Read a text file that holds one integer sample per line.

    Blank lines (including the empty remainder after a trailing newline) are
    skipped.

    Args:
        path: Path to the text file.

    Returns:
        A dict with "sample_rate", "tracings" (1 x samples array),
        "lead_names" and "dimensions".

    Raises:
        ValueError: If a non-blank line is not an integer.
    """
    # The context manager closes the file; the handle used to be leaked.
    with open(path, "r") as file:
        samples = [int(line) for line in file if line.strip()]
    tracings = np.array(samples)[np.newaxis]

    edf = {
        "sample_rate": 130,
        "tracings": tracings,
        "lead_names": ["1"],
        "dimensions": ["uV"]
    }

    return edf

Reads .edf files.
This doesn't add any new information to the edf.
It is only here so if a folder is given as the input path, edf files are also included in the output.

In [None]:
def read_edf(path):
    """Load an existing EDF file into the common edf dict.

    Every signal in the file is read; the sample rate reported is that of the
    first signal.

    Args:
        path: Path to the .edf file.

    Returns:
        A dict with "sample_rate", "tracings" (array built from the signals),
        "lead_names" (signal labels) and "dimensions" (signal dimensions).
    """
    with pyedflib.EdfReader(path) as reader:
        headers = reader.getSignalHeaders()
        channel_count = len(reader.getSignalHeaders())
        first_rate = reader.getSampleFrequencies()[0]
        signals = [np.array(reader.readSignal(index))
                   for index in range(channel_count)]

    labels = [entry["label"] for entry in headers]
    units = [entry["dimension"] for entry in headers]

    return {
        "sample_rate": first_rate,
        "tracings": np.array(signals),
        "lead_names": labels,
        "dimensions": units
    }

Reads .txt files.
The given format is the same as a csv with spaces separating values.
It is assumed three leads are given.

In [None]:
def read_txt(path):
    """Read a space-separated text file with an index column and three leads.

    The columns are named "index", "a", "b" and "c"; the three lead columns
    are returned at a fixed 500 Hz in uV.

    Args:
        path: Path to the text file.

    Returns:
        A dict with "sample_rate", "tracings" (3 x samples array),
        "lead_names" and "dimensions".
    """
    column_names = ['index', 'a', 'b', 'c']
    table = pd.read_csv(path, sep=' ', names=column_names, header=None)

    leads = ["a", "b", "c"]
    signals = np.array(table[leads]).T

    return {
        "sample_rate": 500,
        "tracings": signals,
        "lead_names": leads,
        "dimensions": ["uV", "uV", "uV"]
    }

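Reads .npy files.
Column 0 is used as the sample position and column 2 as the value; entries equal to -2147483648 are treated as missing and filled by linear interpolation.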

In [None]:
def read_npy(path):
    """Read a .npy array and fill its missing samples by interpolation.

    Column 0 supplies the sample positions and column 2 the values. Rows
    whose value equals -2147483648 are treated as missing and replaced by
    linear interpolation over the remaining rows.

    Args:
        path: Path to the .npy file.

    Returns:
        A dict with "sample_rate", "tracings" (1 x samples array),
        "lead_names" and "dimensions".
    """
    data = np.load(path)

    missing_marker = -2147483648
    positions = data[:, 0]
    values = data[:, 2]
    present = values != missing_marker

    # Evaluate at every position, using only the rows that hold real values.
    filled = np.interp(positions, positions[present], values[present])

    return {
        "sample_rate": 250,
        "tracings": filled[np.newaxis, :],
        "lead_names": [""],
        "dimensions": ["uV"]
    }

Writes an edf dict to an edf file.

In [None]:
def write_edf(edf, path):
    """Write an edf dict to an EDF file with pyedflib.

    A non-integer sample rate is rounded to the nearest multiple of 1/12 and
    stored back into ``edf``. All channels share the same physical minimum
    and maximum, taken over the whole tracings array with NaNs ignored.

    Args:
        edf: Dict with "sample_rate", "tracings", "lead_names" and
            "dimensions".
        path: Output path; ".edf" is appended when it is not already there.
    """
    current_rate = edf["sample_rate"]
    if int(current_rate) != current_rate:
        new_value = np.round(current_rate * 12) / 12
        if current_rate != new_value:
            print(f"Rounded {edf['sample_rate']} to {new_value}")
            edf["sample_rate"] = new_value

    signals = edf["tracings"]
    channel_info = {
        'sample_rate': edf["sample_rate"],
        'physical_max': np.nanmax(signals),
        'physical_min': np.nanmin(signals)
    }

    target = path if path.endswith(".edf") else path + ".edf"

    with pyedflib.EdfWriter(target, len(signals)) as edf_file:
        labelled = zip(edf["lead_names"], edf["dimensions"])
        for index, (lead_name, dimension) in enumerate(labelled):
            # The same header dict is reused; only these two fields change
            # from one channel to the next.
            channel_info.update(label=lead_name, dimension=dimension)
            edf_file.setSignalHeader(index, channel_info)

        edf_file.writeSamples(signals)

Splits the output edfs into evenly sized chunks, each less than 24 hours.
E.g. 60 hour ecg gets split into three 20 hour ecgs.

In [None]:
def split_edf(edf, path):
    """Write an edf dict as one file, or as evenly sized parts of at most 24 hours.

    The number of parts is the recording length divided by 24 hours, rounded
    up. With more than one part, the files are written to
    "<path>_part_1", "<path>_part_2", ...; otherwise a single file is written
    to ``path``.

    Args:
        edf: Dict with "sample_rate", "tracings", "lead_names" and
            "dimensions".
        path: Output path, or path prefix when the recording is split.
    """
    tracings = edf["tracings"]
    n_samples = tracings.shape[1]
    n_parts = int(np.ceil(n_samples / (edf["sample_rate"] * 86400)))

    if n_parts <= 1:
        write_edf(edf, path)
        return

    # The boundaries run from 0 to n_samples inclusive: the slices below
    # exclude their end index, so stopping at n_samples - 1 would silently
    # drop the final sample of the recording.
    boundaries = np.linspace(0, n_samples, n_parts + 1).astype(int)
    try:
        for part_i, (start_i, end_i) in enumerate(
                zip(boundaries[:-1], boundaries[1:])):
            # Slicing yields a view, so no copy of the full array is needed.
            edf["tracings"] = tracings[:, start_i:end_i]
            part_path = path + f"_part_{part_i+1}"
            write_edf(edf, part_path)
    finally:
        # Give the caller back the complete recording instead of leaving the
        # dict pointing at the last part.
        edf["tracings"] = tracings

convert_file reads a file, determines the file type, and writes the edf(s). If the given file doesn't have a supported file type then it does nothing.
convert_folder iterates through a folder and tries to convert the files within it. It also searches subfolders recursively.

In [None]:
def convert_file(filepath, output_filepath):
    """Convert a single file to EDF, choosing the reader by file extension.

    Files with an unsupported extension are ignored. Whether the output is
    split into parts is decided by the ``--split`` command-line argument.

    Args:
        filepath: Path to the input file.
        output_filepath: Path (or path prefix) of the EDF output.
    """
    supported = (".csv", ".dat", ".wav", ".parquet", ".edf", ".txt", ".npy")
    if not filepath.endswith(supported):
        return

    if filepath.endswith(".csv"):
        edf = read_csv_2(filepath)
    elif filepath.endswith(".dat"):
        edf = read_mit(filepath)
    elif filepath.endswith(".wav"):
        edf = read_wav(filepath)
    elif filepath.endswith(".parquet"):
        edf = read_parquet(filepath)
    elif filepath.endswith(".edf"):
        edf = read_edf(filepath)
    elif filepath.endswith(".txt"):
        edf = read_txt(filepath)
    else:
        # Only ".npy" is left once the guard above has passed.
        edf = read_npy(filepath)

    writer = split_edf if args.split else write_edf
    writer(edf, output_filepath)


def convert_folder(path, output_path):
    """Convert the contents of a folder to EDF, descending into subfolders.

    A non-empty folder in which every entry ends in ".dat" is read as one
    multi-channel recording and written to a single output. Any other folder
    is mirrored into ``output_path``, converting each file and recursing into
    each subfolder.

    Args:
        path: Path to the input folder.
        output_path: Path to the output folder (or output file prefix for an
            all-.dat folder).
    """
    names = os.listdir(path)
    if names and all(name.endswith(".dat") for name in names):
        edf = read_dat(path)
        writer = split_edf if args.split else write_edf
        writer(edf, output_path)
        return

    os.makedirs(output_path, exist_ok=True)
    # The folder is listed again here, after the output folder was created.
    for name in os.listdir(path):
        source = os.path.join(path, name)
        destination = os.path.join(output_path, name)

        if os.path.isdir(source):
            convert_folder(source, destination)
        else:
            convert_file(source, destination)


def main():
    """Convert the ``--files`` input, as a folder or as a single file, into ``--output``."""
    source, destination = args.files, args.output
    convert = convert_folder if os.path.isdir(source) else convert_file
    convert(source, destination)

In [None]:
# Entry point: run the conversion with the parsed command-line arguments.
main()