# ⚙️ Vibration Fault Classification Training (CWRU)

This notebook trains a **1D CNN** on the CWRU Bearing dataset.

**Features:**
- Uses ALL .mat files (10 classes: 9 fault types + Normal)
- Can also use a pre-processed .npz file (used by default when it exists; see `USE_NPZ`)
- Stratified 80/20 train/test split
- Confusion matrix and per-class accuracy

## Classes:
| # | Class | Description |
|---|-------|-------------|
| 0-2 | Ball_007/014/021 | Ball defect (0.007"/0.014"/0.021") |
| 3-5 | IR_007/014/021 | Inner race defect |
| 6 | Normal | Healthy bearing |
| 7-9 | OR_007/014/021 | Outer race defect |

Class indices follow scikit-learn's `LabelEncoder`, which orders the class names alphabetically.

In [None]:
# Mount Google Drive at /content/drive so the dataset and model folders
# stored there are reachable (the google.colab module exists only in Colab).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Install scipy and seaborn, which the import cell needs (-q keeps pip quiet).
!pip install scipy seaborn -q

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import scipy.io
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, classification_report
import glob
import os
from tqdm import tqdm

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')

## 1. Configuration

In [None]:
# ============= UPDATE THIS PATH =============
# Dataset root: the pre-processed .npz file is looked up directly inside it,
# the raw .mat files inside its 'raw' subfolder.
DATA_PATH = '/content/drive/MyDrive/MaintanenceAI/Data/CWRU'
# Folder the trained checkpoint is written to (created if it does not exist).
SAVE_PATH = '/content/drive/MyDrive/MaintanenceAI/trained_models'
# =============================================

# Data loading options
USE_NPZ = True  # True: use the pre-processed .npz file when it exists (faster); otherwise the raw .mat files are read

# Hyperparameters
WINDOW_SIZE = 2048  # samples per segment when raw .mat signals are sliced
NUM_CLASSES = 10  # number of outputs of the classifier's final layer
EPOCHS = 30  # full passes over the training set
BATCH_SIZE = 64  # samples per DataLoader batch
LEARNING_RATE = 1e-3  # learning rate handed to the Adam optimizer
TEST_SPLIT = 0.2  # fraction of all samples held out for testing

print(f'Data path: {DATA_PATH}')

## 2. Model Architecture

In [None]:
class VibrationClassifier(nn.Module):
    """1D CNN mapping single-channel signal windows to class logits.

    Four Conv1d -> BatchNorm1d -> ReLU stages (the first three followed by
    max pooling) end in global average pooling, which yields a 128-value
    feature vector. A dropout + two-layer head turns it into
    ``num_classes`` logits.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # (in_channels, out_channels, kernel_size) of each conv stage; every
        # stage uses stride 2 and a padding of half its kernel size.
        stage_specs = ((1, 16, 64), (16, 32, 32), (32, 64, 16), (64, 128, 8))
        final_stage = len(stage_specs) - 1

        layers = []
        for position, (c_in, c_out, kernel) in enumerate(stage_specs):
            layers.append(
                nn.Conv1d(c_in, c_out, kernel_size=kernel, stride=2, padding=kernel // 2)
            )
            layers.append(nn.BatchNorm1d(c_out))
            layers.append(nn.ReLU())
            # Only the first three stages are followed by max pooling.
            if position != final_stage:
                layers.append(nn.MaxPool1d(2))
        # Collapse the remaining time axis to one value per channel.
        layers.append(nn.AdaptiveAvgPool1d(1))

        self.features = nn.Sequential(*layers)
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, num_classes),
        )

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for input ``x``."""
        pooled = self.features(x)
        # Drop the length-1 time axis left by the adaptive pooling layer.
        return self.classifier(pooled.flatten(1))

## 3. Data Loading

In [None]:
def load_from_npz(data_path, filename='CWRU_48k_load_1_CNN_data.npz'):
    """Load pre-processed samples and labels from an .npz archive.

    Args:
        data_path: Directory that contains the archive.
        filename: Name of the archive file inside ``data_path``.

    Returns:
        Tuple ``(X, y)`` with the arrays stored under the archive keys
        ``'X'`` and ``'y'``.

    Raises:
        FileNotFoundError: If the archive does not exist.
        KeyError: If the archive has no ``'X'`` or ``'y'`` entry.
    """
    npz_path = os.path.join(data_path, filename)
    # np.load keeps the archive file open until it is closed; the context
    # manager releases the handle once both arrays have been read into memory.
    with np.load(npz_path) as archive:
        return archive['X'], archive['y']

def _segment_signal(signal, window_size):
    """Split a signal into consecutive, non-overlapping windows (one per row)."""
    flat = np.asarray(signal).flatten()
    n_segments = len(flat) // window_size
    # Samples after the last complete window are dropped.
    return flat[:n_segments * window_size].reshape(n_segments, window_size)


def load_from_mat(data_path, window_size=2048):
    """Load raw .mat files and cut every signal into fixed-size windows.

    Files are searched in ``<data_path>/raw`` with one glob pattern per
    class. For each file, the first variable whose name contains
    ``'DE_time'`` is flattened and split into consecutive, non-overlapping
    windows of ``window_size`` samples; a trailing remainder shorter than one
    window is dropped. Files that cannot be read, or that contain no such
    variable, are reported and skipped.

    Args:
        data_path: Dataset root directory containing the ``raw`` subfolder.
        window_size: Number of samples per window; must be positive.

    Returns:
        Tuple ``(segments, labels)``: an array with one window per row and an
        array holding the class name of each window. Both are empty when no
        usable file is found.

    Raises:
        ValueError: If ``window_size`` is not positive.
    """
    if window_size <= 0:
        raise ValueError(f'window_size must be positive, got {window_size}')

    raw_path = os.path.join(data_path, 'raw')

    file_patterns = {
        'Normal': '*Normal*.mat',
        'Ball_007': '*B007*.mat', 'Ball_014': '*B014*.mat', 'Ball_021': '*B021*.mat',
        'IR_007': '*IR007*.mat', 'IR_014': '*IR014*.mat', 'IR_021': '*IR021*.mat',
        'OR_007': '*OR007*.mat', 'OR_014': '*OR014*.mat', 'OR_021': '*OR021*.mat',
    }

    all_segments, all_labels = [], []

    for label_name, pattern in file_patterns.items():
        # glob returns paths in arbitrary, filesystem-dependent order; sorting
        # keeps the sample order (and any seeded split built on it) reproducible.
        files = sorted(glob.glob(os.path.join(raw_path, pattern)))
        print(f'{label_name}: {len(files)} files found')

        for mat_file in files:
            try:
                mat = scipy.io.loadmat(mat_file)
                # Only the first matching variable of a file is used.
                signal_key = next((key for key in mat if 'DE_time' in key), None)
                windows = None
                if signal_key is not None:
                    windows = _segment_signal(mat[signal_key], window_size)
            except Exception as e:
                # Best effort: report the unusable file and keep loading the rest.
                print(f'Error: {mat_file}: {e}')
                continue

            if windows is None:
                print(f'Warning: {mat_file}: no DE_time variable found, skipping')
                continue

            all_segments.extend(windows)
            all_labels.extend([label_name] * len(windows))

    return np.array(all_segments), np.array(all_labels)

# Load data: prefer the pre-processed archive, otherwise segment the raw files
print('Loading CWRU data...\n')

if USE_NPZ and os.path.exists(os.path.join(DATA_PATH, 'CWRU_48k_load_1_CNN_data.npz')):
    print('Using pre-processed .npz file')
    X, y = load_from_npz(DATA_PATH)
    le = LabelEncoder()
    # LabelEncoder sorts the names it is fitted on, so index i maps to
    # le.classes_[i] (alphabetical order), not to the position in this list.
    label_names = ['Normal', 'Ball_007', 'Ball_014', 'Ball_021', 'IR_007', 'IR_014', 'IR_021', 'OR_007', 'OR_014', 'OR_021']
    le.fit(label_names)
    # NOTE(review): integer labels stored in the archive are assumed to follow
    # the le.classes_ order already; confirm against the script that built it.
    if y.dtype.kind in ('U', 'S'):
        # String labels cannot become torch.long targets; encode them with the
        # same alphabetical mapping the raw .mat branch produces.
        y = le.transform(y.astype(str))
else:
    print('Loading from raw .mat files')
    X, y_str = load_from_mat(DATA_PATH, WINDOW_SIZE)
    le = LabelEncoder()
    y = le.fit_transform(y_str)

# Fail here with a clear message instead of a cryptic error in the split below.
if len(X) == 0:
    raise RuntimeError(f'No samples were loaded from {DATA_PATH}; check the path and its contents')

print(f'\nTotal samples: {len(X)}')
print(f'Sample shape: {X.shape}')
print(f'Classes: {le.classes_}')

In [None]:
# Report how many samples each class index (and its name) contributes
class_ids, class_counts = np.unique(y, return_counts=True)
print('\nClass Distribution:')
for class_id, n_samples in zip(class_ids, class_counts):
    class_name = le.classes_[class_id]
    print(f'  Class {class_id} ({class_name}): {n_samples} samples')

In [None]:
# Hold out TEST_SPLIT of the samples; stratifying on y keeps the class
# proportions the same in both parts, and the fixed seed makes the split repeatable.
split_options = dict(test_size=TEST_SPLIT, random_state=42, stratify=y)
X_train, X_test, y_train, y_test = train_test_split(X, y, **split_options)
print(f'Train: {len(X_train)}, Test: {len(X_test)}')

## 4. Dataset & DataLoader

In [None]:
class CWRUDataset(Dataset):
    """Dataset of signal windows that standardizes each sample on access.

    Every item is a ``(signal, label)`` pair: the signal is shifted to zero
    mean and scaled by its own standard deviation, converted to a float32
    tensor and given a leading channel axis; the label is a long tensor.
    """

    def __init__(self, data, labels):
        self.data, self.labels = data, labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        centered = sample - np.mean(sample)
        # The small epsilon avoids a division by zero for constant signals.
        standardized = centered / (np.std(sample) + 1e-8)
        signal = torch.tensor(standardized, dtype=torch.float32).unsqueeze(0)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return signal, label

# Wrap both splits; only the training loader reshuffles its samples each epoch
train_dataset = CWRUDataset(X_train, y_train)
train_loader = DataLoader(dataset=train_dataset, batch_size=BATCH_SIZE, shuffle=True)

test_dataset = CWRUDataset(X_test, y_test)
test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, shuffle=False)

## 5. Training

In [None]:
model = VibrationClassifier(num_classes=NUM_CLASSES).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

# Per-epoch history of the mean batch loss and the training accuracy (%)
train_losses = []
train_accs = []

print('Starting Training...\n')
for epoch in range(EPOCHS):
    model.train()
    running_loss = 0
    n_correct = 0
    n_seen = 0

    progress = tqdm(train_loader, desc=f'Epoch {epoch+1}/{EPOCHS}', leave=False)
    for inputs, labels in progress:
        inputs = inputs.to(device)
        labels = labels.to(device)

        # Standard optimization step: reset gradients, forward, backward, update.
        optimizer.zero_grad()
        logits = model(inputs)
        batch_loss = criterion(logits, labels)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
        n_correct += (logits.argmax(dim=1) == labels).sum().item()
        n_seen += labels.size(0)

    # The loss is averaged over batches, the accuracy over individual samples.
    avg_loss = running_loss / len(train_loader)
    accuracy = 100. * n_correct / n_seen
    train_losses.append(avg_loss)
    train_accs.append(accuracy)
    print(f'Epoch {epoch+1}/{EPOCHS}, Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')

print('\nTraining Complete!')

## 6. Evaluation

In [None]:
# Score the held-out set in eval mode, without tracking gradients
model.eval()
all_preds, all_targets = [], []

with torch.no_grad():
    for inputs, labels in test_loader:
        logits = model(inputs.to(device))
        # The predicted class is the index of the largest logit.
        all_preds.extend(logits.argmax(dim=1).cpu().numpy())
        all_targets.extend(labels.cpu().numpy())

all_preds, all_targets = np.array(all_preds), np.array(all_targets)

# Percentage of test samples whose prediction equals the target
test_accuracy = 100. * np.mean(all_preds == all_targets)
print(f'\nüìä Test Accuracy: {test_accuracy:.2f}%')

In [None]:
# Classification Report
# Passing every encoder index as `labels` keeps the rows aligned with
# le.classes_ even when a class appears in neither the test targets nor the
# predictions; without it, target_names would no longer match the label set.
# zero_division=0 reports 0 for metrics that are undefined for such a class.
print('\nClassification Report:')
print(classification_report(
    all_targets,
    all_preds,
    labels=np.arange(len(le.classes_)),
    target_names=le.classes_,
    zero_division=0,
))

In [None]:
# Confusion Matrix
# Request one row/column per encoder class. By default the matrix covers only
# labels that occur in the targets or predictions, which would shrink it and
# silently misalign the le.classes_ tick labels if a class were absent.
cm = confusion_matrix(all_targets, all_preds, labels=np.arange(len(le.classes_)))

plt.figure(figsize=(12, 10))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=le.classes_, yticklabels=le.classes_)
plt.title(f'Confusion Matrix (Accuracy: {test_accuracy:.1f}%)')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
# Save before show() so the written image contains the rendered figure.
plt.savefig('confusion_matrix.png', dpi=150)
plt.show()

## 7. Save Model

In [None]:
os.makedirs(SAVE_PATH, exist_ok=True)
save_file = os.path.join(SAVE_PATH, 'vibration_classifier.pth')

# Checkpoint with the metadata needed to rebuild the model and decode its
# predictions. Only plain Python types are stored next to the weights:
# test_accuracy is a NumPy scalar, and pickled NumPy objects are rejected by
# default when the file is read with torch.load(weights_only=True).
torch.save({
    'model_state_dict': model.state_dict(),
    'label_encoder_classes': le.classes_.tolist(),
    'test_accuracy': float(test_accuracy),
    'num_classes': NUM_CLASSES
}, save_file)

print(f'‚úÖ Model saved: {save_file}')

# Also save locally (weights only, without the metadata stored above)
torch.save(model.state_dict(), 'vibration_classifier.pth')
print('üì• Also saved locally: vibration_classifier.pth')

## 8. Training Curves

In [None]:
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# (history, title, y-axis label) for the left and the right panel
panels = [
    (train_losses, 'Training Loss', 'Loss'),
    (train_accs, 'Training Accuracy', 'Accuracy (%)'),
]
for ax, (history, title, y_label) in zip(axes, panels):
    ax.plot(history)
    ax.set_title(title)
    ax.set_xlabel('Epoch')
    ax.set_ylabel(y_label)
    ax.grid(True)

plt.tight_layout()
plt.savefig('training_curves.png', dpi=150)
plt.show()