In [19]:
import numpy as np, scipy.signal as ss, cv2, pathlib
import os
import scipy
from pathlib import Path
import scipy.io as sio

In [10]:
from scipy.signal import correlate, hilbert, butter, firwin, filtfilt, lfilter, resample_poly

def butter_bandpass(lowcut, highcut, fs, order=2):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='bandpass')
    return b, a

def butter_bandpass_filter(data, lowcut, highcut, fs, order=2):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = lfilter(b, a, data)
    return y

def DUS_filtering(DUS, fs):  
    #25-600Hz 4th order Butterworth band pass
    lowcut = 0.5
    highcut = 40
    DUS_f = butter_bandpass_filter(DUS, lowcut, highcut, fs, order=2)
    return DUS_f

def bandpass_filter(signal, lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    # Apply the filter
    filtered_signal = filtfilt(b, a, signal)
    return filtered_signal

def normalize_signal(signal):
    # Normalize the signal to the range [-1, 1]
    signal_min = np.min(signal)
    signal_max = np.max(signal)
    normalized_signal = 2 * (signal - signal_min) / (signal_max - signal_min) - 1
    return normalized_signal

In [7]:
os.getcwd()

'/home/tsu25/fECG_cGAN'

In [8]:
# Load the .mat file
mat_file_path = '/labs/katebilab/NinFEA_Data/ECG-Doppler/1.mat'
mat_data = scipy.io.loadmat(mat_file_path)

# Display the keys in the .mat file (variables stored in the file)
print("Keys in the .mat file:", mat_data.keys())

Keys in the .mat file: dict_keys(['__header__', '__version__', '__globals__', 'PORTI', 'fs'])


In [9]:
mat_data['PORTI']

array([[ 9.08900879e+03,  9.08879395e+03,  9.08879395e+03, ...,
         1.04339951e+04,  1.04322793e+04,  1.04301348e+04],
       [-2.83333057e+03, -2.83318774e+03, -2.83268701e+03, ...,
        -1.95609705e+03, -1.95802759e+03, -1.96174561e+03],
       [ 9.67280664e+03,  9.67273535e+03,  9.67530859e+03, ...,
         9.73930176e+03,  9.73894434e+03,  9.73851465e+03],
       ...,
       [ 1.72140656e+05,  1.72132062e+05,  1.72137797e+05, ...,
         1.75140406e+05,  1.75134688e+05,  1.75154719e+05],
       [ 9.00000000e+00,  1.10000000e+01,  1.30000000e+01, ...,
         3.90000000e+01,  4.10000000e+01,  4.30000000e+01],
       [ 6.00000000e+00,  6.00000000e+00,  6.00000000e+00, ...,
         6.00000000e+00,  6.00000000e+00,  6.00000000e+00]])

In [20]:
def ecg_to_spec(ecg1d, fs=1000, nperseg=512, noverlap=384,
                f_max=150, out_size=(256, 256)):
    f, t, Z = ss.stft(ecg1d, fs, 'hann', nperseg, noverlap)
    Z = np.abs(Z)[f <= f_max]
    Z = 20*np.log10(Z + 1e-8)
    Z = (Z - Z.min()) / (Z.ptp() + 1e-12)
    return (cv2.resize(Z, out_size[::-1]) * 255).astype(np.uint8)

def preprocess(sig, fs_in, fs_out=1000, bp=(0.5, 150)):
    b, a = ss.butter(4, [bp[0]/(fs_in/2), bp[1]/(fs_in/2)], 'bandpass')
    sig = ss.filtfilt(b, a, sig, axis=-1)
    if fs_in != fs_out:
        n = int(round(sig.shape[-1] * fs_out / fs_in))
        sig = ss.resample(sig, n, axis=-1)
    return fs_out, sig

def process_mat(mat_path: Path, out_root: Path,
                lead_idx: int = 0,      # choose 0-23 for abdominal channel
                win_sec: float = 5.0):
    mdict = sio.loadmat(mat_path, squeeze_me=True, struct_as_record=False)
    if 'PORTI' not in mdict:
        raise KeyError(f"'PORTI' not found in {mat_path}")
    fs_raw   = float(mdict.get('fs', 2048))          
    lead_raw = np.asarray(mdict['PORTI'], np.float64)[lead_idx]   # selected abdominal channel
    fs, lead = preprocess(lead_raw, fs_raw)          

    win = int(win_sec * fs)       # 5 000 samples @1 kHz
    hop = win                     # non-overlap
    rec = mat_path.stem
    out_dir = out_root / 'A' / 'test'
    out_dir.mkdir(parents=True, exist_ok=True)

    for start in range(0, len(lead) - win + 1, hop):
        seg = lead[start:start + win]
        png = ecg_to_spec(seg, fs)
        t0  = start // fs                         # start time [s]
        fname = f"{rec}_ch{lead_idx+1:02d}_t{t0:06d}.png"
        cv2.imwrite(str(out_dir / fname), png)

In [21]:
DATA_DIR = Path("/labs/katebilab/NinFEA_Data/ECG-Doppler").expanduser()   # folder with .mat files
OUT_DIR  = Path("datasets/ninfea")             # Pix2Pix dataroot
LEAD     = 7                                   # e.g. row 8 (0-based)

for f in DATA_DIR.glob("*.mat"):
    process_mat(f, OUT_DIR, lead_idx=LEAD)

In [15]:
# Alternative ECG processing with customized bandpass filters same as in NInFEA_processing

def preprocess_with_customized_filter(sig, fs_in, fs_out=1000):
    # customzied bandpass filter
    sig_f = DUS_filtering(sig, fs_in)
    # optional resample (keeps GAN in 1 kHz domain)
    if fs_in != fs_out:
        n = int(round(sig_f.shape[-1] * fs_out / fs_in))
        sig_f = ss.resample(sig_f, n, axis=-1)                       
    # optional normalise to [-1,1]
    return fs_out, normalize_signal(sig_f)

def process_mat_customized_filter(mat_path: Path, out_root: Path):
    m   = sio.loadmat(mat_path, squeeze_me=True, struct_as_record=False) 
    fs  = float(m.get('fs', 2048))
    raw = np.asarray(m['PORTI'], dtype=np.float64)[:24]                  # rows 1-24 = abdominal
    fs, sigs = preprocess_with_user_filter(raw, fs)
    out_dir  = out_root / 'A' / 'test'; out_dir.mkdir(parents=True, exist_ok=True)
    for ch, ecg in enumerate(sigs, 1):
        cv2.imwrite(str(out_dir / f"{mat_path.stem}_ch{ch:02d}.png"),
                    ecg_to_spec(ecg, fs))