In [3]:
#@title <font color="\#8FBC8F">Google Drive mount

# Mount the user's Google Drive into the Colab filesystem at /content/drive.
# The paths used by the later cells all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
#@title <font color="\#8FBC8F">Imports

import os
import pdb
from tqdm import tqdm

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
# Notebook-wide matplotlib defaults: 8x8 figures rendered at 100 dpi.
plt.rcParams['figure.figsize'] = [8, 8]
plt.rcParams['figure.dpi'] = 100


# Install/upgrade MNE into the runtime before it is imported below.
# NOTE(review): the version is not pinned, so a re-run may pick up a
# different MNE release - consider pinning once the working version is known.
!pip install mne --upgrade --quiet
import mne

from mne.filter import notch_filter as notch
from mne.time_frequency import tfr_morlet as morl
from mne.time_frequency import tfr_array_morlet as amorl

from mne.decoding import UnsupervisedSpatialFilter
from sklearn.decomposition import PCA

from sklearn.model_selection import train_test_split as Split

print('[imports successfully loaded]')

[imports successfully loaded]


In [5]:
#@title <font color="\#8FBC8F"> Feature Manager Class

class WTFeatureManager:
    """
    Splits wavelet-transform (WT) data into time/frequency bins and extracts
    summary statistics from every bin.

    The input is a 4-D array indexed as [epoch, channel, frequency, sample].
    Every (epoch, channel) 2-D slice is cut into bins: one bin per frequency
    band (see `f_bands` / `f_names`) and per time window of `dt` milliseconds.
    For each bin the statistics listed in `feats` are computed.

    NOTE(review): the frequency axis of `data` is assumed to line up with
    `wt_frequencies` (same length, same order) - confirm against the code that
    produced the wavelet transform.
    """
    # length of one time bin
    dt = 200#@param {type:'integer'}
    sfreq = 250 # [Hz]

    # band edges [Hz]; band i covers the half-open range (f_bands[i], f_bands[i+1]]
    f_bands = [0, 4, 8, 13, 30, 60, 80]
    f_names = ['Delta', 'Theta', 'Alpha', 'Beta', 'low-Gamma', 'high-Gamma']

    # geometrically spaced values between 1 and 80 (numpy's default count)
    wt_frequencies = np.geomspace(1, 80)

    # features are declared here as a class variable since they're
    # not supposed to change when the right ones are selected.
    # The order must match the order used in `features_from_wt`.
    feats = ['min','max','mean','std','median']

    def __init__(self, data, ch_names=None):
        """
        Parameters
        ----------
        data : array indexed as [epochs, channels, frequencies, time (samples)]
        ch_names : optional channel names, stored as-is on the instance

        Raises
        ------
        ValueError
            If `data` is not 4-dimensional (from the shape unpacking), or if
            `dt` and `sfreq` result in a time bin shorter than one sample.
        """
        self.data = data
        self.n_epochs, self.n_channels, self.n_freq, self.n_samp = data.shape

        # channel names will be necessary for importance retrieval
        self.ch_names = ch_names

        # number of samples per time bin; true division keeps exact multiples
        # exact before truncating (multiplying by 1e-3 is not exact in floats)
        self.dsamp = int(self.sfreq * self.dt / 1000)
        if self.dsamp < 1:
            raise ValueError(
                f'dt={self.dt} ms at sfreq={self.sfreq} Hz gives a time bin '
                'shorter than one sample'
            )

        # first sample index (t0) of every time bin
        self.t_bins = np.arange(0, self.n_samp, self.dsamp)

        # this is a dict - {'f_band': [wt frequency indices of f_band]}
        self.f_idx = self._get_freq_indices()

        # number of bins in every wt
        self.bins_per_wt = len(self.f_names)*len(self.t_bins)

        # bin_metadata is a list of tuples, one per bin produced so far:
        # (epoch_number, channel_index, band_name, time_0_of_bin)
        # bin_features maps each of those tuples to its list of features.
        # Both are created before the generators below, which write to them.
        self.bin_metadata = []
        self.bin_features = {}

        # yields (band, frequency indices, t0) for every bin of one wt
        self.binerator = self._binerator()

        # bins of the first epoch and channel by default; replaced for every
        # epoch/channel pair by `all_ep_ch_feature_extraction`
        self.wt_bins = self._wt_2d_bin()

    def _get_freq_indices(self):
        """
        This function returns a dict of indices, where the keys are the
        frequency band names, and the values are the ones you'll want to take
        from the wavelet transform channel 2D array
        """
        f_idx = {}
        band_edges = zip(self.f_names, self.f_bands[:-1], self.f_bands[1:])
        for name, low, high in band_edges:
            f_idx[name] = [fi for fi, f in enumerate(self.wt_frequencies)
                           if low < f <= high]
        return f_idx

    def _binerator(self):
        """
        This is a generator that yields bin data and metadata
        Will need a reset for each epoch/channel
        """
        for band, i_freq in self.f_idx.items():
            for t0 in self.t_bins:
                yield (band, i_freq, t0)

    def _wt_2d_bin(self, epoch=0, channel=0):
        """
        This is a generator that stores metadata and yields wt_data_bin.

        It consumes `self.binerator`, so that attribute has to be a fresh
        generator whenever a new epoch/channel is processed. The metadata
        tuple of a bin is appended to `self.bin_metadata` right before the
        bin is yielded, so `self.bin_metadata[-1]` describes the latest bin.
        """
        wt = self.data[epoch, channel]
        for band, i_f_list, t0 in self.binerator:
            # t0 is a sample index; dt / dsamp converts it to milliseconds
            self.bin_metadata.append((epoch, channel,
                                      band, t0 * self.dt // self.dsamp))
            yield wt[i_f_list, t0:t0+self.dsamp]

    def features_from_wt(self):
        """
        This function takes features from the currently selected wt_bins
        instance and stores them in the bin_features dict where the metadata
        for each bin is the key, and the features are the values

        To process a different wt_bins object, you must re-initiate the
        wt_bins object with correct epoch and channel numbers before running
        this function
        """
        for wt_bin in self.wt_bins:
            # the generator appended this bin's metadata just before yielding
            md = self.bin_metadata[-1]

            # wt_bin is of type np.ndarray; order follows `feats`
            self.bin_features[md] = [
                wt_bin.min(),
                wt_bin.max(),
                wt_bin.mean(),
                wt_bin.std(),
                np.median(wt_bin),
            ]

    def all_ep_ch_feature_extraction(self, norm=False):
        """
        This function will extract all of the features for a subject

        `norm` is accepted for interface compatibility but is not used.
        """
        for ep in tqdm(range(self.n_epochs), desc='Epoch:'):

            for ch in range(self.n_channels):

                # both generators are single-use, so rebuild them per slice
                self.binerator = self._binerator()
                self.wt_bins = self._wt_2d_bin(ep,ch)
                self.features_from_wt()

    def as_dataframe(self, norm=False):
        """
        Runs the full feature extraction and returns it as a DataFrame with
        one row per epoch and one column per
        (feature, channel number, frequency band, bin start [ms]) combination.

        `norm` is only forwarded to `all_ep_ch_feature_extraction`.
        """
        self.all_ep_ch_feature_extraction(norm)

        print('[Creating DataFrame]')
        idx_names = ['Epoch', 'Channel_Number', 'Freq_Band', 'T0 [ms]']

        # dict keys are the metadata tuples -> MultiIndex rows after transposing
        df = pd.DataFrame(self.bin_features).T
        df.index.names = idx_names
        df = df.rename(columns=dict(enumerate(self.feats))).reset_index()

        # keyword arguments: pandas >= 2.0 no longer accepts them positionally
        return df.pivot(index=idx_names[0], columns=idx_names[1:],
                        values=self.feats)

print('[Class WTFeatureManager successfully loaded]')

[Class WTFeatureManager successfully loaded]


In [6]:
#@title <font color="\#8FBC8F">File loading utility code

subject_index =  9#@param {type:'integer'}
DATA_DIR = r'/content/drive/MyDrive/Colab Notebooks/Project Domino/new Macros/'

# Subject folders are the DATA_DIR entries whose name contains 'sub';
# subject_index is a position in their alphabetically sorted list.
subject_list = sorted(entry for entry in os.listdir(DATA_DIR) if 'sub' in entry)

# Folder of the selected subject (trailing slash kept for string joins).
subject_path = f'{DATA_DIR}{subject_list[subject_index]}/'
subject_files = os.listdir(subject_path)

# `sub` always names the currently selected subject folder.
sub = subject_list[subject_index]

print(f'[Working on {sub}]')

[Working on sub-030]


In [7]:
#@title <font color="\#8FBC8F">Load Files
# The three per-subject input files sit directly in the subject folder.
X_name = f'{subject_path}{sub}_X.npy'
y_name = f'{subject_path}{sub}_y.npy'
names_name = f'{subject_path}{sub}_channel_names.npy'

# Raw arrays keep a leading underscore; the feature-extraction cell builds
# the final X / y from them.
_X = np.load(X_name)
_y = np.load(y_name)
ch_names = np.load(names_name)
print('[Loaded X, y and channel names data succesfully]')

[Loaded X, y and channel names data succesfully]


In [10]:
#@title <font color="\#8FBC8F">WT Feature Extraction

# Build the per-bin feature table (X) and the label table (y).
wtfm = WTFeatureManager(_X, ch_names)

print(f'[Extracting {wtfm.feats} from each bin]\n')
X = wtfm.as_dataframe()
y = pd.DataFrame(_y, columns=['label'])

print('\n[X and y loaded successfully]')

# Remove the raw-array names from the namespace.
# NOTE(review): wtfm.data still refers to the same array as _X, and after
# this point the cell cannot be re-run without re-running the load cell.
del _X
del _y

Epoch::   0%|          | 0/86 [00:00<?, ?it/s]

[Extracting ['min', 'max', 'mean', 'std', 'median'] from each bin]



Epoch:: 100%|██████████| 86/86 [01:10<00:00,  1.22it/s]


[Creating DataFrame]

[X and y loaded successfully]


In [11]:
#@title <font color='darkgreen'>Pickle X and y - Utility
import datetime as dt

# Timestamp naming this run's output folder, e.g. 2021-06-25_17:09:54.
# strftime is used instead of slicing str(datetime): the string form omits
# the fractional part when microseconds are 0, which breaks a fixed slice.
now = dt.datetime.now().strftime('%Y-%m-%d_%H:%M:%S')
print(f'[now is {now}]')

pickle_path = f'/content/drive/My Drive/Colab Notebooks/Project Domino/Feature-Label matrices/{sub}/'

# Create the subject folder (and any missing parents) if needed. The run
# folder is created with mkdir so that an existing folder of the same
# name raises instead of being silently reused.
os.makedirs(pickle_path, exist_ok=True)
os.mkdir(pickle_path+now)

# Per-subject pointer to the most recent run.
f_last = f'{sub}_last.txt'
with open(pickle_path+f_last, 'w') as f:
    f.write(now)


# X must hold features only.
if 'label' in X.columns: X.drop('label', axis=1, inplace=True)

X.to_pickle(pickle_path + now + '/X.pickle')
y.to_pickle(pickle_path + now + '/y.pickle')
print('[Pickled X and y feature-label files]')

pd.DataFrame({'ch_names':ch_names}).to_csv(pickle_path + f'{sub} bipolar channel names.csv')
print('[Channel Names CSV saved]')

# Record the settings this run was produced with.
with open(pickle_path + now + '/log.txt', 'w') as f:
    f.write(
        f'''
subject number:     {sub}
bin features:       {WTFeatureManager.feats}
bin time:           {WTFeatureManager.dt} [ms]
        '''
    )

# Global pointer to the most recent run, written one level above the
# subject folder: the slice strips the trailing '<sub>/' from pickle_path.
with open(pickle_path[:-(len(sub)+1)] + 'last.txt', 'w') as f:
    f.write(now + ' ' + sub)

# del _X,_y, X, y

[now is 2021-06-25_17:09:54]
[Pickled X and y feature-label files]
[Channel Names CSV saved]
