## Referência

Parte do presente script (classe PCA) usou como base o código apresentado na aula abaixo:

https://www.kaggle.com/code/afrniomelo/epv-peq-aula-2-detec-o-de-falhas

## Bibliotecas

In [None]:
import math
import os
import re
import datetime

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import tensorflow as tf

from collections import Counter
from datetime import timedelta
from sklearn import preprocessing
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support , roc_auc_score, auc, precision_score
from timeit import default_timer as timer
from scipy.stats import f, norm

## Funções

In [None]:
class PCA():
   
    # ---------------------------------------------------------------------------
    # Método construtor
    # Função que será chamada toda vez que um objeto PCA for inicializado
    # O modelo selecionará quantos PCs forem necessários para cumprir o % de variabilidade explicada especificado em a

    def __init__ (self, a=0.9):

        # se 0<=a<1,  'a' indica a fraçao de variancia explicada desejada
        # se a>=1,    'a' indica o numero de componentes desejado
        self.a = a
   
    # ---------------------------------------------------------------------------
    # Função para treino do modelo
    # X são os dados de treino
    
    def fit(self, X, conf_Q=0.99, conf_T2=0.99, plot=True):
    
        # guardando médias e desvios-padrão do treino (de cada coluna)
        self.mu_train = X.mean(axis=0)
        self.std_train = X.std(axis=0)        
       
        # normalizando dados de treino
        X = np.array(((X - self.mu_train)/self.std_train))
       
        # calculando a matriz de covariâncias dos dados
        Cx = np.cov(X, rowvar=False)
        
        # aplicando decomposição em autovalores e autovetores
        self.L, self.P = np.linalg.eig(Cx)
        
        # frações da variância explicada
        fv = self.L/np.sum(self.L)
        
        # frações da variância explicada acumuladas
        fva = np.cumsum(self.L)/sum(self.L)
       
        # definindo número de componentes
        if (self.a>0 and self.a<1):
            self.a = np.where(fva>self.a)[0][0]+1 
            
        # calculando limites de detecção

        # limite da estatística T^2
        F = f.ppf(conf_T2, self.a, X.shape[0]-self.a)
        self.T2_lim = ((self.a*(X.shape[0]**2-1))/(X.shape[0]*(X.shape[0]-self.a)))*F
        print("T2_lim: ", self.T2_lim)
        
        # limite da estatística Q
        theta = [np.sum(self.L[self.a:]**(i)) for i in (1,2,3)]
        ho = 1-((2*theta[0]*theta[2])/(3*(theta[1]**2)))
        nalpha = norm.ppf(conf_Q)
        self.Q_lim = (theta[0]*(((nalpha*np.sqrt(2*theta[1]*ho**2))/theta[0])+1+
                                ((theta[1]*ho*(ho-1))/theta[0]**2))**(1/ho))
        print("Q_lim: ", self.Q_lim)
        
        # plotando variâncias explicadas
        if plot:
            fig, ax = plt.subplots()
            ax.bar(np.arange(len(fv)),fv)
            ax.plot(np.arange(len(fv)),fva)
            ax.set_xlabel('Número de componentes')
            ax.set_ylabel('Variância dos dados')
            ax.set_title('PCA - Variância Explicada');

    # ---------------------------------------------------------------------------
    # Função para teste do modelo
            
    def predict(self, X):
            
        # normalizando dados de teste (usando os parâmetros do treino!)
        X = np.array((X - self.mu_train)/self.std_train)

        # calculando estatística T^2
        T = X@self.P[:,:self.a]
        self.T2 = np.array([T[i,:]@np.linalg.inv(np.diag(self.L[:self.a]))@T[i,:].T for i in range(X.shape[0])])

        # calculando estatística Q
        e = X - X@self.P[:,:self.a]@self.P[:,:self.a].T
        self.Q  = np.array([e[i,:]@e[i,:].T for i in range(X.shape[0])])
        
        # calculando contribuições para Q
        self.c = np.absolute(X*e) 
                
    # ---------------------------------------------------------------------------
    # Função para plotar cartas de controle
    # Eixo y contém as estatísticas e eixo x contém a quantidade de amostras
    
    def plot_control_charts(self, fault = None):
     
        fig, ax = plt.subplots(1,2, figsize=(15,3))
        
        ax[0].semilogy(self.T2,'.')
        ax[0].axhline(self.T2_lim,ls='--',c='r');
        ax[0].set_title('Carta de Controle $T^2$')
        ax[0].set_xlabel('Samples')
        ax[0].set_ylabel('$T^2$')
        
        ax[1].semilogy(self.Q,'.')
        ax[1].axhline(self.Q_lim,ls='--',c='r')
        ax[1].set_title('Carta de Controle Q')
        ax[1].set_xlabel('Samples')
        ax[1].set_ylabel('$Q$')
 
        if fault is not None:
            ax[0].axvline(fault, c='w')
            ax[1].axvline(fault, c='w')

    # ---------------------------------------------------------------------------
    # Função para plotar mapas de contribuição
            
    def plot_contributions(self, fault=None, index=None, columns=None):

        fig, ax = plt.subplots(figsize=(20, 6))
        
        c = pd.DataFrame(self.c, index=index, columns=columns)
    
        sns.heatmap(c, ax=ax, yticklabels=int(self.c.shape[0]/10), cmap = plt.cm.Blues)
        
        ax.set_title('Contribuições parciais para Q')
        
        if fault is not None:
            ax.axhline(y=c.index[fault], ls='--', c='k')

In [None]:
def metrics(y_real, y_pred, conf_matrix, STATUS, multi_problem):
    
    # Classificação multiclass
    if (multi_problem):
    
        cm_values = conf_matrix.values
    
        # Cálculo dos FP, FN e TP para cada classe
        tp = np.zeros((len(cm_values),1))
        tn = np.zeros((len(cm_values),1))
        fp = np.zeros((len(cm_values),1))
        fn = np.zeros((len(cm_values),1))
        acc = np.zeros((len(cm_values),1))

        for i in range(0, len(cm_values)):

            tp[i] = cm_values[i,i]
            fp[i] = cm_values[:,i].sum() - cm_values[i,i]
            fn[i] = cm_values[i,:].sum() - cm_values[i,i]
            tn[i] = len(y_real) - tp[i] - fp[i] - fn[i]
            acc[i] = (len(y_real) - (cm_values[i,:].sum() + cm_values[:,i].sum() - 2*cm_values[i,i]))/len(y_real)

        # Cálculo das métricas Precision, Recall e F-Score para cada classe
        try:
            metricas = precision_recall_fscore_support(y_real.argmax(axis=1), y_pred.argmax(axis=1), zero_division=0)
        except:
            metricas = precision_recall_fscore_support(y_real, y_pred, zero_division=0)

        # Arranjo do dataframe e inclusão de outras métricas
        metricas_df = pd.DataFrame(list(metricas))
        metricas_df = metricas_df.transpose()

        metricas_df.columns = ['Precision', 'Recall', 'F-score(a=1)', 'Total']
        metricas_df['Accuracy'] = acc
        metricas_df['Class'] = STATUS
        metricas_df['Total'] = metricas_df['Total'].astype(int)

        metricas_df['TP'] = tp.astype(int)
        metricas_df['TN'] = tn.astype(int)
        metricas_df['FP'] = fp.astype(int)
        metricas_df['FN'] = fn.astype(int)

        metricas_df['Specificity'] = (metricas_df['TN']/(metricas_df['TN'] + metricas_df['FP']))
        metricas_df['FPR(FAR)'] = (metricas_df['FP']/(metricas_df['FP'] + metricas_df['TN']))
        metricas_df['F-score(a=0.5)'] =\
            (1.5*metricas_df['Precision']*metricas_df['Recall'])/((0.5*metricas_df['Precision']) + metricas_df['Recall'])
        metricas_df['F-score(a=2)'] =\
            (3*metricas_df['Precision']*metricas_df['Recall'])/((2*metricas_df['Precision']) + metricas_df['Recall'])
        metricas_df['F-score(a=0.5)'].fillna(0, inplace=True)
        metricas_df['F-score(a=2)'].fillna(0, inplace=True)

        metricas_df = metricas_df.set_index('Class')
        metricas_df = metricas_df[['Precision', 'Recall', 'F-score(a=1)', 'F-score(a=0.5)', 'F-score(a=2)', \
                                   'Specificity', 'Accuracy', 'FPR(FAR)', 'TP', 'TN', 'FP', 'FN', 'Total']]
    
    # Classificação binária
    else:
        
        try:
            tn, fp, fn, tp = confusion_matrix(y_real.argmax(axis=1), y_pred.argmax(axis=1)).ravel()
        except:
            tn, fp, fn, tp = confusion_matrix(y_real, y_pred).ravel()
        
        recall = tp/(tp+fn)
        precision = tp/(tp+fp)
        f1_score = 2*recall*precision/(recall+precision)
        f2_score = 3*recall*precision/((2*precision)+recall)
        f05_score = 1.5*recall*precision/((0.5*precision)+recall)
        accuracy = (tp+tn)/(tp+tn+fp+fn)
        specificity = tn/(fp+tn)
        fpr = fp/(fp+tn)
        
        metricas_df = pd.DataFrame([precision, recall, f1_score, f05_score, f2_score, specificity, accuracy, fpr, tp, \
                                    tn, fp, fn]).T
        metricas_df.columns = ['Precision', 'Recall', 'F-score(a=1)', 'F-score(a=0.5)', 'F-score(a=2)', \
                               'Specificity', 'Accuracy', 'FPR(FAR)', 'TP', 'TN', 'FP', 'FN']

    return metricas_df

## Fixando a seed

In [None]:
# Seed value
# Mesma seed utilizada no script que gerou o melhor modelo da CNN
seed_value = 53771
print(seed_value)

os.environ['PYTHONHASHSEED']=str(seed_value)

# 2. Set the `python` built-in pseudo-random generator at a fixed value
import random
random.seed(seed_value)

# 3. Set the `numpy` pseudo-random generator at a fixed value
np.random.seed(seed_value)

# 4. Set the `tensorflow` pseudo-random generator at a fixed value
tf.random.set_seed(seed_value)

## 1. Leitura dos dados

In [None]:
data_path = r"/home/lcap/Desktop/workspace/data/bomba/"

In [None]:
data = pd.read_csv(data_path + "banco_labeling-v1.csv", sep=';')
data.columns

In [None]:
data.head()

In [None]:
time_data = data[['Time']].copy()
data = data.drop(['Time'], axis=1)

data.shape

In [None]:
params = pd.read_csv(data_path + "banco_labeling_params-v1.csv", sep=';')
params.drop(['status_init', 'status_end'], 1, inplace=True)
params.head()

### Divisão dos dados

In [None]:
# O PCA é treinado para reconhecer apenas a operação normal
# A divisão abaixo é igual à divisão utilizada no script que gerou o melhor modelo da CNN

x_data = data.loc[:1212560,:].copy()
x_data = x_data[x_data['rotulos_multi'] == 0].copy()
y_data = x_data[['rotulos_multi']].copy()
x_data.drop(['rotulos_multi', 'rotulos_bin'], axis=1, inplace=True)

# O banco de teste possui tanto dados normais quanto de falha

x_test = data.loc[1212560:,:].copy()
y_test = x_test[['rotulos_multi']].copy()
# x_test.drop(['rotulos_multi', 'rotulos_bin'], axis=1, inplace=True)

print("\nTREINO")
print("X: ", np.shape(x_data))
print("Y: ", np.shape(y_data))
print("Status:", Counter(y_data['rotulos_multi']))

print("\nTESTE")
print("X: ", np.shape(x_test))
print("Y: ", np.shape(y_test))
print("Status:", Counter(y_test['rotulos_multi']))

## 2. Aplicação do PCA

### Treinamento

In [None]:
# Instancia o objeto PCA
pca = PCA(a=0.9)

# Treina o modelo PCA
pca.fit(x_data)

print("\nPCs: ", pca.a)

### Teste - Falhas e operação normal

In [None]:
# Aplicação do PCA

fault_id = y_test['rotulos_multi'].unique()#[1:]

# Dataframe para armazenar estatísticas calculadas
pca_stats = pd.DataFrame(np.zeros((len(fault_id),3)), columns=['fault', 'T2', 'Q'])

# Dataframe para armazenar o resultado do PCA de cada instância
pca_results_final = pd.DataFrame(columns=['T2', 'Q', 'fault'])

data_to_pca = x_test.copy()
data_to_pca.rename(columns={'rotulos_multi': 'STATUS'}, inplace=True)

i=0

for fault in fault_id:
    
    print("\n-------------------------------------------------------------------------------")
    df_test = data_to_pca.query("STATUS == " + str(fault)).copy()
    df_test.drop(['STATUS', 'rotulos_bin'], 1, inplace=True)
    
    pca.predict(df_test)
    
    print(f'Taxas de detecção de falhas - ID({fault})')
    
    print(f'\nT2: {(pca.T2>pca.T2_lim).sum()/pca.T2.shape[0]}')
    print(f'Q: {(pca.Q>pca.Q_lim).sum()/pca.Q.shape[0]}')
    
    pca_stats['fault'].iloc[i] = fault
    pca_stats['Q'].iloc[i] = (pca.Q>pca.Q_lim).sum()/pca.Q.shape[0]
    pca_stats['T2'].iloc[i] = (pca.T2>pca.T2_lim).sum()/pca.T2.shape[0]
    
    pca_results_parcial = pd.DataFrame(columns=['T2', 'Q'])
    pca_results_parcial['T2'] = pca.T2
    pca_results_parcial['Q'] = pca.Q
    pca_results_parcial['fault'] = fault
    pca_results_final = pca_results_final.append(pca_results_parcial)
    
    i += 1

    pca.plot_control_charts()
    plt.suptitle(f'ID({fault})');

    pca.plot_contributions(columns=df_test.columns)
    plt.show()

In [None]:
pca_stats

In [None]:
pca_results_final

### Pré-falha 17

In [None]:
pre_fault17 = x_test.loc[1213608:1218929,:].copy()
pre_fault17 = pre_fault17.reset_index().drop(['index'], axis=1)

plt.plot(pre_fault17['rotulos_multi']);

In [None]:
fault = 17

data_to_pca = pre_fault17.copy()
data_to_pca.rename(columns={'rotulos_multi': 'STATUS'}, inplace=True)

df_test = data_to_pca.copy()
df_test.drop(['STATUS', 'rotulos_bin'], 1, inplace=True)

pca.predict(df_test)

print(f'Taxas de detecção de falhas + pré-falha - ID({fault})')

print(f'\nT2: {(pca.T2>pca.T2_lim).sum()/pca.T2.shape[0]}')
print(f'Q: {(pca.Q>pca.Q_lim).sum()/pca.Q.shape[0]}')

fig, ax = plt.subplots(1,2, figsize=(15,3))

ax[0].semilogy(pca.T2,'.')
ax[0].axhline(pca.T2_lim,ls='--',c='r')
ax[0].axvline(3000,ls='--',c='k') # marca a pré-falha da 17
ax[0].set_title('Carta de Controle $T^2$')
ax[0].set_xlabel('Samples')
ax[0].set_ylabel('$T^2$')

ax[1].semilogy(pca.Q,'.')
ax[1].axhline(pca.Q_lim,ls='--',c='r')
ax[1].axvline(3000,ls='--',c='k')
ax[1].set_title('Carta de Controle Q')
ax[1].set_xlabel('Samples')
ax[1].set_ylabel('$Q$')

if fault is not None:
    ax[0].axvline(fault, c='w')
    ax[1].axvline(fault, c='w')

pca.plot_contributions(columns=df_test.columns)
plt.show()

## 3. Métricas que permitem comparação com outros algoritmos

In [None]:
pca.predict(x_test.drop(['rotulos_multi','rotulos_bin'],axis=1))

In [None]:
print(f'\nT2: {(pca.T2>pca.T2_lim).sum()/pca.T2.shape[0]}')
print(f'Q:  {(pca.Q>pca.Q_lim).sum()/pca.Q.shape[0]}')

pca.plot_control_charts()
plt.suptitle(f'Banco de teste');

In [None]:
pca_results = pd.DataFrame(columns=['T2', 'Q'])
pca_results['T2'] = pca.T2
pca_results['Q'] = pca.Q 
pca_results['rotulos_multi'] = y_test.values
pca_results.head()

In [None]:
pca_results.shape

In [None]:
pca_results['T2_classific'] = np.where(pca_results['T2'] > pca.T2_lim, 1, 0)
pca_results['Q_classific'] = np.where(pca_results['Q'] > pca.Q_lim, 1, 0)
pca_results['rotulos_bin'] = np.where(pca_results['rotulos_multi'] >= 1, 1, 0)
pca_results = pca_results.reset_index().drop(['index'], axis=1)
pca_results

In [None]:
# Desempenho por T2

new_status = [0,1]

df_cm = pd.DataFrame(confusion_matrix(pca_results['rotulos_bin'], pca_results['T2_classific']), \
                     index=[i for i in new_status], columns=[i for i in new_status])

# Linha para normalizar os dados
df_cm_norm = round((df_cm.astype('float')/df_cm.sum(axis=1).values.reshape(-1,1)), 3)

# Gráfico da matriz de confusão
plt.figure(figsize=(6,5), dpi=100)
plt.title("Confusion Matrix - T2", fontsize=10)
ax = sns.heatmap(df_cm_norm, annot=True, cmap='PuBu')
ax.set_xlabel("PREDICTED CLASSES", fontsize=8)
ax.set_ylabel("REAL CLASSES", fontsize=8)
bottom, top = ax.get_ylim()
ax.set_ylim(bottom + 0.5, top - 0.5)
plt.show()

# Chama a função metrics()
data_metrics = metrics(pca_results['rotulos_bin'], pca_results['T2_classific'], df_cm, new_status, \
                       multi_problem=False)

print("\nOverall Precision:       {:.2f}%".format((data_metrics['Precision'].values.item()*100)))
print("Overall Recall:          {:.2f}%".format((data_metrics['Recall'].values.item()*100)))
print("Overall F-score(a=1):    {:.2f}%".format((data_metrics['F-score(a=1)'].values.item()*100)))
print("Overall F-score(a=0.5):  {:.2f}%".format((data_metrics['F-score(a=0.5)'].values.item()*100)))
print("Overall F-score(a=2):    {:.2f}%".format((data_metrics['F-score(a=2)'].values.item()*100)))
print("Overall Specificity:     {:.2f}%".format((data_metrics['Specificity'].values.item()*100)))
print("Overall FPR(FAR):        {:.2f}%".format((data_metrics['FPR(FAR)'].values.item()*100)))
print("Overall Accuracy:        {:.2f}%".format((data_metrics['Accuracy'].values.item()*100)))

In [None]:
data_metrics

In [None]:
# Desempenho por Q

new_status = [0,1]

df_cm = pd.DataFrame(confusion_matrix(pca_results['rotulos_bin'], pca_results['Q_classific']), \
                     index=[i for i in new_status], columns=[i for i in new_status])

# Linha para normalizar os dados
df_cm_norm = round((df_cm.astype('float')/df_cm.sum(axis=1).values.reshape(-1,1)), 3)

# Gráfico da matriz de confusão
plt.figure(figsize=(6,5), dpi=100)
plt.title("Confusion Matrix - Q", fontsize=10)
ax = sns.heatmap(df_cm_norm, annot=True, cmap='PuBu')
ax.set_xlabel("PREDICTED CLASSES", fontsize=8)
ax.set_ylabel("REAL CLASSES", fontsize=8)
bottom, top = ax.get_ylim()
ax.set_ylim(bottom + 0.5, top - 0.5)
plt.show()

# Chama a função metrics()
data_metrics = metrics(pca_results['rotulos_bin'], pca_results['Q_classific'], df_cm, new_status, \
                       multi_problem=False)

print("\nOverall Precision:       {:.2f}%".format((data_metrics['Precision'].values.item()*100)))
print("Overall Recall:          {:.2f}%".format((data_metrics['Recall'].values.item()*100)))
print("Overall F-score(a=1):    {:.2f}%".format((data_metrics['F-score(a=1)'].values.item()*100)))
print("Overall F-score(a=0.5):  {:.2f}%".format((data_metrics['F-score(a=0.5)'].values.item()*100)))
print("Overall F-score(a=2):    {:.2f}%".format((data_metrics['F-score(a=2)'].values.item()*100)))
print("Overall Specificity:     {:.2f}%".format((data_metrics['Specificity'].values.item()*100)))
print("Overall FPR(FAR):        {:.2f}%".format((data_metrics['FPR(FAR)'].values.item()*100)))
print("Overall Accuracy:        {:.2f}%".format((data_metrics['Accuracy'].values.item()*100)))

In [None]:
data_metrics