# Libraries

In [1]:
import pyedflib
#from pyedflib import highlevel
import plotly.express as px
from pathlib import Path
import os
import numpy as np
from scipy.signal import butter, filtfilt

# EDF wrapper

In [2]:
class EDFFile:
    """In-memory snapshot of an EDF/BDF recording read through pyedflib.

    The constructor opens the file, copies the file header, the signal
    headers, every channel's samples, the annotations and the first channel's
    sample frequency into attributes, and then closes the reader again.

    Attributes:
        file_path: Absolute POSIX-style path of the source file.
        file_name: Name of the source file, extension included.
        file_handle: Open ``pyedflib.EdfReader``, or ``None`` once closed.
        file_header: Value returned by ``EdfReader.getHeader()``.
        signal_headers: Value returned by ``EdfReader.getSignalHeaders()``.
        num_signals: Value of ``EdfReader.signals_in_file``.
        sample_frequency: First entry of ``EdfReader.getSampleFrequencies()``.
        signals: One buffer per channel, as returned by ``readSignal``.
        annotations: Value returned by ``EdfReader.readAnnotations()``.
    """

    def __init__(self, file: Path):
        """Read everything from ``file`` and release the reader afterwards."""
        self.file_path = file.absolute().as_posix()
        self.file_name = file.name
        self.file_handle = pyedflib.EdfReader(self.file_path)
        self.signal_headers = None
        self.sample_frequency = None
        self.signals = []
        self.annotations = []

        # The reader must be released even when one of the load steps fails,
        # otherwise the file stays open for the lifetime of the process.
        try:
            self.file_header = self.file_handle.getHeader()
            self.num_signals = self.file_handle.signals_in_file

            self.load_raw_signals()
            self.load_signal_headers()
            self.load_annotations()
            self.load_sample_frequency()
        finally:
            self.close()

    def _ensure_open(self):
        """Return the reader, reopening the source file if it was closed.

        A reader opened here stays open; call ``close()`` when done.
        """
        if self.file_handle is None:
            self.file_handle = pyedflib.EdfReader(self.file_path)

        return self.file_handle

    def load_raw_signals(self):
        """(Re)load the samples of every channel into ``self.signals``."""
        handle = self._ensure_open()

        # Rebuild the list instead of appending so that a repeated call does
        # not duplicate the channels.
        self.signals = [handle.readSignal(i) for i in range(self.num_signals)]

    def load_signal_headers(self):
        """Load the per-channel headers into ``self.signal_headers``."""
        self.signal_headers = self._ensure_open().getSignalHeaders()

    def load_annotations(self):
        """Load the annotations into ``self.annotations``."""
        self.annotations = self._ensure_open().readAnnotations()

    def load_sample_frequency(self):
        """Store and return the sample frequency of the first channel."""
        self.sample_frequency = self._ensure_open().getSampleFrequencies()[0]

        return self.sample_frequency

    def save_edf_filtered(self, output_path: Path, signals: np.ndarray):
        """Write ``signals`` to a new EDF+ file that reuses the cached headers.

        Args:
            output_path: Directory that receives the new file.
            signals: One sample sequence per channel, in channel order.
        """
        # NOTE: file_name still carries the source extension, so the output
        # is named "<original name>.edf" and never collides with the source.
        target = Path(output_path) / f'{self.file_name}.edf'

        # The writer gets a plain string path, and the headers cached by
        # __init__ are reused because the reader is already closed by now.
        new_file = pyedflib.EdfWriter(str(target), n_channels=self.num_signals, file_type=pyedflib.FILETYPE_EDFPLUS)
        try:
            new_file.setHeader(self.file_header)
            new_file.setSignalHeaders(self.signal_headers)
            # All channels are written in a single call, one entry per channel.
            new_file.writeSamples(list(signals))
        finally:
            new_file.close()

    def close(self):
        """Close the reader if it is open; safe to call repeatedly."""
        if self.file_handle is not None:
            self.file_handle.close()
            self.file_handle = None

In [3]:
def read_files_from_dir(directory: Path):
    """Return the EDF/BDF files located directly inside ``directory``.

    Only regular files whose extension is ``.edf`` or ``.bdf`` (compared
    case-insensitively) are returned; sub-directories are not descended into.
    The result is sorted so that positional indexing is reproducible.

    Args:
        directory: Folder to scan.

    Returns:
        Sorted list of ``Path`` objects, each prefixed with ``directory``.
    """
    extensions = {".edf", ".bdf"}

    # The context manager releases the directory handle as soon as the
    # listing has been consumed.
    with os.scandir(directory) as entries:
        matches = [Path(entry) for entry in entries
                   if entry.is_file() and Path(entry.name).suffix.lower() in extensions]

    return sorted(matches)

In [4]:
def save_filtered_file(file: Path, filtered_signals: np.ndarray, output_path: str="data/filtered/"):
    """Write ``filtered_signals`` to a BDF file that reuses the headers of ``file``.

    The new file is named ``<stem of file>.bdf`` and is placed inside
    ``output_path``. The file header and the signal headers are copied
    unmodified from the original recording.

    Args:
        file: Recording the headers and channel count are taken from.
        filtered_signals: One sample sequence per channel, in channel order.
        output_path: Destination directory; a trailing separator is optional.
    """
    # Joining through Path keeps the result valid whether or not output_path
    # ends with a separator.
    target = Path(output_path) / f'{file.stem}.bdf'

    original_file = pyedflib.EdfReader(file.absolute().as_posix())
    try:
        # Create a new BDF file to write the modified signals into
        new_file = pyedflib.EdfWriter(str(target),
                                      n_channels=original_file.signals_in_file,
                                      file_type=pyedflib.FILETYPE_BDF)
        try:
            # Keeping both header and signal headers unmodified
            new_file.setHeader(original_file.getHeader())
            new_file.setSignalHeaders(original_file.getSignalHeaders())

            # Write the filtered signals to new file
            new_file.writeSamples(filtered_signals)
        finally:
            # Close the writer even when a step above fails.
            new_file.close()
    finally:
        original_file.close()

# Filter design

In [5]:
def apply_butterworth(signal:np.ndarray, fs:float, f_type:str="high", cutoff:int=15, order:int=3) -> np.ndarray:
    """Filter ``signal`` with a digital Butterworth filter applied via ``filtfilt``.

    Args:
        signal: Samples to filter.
        fs: Sampling frequency, in the same unit as ``cutoff``.
        f_type: Filter type forwarded to ``scipy.signal.butter`` as ``btype``.
        cutoff: Cutoff frequency.
        order: Filter order.

    Returns:
        The filtered samples as a NumPy array.
    """
    # The digital design takes the cutoff as a fraction of the Nyquist frequency.
    wn = cutoff / (0.5 * fs)
    numerator, denominator = butter(order, wn, btype=f_type, analog=False)
    filtered = filtfilt(numerator, denominator, signal)

    return np.array(filtered)

def apply_notch_filter(signal:np.ndarray, notch_freq: float, fs: float,
                       half_width: float = 0.02, order: int = 2) -> np.ndarray:
    """Suppress a narrow band around ``notch_freq`` with a Butterworth band-stop.

    The stop band spans ``notch_freq / nyquist - half_width`` to
    ``notch_freq / nyquist + half_width``, where both edges are expressed as
    fractions of the Nyquist frequency (``0.5 * fs``). The filter is applied
    with ``filtfilt``.

    Args:
        signal: Samples to filter.
        notch_freq: Centre of the band to remove, in the same unit as ``fs``.
        fs: Sampling frequency.
        half_width: Half of the stop-band width as a fraction of Nyquist.
        order: Order passed to ``scipy.signal.butter``.

    Returns:
        The filtered samples as a NumPy array.

    Raises:
        ValueError: If the resulting band does not lie strictly inside (0, 1).
    """
    nyquist = 0.5 * fs
    notch_normalized = notch_freq / nyquist
    low, high = notch_normalized - half_width, notch_normalized + half_width

    # Fail early with a message that names the offending band; a band edge
    # outside (0, 1) cannot be realised as a digital filter.
    if not 0 < low < high < 1:
        raise ValueError(
            f"Stop band [{low}, {high}] must satisfy 0 < low < high < 1 "
            f"(fractions of the Nyquist frequency {nyquist})"
        )

    b, a = butter(order, [low, high], btype='bandstop', analog=False)

    return np.array(filtfilt(b, a, signal))

# Testing zone

In [6]:
# Collect the EDF/BDF recordings found directly inside the data folder.
directory = Path("data/edf/")
files = read_files_from_dir(directory)

## Save the filtered recording as data/filtered/<file stem>.bdf

In [7]:
# Pick the second-to-last entry of the listing as the recording to filter.
# NOTE(review): positional selection depends on the order of the listing.
file = files[-2]

In [11]:
# Read every channel of the selected recording, then low-pass the first
# channel and keep the remaining channels untouched.
original_file = pyedflib.EdfReader(file.absolute().as_posix())
try:
    signals = [original_file.readSignal(i) for i in range(original_file.signals_in_file)] # Should be passed as parameter
    sample_frequency = original_file.getSampleFrequencies()[0]
finally:
    # Release the reader even when reading fails.
    original_file.close()

# Unpacking signals[1:] keeps the channel count equal to the source file
# instead of assuming exactly three channels.
filtered_signals = [apply_butterworth(signals[0], sample_frequency, "low"), *signals[1:]]

In [12]:
# Echo the raw channels for visual inspection.
signals

[array([8.49616876, 8.51417533, 8.44415179, ..., 8.87725704, 8.89326089,
        8.90426093]),
 array([6.12751133, 6.12201817, 6.11863858, ..., 5.03062904, 5.03252983,
        5.03506541]),
 array([-3.33334525, -3.33335002, -3.33335479, ..., -3.33358248,
        -3.33357175, -3.33355506])]

In [13]:
# Echo the channels after filtering; only the first channel was filtered.
filtered_signals

[array([8.60658443, 8.60670537, 8.60683136, ..., 8.88143085, 8.88143063,
        8.88143045]),
 array([6.12751133, 6.12201817, 6.11863858, ..., 5.03062904, 5.03252983,
        5.03506541]),
 array([-3.33334525, -3.33335002, -3.33335479, ..., -3.33358248,
        -3.33357175, -3.33355506])]

In [14]:
# Write the filtered channels using the default output folder (data/filtered/).
save_filtered_file(file, filtered_signals)

### Check if file has been saved properly

In [15]:
# Re-open the file written by save_filtered_file: default output folder,
# stem of the selected recording, .bdf extension. Deriving the name from
# `file` keeps this check pointed at the file that was just saved.
filtered_file = pyedflib.EdfReader(f"data/filtered/{file.stem}.bdf")

In [16]:
# Read back every channel of the saved file and echo the result so it can be
# compared with filtered_signals.
test_signals = list(map(filtered_file.readSignal, range(filtered_file.signals_in_file)))
test_signals

[array([8.60658637, 8.60670558, 8.60683373, ..., 8.88143235, 8.88143235,
        8.88143235]),
 array([6.12751133, 6.12201638, 6.11863679, ..., 5.03062904, 5.03252983,
        5.03506541]),
 array([-3.33334525, -3.33335002, -3.33335479, ..., -3.33358248,
        -3.33357175, -3.33355506])]

In [17]:
# Release the reader opened for the saved-file check.
filtered_file.close()