In [37]:
import os
import h5py
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
from pathlib import Path
import torch.nn.functional as F
import matplotlib.pyplot as plt
from scipy.ndimage import zoom  # Alternative to cv2.resize



In [43]:
class VibrationDatasetWithMetadata(Dataset):
    """Vibration recordings stored as ``.h5`` files, paired with filename metadata.

    ``data_dir`` must contain a ``good`` and a ``bad`` sub-folder. Every file
    ending in ``.h5`` inside them becomes one sample, labelled 0 (``good``) or
    1 (``bad``). Files are enumerated in sorted order so that sample indices
    are stable across runs and machines.

    Metadata is derived from the underscore-separated file name (extension
    removed): field 0 is looked up in ``machine_mapping`` and field 3 in
    ``operation_mapping``. Unknown or missing fields are encoded as ``-1``.

    Each item is a tuple ``(signal, metadata, label)``:
        signal:   float32 tensor, the file's ``vibration_data`` array transposed
                  (expected to be (10000, 3) on disk -> (3, 10000); the on-disk
                  shape is not verified here).
        metadata: float32 tensor
                  ``[machine_id, operation_id, spindle_speed, feed_rate]``.
        label:    int64 scalar tensor.

    Raises:
        FileNotFoundError: if ``data_dir/good`` or ``data_dir/bad`` is missing.
    """

    _EXTENSION = ".h5"

    def __init__(self, data_dir):
        self.file_paths = []
        self.labels = []
        self.metadata = []

        # Define machine and operation mappings
        self.machine_mapping = {"M01": 0, "M02": 1, "M03": 2}
        # Operation code -> [operation ID, spindle speed, feed rate]
        self.operation_mapping = {
            "OP01": [0, 250, 100],  # Step Drill
            "OP02": [1, 200, 50],   # Drill
            "OP04": [2, 250, 100],  # Step Drill
            "OP07": [3, 200, 50],   # Step Drill
            "OP10": [4, 250, 50]    # Step Drill
        }

        for label, label_idx in zip(["good", "bad"], [0, 1]):  # 0 = good, 1 = bad
            folder = Path(data_dir) / label
            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample order (and therefore any index-based split) reproducible.
            for file_name in sorted(os.listdir(folder)):
                if not file_name.endswith(self._EXTENSION):
                    continue
                self.file_paths.append(os.path.join(folder, file_name))
                self.labels.append(label_idx)
                self.metadata.append(self._parse_metadata(file_name))

        # reshape keeps a well-formed (0, 4) array even when no files were found.
        self.metadata = np.array(self.metadata, dtype=np.int64).reshape(-1, 4)

    def _parse_metadata(self, file_name):
        """Return ``[machine_id, operation_id, spindle_speed, feed_rate]`` for a file name."""
        # Drop the extension first so a trailing field such as "OP07.h5" still matches.
        parts = file_name[: -len(self._EXTENSION)].split("_")
        machine = parts[0]  # e.g. "M01"
        # Names with fewer than four fields have no operation code.
        operation = parts[3] if len(parts) > 3 else None  # e.g. "OP01"

        machine_encoded = self.machine_mapping.get(machine, -1)
        op_metadata = self.operation_mapping.get(operation, [-1, -1, -1])
        return [machine_encoded] + list(op_metadata)

    def __len__(self):
        """Number of ``.h5`` samples found under ``data_dir``."""
        return len(self.file_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` from disk and return ``(signal, metadata, label)``."""
        file_path = self.file_paths[idx]
        with h5py.File(file_path, "r") as f:
            data = f["vibration_data"][:]  # expected shape (10000, 3)

        data = np.transpose(data, (1, 0))  # channels first, e.g. (3, 10000), for Conv1d
        label = self.labels[idx]
        metadata = torch.tensor(self.metadata[idx], dtype=torch.float32)

        return torch.tensor(data, dtype=torch.float32), metadata, torch.tensor(label, dtype=torch.long)


In [44]:
class CNN1DWithMetadata(nn.Module):
    """Two-branch binary classifier: a 1-D CNN over the signal plus an MLP over metadata.

    The CNN branch applies three Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2)
    stages (16, 32, 64 channels) and projects the flattened result to 128
    features. The metadata branch maps ``num_metadata_features`` values to 16
    features through two linear layers. Both are concatenated and classified
    into 2 logits (good / bad).

    Args:
        num_metadata_features: length of the metadata vector per sample.
        input_length: number of time steps in each input signal. Defaults to
            10000, which reproduces the original ``64 * 1250`` flattened size,
            so existing checkpoints keep loading with the default.

    Raises:
        ValueError: if ``input_length`` is smaller than 8 (nothing would be
            left after the three pooling stages).
    """

    def __init__(self, num_metadata_features=4, input_length=10000):  # 4 metadata values now
        super(CNN1DWithMetadata, self).__init__()

        # The padded convolutions preserve length; each MaxPool1d(2, 2) floors
        # the length by half, so three of them yield input_length // 8.
        pooled_length = input_length // 8
        if pooled_length < 1:
            raise ValueError(f"input_length must be at least 8, got {input_length}")
        self.input_length = input_length

        # CNN branch (for vibration signals)
        self.conv1 = nn.Conv1d(in_channels=3, out_channels=16, kernel_size=7, stride=1, padding=3)
        self.bn1 = nn.BatchNorm1d(16)
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.conv2 = nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5, stride=1, padding=2)
        self.bn2 = nn.BatchNorm1d(32)
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.conv3 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.bn3 = nn.BatchNorm1d(64)
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)

        self.cnn_fc = nn.Linear(64 * pooled_length, 128)  # CNN feature output

        # Metadata branch (small MLP)
        self.metadata_fc1 = nn.Linear(num_metadata_features, 16)
        self.metadata_fc2 = nn.Linear(16, 16)

        # Final classification layer (combined CNN + metadata features)
        self.final_fc1 = nn.Linear(128 + 16, 64)
        self.final_fc2 = nn.Linear(64, 2)  # Binary classification (good/bad)

        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()

    def forward(self, x, metadata):
        """Return raw class logits of shape (batch, 2).

        Args:
            x: signal tensor of shape (batch, 3, input_length).
            metadata: tensor of shape (batch, num_metadata_features).
        """
        # CNN branch
        x = self.pool1(self.relu(self.bn1(self.conv1(x))))
        x = self.pool2(self.relu(self.bn2(self.conv2(x))))
        x = self.pool3(self.relu(self.bn3(self.conv3(x))))

        x = torch.flatten(x, start_dim=1)  # (batch, 64 * input_length // 8)
        x = self.relu(self.cnn_fc(x))

        # Metadata branch
        metadata = self.relu(self.metadata_fc1(metadata))
        metadata = self.relu(self.metadata_fc2(metadata))

        # Concatenate CNN and metadata features
        combined = torch.cat((x, metadata), dim=1)

        # Fully connected layers; logits are returned un-normalised because
        # the training code pairs them with CrossEntropyLoss.
        combined = self.relu(self.final_fc1(combined))
        combined = self.dropout(combined)
        combined = self.final_fc2(combined)

        return combined


In [4]:
# ------------------------
# 3️⃣ Training, Validation & Testing Functions
# ------------------------
def train_epoch(model, train_loader, optimizer, criterion, device):
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: module called as ``model(inputs, metadata)`` and returning logits.
        train_loader: iterable yielding ``(inputs, metadata, labels)`` batches.
            It does not need to implement ``__len__``.
        optimizer: optimizer over ``model``'s parameters.
        criterion: loss called as ``criterion(outputs, labels)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_batch_loss, accuracy)``. The loss is the average of the
        per-batch loss values; accuracy is correct predictions over samples
        seen. Either value is NaN when it is undefined (no batches / no
        samples) instead of raising ``ZeroDivisionError``.
    """
    model.train()
    total_loss, correct, total, num_batches = 0.0, 0, 0, 0

    for inputs, metadata, labels in train_loader:
        inputs, metadata, labels = inputs.to(device), metadata.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs, metadata)  # Forward pass
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        preds = torch.argmax(outputs, dim=1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
        # Counted here rather than via len(train_loader) so that loaders
        # without a length (e.g. streaming datasets) are supported.
        num_batches += 1

    mean_loss = total_loss / num_batches if num_batches else float("nan")
    accuracy = correct / total if total else float("nan")
    return mean_loss, accuracy




In [5]:

def validate_epoch(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating any weights.

    Args:
        model: module called as ``model(inputs, metadata)`` and returning logits.
        val_loader: iterable yielding ``(inputs, metadata, labels)`` batches.
            It does not need to implement ``__len__``.
        criterion: loss called as ``criterion(outputs, labels)``.
        device: device the batches are moved to.

    Returns:
        ``(mean_batch_loss, accuracy)``. The loss is the average of the
        per-batch loss values; accuracy is correct predictions over samples
        seen. Either value is NaN when it is undefined (for example an empty
        validation split) instead of raising ``ZeroDivisionError``.
    """
    model.eval()
    total_loss, correct, total, num_batches = 0.0, 0, 0, 0

    with torch.no_grad():
        for inputs, metadata, labels in val_loader:
            inputs, metadata, labels = inputs.to(device), metadata.to(device), labels.to(device)
            outputs = model(inputs, metadata)
            loss = criterion(outputs, labels)

            total_loss += loss.item()
            preds = torch.argmax(outputs, dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
            # Counted here rather than via len(val_loader) so that loaders
            # without a length are supported.
            num_batches += 1

    mean_loss = total_loss / num_batches if num_batches else float("nan")
    accuracy = correct / total if total else float("nan")
    return mean_loss, accuracy




In [6]:
def test_model(model, test_loader, device):
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for inputs, metadata, labels in test_loader:
            inputs, metadata, labels = inputs.to(device), metadata.to(device), labels.to(device)
            outputs = model(inputs, metadata)
            preds = torch.argmax(outputs, dim=1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = (np.array(all_preds) == np.array(all_labels)).mean()
    return accuracy

In [45]:
# ------------------------
# 5️⃣ Full Training & Evaluation Function
# ------------------------
def train_and_evaluate(train_loader, val_loader, test_loader, epochs=20, lr=0.001, return_history=False):
    """Train a fresh ``CNN1DWithMetadata`` and report its test accuracy.

    A new model is built on CUDA when available (CPU otherwise) and trained
    with Adam and cross-entropy loss for ``epochs`` passes. Per-epoch train and
    validation metrics are printed, and the final test accuracy is printed
    after training.

    Args:
        train_loader, val_loader, test_loader: loaders yielding
            ``(inputs, metadata, labels)`` batches.
        epochs: number of training epochs.
        lr: Adam learning rate.
        return_history: when True, also return the collected metrics so they
            can be plotted or inspected. Defaults to False, which keeps the
            original single-value return.

    Returns:
        The trained model, or ``(model, history)`` when ``return_history`` is
        True. ``history`` is a dict with per-epoch lists ``"train_loss"``,
        ``"val_loss"``, ``"train_acc"``, ``"val_acc"`` and a scalar
        ``"test_acc"``.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Initialize the model with metadata processing
    model = CNN1DWithMetadata(num_metadata_features=4).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Previously these metrics were accumulated and then thrown away; they are
    # now kept in one place and optionally handed back to the caller.
    history = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}

    # Training loop
    for epoch in range(epochs):
        train_loss, train_acc = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)

        history["train_loss"].append(train_loss)
        history["val_loss"].append(val_loss)
        history["train_acc"].append(train_acc)
        history["val_acc"].append(val_acc)

        print(f"Epoch [{epoch+1}/{epochs}] - "
              f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} - "
              f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

    print("✅ Training and validation complete!")

    # Evaluate on the test set
    test_accuracy = test_model(model, test_loader, device)
    print(f"🔥 Final Test Accuracy: {test_accuracy:.4f}")
    history["test_acc"] = float(test_accuracy)

    if return_history:
        return model, history
    return model


In [46]:
# Load dataset
data_directory = "../data/final/Selected_data_windowed_grouped_normalized"
dataset = VibrationDatasetWithMetadata(data_directory)

# Fix the random state so the split, the weight initialisation and the batch
# shuffling are repeatable on a fresh kernel (assuming the dataset enumerates
# its files in a stable order).
SEED = 42
torch.manual_seed(SEED)

# Split dataset: 70% train / 15% validation / remainder test
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size

# A dedicated generator keeps the split independent of how much global
# randomness other code consumes before this cell runs.
split_generator = torch.Generator().manual_seed(SEED)
train_dataset, val_dataset, test_dataset = random_split(
    dataset, [train_size, val_size, test_size], generator=split_generator
)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Train & evaluate model
model = train_and_evaluate(train_loader, val_loader, test_loader, epochs=20, lr=0.001)




Epoch [1/20] - Train Loss: 5.1393, Train Acc: 0.5583 - Val Loss: 0.8660, Val Acc: 0.7282
Epoch [2/20] - Train Loss: 1.4133, Train Acc: 0.7542 - Val Loss: 0.3240, Val Acc: 0.8641
Epoch [3/20] - Train Loss: 0.3573, Train Acc: 0.8521 - Val Loss: 0.2265, Val Acc: 0.9223
Epoch [4/20] - Train Loss: 0.2546, Train Acc: 0.8917 - Val Loss: 0.1704, Val Acc: 0.9029
Epoch [5/20] - Train Loss: 0.2067, Train Acc: 0.9187 - Val Loss: 0.1655, Val Acc: 0.9126
Epoch [6/20] - Train Loss: 0.1552, Train Acc: 0.9375 - Val Loss: 0.2745, Val Acc: 0.9029
Epoch [7/20] - Train Loss: 0.0938, Train Acc: 0.9625 - Val Loss: 0.1811, Val Acc: 0.8835
Epoch [8/20] - Train Loss: 0.0953, Train Acc: 0.9625 - Val Loss: 0.1685, Val Acc: 0.9126
Epoch [9/20] - Train Loss: 0.0616, Train Acc: 0.9750 - Val Loss: 0.2102, Val Acc: 0.9126
Epoch [10/20] - Train Loss: 0.0566, Train Acc: 0.9854 - Val Loss: 0.1772, Val Acc: 0.9126
Epoch [11/20] - Train Loss: 0.0529, Train Acc: 0.9854 - Val Loss: 0.3570, Val Acc: 0.8835
Epoch [12/20] - Tra

In [47]:
# Persist only the learned weights (state_dict), not the whole module object.
checkpoint_path = "cnn1d_with_metadata.ckpt"
trained_weights = model.state_dict()
torch.save(trained_weights, checkpoint_path)
print("✅ Model saved to cnn1d_with_metadata.ckpt")

✅ Model saved to cnn1d_with_metadata.ckpt
