In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)



import os

In [3]:
!pip install wfdb
!pip install mne



In [5]:
#import numpy as np
import matplotlib.pyplot as plt
#import pyedflib
import wfdb
import glob
import random
import gc
import mne
from scipy.signal import find_peaks
import re
import tqdm
import logging

In [7]:
# The 18 EEG channel labels used throughout the notebook, four per row
# (the final row holds the remaining two).
ch_labels = [
    'FP1-F7', 'F7-T7', 'T7-P7', 'P7-O1',
    'FP1-F3', 'F3-C3', 'C3-P3', 'P3-O1',
    'FP2-F4', 'F4-C4', 'C4-P4', 'P4-O2',
    'FP2-F8', 'F8-T8', 'T8-P8', 'P8-O2',
    'FZ-CZ', 'CZ-PZ',
]

In [9]:
import glob
import os

# Root folder of the dataset; the raw string keeps the Windows backslashes literal.
path2pt = r"E:\chb-mit-scalp-eeg-database-1.0.0"

# List only the patient directories named chbNN. The trailing empty component
# makes the pattern end in a path separator, so plain files are never matched,
# and the chb[0-9][0-9] part keeps unrelated sub-folders from being mistaken
# for patients (the file globs further down assume 'chb' + two digits).
folders = sorted(glob.glob(os.path.join(path2pt, 'chb[0-9][0-9]', '')))

# The patient id is the last two characters of the folder name, e.g. 'chb01' -> '01'.
n_patient = [os.path.basename(os.path.normpath(folder))[-2:] for folder in folders]

# Print the patient numbers
print(*n_patient)


01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24


In [11]:
# Patient-level split: seed the global RNG so the draw is repeatable, sample
# the training patients, and assign every remaining patient to the test set.
random.seed(2023)

ratio_train = 0.8
n_train = round(ratio_train*len(n_patient))
train_patient_str = sorted(random.sample(n_patient, n_train))
test_patient_str = sorted(pt for pt in n_patient if pt not in train_patient_str)
print('Train PT: ', *train_patient_str)
print('Test PT: ', *test_patient_str)

Train PT:  02 03 04 05 06 09 11 12 13 14 15 16 17 18 19 20 21 23 24
Test PT:  01 07 08 10 22


In [15]:
# File names for training and test data.
# glob returns matches in a file-system-dependent order, so each patient's
# matches are sorted to keep the resulting lists reproducible across machines.
# Patients are visited in the (already sorted) order of the split lists.

files_train = []
for patient in train_patient_str:
    files_train.extend(sorted(glob.glob(path2pt+'/chb{}/*.edf'.format(patient))))

files_test = []
for patient in test_patient_str:
    files_test.extend(sorted(glob.glob(path2pt+'/chb{}/*.edf'.format(patient))))

In [17]:
# Show how many .edf paths were collected for the training and test splits
len(files_train), len(files_test)

(549, 137)

In [19]:
# Set MNE's logging verbosity to 'ERROR'
mne.set_log_level(verbose='ERROR')


In [21]:
import numpy as np
import mne
import logging
import gc

# Set up logger for filtering process
logger = logging.getLogger(__name__)
# Without an explicit level this logger inherits WARNING from the root logger,
# which would silently drop every logger.info() progress message.
logger.setLevel(logging.INFO)
# Re-running the cell must not attach a second FileHandler (each handler would
# write every message again), so reuse the existing one when it is present.
_file_handlers = [h for h in logger.handlers if isinstance(h, logging.FileHandler)]
if _file_handlers:
    fh = _file_handlers[0]
else:
    fh = logging.FileHandler('filtering_process.log')
    logger.addHandler(fh)

# Locations of the processed EEG signal samples and the seizure labels
signals_file = r"C:\Users\KIIT\AD pROJECT eeg\signal_samples.npy"
labels_file = r"C:\Users\KIIT\AD pROJECT eeg\is_sz.npy"

# Read both arrays from disk: signals first, then labels
array_signals, array_is_sz = (
    np.load(npy_path) for npy_path in (signals_file, labels_file)
)

# Define the functions for DC removal, notch filter, and broadband filter
def remove_dc_offset(data):
    """Subtract the mean taken along axis 1 from the data.

    :param data: array with at least two dimensions
    :return: new array of the same shape; the input is not modified
    """
    # keepdims leaves a length-1 axis so the mean broadcasts back over axis 1
    axis1_mean = np.mean(data, axis=1, keepdims=True)
    centered = data - axis1_mean
    return centered

def apply_notch_filter(data, fs, notch_freq=50):
    """Notch out one frequency (power-line noise; 50 Hz unless overridden).

    :param data: array providing ``astype``; filtered as a float64 copy
    :param fs: sampling frequency passed to MNE
    :param notch_freq: frequency to remove
    :return: the result of ``mne.filter.notch_filter`` on the float64 copy
    """
    # The MNE filter is given float64 input, so cast first (astype makes a copy)
    samples_f64 = data.astype(np.float64)
    notch_targets = [notch_freq]
    return mne.filter.notch_filter(samples_f64, fs, freqs=notch_targets)

def apply_broadband_filter(data, fs, low_freq=0.5, high_freq=40.0):
    """Band-pass the signal between ``low_freq`` and ``high_freq``.

    :param data: array providing ``astype``; filtered as a float64 copy
    :param fs: sampling frequency passed to MNE
    :param low_freq: lower pass-band edge (default 0.5)
    :param high_freq: upper pass-band edge (default 40.0)
    :return: the result of ``mne.filter.filter_data`` on the float64 copy
    """
    # The MNE filter is given float64 input, so cast first (astype makes a copy)
    samples_f64 = data.astype(np.float64)
    return mne.filter.filter_data(samples_f64, fs, l_freq=low_freq, h_freq=high_freq)

# Function to process and filter the signals
def process_and_filter_signals(array_signals, array_is_sz, fs, ch_labels):
    """
    Filter every EEG sample: remove the DC offset, apply the notch filter,
    then apply the broadband (band-pass) filter, in that order.

    :param array_signals: EEG signal data, iterated along its first axis; each
        element is handed to remove_dc_offset, which averages over axis 1, so
        elements are expected to be at least 2-D
    :param array_is_sz: Seizure labels (0 or 1); accepted for interface
        compatibility but not used here
    :param fs: Sampling frequency forwarded to both filters
    :param ch_labels: Channel labels; accepted for interface compatibility but
        not used here
    :return: Array of filtered signals with the same shape as ``array_signals``.
        Floating-point inputs keep their dtype; any other dtype yields float64.
    """
    array_signals = np.asarray(array_signals)

    # The filters produce float64 values. Writing them into a buffer that copies
    # an integer input dtype would silently truncate them, so only floating
    # dtypes are preserved; everything else is stored as float64.
    if np.issubdtype(array_signals.dtype, np.floating):
        out_dtype = array_signals.dtype
    else:
        out_dtype = np.float64
    processed_signals = np.zeros_like(array_signals, dtype=out_dtype)

    n_signals = len(array_signals)

    # Filter one sample at a time to keep the float64 working copy small
    for i, signal in enumerate(array_signals):
        # 1. Remove DC offset
        centered = remove_dc_offset(signal)

        # 2. Apply notch filter (default notch frequency of apply_notch_filter)
        notched = apply_notch_filter(centered, fs)

        # 3. Apply broadband filter (default pass band of apply_broadband_filter)
        processed_signals[i] = apply_broadband_filter(notched, fs)

        # Progress message every 100 samples
        if i % 100 == 0:
            logger.info(f"Processed {i}/{n_signals} signals.")

    return processed_signals

# Sampling frequency (replace with the correct value)
# NOTE(review): 1000 Hz is a placeholder. PhysioNet documents the CHB-MIT
# recordings as sampled at 256 Hz; confirm the actual rate of signal_samples.npy
# (it may have been resampled upstream), because the notch and band-pass
# cut-offs are interpreted relative to fs. Also confirm whether the mains
# frequency for these recordings is 50 Hz or 60 Hz.
fs = 1000  # Example sampling frequency, you can adjust based on your data

# Define channel labels (adjust according to your data)
ch_labels = ['FP1-F7', 'F7-T7', 'T7-P7', 'P7-O1', 'FP1-F3', 'F3-C3', 'C3-P3', 'P3-O1',
             'FP2-F4', 'F4-C4', 'C4-P4', 'P4-O2', 'FP2-F8', 'F8-T8', 'T8-P8', 'P8-O2',
             'FZ-CZ', 'CZ-PZ']

# Destination files for the filtered signals and the (unchanged) labels
signals_out = r"F:\signals processed\processed_signal_samples_with_processed_used.npy"
labels_out = r"F:\signals processed\processed_is_sz_with_processed.npy"

# Create the output folder before the expensive filtering step, so a missing
# or unwritable destination fails immediately instead of after all the work.
for out_path in (signals_out, labels_out):
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

# Process and filter the signals
processed_signals = process_and_filter_signals(array_signals, array_is_sz, fs, ch_labels)

# Save the processed signals and labels to new files
np.save(signals_out, processed_signals)
np.save(labels_out, array_is_sz)

# Log final processing
logger.info(f"Filtered and saved {len(processed_signals)} signals.")

# Clear memory after processing
gc.collect()

# Print confirmation
print("Filtered signals saved as processed_signal_samples_with_processed_used.npy and processed_is_sz_with_processed.npy.")


Filtered signals saved as processed_signal_samples_with_processed_used.npy and processed_is_sz_with_processed.npy.
