<a href="https://colab.research.google.com/github/MichaelDeyid/Parcial-2/blob/main/parcial_2_(Tercer_punto)_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Case Western Reserve Experiments


Sea la base de datos para el monitoreo de condición (fallos) en rodamientos a partir del análisis de vibraciones descrita en [Case Western Reserve Experiments](https://engineering.case.edu/bearingdatacenter). Las señales fueron adquiridas para las siguientes condiciones (clases): i) Normal bearing (Nor), fault in the internal train (IR1), fault in the external train (IR2), and fault in the rolling element-ball (BE). Además, los fallos se generaron para tres niveles de severidad (profundidad): 0.007′′, 0.014′′, and 0.021′′ y tres velocidades de operación (1730, 1750, 1772, and 1797 [rpm]). Los datos fueron adquiridos a 12 kHz. Por consiguiente, se tienen los siguientes parámetros de estudio: $F_s=12k$ [Hz] cantidad de puntos en el tiempo $4096$ y cantidad de clases $C = 10$.

Grafique la señal promedio de cada fallo en el tiempo y en la frecuencia.

Utilizando la transformada rápida de Fourier diseñe y construya un detector fallos en rodamientos a partir de señales de vibración y sus etiquetas en los arreglos Xtrain y Ytrain (ver cuaderno de apoyo). Genere las predicciones de fallos para el arreglo Xtest.

In [None]:
#data downloaded for google drive
FILEID = "1IC11LrPCZIo_Am5eXP2p2tDAlrGTlPjn"
!wget --load-cookies /tmp/cookies.txt "https://docs.google.com/uc?export=download&confirm=$(wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies --no-check-certificate 'https://docs.google.com/uc?export=download&id='$FILEID -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\1\n/p')&id="$FILEID -O datos.zip && rm -rf /tmp/cookies.txt
!unzip -o datos.zip
!dir

In [None]:
#librerias
import scipy.io as sio
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.preprocessing import MinMaxScaler, StandardScaler
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.ticker import FormatStrFormatter
import warnings
from sklearn.metrics import pairwise_distances
import matplotlib
from sklearn.model_selection import train_test_split

warnings.filterwarnings('ignore')

#cargar datos
path_ = 'CaractCE.mat'#Case Western Database
dicX = sio.loadmat(path_)

In [None]:
Xt = dicX['F'] #datos en el tiempo
Fs = 12000 #frecuencia de muestreo
Tl = Xt.shape[1]/Fs #tamaño del segmento
print('Xt shape:',Xt.shape)
print('tiempo [s]', Tl)

Y = dicX['E']
Ytrue = Y[:,2] #clases fallos en los rodamientos

labels_ = ['NOR','IR1_0.007´´','IR1_0.014´´','IR1_0.021´´',
           'IR2_0.007´´','IR2_0.014´´','IR2_0.021´´',
           'BE_0.007´´','BE_0.014´´','BE_0.021´´'
           ] #nombres de las clases

In [None]:
print(Ytrue.shape) #etique membresia de los datos 10 posibles valores
print(np.unique(Ytrue))

In [None]:
#partir datos para train y test
Xtrain, Xtest, Ytrain, _ = train_test_split(Xt, Ytrue, test_size=0.3)

print(f"Xtrain shape {Xtrain.shape}, Ytrain shape {Ytrain.shape }Xtest shape {Xtest.shape} ")

In [None]:
#calcular espectro de Fourier Xtrain
vf = np.fft.rfftfreq(Xtrain.shape[1],1/Fs) #freq vector
Xw = (abs(np.fft.rfft(Xtrain))) # FFT
Xw.shape

In [None]:
#graficar espectro para clases representativas
sca_ = MinMaxScaler()
Xw_ = sca_.fit_transform(Xw.T).T
#red = TSNE(perplexity = 15,n_components=2,random_state=123,learning_rate='auto',init='pca')
red = PCA(n_components=2)
Z = red.fit_transform(Xw_)

plt.scatter(Z[:,0],Z[:,1],c=Ytrain, label='Xtrain')
plt.colorbar()
plt.show()

In [None]:
# Crear una lista para almacenar los promedios de cada clase
promedios_tiempo = []

# Calcular el promedio en el tiempo para cada clase
for clase in np.unique(Ytrue):
    promedio_clase = np.mean(Xtrain[Ytrain == clase], axis=0)
    promedios_tiempo.append(promedio_clase)

# Graficar el promedio en el tiempo de cada clase
plt.figure(figsize=(12, 6))
for i, promedio_clase in enumerate(promedios_tiempo):
    plt.plot(promedio_clase, label=labels_[i])

plt.title("Promedio en el tiempo de cada clase de fallo")
plt.xlabel("Muestras en el tiempo")
plt.ylabel("Amplitud")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Calcular el promedio en la frecuencia para cada clase
promedios_frecuencia = []

for clase in np.unique(Ytrain):
    espectros = Xw[Ytrain == clase]
    promedio_espectro = np.mean(espectros, axis=0)
    promedios_frecuencia.append(promedio_espectro)

# Graficar el promedio en la frecuencia de cada clase
plt.figure(figsize=(12, 6))
for i, promedio_espectro in enumerate(promedios_frecuencia):
    plt.plot(vf, promedio_espectro, label=labels_[i])

plt.title("Promedio en la frecuencia de cada clase de fallo")
plt.xlabel("Frecuencia (Hz)")
plt.ylabel("Amplitud")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
import numpy as np

#Se calcula la media y la desviación estándar
media = np.mean(Xw)
desviacion_estandar = np.std(Xw)

#Se normaliza utilizando la normalización Z-score
Xw_normalizado = (Xw - media) / desviacion_estandar
Xw_normalizado

In [None]:
#calcular espectro de Fourier Xtest
vx = np.fft.rfftfreq(Xtest.shape[1],1/Fs) #freq vector
Xwtest = (abs(np.fft.rfft(Xtest))) # FFT
Xwtest.shape

In [None]:
#graficar espectro de los Xw test
sca_test = MinMaxScaler()
Xw_t = sca_.fit_transform(Xwtest.T).T
#red = TSNE(perplexity = 15,n_components=2,random_state=123,learning_rate='auto',init='pca')
red_test = PCA(n_components=2)
Z_test = red.fit_transform(Xw_t)

plt.scatter(Z[:,0],Z[:,1], label='Xtrain')
plt.colorbar()
plt.show()

In [None]:
import numpy as np

#Se calcula la media y la desviación estándar de xwtest
media1 = np.mean(Xwtest)
desviacion_estandar1 = np.std(Xwtest)

#Se normaliza Xw test utilizando la normalización Z-score
Xw_normalizado1 = (Xwtest - media1) / desviacion_estandar1
Xw_normalizado1

In [None]:
#Se comparan las bases de datos con las entradas
from scipy.spatial.distance import cdist
import  numpy as np

Lista_de_fallos = [] #lista que recibirá los tipos de fallos para cada entrada

for o in range (len(Xtest)):       #ciclo que recorre el número de entradas(tests) a procesar para este caso 360
  Distancia = cdist(Xw_normalizado1,Xw_normalizado) #Se encuentra la distancia entre la base de datos y todas las entradas
  Lista_de_fallos.append(Ytrain[np.argmin(Distancia[o,])]) #se encuentra la mínima distancia comparada con Ytrain y se guarda en la lista de fallos con su respectivo número de fallo,

In [None]:
Lista_de_fallos #Números correspondientes a cada fallo

In [None]:
numero_de_fallo = np.unique(Ytrue) #números correspondientes a cada fallo
print(labels_) #Nombres de los fallos
print(numero_de_fallo)

In [None]:
#Se creó una función que guarde en un diccionario en el que reciba el nombre del fallo y el número correspondiente a su fallo (ej : "NOR":1)
def asignar_numeros(x,y ):
    diccionario = {}

    for i in range(len(x)):
        diccionario[x[i]] = y[i]

    return diccionario

resultado = asignar_numeros(labels_, numero_de_fallo)
print(resultado)

In [None]:
#Se crea una función que reciba cada número de fallo correspondiente de la lista de fallos y le asigne el nombre del fallo con el diccionario anteriormente creado
def obtener_nombres(D, lista_valores):
    nombres_correspondientes = []

    for valor in lista_valores:
        for nombre, val in D.items():
            if val == valor:
                nombres_correspondientes.append(nombre)
                break
        else:
            nombres_correspondientes.append(None)

    return nombres_correspondientes


resultados = obtener_nombres(resultado, Lista_de_fallos)
print(resultados)