In [2]:
import numpy as np
import pandas as pd
import torch
import numpy as np
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
from collections import Counter
import torch.nn.functional as F

In [3]:
# Custom Dataset for evaluation
class SingleDriverDataset(Dataset):
    """Minimal map-style Dataset exposing one driver's samples as float32 tensors."""

    def __init__(self, data):
        """
        Convert the given samples to a float32 tensor and keep it on the instance.

        Args:
            data (numpy.ndarray): Samples for one driver, indexed along the first
                axis (e.g., a stack of sliding windows). The values are copied
                into a new tensor, so later changes to `data` are not reflected.
        """
        samples = torch.tensor(data, dtype=torch.float32)
        self.data = samples

    def __len__(self):
        """Number of samples, i.e. the size of the first axis of the stored tensor."""
        n_samples = len(self.data)
        return n_samples

    def __getitem__(self, idx):
        """Return the sample stored at position `idx` (no label is attached)."""
        sample = self.data[idx]
        return sample
    
# Model Definition
class CNNLSTMModel(nn.Module):
    """
    Hybrid classifier: two Conv1d + max-pool stages feed a 2-layer LSTM, whose
    final time step is mapped to class logits by a small fully connected head.
    """

    def __init__(self, n_features, n_classes):
        """
        Args:
            n_features (int): Number of input channels (features per time step).
            n_classes (int): Number of logits produced per input sequence.
        """
        super().__init__()

        # Convolutional feature extractor (the same pooling layer is reused after each conv).
        self.conv1 = nn.Conv1d(n_features, 64, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)

        # Sequence model over the pooled feature maps.
        self.lstm = nn.LSTM(128, 64, num_layers=2, batch_first=True)

        # Classification head applied to the last LSTM output.
        head_layers = [
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(128, n_classes),
        ]
        self.fc = nn.Sequential(*head_layers)

    def forward(self, x):
        """
        Compute class logits for a batch shaped (batch_size, sequence_length, n_features).

        Each pooling stage halves the time axis, so the LSTM sees roughly
        sequence_length / 4 steps. Returns a tensor of shape (batch_size, n_classes).
        """
        # Conv1d expects channels first: (batch_size, n_features, sequence_length)
        feature_maps = x.permute(0, 2, 1)
        for conv in (self.conv1, self.conv2):
            feature_maps = self.pool(F.relu(conv(feature_maps)))

        # Back to (batch_size, steps, channels) for the batch_first LSTM
        lstm_out, _ = self.lstm(feature_maps.permute(0, 2, 1))

        # Only the last time step is classified
        last_step = lstm_out[:, -1, :]
        return self.fc(last_step)

def predict_anomaly(model_path, driver_data, sequence_length, n_classes=8, anomaly_classes=None, batch_size=64):
    """
    Predict the anomaly class for a single driver using a sliding window approach.

    Every window of `sequence_length` consecutive rows is classified by the saved
    CNNLSTMModel; the per-window softmax probabilities are averaged, and the class
    with the highest average probability is returned (soft voting).

    Args:
        model_path (str): Path to the saved model state dict (.pth file).
        driver_data (numpy.ndarray): 2-D array (rows x features) with the
            preprocessed and normalized data of the driver, without the label column.
        sequence_length (int): Length of the sliding window, in rows. CNNLSTMModel
            max-pools the time axis twice with kernel size 2, so very short
            windows leave nothing for the LSTM to process.
        n_classes (int): Number of output classes of the saved model.
        anomaly_classes (Sequence[str] | None): Display name for each class index.
            Must provide at least `n_classes` entries. When None, the generic
            names "class_0", "class_1", ... are used.
        batch_size (int): Number of windows classified per forward pass.

    Returns:
        Tuple[str, dict]: Predicted anomaly class (maximum average probability)
        and a dict mapping every class name to its average probability.

    Raises:
        ValueError: If `driver_data` is not 2-D, if `sequence_length` is not
            positive, if the data has fewer rows than `sequence_length`, or if
            `anomaly_classes` has fewer than `n_classes` entries.
    """
    # Validate inputs up front so we fail before loading the checkpoint
    driver_data = np.asarray(driver_data)
    if driver_data.ndim != 2:
        raise ValueError(
            f"driver_data must be 2-D (rows x features), got shape {driver_data.shape}"
        )
    if sequence_length < 1:
        raise ValueError(f"sequence_length must be positive, got {sequence_length}")

    n_windows = len(driver_data) - sequence_length + 1
    if n_windows < 1:
        # Without this check the mean over zero windows would silently yield NaN
        raise ValueError(
            f"driver_data has {len(driver_data)} rows, fewer than "
            f"sequence_length={sequence_length}; no window can be built"
        )

    if anomaly_classes is None:
        anomaly_classes = [f"class_{i}" for i in range(n_classes)]
    elif len(anomaly_classes) < n_classes:
        raise ValueError(
            f"anomaly_classes has {len(anomaly_classes)} entries but n_classes={n_classes}"
        )

    # Load the trained model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = CNNLSTMModel(n_features=driver_data.shape[1], n_classes=n_classes)
    model.load_state_dict(torch.load(model_path, map_location=device, weights_only=True))
    model.to(device)
    model.eval()

    # Generate sliding windows: shape (n_windows, sequence_length, n_features)
    sliding_windows = np.stack(
        [driver_data[start:start + sequence_length] for start in range(n_windows)]
    )

    # Wrap sliding windows in a Dataset and DataLoader (order is irrelevant for the mean)
    dataset = SingleDriverDataset(sliding_windows)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

    # Collect softmax probabilities for every window, one batch at a time
    batch_probabilities = []
    with torch.no_grad():
        for inputs in dataloader:
            outputs = model(inputs.to(device))
            batch_probabilities.append(F.softmax(outputs, dim=1))
    probabilities = torch.cat(batch_probabilities, dim=0)

    # Aggregated probabilities: average across all windows
    mean_probabilities = probabilities.mean(dim=0).cpu().numpy()
    aggregated_probabilities = {
        anomaly_classes[i]: float(mean_probabilities[i]) for i in range(n_classes)
    }

    # Soft voting: pick the class with the highest average probability
    most_probable_class_idx = int(mean_probabilities.argmax())
    predicted_class = anomaly_classes[most_probable_class_idx]

    return predicted_class, aggregated_probabilities


In [28]:

# Path to the saved model
model_path = "../classification_models/classification_model_epoch20_loss0.0789.pth"
# NOTE: despite its name, `folder_path` points to a single .npz file, not a folder
folder_path = "../Dataset/OnlyFailuresByDriver/npz_failures_MinMaxScaler_normalized_test_firstDriver.npz"

# Load the data for the first driver.
# np.load returns an NpzFile holding an open file handle; the context manager closes it.
with np.load(folder_path) as npz_file:
    failureFirstDriver = npz_file["data"]
df = pd.DataFrame(failureFirstDriver)

# Convert DataFrame to NumPy array for processing
driver_array = df.to_numpy()

# Extract the actual failure class (last column contains the true failure class indices)
actual_failure = driver_array[:, -1]

# Remove the last column to get the input features
driver_data = driver_array[:, :-1]

# Define the anomaly classes
anomaly_classes = ["Others", "Braking System", "Engine", "Power Unit", "Cooling System", 
                   "Suspension and Drive", "Aerodynamics and Tyres", "Transmission and Gearbox"]
n_classes = len(anomaly_classes)   # Number of distinct failure classes

# Sliding window length (sequence length for LSTM)
sequence_length = 20  # Each prediction window spans 20 consecutive rows

# Predict the anomaly using sliding window with aggregation
predicted_anomaly, aggregated_probabilities = predict_anomaly(
    model_path=model_path,
    driver_data=driver_data,
    sequence_length=sequence_length,
    n_classes=n_classes,
    anomaly_classes=anomaly_classes
)

# Map the actual failure index to its corresponding class name.
# Only the first row's label is used; this assumes every row of this driver's
# file carries the same failure class -- TODO confirm against the dataset.
actual_failure_class = anomaly_classes[int(actual_failure[0])]

# Print results
print(f"Actual Anomaly: {actual_failure_class}")
print(f"Predicted Anomaly: {predicted_anomaly}")
print("\nAggregated Probabilities:")

# Sort aggregated probabilities by value in descending order
sorted_probabilities = sorted(aggregated_probabilities.items(), key=lambda x: x[1], reverse=True)

# Print only probabilities > 1e-3 (0.001)
for anomaly, probability in sorted_probabilities:
    if probability > 1e-3:
        print(f"{anomaly}: {probability:.2%}")

Actual Anomaly: Braking System
Predicted Anomaly: Suspension and Drive

Aggregated Probabilities:
Suspension and Drive: 42.74%
Braking System: 23.92%
Engine: 22.36%
Cooling System: 10.95%
