<a href="https://colab.research.google.com/github/DKS2301/GRU-MHA-RUL-Forecasting/blob/main/GRU_MHA_model_for_accurate_battery_RUL_forecasting.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Battery RUL Prediction using GRU-MHA Model**

## **Import Required Libraries**

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import Ridge, LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.neighbors import KNeighborsRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
import warnings
warnings.filterwarnings('ignore')

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

print("TensorFlow version:", tf.__version__)
print("GPU Available:", tf.config.list_physical_devices('GPU'))

TensorFlow version: 2.19.0
GPU Available: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]


## **Data Loading Functions**

In [None]:
def load_nmc_lco_data(filepath):
    """
    Load NMC-LCO 18650 battery dataset
    Expected columns: Cycle-Index, Discharge Time (s), Decrease 3.6-3.4V (s),
    Max Discharge Voltage (V), Min Charging Voltage (V), Time at 4.15V (s),
    Time at Constant Current (s), Charging Time (s), RUL
    """
    try:
        df = pd.read_csv(filepath)
        print(f"Loaded NMC-LCO data: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")
        return df
    except Exception as e:
        print(f"Error loading data: {e}")
        return None

def load_nasa_data(filepath):
    """
    Load NASA battery dataset
    Expected columns: Cycle, Time Measured(Sec), Voltage Measured(V),
    Current Measured, Temperature Measured, Capacity(Ah), Signal Energy,
    Fluctuation Index, Skewness Index, Kurtosis Index
    """
    try:
        df = pd.read_csv(filepath)
        print(f"Loaded NASA data: {df.shape}")
        print(f"Columns: {df.columns.tolist()}")

        # Calculate SoH if Capacity column exists
        if 'Capacity(Ah)' in df.columns:
            initial_capacity = df['Capacity(Ah)'].max()
            df['SoH'] = df['Capacity(Ah)'] / initial_capacity

        return df
    except Exception as e:
        print(f"Error loading data: {e}")
        return None


## **Feature Selection using Pearson Correlation**

In [None]:
def select_features_by_correlation(X, y, threshold=0.4):
    """
    Separate features into correlated and uncorrelated based on Pearson correlation
    """
    correlations = {}
    for col in X.columns:
        corr = np.corrcoef(X[col], y)[0, 1]
        correlations[col] = abs(corr)

    correlated_features = [k for k, v in correlations.items() if v >= threshold]
    uncorrelated_features = [k for k, v in correlations.items() if v < threshold]

    print(f"\nCorrelated features (>={threshold}): {correlated_features}")
    print(f"Uncorrelated features (<{threshold}): {uncorrelated_features}")
    print(f"\nCorrelation values:")
    for k, v in sorted(correlations.items(), key=lambda x: x[1], reverse=True):
        print(f"  {k}: {v:.4f}")

    return correlated_features, uncorrelated_features, correlations

## **Visualization Functions**

In [None]:
def plot_correlation_matrix(df, target_col):
    """Plot correlation matrix"""
    plt.figure(figsize=(12, 10))
    corr = df.corr()
    sns.heatmap(corr, annot=True, fmt='.2f', cmap='coolwarm', center=0)
    plt.title('Feature Correlation Matrix')
    plt.tight_layout()
    plt.show()

def plot_rul_trend(df, cycle_col, target_col):
    """Plot RUL/SoH trend over cycles"""
    plt.figure(figsize=(12, 6))
    plt.plot(df[cycle_col], df[target_col], linewidth=2)
    plt.xlabel('Cycle Number')
    plt.ylabel(target_col)
    plt.title(f'{target_col} vs Cycle Number')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()


## **Dynamic Swish Activation**

In [None]:

class DynamicSwish(layers.Layer):
    """
    Custom Dynamic Swish activation function
    Formula: x / (s * (1 + e^(-x)))
    """
    def __init__(self, scale=1.0, **kwargs):
        super(DynamicSwish, self).__init__(**kwargs)
        self.scale = scale

    def call(self, x):
        return x / (self.scale * (1 + tf.exp(-x)))

    def get_config(self):
        config = super().get_config()
        config.update({"scale": self.scale})
        return config


## **GRU-MHA Model Architecture**

In [None]:
def build_gru_mha_model(correlated_dim, uncorrelated_dim,
                        gru_units=4, dense_units=30,
                        mha_heads=8, mha_units=20,
                        scale_factor=1.0):
    """
    Build the hybrid GRU-MHA model with two input paths
    """
    # Input layers for correlated and uncorrelated features
    correlated_input = layers.Input(shape=(None, correlated_dim), name='correlated_input')
    uncorrelated_input = layers.Input(shape=(None, uncorrelated_dim), name='uncorrelated_input')

    # Path 1: Correlated features
    gru1 = layers.GRU(gru_units, return_sequences=True, name='gru_corr')(correlated_input)
    attention1 = layers.Attention(name='attention_corr')([gru1, gru1])
    mha1 = layers.MultiHeadAttention(num_heads=mha_heads, key_dim=mha_units,
                                     name='mha_corr')(attention1, attention1)
    flatten1 = layers.Flatten(name='flatten_corr')(mha1)

    # Path 2: Uncorrelated features
    gru2 = layers.GRU(gru_units, return_sequences=True, name='gru_uncorr')(uncorrelated_input)
    attention2 = layers.Attention(name='attention_uncorr')([gru2, gru2])
    mha2 = layers.MultiHeadAttention(num_heads=mha_heads, key_dim=mha_units,
                                     name='mha_uncorr')(attention2, attention2)
    flatten2 = layers.Flatten(name='flatten_uncorr')(mha2)

    # Concatenate both paths
    concatenated = layers.Concatenate(name='concat')([flatten1, flatten2])

    # Dense layers
    dense1 = layers.Dense(dense_units, activation='swish', name='dense1')(concatenated)
    dense2 = layers.Dense(dense_units//2, activation='swish', name='dense2')(dense1)

    # Output with dynamic swish
    output = layers.Dense(1, name='output_dense')(dense2)
    output = DynamicSwish(scale=scale_factor, name='dynamic_swish')(output)

    model = Model(inputs=[correlated_input, uncorrelated_input],
                  outputs=output, name='GRU_MHA_Model')

    return model


## **Baseline CNN-LSTM Model**

In [None]:
def build_cnn_lstm_baseline(input_dim, sequence_length=1):
    """
    Build baseline CNN-LSTM model for comparison
    """
    inputs = layers.Input(shape=(sequence_length, input_dim))

    # Conv1D layers
    conv1 = layers.Conv1D(32, kernel_size=3, padding='same', activation='relu')(inputs)
    conv2 = layers.Conv1D(32, kernel_size=3, padding='same', activation='relu')(conv1)

    # LSTM layer
    lstm = layers.LSTM(27, return_sequences=False)(conv2)

    # Dense layers
    dense1 = layers.Dense(30, activation='relu')(lstm)
    output = layers.Dense(1)(dense1)

    model = Model(inputs=inputs, outputs=output, name='CNN_LSTM_Baseline')
    return model

# **Data Preparation**


In [None]:
def prepare_data(df, target_col, correlated_features, uncorrelated_features,
                 sequence_length=1, test_size=0.2):
    """
    Prepare data for training with two separate paths
    """
    # Ensure features exist in dataframe
    correlated_features = [f for f in correlated_features if f in df.columns]
    uncorrelated_features = [f for f in uncorrelated_features if f in df.columns]

    if len(correlated_features) == 0:
        correlated_features = uncorrelated_features[:1]
        uncorrelated_features = uncorrelated_features[1:]

    X_corr = df[correlated_features].values
    X_uncorr = df[uncorrelated_features].values
    y = df[target_col].values

    # Normalize features
    scaler_corr = StandardScaler()
    scaler_uncorr = StandardScaler()
    scaler_y = StandardScaler()

    X_corr_scaled = scaler_corr.fit_transform(X_corr)
    X_uncorr_scaled = scaler_uncorr.fit_transform(X_uncorr)
    y_scaled = scaler_y.fit_transform(y.reshape(-1, 1)).flatten()

    # Reshape for sequence input
    X_corr_seq = X_corr_scaled.reshape(-1, sequence_length, len(correlated_features))
    X_uncorr_seq = X_uncorr_scaled.reshape(-1, sequence_length, len(uncorrelated_features))

    # Split data
    indices = np.arange(len(y_scaled))
    train_idx, test_idx = train_test_split(indices, test_size=test_size, random_state=42)

    X_corr_train = X_corr_seq[train_idx]
    X_corr_test = X_corr_seq[test_idx]
    X_uncorr_train = X_uncorr_seq[train_idx]
    X_uncorr_test = X_uncorr_seq[test_idx]
    y_train = y_scaled[train_idx]
    y_test = y_scaled[test_idx]

    return {
        'X_corr_train': X_corr_train, 'X_corr_test': X_corr_test,
        'X_uncorr_train': X_uncorr_train, 'X_uncorr_test': X_uncorr_test,
        'y_train': y_train, 'y_test': y_test,
        'scaler_corr': scaler_corr, 'scaler_uncorr': scaler_uncorr,
        'scaler_y': scaler_y, 'train_idx': train_idx, 'test_idx': test_idx
    }



# **Training Function**

In [None]:
def train_model(model, data_dict, epochs=250, batch_size=32, patience=50):
    """
    Train the GRU-MHA model
    """
    # Callbacks
    early_stopping = EarlyStopping(
        monitor='val_loss',
        patience=patience,
        restore_best_weights=True,
        verbose=1
    )

    reduce_lr = ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.3,
        patience=patience,
        min_lr=1e-7,
        verbose=1
    )

    # Compile model
    optimizer = keras.optimizers.Nadam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss='mse', metrics=['mae'])

    # Train
    history = model.fit(
        [data_dict['X_corr_train'], data_dict['X_uncorr_train']],
        data_dict['y_train'],
        validation_split=0.2,
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[early_stopping, reduce_lr],
        verbose=1
    )

    return history


## **Ridge Regression Final Stage**

In [None]:
def apply_ridge_regression(model, data_dict, alpha=1.5):
    """
    Apply ridge regression as the final forecasting stage
    """
    # Get predictions from the model
    train_pred = model.predict([data_dict['X_corr_train'], data_dict['X_uncorr_train']])
    test_pred = model.predict([data_dict['X_corr_test'], data_dict['X_uncorr_test']])

    # Train ridge regression on model outputs
    ridge = Ridge(alpha=alpha, solver='cholesky')
    ridge.fit(train_pred, data_dict['y_train'])

    # Final predictions
    final_train_pred = ridge.predict(train_pred)
    final_test_pred = ridge.predict(test_pred)

    return final_train_pred, final_test_pred, ridge

## **Evaluation Metrics**

In [None]:
def evaluate_model(y_true, y_pred, scaler_y, model_name="Model"):
    """
    Calculate and display evaluation metrics
    """
    # Inverse transform
    y_true_orig = scaler_y.inverse_transform(y_true.reshape(-1, 1)).flatten()
    y_pred_orig = scaler_y.inverse_transform(y_pred.reshape(-1, 1)).flatten()

    mae = mean_absolute_error(y_true_orig, y_pred_orig)
    mse = mean_squared_error(y_true_orig, y_pred_orig)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_true_orig, y_pred_orig)

    print(f"\n{model_name} Performance:")
    print(f"  MAE: {mae:.6f}")
    print(f"  MSE: {mse:.6f}")
    print(f"  RMSE: {rmse:.6f}")
    print(f"  R² Score: {r2:.6f} ({r2*100:.2f}%)")

    return {'mae': mae, 'mse': mse, 'rmse': rmse, 'r2': r2}


## **Plotting Functions:**

In [None]:
def plot_training_history(history):
    """Plot training and validation loss"""
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))

    # Loss
    axes[0].plot(history.history['loss'], label='Train Loss')
    axes[0].plot(history.history['val_loss'], label='Val Loss')
    axes[0].set_xlabel('Epoch')
    axes[0].set_ylabel('Loss')
    axes[0].set_title('Training and Validation Loss')
    axes[0].legend()
    axes[0].grid(True, alpha=0.3)

    # MAE
    axes[1].plot(history.history['mae'], label='Train MAE')
    axes[1].plot(history.history['val_mae'], label='Val MAE')
    axes[1].set_xlabel('Epoch')
    axes[1].set_ylabel('MAE')
    axes[1].set_title('Training and Validation MAE')
    axes[1].legend()
    axes[1].grid(True, alpha=0.3)

    plt.tight_layout()
    plt.show()

def plot_predictions(y_true, y_pred, scaler_y, title="Predictions vs Actual"):
    """Plot predicted vs actual values"""
    y_true_orig = scaler_y.inverse_transform(y_true.reshape(-1, 1)).flatten()
    y_pred_orig = scaler_y.inverse_transform(y_pred.reshape(-1, 1)).flatten()

    plt.figure(figsize=(12, 6))
    plt.plot(y_true_orig, label='Actual', linewidth=2, alpha=0.7)
    plt.plot(y_pred_orig, label='Predicted', linewidth=2, alpha=0.7)
    plt.xlabel('Sample Index')
    plt.ylabel('Value')
    plt.title(title)
    plt.legend()
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    # Scatter plot
    plt.figure(figsize=(8, 8))
    plt.scatter(y_true_orig, y_pred_orig, alpha=0.5)
    plt.plot([y_true_orig.min(), y_true_orig.max()],
             [y_true_orig.min(), y_true_orig.max()],
             'r--', linewidth=2)
    plt.xlabel('Actual Values')
    plt.ylabel('Predicted Values')
    plt.title('Predicted vs Actual Values')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

## **Comparison with ML Models**

In [None]:
def compare_ml_models(X_train, X_test, y_train, y_test, scaler_y):
    """
    Compare with traditional ML models
    """
    models = {
        'KNN': KNeighborsRegressor(n_neighbors=5),
        'SVR': SVR(kernel='rbf'),
        'Random Forest': RandomForestRegressor(n_estimators=150, max_depth=10,
                                               min_samples_leaf=4, random_state=42),
        'Linear Regression': LinearRegression()
    }

    results = {}

    for name, model in models.items():
        print(f"\nTraining {name}...")
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        metrics = evaluate_model(y_test, y_pred, scaler_y, name)
        results[name] = metrics

    return results

## **Forward Feature Selection**

In [None]:
def forward_feature_selection(df, target_col, features):
    """
    Evaluate feature importance using forward selection
    """
    selected_features = []
    remaining_features = features.copy()
    r2_scores = []

    print("\nForward Feature Selection:")

    for i in range(len(features)):
        best_r2 = -np.inf
        best_feature = None

        for feature in remaining_features:
            test_features = selected_features + [feature]
            X = df[test_features].values
            y = df[target_col].values

            # Simple train-test split
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=0.2, random_state=42
            )

            # Train simple linear regression
            model = LinearRegression()
            model.fit(X_train, y_train)
            y_pred = model.predict(X_test)
            r2 = r2_score(y_test, y_pred)

            if r2 > best_r2:
                best_r2 = r2
                best_feature = feature

        selected_features.append(best_feature)
        remaining_features.remove(best_feature)
        r2_scores.append(best_r2)

        print(f"  Step {i+1}: Added '{best_feature}' (R² = {best_r2:.4f})")

    # Plot results
    plt.figure(figsize=(12, 6))
    plt.plot(range(1, len(features)+1), r2_scores, marker='o', linewidth=2)
    plt.xlabel('Number of Features')
    plt.ylabel('R² Score')
    plt.title('Forward Feature Selection Results')
    plt.grid(True, alpha=0.3)
    plt.xticks(range(1, len(features)+1))
    for i, feature in enumerate(selected_features):
        plt.text(i+1, r2_scores[i], feature, ha='center', va='bottom', fontsize=8)
    plt.tight_layout()
    plt.show()

    return selected_features, r2_scores


## **Main Execution Pipeline**

In [None]:
def run_complete_pipeline(data_path, dataset_type='NMC', target_col='RUL',
                          correlation_threshold=0.4, scale_factor=1.0):
    """
    Complete pipeline for battery RUL/SoH prediction

    Parameters:
    - data_path: Path to the CSV file
    - dataset_type: 'NMC' or 'NASA'
    - target_col: Target column name ('RUL' or 'SoH')
    - correlation_threshold: Threshold for feature correlation
    - scale_factor: Scale factor for dynamic swish activation
    """

    print("="*80)
    print("BATTERY RUL/SOH PREDICTION PIPELINE")
    print("="*80)

    # Load data
    if dataset_type == 'NMC':
        df = load_nmc_lco_data(data_path)
    else:
        df = load_nasa_data(data_path)

    if df is None:
        return None

    # Exploratory analysis
    print("\n" + "="*80)
    print("DATA EXPLORATION")
    print("="*80)
    print(df.describe())

    # Plot correlation matrix
    plot_correlation_matrix(df, target_col)

    # Plot RUL/SoH trend
    cycle_col = 'Cycle-Index' if 'Cycle-Index' in df.columns else 'Cycle'
    plot_rul_trend(df, cycle_col, target_col)

    # Feature selection
    print("\n" + "="*80)
    print("FEATURE SELECTION")
    print("="*80)

    feature_cols = [col for col in df.columns if col != target_col]
    X = df[feature_cols]
    y = df[target_col]

    correlated_features, uncorrelated_features, correlations = \
        select_features_by_correlation(X, y, correlation_threshold)

    # Prepare data
    print("\n" + "="*80)
    print("DATA PREPARATION")
    print("="*80)

    data_dict = prepare_data(df, target_col, correlated_features,
                            uncorrelated_features, sequence_length=1)

    print(f"Training samples: {len(data_dict['y_train'])}")
    print(f"Test samples: {len(data_dict['y_test'])}")
    print(f"Correlated features shape: {data_dict['X_corr_train'].shape}")
    print(f"Uncorrelated features shape: {data_dict['X_uncorr_train'].shape}")

    # Build model
    print("\n" + "="*80)
    print("MODEL BUILDING")
    print("="*80)

    model = build_gru_mha_model(
        correlated_dim=len(correlated_features),
        uncorrelated_dim=len(uncorrelated_features),
        gru_units=4,
        dense_units=30,
        mha_heads=8,
        mha_units=20,
        scale_factor=scale_factor
    )

    model.summary()

    # Train model
    print("\n" + "="*80)
    print("MODEL TRAINING")
    print("="*80)

    history = train_model(model, data_dict, epochs=250, batch_size=32, patience=50)

    # Plot training history
    plot_training_history(history)

    # Apply ridge regression
    print("\n" + "="*80)
    print("APPLYING RIDGE REGRESSION")
    print("="*80)

    final_train_pred, final_test_pred, ridge_model = \
        apply_ridge_regression(model, data_dict, alpha=1.5)

    # Evaluate
    print("\n" + "="*80)
    print("MODEL EVALUATION")
    print("="*80)

    train_metrics = evaluate_model(data_dict['y_train'], final_train_pred,
                                   data_dict['scaler_y'], "Training Set")
    test_metrics = evaluate_model(data_dict['y_test'], final_test_pred,
                                  data_dict['scaler_y'], "Test Set")

    # Plot predictions
    plot_predictions(data_dict['y_test'], final_test_pred,
                    data_dict['scaler_y'],
                    f"Test Set: {target_col} Predictions vs Actual")

    # Compare with ML models (flatten features for comparison)
    print("\n" + "="*80)
    print("COMPARISON WITH ML MODELS")
    print("="*80)

    X_train_flat = np.concatenate([
        data_dict['X_corr_train'].reshape(len(data_dict['X_corr_train']), -1),
        data_dict['X_uncorr_train'].reshape(len(data_dict['X_uncorr_train']), -1)
    ], axis=1)

    X_test_flat = np.concatenate([
        data_dict['X_corr_test'].reshape(len(data_dict['X_corr_test']), -1),
        data_dict['X_uncorr_test'].reshape(len(data_dict['X_uncorr_test']), -1)
    ], axis=1)

    ml_results = compare_ml_models(X_train_flat, X_test_flat,
                                   data_dict['y_train'], data_dict['y_test'],
                                   data_dict['scaler_y'])

    # Feature importance
    print("\n" + "="*80)
    print("FEATURE IMPORTANCE ANALYSIS")
    print("="*80)

    all_features = correlated_features + uncorrelated_features
    selected_features, r2_scores = forward_feature_selection(df, target_col, all_features)

    print("\n" + "="*80)
    print("PIPELINE COMPLETED SUCCESSFULLY")
    print("="*80)

    return {
        'model': model,
        'ridge': ridge_model,
        'data_dict': data_dict,
        'history': history,
        'test_metrics': test_metrics,
        'ml_results': ml_results,
        'correlations': correlations,
        'selected_features': selected_features
    }

## **Example Usage**

In [None]:

# For NMC-LCO 18650 Battery Dataset:
results = run_complete_pipeline(
    data_path='path/to/nmc_lco_battery_data.csv',
    dataset_type='NMC',
    target_col='RUL',
    correlation_threshold=0.4,
    scale_factor=1.0
)

# For NASA Battery Dataset:
results = run_complete_pipeline(
    data_path='path/to/nasa_battery_B0005.csv',
    dataset_type='NASA',
    target_col='SoH',
    correlation_threshold=0.4,
    scale_factor=8.0  # Use 8.0 for values between 0-2
)

# Access results:
if results:
    print(f"Test R² Score: {results['test_metrics']['r2']:.4f}")
    print(f"Test MAE: {results['test_metrics']['mae']:.6f}")

    # Save model
    results['model'].save('gru_mha_model.h5')

    # Access predictions, etc.

print("\n" + "="*80)
print("IMPLEMENTATION READY")
print("="*80)
print("\nTo use this implementation:")
print("1. Load your battery dataset (NMC-LCO or NASA)")
print("2. Run: results = run_complete_pipeline('your_data.csv', dataset_type='NMC' or 'NASA')")
print("3. The pipeline will automatically handle everything!")
print("\nMake sure your CSV has the appropriate columns as described in the paper.")

BATTERY RUL/SOH PREDICTION PIPELINE
Error loading data: [Errno 2] No such file or directory: 'path/to/nmc_lco_battery_data.csv'
BATTERY RUL/SOH PREDICTION PIPELINE
Error loading data: [Errno 2] No such file or directory: 'path/to/nasa_battery_B0005.csv'

IMPLEMENTATION READY

To use this implementation:
1. Load your battery dataset (NMC-LCO or NASA)
2. Run: results = run_complete_pipeline('your_data.csv', dataset_type='NMC' or 'NASA')
3. The pipeline will automatically handle everything!

Make sure your CSV has the appropriate columns as described in the paper.
