# **Conexão com o Google Drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')
# 1 - Dê a permissão
# 2 - Copie e cole o código gerado
# 3 - Coloque o arquivo dataZ24.mat no diretório raiz do drive

# **Importação de bibliotecas tradicionais**

In [None]:
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt

# **Carregamento e visualização dos dados**

In [None]:
#Carregar os dados
X = sio.loadmat('/content/drive/My Drive/dataZ24.mat')['data']
print('Dimensões da base de dados: ' + str(np.shape(X)))
X_treino = X[0:198,:]
print('Dimensões do conjunto de treino: ' + str(np.shape(X_treino)))
X_teste = X[0:235,:]
print('Dimensões do conjunto de teste: ' + str(np.shape(X_teste)))
n_normal = X_treino.shape[0]
print(f'Quantidade de dados da condição normal: {n_normal}')
n_total = X_teste.shape[0]
print(f'Quantidade de dados das condições normal e anormal: {n_total}')

In [None]:
ax = np.linspace(1, n_total, n_total)
plt.plot(ax[0:n_normal], X_treino[:,0], 'b^', label='Normal')
plt.plot(ax[0:n_normal], X_treino[:,1], 'b^')
plt.plot(ax[n_normal:n_total], X_teste[198:,0], 'r^', label='Anormal')
plt.plot(ax[n_normal:n_total], X_teste[198:,1], 'r^')
plt.xlabel('Número de observações')
plt.ylabel('Frequência (Hz)')
plt.legend()
plt.show()

# **Funções auxiliares**

In [None]:
def est_threshold(scores, level=0.95):

    aux = np.sort(scores,axis=0)

    thr = aux[round(aux.shape[0]*level)-1]

    return thr


def get_errors(scores, flag, thr):

    n = scores.shape[0]
    nb = np.sum(flag == 0)

    class_state = np.zeros((n, 1))

    class_state[scores > thr] = 1

    num_errors_type1 = np.sum((class_state == 1) & (flag == 0))

    num_errors_type2 = np.sum((class_state == 0) & (flag == 1))

    perc_errors_type1 = num_errors_type1/nb*100
    perc_errors_type2 = num_errors_type2/(n-nb)*100

    return num_errors_type1, num_errors_type2, perc_errors_type1, perc_errors_type2

def get_flag_test(n, nb):

    flag_test = np.zeros((n, 1))

    flag_test[nb:] = 1

    return flag_test

# **Definição das flags (rótulos) dos dados**

In [None]:
flag_test = get_flag_test(n_total,n_normal)

# **Algoritmo Mahalanobis Squared distance (MSD)**

In [None]:
def learn_mahalanobis(train_data):

    data_mean = np.mean(train_data, axis=0)
    data_cov = np.cov(train_data.T)

    model = {'data_mean': np.array([data_mean]),
             'data_cov': data_cov}

    return model


def score_mahalanobis(test_data, model):

    n = test_data.shape[0]
    data_mean = model['data_mean']
    data_cov = model['data_cov']

    scores = np.zeros((n,1))

    for i in range(0, n):

        scores[i, :] = np.dot(test_data[i,:]-data_mean,np.dot(np.linalg.pinv(data_cov), (test_data[i,:]-data_mean).T))

    return scores

In [None]:
model = learn_mahalanobis(X_treino)
scores = score_mahalanobis(X_teste,model)
scores_thr = score_mahalanobis(X_treino,model)
thr = est_threshold(scores_thr)

num_err_t1, num_err_t2, per_err_t1, per_err_t2 = get_errors(scores, flag_test, thr)
print(f'Erros Tipo 1 (falso-positivo): {num_err_t1}')
print(f'Erros Tipo 2 (falso-negativo): {num_err_t2}')
print(f'Porcentagem de erros Tipo 1: {per_err_t1:.2f}')
print(f'Porcentagem de erros Tipo 2: {per_err_t2:.2f}')

scores = np.log(scores) #esta linha so precisa para o MSD
thr = np.log(thr) #esta linha so precisa para o MSD
ix_out = scores > thr
ax = np.linspace(1, n_total, n_total)
plt.plot(ax[np.squeeze(ix_out)], scores[ix_out], 'yo', label='Outlier', markersize=12)
plt.plot(ax[0:n_normal], scores[0:n_normal], 'b^', label='Normal')
plt.plot(ax[n_normal:n_total], scores[n_normal:n_total], 'r^', label='Anormal')
plt.plot(ax,thr*np.ones(ax.shape), 'k--', label='Limiar')
plt.xlabel('Número de observações')
plt.ylabel('Indicador de anomalia')
plt.legend()
plt.show()

# **Algoritmo k-means**

In [None]:
from sklearn.cluster import KMeans

def learn_KM(train_data, nc):

    km = KMeans(n_clusters=nc).fit(train_data)
    labels = km.labels_
    cluster_centers_ = km.cluster_centers_

    n_clusters = km.n_clusters

    labelsClusterCenters = km.predict(km.cluster_centers_)

    print("Númbero de clusters estimados: {}".format(n_clusters))

    plt.scatter(train_data[:, 0], train_data[:, 1], s=10, c=labels)
    plt.scatter(km.cluster_centers_[:, 0], km.cluster_centers_[:, 1], s=100, c=labelsClusterCenters)
    plt.show()

    model = {'n_clusters':n_clusters,'cluster_centers':cluster_centers_,'labels':labels}

    return model

def score_KM(test_data, model):

    n = test_data.shape[0]
    n_clusters = model['n_clusters']
    cluster_centers = model['cluster_centers']

    scores_aux = np.zeros((n, n_clusters))

    for i in range(0, n_clusters):

        residuals = test_data - cluster_centers[i, :]

        scores_aux[:, i] = np.squeeze(np.linalg.norm(residuals, axis=1, keepdims=True))

    scores = np.min(scores_aux, axis=1, keepdims=True)

    return scores

In [None]:
model = learn_KM(X_treino, nc=3)
scores = score_KM(X_teste,model)
scores_thr = score_KM(X_treino,model)
thr = est_threshold(scores_thr)

num_err_t1, num_err_t2, per_err_t1, per_err_t2 = get_errors(scores, flag_test, thr)
print(f'Erros Tipo 1 (falso-positivo): {num_err_t1}')
print(f'Erros Tipo 2 (falso-negativo): {num_err_t2}')
print(f'Porcentagem de erros Tipo 1: {per_err_t1:.2f}')
print(f'Porcentagem de erros Tipo 2: {per_err_t2:.2f}')

ix_out = scores > thr
ax = np.linspace(1, n_total, n_total)
plt.plot(ax[np.squeeze(ix_out)], scores[ix_out], 'yo', label='Outlier', markersize=12)
plt.plot(ax[0:n_normal], scores[0:n_normal], 'b^', label='Normal')
plt.plot(ax[n_normal:n_total], scores[n_normal:n_total], 'r^', label='Anormal')
plt.plot(ax,thr*np.ones(ax.shape), 'k--', label='Limiar')
plt.xlabel('Número de observações')
plt.ylabel('Indicador de anomalia')
plt.legend()
plt.show()