In [4]:
import os

import h5py
import numpy as np
import pandas as pd
import pyedflib
from scipy.signal import butter, filtfilt, iirnotch, sosfiltfilt
from tqdm import tqdm

In [14]:
# Root folder that holds one sub-directory per patient with the raw .edf recordings.
source_dir = "/Users/folasewaabdulsalam/Seizure_Onset/Dataset"
# Root folder that receives the converted .h5 files (one sub-directory per patient).
output_dir = "/Users/folasewaabdulsalam/Seizure_Onset/h5_files"

def convert_edf_to_h5(source_dir, output_dir):
    """
    Convert every .edf recording under ``source_dir`` to an .h5 file.

    Each sub-directory of ``source_dir`` is treated as one patient folder and is
    mirrored under ``output_dir``. For every ``<name>.edf`` a ``<name>.h5`` is
    written that contains:

    - dataset ``"data"``: gzip-compressed array with one row per signal in the file
    - attribute ``"channels"``: the signal labels returned by the EDF reader
    - attribute ``"sampling_rate"``: the sample frequency reported for signal 0
    - attribute ``"seizure_annotations"``: the cleaned lines of the companion
      ``<name>.edf.seizures`` file (only when that file exists)

    Recordings whose .h5 output already exists are skipped. The output is first
    written to ``<name>.h5.part`` and renamed once it is complete, so an
    interrupted or failed conversion never leaves a truncated .h5 file that a
    later run would mistake for a finished one.

    Parameters:
    - source_dir (str): directory containing the patient sub-directories.
    - output_dir (str): directory that receives the converted files.

    Returns:
    - None. Progress is reported through tqdm and print.
    """

    os.makedirs(output_dir, exist_ok=True)
    for patient_folder in tqdm(os.listdir(source_dir), desc="Processing patient data"):
        patient_path = os.path.join(source_dir, patient_folder)

        # Plain files next to the patient folders are not recordings.
        if not os.path.isdir(patient_path):
            continue

        patient_output_dir = os.path.join(output_dir, patient_folder)
        os.makedirs(patient_output_dir, exist_ok=True)

        edf_files = sorted(f for f in os.listdir(patient_path) if f.endswith(".edf"))
        # Swap only the extension; str.replace would also rewrite ".edf" inside the stem.
        h5_names = [os.path.splitext(f)[0] + ".h5" for f in edf_files]
        already_processed = all(
            os.path.exists(os.path.join(patient_output_dir, h5_name)) for h5_name in h5_names
        )

        if already_processed:
            print(f"Skipping {patient_folder} (all sessions converted)")
            continue

        for edf_file, h5_name in zip(edf_files, h5_names):
            edf_path = os.path.join(patient_path, edf_file)
            h5_file_path = os.path.join(patient_output_dir, h5_name)

            if os.path.exists(h5_file_path):
                print(f"Skipping {edf_file} already converted")
                continue

            # Extract the raw signals and metadata.
            with pyedflib.EdfReader(edf_path) as f:
                signals = np.array([f.readSignal(i) for i in range(f.signals_in_file)])
                channels = f.getSignalLabels()
                sampling_rate = f.getSampleFrequency(0)

            # Read the optional annotation file before opening the output so the
            # write step below is the only thing that can leave a partial file.
            annotations = None
            seizure_file = edf_path + ".seizures"
            if os.path.exists(seizure_file):
                with open(seizure_file, "r", encoding="ISO-8859-1", errors="ignore") as sf:
                    annotations = [line.replace("\x00", "").strip() for line in sf.readlines()]

            # Write to a temporary name and rename at the end: the "already
            # converted" check above only looks at the final path.
            tmp_path = h5_file_path + ".part"
            try:
                with h5py.File(tmp_path, "w") as h5_out:
                    h5_out.create_dataset("data", data=signals, compression="gzip")
                    h5_out.attrs["channels"] = channels
                    h5_out.attrs["sampling_rate"] = sampling_rate
                    if annotations is not None:
                        h5_out.attrs["seizure_annotations"] = annotations
                os.replace(tmp_path, h5_file_path)
            except BaseException:
                # Also covers KeyboardInterrupt from an interrupted notebook cell.
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
                raise

            print(f"Converted: {edf_file}")
    print("Yehhh!! Conversion Complete")


# Run the conversion for the configured source and output directories.
convert_edf_to_h5(source_dir, output_dir)    

Processing patient data: 100%|██████████| 8/8 [00:00<00:00, 3120.18it/s]

Skipping chb03 (all sessions converted)
Skipping chb04 (all sessions converted)
Skipping chb05 (all sessions converted)
Skipping chb02 (all sessions converted)
Skipping chb07 (all sessions converted)
Skipping chb01 (all sessions converted)
Skipping chb06 (all sessions converted)
Yehhh!! Conversion Complete





In [15]:
# Scratch cell: prints a fixed string and does not touch any pipeline state.
print("hello")

hello


Data Preprocessing Steps
1. Bandpass filtering (0.5–25 Hz)
2. Notch filtering (50–60 Hz)

In [2]:
# Directory that preprocess_h5_files scans (top level only) for .h5 recordings.
# NOTE(review): this is "h5_all_files", while the conversion cell writes to
# "h5_files/<patient>/" — presumably the files were gathered into one flat
# folder by a step not shown here; confirm.
data_path = "/Users/folasewaabdulsalam/Seizure_Onset/h5_all_files"
# Directory that receives the filtered recordings as *_preprocessed.npy files.
output_path = "/Users/folasewaabdulsalam/Seizure_Onset/preprocessed_data"
# Value passed as `fs` to the filters in the call at the end of this cell.
sampling_rate = 256

def bandpass_filter(data, low_cut, high_cut, fs, order=4):
    """
    Apply a zero-phase Butterworth bandpass filter along axis 1.

    The filter is designed as second-order sections and applied forward and
    backward. The (b, a) transfer-function form is avoided because it is
    numerically sensitive for band filters whose low edge is a tiny fraction
    of the Nyquist frequency.

    Parameters:
    - data (ndarray): 2-D array; each row is filtered independently along axis 1.
    - low_cut (float): lower cut-off frequency, in the same units as ``fs``.
    - high_cut (float): upper cut-off frequency, in the same units as ``fs``.
    - fs (float): sampling frequency.
    - order (int): Butterworth order handed to ``scipy.signal.butter``.

    Returns:
    - filtered_data (ndarray): The bandpass filtered data.

    Raises:
    - ValueError: if the cut-offs do not satisfy 0 < low_cut < high_cut < fs / 2.
    """
    nyquist = 0.5 * fs
    if not 0 < low_cut < high_cut < nyquist:
        raise ValueError(
            f"Cut-off frequencies must satisfy 0 < low_cut < high_cut < fs / 2; "
            f"got low_cut={low_cut}, high_cut={high_cut}, fs={fs}"
        )
    # butter() expects the band edges as fractions of the Nyquist frequency.
    low = low_cut / nyquist
    high = high_cut / nyquist
    sos = butter(order, [low, high], btype='band', output='sos')
    return sosfiltfilt(sos, data, axis=1)

def notch_filter(data, notch_freq, fs, quality=30):
    """
    Remove a single narrow frequency component (powerline interference).

    An IIR notch is designed at ``notch_freq`` with quality factor ``quality``
    and applied forward and backward along axis 1 of ``data``.

    Returns:
    - filtered_data (ndarray): The notch filtered data.
    """
    # iirnotch takes the notch position as a fraction of the Nyquist frequency.
    normalized_freq = notch_freq / (fs / 2)
    coefficients = iirnotch(normalized_freq, quality)
    return filtfilt(*coefficients, data, axis=1)

def preprocess_h5_files(data_path, output_path, low_cut=0.5, high_cut=50.0, notch_freq=50.0, fs=256, batch_size=10):
    """
    Filter every .h5 recording in ``data_path`` and store the result as .npy.

    For each ``<name>.h5`` found directly inside ``data_path`` the ``"data"``
    dataset is read, passed through ``bandpass_filter`` and then
    ``notch_filter``, and saved as ``<name>_preprocessed.npy`` in
    ``output_path``. Files are visited in chunks of ``batch_size``; the
    progress bar advances once per chunk.
    """
    os.makedirs(output_path, exist_ok=True)

    # Only the top level of data_path is scanned.
    recordings = [name for name in os.listdir(data_path) if name.endswith(".h5")]
    chunk_starts = range(0, len(recordings), batch_size)

    for start in tqdm(chunk_starts, desc="Preprocessing .h5 Files"):
        for name in recordings[start:start + batch_size]:
            src = os.path.join(data_path, name)
            dst = os.path.join(output_path, name.replace(".h5", "_preprocessed.npy"))

            with h5py.File(src, "r") as h5_in:
                # Pull the whole dataset into memory before filtering.
                raw = h5_in["data"][:]

                # Bandpass first, then the notch on the band-limited signal.
                band_limited = bandpass_filter(raw, low_cut, high_cut, fs)
                cleaned = notch_filter(band_limited, notch_freq, fs)

                np.save(dst, cleaned)

    print("Preprocessing complete!")

# Run the preprocessing on every .h5 file found in data_path.
preprocess_h5_files(data_path, output_path, low_cut=0.5, high_cut=50.0, notch_freq=50.0, fs=sampling_rate, batch_size=10)


Preprocessing .h5 Files: 100%|██████████| 21/21 [11:26<00:00, 32.70s/it]

Preprocessing complete!





8-band filter bank
2-second sliding window
Feature extraction


In [5]:
# Input: directory of *_preprocessed.npy files (same path the preprocessing cell writes to).
preprocessed_path = "/Users/folasewaabdulsalam/Seizure_Onset/preprocessed_data"
# Output: directory that receives one *_features.h5 file per recording.
feature_path = "/Users/folasewaabdulsalam/Seizure_Onset/feature_path"
# Window length (presumably seconds, per the "2-second sliding window" note) and
# the value passed as `fs` to extract_spectral_energy.
window_size = 2
sampling_rate = 256
# Window length in samples (2 * 256 = 512).
# NOTE(review): not referenced by any function visible in this section — confirm
# where the sliding window is meant to be applied.
window_samples = window_size * sampling_rate

# (low_cut, high_cut) edges of eight contiguous bands covering 0.5 to 25.0.
# The first seven bands are 3.125 wide; the last one stops at 25.0 and is
# therefore narrower (2.625).
filter_bands = [
    (0.5, 3.625),
    (3.625, 6.75),
    (6.75, 9.875),
    (9.875, 13.0),
    (13.0, 16.125),
    (16.125, 19.25),
    (19.25, 22.375),
    (22.375, 25.0)
]


def extract_spectral_energy(data, fs=256, bands=None):
    """
    Extracts spectral energy features for each band.

    The signal of every channel is transformed with a real FFT along axis 1 and
    the energy of the bins whose frequency lies in ``[low_cut, high_cut)`` is
    summed per band. The spectrum is scaled so that, by Parseval's theorem, the
    energies of bands that together cover 0 .. fs/2 add up to ``sum(x ** 2)``
    of the channel.

    Parameters:
    - data (ndarray): 2-D array of shape (channels, samples).
    - fs (float): sampling frequency; band edges use the same units.
    - bands (iterable of (low_cut, high_cut) or None): band edges. Defaults to
      the module-level ``filter_bands``.

    Returns:
    - features (ndarray): 1-D array of length n_bands * n_channels, laid out
      band by band (all channels of band 0, then all channels of band 1, ...).

    Raises:
    - ValueError: if ``data`` is not 2-D.
    """
    if bands is None:
        bands = filter_bands

    data = np.asarray(data, dtype=float)
    if data.ndim != 2:
        raise ValueError(f"data must be 2-D (channels, samples); got shape {data.shape}")

    num_channels, num_samples = data.shape
    if num_samples == 0:
        # No samples means no energy in any band; rfft cannot handle length 0.
        return np.zeros(len(bands) * num_channels)

    freqs = np.fft.rfftfreq(num_samples, d=1.0 / fs)
    power = np.abs(np.fft.rfft(data, axis=1)) ** 2 / num_samples
    # One-sided spectrum: every bin except DC (and Nyquist, present only for an
    # even sample count) stands for a pair of +/- frequency bins.
    power[:, 1:] *= 2.0
    if num_samples % 2 == 0:
        power[:, -1] /= 2.0

    features = []
    for low_cut, high_cut in bands:
        # Band limits are frequencies and select FFT bins, not sample offsets.
        # Half-open interval so adjacent bands never count a bin twice.
        in_band = (freqs >= low_cut) & (freqs < high_cut)
        features.append(power[:, in_band].sum(axis=1))  # Shape: (channels,)

    # Flatten to create the final feature vector (n_bands * n_channels)
    return np.concatenate(features)


def process_and_save_features(preprocessed_path, feature_path):
    """
    Compute and store a feature vector for every preprocessed recording.

    Each ``<name>_preprocessed.npy`` in ``preprocessed_path`` is loaded, handed
    to ``extract_spectral_energy`` with the module-level ``sampling_rate``, and
    the result is written as the gzip-compressed dataset ``"features"`` of
    ``<name>_features.h5`` in ``feature_path``.
    """
    os.makedirs(feature_path, exist_ok=True)

    suffix = "_preprocessed.npy"
    recordings = [name for name in os.listdir(preprocessed_path) if name.endswith(suffix)]

    for name in tqdm(recordings, desc="Processing Features"):
        src = os.path.join(preprocessed_path, name)
        dst = os.path.join(feature_path, name.replace(suffix, "_features.h5"))

        # Whole recording is loaded at once; expected layout is (channels, samples).
        recording = np.load(src)

        # One feature vector per file; no windowing happens at this level.
        feature_vector = extract_spectral_energy(recording, fs=sampling_rate)

        with h5py.File(dst, "w") as out:
            out.create_dataset("features", data=feature_vector, compression="gzip")

    print("Feature extraction complete!")

# Run the feature extraction over every preprocessed recording in preprocessed_path.
process_and_save_features(preprocessed_path, feature_path)

Processing Features: 100%|██████████| 207/207 [00:22<00:00,  9.26it/s]

Feature extraction complete!



