In [1]:
from temporora import * 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class RNNModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(RNNModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)  # Output size is the number of classes
    
    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.rnn(x, h0)
        out = self.fc(out[:, -1, :])  # Use the output of the last time step
        return out

for dataset in datasets:
    print(f"Processing dataset: {dataset}")
    args.sub_dataset = dataset

    train_loader, valid_loader, test_loader, test_loader_last, num_test_windows, train_visualize, engine_id = get_dataloader(
        dir_path=args.dataset_root,
        sub_dataset=args.sub_dataset,
        max_rul=args.max_rul,
        seq_length=args.sequence_len,
        batch_size=args.batch_size,
        use_exponential_smoothing=args.use_exponential_smoothing,
        smooth_rate=args.smooth_rate)

    input_size = args.feature_num
    hidden_size = 64
    num_layers = 2

    # Dynamically infer the number of classes from the dataset
    all_labels = []
    for _, y in train_loader:
        all_labels.extend(y.numpy().flatten())
    output_size = len(set(all_labels))  # Dynamically calculate number of classes

    num_epochs = 50

    rnn_model = RNNModel(input_size, hidden_size, num_layers, output_size).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(rnn_model.parameters(), lr=0.001)

    for epoch in range(num_epochs):
        rnn_model.train()
        for x, y in train_loader:
            x = x.view(-1, args.sequence_len, args.feature_num).to(device)
            y = y.view(-1).to(device).long()
            optimizer.zero_grad()
            outputs = rnn_model(x)
            loss = criterion(outputs, y)

            loss.backward()
            optimizer.step()

        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}")

    rnn_model.eval()
    valid_predictions, valid_actuals = [], []
    test_predictions, test_actuals = [], []

    with torch.no_grad():
        for x, y in valid_loader:
            x = x.view(-1, args.sequence_len, args.feature_num).to(device)
            y = y.view(-1).to(device).long()
            preds = rnn_model(x)
            _, predicted = torch.max(preds, 1) 
            valid_predictions.extend(predicted.cpu().numpy())
            valid_actuals.extend(y.cpu().numpy())

    with torch.no_grad():
        for x, y in test_loader:
            x = x.view(-1, args.sequence_len, args.feature_num).to(device)
            y = y.view(-1).to(device).long()
            preds = rnn_model(x)
            _, predicted = torch.max(preds, 1)
            test_predictions.extend(predicted.cpu().numpy())
            test_actuals.extend(y.cpu().numpy())

    valid_accuracy = accuracy_score(valid_actuals, valid_predictions)
    test_accuracy = accuracy_score(test_actuals, test_predictions)

    valid_classification_report = classification_report(valid_actuals, valid_predictions)
    test_classification_report = classification_report(test_actuals, test_predictions)

    metrics[dataset] = {
        "Validation Accuracy": valid_accuracy,
        "Test Accuracy": test_accuracy,
        "Validation Report": valid_classification_report,
        "Test Report": test_classification_report
    }

    print(f"Dataset {dataset} Metrics:")
    print(f"Validation Accuracy: {valid_accuracy:.4f}")
    print(f"Test Accuracy: {test_accuracy:.4f}")
    print(f"Validation Classification Report:\n{valid_classification_report}")
    print(f"Test Classification Report:\n{test_classification_report}")

    # Confusion Matrix
    cm = confusion_matrix(test_actuals, test_predictions)
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=np.arange(output_size),
                yticklabels=np.arange(output_size))
    plt.xlabel('Predicted')
    plt.ylabel('Actual')
    plt.title(f'Confusion Matrix for {dataset}')
    plt.show()


Processing dataset: FD001
Epoch [1/50], Loss: 0.3955
Epoch [2/50], Loss: 0.3973
Epoch [3/50], Loss: 0.3874
Epoch [4/50], Loss: 0.4013
Epoch [5/50], Loss: 0.4192
Epoch [6/50], Loss: 0.2883
Epoch [7/50], Loss: 0.4348
Epoch [8/50], Loss: 0.3655
Epoch [9/50], Loss: 0.4186
Epoch [10/50], Loss: 0.4018
Epoch [11/50], Loss: 0.4060
Epoch [12/50], Loss: 0.3800
Epoch [13/50], Loss: 0.2470
Epoch [14/50], Loss: 0.2880
Epoch [15/50], Loss: 0.3469
Epoch [16/50], Loss: 0.2725
Epoch [17/50], Loss: 0.1987
Epoch [18/50], Loss: 0.3550
Epoch [19/50], Loss: 0.3306
Epoch [20/50], Loss: 0.1956
Epoch [21/50], Loss: 0.3359
Epoch [22/50], Loss: 0.2571
Epoch [23/50], Loss: 0.2044
Epoch [24/50], Loss: 0.3028
Epoch [25/50], Loss: 0.2587
Epoch [26/50], Loss: 0.2023
Epoch [27/50], Loss: 0.1767
Epoch [28/50], Loss: 0.3415
Epoch [29/50], Loss: 0.2583
Epoch [30/50], Loss: 0.2759
Epoch [31/50], Loss: 0.2363
Epoch [32/50], Loss: 0.2039
