<a href="https://colab.research.google.com/github/Klrojasm/SyS/blob/main/punto3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Case Western Reserve Experiments


Sea la base de datos para el monitoreo de condición (fallos) en rodamientos a partir del análisis de vibraciones descrita en [Case Western Reserve Experiments](https://engineering.case.edu/bearingdatacenter). Las señales fueron adquiridas para las siguientes condiciones (clases): i) Normal bearing (Nor), fault in the internal train (IR1), fault in the external train (IR2), and fault in the rolling element-ball (BE). Además, los fallos se generaron para tres niveles de severidad (profundidad): 0.007′′, 0.014′′, and 0.021′′ y tres velocidades de operación (1730, 1750, 1772, and 1797 [rpm]). Los datos fueron adquiridos a 12 kHz. Por consiguiente, se tienen los siguientes parámetros de estudio: $F_s=12k$ [Hz] cantidad de puntos en el tiempo $4096$ y cantidad de clases $C = 10$.

Grafique la señal promedio de cada fallo en el tiempo y en la frecuencia.

Utilizando la transformada rápida de Fourier diseñe y construya un detector fallos en rodamientos a partir de señales de vibración y sus etiquetas en los arreglos Xtrain y Ytrain (ver cuaderno de apoyo). Genere las predicciones de fallos para el arreglo Xtest.

trabajando desde el cuaderno suministraddo por el profesor:

In [None]:
#data downloaded for google drive
FILEID = "1IC11LrPCZIo_Am5eXP2p2tDAlrGTlPjn"
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id='$FILEID -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id="$FILEID -O datos.zip && rm -rf /tmp/cookies.txt
!unzip -o datos.zip
!dir

In [None]:
#librerias
import scipy.io as sio
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import warnings
from sklearn.metrics import pairwise_distances
import matplotlib
from sklearn.model_selection import train_test_split

warnings.filterwarnings('ignore')

#cargar datos
path_ = 'CaractCE.mat'#Case Western Database
dicX = sio.loadmat(path_)

In [None]:
Xt = dicX['F'] #datos en el tiempo
Fs = 12000 #frecuencia de muestreo
Tl = Xt.shape[1]/Fs #tamaño del segmento
print('Xt shape:',Xt.shape)
print('tiempo [s]', Tl)

Y = dicX['E']
Ytrue = Y[:,2] #clases fallos en los rodamientos

labels_ = ['NOR','IR1_0.007´´','IR1_0.014´´','IR1_0.021´´',
           'IR2_0.007´´','IR2_0.014´´','IR2_0.021´´',
           'BE_0.007´´','BE_0.014´´','BE_0.021´´'
           ] #nombres de las clases

In [None]:
print(Ytrue.shape) #etique membresia de los datos 10 posibles valores
print(np.unique(Ytrue))

In [None]:
#partir datos para train y test
Xtrain, Xtest, Ytrain, _ = train_test_split(Xt, Ytrue, test_size=0.3)

print(f"Xtrain shape {Xtrain.shape}, Ytrain shape {Ytrain.shape }Xtest shape {Xtest.shape} ")

In [None]:
#calcular espectro de Fourier Xtrain
vf = np.fft.rfftfreq(Xtrain.shape[1],1/Fs) #freq vector
Xw = (abs(np.fft.rfft(Xtrain))) # FFT
Xw.shape

In [None]:
#graficar espectro para clases representativas
sca_ = MinMaxScaler()
Xw_ = sca_.fit_transform(Xw.T).T
#red = TSNE(perplexity = 15,n_components=2,random_state=123,learning_rate='auto',init='pca')
red = PCA(n_components=2)
Z = red.fit_transform(Xw_)

plt.scatter(Z[:,0],Z[:,1],c=Ytrain, label='Xtrain')
plt.colorbar()
plt.show()

In [None]:
# Calcular el espectro de Fourier utilizando rfft
Xw = np.abs(np.fft.rfft(Xtrain))

# Graficar el espectro de Fourier de las señales en Xtrain
n_subplots = len(Xtrain)
n_figures = (n_subplots - 1) // 4 + 1

for figure_index in range(n_figures):
    plt.figure(figsize=(15, 8))  # Ajusta el tamaño de la figura

    for i in range(4):
        subplot_index = figure_index * 4 + i
        if subplot_index < n_subplots:
            plt.subplot(2, 2, i + 1)  # Configura una cuadrícula de 2x2
            plt.plot(vf, Xw[subplot_index])
            plt.title(f'Clase {Ytrain[subplot_index]}')
            plt.xlabel('Frecuencia (Hz)')
            plt.ylabel('Amplitud')
            plt.grid()

    plt.tight_layout()
    plt.show()

# Reducción de dimensionalidad con PCA
pca = PCA(n_components=2)
X_pca = pca.fit_transform(Xw)

# Gráfico de dispersión de las clases en el espacio reducido
plt.figure(figsize=(8, 6))
scatter = plt.scatter(X_pca[:, 0], X_pca[:, 1], c=Ytrain, cmap='viridis')
plt.legend(*scatter.legend_elements(), title='Clases')
plt.title('Visualización PCA de las clases')
plt.xlabel('Componente Principal 1')
plt.ylabel('Componente Principal 2')
plt.show()

Procedemos a realizar cada grafico de las señales suministradas en funcion del tiempo

In [None]:
# Graficar las señales en el dominio del tiempo
n_subplots = len(Xtrain)
n_figures = (n_subplots - 1) // 4 + 1

for figure_index in range(n_figures):
    plt.figure(figsize=(15, 8))  # Ajusta el tamaño de la figura

    for i in range(4):
        subplot_index = figure_index * 4 + i
        if subplot_index < n_subplots:
            plt.subplot(2, 2, i + 1)  # Configura una cuadrícula de 2x2
            plt.plot(Xtrain[subplot_index])
            plt.title(f'Clase {Ytrain[subplot_index]}')
            plt.xlabel('Tiempo')
            plt.ylabel('Amplitud')
            plt.grid()

    plt.tight_layout()
    plt.show()

ahora en funcion de la frecuencia

In [None]:
import numpy as np
import scipy.io as sio
import matplotlib.pyplot as plt

# Cargar los datos
path_ = 'CaractCE.mat'  # Ruta de la base de datos Case Western
dicX = sio.loadmat(path_)
Xt = dicX['F']  # Datos en el tiempo
Fs = 12000  # Frecuencia de muestreo

Y = dicX['E']
Ytrue = Y[:, 2]  # Clases de fallos en los rodamientos

# Función para calcular el espectro de Fourier utilizando rfft
def calculate_spectrogram(signal):
    return np.abs(np.fft.rfft(signal))

# Crear un diccionario para almacenar las señales promedio
average_signals = {}

# Calcular y graficar la señal promedio en el tiempo y en la frecuencia para cada clase
for class_label in np.unique(Ytrue):
    # Filtrar las señales para la clase actual
    class_data = Xt[Ytrue == class_label]

    # Calcular la señal promedio en el tiempo
    average_signal_time = np.mean(class_data, axis=0)

    # Calcular la señal promedio en la frecuencia utilizando la transformada de Fourier
    average_signal_freq = calculate_spectrogram(average_signal_time)

    # Agregar las señales promedio al diccionario
    average_signals[class_label] = (average_signal_time, average_signal_freq)

    # Graficar la señal promedio en el tiempo
    plt.figure(figsize=(10, 4))
    plt.subplot(1, 2, 1)
    plt.plot(average_signal_time)
    plt.title(f'Average Signal in Time (Class {class_label})')
    plt.xlabel('Tiempo')
    plt.ylabel('Amplitud')

    # Graficar la señal promedio en la frecuencia
    plt.subplot(1, 2, 2)
    vf = np.fft.rfftfreq(len(average_signal_time), 1 / Fs)
    plt.plot(vf, average_signal_freq)
    plt.title(f'Average Signal in Frequency (Class {class_label})')
    plt.xlabel('Frecuencia (Hz)')
    plt.ylabel('Amplitud')


    plt.tight_layout()
    plt.show()

para finalmente realizar las comparaciones de una señal nueva con nuestro banco de datos de los espectros los cuales se les sacó una media.

In [None]:
import numpy as np
import scipy.io as sio

# Cargar la base de datos
path_ = 'CaractCE.mat'  # Ruta de la base de datos Case Western
dicX = sio.loadmat(path_)
Xt = dicX['F']  # Datos en el tiempo
Fs = 12000  # Frecuencia de muestreo

Y = dicX['E']
Ytrue = Y[:, 2]  # Clases de fallos en los rodamientos

# Función para calcular el espectro de Fourier utilizando rfft
def calculate_spectrogram(signal, max_spectrum_length=None):
    if max_spectrum_length is not None:
        signal = signal[:max_spectrum_length]  # Asegurar que todas las señales tengan la misma longitud
    return np.abs(np.fft.rfft(signal))

# Crear un diccionario para almacenar las señales promedio
average_signals = {}

# Encontrar la longitud máxima de espectro en la base de datos
max_spectrum_length = max(len(signal) for signal in Xt)

# Calcular y graficar la señal promedio en el tiempo y en la frecuencia para cada clase
for class_label in np.unique(Ytrue):
    # Filtrar las señales para la clase actual
    class_data = Xt[Ytrue == class_label]

    # Calcular la señal promedio en el tiempo
    average_signal_time = np.mean(class_data, axis=0)

    # Calcular el espectro de Fourier con la longitud máxima
    average_signal_freq = calculate_spectrogram(average_signal_time, max_spectrum_length)

    # Agregar las señales promedio al diccionario
    average_signals[class_label] = (average_signal_time, average_signal_freq)

def compare_to_database(signal, similarity_threshold=500):
    signal_spectrogram = calculate_spectrogram(signal, max_spectrum_length)

    best_match = None
    best_distance = float('inf')

    for class_label, (_, avg_freq) in average_signals.items():
        distance = np.linalg.norm(signal_spectrogram - avg_freq)
        if distance < best_distance and distance < similarity_threshold:
            best_distance = distance
            best_match = class_label

    if best_match is None:
        return "no tiene similitud con nuestra base de datos"
    else:
        return best_match

# Señal entrante de un nuevo motor (reemplaza esto con tu señal)
new_signal = np.random.rand(max_spectrum_length)

# Realizar la comparación con la base de datos
best_match_class = compare_to_database(new_signal)

# Mostrar la clase a la que se parece más
print(f'La señal entrante se parece más a la clase {best_match_class}')