In [1]:
import numpy as np
import math
from fractions import Fraction
import itertools
from biotuner_utils import *
from biotuner_offline import *
import matplotlib.pyplot as plt
from numpy import array, zeros, ones, arange, log2, sqrt, diff, concatenate
import emd
from PyEMD import EMD, EEMD
from scipy.signal import butter, lfilter
import colorednoise as cn
from biotuner import *
from biotuner2d import *
import mne

## Load dataset

In [2]:

#path = 'D:/Science/EEG_data/'
path = 'C:/Users/Dell/GitHub/CoCoBrainChannel/'
epochs = mne.read_epochs(path+'pareidolia_run2.fif')
#epochs = mne.read_epochs('C:/Users/Antoine/github/Data_EEG/pareidolia_run1.fif')
epochs = epochs.apply_baseline((-1.5, -0.1))
#epochs = epochs.crop(0.5, 7.5)
epochs_data = epochs.get_data()

Reading C:/Users/Dell/GitHub/CoCoBrainChannel/pareidolia_run2.fif ...


  epochs = mne.read_epochs(path+'pareidolia_run2.fif')


    Found the data of interest:
        t =   -1500.00 ...    8000.00 ms
        0 CTF compensation matrices available


  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring()
  ch_name = ch_name[:np.argmax(ch_name == b'')].tostring

104 matching events found
Applying baseline correction (mode: mean)
Not setting metadata
0 projection items activated
Applying baseline correction (mode: mean)


## Initialize biotuner object and methods

In [26]:
class biotuner(object):
    
    '''Class used to derive peaks information, musical scales and related metrics from time series  
    
    Example of use:       
    biotuning = biotuner(sf = 1000)
    biotuning.peaks_extraction(data)
    biotuning.peaks_extension()
    biotuning.peaks_metrics()
    '''
    
    def __init__(self, sf, peaks_function = 'EEMD', precision = 0.2, compute_sub_ratios = False, 
                 n_harm = 10, harm_function = 'mult', extension_method = 'consonant_harmonic_fit',
                 surrogate = False, surrogate_type = None, low_cut = 0.5, high_cut = 150):
        '''Initializing sampling frequency'''
        self.sf = sf
        '''Initializing arguments for peak extraction
           peaks_function: method used to extract the peaks ['EEMD', 'EMD', 'HH1D_max', 'adapt', 'fixed']
           precision: precision of the peaks in Hz
           compute_sub_ratios: when set to True, include ratios < 1 in peaks_ratios attribute [True, False]'''
        self.peaks_function = peaks_function
        self.precision = precision
        self.compute_sub_ratios = compute_sub_ratios
        '''Initializing arguments for peaks metrics
           n_harm: number of harmonics to compute in harmonic_fit function
           harm_function: compute harmonics from iterative multiplication (x, 2x, 3x, ...nx) or division (x, x/2, x/3, ...x/n) ['mult', 'div']
           extension_method: ['harmonic_fit', 'consonant', 'multi_consonant', 'consonant_harmonic_fit', 'multi_consonant_harmonic_fit']'''
        self.n_harm = n_harm
        self.harm_function = harm_function
        self.extension_method = extension_method
        '''Initializing dictionary for scales metrics'''
        self.scale_metrics = {}
        '''Initializing surrogate parameters'''
        self.surrogate = surrogate
        self.surrogate_type = surrogate_type
        self.low_cut = low_cut
        self.high_cut = high_cut
    '''First method to use. Requires data as input argument
       Generates self.peaks and self.peaks_ratios attributes'''
    
    def peaks_extraction (self, data, peaks_function = None, FREQ_BANDS = None, precision = None, sf = None, max_freq = 80, compute_sub_ratios = None,
                         surrogate = None, surrogate_type = None):
        if sf == None:
            sf = self.sf
        if precision == None:
            precision = self.precision
        if peaks_function == None:
            peaks_function = self.peaks_function
        if compute_sub_ratios == None:
            compute_sub_ratios = self.compute_sub_ratios
        self.dim = np.ndim(data)
        if surrogate_type == None:
            surrogate_type = self.surrogate_type
        if surrogate == None:
            surrogate = self.surrogate
        
        if surrogate == True:
            conditions = ['og_data']+surrogate_type
            self.peaks, self.amps = compute_peaks_surrogates (data, conditions, peaks_function = peaks_function, precision = precision, sf = sf, 
                                                              max_freq = max_freq, low_cut = self.low_cut, high_cut = self.high_cut, save = False)
            #a, self.peaks_ratios = compute_peak_ratios(self.peaks, rebound = True, octave = 2, sub = compute_sub_ratios)
        if surrogate == False:
            self.peaks, self.amps = compute_peaks_matrices(data, peaks_function, precision, sf, max_freq, save = False, suffix = 'default')
        if np.ndim(data) == 3:
            peaks_ratios = function_3d(self.peaks, compute_peak_ratios)
            peaks_ratios = np.moveaxis(peaks_ratios, 2, 1)
            self.peaks_ratios = np.moveaxis(peaks_ratios, 0, 1)
        if np.ndim(data) == 2:
            peaks_ratios = function_3d(self.peaks, compute_peak_ratios)
            self.peaks_ratios = np.moveaxis(peaks_ratios, 0, 1)
    '''Generates self.extended_peaks and self.extended_peaks_ratios attributes'''
    
    def peaks_extension (self, peaks = None, n_harm = None, method = None, harm_function = 'mult', cons_limit = 0.1):
        if peaks == None:
            peaks = self.peaks
        if n_harm == None:
            n_harm = self.n_harm
        if method == None:
            method = self.extension_method
        if method == 'harmonic_fit':
            extended_peaks = harmonic_fit(peaks, self.n_harm, function = harm_function)
            self.extended_peaks = np.sort(list(self.peaks)+list(extended_peaks))
        if method == 'consonant':
            consonance, cons_pairs, cons_peaks, cons_metric = consonance_peaks (peaks, limit = cons_limit)
            self.extended_peaks = np.sort(np.round(cons_peaks, 3))
        if method == 'multi_consonant':
            consonance, cons_pairs, cons_peaks, cons_metric = consonance_peaks (peaks, limit = cons_limit)
            self.extended_peaks = np.sort(np.round(multi_consonance(cons_pairs, n_freqs = 10), 3))
        if method == 'consonant_harmonic_fit':
            extended_peaks = harmonic_fit(peaks, self.n_harm, function = harm_function)
            consonance, cons_pairs, cons_peaks, cons_metric = consonance_peaks (extended_peaks, limit = cons_limit)
            self.extended_peaks = np.sort(np.round(cons_peaks, 3))
        if method == 'multi_consonant_harmonic_fit':
            extended_peaks = harmonic_fit(peaks, self.n_harm, function = harm_function)
            consonance, cons_pairs, cons_peaks, cons_metric = consonance_peaks (extended_peaks, limit = cons_limit)
            self.extended_peaks = np.sort(np.round(multi_consonance(cons_pairs, n_freqs = 10), 3))
        self.extended_peaks_ratios = compute_peak_ratios(self.extended_peaks)
      
    def compute_peaks_metrics (self, n_harm = None):
        if n_harm == None:
            n_harm = self.n_harm
        peaks = self.peaks
        '''peaks = list(self.peaks)
        metrics = {'cons' : 0, 'euler' : 0, 'tenney': 0, 'harm_fit': 0}   
        metrics['harm_fit'] = len(harmonic_fit(peaks, n_harm = n_harm))
        a, b, c, metrics['cons'] = consonance_peaks (peaks, 0.1)
        peaks_euler = [int(round(num, 2)*1000) for num in peaks]
        metrics['euler'] = euler(*peaks_euler)
        metrics['tenney'] = tenneyHeight(peaks)
        metrics_list = []
        for value in metrics.values():
            metrics_list.append(value)
        self.peaks_metrics_list = metrics_list
        self.peaks_metrics = metrics'''
        self.peaks_metrics = peaks_to_metrics_matrices(peaks, n_harm)[1]

    def graph_metric (self, savefolder, peaks_metrics = None, labels = None):
        if peaks_metrics == None:
            peaks_metrics = self.peaks_metrics
        for metric, value in peaks_metrics.items(): 
            print(value)
            if self.surrogate_type != None:
                labs = ['eeg']+self.surrogate_type
            if labels != None:
                labs = labels
            graph_dist(value, metric = metric, ref = value[0], dimensions = [0], labs = labs, savefolder = savefolder, subject = '0', run = '0')
    
    '''Methods to compute scales from whether peaks or extended peaks'''
    
    def compute_diss_curve (self, input_type = 'peaks', denom=1000, max_ratio=2, consonance = True, method = 'min', plot = False):
        if input_type == 'peaks':
            peaks = self.peaks
            amps = self.amps
        if input_type == 'extended_peaks':
            peaks = self.extended_peaks
            amps = self.extended_amps
        peaks_total = []
        amps_total = []
        if np.ndim(peaks) == 3:
            diss_scale_tot = [] 
            diss_euler_tot = []
            diss_tot = []
            diss_harm_sim_tot = []
            for i in range(len(peaks)):
                #print(i)
                diss_scale_ = [] 
                diss_euler_ = []
                diss_ = []
                diss_harm_sim_ = []
                for j in range(len(peaks[0])):
                    #print(j)

                    peaks_ = peaks[i][j]
                    peaks_ = [p*128 for p in peaks_]
                    amps_ = np.interp(amps[i][j], (np.array(amps[i][j]).min(), np.array(amps[i][j]).max()), (0.8, 0.2))
                    #print('peaks', peaks_)
                    #print('amps', amps_)
                    intervals, diss_scale, diss_euler, diss, diss_harm_sim = diss_curve (peaks_, amps_, denom=denom, max_ratio=max_ratio, method = method, plot = plot)
                    diss_scale_.append(diss_scale) 
                    diss_euler_.append(diss_euler)
                    diss_.append(diss)
                    diss_harm_sim_.append(np.average(diss_harm_sim))
                diss_scale_tot.append(diss_scale_)
                diss_euler_tot.append(diss_euler_)
                diss_tot.append(diss_)
                diss_harm_sim_tot.append(diss_harm_sim_)
        self.scale_metrics['diss_euler'] = diss_euler_tot
        self.scale_metrics['dissonance'] = diss_tot
        self.scale_metrics['diss_harm_sim'] = diss_harm_sim_tot
        #self.scale_metrics['diss_n_steps'] = len(self.diss_scale)
        self.diss_scale = diss_scale_tot

    def compute_harmonic_entropy(self, input_type = 'peaks', res = 0.001, spread = 0.01, plot_entropy = True, plot_tenney = False):
        if input_type == 'peaks':
            ratios = self.peaks_ratios
        if input_type == 'extended_peaks':
            ratios = self.extended_peaks_ratios
        HE_scale, HE = harmonic_entropy(ratios, res = res, spread = spread, plot_entropy = plot_entropy, plot_tenney = plot_tenney)
        self.HE_scale = HE_scale[0]
        self.scale_metrics['HE'] = HE
        self.scale_metrics['HE_n_steps'] = len(self.HE_scale)  
    
    '''Generic method to fit all Biotuner methods'''
    
    def fit_all(self, data, compute_diss = True, compute_HE = True, compute_peaks_extension = True):
        biotuning = biotuner(self.sf, peaks_function = self.peaks_function, precision = self.precision, n_harm = self.n_harm)
        biotuning.peaks_extraction(data)
        biotuning.compute_peaks_metrics()
        if compute_diss == True:
            biotuning.compute_diss_curve(input_type = 'peaks', plot = False)
        if compute_peaks_extension == True:
            biotuning.peaks_extension(method = 'multi_consonant_harmonic_fit', harm_function = 'mult', cons_limit = 0.01)
        if compute_HE == True:
            biotuning.compute_harmonic_entropy(input_type = 'extended_peaks', plot_entropy = False)
        return biotuning
    
    def info(self, metrics=False, scales=False, whatever=False):
        if metrics == True:
            print('METRICS')
            print(vars(self))
        
        else:
            print(vars(self))
        return

In [5]:
def function_2d (ts_2d, function):
    output = []
    for i in range(len(ts_2d)):
        output.append(function(ts_2d[i]))
        
    out = np.array(output)
    return out

def function_3d (ts_3d, function):
    output = []
    for i in range(len(ts_3d)):
        output_ = []
        for j in range(len(ts_3d[i])):
            output_.append(function(ts_3d[i][j]))
        output.append(output_)
    out = np.array(output)
    return out

In [6]:
def EEG_harmonics_div(peaks, n_harmonics, n_oct_up = 0, mode = 'div'):
    """
    Natural sub-harmonics

    This function takes a list of frequency peaks as input and computes the desired number of harmonics
    with using division: 

    peaks: List (float)
        Peaks represent local maximum in a spectrum
    n_harmonics: int
        Number of harmonics to compute
    n_oct_up: int
        Defaults to 0. Corresponds to the number of octave the peaks are shifted 
    mode: str
        Defaults to 'div'.
        'div': x, x/2, x/3 ..., x/n
        'div_add': x, (x+x/2), (x+x/3), ... (x+x/n)
        'div_sub': x, (x-x/2), (x-x/3), ... (x-x/n)
    Returns
    -------
    div_harmonics: array
        (n_peaks, n_harmonics + 1)
    div_harmonics_bounded: array
        (n_peaks, n_harmonics + 1)
    """
    n_harmonics = n_harmonics + 2
    div_harmonics = []
    for p in peaks:
        harmonics = []
        p = p * (2**n_oct_up)
        i = 1
        harm_temp = p
        while i < n_harmonics:
            if mode == 'div':
                harm_temp = (p/i)
            if mode == 'div_add':
                harm_temp = p + (p/i)
            if mode == 'div_sub':
                harm_temp = p - (p/i)
            harmonics.append(harm_temp)
            i+=1
        div_harmonics.append(harmonics)
    div_harmonics = np.array(div_harmonics)
    div_harmonics_bounded = div_harmonics.copy()
    #Rebound the result between 1 and 2
    for i in range(len(div_harmonics_bounded)):
        for j in range(len(div_harmonics_bounded[i])):
            div_harmonics_bounded[i][j] = rebound(div_harmonics_bounded[i][j])
    return div_harmonics, div_harmonics_bounded

In [59]:
peaks = [4, 5.2, 13.3, 25, 43.2]
a, b = EEG_harmonics_div(peaks, 5, n_oct_up = 0, mode = 'div')
a

array([[ 4.        ,  2.        ,  1.33333333,  1.        ,  0.8       ,
         0.66666667],
       [ 5.2       ,  2.6       ,  1.73333333,  1.3       ,  1.04      ,
         0.86666667],
       [13.3       ,  6.65      ,  4.43333333,  3.325     ,  2.66      ,
         2.21666667],
       [25.        , 12.5       ,  8.33333333,  6.25      ,  5.        ,
         4.16666667],
       [43.2       , 21.6       , 14.4       , 10.8       ,  8.64      ,
         7.2       ]])

1.95

In [None]:
data = epochs_data[0][0:2]

biotuning.peaks[0].shape
out = function_3d(biotuning.peaks, compute_peak_ratios)
out = np.moveaxis(out, 2, 1)
out = np.moveaxis(out, 0, 1)[1]


In [None]:
peaks = biotuning.peaks
peaks = [p*128 for p in peaks[2][1]]

In [None]:
peaks = [4, 5.2, 13.3, 25, 43.2]
a = compute_peak_ratios(peaks, rebound = True, sub = True)
a

In [None]:
peaks = [4, 5.2, 13.3, 25, 43.2]
peaks = [p*3.19 for p in peaks]
eul = [int(p*100000) for p in peaks]
print(eul)
d = euler(*eul)
d

In [28]:
savefolder = r'C:\Users\Dell\GitHub\biotuner\graphs\test\\'
data = epochs_data[10][50:53] # Define data (single time series)
#data = np.stack((epochs_data[:,10], epochs_data[:,84]))

biotuning = biotuner(1000, peaks_function = 'EEMD', precision = 0.25, n_harm = 10, surrogate = True, surrogate_type = ['pink', 'white']) # Initialize biotuner object
biotuning.peaks_extraction(data)
biotuning.compute_peaks_metrics()
biotuning.graph_metric(savefolder, labels = None)
biotuning.compute_diss_curve(plot = False)
biotuning.peaks_extension(method = 'consonant_harmonic_fit', harm_function = 'mult', cons_limit = 0.01)
biotuning.compute_harmonic_entropy(input_type = 'extended_peaks', plot_entropy = True)

Condition ( 1 of 3 ): og_data
[ -79.1356412   -58.43607182  -54.77512406 ... -103.21266356 -105.65269598
 -109.17333649]
[ -62.00280984  -53.63337233  -50.58111768 ... -126.02348088 -129.54506071
 -136.29892205]
[ -54.6731026   -51.63946839  -50.85843349 ... -156.97337059 -157.13829866
 -158.58348601]
[ -65.7576993   -57.29529568  -55.37547221 ... -179.1929837  -174.84297661
 -177.81466727]
[ -68.21257397  -51.74263497  -51.17777917 ... -210.14541944 -212.57876667
 -216.72521362]
[ -56.96309925  -61.04775127  -51.74966411 ... -104.3970049  -101.38188165
 -120.24980739]
[ -66.60672842  -54.65632387  -53.87296112 ... -119.81527195 -123.61020214
 -128.0015353 ]
[ -66.18478279  -52.66245975  -51.34420102 ... -149.29634245 -152.41220041
 -165.6070851 ]
[ -56.47414794  -54.33309038  -53.43327537 ... -180.72305374 -184.00163605
 -187.74949464]
[ -72.71636862  -56.02361873  -52.33029076 ... -207.67644644 -202.69336678
 -210.73529989]
[ -73.3823302   -57.92605822  -55.23474638 ... -102.51473642

  app.launch_new_instance()


[[147, 293, 96], [191, 266, 48], [91, 133, 85]]
[[9.496293600608407, 11.368489684445677, 10.587505778312206], [10.059221882580694, 9.181091238645838, 8.989299421581205], [8.894336250599824, 12.27781429606214, 8.703596395396154]]
[[8, 4, 4], [9, 11, 11], [6, 4, 5]]
[1600, 1890, 2000]
[1340, 1640, 1690, 2000]
[1310, 1470, 1670, 1880, 2000]
[1140, 1230, 1450, 1780, 1940, 2000]
[1430, 1830, 2000]
[1200, 1320, 1660, 2000]
[1330, 1530, 1870, 2000]
[1330, 1540, 1560, 1870, 2000]
[1500, 1740, 1930, 2000]
(3, 11, 3, 5)


ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

<Figure size 1008x720 with 0 Axes>

<Figure size 1008x720 with 0 Axes>

<Figure size 1008x720 with 0 Axes>

<Figure size 1008x720 with 0 Axes>

In [14]:
biotuning.peaks


array([[[ 3.  ,  4.25,  9.75, 17.  , 42.75],
        [ 3.  ,  6.25, 10.5 , 28.  , 42.25],
        [ 2.  ,  5.75,  8.25, 18.75, 48.5 ],
        [ 2.  ,  4.75,  9.  , 26.5 , 33.75],
        [ 2.25,  6.5 ,  9.5 , 21.5 , 37.25]],

       [[ 2.25,  4.25,  7.  , 15.  , 37.  ],
        [ 2.75,  4.5 , 10.25, 15.5 , 42.25],
        [ 2.  ,  4.  ,  7.5 , 24.75, 31.25],
        [ 2.75,  6.75, 10.  , 20.  , 30.  ],
        [ 2.25,  6.5 ,  9.5 , 21.25, 32.5 ]],

       [[ 3.25,  5.75,  8.25, 23.  , 36.25],
        [ 2.25,  5.25, 13.25, 21.75, 39.25],
        [ 2.  ,  6.25,  7.25, 25.75, 38.75],
        [ 2.75,  5.  , 11.  , 26.  , 43.25],
        [ 2.75,  5.5 ,  7.25, 14.25, 33.  ]]])

In [None]:
vars(biotuning)
