## Bibliotecas

In [None]:
import pandas as pd
import numpy as np
from scipy import signal
from math import prod
from scipy.signal import stft
import matplotlib.pyplot as plt
from matplotlib import rcParams, cm
from math import e

## Filtragem

In [None]:
# definições de filtros

def butter_bandpass(data, lowcut, highcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='bandpass')
    return signal.filtfilt(b, a, data)


def butter_lowpass(data, lowcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    b, a = signal.butter(order, low, btype='lowpass')
    return signal.filtfilt(b, a, data)


def butter_highpass(data, highcut, fs=200, order=4):
    nyq = fs * 0.5
    high = highcut / nyq
    b, a = signal.butter(order, high, btype='highpass')
    return signal.filtfilt(b, a, data)


def butter_notch(data, cutoff, var=1, fs=200, order=4):
    nyq = fs * 0.5
    low = (cutoff - var) / nyq
    high = (cutoff + var) / nyq
    b, a = signal.iirfilter(order, [low, high], btype='bandstop', ftype="butter")
    return signal.filtfilt(b, a, data)

## Carregar Arquivos

In [None]:
data = []
def CarregaSujeito(diretorio):
    data = np.load("datasets/topicos_cc/" + diretorio)
    data = np.transpose(data, (0, 2, 1))
    data_filtered = butter_notch(data, 60)
    data_filtered = butter_highpass(data_filtered, 5)
    data_filtered = butter_lowpass(data_filtered, 50)
    
    return data_filtered

diretorio = [ i.split("/")[-1] for i in glob('datasets/topicos_cc/s10*')]

for a in diretorio:
    data.append(CarregaSujeito(a))

## Concatenação dos Movimentos

In [None]:
X = np.concatenate((data), axis = 0)
X.shape

## Segmentação de Dados (janelamento)

In [None]:
data = X
step = 11.8
segment = 128
# data = X.reshape(24, 4, 1600)
# print('', data.shape)

n_win = int((data.shape[-1] - segment) / step) + 1
ids = np.arange(n_win) * int(step)

# Janelas do dado no dominio do tempo
chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)

# Janelas do dado no domínio da frequência
_, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=115)
chunks_freq = np.swapaxes(chunks_freq, 2, 3)

print('Formato (shape) dos dados depois da divisão de janelas')
print(f'Dominio do tempo: {chunks_time.shape} - (classes+ensaios, canais, janelas, linhas)')
print(f'Dominio da frequência:  {chunks_freq.shape} - (classes+ensaios, canais, janelas, linhas)')

## Extração e Seleção de Características

In [None]:
# funções auxiliares:

def PSD(w):
    ''' definição da função PSD para o sinal no domínio da frequência '''
    return np.abs(w) ** 2

def f_j(j, SampleRate, M):
    return (j * SampleRate) / (2 * M)

# funções de extração de características:

def var(x):
    return np.sum(x ** 2, axis=-1) / (np.prod(x.shape) - 1)

def rms(x):
    return np.sqrt(np.sum(np.abs(x) ** 2, axis=-1) / (np.prod(x.shape) - 1))

def WL(x):
    return np.sum(np.abs(np.diff(x)), axis = -1)

def fmd(w):
    return np.sum(PSD(w), axis=-1) / 2

def mmdf(w):
    return np.sum(np.abs(w), axis=-1) / 2

def WAMP(x, limiar):
    return np.sum( np.abs(np.diff(x)) > limiar, axis= -1)

def WL(x):
    return np.sum(np.abs(np.diff(x)), axis = -1)

#funções de extração de características:

def ZC_Add(data, th):
    
    somatorio = 0
    resultado = 0
    tamanho = len(data)
    
    for i in range(tamanho - 1):
        resultado1 = (data[i] * data[i+1])
        resultado2 = np.abs((data[i] - data[i+1]))
        
        if(resultado1 < 0) and (resultado2 >=  th):
            somatorio += 1
    return somatorio

def ZC(data, th):
    
    x,y,z = data.shape[:3]
    somatorio_final = []
    for i in range(x):
        somatorio_fx = []
        for j in range(y):
            somatorio_fy = []
            for k in range(z):
                somatorio_fz = ZC_Add(data[i][j][k], th)
                
                somatorio_fy.append(somatorio_fz)
            
            somatorio_fx.append(somatorio_fy)
        
        somatorio_final.append(somatorio_fx)
        
    return np.array(somatorio_final)

# funções auxiliares:

def f_j(j, SampleRate, M):
    return (j * SampleRate) / (2 * M)

#funções de extração de característica:

def FMN_Add(data):
    M = len(data)
    somatorio = 0
    denominador = np.sum(PSD(data), axis=-1)
    
    for j in range(M):
        somatorio += (f_j(j, 200, M) * PSD(data[j])) / denominador
    return somatorio


def FMN(data):
    x,y,z = data.shape[:3]
    somatorio_final = []
    
    for i in range(x):
        somatorio_fx = []
        for j in range(y):
            somatorio_fy = []
            for k in range(z):
                somatorio_fz = FMN_Add(data[i][j][k])
                
                somatorio_fy.append(somatorio_fz)
            
            somatorio_fx.append(somatorio_fy)
        
        somatorio_final.append(somatorio_fx)
        
    return np.array(somatorio_final)

# funções auxiliares:

def A_j(w):
    return np.abs(w)

#funções de extração de características:

def MMNF_Add(data):
    
    M = len(data)
    somatorio = 0
    denominador = np.sum(A_j(data), axis=-1)
    
    for j in range(M):
        somatorio += ( f_j(j, 200, M) * A_j(data[j])) / denominador
    return somatorio


def MMNF(data):
   
    x,y,z = data.shape[:3]
    somatorio_final = []
    
    for i in range(x):
        somatorio_fx = []
        for j in range(y):
            somatorio_fy = []
            for k in range(z):
                somatorio_fz = MMNF_Add(data[i][j][k])
                
                somatorio_fy.append(somatorio_fz)
            
            somatorio_fx.append(somatorio_fy)
        
        somatorio_final.append(somatorio_fx)
        
    return np.array(somatorio_final)

PontoMax = np.max(chunks_time)
PontoMax

Mediana = np.median(chunks_time)
Mediana

MeuLimiar = (PontoMax + Mediana) / 2
MeuLimiar

Media = np.mean(chunks_time)
Media

MeuLimiarNovo = (PontoMax + Media) / 2
MeuLimiarNovo


## Novas Extração e Classificação de Características

In [None]:
def LogD(data):
    from math import e
    N = np.prod(data.shape)
    
    return e ** ( np.sum(np.log10( np.abs(data) ), axis = -1) ) / N

def IEMG(data):
    # tempo
    return np.sum(A(data), axis=-1)

def DASDV(data):
    #tempo
    return np.sqrt( np.sum(np.diff(data) ** 2, axis = -1) / (np.prod(data.shape[:-1]) - 1) )

def TMX(x, n):
    N = np.prod(x.shape[:-1])
    return np.abs(np.sum(x ** n, axis = -1) / N)

## Implementação Vetor

In [None]:
final_data = list()

final_data.append(WAMP(chunks_time, Mediana))
final_data.append(var(chunks_time))
final_data.append(rms(chunks_time))
final_data.append(WL(chunks_time))
final_data.append(ZC(chunks_time, 0))

final_data.append(fmd(chunks_freq))
final_data.append(FMN(chunks_freq))
final_data.append(mmdf(chunks_freq))
final_data.append(MMNF(chunks_freq))


final = np.array(final_data)
final.shape

## Transpose

In [None]:
data = final.transpose(1, 3, 2, 0)
X = data.reshape(data.shape[0]*data.shape[1], data.shape[2]*data.shape[3])
X.shape

## Variance Treshold

In [None]:
from sklearn.feature_selection import VarianceThreshold
sel = VarianceThreshold(threshold = (0.1))
X_Variance = sel.fit_transform(X)

## Select KBest

In [None]:
from sklearn.feature_selection import SelectKBest, chi2
X_new = SelectKBest(chi2, k = 10).fit_transform(X, y)

In [None]:
# carrega um arquivo .csv para um pandas dataframe
t_t1 = pd.read_csv('./datasets/khushaba/EMG-S2/T-T1.csv', delimiter=',', header=None)

# converte o dataframe do pandas para um numpy array
t_t1 = t_t1.to_numpy()

# imprime o shape do numpy array
print(t_t1.shape)
# neste exemplo devemos ter um shape de (20000, 2), 20000 linhas e 2 colunas, onde cada coluna é um canal.

import numpy as np

classes = ['T-I','T-L','T-M','T-R','T-T','HC-','I-I','L-L','M-M','R-R']

# variável para armazenar os dados das classes
data = []

for classe in classes:
    # variável para armazear os dados dos ensaios
    trials = []
    for i in range(1, 7): # de 1 a 6 (Qt. de ensaios)
        # carrega o arquivo .csv para um pandas dataframe
        dataframe = pd.read_csv(f'./datasets/khushaba/EMG-S1/{classe}{i}.csv', delimiter=',', header=None)
        
        # converte os dados do um ensaio para numpy array e o adiciona na lista de ensaios
        trials.append(dataframe.to_numpy())
    
    # adiciona os ensaios de uma classe a lista de dados das classes
    data.append(trials)

# transforma os dados das classes em um numpy array
data = np.array(data)

# imprime o shape do numpy array
print(f'{data.shape} - (classes, ensaios, linhas, canais)')
# neste momento devemos ter um shape de (10, 6, 20000, 2), sendo 10 classes, 6 ensaios, 20000 linhas e 2 canais

# troca os eixos 2 e 3 do numpy array
data = data.swapaxes(2, 3)

# imprime o shape do numpy array
print(f'{data.shape} - (classes, ensaios, canais, linhas)')
# neste momento devemos ter um shape de (10, 6, 2, 2000), sendo 10 classes, 6 ensaios, 2 canais e 20000 linhas

import numpy as np

classes = ['T-I','T-L','T-M','T-R','T-T','HC-','I-I','L-L','M-M','R-R']

voluntarios = list (range(1, 11))

final = {}

for voluntario in voluntarios:
    # variável para armazenar os dados das classes
    data = []

    for classe in classes:
        # variável para armazear os dados dos ensaios
        trials = []
        for i in range(1, 7): # de 1 a 6 (Qt. de ensaios)
            # carrega o arquivo .csv para um pandas dataframe
            dataframe = pd.read_csv(f'./datasets/khushaba/EMG-S{voluntario}/{classe}{i}.csv', delimiter=',', header=None)

            # converte os dados do um ensaio para numpy array e o adiciona na lista de ensaios
            trials.append(dataframe.to_numpy())

        # adiciona os ensaios de uma classe a lista de dados das classes
        data.append(trials)

    # transforma os dados das classes em um numpy array
    data = np.array(data)

    # imprime o shape do numpy array
    print(f'{data.shape} - (classes, ensaios, linhas, canais)----- {voluntario}')
    # neste momento devemos ter um shape de (10, 6, 20000, 2), sendo 10 classes, 6 ensaios, 20000 linhas e 2 canais
    
    
    # troca os eixos 2 e 3 do numpy array
    data = data.swapaxes(2, 3)

    # imprime o shape do numpy array
    print(f'{data.shape} - (classes, ensaios, canais, linhas)')
    # neste momento devemos ter um shape de (10, 6, 2, 2000), sendo 10 classes, 6 ensaios, 2 canais e 20000 linhas
    
    
    chave = 'S'+ str(voluntario)
    final[chave] = data
# Salva o numpy array 'data' em './lib/data/converted/s1.npy'
np.save('./datasets/khushaba/s2', data)
# print(np.load('./datasets/khushaba/s2.npy'))

# Lembre-se de que o diretório deve existir, caso contrário uma exceção será lançada

def pegaDados(voluntario, classes):
    data = []
    for classe in classes:
        # variável para armazear os dados dos ensaios
        trials = []
        for i in range(1, 7): # de 1 a 6 (Qt. de ensaios)
            # carrega o arquivo .csv para um pandas dataframe
            dataframe = pd.read_csv(f'./datasets/khushaba/EMG-S{voluntario}/{classe}{i}.csv', delimiter=',', header=None)

            # converte os dados do um ensaio para numpy array e o adiciona na lista de ensaios
            trials.append(dataframe.to_numpy())

        # adiciona os ensaios de uma classe a lista de dados das classes
        data.append(trials)

    # transforma os dados das classes em um numpy array
    data = np.array(data)

    # imprime o shape do numpy array
    print(f'{data.shape} - (classes, ensaios, linhas, canais)----- {voluntario}')
    # neste momento devemos ter um shape de (10, 6, 20000, 2), sendo 10 classes, 6 ensaios, 20000 linhas e 2 canais
    
    
    # troca os eixos 2 e 3 do numpy array
    data = data.swapaxes(2, 3)

    # imprime o shape do numpy array
    print(f'{data.shape} - (classes, ensaios, canais, linhas)')
    # neste momento devemos ter um shape de (10, 6, 2, 2000), sendo 10 classes, 6 ensaios, 2 canais e 20000 linhas
    
    return data

import os
classes = ['T-I','T-L','T-M','T-R','T-T','HC-','I-I','L-L','M-M','R-R']

todosDados = {}

voluntarios = list (range(1, 11))
for voluntario in voluntarios:
    chave = 's'+ str(voluntario)
    caminho = "./datasets/khushaba/" + chave + ".npy"
    existe = os.path.isfile(caminho)
    print(existe)
    print(caminho)
    if(existe == True):
        todosDados[chave] = np.load(caminho)
    
    else:
        aux = pegaDados(voluntario, classes)
        np.save('./datasets/khushaba/s' + str(voluntario), aux)
        todosDados[chave] = aux
        
todosDados

%run "load_dataset.ipynb"

%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import rcParams, cm


# [classe 1, ensaio 1, canal 1], [classe 1, ensaio 1, canal 2]
d1 = data[0,0,0,:], data[0,0,1,:]
# [classe 1, ensaio 2, canal 1], [classe 1, ensaio 2, canal 2]
d2 = data[0,1,0,:], data[0,1,1,:]

rcParams['figure.figsize'] = [16., 10.]

x = np.linspace(0, 5, 20000)
fig, ax = plt.subplots(2, 1)

ax[0].plot(x, d1[0])
ax[0].plot(x, d1[1])

ax[1].plot(x, d2[0])
ax[1].plot(x, d2[1])

plt.show()

rcParams['figure.figsize'] = [18., 6.]

for trial in (0, 1):
    mov = 1
    plot_data = []
    for channel in range(2):
        # espaço de 4000 pontos entre 0 e 1
        for i, t in enumerate(np.linspace(0, 1, 4000)):
            plot_data.append([channel, t, data[mov][trial][channel][i]])

    plot_data = np.array(plot_data)
    x, y, z = plot_data[:,0], plot_data[:,1], plot_data[:,2]
    ax = plt.axes(projection ='3d')
    ax.set_title('Movimento {}'.format(mov + 1))
    ax.set_xlabel('Canais')
    ax.set_ylabel('Tempo (s)')
    ax.set_zlabel('Potência (mV)')
    ax.plot_trisurf(x, y, z, antialiased=True, cmap=cm.inferno, linewidth=1)
    plt.show()

# [classe 1, ensaio 1, canal 1], [classe 1, ensaio 1, canal 2]
d1 = data[0,0,0,:], data[0,0,1,:]
# [classe 3, ensaio 1, canal 1], [classe 3, ensaio 1, canal 2]
d2 = data[2,0,0,:], data[2,0,1,:]

rcParams['figure.figsize'] = [16., 10.]

x = np.linspace(0, 5, 20000)
fig, ax = plt.subplots(2, 1)

ax[0].plot(x, d1[0])
ax[0].plot(x, d1[1])

ax[1].plot(x, d2[0])
ax[1].plot(x, d2[1])

plt.show()

rcParams['figure.figsize'] = [18., 6.]

for mov in (0, 2):
    trial = 0
    plot_data = []
    for channel in range(2):
        # espaço de 4000 pontos entre 0 e 1
        for i, t in enumerate(np.linspace(0, 1, 4000)):
            plot_data.append([channel, t, data[mov][trial][channel][i]])

    plot_data = np.array(plot_data)
    x, y, z = plot_data[:,0], plot_data[:,1], plot_data[:,2]
    ax = plt.axes(projection ='3d')
    ax.set_title('Movimento {}'.format(mov + 1))
    ax.set_xlabel('Canais')
    ax.set_ylabel('Tempo (s)')
    ax.set_zlabel('Potência (mV)')
    ax.plot_trisurf(x, y, z, antialiased=True, cmap=cm.inferno, linewidth=1)
    plt.show()
    
%run "load_dataset.ipynb"

%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import rcParams
import numpy as np
from scipy import signal


# definições de filtros

def butter_bandpass(data, lowcut, highcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='bandpass')
    return signal.filtfilt(b, a, data)


def butter_lowpass(data, lowcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    b, a = signal.butter(order, low, btype='lowpass')
    return signal.filtfilt(b, a, data)


def butter_highpass(data, highcut, fs=200, order=4):
    nyq = fs * 0.5
    high = highcut / nyq
    b, a = signal.butter(order, high, btype='highpass')
    return signal.filtfilt(b, a, data)


def butter_notch(data, cutoff, var=1, fs=200, order=4):
    nyq = fs * 0.5
    low = (cutoff - var) / nyq
    high = (cutoff + var) / nyq
    b, a = signal.iirfilter(order, [low, high], btype='bandstop', ftype="butter")
    return signal.filtfilt(b, a, data)

data_filtered = butter_notch(data, 50)
data_filtered = butter_highpass(data_filtered, 5)
data_filtered = butter_lowpass(data_filtered, 50)

data_filtered.shape

import numpy as np

X = data_filtered.reshape(60, 2, 20000)
X.shape

from scipy.signal import stft

step = 470
segment = 1024
data = data_filtered.reshape(60, 2, 20000)
print('', data.shape)

n_win = int((data.shape[-1] - segment) / step) + 1
ids = np.arange(n_win) * step

# Janelas do dado no dominio do tempo
chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)

# Janelas do dado no domínio da frequência
_, _, chunks_freq = stft(data, fs=4000, nperseg=1024, noverlap=512)
chunks_freq = np.swapaxes(chunks_freq, 2, 3)

print('Formato (shape) dos dados depois da divisão de janelas')
print(f'Dominio do tempo: {chunks_time.shape} - (classes+ensaios, canais, janelas, linhas)')
print(f'Dominio da frequência:  {chunks_freq.shape} - (classes+ensaios, canais, janelas, linhas)')

%run preprocessing.ipynb

from math import prod

# funções auxiliares:

def PSD(w):
    ''' definição da função PSD para o sinal no domínio da frequência '''
    return np.abs(w) ** 2

def f_j(j, SampleRate, M):
    return (j * SampleRate) / (2 * M)



# funções de extração de características:

def var(x):
    return np.sum(x ** 2, axis=-1) / (np.prod(x.shape) - 1)

def rms(x):
    return np.sqrt(np.sum(np.abs(x) ** 2, axis=-1) / (np.prod(x.shape) - 1))

def WL(x):
    return np.sum(np.abs(np.diff(x)), axis = -1)

def fmd(w):
    return np.sum(PSD(w), axis=-1) / 2

def mmdf(w):
    return np.sum(np.abs(w), axis=-1) / 2

def WAMP(x, limiar):
    return np.sum( np.abs(np.diff(x)) > limiar, axis= -1)

def WL(x):
    return np.sum(np.abs(np.diff(x)), axis = -1)

#funções de extração de características:

def ZC_Add(data, th):
    
    somatorio = 0
    resultado = 0
    tamanho = len(data)
    
    for i in range(tamanho - 1):
        resultado1 = (data[i] * data[i+1])
        resultado2 = np.abs((data[i] - data[i+1]))
        
        if(resultado1 < 0) and (resultado2 >=  th):
            somatorio += 1
    return somatorio

def ZC(data, th):
    
    x,y,z = data.shape[:3]
    somatorio_final = []
    for i in range(x):
        somatorio_fx = []
        for j in range(y):
            somatorio_fy = []
            for k in range(z):
                somatorio_fz = ZC_Add(data[i][j][k], th)
                
                somatorio_fy.append(somatorio_fz)
            
            somatorio_fx.append(somatorio_fy)
        
        somatorio_final.append(somatorio_fx)
        
    return np.array(somatorio_final)

# funções auxiliares:

def f_j(j, SampleRate, M):
    return (j * SampleRate) / (2 * M)

#funções de extração de característica:

def FMN_Add(data):
    M = len(data)
    somatorio = 0
    denominador = np.sum(PSD(data), axis=-1)
    
    for j in range(M):
        somatorio += (f_j(j, 200, M) * PSD(data[j])) / denominador
    return somatorio


def FMN(data):
    x,y,z = data.shape[:3]
    somatorio_final = []
    
    for i in range(x):
        somatorio_fx = []
        for j in range(y):
            somatorio_fy = []
            for k in range(z):
                somatorio_fz = FMN_Add(data[i][j][k])
                
                somatorio_fy.append(somatorio_fz)
            
            somatorio_fx.append(somatorio_fy)
        
        somatorio_final.append(somatorio_fx)
        
    return np.array(somatorio_final)

# funções auxiliares:

def A_j(w):
    return np.abs(w)

#funções de extração de características:

def MMNF_Add(data):
    
    M = len(data)
    somatorio = 0
    denominador = np.sum(A_j(data), axis=-1)
    
    for j in range(M):
        somatorio += ( f_j(j, 200, M) * A_j(data[j])) / denominador
    return somatorio


def MMNF(data):
   
    x,y,z = data.shape[:3]
    somatorio_final = []
    
    for i in range(x):
        somatorio_fx = []
        for j in range(y):
            somatorio_fy = []
            for k in range(z):
                somatorio_fz = MMNF_Add(data[i][j][k])
                
                somatorio_fy.append(somatorio_fz)
            
            somatorio_fx.append(somatorio_fy)
        
        somatorio_final.append(somatorio_fx)
        
    return np.array(somatorio_final)

PontoMax = np.max(chunks_time)
PontoMax

Mediana = np.median(chunks_time)
Mediana

MeuLimiar = (PontoMax + Mediana) / 2
MeuLimiar

Media = np.mean(chunks_time)
Media

MeuLimiarNovo = (PontoMax + Media) / 2
MeuLimiarNovo

final_data = list()

final_data.append(WAMP(chunks_time, Mediana))
final_data.append(var(chunks_time))
final_data.append(rms(chunks_time))
final_data.append(WL(chunks_time))
final_data.append(ZC(chunks_time, 0))

final_data.append(fmd(chunks_freq))
final_data.append(FMN(chunks_freq))
final_data.append(mmdf(chunks_freq))
final_data.append(MMNF(chunks_freq))


final = np.array(final_data)
final.shape

data = final.transpose(1, 3, 2, 0)
X = data.reshape(data.shape[0]*data.shape[1], data.shape[2]*data.shape[3])
X.shape

from sklearn.feature_selection import VarianceThreshold
sel = VarianceThreshold(threshold = (0.1))
X_Variance = sel.fit_transform(X)

# criação dos rótulos
# 1,1,1,1,1,1,1,1,1,1,...,2,2,2,2,2,2,2,2,2,2,...,3,...

y = [[str(i)] * int(X.shape[0] / 10) for i in range(10)]
y = np.array(y).flatten()
print('Shape dos rótulos:', y.shape)

from sklearn.feature_selection import SelectKBest, chi2
X_new = SelectKBest(chi2, k = 10).fit_transform(X, y)

