# Inicialização

In [2]:
import  torch
import  torch.nn as nn
import  torch.nn.functional as F
import  torch.optim as optim
import  psutil
import  os
import joblib
import  warnings
import  pandas as pd
import  seaborn as sns
import  numpy as np
import  matplotlib.pyplot as plt
import  moviepy.video.io.ImageSequenceClip
import  time
from    kan import *
from    sklearn.model_selection import train_test_split
from    sklearn.metrics import confusion_matrix, classification_report
from    sklearn.ensemble import RandomForestClassifier
from    sklearn.preprocessing import LabelEncoder
from    sklearn.model_selection import GridSearchCV
from    sklearn.preprocessing import StandardScaler
from    sklearn.preprocessing import MinMaxScaler

from    imblearn.over_sampling import SMOTE
from    imblearn.under_sampling import RandomUnderSampler
from    imblearn.pipeline import Pipeline
from    imblearn.combine import SMOTETomek

warnings.filterwarnings("ignore")

if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("cpu")

print(device)

ModuleNotFoundError: No module named 'moviepy'

# Carregar e Preparar Conjunto de Dados

In [None]:
# dados importados da plataforma KAGGLE - https://www.kaggle.com/datasets/shivamb/machine-predictive-maintenance-classification
df = pd.read_csv("predictive_maintenance.csv")
df.head()


Unnamed: 0,UDI,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min],Target,Failure Type
0,1,M14860,M,298.1,308.6,1551,42.8,0,0,No Failure
1,2,L47181,L,298.2,308.7,1408,46.3,3,0,No Failure
2,3,L47182,L,298.1,308.5,1498,49.4,5,0,No Failure
3,4,L47183,L,298.2,308.6,1433,39.5,7,0,No Failure
4,5,L47184,L,298.2,308.7,1408,40.0,9,0,No Failure


* descartar as colunas desnecessárias

In [None]:
df=df.drop(['Product ID','UDI','Target'],axis=1)

* codificar a coluna de falhas para tipo int entre 0 e 5

In [None]:
label_encoder = LabelEncoder()

# Perform label encoding
df['Failure Type_encoded'] = label_encoder.fit_transform(df['Failure Type'])

df['Failure Type_encoded'].value_counts()

Failure Type_encoded
1    9652
0     112
3      95
2      78
5      45
4      18
Name: count, dtype: int64

* converter a coluna 'Type' (string) em outras duas separadas do tipo bool (uma para cada tipo de material) 

In [3]:
df = pd.get_dummies(df, columns=['Type'], prefix='Type', drop_first=True)
df.head()


NameError: name 'df' is not defined

* separar os dados em: dados de entrada (features), X  e dados de saída (label), y

In [7]:
# Definição do conjunto de dados de entrada e do conjunto de dados de saída

X_raw = df.drop(['Failure Type','Failure Type_encoded'], axis=1).values
y_raw= df['Failure Type_encoded'].values


* Em aplicações do mundo real, a modelagem de classificação frequentemente enfrenta o problema de conjuntos de dados desequilibrados, onde o número de instâncias da classe majoritária é muito maior do que o da classe minoritária, o que dificulta o aprendizado adequado do modelo em relação à classe minoritária. Isso se torna um problema sério quando a informação contida na classe minoritária é mais importante como, por exemplo, no conjunto utilizado.

* Uma das abordagens populares para resolver o problema de conjuntos de dados desequilibrados é a superamostragem da classe minoritária ou a subamostragem da classe majoritária. No entanto, essas abordagens possuem suas próprias limitações. No método tradicional de superamostragem, a ideia é duplicar aleatoriamente alguns exemplos da classe minoritária — essa técnica não adiciona novas informações ao conjunto de dados. Por outro lado, o método de subamostragem é realizado removendo aleatoriamente alguns exemplos da classe majoritária, o que resulta na perda de algumas informações originais dos dados.

In [8]:
print('Total de amostras antes balanceamento:',df['Failure Type_encoded'].count())
print('Total de amostras por falha:\n\n', df['Failure Type_encoded'].value_counts())

Total de amostras antes balanceamento: 10000
Total de amostras por falha:

 Failure Type_encoded
1    9652
0     112
3      95
2      78
5      45
4      18
Name: count, dtype: int64


* Uma solução para superar essas limitações é gerar novos exemplos sintetizados a partir da classe minoritária existente. Esse método é conhecido como Técnica de Superamostragem da Minoria Sintética (SMOTE). Existem muitas variações do SMOTE, mas neste artigo será explicado o método SMOTE-Tomek Links e sua implementação em Python. Esse método combina a superamostragem do SMOTE com a subamostragem dos Tomek Links.

* O método SMOTE-Tomek Links, desenvolvido por Chawla et al. (2002) é uma combinação de técnicas que visa equilibrar os dados ao aumentar a representatividade da classe minoritária através da criação de novos exemplos sintéticos e ao mesmo tempo remover exemplos que são considerados ruidosos ou redundantes na classe majoritária. A implementação desta abordagem utilizando Python é apresentada, detalhando os passos e a lógica do algoritmo, assim como os resultados obtidos em diferentes conjuntos de dados desequilibrados.

* A combinação das técnicas de superamostragem e subamostragem, especificamente utilizando o método SMOTE-Tomek Links, mostra-se eficaz na melhoria do desempenho dos modelos de classificação em conjuntos de dados desequilibrados. Esta abordagem permite ao modelo aprender de forma mais eficiente a partir da classe minoritária, mantendo a integridade e a diversidade da informação no conjunto de dados.



In [9]:
#implementação do método SMOTE no conunto de dados
smt=SMOTETomek(sampling_strategy='auto',random_state=42)
X_resampled, y_resampled = smt.fit_resample(X_raw, y_raw)

# Convertendo para dataframe do pandas
df_resampled = pd.DataFrame(X_resampled, columns=[f'feature_{i}' for i in range(X_resampled.shape[1])])
df_resampled['Type_encoded'] = y_resampled 

df_resampled.head()

#distribuição das classes após balanceamento
print('Total de amostras após balanceamento:',df_resampled['Type_encoded'].count())
print('Total de amostras por falha:\n\n', df_resampled['Type_encoded'].value_counts())

Total de amostras após balanceamento: 57868
Total de amostras por falha:

 Type_encoded
2    9652
0    9649
5    9647
3    9645
4    9645
1    9630
Name: count, dtype: int64


In [10]:
df_resampled = df_resampled.rename(columns={'feature_0':'Temp_Ar'})
df_resampled = df_resampled.rename(columns={'feature_1':'Temp_Pr'})
df_resampled = df_resampled.rename(columns={'feature_2':'Vel_Spindle'})
df_resampled = df_resampled.rename(columns={'feature_3':'Torque'})
df_resampled = df_resampled.rename(columns={'feature_4':'Desg_Ferr'})
df_resampled = df_resampled.rename(columns={'feature_5':'Mat_L'})
df_resampled = df_resampled.rename(columns={'feature_6':'Mat_M'})

df_resampled = df_resampled.rename(columns={'Type_encoded':'Tipo_Falha'})

df_resampled.head()


Unnamed: 0,Temp_Ar,Temp_Pr,Vel_Spindle,Torque,Desg_Ferr,Mat_L,Mat_M,Tipo_Falha
0,298.1,308.6,1551.0,42.8,0.0,0.0,1.0,1
1,298.2,308.7,1408.0,46.3,3.0,1.0,0.0,1
2,298.1,308.5,1498.0,49.4,5.0,1.0,0.0,1
3,298.2,308.6,1433.0,39.5,7.0,1.0,0.0,1
4,298.2,308.7,1408.0,40.0,9.0,1.0,0.0,1


* Metodo para normalização do conjunto de dados e formatação para as diferentes redes neurais (MLP e KAN)

In [1]:
def load_Preditive_Maintenance_Dataset():

    X_ = df_resampled.drop(['Tipo_Falha'], axis="columns")
    '''''
    scaler = MinMaxScaler(feature_range=(-1, 1))
    X_   = scaler.fit_transform(X_)    
    joblib.dump(scaler, 'minmax_scaler.pkl')
    '''
    y_ = df_resampled['Tipo_Falha']

    #convertendo os dados em tensores e atribuindo a sua excucção na CPU ou GPU (device) conforme disponibilidade avaliada no inicio do programa
    X = torch.tensor(X_, dtype = torch.float32).to(device)
    y = torch.tensor(y_.values, dtype = torch.long).to(device)    

    #separa os dados entre conjunto de dados para treinamento (80%) e conjunto de dados para teste/validação (20% <-> teste_size=0.2)
    train_data, test_data, train_target, test_target = train_test_split(X, y, test_size=0.2, random_state=42)

    # geração de  mini batches para melhorar desempenho do treinamento
    train_loader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(train_data, train_target), batch_size=10, shuffle=True)   
    validation_loader = torch.utils.data.DataLoader(torch.utils.data.TensorDataset(test_data, test_target), batch_size=10, shuffle=False)  

    #conjunto de dados no formato a ser utilizado no redes MLP
    train_data    = train_loader
    test_data     = validation_loader
    MLP_dataset   = [train_data, test_data]  

    # conjunto de dados no formato a ser utilizado  redes KAN
    train_inputs = torch.empty(0, 7, device=device)
    train_labels = torch.empty(0, dtype=torch.long, device=device)
    test_inputs = torch.empty(0, 7, device=device)
    test_labels = torch.empty(0, dtype=torch.long, device=device)

    for data, labels in train_loader:
        train_inputs = torch.cat((train_inputs, data.to(device)), dim=0)
        train_labels = torch.cat((train_labels, labels.to(device)), dim=0)

    for data, labels in validation_loader:
        test_inputs = torch.cat((test_inputs, data.to(device)), dim=0)
        test_labels = torch.cat((test_labels, labels.to(device)), dim=0)

    KAN_dataset = {}
    KAN_dataset['train_input']  = train_inputs
    KAN_dataset['test_input']   = test_inputs
    KAN_dataset['train_label']  = train_labels
    KAN_dataset['test_label']   = test_labels


    return MLP_dataset, KAN_dataset

dataset =load_Preditive_Maintenance_Dataset()

NameError: name 'df_resampled' is not defined

# Implementação Rede Kolmogorov-Arnold

In [1]:
image_folder = 'videos'

in_layer = 7

model = KAN(width=[7, 5, 6], grid=5, k=5, seed=0, device=device)

model(dataset[1]['train_input'])

model.plot(beta=100, scale=1, in_vars=['ta', 'tp', 'vs', 'to','df','ml','mm'], out_vars=['01', '02', '03','04','05','06'])

NameError: name 'KAN' is not defined

In [None]:
def train_acc():
    return torch.mean((torch.argmax(model(dataset[1]['train_input']), dim=1) == dataset[1]['train_label']).float())

def test_acc():
    return torch.mean((torch.argmax(model(dataset[1]['test_input']), dim=1) == dataset[1]['test_label']).float())

KAN_start_train = time.time()

results = model.train(dataset[1], opt="Adam", device=device, metrics=(train_acc, test_acc),
                      loss_fn=torch.nn.CrossEntropyLoss(), steps=1, lamb=0.01, lamb_entropy=10., save_fig=True, img_folder=image_folder)
KAN_end_train = time.time()

KAN_training_time = KAN_end_train - KAN_start_train

results['train_acc'][-1], results['test_acc'][-1]

train loss: 2.25e+00 | test loss: 3.71e+00 | reg: 1.16e+02 : 100%|████| 2/2 [00:15<00:00,  7.59s/it]


(0.12375254184007645, 0.13150164484977722)

In [None]:
train_loss_evolution = np.array([x.item() for x in results['train_loss']])  
test_loss_evolution  = np.array([x.item() for x in results['test_loss']])  
iteration =  range(1, 20+1)       

plt.figure(figsize=(10, 6))
plt.plot(iteration, train_loss_evolution, marker='o', linestyle='-',  color='b', label='Erro Treino 1')
plt.plot(iteration, test_loss_evolution,  marker='x', linestyle='--', color='c', label='Erro Teste 2')

plt.title('Evolução do Ajuste do Erro')
plt.xlabel('Iterações')
plt.ylabel('Erro')
plt.grid(True)
plt.show()

In [None]:
model.plot(scale=1, in_vars=['ta', 'tp', 'vs', 'to','df','ml','mm'], out_vars=['01', '02', '03','04','05','06'])

In [45]:
video_name='video'
fps=10

fps = fps
files = os.listdir(image_folder)
train_index = []
for file in files:
    if file[0].isdigit() and file.endswith('.jpg'):
        train_index.append(int(file[:-4]))

train_index = np.sort(train_index)

image_files = [image_folder+'/'+str(train_index[index])+'.jpg' for index in train_index]

clip = moviepy.video.io.ImageSequenceClip.ImageSequenceClip(image_files, fps=fps)
clip.write_videofile(video_name+'.mp4')

Moviepy - Building video video.mp4.
Moviepy - Writing video video.mp4



                                                            

Moviepy - Done !
Moviepy - video ready video.mp4


In [None]:
model = model.prune()
model(dataset[1]['train_input'])
model.plot(scale=1, in_vars=['f1', 'f2', 'f3', 'f4','f5','f6','f7'], out_vars=['01', '02', '03','04','05','06'])

In [32]:
# fine tune
results_1 = model.train(dataset[1], opt="Adam", device=device, metrics=(train_acc, test_acc),
                      loss_fn=torch.nn.CrossEntropyLoss(), steps=1, lamb=0.01, lamb_entropy=10.)
results_1['train_acc'][-1], results_1['test_acc'][-1]

train loss: 1.40e+00 | test loss: 3.11e+00 | reg: 1.07e+02 : 100%|████| 1/1 [00:04<00:00,  4.39s/it]


(0.3195662498474121, 0.3209780752658844)

In [None]:
model.plot(scale=1, in_vars=['f1', 'f2', 'f3', 'f4','f5','f6','f7'], out_vars=['01', '02', '03','04','05','06'])

# Implementação Rede Neural Multilayer Perceptron

In [41]:
class MLP_Model(nn.Module):  
    
    def __init__(self):
        super(MLP_Model, self).__init__()
        self.fc1 = nn.Linear(7,10)  
        self.fc2 = nn.Linear(10,10)
        self.fc3 = nn.Linear(10,10)
        self.fc4 = nn.Linear(10,6)

    def forward(self,x):

        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = self.fc4(x)    
        return x        
    
def train(model, train_loader, criterion, optimizer, num_epochs):
    model.train()
    
    ram_usage=[]
    cpu_usage=[]
    backprop_time=[]
    loss_list=[]

    startMLP_train = time.time()
    for epoch in range(num_epochs):  

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device) 
            optimizer.zero_grad()            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            startMLP_bwd = time.time()
            loss.backward()        
            endMLP_bwd = time.time()
            backprop_time.append((endMLP_bwd-startMLP_bwd)*1000)
            optimizer.step()

        loss_list.append(loss)
        ram_usage.append(psutil.cpu_percent(4))
        cpu_usage.append(psutil.virtual_memory()[3]/1000000000)
        print('\033[K',f'Epoch {epoch+1}/{num_epochs} | Loss: {round(loss.item(), 15)} | Backpropagation Time: {round(backprop_time[-1],2)} ms | CPU Usage: {round(cpu_usage[-1],2)} % | RAM Usage: {round(ram_usage[-1],2)} GB', end="\r")

    endMLP_train = time.time()
    train_time = endMLP_train - startMLP_train
    backpropagation_time = np.max(backprop_time)
    train_cpu_usage = np.max(cpu_usage)   
    train_ram_usage = np.max(ram_usage)

    info = {'Num_Epochs': num_epochs,
            'Back_Propagation_Time': backpropagation_time,
            'Max_CPU_usage': train_cpu_usage,
            'Max_RAM_usage' : train_ram_usage,
            'Train_Time' : train_time,
            'Loss_List': loss_list }

    return info

def test_model(model, test_loader):
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device) 
            outputs = model(inputs)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    print(f'Accuracy: {accuracy}%')


In [42]:
model = MLP_Model().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.01)

In [27]:

mlp_train_results = train(model, dataset[0][0], criterion, optimizer, num_epochs=10)

test_model(model, dataset[0][1])

Accuracy: 94.50492483151892%537381414324 | Backpropagation Time: 0.6 ms | CPU Usage: 5.47 % | RAM Usage: 0.9 GBB


In [None]:
MLP_loss_list = np.array([x.item() for x in mlp_train_results['Loss_List']]) 
iteration =  range(1, mlp_train_results['Num_Epochs']+1)        

plt.figure(figsize=(10, 6))
plt.plot(iteration, MLP_loss_list, marker='o', linestyle='-',  color='b', label='Erro Treino 1')

plt.title('Evolução do Ajuste do Erro')
plt.xlabel('Iterações')
plt.ylabel('Erro')
plt.grid(True)
plt.show()

INFERÊNCIA