In [5]:
# import and config
import mne
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from zipfile import ZipFile
from pathlib import Path
from os import getcwd, listdir, path
import os 
import scipy.io
import warnings
from typing import Dict, Generic
from sklearn import preprocessing
from typing import TypeVar
from IPython.display import display, HTML
import EntropyHub as eh
import math
import antropy as ant
from functools import reduce
import numba as nb
import time

%matplotlib qt
%cd /home/matej/2-fer/uuzop/eeg-driver-fatigue-detection/

# Set to True to (re-)extract the downloaded archives in the unzip cells below.
UNZIP_DATA = False

# Filesystem layout, everything relative to the current working directory.
PATH_CWD = Path(getcwd())
PATH_DATA = PATH_CWD / "data"
PATH_DATA_MAT = PATH_DATA / "mat"
PATH_DATA_CNT = PATH_DATA / "cnt"
PATH_ZIP_CNT = PATH_DATA / "5202739.zip"
PATH_ZIP_MAT = PATH_DATA / "5202751.zip"

# Recording / epoching parameters.
FREQ = 1000
USER_COUNT = 12
EPOCH_SECONDS = 1

# Windowing parameters (all in seconds).
SAFETY_CUTOFF_SECONDS = 20
SIGNAL_FILE_DURATION_SECONDS = 600
SIGNAL_REQUESTED_SECONDS = 300

# State labels, also used to build the recording file names.
FATIGUE_STR = "fatigue"
NORMAL_STR = "normal"

# Channel names, in the order the feature columns are produced.
ELECTRODE_NAMES = [
    'HEOL', 'HEOR', 'FP1', 'FP2', 'VEOU', 'VEOL', 'F7', 'F3', 'FZ', 'F4',
    'F8', 'FT7', 'FC3', 'FCZ', 'FC4', 'FT8', 'T3', 'C3', 'CZ', 'C4',
    'T4', 'TP7', 'CP3', 'CPZ', 'CP4', 'TP8', 'A1', 'T5', 'P3', 'PZ',
    'P4', 'T6', 'A2', 'O1', 'OZ', 'O2', 'FT9', 'FT10', 'PO1', 'PO2',
]

# Show every column when displaying frames; silence all warnings for the session.
pd.set_option('display.max_columns', None)
warnings.filterwarnings('ignore')

/home/matej/2-fer/uuzop/eeg-driver-fatigue-detection


In [6]:
### SET DUMMIES FOR FASTER PRODUCTION
# SIGNAL_REQUESTED_SECONDS = 30
# USER_COUNT = 5
# ELECTRODE_NAMES = ELECTRODE_NAMES[2:15]

In [7]:
# Abstractions
T = TypeVar('T')  # unconstrained type variable ("any type"); used in the signature of dict_apply_procedture

In [8]:
# CNT - unzip and restructure data
if UNZIP_DATA:
    # Unpack the outer archive into PATH_DATA_CNT; the inner per-user zips
    # (e.g. "9.zip") are then picked up from that directory.
    with ZipFile(PATH_ZIP_CNT, 'r') as zip_ref:
        zip_ref.extractall(PATH_DATA_CNT)

    zips = [file for file in PATH_DATA_CNT.iterdir() if str(file).endswith(".zip")]

    for zip_item in zips:
        prefix_number = zip_item.stem  # "9.zip" -> "9"

        # Context manager: the handle must be closed before the archive is
        # deleted below (the original never closed it).
        with ZipFile(zip_item) as inner_zip:
            for member in inner_zip.infolist():
                # Skip directory entries such as "9/" explicitly instead of
                # assuming the directory is always the first member.
                if member.is_dir():
                    continue

                # "Normal state.cnt" -> "normal"
                state_name = Path(member.filename).stem.lower().split(' ')[0]
                target = Path(PATH_DATA_CNT, prefix_number + '_' + state_name + ".cnt")
                target.write_bytes(inner_zip.read(member))

    # Delete the inner zips as they were temporary
    for zip_item in zips:
        os.remove(zip_item)

In [9]:
# MAT - unzip data
if UNZIP_DATA:
    # Extract the MAT archive straight into PATH_DATA_MAT.
    with ZipFile(PATH_ZIP_MAT, mode='r') as mat_archive:
        mat_archive.extractall(path=PATH_DATA_MAT)

In [10]:
def dict_apply_procedture(old_dict: Dict[str, T], procedure) -> Dict[str, T]:
    """Return a new dict with the same keys and `procedure` applied to every value.

    The input dict is not modified; key order is preserved.
    """
    transformed = {}
    for key, value in old_dict.items():
        transformed[key] = procedure(value)
    return transformed
    
def min_max_dataframe(df: pd.DataFrame):
    """Run `min_max_scaler` on `df` and wrap the result in a fresh DataFrame.

    The result is built with `pd.DataFrame(...)` without passing labels.
    """
    scaled_values = min_max_scaler(df)
    return pd.DataFrame(scaled_values)

def standard_scale_dataframe(df: pd.DataFrame):
    """Run `standard_scaler` on `df` and wrap the result in a fresh DataFrame.

    The result is built with `pd.DataFrame(...)` without passing labels.
    """
    scaled_values = standard_scaler(df)
    return pd.DataFrame(scaled_values)

# Bound `fit_transform` of one shared StandardScaler instance; every call refits.
standard_scaler = preprocessing.StandardScaler().fit_transform


def standard_scaler_1d(x):
    """Apply `standard_scaler` to a 1-D array by treating it as a single column.

    The array is reshaped to (-1, 1) for the scaler, then flattened back with
    `reshape(1, -1).squeeze()`.
    """
    as_column = x.reshape(-1, 1)
    scaled = standard_scaler(as_column)
    return scaled.reshape(1, -1).squeeze()

# Bound `fit_transform` of one shared MinMaxScaler instance; every call refits.
min_max_scaler = preprocessing.MinMaxScaler().fit_transform


def min_max_scaler_1d(x):
    """Apply `min_max_scaler` to a 1-D array by treating it as a single column.

    The array is reshaped to (-1, 1) for the scaler, then flattened back with
    `reshape(1, -1).squeeze()`.
    """
    as_column = x.reshape(-1, 1)
    scaled = min_max_scaler(as_column)
    return scaled.reshape(1, -1).squeeze()

In [11]:
# Null and NaN are the same in Pandas :)

def isnull_any(df):
    """Reduce the null mask of `df` with `.any()` (per column for a DataFrame)."""
    null_mask = df.isna()
    return null_mask.any()

def isnull_values_sum(df):
    """Return whether the total number of null cells in `df` is greater than zero."""
    null_count = df.isna().to_numpy().sum()
    return null_count > 0

def isnull_sum(df):
    """Sum the null mask of `df` and compare the result against zero.

    For a DataFrame this yields one boolean per column.
    """
    nulls_per_column = df.isna().sum()
    return nulls_per_column.gt(0)

def isnull_values_any(df):
    """Return whether any cell of `df` is null."""
    null_cells = df.isna().to_numpy()
    return null_cells.any()

def rows_with_null(df):
    """Return the rows of `df` that contain at least one null value."""
    row_has_null = df.isna().any(axis="columns")
    return df.loc[row_has_null]

In [12]:
def default(x, r):
    """Exponential membership function: ``exp(-(x ** r[1]) / r[0])``.

    `r` must be a tuple; its first element is the divisor and its second
    element the exponent applied to `x`.
    """
    assert isinstance(r, tuple), 'When Fx = "Default", r must be a two-element tuple.'
    divisor = r[0]
    exponent = r[1]
    return np.exp(-(x**exponent)/divisor)


def FuzzEn(Sig,  m=2, tau=1, r=(.2, 2), Fx='default', Logx=np.exp(1)):
    """Fuzzy-entropy style estimate of a signal from delayed, mean-removed templates.

    Parameters
    ----------
    Sig : array indexed along its first axis (``Sig.shape[0]`` samples).
        NOTE(review): the slicing below presumably expects a 1-D array - confirm.
    m : int
        Number of entropy values returned; internally ``m + 1`` delayed
        columns are built.
    tau : int
        Delay (in samples) between consecutive columns of the delay matrix.
    r : tuple
        Passed unchanged to the membership function `default`.
    Fx : str
        Accepted but never consulted: the membership function is always
        `default`, whatever value is passed here.
    Logx : float
        Base of the logarithm used for the final estimate.

    Returns
    -------
    (Fuzz, Ps1, Ps2) : tuple of arrays
        ``Fuzz`` has ``m`` entries, ``Ps1`` has ``m + 1`` and ``Ps2`` has ``m``
        (with ``m`` as passed by the caller).
    """
    N = Sig.shape[0]
    # From here on `m` is the caller's m plus one.
    m = m+1
    # Membership function is hard-wired; `Fx` is ignored.
    Fun = default
    # Delay matrix: column k holds Sig shifted by k*tau samples; the last k*tau
    # rows of that column keep the zeros they were initialised with.
    Sx = np.zeros((N, m))
    for k in range(m):
        Sx[:N-k*tau, k] = Sig[k*tau::]

    Ps1 = np.zeros(m)
    Ps2 = np.zeros(m-1)
    # First entry is fixed rather than computed.
    Ps1[0] = .5
    for k in range(2, m+1):
        # Number of rows used for template length k (N1) and k-1 (N2).
        N1 = N - k*tau
        N2 = N - (k-1)*tau
        # Templates: first k delay columns, each row with its own mean removed.
        T2 = Sx[:N2, :k] - \
            np.transpose(np.tile(np.mean(Sx[:N2, :k], axis=1), (k, 1)))
        d2 = np.zeros((N2-1, N2-1))

        for p in range(N2-1):
            # Largest absolute component difference between template p and
            # every later template p+1, p+2, ...
            Mu2 = np.max(
                np.abs(np.tile(T2[p, :], (N2-p-1, 1)) - T2[p+1:, :]), axis=1)
            # Only columns p.. of row p are written, so d2 stays upper-triangular;
            # the slice end N2 is clipped to the matrix width N2-1.
            d2[p, p:N2] = Fun(Mu2, r)

        # d1 is the top-left (N1-1) x (N1-1) corner of d2.
        d1 = d2[:N1-1, :N1-1]
        Ps1[k-1] = np.sum(d1)/(N1*(N1-1))
        Ps2[k-2] = np.sum(d2)/(N2*(N2-1))

    # Silence divide-by-zero / invalid-value warnings from log(0) and friends;
    # the resulting inf/nan values are returned as-is.
    with np.errstate(divide='ignore', invalid='ignore'):
        Fuzz = (np.log(Ps1[:-1]) - np.log(Ps2))/np.log(Logx)

    return Fuzz, Ps1, Ps2

In [13]:
# In addition, each BCIT dataset includes 4 additional EOG channels, placed vertically above the right eye (VEOU), vertically below the right eye (VEOL), horizontally on the outside of the right eye (HEOR), and horizontally on the outside of the left eye (HEOL).
def get_tmin_tmax(start, duration, end_cutoff):
    """Return the `(tmin, tmax)` window of length `duration` starting at `start`, shifted back by `end_cutoff`."""
    tmin = start - end_cutoff
    tmax = start + duration - end_cutoff
    return tmin, tmax
	
def to_numpy_reshape(x):
    """Convert `x` with `pd.DataFrame.to_numpy` and reshape the result to a single column."""
    return pd.DataFrame.to_numpy(x).reshape(-1, 1)


def fuzzy_entropy(x):
    """Fuzzy entropy of `x` via `eh.FuzzEn` with m=2 and r=(0.2 * std(x), 1).

    Returns the last entry of the first element of `eh.FuzzEn`'s result.
    """
    fuzzy_params = (np.std(x, ddof=0) * 0.2, 1)
    return eh.FuzzEn(x, m=2, r=fuzzy_params)[0][-1]


def sample_entropy(x):
    """Sample entropy of `x` via `ant.sample_entropy` with its default arguments."""
    return ant.sample_entropy(x)


def spectral_entropy(x):
    """Spectral entropy of `x` via `ant.spectral_entropy` at sampling frequency FREQ."""
    # Don't normalize here: normalization has to be done across all users,
    # not based on a single user and a single sample.
    return ant.spectral_entropy(x, sf=FREQ, normalize=False)


def approximate_entropy(x):
    """Approximate entropy of `x` via `ant.app_entropy` with order=2."""
    return ant.app_entropy(x, order=2)



def pd_fuzzy_entropy(x: pd.Series, standardize_input=False):
    """Fuzzy entropy of a Series, optionally passing it through `standard_scaler_1d` first."""
    values = x.to_numpy()
    return fuzzy_entropy(standard_scaler_1d(values) if standardize_input else values)
# standardization doesnt affect result!
def pd_sample_entropy(x: pd.Series, standardize_input=False):
    """Sample entropy of a Series, optionally passing it through `standard_scaler_1d` first."""
    values = x.to_numpy()
    return sample_entropy(standard_scaler_1d(values) if standardize_input else values)
# standardization doesnt affect result!
def pd_spectral_entropy(x: pd.Series, standardize_input=False):
    """Spectral entropy of a Series, optionally passing it through `standard_scaler_1d` first."""
    values = x.to_numpy()
    return spectral_entropy(standard_scaler_1d(values) if standardize_input else values)

# standardization doesnt affect result!
def pd_approximate_entropy(x: pd.Series, standardize_input=False):
    """Approximate entropy of a Series, optionally standardizing it first.

    Parameters
    ----------
    x : pd.Series
        Samples of one channel; converted with `to_numpy()` before use.
    standardize_input : bool
        When True the values go through `standard_scaler_1d` before the
        entropy is computed.

    Returns
    -------
    Whatever `approximate_entropy` returns for the (optionally scaled) array.
    """
    x = x.to_numpy()
    if standardize_input:
        # The flag promises standardization and the sibling pd_*_entropy
        # helpers all use standard_scaler_1d; this one applied
        # min_max_scaler_1d instead.
        # NOTE(review): assuming ant.app_entropy derives its tolerance from
        # the signal's std, both affine rescalings should give the same
        # result up to rounding - confirm.
        x = standard_scaler_1d(x)
    return approximate_entropy(x)


In [14]:
### SHOWCASE SIGNAL

# filename = str(Path(PATH_DATA_CNT, "2_fatigue.cnt"))
# eeg = mne.io.read_raw_cnt(filename,verbose=False)
# eeg_filtered = eeg.load_data(verbose=False).filter(l_freq=0.15, h_freq=40).notch_filter(50)
# signal_seconds_floored =  math.floor(len(eeg_filtered) / FREQ)
# tmin = signal_seconds_floored - SIGNAL_REQUESTED_SECONDS - SAFETY_CUTOFF_SECONDS
# tmax = signal_seconds_floored - SAFETY_CUTOFF_SECONDS
# eeg_filtered = eeg_filtered.crop(tmin=tmin, tmax=tmax)
# eeg.plot()

In [15]:
def get_cnt_filename(i_user: int, state:str):
    """Build the recording file name `<i_user>_<state>.cnt`."""
    return f"{i_user}_{state}.cnt"
	
# Every (zero-based user index, state) pair in processing order:
# (0, normal), (0, fatigue), (1, normal), ..., (USER_COUNT - 1, fatigue).
# The file name is built from `i_user + 1`.
user_state_pairs = [(i_user, state) for i_user in range(0, USER_COUNT) for state in [NORMAL_STR, FATIGUE_STR]]

# NOTE(review): the original epoch loop ended in an unconditional `break`, so
# only the first epoch of each recording was ever turned into features. That
# cap is preserved here but made explicit; raise it to
# SIGNAL_REQUESTED_SECONDS to process every requested epoch.
EPOCHS_PER_RECORDING_LIMIT = 1


def _timed_entropy(df_electrodes, entropy_fn, i_user, label):
    """Apply `entropy_fn` (with standardize_input=True) to every column and print the elapsed time."""
    start = time.time()
    result = df_electrodes.apply(func=lambda x: entropy_fn(x, standardize_input=True), axis=0)
    print("User", i_user, label, time.time() - start)
    return result


arr_total = []
for i_user, state in user_state_pairs:
    filename = str(Path(PATH_DATA_CNT, get_cnt_filename(i_user + 1, state)))

    # LOAD, FILTER, CROP AND EPOCH SIGNAL
    eeg = mne.io.read_raw_cnt(filename, eog=['HEOL', "HEOR", "VEOU", "VEOL"], verbose=False)
    eeg_filtered = eeg.load_data(verbose=False).notch_filter(50).filter(l_freq=0.15, h_freq=40)
    # Keep the last SIGNAL_REQUESTED_SECONDS of the recording, ending
    # SAFETY_CUTOFF_SECONDS before its end.
    signal_seconds_floored = math.floor(len(eeg_filtered) / FREQ)
    tmin = signal_seconds_floored - SIGNAL_REQUESTED_SECONDS - SAFETY_CUTOFF_SECONDS
    tmax = signal_seconds_floored - SAFETY_CUTOFF_SECONDS
    eeg_filtered = eeg_filtered.crop(tmin=tmin, tmax=tmax)
    # Epoch the prepared signal. MNE's filter/crop methods modify the Raw
    # object in place and return it, so `eeg_filtered` is the same object the
    # original code passed here under the name `eeg`.
    epochs = mne.make_fixed_length_epochs(eeg_filtered, duration=EPOCH_SECONDS, preload=False, verbose=False)

    # CREATE DF
    df: pd.DataFrame = epochs.to_data_frame(scalings=dict(eeg=1, mag=1, grad=1))
    df['condition'] = df['condition'].astype(int)
    df = df.drop(columns='time')

    arr_one_user_samples = []
    for i_epoch in range(min(SIGNAL_REQUESTED_SECONDS, EPOCHS_PER_RECORDING_LIMIT)):

        # take the epoch's rows and split them into electrode columns and info columns
        df_epoch = df.loc[df["epoch"] == i_epoch]
        # First row of the non-electrode columns; selecting a single row yields a Series.
        df_info: pd.Series = df_epoch.iloc[0, ~df_epoch.columns.isin(ELECTRODE_NAMES)]
        df_electrodes: pd.DataFrame = df_epoch[ELECTRODE_NAMES]

        # One value per electrode for each entropy; labels match the original timing output.
        df_pe_en = _timed_entropy(df_electrodes, pd_spectral_entropy, i_user, "df_pe_en")
        df_ae_en = _timed_entropy(df_electrodes, pd_approximate_entropy, i_user, "df_ae_en")
        df_se_en = _timed_entropy(df_electrodes, pd_sample_entropy, i_user, "df_se_en")
        df_fe_en = _timed_entropy(df_electrodes, pd_fuzzy_entropy, i_user, "df_fe_en")

        # Row layout: info columns, then spectral, approximate, sample and fuzzy entropies.
        arr_one_user_samples.append([*df_info, *df_pe_en, *df_ae_en, *df_se_en, *df_fe_en])
    arr_total.append(arr_one_user_samples)


# test = to_numpy_reshape(epochs_df.loc[epochs_df["epoch"]==1,"F7"].to_numpy().reshape(-1,1))


User 0 df_pe_en 0.027446269989013672
User 0 df_ae_en 0.1805276870727539
User 0 df_se_en 0.19603991508483887
User 0 df_fe_en 4.626615285873413
User 0 df_pe_en 0.06715703010559082
User 0 df_ae_en 0.22101902961730957
User 0 df_se_en 0.25874900817871094
User 0 df_fe_en 5.288679361343384
User 1 df_pe_en 0.06428813934326172
User 1 df_ae_en 0.25064969062805176
User 1 df_se_en 0.37012290954589844


In [None]:

# ant (antropy) gives correct results!
# nk.entropy_fuzzy over eh.FuzzEn
# Are eh.FuzzEn's r values parameters of the fuzzy function and not tolerance levels?





