In [None]:
import time

import numpy as np
import matplotlib.pyplot as plt

import pandas as pd

from sklearn.utils import shuffle
from sklearn.model_selection import train_test_split

from scipy.fftpack import fft
from sklearn.preprocessing import StandardScaler
from scipy.signal import butter, filtfilt, resample_poly
from fractions import Fraction


import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, Conv2D, MaxPooling1D, MaxPooling2D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau

from tensorflow.keras.models import load_model


from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score


from sklearn.preprocessing import label_binarize
from sklearn.metrics import roc_curve, auc
from itertools import cycle

import seaborn as sns

from matplotlib.backends.backend_agg import FigureCanvasAgg
from PIL import Image


In [None]:
# List every file available under the Kaggle input directory.

import os

for folder, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        full_path = os.path.join(folder, name)
        print(full_path)

In [None]:
# Load a previously trained model (optional).
#
# MODEL_PATH is a placeholder: set it to the path of a saved Keras model to
# reload it. An empty path is skipped instead of calling load_model(""), which
# raises and would abort a "Restart & Run All".

MODEL_PATH = ""

loaded_model = load_model(MODEL_PATH) if MODEL_PATH else None

# --------------------------------------------------------

# Input Dataset I

In [None]:

# Dataset I: one CSV per unbalance level (numeric prefix 0-4) and per file
# group (suffix D / E). The numeric prefix matches the class id assigned when
# the features are built (0 = no unbalance, 1-4 = unbalance levels).
# NOTE(review): D / E presumably denote two separate recording sets - confirm
# against the dataset description.
data0D = pd.read_csv('/kaggle/input/vibration-dataset-1/0D.csv')
data1D = pd.read_csv('/kaggle/input/vibration-dataset-1/1D.csv')
data2D = pd.read_csv('/kaggle/input/vibration-dataset-1/2D.csv')
data3D = pd.read_csv('/kaggle/input/vibration-dataset-1/3D.csv')
data4D = pd.read_csv('/kaggle/input/vibration-dataset-1/4D.csv')

data0E = pd.read_csv('/kaggle/input/vibration-dataset-1/0E.csv')
data1E = pd.read_csv('/kaggle/input/vibration-dataset-1/1E.csv')
data2E = pd.read_csv('/kaggle/input/vibration-dataset-1/2E.csv')
data3E = pd.read_csv('/kaggle/input/vibration-dataset-1/3E.csv')
data4E = pd.read_csv('/kaggle/input/vibration-dataset-1/4E.csv')


print('DATASET LOADED')

In [None]:
# The signal was initialized from 20 seconds onward to eliminate the initial transient phase.

fs = 4096  # samples per second; used to convert seconds into row counts
initial_time = 20 * fs  # number of leading rows to discard (20 s worth of samples)


# Drop the first `initial_time` rows of every recording and reset the index so
# that rows are numbered from 0 again.
# NOTE: the frames are rebound in place, so running this cell twice discards
# a further 20 s from each recording.
data0D = data0D.iloc[initial_time:].reset_index(drop=True)
data1D = data1D.iloc[initial_time:].reset_index(drop=True)
data2D = data2D.iloc[initial_time:].reset_index(drop=True)
data3D = data3D.iloc[initial_time:].reset_index(drop=True)
data4D = data4D.iloc[initial_time:].reset_index(drop=True)


data0E = data0E.iloc[initial_time:].reset_index(drop=True)
data1E = data1E.iloc[initial_time:].reset_index(drop=True)
data2E = data2E.iloc[initial_time:].reset_index(drop=True)
data3E = data3E.iloc[initial_time:].reset_index(drop=True)
data4E = data4E.iloc[initial_time:].reset_index(drop=True)


print('Done')

In [None]:
# Segment length: one second of signal, expressed in samples.

window_time = 1            # segment duration in seconds
window = window_time * fs  # samples per segment

In [None]:
# Cuts a signal into consecutive, non-overlapping segments of `window` samples.

def get_features(data, label):
    """Split a 1-D signal into fixed-size segments and label every segment.

    Relies on two notebook-level names: `window` (samples per segment) and
    `labels` (mapping from class name to class id).

    Parameters
    ----------
    data : pandas.Series
        Signal samples; trailing samples that do not fill a whole segment
        are dropped.
    label : str
        Key of `labels` giving the class id for all segments.

    Returns
    -------
    X : numpy.ndarray, shape (n_segments, window)
    y : numpy.ndarray, shape (n_segments,), float
        Class id repeated once per segment.
    """
    n_segments = int(len(data) // window)
    usable = data[:n_segments * window]
    segments = usable.values.reshape((n_segments, window))
    targets = np.full(n_segments, labels[label], dtype=float)
    return segments, targets

In [None]:
# Class name -> integer class id used as the training target.
labels = {'no_unbalance':0, 'unbalance_1':1, 'unbalance_2':2,'unbalance_3':3, 'unbalance_4':4}
# Column of each recording that is segmented and fed to the models.
sensor = 'Vibration_2'


# Segment the "D" recordings; the numeric prefix of the frame selects the class.
X0D, y0D = get_features(data0D[sensor], "no_unbalance")
X1D, y1D = get_features(data1D[sensor], "unbalance_1")
X2D, y2D = get_features(data2D[sensor], "unbalance_2")
X3D, y3D = get_features(data3D[sensor], "unbalance_3")
X4D, y4D = get_features(data4D[sensor], "unbalance_4")


# Same for the "E" recordings.
X0E, y0E = get_features(data0E[sensor], "no_unbalance")
X1E, y1E = get_features(data1E[sensor], "unbalance_1")
X2E, y2E = get_features(data2E[sensor], "unbalance_2")
X3E, y3E = get_features(data3E[sensor], "unbalance_3")
X4E, y4E = get_features(data4E[sensor], "unbalance_4")


# Stack all segments (rows) and their class ids into the arrays used downstream.
X=np.concatenate([X0D, X1D, X2D, X3D, X4D, X0E, X1E, X2E, X3E, X4E])
Y=np.concatenate([y0D, y1D, y2D, y3D, y4D, y0E, y1E, y2E, y3E, y4E])


print(X.shape, Y.shape)

In [None]:
# Bar-style histogram of the number of segments per class (checks whether the dataset is balanced).

custom_labels = ["Normal", "Unb. I", "Unb. II", "Unb. III", "Unb. IV"]

n_bars = len(labels)

fig, ax = plt.subplots(figsize=(8, 5))
# Bin edges at k - 0.5 centre one bar on each integer class id.
ax.hist(Y, bins=np.arange(n_bars + 1) - 0.5, edgecolor='black', rwidth=0.8)

ax.set_xticks(range(n_bars))
ax.set_xticklabels(custom_labels, rotation=0)
ax.set_xlabel("Class")
ax.set_ylabel("Absolute Frequency")
ax.set_title("Class Distribution for Dataset I")

ax.grid(axis='y', linestyle='--', alpha=0.6)

plt.show()

# --------------------------------------------------------

# Input Dataset II

In [None]:


# Dataset II: every CSV is read without a header row and stored in a
# notebook-level variable named data_<class>_<i> (created through globals()),
# which is how the later cells look the frames up.

# 1000 "normal" files.
for i in range(1, 1001):
    globals()[f"data_normal_{i}"] = pd.read_csv(f'/kaggle/input/vbl-va001/normal/normal_{i}.csv', header=None)

# 500 files from the unbalance_6 folder (stored as data_unbalance_i_<i>).
for i in range(1, 501):
    globals()[f"data_unbalance_i_{i}"] = pd.read_csv(f'/kaggle/input/vbl-va001/unbalance_6/unbalance_i_{i}.csv', header=None)

# 500 files from the unbalance_27 folder (stored as data_unbalance_ii_<i>).
for i in range(1, 501):
    globals()[f"data_unbalance_ii_{i}"] = pd.read_csv(f'/kaggle/input/vbl-va001/unbalance_27/unbalance_ii_{i}.csv', header=None)



# 1000 misalignment files.
for i in range(1, 1001):
    globals()[f"data_misalignment_{i}"] = pd.read_csv(f'/kaggle/input/vbl-va001/misalignment/misalignment_{i}.csv', header=None)
    

# 1000 bearing-fault files.
for i in range(1, 1001):
    globals()[f"data_bearing_{i}"] = pd.read_csv(f'/kaggle/input/vbl-va001/bearing/bearing_{i}.csv', header=None)



print('DATASET LOADED')

In [None]:
# Visualize which axis exhibits the highest amplitude.
#
# The time column is stored as `axis_time`: naming it `time` would shadow the
# `time` module imported at the top of the notebook and break the later
# `time.time()` calls in the training cells.


df = globals()["data_unbalance_ii_100"]

axis_time = df.iloc[:, 0]  # column 0 is plotted on the time axis
axis_x = df.iloc[:, 1]
axis_y = df.iloc[:, 2]
axis_z = df.iloc[:, 3]

plt.figure(figsize=(12, 6))
plt.plot(axis_time, axis_x, label='X axis')
plt.plot(axis_time, axis_y, label='Y axis')
plt.plot(axis_time, axis_z, label='Z axis')
plt.title('Vibration in 3 axis - Unbalance II')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Sampling rate used for Dataset II and the resulting one-second segment length.
# Rebinds `fs` / `window`, replacing the values set for Dataset I.
fs = 20000

window_time = 1            # segment duration in seconds
window = window_time * fs  # samples per segment


In [None]:
# Cuts a signal into consecutive, non-overlapping segments of `window` samples.

def get_features(data, label):
    """Reshape a 1-D signal into (n_segments, window) and build matching labels.

    Uses the notebook-level `window` (samples per segment) and `labels`
    (class name -> class id). Samples left over after the last full segment
    are discarded.

    Returns a tuple (X, y): X has one segment per row, y holds the class id
    of `label` as a float, once per segment.
    """
    segment_count = int(len(data) // window)
    trimmed = data[:segment_count * window]
    class_id = labels[label]
    X = trimmed.values.reshape((segment_count, window))
    y = np.full(segment_count, class_id, dtype=float)
    return X, y

In [None]:
# Class name -> integer class id used as the training target.
labels = {'normal':0, 'unbalance_1':1, 'unbalance_2':2,'misaligment':3, 'bearing_fault':4}
# Column (by integer label) of each header-less frame that is segmented.
axis = 2


# Per-file segment arrays and label arrays, concatenated at the end.
X_list = []
Y_list = []


# Each loop segments every file of one class. The per-file results are also
# stored as notebook-level variables (X_<class>_<i>, y_<class>_<i>) via globals().
for i in range(1, 1001):
    globals()[f'X_normal_{i}'], globals()[f'y_normal_{i}'] = get_features(globals()[f'data_normal_{i}'][axis], "normal")
    X_list.append(globals()[f"X_normal_{i}"])
    Y_list.append(globals()[f"y_normal_{i}"])


for i in range(1, 501):
    globals()[f'X_unbalance_i_{i}'], globals()[f'y_unbalance_i_{i}'] = get_features(globals()[f'data_unbalance_i_{i}'][axis], "unbalance_1")
    X_list.append(globals()[f"X_unbalance_i_{i}"])
    Y_list.append(globals()[f"y_unbalance_i_{i}"])
    
for i in range(1, 501):
    globals()[f'X_unbalance_ii_{i}'], globals()[f'y_unbalance_ii_{i}'] = get_features(globals()[f'data_unbalance_ii_{i}'][axis], "unbalance_2")
    X_list.append(globals()[f"X_unbalance_ii_{i}"])
    Y_list.append(globals()[f"y_unbalance_ii_{i}"])



for i in range(1, 1001):
    globals()[f'X_misaligment_{i}'], globals()[f'y_misaligment_{i}'] = get_features(globals()[f'data_misalignment_{i}'][axis], "misaligment")
    X_list.append(globals()[f"X_misaligment_{i}"])
    Y_list.append(globals()[f"y_misaligment_{i}"])

for i in range(1, 1001):
    globals()[f'X_bearing_{i}'], globals()[f'y_bearing_{i}'] = get_features(globals()[f'data_bearing_{i}'][axis], "bearing_fault")
    X_list.append(globals()[f"X_bearing_{i}"])
    Y_list.append(globals()[f"y_bearing_{i}"])



# Stack all segments (rows) and class ids; this rebinds the X / Y used downstream.
X=np.concatenate(X_list)
Y=np.concatenate(Y_list)


print(X.shape, Y.shape)

print('Done')

In [None]:
# Bar-style histogram of the number of segments per class (checks whether the dataset is balanced).

custom_labels = ["Normal", "Unb. I", "Unb. II", "Misalig.", "Bearings"]

n_bars = len(labels)

# Build the histogram; bin edges at k - 0.5 centre one bar on each class id.
fig, ax = plt.subplots(figsize=(8, 5))
ax.hist(Y, bins=np.arange(n_bars + 1) - 0.5, edgecolor='black', rwidth=0.8)

# Axis ticks and labels.
ax.set_xticks(range(n_bars))
ax.set_xticklabels(custom_labels, rotation=0)
ax.set_xlabel("Class")
ax.set_ylabel("Absolute Frequency")
ax.set_title("Class Distribution for Dataset II")

ax.grid(axis='y', linestyle='--', alpha=0.6)

plt.show()

# --------------------------------------------------------

# Input Dataset III

In [None]:


# Dataset III: files are read without a header row; the regex separator
# '[;,]' accepts both ';' and ',' as delimiters (this requires the python
# parsing engine). Frames are stored as notebook-level variables named
# data_<class>_<i> through globals(); for "normal", "unbalance_i",
# "unbalance_ii" and "misalignment" this overwrites the same-named
# Dataset II variables for the indices loaded here.

# Files normal_1 .. normal_49.
for i in range(1, 50):
    globals()[f"data_normal_{i}"] = pd.read_csv(f'/kaggle/input/comfaulda/COMFAULDA_v2/normal/normal_{i}.csv', header=None, sep = '[;,]', engine = 'python')

# Files unbalance_6_1 .. unbalance_6_48.
for i in range(1, 49):
    globals()[f"data_unbalance_i_{i}"] = pd.read_csv(f'/kaggle/input/comfaulda/COMFAULDA_v2/unbalance_i/unbalance_6_{i}.csv', header=None, sep = '[;,]', engine = 'python')

# Files unbalance_20_1 .. unbalance_20_48.
for i in range(1, 49):
    globals()[f"data_unbalance_ii_{i}"] = pd.read_csv(f'/kaggle/input/comfaulda/COMFAULDA_v2/unbalance_ii/unbalance_20_{i}.csv', header=None, sep = '[;,]', engine = 'python')

# Files unbalance_35_1 .. unbalance_35_48.
for i in range(1, 49):
    globals()[f"data_unbalance_iii_{i}"] = pd.read_csv(f'/kaggle/input/comfaulda/COMFAULDA_v2/unbalance_iii/unbalance_35_{i}.csv', header=None, sep = '[;,]', engine = 'python')


# Files misalignment_1 .. misalignment_49.
for i in range(1, 50):
    globals()[f"data_misalignment_{i}"] = pd.read_csv(f'/kaggle/input/comfaulda/COMFAULDA_v2/misalignment/misalignment_{i}.csv', header=None,sep = '[;,]', engine = 'python')


# Files unbalance_misalignment_1 .. unbalance_misalignment_39.
for i in range(1, 40):
    globals()[f"data_unbalance_misaligment_{i}"] = pd.read_csv(f'/kaggle/input/comfaulda/COMFAULDA_v2/unbalance_misalignment/unbalance_misalignment_{i}.csv', header=None, sep = '[;,]', engine = 'python')




print('DATASET LOADED')

In [None]:
# Visualize which axis exhibits the highest amplitude.
#
# The curves are plotted against `axis_time`, the time column of this very
# file. Plotting against the bare name `time` would pick up either the `time`
# module or a stale column left over from another dataset.


df = globals()["data_unbalance_ii_42"]

axis_time = df.iloc[:, 0]
# NOTE(review): Y is read from column 7 and Z from column 6 - confirm this
# ordering against the dataset's column layout.
axis_x = df.iloc[:, 5]
axis_y = df.iloc[:, 7]
axis_z = df.iloc[:, 6]

plt.figure(figsize=(12, 6))
plt.plot(axis_time, axis_x, label='X axis')
plt.plot(axis_time, axis_y, label='Y axis')
plt.plot(axis_time, axis_z, label='Z axis')
plt.title('Vibration in 3 axis - Unbalance II')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Sampling rate used for Dataset III and the resulting one-second segment length.
# Rebinds `fs` / `window`, replacing the values set for the previous dataset.
fs = 50000

window_time = 1            # segment duration in seconds
window = window_time * fs  # samples per segment

In [None]:
# Cuts a signal into consecutive, non-overlapping segments of `window` samples.

def get_features(data, label):
    """Turn one signal into a matrix of segments plus a label vector.

    The segment length comes from the notebook-level `window`; the class id
    comes from the notebook-level `labels[label]`. Only whole segments are
    kept - the incomplete tail of the signal is dropped.

    Returns (X, y) where X has shape (n_segments, window) and y is a float
    vector of length n_segments filled with the class id.
    """
    whole_segments = int(len(data) // window)
    kept_samples = whole_segments * window
    X = data[:kept_samples].values.reshape((whole_segments, window))
    y = np.full(whole_segments, labels[label], dtype=float)
    return X, y

In [None]:
# Class name -> integer class id used as the training target.
labels = {'normal':0, 'unbalance_1':1, 'unbalance_2':2, 'unbalance_3':3, 'misaligment':4, 'unbalance_misaligment':5}
# Column (by integer label) of each header-less frame that is segmented.
axis = 6


# Per-file segment arrays and label arrays, concatenated at the end.
X_list = []
Y_list = []


# Each loop segments every file of one class. The per-file results are also
# stored as notebook-level variables (X_<class>_<i>, y_<class>_<i>) via globals().
for i in range(1, 50):
    globals()[f'X_normal_{i}'], globals()[f'y_normal_{i}'] = get_features(globals()[f'data_normal_{i}'][axis], "normal")
    X_list.append(globals()[f"X_normal_{i}"])
    Y_list.append(globals()[f"y_normal_{i}"])

for i in range(1, 49):
    globals()[f'X_unbalance_i_{i}'], globals()[f'y_unbalance_i_{i}'] = get_features(globals()[f'data_unbalance_i_{i}'][axis], "unbalance_1")
    X_list.append(globals()[f"X_unbalance_i_{i}"])
    Y_list.append(globals()[f"y_unbalance_i_{i}"])
    
for i in range(1, 49):
    globals()[f'X_unbalance_ii_{i}'], globals()[f'y_unbalance_ii_{i}'] = get_features(globals()[f'data_unbalance_ii_{i}'][axis], "unbalance_2")
    X_list.append(globals()[f"X_unbalance_ii_{i}"])
    Y_list.append(globals()[f"y_unbalance_ii_{i}"])

for i in range(1, 49):
    globals()[f'X_unbalance_iii_{i}'], globals()[f'y_unbalance_iii_{i}'] = get_features(globals()[f'data_unbalance_iii_{i}'][axis], "unbalance_3")
    X_list.append(globals()[f"X_unbalance_iii_{i}"])
    Y_list.append(globals()[f"y_unbalance_iii_{i}"])


for i in range(1, 50):
    globals()[f'X_misalignment_{i}'], globals()[f'y_misalignment_{i}'] = get_features(globals()[f'data_misalignment_{i}'][axis], "misaligment")
    X_list.append(globals()[f"X_misalignment_{i}"])
    Y_list.append(globals()[f"y_misalignment_{i}"])

for i in range(1, 40):
    globals()[f'X_unbalance_misaligment_{i}'], globals()[f'y_unbalance_misaligment_{i}'] = get_features(globals()[f'data_unbalance_misaligment_{i}'][axis], "unbalance_misaligment")
    X_list.append(globals()[f"X_unbalance_misaligment_{i}"])
    Y_list.append(globals()[f"y_unbalance_misaligment_{i}"])


# Stack all segments (rows) and class ids; this rebinds the X / Y used downstream.
X=np.concatenate(X_list)
Y=np.concatenate(Y_list)


print(X.shape, Y.shape)

print('Done')


In [None]:
# Bar-style histogram of the number of segments per class (checks whether the dataset is balanced).

custom_labels = ["Normal", "Unb. I", "Unb. II", "Unb. III", "Misalig.", "Unb. II + Misalig."]

n_bars = len(labels)

# Build the histogram; bin edges at k - 0.5 centre one bar on each class id.
fig, ax = plt.subplots(figsize=(8, 5))
ax.hist(Y, bins=np.arange(n_bars + 1) - 0.5, edgecolor='black', rwidth=0.8)

# Axis ticks and labels.
ax.set_xticks(range(n_bars))
ax.set_xticklabels(custom_labels, rotation=0)
ax.set_xlabel("Class")
ax.set_ylabel("Absolute Frequency")
ax.set_title("Class Distribution for Dataset III")

ax.grid(axis='y', linestyle='--', alpha=0.6)

plt.show()

# --------------------------------------------------------

## Train, validation and test dataset Split

##### 70% Train, 10% Validation and 20% Test

In [None]:
# Shuffle segments and labels together (same permutation) with a fixed seed.
# NOTE: X and Y are rebound in place, so re-running this cell permutes the
# already-shuffled arrays again.
X, Y = shuffle(X, Y, random_state=42)

In [None]:
# Hold out 20% of all segments for testing.
X_train, X_test, Y_train, Y_test = train_test_split(
    X, Y,
    test_size=0.2,
    random_state=42,
)

# Take the validation set from the remaining 80%:
# 0.125 * 0.8 = 10% of the full dataset, leaving 70% for training.
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train, Y_train,
    test_size=0.125,
    random_state=42,
)

print(
    X_train.shape, Y_train.shape,
    X_val.shape, Y_val.shape,
    X_test.shape, Y_test.shape,
)

## CNN 1D - TIME DOMAIN

In [None]:
#Feature Scaling - Samples Standardization
#
# The scaler is fitted on the training segments only; validation and test
# segments are transformed with the training statistics so that no
# information from them leaks into the fit.
# NOTE: X_train / X_val / X_test are rebound in place, so the unscaled arrays
# are no longer available after this cell.


scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

In [None]:
# Append a trailing channel axis, (n_segments, n_samples) ->
# (n_segments, n_samples, 1), which is the layout Conv1D expects.
# Each array is expanded only while it is still 2-D, so re-running this cell
# does not keep appending axes.
if X_train.ndim == 2:
    X_train = X_train[..., np.newaxis]
if X_val.ndim == 2:
    X_val = X_val[..., np.newaxis]
if X_test.ndim == 2:
    X_test = X_test[..., np.newaxis]

In [None]:

# Build, compile and train the 1-D CNN on the time-domain segments.
# The timer starts here, so the reported time also covers model construction
# and the summary printout, not only fit().
start_time = time.time()

# Number of output units = number of distinct class ids in the training labels.
num_classes = len(np.unique(Y_train)) 


# --- CNN model architecture ---

# Shape of a single sample (everything but the batch axis).
input_shape = X_train.shape[1:]

# Three Conv1D -> MaxPooling -> BatchNorm stages with 32 / 64 / 128 filters,
# followed by a dense classifier with a softmax output.
model = Sequential([
    Conv1D(32, kernel_size=3 , activation='relu', input_shape=input_shape),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    
    Conv1D(64, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    
    Conv1D(128, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.2),
    Dense(num_classes, activation='softmax')
]) 


# Stop when the validation loss has not improved for 50 epochs and roll back
# to the weights of the best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)



# Sparse categorical cross-entropy: the targets are integer class ids, not one-hot vectors.
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy', 
              metrics=['accuracy']) 


model.summary()



# --- Model training ---
history = model.fit(X_train, Y_train, 
                    validation_data=(X_val, Y_val),
                    epochs=250, 
                    batch_size=32,
                    shuffle = True,
                    callbacks=[early_stopping])



end_time = time.time()
elapsed_time = end_time - start_time

print("Time spent in training the model (s):", elapsed_time)

In [None]:
# --- Model evaluation ---

# Predicted class = index of the largest softmax probability.
class_probabilities = model.predict(X_test)
Y_pred = np.argmax(class_probabilities, axis=1)
print(Y_pred)

# Per-class precision / recall / F1 on the held-out test set.
report = classification_report(Y_test, Y_pred)
print(report)

test_loss, test_acc = model.evaluate(X_test, Y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f} - Test Accuracy: {test_acc:.4f}")

In [None]:
# Learning curves: training vs. validation accuracy per epoch.
#
# history.history['accuracy'] is measured on the training data, so that curve
# is labelled "Training set". The x-axis spans the epochs that actually ran
# (early stopping can end training before the configured maximum).

epochs_run = len(history.history['accuracy'])

plt.plot(history.history['accuracy'], label='Training set')
plt.plot(history.history['val_accuracy'], label='Validation set')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("Accuracy curves")
plt.ylim([0, 1])
plt.xlim([0, epochs_run])
plt.legend(loc='lower right')
plt.show()


In [None]:
# Confusion matrix on the test set (rows: true class, columns: predicted class).
conf_matriz = confusion_matrix(Y_test, Y_pred)

# Tick labels must match the dataset that was loaded; keep exactly one list active.
# Dataset I:
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
# Dataset II:
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
# Dataset III:
conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

plt.figure(figsize=(6, 5))
# annot=True with fmt='d' prints the integer count inside every cell.
sns.heatmap(conf_matriz, annot=True, fmt='d', cmap='Blues',
            xticklabels=conf_matriz_classes, yticklabels=conf_matriz_classes,cbar=True)

plt.title('Confusion matrix')
plt.xlabel('Predicted Class')
plt.ylabel('Real Class')
plt.tight_layout()
plt.show()

In [None]:
# ROC Curve and AUC value (one-vs-rest, one curve per class)

# Binarize the labels: one indicator column per class id defined in `labels`.
classes = list(labels.values())
Y_test_bin = label_binarize(Y_test, classes=classes)


# Class probabilities produced by the softmax output layer.
Y_score = model.predict(X_test)


fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = len(classes)


# ROC curve and area under it for each class against all the others.
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test_bin[:, i], Y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])



plt.figure(figsize=(8, 6))

colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'crimson','purple'])

# Display names must match the dataset that was loaded; keep exactly one list active.
#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
class_names = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']


for i, (color, name) in enumerate(zip(colors, class_names)):
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label=f"{name} (AUC = {roc_auc[i]:.2f})")


# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (1 - Specificity)')
# The true positive rate is the sensitivity (recall), not the specificity.
plt.ylabel('True Positive Rate (Sensitivity)')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.show()


In [None]:
# Save model
#
# MODEL_SAVE_PATH is a placeholder: set it to the destination file before
# running this cell. An empty path is skipped with a message instead of
# being passed to model.save(), where it is not a usable destination.

MODEL_SAVE_PATH = ""

if MODEL_SAVE_PATH:
    model.save(MODEL_SAVE_PATH)
else:
    print("MODEL_SAVE_PATH is empty - model not saved.")

## CNN 1D - FREQUENCY DOMAIN

In [None]:
def extract_features_fft(signal):
    """Return the one-sided FFT magnitude spectrum of a single segment.

    Parameters
    ----------
    signal : array-like
        One segment, either 1-D (n_samples,) or with a trailing channel axis
        (n_samples, 1). The segment is flattened first so the transform always
        runs along the time axis; without this, an (n_samples, 1) input would
        be transformed along its length-1 last axis and come back unchanged.

    Returns
    -------
    numpy.ndarray, shape (n_samples // 2,)
        Magnitudes of the first half of the FFT bins.
    """
    samples = np.ravel(signal)
    fft_vals = fft(samples)
    fft_magnitude = np.abs(fft_vals)[:len(samples)//2]

    return fft_magnitude

In [None]:
# Frequency-domain representation of every segment, computed one segment at a time.
X_train_fft = np.array([extract_features_fft(segment) for segment in X_train])
X_val_fft = np.array([extract_features_fft(segment) for segment in X_val])
X_test_fft = np.array([extract_features_fft(segment) for segment in X_test])


# Report the resulting shapes (train, validation, test).
for fft_set in (X_train_fft, X_val_fft, X_test_fft):
    print(fft_set.shape)

In [None]:
# Append a trailing channel axis, (n_segments, n_bins) -> (n_segments, n_bins, 1),
# which is the layout Conv1D expects.
# Each array is expanded only while it is still 2-D, so re-running this cell
# does not keep appending axes.
if X_train_fft.ndim == 2:
    X_train_fft = X_train_fft[..., np.newaxis]
if X_val_fft.ndim == 2:
    X_val_fft = X_val_fft[..., np.newaxis]
if X_test_fft.ndim == 2:
    X_test_fft = X_test_fft[..., np.newaxis]

In [None]:
# Build, compile and train the 1-D CNN on the FFT magnitude features.
# The timer starts here, so the reported time also covers model construction
# and the summary printout, not only fit().
start_time = time.time()

# Number of output units = number of distinct class ids in the training labels.
num_classes = len(np.unique(Y_train)) 


# --- CNN model architecture ---
# Shape of a single sample (everything but the batch axis).
input_shape = X_train_fft.shape[1:]

# Three Conv1D -> MaxPooling -> BatchNorm stages with 32 / 64 / 128 filters,
# followed by a dense classifier with a softmax output.
model = Sequential([
    Conv1D(32, kernel_size=3 , activation='relu', input_shape=input_shape),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    
    Conv1D(64, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    
    Conv1D(128, kernel_size=3, activation='relu'),
    MaxPooling1D(pool_size=2),
    BatchNormalization(),
    
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.2),
    Dense(num_classes, activation='softmax')
]) 
                 
# Stop when the validation loss has not improved for 50 epochs and roll back
# to the weights of the best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)

# Sparse categorical cross-entropy: the targets are integer class ids, not one-hot vectors.
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


model.summary()


# --- Model training ---
history = model.fit(X_train_fft, Y_train, 
                    validation_data=(X_val_fft, Y_val),
                    epochs=250, 
                    batch_size=32,
                    shuffle = True,
                    callbacks=[early_stopping])


end_time = time.time()
elapsed_time = end_time - start_time

print("Time spent in training the model (s):", elapsed_time)

In [None]:
# --- Model evaluation ---

# Predicted class = index of the largest softmax probability.
class_probabilities = model.predict(X_test_fft)
Y_pred = np.argmax(class_probabilities, axis=1)
print(Y_pred)

# Per-class precision / recall / F1 on the held-out test set.
report = classification_report(Y_test, Y_pred)
print(report)

test_loss, test_acc = model.evaluate(X_test_fft, Y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f} - Test Accuracy: {test_acc:.4f}")

In [None]:
# Learning curves: training vs. validation accuracy per epoch.
#
# history.history['accuracy'] is measured on the training data, so that curve
# is labelled "Training set". The x-axis spans the epochs that actually ran
# instead of a limit hard-coded from a previous run.

epochs_run = len(history.history['accuracy'])

plt.plot(history.history['accuracy'], label='Training set')
plt.plot(history.history['val_accuracy'], label='Validation set')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("Accuracy curves")
plt.ylim([0, 1])
plt.xlim([0, epochs_run])
plt.legend(loc='lower right')
plt.show()


In [None]:
# Confusion matrix on the test set (rows: true class, columns: predicted class).
conf_matriz = confusion_matrix(Y_test, Y_pred)

# Tick labels must match the dataset that was loaded; keep exactly one list active.
# Dataset I:
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
# Dataset II:
#conf_matriz_classes = ['No unb.', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
# Dataset III:
conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

plt.figure(figsize=(6, 5))
# annot=True with fmt='d' prints the integer count inside every cell.
sns.heatmap(conf_matriz, annot=True, fmt='d', cmap='Blues',
            xticklabels=conf_matriz_classes, yticklabels=conf_matriz_classes,cbar=True)

plt.title('Confusion matrix')
plt.xlabel('Predicted Class')
plt.ylabel('Real Class')
plt.tight_layout()
plt.show()

In [None]:
# ROC Curve and AUC value (one-vs-rest, one curve per class)

# Binarize the labels: one indicator column per class id defined in `labels`.
classes = list(labels.values())
Y_test_bin = label_binarize(Y_test, classes=classes)

# Class probabilities produced by the softmax output layer.
Y_score = model.predict(X_test_fft)

fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = len(classes)


# ROC curve and area under it for each class against all the others.
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test_bin[:, i], Y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])



plt.figure(figsize=(8, 6))
# Six distinct colours so that a sixth class does not reuse the first colour.
colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'crimson', 'purple'])


# Display names must match the dataset that was loaded; keep exactly one list active.
#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
class_names = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']


for i, (color, name) in enumerate(zip(colors, class_names)):
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label=f"{name} (AUC = {roc_auc[i]:.2f})")



# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (1 - Specificity)')
# The true positive rate is the sensitivity (recall), not the specificity.
plt.ylabel('True Positive Rate (Sensitivity)')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Save model
#
# MODEL_SAVE_PATH is a placeholder: set it to the destination file before
# running this cell. An empty path is skipped with a message instead of
# being passed to model.save(), where it is not a usable destination.

MODEL_SAVE_PATH = ""

if MODEL_SAVE_PATH:
    model.save(MODEL_SAVE_PATH)
else:
    print("MODEL_SAVE_PATH is empty - model not saved.")

## CNN 2D - TIME DOMAIN

In [None]:
#Feature Scaling - Samples Standardization
#
# The CNN 1D section may already have appended a channel axis to X_train /
# X_val / X_test. StandardScaler only accepts 2-D input and the image
# conversion below unpacks a 2-D shape, so the arrays are first brought back
# to (n_segments, n_samples). For arrays that are still 2-D this is a no-op.
X_train = np.asarray(X_train).reshape(len(X_train), -1)
X_val = np.asarray(X_val).reshape(len(X_val), -1)
X_test = np.asarray(X_test).reshape(len(X_test), -1)

# Fit on the training segments only and reuse those statistics for the
# validation and test segments.
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_val = scaler.transform(X_val)
X_test = scaler.transform(X_test)

In [None]:
# Creating figures from the time domain samples.


def segments_to_waveform_images(X, img_size=(128, 128)):
    """Render every segment as a line plot and return the plots as RGB images.

    Parameters
    ----------
    X : array-like, shape (n_segments, n_samples) or (n_segments, n_samples, 1)
        One segment per row; a trailing singleton channel axis is dropped.
    img_size : tuple of int
        (height, width) of the output images in pixels.

    Returns
    -------
    numpy.ndarray, float32, shape (n_segments, height, width, 3)
        RGB images scaled to [0, 1]. Each plot uses the segment's own min / max
        as y-limits, so absolute amplitude is not preserved across images.
    """
    X = np.asarray(X)
    if X.ndim == 3 and X.shape[-1] == 1:
        X = X[..., 0]

    n_segments, n_samples = X.shape
    height, width = img_size
    images = np.zeros((n_segments, height, width, 3), dtype=np.float32)

    # A single figure is reused for all segments; at dpi=100 the canvas is
    # already (width x height) pixels.
    fig = plt.figure(figsize=(width/100, height/100), dpi=100)
    try:
        canvas = FigureCanvasAgg(fig)
        ax = fig.add_subplot(111)

        for i in range(n_segments):
            ax.clear()
            ax.plot(X[i], color='black', linewidth=1)
            ax.set_xlim(0, n_samples-1)
            ax.set_ylim(np.min(X[i]), np.max(X[i]))
            ax.set_axis_off()  # clear() re-enables the axes, so hide them again

            canvas.draw()
            # Drop the alpha channel of the RGBA buffer.
            img = np.asarray(canvas.buffer_rgba())[:, :, :3]

            # PIL expects size as (width, height), the reverse of img_size.
            img_resized = Image.fromarray(img).resize((width, height), resample=Image.BILINEAR)

            images[i] = np.array(img_resized, dtype=np.float32) / 255.0
    finally:
        # Release the figure even if rendering fails part-way.
        plt.close(fig)

    return images


# Target size of the rendered waveform images.
img_size = (128, 128)

# Render every train / validation / test segment as an image.
X_train_img = segments_to_waveform_images(X_train, img_size)
X_val_img   = segments_to_waveform_images(X_val,   img_size)
X_test_img  = segments_to_waveform_images(X_test,  img_size)

print("Train images:", X_train_img.shape)  # should be (n_train, 128, 128, 3)
print("Valdation images:", X_val_img.shape)
print("Test images:", X_test_img.shape)

In [None]:
# Example of one sample.

img = 30  # index of the training segment to display

plt.imshow(X_train_img[img])
plt.axis("off")
plt.show()

print(Y_train[img])  # class id of the displayed segment

In [None]:
# Build, compile and train the 2-D CNN on the waveform images.
# The timer starts here, so the reported time also covers model construction
# and the summary printout, not only fit().
start_time = time.time()

# Number of output units = number of distinct class ids in the training labels.
num_classes = len(np.unique(Y_train)) 


# --- CNN model architecture ---
# Shape of a single image (everything but the batch axis).
input_shape = X_train_img.shape[1:]

# Three Conv2D -> MaxPooling -> BatchNorm stages with 32 / 64 / 128 filters,
# followed by a dense classifier with a softmax output.
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    
    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    
    Conv2D(128, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.2),
    Dense(num_classes, activation='softmax')      
])

# Stop when the validation loss has not improved for 50 epochs and roll back
# to the weights of the best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)

# Sparse categorical cross-entropy: the targets are integer class ids, not one-hot vectors.
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


model.summary()


# --- Model training ---
history = model.fit(X_train_img, Y_train, 
                    validation_data=(X_val_img, Y_val),
                    epochs=250, 
                    batch_size=32,
                    shuffle = True,
                    callbacks=[early_stopping])



end_time = time.time()
elapsed_time = end_time - start_time

print("Time spent in training the model (s):", elapsed_time)

In [None]:
# --- Model evaluation ---

# Predicted class = index of the largest softmax probability.
class_probabilities = model.predict(X_test_img)
Y_pred = np.argmax(class_probabilities, axis=1)
print(Y_pred)

# Per-class precision / recall / F1 on the held-out test set.
report = classification_report(Y_test, Y_pred)
print(report)

test_loss, test_acc = model.evaluate(X_test_img, Y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f} - Test Accuracy: {test_acc:.4f}")


In [None]:
# Learning curves: training vs. validation accuracy per epoch.
#
# history.history['accuracy'] is measured on the training data, so that curve
# is labelled "Training set". The x-axis spans the epochs that actually ran
# instead of a limit hard-coded from a previous run.

epochs_run = len(history.history['accuracy'])

plt.plot(history.history['accuracy'], label='Training set')
plt.plot(history.history['val_accuracy'], label='Validation set')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("Accuracy curves")
plt.ylim([0, 1])
plt.xlim([0, epochs_run])
plt.legend(loc='lower right')
plt.show()

In [None]:
# Confusion matrix on the test set (rows: true class, columns: predicted class).
conf_matriz = confusion_matrix(Y_test, Y_pred)

# Tick labels must match the dataset that was loaded; keep exactly one list active.
# The Dataset III list is active, like in the other sections of the notebook
# (Dataset III is the last dataset loaded when the notebook runs top to bottom).
# Dataset I:
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
# Dataset II:
#conf_matriz_classes = ['No unb.', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
# Dataset III:
conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

# Fail with a clear message instead of drawing mislabelled axes.
assert len(conf_matriz_classes) == conf_matriz.shape[0], (
    "conf_matriz_classes does not match the number of classes in the confusion matrix; "
    "activate the label list of the dataset that was loaded"
)

plt.figure(figsize=(6, 5))
# annot=True with fmt='d' prints the integer count inside every cell.
sns.heatmap(conf_matriz, annot=True, fmt='d', cmap='Blues',
            xticklabels=conf_matriz_classes, yticklabels=conf_matriz_classes,cbar=True)

plt.title('Confusion matrix')
plt.xlabel('Predicted Class')
plt.ylabel('Real Class')
plt.tight_layout()
plt.show()

In [None]:
# ROC Curve and AUC value (one-vs-rest, one curve per class)

# Binarize the labels: one indicator column per class id defined in `labels`.
classes = list(labels.values())
Y_test_bin = label_binarize(Y_test, classes=classes)

# Class probabilities produced by the softmax output layer.
Y_score = model.predict(X_test_img)

fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = len(classes)


# ROC curve and area under it for each class against all the others.
for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test_bin[:, i], Y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])


plt.figure(figsize=(8, 6))
# Six distinct colours so that a sixth class does not reuse the first colour.
colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'crimson', 'purple'])

# Display names must match the dataset that was loaded; keep exactly one list active.
# The Dataset III list is active, like in the other sections of the notebook:
# zip() below stops at the shortest input, so a list that is too short would
# silently drop the curve of the last class.
#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
class_names = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']


for i, (color, name) in enumerate(zip(colors, class_names)):
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label=f"{name} (AUC = {roc_auc[i]:.2f})")


# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (1 - Specificity)')
# The true positive rate is the sensitivity (recall), not the specificity.
plt.ylabel('True Positive Rate (Sensitivity)')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
# Save model
#
# MODEL_SAVE_PATH is a placeholder: set it to the destination file before
# running this cell. An empty path is skipped with a message instead of
# being passed to model.save(), where it is not a usable destination.

MODEL_SAVE_PATH = ""

if MODEL_SAVE_PATH:
    model.save(MODEL_SAVE_PATH)
else:
    print("MODEL_SAVE_PATH is empty - model not saved.")

## CNN 2D - FREQUENCY DOMAIN

In [None]:
# Creating the spectrogram of each sample.


def compute_spectrogram(dataset, fs):
    """Compute a min-max normalised dB spectrogram for every segment.

    Parameters
    ----------
    dataset : numpy.ndarray
        One segment per row; rows are flattened, so a trailing channel axis
        of size 1 is accepted.
    fs : number
        Sampling rate passed to the spectrogram computation.

    Returns
    -------
    numpy.ndarray, shape (n_segments, n_freq_bins, n_time_bins)
        Each spectrogram is converted to dB and rescaled to approximately
        [0, 1] using its own minimum and maximum.
    """
    spectrograms = []
    epsilon = 1e-10  # avoids log10(0) and a zero denominator for flat spectra
    
    for n in range(dataset.shape[0]):
        segment = np.ravel(dataset[n])
        # specgram draws on the current pyplot figure; only the power matrix is
        # kept and the figure is closed right away.
        Pxx, freqs, bins, im = plt.specgram(segment, NFFT=256, Fs=fs, cmap='inferno')
        plt.close()
        
        Pxx_db = 10 * np.log10(Pxx + epsilon)
        
        Pxx_norm = (Pxx_db - np.min(Pxx_db)) / (np.max(Pxx_db) - np.min(Pxx_db) + epsilon)
        
        spectrograms.append(Pxx_norm)
    
    spectrograms = np.array(spectrograms)
    return spectrograms


# Spectrograms of the train / validation / test segments at the dataset's own
# sampling rate `fs`.
# NOTE: the next cell redefines compute_spectrogram and assigns these three
# names again, so when the notebook runs top to bottom these results are replaced.
X_train_spec = compute_spectrogram(X_train,fs)
X_val_spec = compute_spectrogram(X_val,fs)
X_test_spec = compute_spectrogram(X_test,fs)


print("Spectrogram array format X_train_spec:", X_train_spec.shape)
print("Spectrogram array format X_val_spec:", X_val_spec.shape)
print("Spectrogram array format X_test_spec:", X_test_spec.shape)


In [None]:

def compute_spectrogram(dataset, fs):
    """Compute normalised dB spectrograms after resampling every segment to 4096 Hz.

    Bringing all datasets to the same rate makes the spectrograms share the
    same frequency axis (and, for equal-duration segments, the same shape).

    Parameters
    ----------
    dataset : numpy.ndarray
        One segment per row; rows are flattened, so a trailing channel axis
        of size 1 is accepted.
    fs : number
        Sampling rate of the segments in `dataset`. When it differs from
        4096 Hz the segments are low-pass filtered and resampled first.

    Returns
    -------
    numpy.ndarray, float32, shape (n_segments, n_freq_bins, n_time_bins)
        Each spectrogram is converted to dB and rescaled to approximately
        [0, 1] using its own minimum and maximum.
    """
    target_fs = 4096.0
    spectrograms = []
    epsilon = 1e-10  # avoids log10(0) and a zero denominator for flat spectra


    do_resample = (float(fs) != target_fs)
    if do_resample:
        # Zero-phase low-pass at 90% of the target Nyquist frequency, expressed
        # as a fraction of the original Nyquist frequency and clamped to the
        # open interval (0, 1) that butter() requires.
        cutoff = 0.9 * (target_fs * 0.5)
        nyq_orig = 0.5 * fs
        norm_cut = min(0.999, max(1e-6, cutoff / nyq_orig))
        b, a = butter(6, norm_cut, btype='low')
        # Exact rational resampling ratio. Fraction already reduces it to lowest
        # terms; capping the denominator would only approximate ratios such as
        # 4096/50000 = 256/3125 and leave the output rate slightly off 4096 Hz.
        frac = Fraction(int(target_fs), int(fs))
        up, down = frac.numerator, frac.denominator
    else:
        b = a = None
        up = down = None

    for n in range(dataset.shape[0]):
        x = np.ravel(dataset[n]).astype(np.float64, copy=False)

        if do_resample:
            x = filtfilt(b, a, x)
            x = resample_poly(x, up, down)

        # specgram draws on the current pyplot figure; only the power matrix is
        # kept and the figure is closed right away.
        Pxx, freqs, bins, im = plt.specgram(x, NFFT=256, Fs=target_fs, cmap='inferno')
        plt.close()

        Pxx_db = 10.0 * np.log10(Pxx + epsilon)
        Pxx_norm = (Pxx_db - np.min(Pxx_db)) / (np.max(Pxx_db) - np.min(Pxx_db) + epsilon)

        spectrograms.append(Pxx_norm.astype(np.float32))

    return np.array(spectrograms)


# Spectrograms of the train / validation / test segments, computed with the
# compute_spectrogram defined in this cell; `fs` is the sampling rate of the
# loaded dataset.
X_train_spec = compute_spectrogram(X_train,fs)
X_val_spec = compute_spectrogram(X_val,fs)
X_test_spec = compute_spectrogram(X_test,fs)


print("Spectrogram array format X_train_spec:", X_train_spec.shape)
print("Spectrogram array format X_val_spec:", X_val_spec.shape)
print("Spectrogram array format X_test_spec:", X_test_spec.shape)

In [None]:
# Example of one spectrogram (raw array view, no physical axes)

img = 15  # index of the training segment to display

plt.imshow(X_train_spec[img])
plt.axis("off")
plt.show()

print(Y_train[img])  # class id of the displayed segment

In [None]:
# Example of one spectrogram, drawn with physical time / frequency axes.

img = 15  # index of the training segment to display

# The axis extents are derived from the parameters compute_spectrogram uses
# (NFFT=256, specgram's default overlap of 128 samples, 4096 Hz target rate)
# rather than from a second spectrogram of the un-resampled segment, whose
# time axis would be wrong whenever the dataset's fs is not 4096 Hz.
spec_fs = 4096
spec_nfft = 256
spec_hop = spec_nfft - 128

spec_img = X_train_spec[img]
n_time_bins = spec_img.shape[1]

# Time bins are the centres of the analysis windows.
t_first = (spec_nfft / 2) / spec_fs
t_last = (spec_nfft / 2 + spec_hop * (n_time_bins - 1)) / spec_fs

# Frequencies run from 0 Hz to the Nyquist frequency.
extent = [t_first, t_last, 0, spec_fs / 2]

plt.figure(figsize=(10, 3))
plt.imshow(spec_img, origin='lower', aspect='auto', extent=extent, cmap='inferno')
plt.xlabel('Tempo [s]')
plt.ylabel('Frequência [Hz]')
plt.colorbar(label='(normalizado)')
plt.show()

print(Y_train[img])


In [None]:
# Append a trailing channel axis, (n_segments, n_freq, n_time) ->
# (n_segments, n_freq, n_time, 1), which is the layout Conv2D expects.
# Each array is expanded only while it is still 3-D, so re-running this cell
# does not keep appending axes.
if X_train_spec.ndim == 3:
    X_train_spec = X_train_spec[..., np.newaxis]
if X_val_spec.ndim == 3:
    X_val_spec = X_val_spec[..., np.newaxis]
if X_test_spec.ndim == 3:
    X_test_spec = X_test_spec[..., np.newaxis]

In [None]:

# Build, compile and train the 2-D CNN on the spectrograms.
# The timer starts here, so the reported time also covers model construction
# and the summary printout, not only fit().
start_time = time.time()

# Number of output units = number of distinct class ids in the training labels.
num_classes = len(np.unique(Y_train)) 


# --- CNN model architecture ---
# Shape of a single spectrogram (everything but the batch axis).
input_shape = X_train_spec.shape[1:]

# Three Conv2D -> MaxPooling -> BatchNorm stages with 32 / 64 / 128 filters,
# followed by a dense classifier with a softmax output.
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    
    Conv2D(64, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    
    Conv2D(128, kernel_size=(3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    BatchNormalization(),
    
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.2),
    Dense(num_classes, activation='softmax')
])

# Stop when the validation loss has not improved for 50 epochs and roll back
# to the weights of the best epoch.
early_stopping = EarlyStopping(monitor='val_loss', patience=50, restore_best_weights=True)

# Sparse categorical cross-entropy: the targets are integer class ids, not one-hot vectors.
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])


model.summary()


# --- Model training ---
history = model.fit(X_train_spec, Y_train, 
                    validation_data=(X_val_spec, Y_val),
                    epochs=250, 
                    batch_size=32,
                    shuffle = True,
                    callbacks=[early_stopping])


end_time = time.time()
elapsed_time = end_time - start_time

print("Time spent in training the model (s):", elapsed_time)

In [None]:
# --- Model evaluation ---

# Predicted class = index of the largest softmax probability.
class_probabilities = model.predict(X_test_spec)
Y_pred = np.argmax(class_probabilities, axis=1)
print(Y_pred)

# Per-class precision / recall / F1 on the held-out test set.
report = classification_report(Y_test, Y_pred)
print(report)

test_loss, test_acc = model.evaluate(X_test_spec, Y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f} - Test Accuracy: {test_acc:.4f}")


In [None]:
# Learning curves: training vs. validation accuracy per epoch.
#
# history.history['accuracy'] is measured on the training data, so that curve
# is labelled "Training set". The x-axis spans the epochs that actually ran
# instead of a limit hard-coded from a previous run.

epochs_run = len(history.history['accuracy'])

plt.plot(history.history['accuracy'], label='Training set')
plt.plot(history.history['val_accuracy'], label='Validation set')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("Accuracy curves")
plt.ylim([0, 1])
plt.xlim([0, epochs_run])
plt.legend(loc='lower right')
plt.show()

In [None]:
# Confusion matrix on the test set (rows: true class, columns: predicted class).
conf_matriz = confusion_matrix(Y_test, Y_pred)

# Tick labels must match the dataset that was loaded; keep exactly one list active.
# Dataset I:
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
# Dataset II:
# conf_matriz_classes = ['No unb.', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
# Dataset III:
conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

plt.figure(figsize=(6, 5))
# annot=True with fmt='d' prints the integer count inside every cell.
sns.heatmap(conf_matriz, annot=True, fmt='d', cmap='Blues',
            xticklabels=conf_matriz_classes, yticklabels=conf_matriz_classes,cbar=True)

plt.title('Confusion matrix')
plt.xlabel('Predicted Class')
plt.ylabel('Real Class')
plt.tight_layout()
plt.show()

In [None]:
# ROC Curve and AUC value (one-vs-rest: one curve per class)

# Class ids are taken from the `labels` mapping defined earlier in the notebook.
# NOTE(review): column i of the model output is assumed to correspond to
# classes[i] -- confirm that `labels` maps to 0..n_classes-1 in this order.
classes = list(labels.values())
Y_test_bin = label_binarize(Y_test, classes=classes)

# Per-class probabilities from the softmax output layer.
Y_score = model.predict(X_test_spec)

fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = len(classes)


for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test_bin[:, i], Y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])


plt.figure(figsize=(8, 6))
# Six distinct colors so that none of the six named classes share a line color.
colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'crimson', 'purple'])

#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
class_names =  ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

# Iterate over the classes that were actually scored above, so that a
# class_names list longer than `classes` cannot trigger a KeyError and a
# shorter one does not silently drop curves.
for i, color in zip(range(n_classes), colors):
    name = class_names[i] if i < len(class_names) else f"Class {classes[i]}"
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label=f"{name} (AUC = {roc_auc[i]:.2f})")


# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (1 - Specificity)')
# The true positive rate is the sensitivity (recall), not the specificity.
plt.ylabel('True Positive Rate (Sensitivity)')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.show()

## CNN 1D - FREQUENCY DOMAIN (NOISY EXPERIMENTS)

In [None]:
def add_white_noise(signal, snr_db):
    """Return `signal` plus zero-mean Gaussian noise at the requested SNR.

    Parameters
    ----------
    signal : array-like
        Input samples. They are converted to float64 before the power is
        computed, so integer inputs (e.g. int16 recordings) cannot overflow
        when squared.
    snr_db : float
        Desired signal-to-noise ratio in decibels; the noise power is
        ``mean(signal**2) / 10**(snr_db / 10)``.

    Returns
    -------
    numpy.ndarray
        float64 array with the same shape as `signal`.

    Notes
    -----
    Noise is drawn from NumPy's global random state (`np.random.normal`), so
    results are only reproducible if that state is seeded beforehand.
    """
    # Work in float64: squaring a narrow integer dtype wraps around silently.
    signal = np.asarray(signal, dtype=np.float64)
    signal_power = np.mean(signal**2)
    snr_linear = 10 ** (snr_db / 10)
    noise_power = signal_power / snr_linear
    noise = np.random.normal(0, np.sqrt(noise_power), signal.shape)
    return signal + noise

In [None]:
# Run this to add noise only to the Training and Validation datasets
# (this cell does not create X_test_noisy).

# snr_db=30 → Almost noise-free.
# snr_db=10 → Moderate noise.
# snr_db=5 → High noise.

# Each signal gets its own noise realisation; training is processed first,
# then validation.
X_train_noisy, X_val_noisy = (
    np.array([add_white_noise(sample, snr_db=10) for sample in split])
    for split in (X_train, X_val)
)

print(X_train_noisy.shape, X_val_noisy.shape, sep="\n")

In [None]:
# Run this to add noise only to the Testing dataset

# Every test signal receives an independent noise realisation at 10 dB SNR.
X_test_noisy = np.array(
    list(map(lambda sample: add_white_noise(sample, snr_db=10), X_test))
)


print(X_test_noisy.shape)

In [None]:
# Run this to add noise to all datasets

# Splits are processed in the order train, validation, test; every signal
# gets its own noise realisation at 10 dB SNR.
X_train_noisy, X_val_noisy, X_test_noisy = (
    np.array([add_white_noise(sample, snr_db=10) for sample in split])
    for split in (X_train, X_val, X_test)
)


print(X_train_noisy.shape, X_val_noisy.shape, X_test_noisy.shape, sep="\n")

### -----------------------------------------------------------------

In [None]:
def extract_features_fft(signal):
    """Return the one-sided FFT magnitude spectrum of `signal`.

    Parameters
    ----------
    signal : array-like
        A single signal of shape ``(n_samples,)`` or a batch of signals with
        the samples along the last axis, e.g. ``(n_signals, n_samples)``.

    Returns
    -------
    numpy.ndarray
        Magnitudes of the first ``n_samples // 2`` FFT bins, taken along the
        last axis. Leading (batch) axes are preserved.
    """
    samples = np.asarray(signal)
    # The FFT is computed along the last axis, so the half-spectrum length and
    # the slice must refer to that same axis (not to the first one).
    n_bins = samples.shape[-1] // 2
    fft_magnitude = np.abs(fft(samples))[..., :n_bins]

    return fft_magnitude

In [None]:
# Adjust the input arrays below according to the datasets used
# (noisy or clean versions of each split).

# One FFT magnitude vector per signal, stacked into a 2-D array per split.
X_train_fft = np.array([extract_features_fft(sample) for sample in X_train_noisy])
X_val_fft = np.array([extract_features_fft(sample) for sample in X_val_noisy])
X_test_fft = np.array([extract_features_fft(sample) for sample in X_test_noisy])


print(X_train_fft.shape, X_val_fft.shape, X_test_fft.shape, sep="\n")

In [None]:
# Append a trailing length-1 axis to every split (the Conv1D model below takes
# its input shape from X_train_fft.shape[1:]).
# Note: re-running this cell appends yet another axis each time.
X_train_fft = np.expand_dims(X_train_fft, axis=-1)
X_val_fft = np.expand_dims(X_val_fft, axis=-1)
X_test_fft = np.expand_dims(X_test_fft, axis=-1)

In [None]:
# Wall-clock timer; it covers model construction, compilation and fitting.
start_time = time.time()

# One softmax output per distinct label present in the training targets.
num_classes = len(np.unique(Y_train))


# --- CNN model architecture ---
# Per-sample input shape: every axis of X_train_fft except the first one.
input_shape = X_train_fft.shape[1:]

model = Sequential()

# Stage 1: 32 filters.
model.add(Conv1D(32, kernel_size=3, activation='relu', input_shape=input_shape))
model.add(MaxPooling1D(pool_size=2))
model.add(BatchNormalization())

# Stage 2: 64 filters.
model.add(Conv1D(64, kernel_size=3, activation='relu'))
model.add(MaxPooling1D(pool_size=2))
model.add(BatchNormalization())

# Stage 3: 128 filters.
model.add(Conv1D(128, kernel_size=3, activation='relu'))
model.add(MaxPooling1D(pool_size=2))
model.add(BatchNormalization())

# Classification head.
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(num_classes, activation='softmax'))

# Integer class labels are used directly, hence the sparse loss.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)


model.summary()

# Stop after 50 epochs without val_loss improvement and roll back to the
# best weights seen so far.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=50,
    restore_best_weights=True,
)


# --- Model training ---
history = model.fit(
    x=X_train_fft,
    y=Y_train,
    validation_data=(X_val_fft, Y_val),
    epochs=250,
    batch_size=32,
    shuffle=True,
    callbacks=[early_stopping],
)


end_time = time.time()
elapsed_time = end_time - start_time

print("Time spent in training the model (s):", elapsed_time)

In [None]:
# --- Model evaluation ---

# Predicted class = index of the largest softmax output for each test sample.
Y_pred = model.predict(X_test_fft).argmax(axis=1)
print(Y_pred)


# Per-class precision / recall / F1 on the test targets.
print(classification_report(Y_test, Y_pred))


# Loss and accuracy as computed by Keras on the same test data.
test_loss, test_acc = model.evaluate(x=X_test_fft, y=Y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f} - Test Accuracy: {test_acc:.4f}")

In [None]:
# Learning curves.

# history.history['accuracy'] is the accuracy measured on the TRAINING data
# during fit(); 'val_accuracy' is measured on the validation_data passed to
# fit(). Neither curve is computed on the test set.
n_epochs_run = len(history.history['accuracy'])

plt.plot(history.history['accuracy'], label='Training set')
plt.plot(history.history['val_accuracy'], label='Validation set')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("Accuracy curves")
plt.ylim([0, 1])
# Size the x-axis from the number of epochs actually run: with
# EarlyStopping(patience=50) more than 50 epochs are always executed, so a
# fixed [0, 50] window would cut the curves short.
plt.xlim([0, max(n_epochs_run - 1, 1)])
plt.legend(loc='lower right')
plt.show()

In [None]:
# Tick labels for the confusion matrix. Alternative label sets used with
# other dataset configurations are kept commented out below.
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
#conf_matriz_classes = ['No unb.', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

# Rows are the true classes, columns the predicted classes.
conf_matriz = confusion_matrix(Y_test, Y_pred)

plt.figure(figsize=(6, 5))
sns.heatmap(
    conf_matriz,
    annot=True,
    fmt='d',
    cmap='Blues',
    cbar=True,
    xticklabels=conf_matriz_classes,
    yticklabels=conf_matriz_classes,
)

plt.gca().set(
    title='Confusion matrix',
    xlabel='Predicted Class',
    ylabel='Real Class',
)
plt.tight_layout()
plt.show()

In [None]:
# ROC Curve and AUC value (one-vs-rest: one curve per class)

# Class ids are taken from the `labels` mapping defined earlier in the notebook.
# NOTE(review): column i of the model output is assumed to correspond to
# classes[i] -- confirm that `labels` maps to 0..n_classes-1 in this order.
classes = list(labels.values())
Y_test_bin = label_binarize(Y_test, classes=classes)

# Per-class probabilities from the softmax output layer.
Y_score = model.predict(X_test_fft)

fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = len(classes)

for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test_bin[:, i], Y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

plt.figure(figsize=(8, 6))
# Six distinct colors so that none of the six named classes share a line color.
colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'crimson', 'purple'])

# Custom display names for the classes
#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
class_names = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']


# Iterate over the classes that were actually scored above, so that a
# class_names list longer than `classes` cannot trigger a KeyError and a
# shorter one does not silently drop curves.
for i, color in zip(range(n_classes), colors):
    name = class_names[i] if i < len(class_names) else f"Class {classes[i]}"
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label=f"{name} (AUC = {roc_auc[i]:.2f})")


# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (1 - Specificity)')
# The true positive rate is the sensitivity (recall), not the specificity.
plt.ylabel('True Positive Rate (Sensitivity)')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.show()

## CNN 2D - FREQUENCY DOMAIN (NOISY EXPERIMENTS)

In [None]:
def add_white_noise(signal, snr_db):
    """Return `signal` plus zero-mean Gaussian noise at the requested SNR.

    Parameters
    ----------
    signal : array-like
        Input samples. They are converted to float64 before the power is
        computed, so integer inputs (e.g. int16 recordings) cannot overflow
        when squared.
    snr_db : float
        Desired signal-to-noise ratio in decibels; the noise power is
        ``mean(signal**2) / 10**(snr_db / 10)``.

    Returns
    -------
    numpy.ndarray
        float64 array with the same shape as `signal`.

    Notes
    -----
    Noise is drawn from NumPy's global random state (`np.random.normal`), so
    results are only reproducible if that state is seeded beforehand.
    """
    # Work in float64: squaring a narrow integer dtype wraps around silently.
    signal = np.asarray(signal, dtype=np.float64)
    signal_power = np.mean(signal**2)
    snr_linear = 10 ** (snr_db / 10)
    noise_power = signal_power / snr_linear
    noise = np.random.normal(0, np.sqrt(noise_power), signal.shape)
    return signal + noise

In [None]:
# Run this to add noise only to the Training and Validation datasets
# (this cell does not create X_test_noisy).

# Each signal gets its own noise realisation at 10 dB SNR; training is
# processed first, then validation.
X_train_noisy, X_val_noisy = (
    np.array([add_white_noise(sample, snr_db=10) for sample in split])
    for split in (X_train, X_val)
)


print(X_train_noisy.shape, X_val_noisy.shape, sep="\n")

In [None]:
# Run this to add noise only to the Testing dataset

# Every test signal receives an independent noise realisation at 10 dB SNR.
X_test_noisy = np.array(
    list(map(lambda sample: add_white_noise(sample, snr_db=10), X_test))
)


print(X_test_noisy.shape)

In [None]:
# Run this to add noise to all datasets

# Splits are processed in the order train, validation, test; every signal
# gets its own noise realisation at 10 dB SNR.
X_train_noisy, X_val_noisy, X_test_noisy = (
    np.array([add_white_noise(sample, snr_db=10) for sample in split])
    for split in (X_train, X_val, X_test)
)


print(X_train_noisy.shape, X_val_noisy.shape, X_test_noisy.shape, sep="\n")

### ---------------------------------------------------------

In [None]:
def compute_spectrogram(dataset, fs):
    """Convert every signal in `dataset` into a normalised dB spectrogram.

    When `fs` differs from 4096 Hz each signal is first low-pass filtered
    (zero-phase, 6th-order Butterworth) and polyphase-resampled to 4096 Hz.
    The spectrogram is computed with `plt.specgram` (NFFT=256), converted to
    decibels and min-max scaled per signal.

    Parameters
    ----------
    dataset : array
        Signals indexed along the first axis (`dataset[n]` is one signal and
        must support `.astype`).
    fs : number
        Sampling rate of the signals in `dataset`.

    Returns
    -------
    numpy.ndarray
        The float32 spectrograms stacked along a new first axis.
    """
    target_rate = 4096.0
    eps = 1e-10  # keeps log10 and the min-max division well defined

    # Filter / resampling parameters are only needed when the rates differ.
    needs_resampling = float(fs) != target_rate
    lp_b = lp_a = None
    up_factor = down_factor = None
    if needs_resampling:
        # Cut off at 90 % of the target Nyquist frequency, expressed relative
        # to the source Nyquist frequency and clamped into (0, 1).
        cutoff_hz = 0.9 * (target_rate * 0.5)
        source_nyquist = 0.5 * fs
        normalised_cutoff = min(0.999, max(1e-6, cutoff_hz / source_nyquist))
        lp_b, lp_a = butter(6, normalised_cutoff, btype='low')

        # Rational approximation of target_rate / fs for resample_poly.
        ratio = Fraction(int(target_rate), int(fs)).limit_denominator(1000)
        up_factor, down_factor = ratio.numerator, ratio.denominator

    images = []
    for idx in range(dataset.shape[0]):
        samples = dataset[idx].astype(np.float64, copy=False)

        if needs_resampling:
            samples = resample_poly(filtfilt(lp_b, lp_a, samples), up_factor, down_factor)

        # specgram draws into a pyplot figure; only the power matrix is kept
        # and the figure is closed right away.
        power, _freqs, _times, _image = plt.specgram(
            samples, NFFT=256, Fs=target_rate, cmap='inferno'
        )
        plt.close()

        power_db = 10.0 * np.log10(power + eps)
        lowest = np.min(power_db)
        highest = np.max(power_db)
        scaled = (power_db - lowest) / (highest - lowest + eps)

        images.append(scaled.astype(np.float32))

    return np.array(images)


In [None]:
# Adjust the input arrays below according to the datasets used
# (noisy or clean versions of each split).

# Splits are converted in the order train, validation, test; `fs` is the
# sampling rate defined earlier in the notebook.
X_train_spec, X_val_spec, X_test_spec = (
    compute_spectrogram(split, fs)
    for split in (X_train_noisy, X_val_noisy, X_test_noisy)
)


print("Spectrogram array format X_train_spec:", X_train_spec.shape)
print("Spectrogram array format X_val_spec:", X_val_spec.shape)
print("Spectrogram array format X_test_spec:", X_test_spec.shape)

In [None]:
# Example of one spectrogram

# Index of the training sample to display.
img = 15

plt.imshow(X_train_spec[img])
plt.gca().set_axis_off()
plt.show()

# Label of the displayed sample.
print(Y_train[img])

In [None]:
# Append a trailing length-1 axis to every split (the Conv2D model below takes
# its input shape from X_train_spec.shape[1:]).
# Note: re-running this cell appends yet another axis each time.
X_train_spec = np.expand_dims(X_train_spec, axis=-1)
X_val_spec = np.expand_dims(X_val_spec, axis=-1)
X_test_spec = np.expand_dims(X_test_spec, axis=-1)

In [None]:
# Wall-clock timer; it covers model construction, compilation and fitting.
start_time = time.time()

# One softmax output per distinct label present in the training targets.
num_classes = len(np.unique(Y_train))


# --- CNN model architecture ---
# Per-sample input shape: every axis of X_train_spec except the first one.
input_shape = X_train_spec.shape[1:]

model = Sequential()

# Stage 1: 32 filters.
model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=input_shape))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(BatchNormalization())

# Stage 2: 64 filters.
model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(BatchNormalization())

# Stage 3: 128 filters.
model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(BatchNormalization())

# Classification head.
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.2))
model.add(Dense(num_classes, activation='softmax'))

# Integer class labels are used directly, hence the sparse loss.
model.compile(
    optimizer='adam',
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

model.summary()

# Stop after 50 epochs without val_loss improvement and roll back to the
# best weights seen so far.
early_stopping = EarlyStopping(
    monitor='val_loss',
    patience=50,
    restore_best_weights=True,
)


# --- Model training ---
history = model.fit(
    x=X_train_spec,
    y=Y_train,
    validation_data=(X_val_spec, Y_val),
    epochs=250,
    batch_size=32,
    shuffle=True,
    callbacks=[early_stopping],
)


end_time = time.time()
elapsed_time = end_time - start_time

print("Time spent in training the model (s):", elapsed_time)

In [None]:
# --- Model evaluation ---

# Predicted class = index of the largest softmax output for each test sample.
Y_pred = model.predict(X_test_spec).argmax(axis=1)
print(Y_pred)


# Per-class precision / recall / F1 on the test targets.
print(classification_report(Y_test, Y_pred))


# Loss and accuracy as computed by Keras on the same test data.
test_loss, test_acc = model.evaluate(x=X_test_spec, y=Y_test, verbose=0)
print(f"Test Loss: {test_loss:.4f} - Test Accuracy: {test_acc:.4f}")


In [None]:
# Learning curves.

# history.history['accuracy'] is the accuracy measured on the TRAINING data
# during fit(); 'val_accuracy' is measured on the validation_data passed to
# fit(). Neither curve is computed on the test set.
n_epochs_run = len(history.history['accuracy'])

plt.plot(history.history['accuracy'], label='Training set')
plt.plot(history.history['val_accuracy'], label='Validation set')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title("Accuracy curves")
plt.ylim([0, 1])
# Size the x-axis from the number of epochs actually run: with
# EarlyStopping(patience=50) more than 50 epochs are always executed, so a
# fixed [0, 50] window would cut the curves short.
plt.xlim([0, max(n_epochs_run - 1, 1)])
plt.legend(loc='lower right')
plt.show()

In [None]:
# Tick labels for the confusion matrix. Alternative label sets used with
# other dataset configurations are kept commented out below.
#conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
#conf_matriz_classes = ['No unb.', 'Unb. I', 'Unb. II', 'Misalig.', 'Bearings']
conf_matriz_classes = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']

# Rows are the true classes, columns the predicted classes.
conf_matriz = confusion_matrix(Y_test, Y_pred)

plt.figure(figsize=(6, 5))
sns.heatmap(
    conf_matriz,
    annot=True,
    fmt='d',
    cmap='Blues',
    cbar=True,
    xticklabels=conf_matriz_classes,
    yticklabels=conf_matriz_classes,
)

plt.gca().set(
    title='Confusion matrix',
    xlabel='Predicted Class',
    ylabel='Real Class',
)
plt.tight_layout()
plt.show()

In [None]:
# ROC Curve and AUC value (one-vs-rest: one curve per class)

# Class ids are taken from the `labels` mapping defined earlier in the notebook.
# NOTE(review): column i of the model output is assumed to correspond to
# classes[i] -- confirm that `labels` maps to 0..n_classes-1 in this order.
classes = list(labels.values())
Y_test_bin = label_binarize(Y_test, classes=classes)


# The model in this section is the Conv2D network trained on spectrograms, so
# it must be scored on X_test_spec (the same array used for Y_pred above),
# not on the 1-D FFT features from the previous section.
Y_score = model.predict(X_test_spec)


fpr = dict()
tpr = dict()
roc_auc = dict()

n_classes = len(classes)


for i in range(n_classes):
    fpr[i], tpr[i], _ = roc_curve(Y_test_bin[:, i], Y_score[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])


plt.figure(figsize=(8, 6))
# Six distinct colors so that none of the six named classes share a line color.
colors = cycle(['aqua', 'darkorange', 'cornflowerblue', 'darkgreen', 'crimson', 'purple'])


#class_names = ['Normal', 'Unb. I', 'Unb. II', 'Unb. III', 'Unb. IV']
class_names = ['Normal', 'Unb. I', 'Unb. II','Unb. III', 'Misalig.', 'Unb. II + Misalig.']


# Iterate over the classes that were actually scored above, so that a
# class_names list longer than `classes` cannot trigger a KeyError and a
# shorter one does not silently drop curves.
for i, color in zip(range(n_classes), colors):
    name = class_names[i] if i < len(class_names) else f"Class {classes[i]}"
    plt.plot(fpr[i], tpr[i], color=color, lw=2,
             label=f"{name} (AUC = {roc_auc[i]:.2f})")



# Diagonal = performance of a random classifier.
plt.plot([0, 1], [0, 1], 'k--', lw=2)
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate (1 - Specificity)')
# The true positive rate is the sensitivity (recall), not the specificity.
plt.ylabel('True Positive Rate (Sensitivity)')
plt.title('ROC Curves')
plt.legend(loc="lower right")
plt.grid(True)
plt.tight_layout()
plt.show()