# import das bibliotecas

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
from matplotlib import rcParams
import numpy as np
from scipy import signal
from glob import glob
from scipy import stats

rcParams['figure.figsize'] = [16., 5.]

### Filtros

In [2]:
# definições de filtros

def butter_bandpass(data, lowcut, highcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    high = highcut / nyq
    b, a = signal.butter(order, [low, high], btype='bandpass')
    return signal.filtfilt(b, a, data)

def butter_lowpass(data, lowcut, fs=200, order=4):
    nyq = fs * 0.5
    low = lowcut / nyq
    b, a = signal.butter(order, low, btype='lowpass')
    return signal.filtfilt(b, a, data)

def butter_highpass(data, highcut, fs=200, order=4):
    nyq = fs * 0.5
    high = highcut / nyq
    b, a = signal.butter(order, high, btype='highpass')
    return signal.filtfilt(b, a, data)

def butter_notch(data, cutoff, var=1, fs=200, order=4):
    nyq = fs * 0.5
    low = (cutoff - var) / nyq
    high = (cutoff + var) / nyq
    b, a = signal.iirfilter(order, [low, high], btype='bandstop', ftype="butter")
    return signal.filtfilt(b, a, data)

### Carregando Dados

In [None]:
# def plotData(dirs):
#     data = np.load("datasets/topicos_cc/"+dirs)
#     data = np.transpose(data, (0, 2, 1))
#     print(data.shape)
#     data_filtered = butter_notch(data, 60)
#     data_filtered = butter_highpass(data_filtered, 5)
#     data_filtered = butter_lowpass(data_filtered, 50)
#     for i in range(data_filtered.shape[1]):
#         plt.plot(data_filtered[0,i,:])
#     plt.suptitle(dirs)
#     plt.show()


In [3]:
def loadSujeito(dirs):
    data = np.load("datasets/topicos_cc/"+dirs)
    data = np.transpose(data, (0, 2, 1))
    data_filtered = butter_notch(data, 60)
    data_filtered = butter_highpass(data_filtered, 5)
    data_filtered = butter_lowpass(data_filtered, 50)
    
    return data_filtered

In [4]:
dirs = [ i.split("/")[-1] for i in glob('datasets/topicos_cc/p1*')]
data = []
for d in dirs:
    data.append(loadSujeito(d))

In [5]:
data_array = np.array(data)

In [10]:
data_array.shape

(3, 8, 4, 1600)

### Visualizando

In [None]:
# for d in dirs:
#     plotData(d)

# Concatenando Trials x Movimentos

In [7]:
X_entrada = data_array[0]
X = []
X.append(data_array[1])
X.append(data_array[2])
X = np.array(X)

In [8]:
import numpy as np

#X = data.reshape(24, 4, 1600)
X = np.concatenate((np.array(X)), axis=0)

X.shape, X_entrada.shape

((16, 4, 1600), (8, 4, 1600))

## Segmentação dos dados

In [11]:
from scipy.signal import stft

def segmentacao(X):
    data = X
    step = 11.8
    segment = 128
    # data = X.reshape(24, 4, 1600)
    # print('', data.shape)

    n_win = int((data.shape[-1] - segment) / step) + 1
    ids = np.arange(n_win) * int(step)

    # Janelas do dado no dominio do tempo
    chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)

    # Janelas do dado no domínio da frequência
    _, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=115)
    chunks_freq = np.swapaxes(chunks_freq, 2, 3)

    print('Formato (shape) dos dados depois da divisão de janelas')
    print(f'Dominio do tempo: {chunks_time.shape} - (classes+ensaios, canais, janelas, linhas)')
    print(f'Dominio da frequência:  {chunks_freq.shape} - (classes+ensaios, canais, janelas, linhas)')
    return chunks_time, chunks_freq


In [12]:
chunks_time, chunks_freq = segmentacao(X)
print()
chunks_time_entrada, chunks_freq_entrada = segmentacao(X_entrada)

Formato (shape) dos dados depois da divisão de janelas
Dominio do tempo: (16, 4, 125, 128) - (classes+ensaios, canais, janelas, linhas)
Dominio da frequência:  (16, 4, 125, 65) - (classes+ensaios, canais, janelas, linhas)

Formato (shape) dos dados depois da divisão de janelas
Dominio do tempo: (8, 4, 125, 128) - (classes+ensaios, canais, janelas, linhas)
Dominio da frequência:  (8, 4, 125, 65) - (classes+ensaios, canais, janelas, linhas)


In [13]:
chunks_time.shape, chunks_freq.shape

((16, 4, 125, 128), (16, 4, 125, 65))

## Achar as Janelas
O mesmo que a função acima, mas generico

In [None]:
# from scipy.signal import stft

# def janela(overl=64):
    
#     for i in np.arange(1, 128, 0.1):
#         step = i
#         segment = 128
#         data = X.reshape(24, 4, 1600)
        
#         _, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=overl)
#         chunks_freq = np.swapaxes(chunks_freq, 2, 3)
#         window = chunks_freq.shape[2] 

#         n_win = int((data.shape[-1] - segment) / step) + 1
#         ids = np.arange(n_win) * int(step)

#         # Janelas do dado no dominio do tempo
#         chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)
#         time_window = chunks_time.shape[2]
            
#         if( time_window == window ):
#             return step
        
    
# step = janela(overl=64)
# print(step)

In [None]:
# all_steps = []
# for i in [0.5, 0.7, 0.8, 0.9 ]:
#     n_step = int(128*i)
#     print(n_step)
#     step = janela(overl=n_step)
#     all_steps.append(step)
    
# all_steps

In [None]:
# from scipy.signal import stft

# def time_freq(step):
    
#     segment = 128
#     data = X.reshape(24, 4, 1600)
#     print('', data.shape)

#     n_win = int((data.shape[-1] - segment) / step) + 1
#     ids = np.arange(n_win) * int(step)

#     # Janelas do dado no dominio do tempo
#     chunks_time = np.array([data[:,:,k:(k + segment)] for k in ids]).transpose(1, 2, 0, 3)

#     # Janelas do dado no domínio da frequência
#     _, _, chunks_freq = stft(data, fs=200, nperseg=128, noverlap=64)
#     chunks_freq = np.swapaxes(chunks_freq, 2, 3)

#     print('Formato (shape) dos dados depois da divisão de janelas')
#     print(f'Dominio do tempo: {chunks_time.shape} - (classes+ensaios, canais, janelas, linhas)')
#     print(f'Dominio da frequência:  {chunks_freq.shape} - (classes+ensaios, canais, janelas, linhas)')
#     return chunks_time, chunks_freq

In [None]:
# chunks_time , chunks_freq = time_freq(11.8)

## Extração e seleção de características

In [14]:
def getZC(data, th):
    tamanho = len(data)
    somatoria = 0
    
    for i in range(tamanho-1):
        resultado = (data[i] * data[i+1] )
        resultado2 = np.abs(data[i] - data[i+1])
        if(resultado < 0 ) and (resultado2 > th):
            somatoria += 1
        
    return somatoria

def fj(i, sampleRate, tamanho):
    return i * sampleRate / (2 * tamanho)

def getFMN(data):
    tamanho = len(data)
    somatoria = 0
    sumPSD = np.sum(PSD(data))
    for i in range(tamanho):
        somatoria += (fj(i, 41, tamanho) * PSD(data[i]) ) / sumPSD
        
    return somatoria



In [16]:
from math import prod

# funções auxiliares
def PSD(w):
    ''' definição da função PSD para o sinal no domínio da frequência '''
    return np.abs(w) ** 2

def wamp(x, th):
    res = np.abs(np.diff(x))
    return np.sum(res >= th, axis=-1)

def wl(x):
    res = np.abs(np.diff(x))
    return np.sum(res, axis=-1)

def var(x):
    return np.sum(x ** 2, axis=-1) / (np.prod(x.shape[:-1]) - 1)

def rms(x):
    return np.sqrt(np.sum(np.abs(x) ** 2, axis=-1) / (np.prod(x.shape[:-1])))

def fmd(w):
    return np.sum(PSD(w), axis=-1) / 2

def mmdf(w):
    return np.sum(np.abs(w), axis=-1) / 2

def zc(data,threshold):
    f =[]
    x,y,z = data.shape[:3]
    for xx in range(x):
        fx = []
        for yy in range(y):
            fy = []
            for zz in range(z):
                fy.append( getZC(data[xx][yy][zz], threshold ) )
            fx.append(fy)
        f.append(fx)
    return np.array(f)

def fmn(data):
    f =[]
    x,y,z = data.shape[:3]
    for xx in range(x):
        fx = []
        for yy in range(y):
            fy = []
            for zz in range(z):
                
                fy.append( getFMN(data[xx][yy][zz]) )
                
            fx.append(fy)
        f.append(fx)
    return np.array(f)

def A(w):
    return np.abs(w)

def getMMNF(data):
    tamanho = len(data)
    somatoria = 0
    
    sumA = np.sum(A(data))
    
    for i in range(tamanho):
        somatoria += (fj(i, 200, tamanho) * A(data[i]) ) / sumA 
        
    return somatoria

def mmnf(data):
    f =[]
    x,y,z = data.shape[:3]
    for xx in range(x):
        fx = []
        for yy in range(y):
            fy = []
            for zz in range(z):
                
                fy.append( getMMNF(data[xx][yy][zz]) )
                
            fx.append(fy)
        f.append(fx)
    return np.array(f)

def logD(data):
    from math import e
    N = np.prod(data.shape)
    
    return e ** ( np.sum(np.log10( np.abs(data) ), axis=-1) ) / N

def iemg(data):
    # tempo
    return np.sum(A(data), axis=-1)

def dasdv(data):
    #tempo
    return np.sqrt( np.sum(np.diff(data) ** 2, axis=-1) / (np.prod(data.shape[:-1]) - 1) )

def tmx(x, n):
    N = np.prod(x.shape[:-1])
    return np.abs(np.sum(x ** n, axis=-1) / N)

## Implementação do vetor

In [17]:
def final_filtros(chunks_time, chunks_freq):
    th = np.median(chunks_time)
    #VAR, RMS, WL, TM5 e DASDV
    #WL, IEMG, LOGD
    final_data = list()
    final_data.append(var(chunks_time))
    final_data.append(rms(chunks_time))
    final_data.append(wamp(chunks_time, th))
    final_data.append(logD(chunks_time))
    final_data.append(wl(chunks_time))
    final_data.append(zc(chunks_time,0))

    final_data.append(iemg(chunks_time))
    final_data.append(dasdv(chunks_time))
    final_data.append(tmx(chunks_time, 3))
    final_data.append(tmx(chunks_time, 4))
    final_data.append(tmx(chunks_time, 5))

    final_data.append(fmd(chunks_freq))
    final_data.append(mmdf(chunks_freq))
    final_data.append(fmn(chunks_freq))
    final_data.append(mmnf(chunks_freq))

    f, Pxx_den = signal.welch(data, fs=200, nperseg=248, noverlap=223)
    final_data.append(Pxx_den)

    return np.array(final_data)
  

In [20]:


th = np.median(chunks_time)


#VAR, RMS, WL, TM5 e DASDV
#WL, IEMG, LOGD
final_data = list()
final_data.append(var(chunks_time))
final_data.append(rms(chunks_time))
final_data.append(wamp(chunks_time, th))
final_data.append(logD(chunks_time))
final_data.append(wl(chunks_time))
final_data.append(zc(chunks_time,0))

final_data.append(iemg(chunks_time))
final_data.append(dasdv(chunks_time))
final_data.append(tmx(chunks_time, 3))
final_data.append(tmx(chunks_time, 4))
final_data.append(tmx(chunks_time, 5))

final_data.append(fmd(chunks_freq))
final_data.append(mmdf(chunks_freq))
final_data.append(fmn(chunks_freq))
final_data.append(mmnf(chunks_freq))

f, Pxx_den = signal.welch(data, fs=200, nperseg=248, noverlap=223)
final_data.append(Pxx_den)

final = np.array(final_data)
final.shape


  final = np.array(final_data)


(16,)

In [18]:
final = final_filtros(chunks_time, chunks_freq)


  return np.array(final_data)


In [19]:
final.shape

(16,)

In [None]:
final_entrada = final_filtros(chunks_time_entrada, chunks_freq_entrada)

final.shape, final_entrada.shape

In [None]:
final_entrada[0]

In [None]:
# data = final.transpose(0, 1, 3, 2)
# sh = data.shape

# X = data.reshape(sh[0], int(sh[1]/3), 3 * sh[2], sh[3])

# print(X.shape)

## PCA


In [None]:
from sklearn.decomposition import PCA

In [None]:
# pca = PCA(n_components=2)

# features = list()
# for f in X:
#     classes = list()
#     for c in f:
#         C_pca = pca.fit_transform(c)
#         classes.append(C_pca)
#     features.append(classes)

# X_pca = np.array(features)

In [None]:
# X_pca.shape

## Visualização

In [None]:
# def plot_features(features, features_names, classes_names, ch_1, ch_2):
    
#     movs = np.arange(len(classes_names))
#     markers = ["o", "v", "^", "P", "*", "x", "X", "2", "3", "1", 'm', 'L', 'z', 'U', '6']
#     for f, feature in enumerate(features):
        
#         for mov, marker in zip(movs, markers):
#             # argumentos: classes, amostras, canal
#             plt.scatter(feature[mov, :, ch_1],
#                         feature[mov, :, ch_2], marker=marker)

#         plt.legend((classes_names), scatterpoints=1, loc='best',
#                    ncol=3, fontsize=8)
        
#         plt.title(features_names[f])
#         plt.xlabel('CH{}'.format(ch_1))
#         plt.ylabel('CH{}'.format(ch_2))
#         plt.show()


In [None]:


# import matplotlib.pyplot as plt
# from matplotlib import rcParams

# plt.rcParams["figure.figsize"] = (12, 12)

# features_name = ('var', 'rms', 'wamp', 'wl', 'zc','logd', 'iemg','dasdv','tm3','tm4','tm5', 'fmd', 'mmdf', 'fmn', 'mmnf')
# classes = [str(item) for item in list(range(8))]
# plot_features(X_pca, features_name, classes, 0, 1)


## Transpose para Selecionar Feature

In [None]:
final.shape
# 24*26 ,9, 4
# 24*26 , 10, 4
# 24*26 , 15, 4

In [None]:
data = final.transpose(1, 3, 2, 0)
X = data.reshape(data.shape[0]*data.shape[1], data.shape[2]*data.shape[3])
X.shape

In [None]:
# y = np.array(list(range(1, 9)) * int(X.shape[0] / 8)) # Antigo

y = [ [(i)] * int(X.shape[0] / 8 ) for i in range(8)]
y = np.array(y).flatten()
y.shape

## Seleção de características

## Variance Threshold


In [None]:
# # teste

# data_t = final.transpose(1, 3, 2, 0)
# ## X_t = data.reshape(24*26, 9, 4)
# ## X_t = data.reshape(24*26, 10, 4)
# ## X_t = data.reshape(24*26, 15, 4)
# X_t = data.reshape(24*26, 5, 4)

# data_t = X_t.transpose(2, 0, 1)
# data_t.shape


In [None]:
# from sklearn.feature_selection import VarianceThreshold
# canais = list()

# for c in data_t:
#     sel = VarianceThreshold(threshold=(.1))
#     vt = sel.fit_transform(c)
#     canais.append(vt)


### RFE (Por causa do Kernel Linear não iremos utilizar)

In [None]:
# from sklearn.feature_selection import RFE
# from sklearn.svm import SVC
# estimator = SVC(kernel="linear")
# selector = RFE(estimator, n_features_to_select=5, step=1)
# selector = selector.fit(X, y)
# s = selector.fit_transform(X, y)

### GenericUnivariateSelect

In [None]:
# X.shape, y.shape

In [None]:
# from sklearn.feature_selection import GenericUnivariateSelect, chi2
# transformer = GenericUnivariateSelect(chi2, mode='k_best', param=10)
# X_new = transformer.fit_transform(X, y)
# X_new.shape

In [None]:
# from sklearn.feature_selection import SelectKBest
# X_new = SelectKBest(k=10).fit_transform(X, y)

In [None]:
# X.shape, X_new.shape

## Normalização

In [None]:
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

X_norm = scaler.fit_transform(X)

# X_new_norm = scaler.fit_transform(X_new)

## SVM

In [None]:
from sklearn.model_selection import train_test_split

from sklearn.svm import SVC
from sklearn import metrics

def do_svm(X,y):

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.30, shuffle=True)

    clf = SVC(gamma='scale')
    clf.fit(X_train, y_train)

    y_pred = clf.predict(X_test)
    
    acc = metrics.accuracy_score(y_test, y_pred)
    
    return acc

do_svm(X,y), do_svm(X_norm,y)

# Combinations

In [None]:
# from itertools import combinations

# best_acc = 0
# best_comb = 0
# for comb in range(1,12):
#     for res in combinations(range(9),comb):
#         acc = do_svm(X_norm.take(res, axis=1), y)

#         if acc > best_acc:
#             best_acc = acc
#             best_comb = res

# print(f"Melhor Acurácia: {best_acc}, Melhor Combinação: {best_comb}")

# features_name = ('var', 'rms', 'wamp', 'wl', 'zc','logd', 'fmd', 'mmdf', 'fmn', 'mmnf')
# for i in best_comb:
#     print(f" {i} --- {features_name[i]}")


In [None]:
# from itertools import combinations

# best_acc = 0
# best_comb = 0
# for comb in range(1,12):
#     for res in combinations(range(9),comb):
#         acc = do_svm(X.take(res, axis=1), y)

#         if acc > best_acc:
#             best_acc = acc
#             best_comb = res

# print(f"Melhor Acurácia: {best_acc}, Melhor Combinação: {best_comb}")

# features_name = ('var', 'rms', 'wamp', 'wl', 'zc','logd', 'fmd', 'mmdf', 'fmn', 'mmnf')
# for i in best_comb:
#     print(f" {i} --- {features_name[i]}")


In [None]:
# from itertools import combinations

# best_acc = 0
# best_comb = 0
# best_k = 0
# for comb in range(1,12):
#     for res in combinations(range(9),comb):
#         for ks in range(1,41):
#             X_new = SelectKBest(k=ks).fit_transform(X, y)
#             acc = do_svm(X_new_norm.take(res, axis=1), y)

#             if acc > best_acc:
#                 best_acc = acc
#                 best_comb = res
#                 best_k = ks

# print(f"Melhor Acurácia: {best_acc}, Melhor Combinação: {best_comb}, Melhor K: {best_k}")




## RNN

In [None]:
from keras import regularizers
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.optimizers import SGD, Adam
from keras.utils.np_utils import to_categorical
import numpy as np
from sklearn.metrics import classification_report, accuracy_score, confusion_matrix
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.preprocessing import LabelEncoder
from urllib.request import urlopen, urlretrieve

import matplotlib.pyplot as plt
%matplotlib inline

## Divisão dos dados em treino e teste:

In [None]:

# Dividindo em conjuntos de treino (80%) e teste (20%)
X_train, X_test, y_train, y_test = train_test_split(
    X_norm, y, test_size=0.3)

# treino: 80% dos 80% de treino. Validacao: 20% dos 80% de treino.
X_train, X_val, y_train, y_val = train_test_split(
    X_train, y_train, test_size=0.3, shuffle=True)


## Aplicação do algoritmo de MLP e geração dos resultados:

In [None]:
# definição de uma fração do regularizador
l = 0.01

# desenvolvimento do modelo Keras para uma MLP
model = Sequential()
model.add(Dense(20, activation='relu', input_dim=64,
                kernel_regularizer=regularizers.l2(l)))
# Aplicação de um dropout (caso necessário)
# model.add(Dropout(0.5))
model.add(Dense(10, activation='relu',
                kernel_regularizer=regularizers.l2(l)))
# Aplicação de um dropout (caso necessário)
# model.add(Dropout(0.5))
model.add(Dense(8, activation='softmax'))

# Aplicação de um modelo de descida de gradiente utilizando o Stocastic Gradient Descendent (SGD)
sgd = SGD(lr=0.05, momentum=0.0)
# Função de otimização da rede: ADAM
adam = Adam(lr=0.005, beta_1=0.9, beta_2=0.999)
# Função de custo baseada em dados originalmente categóricos
model.compile(loss='sparse_categorical_crossentropy', optimizer=adam,
              metrics=['accuracy'])

history = model.fit(X_train, y_train, epochs=550, batch_size=15,
                    validation_data=(X_val, y_val))

In [None]:
# 
# score = model.predict_classes(X_test)
# y_true = [np.where(x == 1)[0][0] for x in y_test]

predict_x=model.predict(X_test) 
score=np.argmax(predict_x,axis=1)
y_true = y_test

print('Acurácia: %0.2f%%' % (accuracy_score(y_true, score) * 100))
print('Matriz de confusão:')
print(confusion_matrix(y_true, score))
print()
print(classification_report(y_true, score, digits=5))

# Grafico

In [None]:
def plot_history(h):
    loss_list = [s for s in h.history.keys() if 'loss' in s and 'val' not in s]
    val_loss_list = [s for s in h.history.keys() if 'loss' in s and 'val' in s]
    acc_list = [s for s in h.history.keys() if 'acc' in s and 'val' not in s]
    val_acc_list = [s for s in h.history.keys() if 'acc' in s and 'val' in s]
    if len(loss_list) == 0:
        print('Custo não está presente no histórico')
        return
    epochs = range(1, len(history.history[loss_list[0]]) + 1)
    # Custo
    plt.figure(1)
    for l in loss_list:
        plt.plot(epochs, h.history[l], 'b',
                 label='Custo [treinamento] (' + str(str(format(
                    h.history[l][-1],'.5f'))+')'))
    for l in val_loss_list:
        plt.plot(epochs, h.history[l], 'g',
                 label='Custo [validação] (' + str(str(format(
                    h.history[l][-1],'.5f'))+')'))
    plt.title('Custo')
    plt.xlabel('Épocas')
    plt.ylabel('Custo')
    plt.legend()
    # Acurácia
    plt.figure(2)
    for l in acc_list:
        plt.plot(epochs, h.history[l], 'b',
                 label='Acurácia [treinamento] (' + str(format(
                    h.history[l][-1],'.5f'))+')')
    for l in val_acc_list:
        plt.plot(epochs, h.history[l], 'g',
                 label='Acurácia [validação] (' + str(format(
                    h.history[l][-1],'.5f'))+')')
    plt.title('Acurácia')
    plt.xlabel('Épocas')
    plt.ylabel('Acurácia')
    plt.legend()
    plt.show()

In [None]:
plot_history(history)