# LSTM Model Training for RUL Prediction (PyTorch)

**Objective:** Build and train an LSTM neural network to predict Remaining Useful Life (RUL)

**Steps:**
1. Load preprocessed data
2. Build LSTM model architecture with PyTorch
3. Train model with early stopping 
4. Evaluate performance (RMSE, MAE) 
5. Visualize predictions and training history 
6. Save trained model and extract latent features for XGBoost 

In [None]:
# --------------------------------------------------------------------------
# Import required libraries for deep learning and visualization
# --------------------------------------------------------------------------

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import pickle
import os

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

import warnings
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
np.random.seed(42)
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed(42)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

print(f"PyTorch version: {torch.__version__}")
print(f"Device: {device}")

In [None]:
# --------------------------------------------------------------------------
# Load preprocessed data and metadata
# --------------------------------------------------------------------------

X_train = np.load('processed_data/X_train.npy')
y_train = np.load('processed_data/y_train.npy')
X_val = np.load('processed_data/X_val.npy')
y_val = np.load('processed_data/y_val.npy')
X_test = np.load('processed_data/X_test.npy')
y_test = np.load('processed_data/y_test.npy')

with open('processed_data/metadata.pkl', 'rb') as f:
    metadata = pickle.load(f)

print(f"Training set: {X_train.shape}")
print(f"Validation set: {X_val.shape}")
print(f"Test set: {X_test.shape}")
print(f"\nMetadata:")
for key, value in metadata.items():
    print(f"  {key}: {value}")

# Convert to PyTorch tensors
X_train_tensor = torch.FloatTensor(X_train).to(device)
y_train_tensor = torch.FloatTensor(y_train).unsqueeze(1).to(device)
X_val_tensor = torch.FloatTensor(X_val).to(device)
y_val_tensor = torch.FloatTensor(y_val).unsqueeze(1).to(device)
X_test_tensor = torch.FloatTensor(X_test).to(device)
y_test_tensor = torch.FloatTensor(y_test).unsqueeze(1).to(device)

# Create data loaders
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=128, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)

print(f"\nData loaders created with batch size: 128")

In [None]:
# --------------------------------------------------------------------------
# Build LSTM model architecture
# --------------------------------------------------------------------------

class LSTMModel(nn.Module):
    def __init__(self, n_features, hidden_dim1=64, hidden_dim2=32, fc_dim=32, dropout=0.2):
        super(LSTMModel, self).__init__()
        
        self.lstm1 = nn.LSTM(n_features, hidden_dim1, batch_first=True)
        self.dropout1 = nn.Dropout(dropout)
        
        self.lstm2 = nn.LSTM(hidden_dim1, hidden_dim2, batch_first=True)
        self.dropout2 = nn.Dropout(dropout)
        
        self.fc1 = nn.Linear(hidden_dim2, fc_dim)
        self.relu = nn.ReLU()
        self.dropout3 = nn.Dropout(dropout)
        
        self.fc2 = nn.Linear(fc_dim, 1)
        
    def forward(self, x):
        # LSTM 1
        lstm1_out, _ = self.lstm1(x)
        lstm1_out = self.dropout1(lstm1_out)
        
        # LSTM 2
        lstm2_out, _ = self.lstm2(lstm1_out)
        lstm2_out = self.dropout2(lstm2_out)
        
        # Take last timestep output
        lstm2_last = lstm2_out[:, -1, :]
        
        # Fully connected layers
        fc1_out = self.fc1(lstm2_last)
        fc1_out = self.relu(fc1_out)
        fc1_out = self.dropout3(fc1_out)
        
        output = self.fc2(fc1_out)
        
        return output, lstm2_last  # Return both output and latent features
    
    def predict(self, x):
        output, _ = self.forward(x)
        return output

# Build model
sequence_length = metadata['sequence_length']
n_features = metadata['n_features']

model = LSTMModel(n_features=n_features).to(device)

# Define loss function and optimizer
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=7, min_lr=1e-6, verbose=True
)

print("Model architecture:")
print(model)
print(f"\nTotal parameters: {sum(p.numel() for p in model.parameters()):,}")

In [None]:
# --------------------------------------------------------------------------
# Define training and validation functions
# --------------------------------------------------------------------------

# Create models directory
os.makedirs('models', exist_ok=True)

def train_epoch(model, loader, criterion, optimizer, device):
    """Train for one epoch"""
    model.train()
    total_loss = 0
    total_mae = 0
    
    for batch_X, batch_y in loader:
        optimizer.zero_grad()
        
        outputs, _ = model(batch_X)
        loss = criterion(outputs, batch_y)
        
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        total_mae += torch.mean(torch.abs(outputs - batch_y)).item()
    
    avg_loss = total_loss / len(loader)
    avg_mae = total_mae / len(loader)
    
    return avg_loss, avg_mae

def validate(model, loader, criterion, device):
    """Validate model"""
    model.eval()
    total_loss = 0
    total_mae = 0
    
    with torch.no_grad():
        for batch_X, batch_y in loader:
            outputs, _ = model(batch_X)
            loss = criterion(outputs, batch_y)
            
            total_loss += loss.item()
            total_mae += torch.mean(torch.abs(outputs - batch_y)).item()
    
    avg_loss = total_loss / len(loader)
    avg_mae = total_mae / len(loader)
    
    return avg_loss, avg_mae

print("Training functions defined")
print("  - Early stopping patience: 15 epochs")
print("  - Learning rate reduction patience: 7 epochs")

In [None]:
# --------------------------------------------------------------------------
# Train the model
# --------------------------------------------------------------------------

print("Starting training...\n")

# Training parameters
num_epochs = 100
patience = 15
best_val_loss = float('inf')
patience_counter = 0

# History tracking
history = {
    'train_loss': [],
    'val_loss': [],
    'train_mae': [],
    'val_mae': []
}

# Training loop
for epoch in range(num_epochs):
    # Train
    train_loss, train_mae = train_epoch(model, train_loader, criterion, optimizer, device)
    
    # Validate
    val_loss, val_mae = validate(model, val_loader, criterion, device)
    
    # Update scheduler
    scheduler.step(val_loss)
    
    # Store history
    history['train_loss'].append(train_loss)
    history['val_loss'].append(val_loss)
    history['train_mae'].append(train_mae)
    history['val_mae'].append(val_mae)
    
    # Print progress
    print(f"Epoch [{epoch+1}/{num_epochs}] - "
          f"Train Loss: {train_loss:.4f}, Train MAE: {train_mae:.4f}, "
          f"Val Loss: {val_loss:.4f}, Val MAE: {val_mae:.4f}")
    
    # Early stopping and model checkpoint
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
        # Save best model
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
        }, 'models/lstm_best_model.pth')
        print(f"  --> Model saved (val_loss improved to {val_loss:.4f})")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"\nEarly stopping triggered after {epoch+1} epochs")
            break

# Load best model
checkpoint = torch.load('models/lstm_best_model.pth')
model.load_state_dict(checkpoint['model_state_dict'])

print("\nTraining complete!")
print(f"Best validation loss: {best_val_loss:.4f}")

In [None]:
# --------------------------------------------------------------------------
# Visualize training history
# --------------------------------------------------------------------------

fig, axes = plt.subplots(1, 2, figsize=(15, 5))

# Loss plot
axes[0].plot(history['train_loss'], label='Training Loss', linewidth=2)
axes[0].plot(history['val_loss'], label='Validation Loss', linewidth=2)
axes[0].set_xlabel('Epoch', fontsize=12)
axes[0].set_ylabel('Loss (MSE)', fontsize=12)
axes[0].set_title('Model Loss Over Epochs', fontsize=14, fontweight='bold')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# MAE plot
axes[1].plot(history['train_mae'], label='Training MAE', linewidth=2)
axes[1].plot(history['val_mae'], label='Validation MAE', linewidth=2)
axes[1].set_xlabel('Epoch', fontsize=12)
axes[1].set_ylabel('MAE', fontsize=12)
axes[1].set_title('Mean Absolute Error Over Epochs', fontsize=14, fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('models/training_history.png', dpi=300, bbox_inches='tight')
plt.show()

print(f"Training stopped at epoch: {len(history['train_loss'])}")
print(f"Best validation loss: {min(history['val_loss']):.4f}")
print(f"Best validation MAE: {min(history['val_mae']):.4f}")

In [None]:
# --------------------------------------------------------------------------
# Evaluate model on test set
# --------------------------------------------------------------------------

def predict_all(model, loader, device):
    model.eval()
    predictions = []
    actuals = []
    
    with torch.no_grad():
        for batch_X, batch_y in loader:
            outputs, _ = model(batch_X)
            predictions.append(outputs.cpu().numpy())
            actuals.append(batch_y.cpu().numpy())
    
    predictions = np.concatenate(predictions, axis=0).flatten()
    actuals = np.concatenate(actuals, axis=0).flatten()
    
    return predictions, actuals

# Make predictions
y_pred_train, _ = predict_all(model, train_loader, device)
y_pred_val, _ = predict_all(model, val_loader, device)
y_pred_test, _ = predict_all(model, test_loader, device)

# Calculate metrics
def calculate_metrics(y_true, y_pred, dataset_name):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))
    mae = mean_absolute_error(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    
    print(f"\n{dataset_name} Metrics:")
    print(f"  RMSE: {rmse:.4f}")
    print(f"  MAE:  {mae:.4f}")
    print(f"  R²:   {r2:.4f}")
    
    return {'rmse': rmse, 'mae': mae, 'r2': r2}

train_metrics = calculate_metrics(y_train, y_pred_train, "Training")
val_metrics = calculate_metrics(y_val, y_pred_val, "Validation")
test_metrics = calculate_metrics(y_test, y_pred_test, "Test")

In [None]:
# --------------------------------------------------------------------------
# Visualize predictions vs actual RUL
# --------------------------------------------------------------------------

fig, axes = plt.subplots(2, 2, figsize=(16, 12))

# Test set: Scatter plot
axes[0, 0].scatter(y_test, y_pred_test, alpha=0.5, s=20)
axes[0, 0].plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 
                'r--', linewidth=2, label='Perfect Prediction')
axes[0, 0].set_xlabel('Actual RUL', fontsize=12)
axes[0, 0].set_ylabel('Predicted RUL', fontsize=12)
axes[0, 0].set_title(f'Test Set: Predictions vs Actual\nRMSE: {test_metrics["rmse"]:.2f}', 
                     fontsize=13, fontweight='bold')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# Test set: Residual plot
residuals = y_test - y_pred_test
axes[0, 1].scatter(y_pred_test, residuals, alpha=0.5, s=20)
axes[0, 1].axhline(y=0, color='r', linestyle='--', linewidth=2)
axes[0, 1].set_xlabel('Predicted RUL', fontsize=12)
axes[0, 1].set_ylabel('Residuals', fontsize=12)
axes[0, 1].set_title('Test Set: Residual Plot', fontsize=13, fontweight='bold')
axes[0, 1].grid(True, alpha=0.3)

# Sample predictions over time (first 500 samples)
sample_size = min(500, len(y_test))
axes[1, 0].plot(y_test[:sample_size], label='Actual RUL', linewidth=2, alpha=0.8)
axes[1, 0].plot(y_pred_test[:sample_size], label='Predicted RUL', linewidth=2, alpha=0.8)
axes[1, 0].set_xlabel('Sample Index', fontsize=12)
axes[1, 0].set_ylabel('RUL', fontsize=12)
axes[1, 0].set_title('Test Set: Sample Predictions (First 500)', fontsize=13, fontweight='bold')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# Error distribution
axes[1, 1].hist(residuals, bins=50, edgecolor='black', alpha=0.7)
axes[1, 1].axvline(x=0, color='r', linestyle='--', linewidth=2)
axes[1, 1].set_xlabel('Prediction Error (Actual - Predicted)', fontsize=12)
axes[1, 1].set_ylabel('Frequency', fontsize=12)
axes[1, 1].set_title('Test Set: Error Distribution', fontsize=13, fontweight='bold')
axes[1, 1].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('models/predictions_evaluation.png', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# --------------------------------------------------------------------------
# Extract latent features from LSTM for XGBoost
# --------------------------------------------------------------------------

def extract_features(model, loader, device):
    """Extract latent features from LSTM layer"""
    model.eval()
    features = []
    
    with torch.no_grad():
        for batch_X, _ in loader:
            _, latent = model(batch_X)
            features.append(latent.cpu().numpy())
    
    return np.concatenate(features, axis=0)

# Extract features
train_features = extract_features(model, train_loader, device)
val_features = extract_features(model, val_loader, device)
test_features = extract_features(model, test_loader, device)

print(f"Extracted latent features:")
print(f"  Training: {train_features.shape}")
print(f"  Validation: {val_features.shape}")
print(f"  Test: {test_features.shape}")

# Save latent features
np.save('processed_data/train_features_lstm.npy', train_features)
np.save('processed_data/val_features_lstm.npy', val_features)
np.save('processed_data/test_features_lstm.npy', test_features)

print("\nLatent features saved to 'processed_data/' folder")

In [None]:
# --------------------------------------------------------------------------
# Save model and results
# --------------------------------------------------------------------------

# Save final model
torch.save({
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'history': history,
    'metadata': metadata
}, 'models/lstm_final_model.pth')
print("Model saved to 'models/lstm_final_model.pth'")

# Save predictions
np.save('models/y_pred_test.npy', y_pred_test)
print("Test predictions saved to 'models/y_pred_test.npy'")

# Save metrics
results = {
    'train_metrics': train_metrics,
    'val_metrics': val_metrics,
    'test_metrics': test_metrics,
    'training_history': history,
    'model_architecture': {
        'sequence_length': sequence_length,
        'n_features': n_features,
        'total_params': sum(p.numel() for p in model.parameters())
    }
}

with open('models/training_results.pkl', 'wb') as f:
    pickle.dump(results, f)

print("Training results saved to 'models/training_results.pkl'")

# Summary
print("\n" + "="*60)
print("Training Complete")
print("="*60)
print(f"\nTest Set Performance:")
print(f"  RMSE: {test_metrics['rmse']:.4f} cycles")
print(f"  MAE:  {test_metrics['mae']:.4f} cycles")
print(f"  R²:   {test_metrics['r2']:.4f}")
print(f"\nFiles saved:")
print(f"  - models/lstm_best_model.pth")
print(f"  - models/lstm_final_model.pth")
print(f"  - models/training_results.pkl")
print(f"  - processed_data/train_features_lstm.npy")
print(f"  - processed_data/val_features_lstm.npy")
print(f"  - processed_data/test_features_lstm.npy")
print("="*60)