Skip to content

Commit

Permalink
bugs resolved
Browse files Browse the repository at this point in the history
  • Loading branch information
AminAlam committed Mar 11, 2024
1 parent 925e801 commit 731e625
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 31 deletions.
62 changes: 62 additions & 0 deletions elecphys/conversion.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
import utils
import shutil
from tqdm import tqdm
import pandas as pd
import re
from io import StringIO


def convert_rhd_to_mat(
Expand Down Expand Up @@ -78,3 +81,62 @@ def convert_mat_to_npz(mat_file: str, output_npz_folder: str,
f'{ch_name}.npz'),
data=data_filtered,
fs=fs)


def convert_OpenBCI_csv_to_npz(file_path: str, output_npz_folder: str, notch_filter_freq: int) -> None:
""" Function that Converts OpenBCI CSV files to NPZ files
Parameters
----------
file_path: str
path to OpenBCI csv file
output_npz_folder: str
path to output npz folder
notch_filter_freq: int
frequency of notch filter
Returns
----------
"""

if not os.path.exists(output_npz_folder):
os.makedirs(output_npz_folder)
else:
Warning(f'{output_npz_folder} already exists. Files will be overwritten.')

# read openbci txt file, separate first lines which start with %

with open(file_path, 'r') as file:
lines = file.readlines()
data = []
header = []
for line in lines:
if line.startswith('%'):
header.append(line)
else:
data.append(line)

for header_line in header:
if 'Sample Rate' in header_line:
fs = int(re.findall(r'\d+', header_line)[0])
elif 'Number of channels' in header_line:
num_channels = int(re.findall(r'\d+', header_line)[0])
# convert data to pandas dataframe with columns as the first row
data = pd.read_csv(StringIO(''.join(data)), sep=',')

for ch_no in tqdm(range(num_channels)):
ch_name = f' EXG Channel {ch_no}'
data_ch = data[ch_name].values

if notch_filter_freq == 0:
data_filtered = data_ch
else:
data_filtered = preprocessing.apply_notch(
data_ch, {'Q': 60, 'fs': fs, 'f0': notch_filter_freq})
ch_name = f'Ch{ch_no+1}'
np.savez(
os.path.join(
output_npz_folder,
f'{ch_name}.npz'),
data=data_filtered,
fs=fs)
25 changes: 11 additions & 14 deletions elecphys/data_io.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import mat73
import numpy as np
import os
from typing import Union
import utils


def load_mat(mat_file) -> [np.ndarray, int]:
def load_mat(mat_file) -> Union[np.ndarray, int]:
""" Function that Loads MAT file
Parameters
Expand All @@ -25,7 +26,7 @@ def load_mat(mat_file) -> [np.ndarray, int]:
return data, fs


def load_npz(npz_file) -> [np.ndarray, int]:
def load_npz(npz_file) -> Union[np.ndarray, int]:
""" Function that Loads NPZ file
Parameters
Expand All @@ -43,8 +44,8 @@ def load_npz(npz_file) -> [np.ndarray, int]:
return data, fs


def load_all_npz_files(npz_folder: str, ignore_channels: [
list, str] = None, channels_list: [list, str] = None) -> [np.ndarray, int, list]:
def load_all_npz_files(npz_folder: str, ignore_channels: Union[
list, str] = None, channels_list: Union[list, str] = None) -> Union[np.ndarray, int, list]:
""" Function that Loads all NPZ files in a folder
Parameters
Expand All @@ -70,21 +71,18 @@ def load_all_npz_files(npz_folder: str, ignore_channels: [
if not file_name.endswith('.npz'):
files_list.remove(file_name)
files_list = utils.sort_file_names(files_list)
all_channels_in_folder = list(range(0, len(files_list)))
all_channels_in_folder = list(range(1, len(files_list) + 1))
channels_list = utils.convert_string_to_list(channels_list)
if channels_list is None:
channels_list = all_channels_in_folder
ignore_channels = utils.convert_string_to_list(ignore_channels)
if ignore_channels is not None:
ignore_channels = [i - 1 for i in ignore_channels]
else:
if ignore_channels is None:
ignore_channels = []
# all elements of channels_list that are not in all_channels_in_folder
invalid_channels = [channel for channel in all_channels_in_folder if channel not in channels_list]
if len(invalid_channels) > 0:
ignore_channels.extend(invalid_channels)
channels_map = all_channels_in_folder

channels_map_new = []
for channel in channels_map:
if channel not in ignore_channels:
Expand All @@ -93,10 +91,9 @@ def load_all_npz_files(npz_folder: str, ignore_channels: [

files_list_new = []
for indx, file_name in enumerate(files_list):
if indx not in ignore_channels:
if indx + 1 not in ignore_channels:
files_list_new.append(file_name)
files_list = files_list_new

num_channels = len(files_list)
ch_indx = 0
for npz_file in files_list:
Expand All @@ -111,7 +108,7 @@ def load_all_npz_files(npz_folder: str, ignore_channels: [
return data_all, fs, channels_map


def load_npz_stft(npz_file) -> [np.ndarray, np.ndarray, np.ndarray]:
def load_npz_stft(npz_file) -> Union[np.ndarray, np.ndarray, np.ndarray]:
""" Function that Loads NPZ file
Parameters
Expand All @@ -131,7 +128,7 @@ def load_npz_stft(npz_file) -> [np.ndarray, np.ndarray, np.ndarray]:
return f, t, Zxx


def load_npz_dft(npz_file) -> [np.ndarray, np.ndarray]:
def load_npz_dft(npz_file) -> Union[np.ndarray, np.ndarray]:
""" Function that Loads NPZ file
Parameters
Expand All @@ -150,7 +147,7 @@ def load_npz_dft(npz_file) -> [np.ndarray, np.ndarray]:
return f, Zxx


def load_npz_mvl(npz_file) -> [np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
def load_npz_mvl(npz_file) -> Union[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
""" Function that Loads NPZ file
Parameters
Expand Down
14 changes: 8 additions & 6 deletions elecphys/fourier_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from tqdm import tqdm
import json
import csv
from typing import Union

import utils
import cfc
Expand Down Expand Up @@ -84,7 +85,7 @@ def dft_numeric_output_from_npz(


def stft_from_array(signal_array, fs: int, window_size: float, overlap: float,
window_type: str = 'hann', nfft: int = None) -> [np.ndarray, np.ndarray, np.ndarray]:
window_type: str = 'hann', nfft: int = None) -> Union[np.ndarray, np.ndarray, np.ndarray]:
""" Computes STFT from 1D array
Parameters
Expand Down Expand Up @@ -136,7 +137,7 @@ def stft_from_array(signal_array, fs: int, window_size: float, overlap: float,


def dft_from_array(signal_array, fs: int,
nfft: int = None) -> [np.ndarray, np.ndarray]:
nfft: int = None) -> Union[np.ndarray, np.ndarray]:
""" Computes DFT from 1D array
Parameters
Expand Down Expand Up @@ -204,7 +205,7 @@ def butterworth_filtering_from_array(
elif _args['filter_type'] == 'BPF':
_args['freq_cutoff'] = utils.convert_string_to_list(
_args['freq_cutoff'])
_args['freq_cutoff'] = [int(i) for i in _args['freq_cutoff']]
_args['freq_cutoff'] = [i for i in _args['freq_cutoff']]
b, a = signal.butter(_args['filter_order'], [
_args['freq_cutoff'][0] / (fs / 2), _args['freq_cutoff'][1] / (fs / 2)], btype='bandpass')
else:
Expand Down Expand Up @@ -257,7 +258,7 @@ def butterworth_filtering_from_npz(


def calc_freq_response(
_args: dict) -> [np.ndarray, np.ndarray, np.ndarray, dict]:
_args: dict) -> Union[np.ndarray, np.ndarray, np.ndarray, dict]:
""" Calculates filter frequency response
Parameters
Expand Down Expand Up @@ -385,7 +386,7 @@ def calc_cfc_from_npz(input_npz_folder: str, output_npz_folder: str,

def freq_bands_power_over_time(
input_npz_folder: str,
freq_bands: [
freq_bands: Union[
tuple,
list] = None,
channels_list: str = None,
Expand Down Expand Up @@ -429,7 +430,6 @@ def freq_bands_power_over_time(
"""
channels_list = utils.convert_string_to_list(channels_list)
ignore_channels = utils.convert_string_to_list(ignore_channels)

data_all, fs, channels_map = data_io.load_all_npz_files(input_npz_folder, ignore_channels, channels_list)
# if freq_bands only has one list, we should make sure it is a list of lists
if len(freq_bands) == 2 and isinstance(freq_bands[0], int) and isinstance(freq_bands[1], int):
Expand Down Expand Up @@ -470,6 +470,8 @@ def freq_bands_power_over_time(
if 'csv' in output_csv_file:
output_csv_file = output_csv_file.replace('.csv', '')
# save to csv file
if not os.path.exists(os.path.dirname(output_csv_file)):
os.makedirs(os.path.dirname(output_csv_file))
with open(f'{output_csv_file}_{freq_band[0]}_{freq_band[1]}.csv', 'w', newline='') as csvfile:
csvwriter = csv.writer(csvfile)
csvwriter.writerow(['Channel', 'Time', 'Power'])
Expand Down
57 changes: 56 additions & 1 deletion elecphys/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,61 @@ def convert_mat_to_npz(ctx, mat_file: str, output_npz_folder: str = 'output_npz'
output_npz_folder,
notch_filter_freq)
print('--- Conversion complete.\n\n')


@cli.command('convert_OpenBCI_csv_to_npz', help='Converts OpenBCI CSV files to NPZ files')
@click.option('--file_path', '-f',
help='Path to the file containing OpenBCI CSV file', required=True, type=str)
@click.option('--output_npz_folder', '-o', help='Path to output npz folder',
required=True, type=str, default='output_npz', show_default=True)
@click.option('--sampling_rate', '-sr', help='Sampling rate in Hz',
required=True, type=int)
@click.option('--channels_list',
'-cl',
help='List of channels to use. If None, then all of the channels will be used. It should be a string of comma-separated channel numbers (e.g. --channels_list "[1,2,3]").',
required=False,
type=str,
default=None,
show_default=True)
@click.option('--ignore_channels',
'-ic',
help='List of channels to ignore. If None, then no channels will be ignored (e.g. --ignore_channels "[1,2,3]").',
required=False,
type=str,
default=None,
show_default=True)
@click.option('--notch_filter_freq', '-n', help='Notch filter frequency in Hz',
required=False, type=int, default=50, show_default=True)
@click.pass_context
@error_handler
def convert_OpenBCI_csv_to_npz(ctx, file_path: str, output_npz_folder: str = 'output_npz',
sampling_rate: int = 250, channels_list: str = None, ignore_channels: str = None,
notch_filter_freq: int = 50) -> None:
""" Converts OpenBCI CSV files to NPZ files
Parameters
----------
file_path: str
path to OpenBCI txt file
output_npz_folder: str
path to output npz folder. If the folder already exists, it will be overwritten. If not specified, the default value is 'output_npz'
channels_list: str
list of channels to use. If None, then all of the channels will be used. It should be a string of comma-separated channel numbers (e.g. "[1,2,3]"). If not specified, the default value is None
ignore_channels: str
list of channels to ignore. If None, then no channels will be ignored. It should be a string of comma-separated channel numbers (e.g. "[1,2,3]"). If not specified, the default value is None
notch_filter_freq: int
notch filter frequency in Hz. If not specified, the default value is 50. It should be 0 (no filtering), 50 (Hz), or 60 (Hz)
Returns
----------
"""

print('--- Converting OpenBCI CSV files to NPZ files...')
conversion.convert_OpenBCI_csv_to_npz(
file_path,
output_npz_folder,
notch_filter_freq)
print('--- Conversion complete.\n\n')
### Conversion ###


Expand Down Expand Up @@ -375,7 +430,7 @@ def frequncy_domain_filter(ctx, input_npz_folder: str, output_npz_folder: str =
required=False, type=float, default=0.5, show_default=True)
@click.option('--t_min', '-tmin', help='Start of time interval to plot',
required=False, type=float, default=None, show_default=True)
@click.option('--t_max', '-tmin', help='End of time interval to plot',
@click.option('--t_max', '-tmax', help='End of time interval to plot',
required=False, type=float, default=None, show_default=True)
@click.option('--output_csv_file',
'-o',
Expand Down
9 changes: 6 additions & 3 deletions elecphys/preprocessing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import numpy as np
from scipy import signal
from typing import Union
from tqdm import tqdm
import utils
import data_io
Expand All @@ -21,7 +22,8 @@ def apply_notch(_signal_chan: np.ndarray, _args: dict) -> np.ndarray:
_signal_chan: np.ndarray
signal channel with notch filter applied
"""
for f0 in np.arange(_args['f0'], 300, _args['f0']):
max_f0 = int(_args['fs'] / 2)
for f0 in np.arange(_args['f0'], max_f0, _args['f0']):
b_notch, a_notch = signal.iirnotch(f0, _args['Q'], _args['fs'])
_signal_chan = signal.filtfilt(b_notch, a_notch, _signal_chan)
return _signal_chan
Expand Down Expand Up @@ -132,7 +134,7 @@ def normalize(data: np.ndarray) -> np.ndarray:
return data_normalized


def re_reference_npz(input_npz_folder: str, output_npz_folder: str, ignore_channels: [
def re_reference_npz(input_npz_folder: str, output_npz_folder: str, ignore_channels: Union[
list, str] = None, rr_channel: int = None) -> None:
""" re-references NPZ files
Expand Down Expand Up @@ -163,7 +165,7 @@ def re_reference_npz(input_npz_folder: str, output_npz_folder: str, ignore_chann
data_all_rereferenced, fs, output_npz_folder)


def re_reference(data: np.ndarray, ignore_channels: [
def re_reference(data: np.ndarray, ignore_channels: Union[
list, str] = None, rr_channel: int = None) -> np.ndarray:
""" Average re-references data
Expand Down Expand Up @@ -199,4 +201,5 @@ def re_reference(data: np.ndarray, ignore_channels: [
reference = np.mean(data[channels_list, :], axis=0).reshape(1, -1)
data_rereferenced[channels_list, :] = data[channels_list,
:] - np.repeat(reference, len(channels_list), axis=0)
data_rereferenced = np.vstack((data_rereferenced, reference))
return data_rereferenced
2 changes: 1 addition & 1 deletion elecphys/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ def convert_string_to_list(string):
string = string.replace(']', '')
string = string.replace(' ', '')
string = string.split(',')
output = list(map(int, string))
output = list(map(float, string))
output = np.unique(output)
output = output.tolist()
return output
Expand Down
Loading

0 comments on commit 731e625

Please sign in to comment.