# CWRU Bearing Fault Data - RNN Models

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import time
import os
import pickle

import tensorflow as tf
import tensorflow.keras as keras

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import SimpleRNN, LSTM, GRU, Dense, Dropout, Flatten, BatchNormalization, Conv1D, MaxPooling1D
from tensorflow.keras.regularizers import l2
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import Sequence
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, ConfusionMatrixDisplay

In [None]:
def count_frames(signal_length, frame_length, hop_length):
    """Return the number of complete frames that fit into a signal.

    The count matches the number of start offsets produced by
    ``range(0, signal_length - frame_length + 1, hop_length)``, which is the
    iteration used by ``fragment_and_normalize``.

    Args:
        signal_length: Number of samples in the signal.
        frame_length: Number of samples in one frame.
        hop_length: Step (in samples) between consecutive frame starts.

    Returns:
        The number of complete frames; ``0`` when the signal is shorter than
        one frame.
    """
    # A signal shorter than one frame yields no frame at all, so it must not
    # be reported as contributing one.
    if signal_length < frame_length:
        return 0
    return 1 + (signal_length - frame_length) // hop_length

In [None]:
def fragment_and_normalize(df, frame_size=1024, hop_size=512, eps=1e-8):
    """Cut every signal into fixed-size frames and standardise each frame.

    Each frame is centred on its own mean and divided by its own standard
    deviation. Frames whose standard deviation does not exceed ``eps`` are
    only centred, which avoids the NaN/inf values a division by zero would
    produce.

    Args:
        df: Table with a ``'signal'`` column (one sample sequence per row)
            and a ``'label'`` column.
        frame_size: Number of samples per frame.
        hop_size: Step between the starts of consecutive frames.
        eps: Standard deviations at or below this value are treated as zero.

    Returns:
        A tuple ``(fragments, labels)``. ``fragments`` has a trailing channel
        axis of length 1 appended; ``labels`` holds one label per frame.
        When no signal is long enough for a single frame, ``fragments`` is an
        empty array of shape ``(0, frame_size, 1)``.
    """
    fragments = []
    labels = []

    # Only two columns are needed, so iterate them directly instead of
    # building a full row object per record.
    for signal, label in zip(df['signal'], df['label']):
        signal = np.asarray(signal)

        for start in range(0, len(signal) - frame_size + 1, hop_size):
            frame = signal[start:start + frame_size]
            centered = frame - np.mean(frame)
            spread = np.std(frame)

            # A (near-)constant frame has nothing to scale; keep it centred.
            fragments.append(centered / spread if spread > eps else centered)
            labels.append(label)

    labels = np.array(labels)

    if not fragments:
        # Keep the rank consistent with the non-empty case.
        return np.empty((0, frame_size, 1)), labels

    fragments = np.array(fragments)[..., np.newaxis]

    return fragments, labels

In [None]:
def add_gaussian_noise(signal, noise_level=0.005):
    """Return ``signal`` with zero-mean Gaussian noise added.

    Args:
        signal: Array to perturb; it is not modified.
        noise_level: Standard deviation of the noise.

    Returns:
        A new array of the same shape as ``signal``.
    """
    perturbation = np.random.normal(loc=0, scale=noise_level, size=signal.shape)
    return signal + perturbation

In [None]:
def add_masking_noise(signal, mask_fraction=0.1):
    """Return a copy of ``signal`` with randomly chosen positions set to zero.

    Args:
        signal: Array to mask along its first axis; it is not modified.
        mask_fraction: Fraction of first-axis positions to zero out
            (truncated to a whole number of positions).

    Returns:
        The masked copy.
    """
    masked = signal.copy()
    n_positions = masked.shape[0]
    n_zeroed = int(mask_fraction * n_positions)
    # Draw without replacement so exactly n_zeroed distinct positions are hit.
    zeroed_at = np.random.choice(n_positions, n_zeroed, replace=False)
    masked[zeroed_at] = 0
    return masked

In [None]:
def time_shift(signal, shift_max=50):
    """Circularly shift ``signal`` along its first axis by a random offset.

    The offset is drawn uniformly from ``[-shift_max, shift_max]`` inclusive,
    so the shift is symmetric and ``shift_max=0`` simply returns an unshifted
    copy.

    Args:
        signal: Array whose first axis is shifted.
        shift_max: Largest shift, in samples, in either direction.

    Returns:
        A shifted copy of ``signal`` with the same shape.
    """
    # randint's upper bound is exclusive, hence the + 1.
    shift = np.random.randint(-shift_max, shift_max + 1)
    # Roll along axis 0 only; without an axis np.roll flattens the array and
    # would move values across the trailing (channel) axis.
    return np.roll(signal, shift, axis=0)

In [None]:
def amplitude_shift(signal, shift_range=0.1):
    """Scale ``signal`` by a random gain close to one.

    Args:
        signal: Array to scale; it is not modified.
        shift_range: Half-width of the interval around 1 from which the gain
            is drawn uniformly.

    Returns:
        The scaled signal.
    """
    lowest_gain, highest_gain = 1 - shift_range, 1 + shift_range
    gain = np.random.uniform(lowest_gain, highest_gain)
    return signal * gain

In [None]:
class DataGenerator(Sequence):
    """Batch provider that can augment every sample on the fly.

    Args:
        X: Samples; must support indexing with an integer index array.
        y: Targets aligned with ``X`` along the first axis.
        batch_size: Maximum number of samples per batch; the final batch may
            be smaller.
        shuffle: Whether to reshuffle the sample order on construction and
            every time ``on_epoch_end`` is called.
        augment: Whether each sample is passed through ``augment_sample``.
        **kwargs: Forwarded unchanged to the ``Sequence`` constructor.

    Raises:
        ValueError: If ``X`` and ``y`` do not have the same length.
    """

    def __init__(self, X, y, batch_size=64, shuffle=True, augment=True, **kwargs):
        # Initialise the base class first; recent Keras versions warn when a
        # Sequence subclass skips this call.
        super().__init__(**kwargs)

        if len(X) != len(y):
            raise ValueError(
                f"X and y must have the same length, got {len(X)} and {len(y)}"
            )

        self.X = X
        self.y = y
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.augment = augment
        self.indexes = np.arange(len(self.X))
        # Shuffle once up front so the first epoch is already randomised.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of batches, counting a trailing partial batch."""
        return int(np.ceil(len(self.X) / self.batch_size))

    def __getitem__(self, index):
        """Return the ``(X_batch, y_batch)`` pair for batch number ``index``."""
        batch_indexes = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]
        X_batch = self.X[batch_indexes]
        y_batch = self.y[batch_indexes]

        if self.augment:
            X_batch = np.array([self.augment_sample(x) for x in X_batch])

        return X_batch, y_batch

    def on_epoch_end(self):
        """Reshuffle the sample order in place when shuffling is enabled."""
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def augment_sample(self, signal):
        """Apply each augmentation independently with probability 0.5."""
        if np.random.rand() < 0.5:
            signal = add_gaussian_noise(signal)
        if np.random.rand() < 0.5:
            signal = add_masking_noise(signal)
        if np.random.rand() < 0.5:
            signal = time_shift(signal)
        if np.random.rand() < 0.5:
            signal = amplitude_shift(signal)
        return signal

In [None]:
# Load the prepared dataset from a local pickle file.
# NOTE(review): unpickling can execute arbitrary code, so this file must come
# from a trusted source.
with open('48kdrive-end_normalbaseline_loadsplit_data.pkl', 'rb') as f:
    data = pickle.load(f)

In [None]:
# Class names, listed in sorted order.
unique_classes = [
    'B007', 'B014', 'B021',
    'IR007', 'IR014', 'IR021',
    'Normal',
    'OR007@12', 'OR007@3', 'OR007@6', 'OR014@6', 'OR021@12', 'OR021@3', 'OR021@6',
]

# Signal framing parameters.
SAMPLE_RATE = 48000
FRAME_SIZE = 1024
HOP_SIZE = 512

# Model / training parameters.
INPUT_SHAPE = (FRAME_SIZE, 1)   # one channel per frame
NUM_CLASSES = len(unique_classes)
BATCH_SIZE = 32

## Preprocessing and exploratory analysis of data for RNNs

### Basic data characteristics

In [None]:
# Preview the first rows of the loaded table.
data.head()

In [None]:
# Print the number of rows; the (Polish) message describes each row as one
# fault-class / load combination.
print(f'Liczba kombinacji klas uszkodzeń i loadów: {len(data)}')

In [None]:
# Print how many distinct labels the 'label' column holds, then list them.
print(f'Liczba unikalnych klass: {len(data["label"].unique())}')
print(f'Unikalne klasy: {data["label"].unique()}')

In [None]:
# Per-label statistics of the raw signal lengths. The lengths live in a
# standalone Series, so `data` itself is left untouched.
lengths_per_signal = data['signal'].apply(len)
length_summary = lengths_per_signal.groupby(data['label']).agg(['count', 'mean', 'std', 'min', 'max'])
print(length_summary)

### Data framing and summary

In [None]:
# Number of frames each recording yields, kept outside `data` so the table
# does not need a temporary column.
frames_per_signal = data['signal'].apply(lambda sig: count_frames(len(sig), FRAME_SIZE, HOP_SIZE))

# Aggregate per label and expose the summed count as 'total_frames'.
frame_stats = (
    frames_per_signal
    .groupby(data['label'])
    .agg(['count', 'mean', 'min', 'max', 'sum'])
    .reset_index()
    .rename(columns={'sum': 'total_frames'})
)

print(frame_stats)

In [None]:
# Cut all signals into normalised frames; y holds one label per frame.
X, y = fragment_and_normalize(data, frame_size=FRAME_SIZE, hop_size=HOP_SIZE)

# Report the number of frames and the full array shape (messages in Polish).
print(f"Liczba fragmentów (ramek): {X.shape[0]}")
print(f"Kształt fragmentów (ilość, długość, kanały): {X.shape}")

In [None]:
# Convert the frames to float32 unless they already have that dtype.
if X.dtype != np.float32:
    X = X.astype(np.float32)
    print('Przekonwertowano X na float32')

In [None]:
# Count the frames of every class in a single pass over y instead of
# rescanning the whole array once per class.
_labels_found, _label_counts = np.unique(y, return_counts=True)
class_counts = dict(zip(_labels_found, _label_counts.tolist()))
classes = list(class_counts.keys())
counts = list(class_counts.values())

colors = plt.cm.tab20.colors

# Bar chart of the number of frames per class (labels in Polish).
plt.figure(figsize=(12, 6))
plt.bar(classes, counts, color=colors[:len(classes)])
plt.xticks(rotation=45, ha='right')
plt.xlabel('Klasa')
plt.ylabel('Liczba ramek')
plt.title('Rozkład liczby ramek w każdej klasie')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.tight_layout()
plt.show()

### Class label preprocessing

In [None]:
# Fit the encoder on all frame labels and map each one to an integer code.
le = LabelEncoder()
y_encoded = le.fit_transform(y)
print(f'Zakodowane klasy: {np.unique(y_encoded)}')

In [None]:
# Compare the label array shape before and after integer encoding.
print(f'Kształt y przed kodowaniem: {y.shape}')
print(f'Kształt y po kodowaniu: {y_encoded.shape}')

In [None]:
# One-hot encode the integer class codes of all frames.
y_onehot = to_categorical(y_encoded)
print(f'Kształt y one-hot: {y_onehot.shape}')

In [None]:
# Recover integer class indices from the one-hot rows; these are used below
# as the stratification key for the first split.
y_labels = np.argmax(y_onehot, axis=1)
print(f"Kształt y_labels: {y_labels.shape}")

### Train/Validation/Test split and Data Generators

In [None]:
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=2025, stratify=y_labels)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp)

print(f"Train: {X_train.shape[0]} ramek")
print(f"Validation: {X_val.shape[0]} ramek")
print(f"Test: {X_test.shape[0]} ramek")

In [None]:
# Integer-encode the labels of every subset with the encoder fitted on all
# labels, so the same class always gets the same code.
y_train_enc = le.transform(y_train)
y_val_enc = le.transform(y_val)
y_test_enc = le.transform(y_test)

# Fix the one-hot width explicitly. Without num_classes, to_categorical
# infers it from the largest code present in each subset, so a subset that
# lacks the highest-numbered class would come out narrower than the model's
# NUM_CLASSES-wide output layer.
y_train_onehot = to_categorical(y_train_enc, num_classes=NUM_CLASSES)
y_val_onehot = to_categorical(y_val_enc, num_classes=NUM_CLASSES)
y_test_onehot = to_categorical(y_test_enc, num_classes=NUM_CLASSES)

In [None]:
# Only the training generator augments its samples. The test generator also
# disables shuffling so that predictions keep the order of y_test_onehot.
train_gen = DataGenerator(X_train, y_train_onehot, batch_size=BATCH_SIZE, augment=True)
val_gen = DataGenerator(X_val, y_val_onehot, batch_size=BATCH_SIZE, augment=False)
test_gen = DataGenerator(X_test, y_test_onehot, batch_size=BATCH_SIZE, augment=False, shuffle=False)

## RNN

### Model architecture and compilation

In [None]:
# Convolutional front-end, two stacked SimpleRNN layers and a dense
# classification head, declared as one ordered list of layers.
model_rnn = Sequential([
    # Feature extraction; pooling shortens the sequence before the RNN layers.
    Conv1D(64, kernel_size=5, strides=1, activation='relu', input_shape=INPUT_SHAPE),
    MaxPooling1D(pool_size=4),
    BatchNormalization(),

    # The first recurrent layer emits the full sequence for the second one.
    SimpleRNN(64, return_sequences=True),
    BatchNormalization(),

    SimpleRNN(32),
    BatchNormalization(),

    # Classifier head with one softmax output per class.
    Dense(64, activation='relu'),
    Dropout(0.3),
    Dense(NUM_CLASSES, activation='softmax'),
])

In [None]:
# Adam with a learning rate of 1e-4; categorical cross-entropy matches the
# one-hot targets and the softmax output layer.
model_rnn.compile(optimizer=Adam(learning_rate=1e-4),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the SimpleRNN model.
model_rnn.summary()

### Training the model with early stopping and LR scheduling

In [None]:
# All three callbacks watch the validation loss:
# - stop after 10 epochs without improvement and restore the best weights,
# - write the model to 'model_rnn.h5' only when the validation loss improves,
# - halve the learning rate after 5 stagnant epochs, never below 1e-6.
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
checkpoint = ModelCheckpoint('model_rnn.h5', monitor='val_loss', save_best_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

In [None]:
# Time the training with a monotonic clock: time.time() follows the system
# wall clock and can jump when that clock is adjusted, which would corrupt
# the measured duration.
start_time = time.perf_counter()

# Train for at most 60 epochs; the callbacks may stop earlier.
history_rnn = model_rnn.fit(train_gen,
                         validation_data=val_gen,
                         epochs=60,
                         callbacks=[early_stop, checkpoint, reduce_lr])

end_time = time.perf_counter()
training_time = end_time - start_time
print(f"Czas treningu: {training_time:.2f} sekund")

### Evaluating model performance (accuracy, loss, confusion matrix, classification report, learning curves)

In [None]:
# Loss and accuracy of the SimpleRNN model on the test generator.
test_loss, test_accuracy = model_rnn.evaluate(test_gen, verbose=0)
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

In [None]:
# Class probabilities for the test set, reduced to predicted class indices;
# the true indices are recovered from the one-hot test targets.
y_pred_probs = model_rnn.predict(test_gen, verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_onehot, axis=1)

In [None]:
# Confusion matrix over the full, fixed set of encoded classes, so it always
# has one row/column per class and can be labelled with the class names.
class_ids = np.arange(len(le.classes_))
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)

# Create the axes explicitly and hand them to plot(): without `ax`, plot()
# opens its own figure, leaving a separate plt.figure(figsize=...) empty and
# its size unused.
fig, ax = plt.subplots(figsize=(12, 12))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=90)
ax.set_title("Macierz pomyłek na zbiorze testowym")
plt.show()

In [None]:
# Recompute the predictions so this cell does not depend on values left over
# from another model's evaluation.
y_pred_probs = model_rnn.predict(test_gen, verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_onehot, axis=1)

# Take the class names and their order from the fitted encoder, and pass the
# label ids explicitly: the report then stays correctly labelled even when a
# class is missing from both y_true and y_pred.
print(classification_report(y_true, y_pred,
                            labels=np.arange(len(le.classes_)),
                            target_names=le.classes_))

In [None]:
# Learning curves of the SimpleRNN model: accuracy on the left, loss on the
# right, drawn through the Axes objects instead of the implicit pyplot state.
fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(14, 5))
rnn_curves = history_rnn.history

ax_acc.plot(rnn_curves['accuracy'], label='Train Accuracy')
ax_acc.plot(rnn_curves['val_accuracy'], label='Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Dokładność podczas treningu i walidacji')
ax_acc.legend()
ax_acc.grid(True)

ax_loss.plot(rnn_curves['loss'], label='Train Loss')
ax_loss.plot(rnn_curves['val_loss'], label='Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Funkcja straty podczas treningu i walidacji')
ax_loss.legend()
ax_loss.grid(True)

plt.tight_layout()
plt.show()

## LSTM

### Model architecture and compilation

In [None]:
# Convolutional front-end, two stacked LSTM layers and a dense
# classification head, declared as one ordered list of layers.
model_lstm = Sequential([
    # Feature extraction; pooling shortens the sequence before the LSTM layers.
    Conv1D(64, kernel_size=5, strides=1, activation='relu', input_shape=INPUT_SHAPE),
    MaxPooling1D(pool_size=4),
    BatchNormalization(),

    # The first recurrent layer emits the full sequence for the second one.
    LSTM(64, return_sequences=True),
    BatchNormalization(),

    LSTM(32),
    BatchNormalization(),

    # Classifier head with one softmax output per class.
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(NUM_CLASSES, activation='softmax'),
])

In [None]:
# Adam with a learning rate of 1e-4; categorical cross-entropy matches the
# one-hot targets and the softmax output layer.
model_lstm.compile(optimizer=Adam(learning_rate=1e-4),
                   loss='categorical_crossentropy',
                   metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the LSTM model.
model_lstm.summary()

### Training the model with early stopping and LR scheduling

In [None]:
# Fresh callback instances for the LSTM run, all watching the validation loss:
# - stop after 10 epochs without improvement and restore the best weights,
# - write the model to 'model_lstm.h5' only when the validation loss improves,
# - halve the learning rate after 5 stagnant epochs, never below 1e-6.
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
checkpoint = ModelCheckpoint('model_lstm.h5', monitor='val_loss', save_best_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

In [None]:
# Time the training with a monotonic clock: time.time() follows the system
# wall clock and can jump when that clock is adjusted, which would corrupt
# the measured duration.
start_time = time.perf_counter()

# Train for at most 60 epochs; the callbacks may stop earlier.
history_lstm = model_lstm.fit(train_gen,
                         validation_data=val_gen,
                         epochs=60,
                         callbacks=[early_stop, checkpoint, reduce_lr])

end_time = time.perf_counter()
training_time = end_time - start_time
print(f"Czas treningu: {training_time:.2f} sekund")

### Evaluating model performance (accuracy, loss, confusion matrix, classification report, learning curves)

In [None]:
# Loss and accuracy of the LSTM model on the test generator.
test_loss, test_accuracy = model_lstm.evaluate(test_gen, verbose=0)
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

In [None]:
# Class probabilities for the test set, reduced to predicted class indices;
# the true indices are recovered from the one-hot test targets.
y_pred_probs = model_lstm.predict(test_gen, verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_onehot, axis=1)

In [None]:
# Confusion matrix over the full, fixed set of encoded classes, so it always
# has one row/column per class and can be labelled with the class names.
class_ids = np.arange(len(le.classes_))
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)

# Create the axes explicitly and hand them to plot(): without `ax`, plot()
# opens its own figure, leaving a separate plt.figure(figsize=...) empty and
# its size unused.
fig, ax = plt.subplots(figsize=(12, 12))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=90)
ax.set_title("Macierz pomyłek na zbiorze testowym")
plt.show()

In [None]:
# Recompute the predictions so this cell does not depend on values left over
# from another model's evaluation.
y_pred_probs = model_lstm.predict(test_gen, verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_onehot, axis=1)

# Take the class names and their order from the fitted encoder, and pass the
# label ids explicitly: the report then stays correctly labelled even when a
# class is missing from both y_true and y_pred.
print(classification_report(y_true, y_pred,
                            labels=np.arange(len(le.classes_)),
                            target_names=le.classes_))

In [None]:
# Learning curves of the LSTM model: accuracy on the left, loss on the right,
# drawn through the Axes objects instead of the implicit pyplot state.
fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(14, 5))
lstm_curves = history_lstm.history

ax_acc.plot(lstm_curves['accuracy'], label='Train Accuracy')
ax_acc.plot(lstm_curves['val_accuracy'], label='Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Dokładność podczas treningu i walidacji')
ax_acc.legend()
ax_acc.grid(True)

ax_loss.plot(lstm_curves['loss'], label='Train Loss')
ax_loss.plot(lstm_curves['val_loss'], label='Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Funkcja straty podczas treningu i walidacji')
ax_loss.legend()
ax_loss.grid(True)

plt.tight_layout()
plt.show()

## GRU

### Model architecture and compilation

In [None]:
# Convolutional front-end, two stacked GRU layers and a dense classification
# head, declared as one ordered list of layers.
model_gru = Sequential([
    # Feature extraction; pooling shortens the sequence before the GRU layers.
    Conv1D(64, kernel_size=5, strides=1, activation='relu', input_shape=INPUT_SHAPE),
    MaxPooling1D(pool_size=4),
    BatchNormalization(),

    # The first recurrent layer emits the full sequence for the second one.
    GRU(64, return_sequences=True),
    BatchNormalization(),

    GRU(32),
    BatchNormalization(),

    # Classifier head with one softmax output per class.
    Dense(64, activation='relu'),
    Dropout(0.2),
    Dense(NUM_CLASSES, activation='softmax'),
])

In [None]:
# Adam with a learning rate of 1e-4; categorical cross-entropy matches the
# one-hot targets and the softmax output layer.
model_gru.compile(optimizer=Adam(learning_rate=1e-4),
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])

In [None]:
# Print the layer-by-layer summary of the GRU model.
model_gru.summary()

### Training the model with early stopping and LR scheduling

In [None]:
# Fresh callback instances for the GRU run, all watching the validation loss:
# - stop after 10 epochs without improvement and restore the best weights,
# - write the model to 'model_gru.h5' only when the validation loss improves,
# - halve the learning rate after 5 stagnant epochs, never below 1e-6.
early_stop = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
checkpoint = ModelCheckpoint('model_gru.h5', monitor='val_loss', save_best_only=True)
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-6, verbose=1)

In [None]:
# Time the training with a monotonic clock: time.time() follows the system
# wall clock and can jump when that clock is adjusted, which would corrupt
# the measured duration.
start_time = time.perf_counter()

# Train for at most 60 epochs; the callbacks may stop earlier.
history_gru = model_gru.fit(train_gen,
                         validation_data=val_gen,
                         epochs=60,
                         callbacks=[early_stop, checkpoint, reduce_lr])

end_time = time.perf_counter()
training_time = end_time - start_time
print(f"Czas treningu: {training_time:.2f} sekund")

### Evaluating model performance (accuracy, loss, confusion matrix, classification report, learning curves)

In [None]:
# Loss and accuracy of the GRU model on the test generator.
test_loss, test_accuracy = model_gru.evaluate(test_gen, verbose=0)
print(f"Test Accuracy: {test_accuracy:.4f}")
print(f"Test Loss: {test_loss:.4f}")

In [None]:
# Class probabilities for the test set, reduced to predicted class indices;
# the true indices are recovered from the one-hot test targets.
y_pred_probs = model_gru.predict(test_gen, verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_onehot, axis=1)

In [None]:
# Confusion matrix over the full, fixed set of encoded classes, so it always
# has one row/column per class and can be labelled with the class names.
class_ids = np.arange(len(le.classes_))
cm = confusion_matrix(y_true, y_pred, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=le.classes_)

# Create the axes explicitly and hand them to plot(): without `ax`, plot()
# opens its own figure, leaving a separate plt.figure(figsize=...) empty and
# its size unused.
fig, ax = plt.subplots(figsize=(12, 12))
disp.plot(ax=ax, cmap='Blues', colorbar=False, xticks_rotation=90)
ax.set_title("Macierz pomyłek na zbiorze testowym")
plt.show()

In [None]:
# Recompute the predictions so this cell does not depend on values left over
# from another model's evaluation.
y_pred_probs = model_gru.predict(test_gen, verbose=1)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test_onehot, axis=1)

# Take the class names and their order from the fitted encoder, and pass the
# label ids explicitly: the report then stays correctly labelled even when a
# class is missing from both y_true and y_pred.
print(classification_report(y_true, y_pred,
                            labels=np.arange(len(le.classes_)),
                            target_names=le.classes_))

In [None]:
# Learning curves of the GRU model: accuracy on the left, loss on the right,
# drawn through the Axes objects instead of the implicit pyplot state.
fig, (ax_acc, ax_loss) = plt.subplots(1, 2, figsize=(14, 5))
gru_curves = history_gru.history

ax_acc.plot(gru_curves['accuracy'], label='Train Accuracy')
ax_acc.plot(gru_curves['val_accuracy'], label='Validation Accuracy')
ax_acc.set_xlabel('Epoch')
ax_acc.set_ylabel('Accuracy')
ax_acc.set_title('Dokładność podczas treningu i walidacji')
ax_acc.legend()
ax_acc.grid(True)

ax_loss.plot(gru_curves['loss'], label='Train Loss')
ax_loss.plot(gru_curves['val_loss'], label='Validation Loss')
ax_loss.set_xlabel('Epoch')
ax_loss.set_ylabel('Loss')
ax_loss.set_title('Funkcja straty podczas treningu i walidacji')
ax_loss.legend()
ax_loss.grid(True)

plt.tight_layout()
plt.show()

In [None]:
# Persist the trained GRU model to 'model_gru_final.h5'.
# NOTE(review): only the GRU model gets an explicit final save here; the other
# models are written solely by their ModelCheckpoint callbacks — confirm this
# is intended.
model_gru.save('model_gru_final.h5')