# Imports e Setup $\sqrt{}$
$\alpha := 5 \%$ !!!

In [4]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from numpy import random as rd
%matplotlib inline
plt.rcParams['figure.figsize'] = [20, 8]
plt.rcParams['figure.dpi'] = 100 # 200 e.g. is really fine, but slower
sns.set_theme(context="notebook")
from tqdm import tqdm

# scipy.fft é melhor !
# https://stackoverflow.com/questions/6363154/what-is-the-difference-between-numpy-fft-and-scipy-fftpack
from scipy.fft import fft
from scipy.fftpack.helper import fftfreq
# from numpy.fft import fft,fftfreq

from scipy.stats import f as FDIST
from scipy.stats.distributions import chi2

In [5]:
# Calcula valor crítico em ORD qualquer, calculando quantil 95%
def vc_pratico(ORD, alpha = 0.05):
    return np.quantile(a= ORD, q = 1-alpha)

# SFT: Spectral F-Test $\sqrt{}$

## Definição e Funções

<img src = "sft.png"/>

In [3]:
def vc_SFT(L, alpha = 0.05, VERBOSE = 0):
    # valor crítico teórico, dado tomando inversa da distribuição acumulada F
    # (Fisher-Snedecor)
    # com significancia 1-alpha (95% se não for alterada)
    vc = FDIST.ppf(1-alpha, 2,2*L) 
    
    if VERBOSE==1:
        print('Significância desejada: ',alpha*100,'\b%')
    if VERBOSE==2:
        print('Significância desejada: ',alpha*100,'\b%')
        print('Valor crítico SFT: ',vc)

    return vc
        #fce999 TODO: perguntar por que 2 e 2*L!

def ord_SFT(sinal, L, BIN):
    # L: tamanho das laterais
    SINAL = fft(sinal)

    central = round(BIN) # certifica inteiro (não float)
    lateralMenor = round(central-L/2)
    lateralMaior = round(central+L/2)+1       

    DEN =np.abs(SINAL[central])**2
    SINAL_lateral = np.array(list(SINAL[lateralMenor:central])+list(SINAL[central+1:lateralMaior])) 
    NUM = (1/L)*np.sum(np.abs(SINAL_lateral)**2)
    SFT = DEN/NUM
    
    return [SINAL,SFT]

## Single-shot

In [4]:
from tqdm import tqdm 

nSim = 1e4
L = 6
BIN = 30

vc = vc_SFT(L=L,alpha=0.02)
nd = 0
hist = []

for i in tqdm(range(1,round(nSim+1))):
    x = np.zeros((round(nSim),))

    x = rd.randn(round(nSim))
    [X, ORD]= ord_SFT(sinal = x, L = L, BIN = BIN)
    if ORD>vc: nd+=1


print('Taxa de FP: '+str(nd/nSim *100))

100%|██████████| 10000/10000 [00:03<00:00, 2660.08it/s]

Taxa de FP: 2.02





# CSM: Component Synchrony Measure $\sqrt{}$

## Definição e Funções: 

<img src = "csm.png"/>

In [5]:
def vc_CSM(M, alpha = 0.05, VERBOSE=0):
    vc = chi2.ppf(1-alpha,2)/(2*M)
    
    if VERBOSE==1:
        print('Significância desejada: ',alpha*100,'\b%')
    if VERBOSE==2:
        print('Significância desejada: ',alpha*100,'\b%')
        print('Valor crítico CSM: ',vc)

    return vc


def ord_CSM(sinal, tamanhoJanela, M):
    if len(sinal)-tamanhoJanela*M>=0:
        
        sinal = np.reshape(sinal[0:tamanhoJanela*M], (tamanhoJanela,M))


        FFT_SINAL = fft(sinal)

        CSM = (np.sum(np.cos(np.angle(FFT_SINAL)),axis=1)/M)**2 + (np.sum(np.sin(np.angle(FFT_SINAL)),axis=1)/M)**2

        return [FFT_SINAL,CSM]
    else:
        print('Erro no número de janelas', round(tamanhoJanela),'(ou amostras, M =', M,'\b) escolhido.')
        print('Tamanho do Sinal menos M*tamanhoJanela:', len(sinal)-M*tamanhoJanela)
        print('(Retorna 0)')
        return 0 

## Single-shot

In [6]:
from tqdm import tqdm 

nSim = round(1e4)

fs = 100 # frequencia de amostragem
tj = 100 # tamanho da janela (1 seg/janela, pra facilitar)
M = 100 # teste em 100 segundos
vc = vc_CSM(M)

N = round(M*tj)
BIN = 42
nd = 0
ndp = 0
hist = []


for i in tqdm(range(1,nSim+1)):
    x = np.zeros((N,))

    x = rd.randn(N)
    [X, ORD]= ord_CSM(sinal=x, tamanhoJanela=tj, M=M)
    # print(ORD)

    if ORD[BIN]>vc: nd+=1

    vcp = vc_pratico(ORD)
    if ORD[BIN]>vcp: ndp+=1

    hist.append(ORD)

vcp = vc_pratico(hist)

print('Taxa de FP: '+str(nd/nSim *100))


100%|██████████| 10000/10000 [00:13<00:00, 750.40it/s]

Taxa de FP: 7.870000000000001





### Gráfico de comparação entre VC Teórico e Prático

> ~ COMPARAÇÕES ~

In [7]:
print('\n ~ COMPARAÇÕES ~')
print('Valor critico pratico: ',vcp)
print('Valor critico teorico: ',vc)
print('Erro relativo:' , round((vcp-vc)/vc*100,ndigits=2),'\b%')

print('Taxa de FP (vc_teorico): '+str(round(nd/nSim *100,ndigits=2)),'\b%')
print('Taxa de FP (vc_pratico): '+str(round(ndp/nSim *100,ndigits=2)),'\b%')
# sns.lineplot(y= hist,x=range(1,nSim+1));


 ~ COMPARAÇÕES ~
Valor critico pratico:  0.03824475122598301
Valor critico teorico:  0.029957322735539894
Erro relativo: 27.66%
Taxa de FP (vc_teorico): 7.87%
Taxa de FP (vc_pratico): 4.58%


## Sequencial:

In [8]:
# Dois testes:
from tqdm import tqdm 

nSim = 1e4

fs = 126 
tj = 126 # cada janela um segundo 
alfa = 0.05

M1test = 31 # teste em 30 segundos 
M2test = 60 # teste em 60 segundos

vc1 = vc_CSM(M1test)
vc2 = vc_CSM(M2test)

Mmax = M2test; 
N = Mmax *tj;
BIN = 30
nd = 0
hist = []


for i in tqdm(range(1,round(nSim+1))):
    x = np.zeros((Mmax*tj,))

    x[0:M1test*tj] = rd.randn(M1test*tj)
    [X, ORD]= ord_CSM(sinal=x, tamanhoJanela=tj, M=M1test)

    if ORD[BIN]>vc1: nd+=1
    else:
        x[M1test*tj:tj*M2test+1] = rd.randn((M2test-M1test)*tj)
        [X, ORD]= ord_CSM(sinal=x, tamanhoJanela=tj, M=M2test)
        if ORD[BIN]>vc2: nd+=1


print('Taxa de FP: '+str(round(nd/nSim *100,ndigits=2)),'\b%')

100%|██████████| 10000/10000 [00:13<00:00, 717.42it/s]

Taxa de FP: 15.49%





# MSC: Magnitude-Squared Coherence $\sqrt{}$

## Definição e Funções

<img src = "msc.png"/>

In [8]:
def vc_MSC(M, alpha = 0.05, VERBOSE = 0):
    vc = 1-alpha**(1/(M-1))
    if VERBOSE==1:
        print('Significância desejada: ',alpha*100,'\b%')
    if VERBOSE==2:
        print('Significância desejada: ',alpha*100,'\b%')
        print('Valor crítico MSC: ',vc)
    return vc # valor crítico

def ord_MSC(sinal, tamanhoJanela, M):
    sinal = np.reshape(sinal[0:tamanhoJanela*M], (tamanhoJanela,M))

    SINAL = fft(sinal)
    
    MSC = np.abs(np.sum(SINAL,axis=1))**2 / (M*np.sum(np.abs(SINAL)**2,axis=1))

    return [SINAL,MSC]

## Single-shot:

In [10]:
from tqdm import tqdm 

nSim = round(1e4)

fs = 100 # frequencia de amostragem
tj = 100 # tamanho da janela (1 seg/janela, pra facilitar)
M = 301 # teste em 100 segundos
vc = vc_MSC(M)

N = round(M*tj)
BIN = 30
nd = 0
ndp = 0
hist = []


for i in tqdm(range(1,nSim+1)):
    x = np.zeros((N,))

    x = rd.randn(N)
    [X, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M)
    

    if ORD[BIN]>vc: nd+=1
    if ORD[BIN]>vc_pratico(ORD): ndp+=1

# Só por comparação e discussão:
print('\n ~ COMPARAÇÕES ~')
print('Valor critico pratico: ',vc_pratico(ORD))

print('Taxa de FP (vc_t): '+str(round(nd/nSim *100,ndigits=2)),'\b%')
print('Taxa de FP (vc_p): '+str(round(ndp/nSim *100,ndigits=2)),'\b%')

100%|██████████| 10000/10000 [00:17<00:00, 559.71it/s]


 ~ COMPARAÇÕES ~
Valor critico pratico:  0.01256237097860481
Taxa de FP (vc_t): 8.42%
Taxa de FP (vc_p): 5.02%





## Sequencial:

In [11]:
# Apenas 2 testes (exempo):
from tqdm import tqdm 

nSim = 1e4

fs = 126 
tj = 126 # cada janela um segundo 
alfa = 0.05

M1test = 31 # teste em 30 segundos 
M2test = 60 # teste em 60 segundos

vc1 = vc_MSC(M1test)
vc2 = vc_MSC(M2test)

Mmax = M2test; 
N = Mmax *tj
BIN = 30
nd = 0
hist = []


for i in tqdm(range(1,round(nSim+1))):
    x = np.zeros((Mmax*tj,))

    x[0:M1test*tj] = rd.randn(M1test*tj)
    [X, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M1test)

    if ORD[BIN]>vc1: nd+=1
    else:
        x[M1test*tj:tj*M2test+1] = rd.randn((M2test-M1test)*tj)
        [X, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M2test)
        if ORD[BIN]>vc2: nd+=1


print('Taxa de FP: '+str(nd/nSim *100),'\b%')

100%|██████████| 10000/10000 [00:05<00:00, 1774.38it/s]

Taxa de FP: 16.48%





In [12]:
# Sequencial generalizado:
from tqdm import tqdm 

nSim = round(1e4)

fs = 100 
tj = 100 # cada janela um segundo 
alfa = 0.05

M = 30 # teste em 30 segundos
Mstep = 10

vc = vc_MSC(M)

Mmax = 90; 
N = round(Mmax*tj)
BIN = 30
nd = 0
hist = []


for i in tqdm(range(1,nSim+1)):
    x = np.zeros((N,))
    x = rd.randn(N)

    for M_atual in range(M, Mmax, Mstep):
        [X, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M_atual)

        if ORD[BIN]>vc: nd+=1

print('Taxa de FP: '+str(nd/nSim *100),'\b%')

100%|██████████| 10000/10000 [00:13<00:00, 714.98it/s]

Taxa de FP: 18.029999999999998%





# Controle de significância (MSC) $\sqrt{}$

## Definindo:

In [26]:
# Usando sequencial generalizado, queremos FP= 5% no teste GERAL! Qual deve ser FPi (cada iteração)?

def corrige_alfaMSC(
    tj = 100, # cada janela um segundo 
    Mmin = 10,
    Mstep = 10,
    Mmax = 60,
    BIN = 7,
    alphaD = 5/100,  #fff significancia a atingir
    alphaInicial = -1,
    tx_apr = 0.1,    # taxa de aprendizado
    nSim = 1e3, 
    itMAX = 300,
    VERBOSE = 1):

    print('\nCalculando alpha corrigido para alphaD =', alphaD)

    N = round(Mmax*tj)  # tamanho máximo do sinal
    nSim = round(nSim)  # numero de simulações
    if alphaInicial<0: alphas = [alphaD/(Mmax/Mstep)] # primeiro "chute" = alpha Desejado
    alphaR = 1   # significancia REAL
    it = 1  # iteração atual

    while alphaR > alphaD and it <= itMAX:
        if it>1:
            # Se J = mse = e^2 (univariado), então:
            grad = 2*alphas[-1] - alphaD
            alphas.append(alphas[-1] +tx_apr*(grad))

        descricao = 'Iteração #'+str(it); it+=1; nd = 0; 
        
        if VERBOSE>=2:
            for _ in tqdm(range(1,nSim+1), leave=(VERBOSE>1), desc=descricao):
                x = rd.randn(N) # gera o "sinal" completo
                for M in range(Mmin, Mmax+Mstep,Mstep):
                    vc = vc_MSC(M,alpha=alphas[-1])            
                    [_, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M)
                    if ORD[BIN]>vc: nd+=1

        if VERBOSE==1:
            for _ in tqdm(range(1,nSim+1), leave=False):
                x = rd.randn(N) # gera o "sinal" completo
                for M in range(Mmin, Mmax+Mstep,Mstep):
                    vc = vc_MSC(M,alpha=alphas[-1])            
                    [_, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M)
                    if ORD[BIN]>vc: nd+=1

        else:
            for _ in range(1,nSim+1):
                x = rd.randn(N) # gera o "sinal" completo
                for M in range(Mmin, Mmax+Mstep,Mstep):
                    vc = vc_MSC(M,alpha=alphas[-1])            
                    [_, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M)
                    if ORD[BIN]>vc: nd+=1

        alphaR = nd/(nSim*(Mmax/Mstep))


        if VERBOSE==3:
            print('Significancia atingida:',round(alphaR*100,ndigits=3),'\b%')
            print('Alpha corrigido:', round(alphas[-1],ndigits=3))

    if VERBOSE==1:
        print('Significancia atingida:',round(alphaR*100,ndigits=3),'\b%')
        print('Alpha corrigido:', round(alphas[-1],ndigits=4))
    
    if it>=itMAX: 
        print('Iteração máxima (',itMAX,') atingida!')
        print('Significancia alcançada:',round(alphaR*100,ndigits=3),'\b%')
        print('Retornando último valor de alpha corrigido: ', round(alphas[-1],ndigits=4))
    
    return alphas[-1]

# Exemplo:
print(corrige_alfaMSC(VERBOSE=3))


Calculando alpha corrigido para alphaD = 0.05


Iteração #1: 100%|██████████| 1000/1000 [00:00<00:00, 1193.81it/s]


Significancia atingida: 6.167%
Alpha corrigido: 0.008


Iteração #2: 100%|██████████| 1000/1000 [00:00<00:00, 1282.24it/s]


Significancia atingida: 4.233%
Alpha corrigido: 0.005
0.004999999999999999


## Teste sequencial com controle de nível significância

In [30]:
nSim = round(1e4)
fs = 126 
tj = 100 # tempo de cada janela (um segundo no caso)
alpha=0.05
Mmax= 80
Mstep= 5
Mmin= 10
BIN = 7
N = round(Mmax*tj)  # tamanho máximo do sinal

alfa = corrige_alfaMSC(alphaD=alpha, Mmax= Mmax, Mstep=Mstep, Mmin= Mmin, BIN = BIN, VERBOSE=2) # significancia
nd = 0
numTestes=0

for _ in tqdm(range(1,nSim+1), desc = 'Simulando:'):
    x = rd.randn(N) # gera o "sinal" completo

    for M in range(Mmin, Mmax+Mstep,Mstep):
        vc = vc_MSC(M,alpha=alfa)            
        [_, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M)
        if ORD[BIN]>vc: nd+=1
        numTestes+=1

alphaR = nd/(numTestes)
print(numTestes-nSim*(Mmax/Mstep))
# for i in tqdm(range(1,round(nSim+1))):
#     x = np.zeros((Mmax*tj,))

#     x[0:M1test*tj] = rd.randn(M1test*tj)
#     [X, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M1test)

#     if ORD[BIN]>vc1: nd+=1
#     else:
#         x[M1test*tj:tj*M2test+1] = rd.randn((M2test-M1test)*tj)
#         [X, ORD]= ord_MSC(sinal=x, tamanhoJanela=tj, M=M2test)
#         if ORD[BIN]>vc2: nd+=1


print('Taxa de FP: '+str(nd/nSim *100),'\b%')


Calculando alpha corrigido para alphaD = 0.05


Iteração #1: 100%|██████████| 1000/1000 [00:02<00:00, 480.30it/s]
Simulando:: 100%|██████████| 10000/10000 [00:21<00:00, 475.82it/s]

-10000.0
Taxa de FP: 26.06%



