In [1]:
from platform import python_version

print(python_version())

3.10.12


In [2]:
import pyreadr
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
# constants
dataset_dir = "./datasets/TEP/"

In [4]:
def read_r_as_df(file_path):
    result = pyreadr.read_r(file_path) # also works for Rds
    return result[list(result.keys())[0]]

In [5]:
training_fault_free_df = read_r_as_df(f"{dataset_dir}/TEP_FaultFree_Training.RData")

In [None]:
test_falty_df = read_r_as_df(f"{dataset_dir}/TEP_Faulty_Testing.RData")

In [None]:
class PCA():
   
    ###############

    def __init__ (self, a = 0.9):
        '''
        Construtor: função chamada toda vez que um objeto PCA é inicializado
        '''
        # se 0<=a<1,  'a' indica a fraçao de variancia explicada desejada
        # se a>=1,    'a' indica o numero de componentes desejado
        self.a = a
   
    ###############

    def fit(self, X, conf_Q = 0.99, conf_T2 = 0.99, plot = True):
        '''
        Função para treino do modelo
        '''       
        # guardando médias e desvios-padrão do treino
        self.mu_train = X.mean(axis=0)
        self.std_train = X.std(axis=0)        
       
        # normalizando dados de treino
        X = np.array(((X - self.mu_train)/self.std_train))
        
        # calculando a matriz de covariâncias dos dados
        Cx = np.cov(X, rowvar=False)
        
        # aplicando decomposição em autovalores e autovetores
        self.L, self.P = np.linalg.eig(Cx)
        
        # frações da variância explicada
        fv = self.L/np.sum(self.L)
        
        # frações da variância explicada acumuladas
        fva = np.cumsum(self.L)/sum(self.L)
       
        # definindo número de componentes
        if self.a>0 and self.a<1:
            self.a = np.where(fva>self.a)[0][0]+1 
            
        # calculando limites de detecção

        # limite da estatística T^2
        from scipy.stats import f
        F = f.ppf(conf_T2, self.a, X.shape[0]-self.a)
        self.T2_lim = ((self.a*(X.shape[0]**2-1))/(X.shape[0]*(X.shape[0]-self.a)))*F
        
        # limite da estatística Q
        theta = [np.sum(self.L[self.a:]**(i)) for i in (1,2,3)]
        ho = 1-((2*theta[0]*theta[2])/(3*(theta[1]**2)))
        from scipy.stats import norm
        nalpha = norm.ppf(conf_Q)
        self.Q_lim = (theta[0]*(((nalpha*np.sqrt(2*theta[1]*ho**2))/theta[0])+1+
                                ((theta[1]*ho*(ho-1))/theta[0]**2))**(1/ho))
        
        # calculando T2 e Q para dados de treino
        
        # calculando estatística T^2
        T = X@self.P[:,:self.a]
        self.T2_train = np.array([T[i,:]@np.linalg.inv(np.diag(self.L[:self.a]))@T[i,:].T for i in range(X.shape[0])])

        # calculando estatística Q
        e = X - X@self.P[:,:self.a]@self.P[:,:self.a].T
        self.Q_train  = np.array([e[i,:]@e[i,:].T for i in range(X.shape[0])])
        
        
        # plotando variâncias explicadas
        if plot:
            fig, ax = plt.subplots()
            ax.bar(np.arange(len(fv)),fv)
            ax.plot(np.arange(len(fv)),fva)
            ax.set_xlabel('Número de componentes')
            ax.set_ylabel('Variância dos dados')
            ax.set_title('PCA - Variância Explicada');
            
        ###############

    def plot_train_control_charts(self, fault = None):
        '''
        Função para plotar cartas de controle
        '''        
        fig, ax = plt.subplots(1,2, figsize=(15,3))

        ax[0].semilogy(self.T2_train,'.')
        ax[0].axhline(self.T2_lim,ls='--',c='r');
        ax[0].set_title('Carta de Controle $T^2$')
        
        ax[1].semilogy(self.Q_train,'.')
        ax[1].axhline(self.Q_lim,ls='--',c='r')
        ax[1].set_title('Carta de Controle Q')
 
        if fault is not None:
            ax[0].axvline(fault, c='k')
            ax[1].axvline(fault, c='k')
    
    ###############
            
    def predict(self, X):
        '''
        Função para teste do modelo
        '''
            
        # normalizando dados de teste (usando os parâmetros do treino!)
        X = np.array((X - self.mu_train)/self.std_train)

        # calculando estatística T^2
        T = X@self.P[:,:self.a]
        self.T2 = np.array([T[i,:]@np.linalg.inv(np.diag(self.L[:self.a]))@T[i,:].T for i in range(X.shape[0])])

        # calculando estatística Q
        e = X - X@self.P[:,:self.a]@self.P[:,:self.a].T
        self.Q  = np.array([e[i,:]@e[i,:].T for i in range(X.shape[0])])
        
        # calculando contribuições para Q
        self.c = np.absolute(X*e) 
                
    ###############

    def plot_control_charts(self, fault = None):
        '''
        Função para plotar cartas de controle
        '''        
        fig, ax = plt.subplots(1,2, figsize=(15,3))

        ax[0].semilogy(self.T2,'.')
        ax[0].axhline(self.T2_lim,ls='--',c='r');
        ax[0].set_title('Carta de Controle $T^2$')
        
        ax[1].semilogy(self.Q,'.')
        ax[1].axhline(self.Q_lim,ls='--',c='r')
        ax[1].set_title('Carta de Controle Q')
 
        if fault is not None:
            ax[0].axvline(fault, c='k')
            ax[1].axvline(fault, c='k')

    ###############
            
    def plot_contributions(self, fault = None, 
                           index = None, 
                           columns = None):
        '''
        Função para plotar mapas de contribuição
        '''
        fig, ax = plt.subplots(figsize=(20, 6))
        
        c = pd.DataFrame(self.c, 
                         index = index,
                         columns = columns)
    
        sns.heatmap(c, ax = ax, 
                    yticklabels=int(self.c.shape[0]/10),
                    cmap = plt.cm.Blues);
        
        ax.set_title('Contribuições parciais para Q')
        
        if fault is not None:
            ax.axhline(y=c.index[fault],
                       ls='--', c='k')
