# Battery State of Charge (SoC) Estimation using Deep Learning

This notebook implements multiple deep learning models for battery State of Charge estimation using LSTM, Bidirectional LSTM, GRU, Bidirectional GRU, and 1D CNN architectures with Bayesian hyperparameter optimization.

In [None]:
import tensorflow as tf
import numpy as np
import pandas as pd
import scipy.io
import math
import os
import ntpath
import sys
import logging
import time
import sys

from importlib import reload
import plotly.graph_objects as go

from tensorflow import keras
from tensorflow.keras import layers, regularizers, mixed_precision
from keras.models import Sequential
from keras.layers import Dense, Dropout, Activation, Conv1D, MaxPooling1D, Flatten, Bidirectional, GRU
from keras.optimizers import SGD, Adam
import np_utils
from keras.layers import LSTM, Embedding, RepeatVector, TimeDistributed, Masking
from keras.callbacks import EarlyStopping, ModelCheckpoint, LambdaCallback
import matplotlib.pyplot as plt
from kerastuner.tuners import BayesianOptimization
from kerastuner.engine.hyperparameters import HyperParameters
from sklearn.metrics import mean_squared_error, mean_absolute_error
from tensorflow.keras.models import load_model

# Check if GPU is available
print(tf.config.list_physical_devices('GPU'))

# Check if running in Google Colab
IS_COLAB = False

if IS_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    data_path = "/content/drive/My Drive/battery-state-estimation/battery-state-estimation/"
else:
    data_path = "../"

# Add the data path to sys.path for module imports
sys.path.append(data_path)
from data_processing.unibo_powertools_data import UniboPowertoolsData, CycleCols
from data_processing.model_data_handler import ModelDataHandler

### Config logging

In [None]:
#Reload the logging module (if needed).
#Set up a consistent format for log messages, including timestamps and severity levels.
#Ensure all messages (from DEBUG level onward) are recorded.
reload(logging)
logging.basicConfig(format='%(asctime)s [%(levelname)s]: %(message)s', level=logging.DEBUG, datefmt='%Y/%m/%d %H:%M:%S')
#DEBUG is the lowest level, meaning all messages (including DEBUG, INFO, WARNING, ERROR, and CRITICAL) will be logged.

# Load Data

### Initialize the data object

Load the cycle and capacity data to memory based on the specified chunk size

In [None]:
dataset = UniboPowertoolsData(
    test_types=['S'],
    chunk_size=1000000,
    lines=[37, 40],
    charge_line=37,
    discharge_line=40,
    base_path=data_path
)

### Determine the training and testing datasets

Prepare the training and testing data for model data handler to load the model input and output data.

In [None]:
train_data_test_names = [
    '000-DM-3.0-4019-S', 
    '001-DM-3.0-4019-S', 
    '002-DM-3.0-4019-S', 
    '006-EE-2.85-0820-S', 
    '007-EE-2.85-0820-S', 
    '018-DP-2.00-1320-S', 
    '019-DP-2.00-1320-S',
    '036-DP-2.00-1720-S',
    '037-DP-2.00-1720-S', 
    '038-DP-2.00-2420-S', 
    '040-DM-4.00-2320-S',
    '042-EE-2.85-0820-S', 
    '045-BE-2.75-2019-S'
]

test_data_test_names = [
    '003-DM-3.0-4019-S',
    '008-EE-2.85-0820-S',
    '039-DP-2.00-2420-S', 
    '041-DM-4.00-2320-S',    
]

dataset.prepare_data(train_data_test_names, test_data_test_names)

### Initialize the model data handler

Model data handler will be used to get the model input and output data for training.

In [None]:
mdh = ModelDataHandler(dataset, [
    CycleCols.VOLTAGE,
    CycleCols.CURRENT,
    CycleCols.TEMPERATURE
])

# Data Preparation

In [None]:
train_x, train_y, test_x, test_y = mdh.get_discharge_whole_cycle(soh = False, output_capacity = False)

In [None]:

train_y = mdh.keep_only_capacity(train_y, is_multiple_output = True)
test_y = mdh.keep_only_capacity(test_y, is_multiple_output = True)

# Sliding Windows Implementation

In [None]:
# Apply sliding windows to train_x and test_x
window_size = 30
stride = 1

In [None]:
#at the start of each array pad for window length the first number of the array
train_x= np.array([np.vstack(([cycle[0]]*(window_size-1), cycle))for cycle in train_x])
train_y= np.array([np.hstack(([soc[0]]*(window_size-1) , soc)) for soc in train_y])
test_x= np.array([np.vstack(([cycle[0]]*(window_size-1), cycle))for cycle in test_x])
test_y= np.array([np.hstack(([soc[0]]*(window_size-1) , soc)) for soc in test_y])

In [None]:
print(f"Train X shape: {train_x.shape}")
print(f"Train Y shape: {train_y.shape}")
print(f"Test X shape: {test_x.shape}")
print(f"Test Y shape: {test_y.shape}")

In [None]:
# Create sliding windows function
def create_sliding_windows(data, labels, window_size=30, stride=1):
    sliding_windows = []
    aligned_labels = []
    skipped_samples = 0

    for i, sample in enumerate(data):
        sample_labels = labels[i]  # Labels for the current sample (time_steps,)
        if len(sample) < window_size:
            skipped_samples += 1
            continue
        #find the number of padding start indices
        padding_start_index = len(sample_labels)
        for a in range(len(sample_labels)):
            if sample_labels[a] == 1 and sample_labels[a - 1] == 1:
                padding_start_index = a-1
                break
        for j in range(0, padding_start_index - window_size + 1, stride):
            # Extract window and corresponding label (last time step of the window)
            sliding_windows.append(sample[j:j + window_size])
            aligned_labels.append(sample_labels[j + window_size - 1])  # Use label at the end of the window

    if skipped_samples > 0:
        logging.warning(f"Skipped {skipped_samples} samples due to insufficient length for window size {window_size}.")

    return np.array(sliding_windows), np.array(aligned_labels)

In [None]:
# create sliding windows for train and test data
train_x_windowed, train_y_aligned = create_sliding_windows(train_x, train_y, window_size=window_size, stride=stride)
test_x_windowed, test_y_aligned = create_sliding_windows(test_x, test_y, window_size=window_size, stride=stride)

# Update train_x, train_y, test_x, test_y
train_x = train_x_windowed
train_y = train_y_aligned
test_x = test_x_windowed
test_y = test_y_aligned

# Log the new shapes after applying sliding windows
logging.info("After sliding windows - Train x: %s, Train y: %s | Test x: %s, Test y: %s" %
             (train_x.shape, train_y.shape, test_x.shape, test_y.shape))

# Model Architectures

## Bidirectional LSTM Model

In [None]:
def build_bi_lstm_model(hp):
    """Build Bidirectional LSTM model with hyperparameter tuning"""
    model = Sequential()
    
    # Tunable Bidirectional LSTM layers
    for i in range(hp.Int('lstm_layers', 1, 3)):  # Number of Bidirectional LSTM layers (1 to 3)
        if i == 0:
            model.add(layers.Bidirectional(
                layers.LSTM(
                    units=hp.Int(f'units_{i}', 32, 256, step=32),  # Tunable number of units
                    activation='tanh',
                    input_shape=(train_x.shape[1], train_x.shape[2]),  # Input shape for the first LSTM layer
                    return_sequences=True if hp.Int('lstm_layers', 1, 3) > 1 else False  # Return sequences if more layers follow
                )
            ))
        else:
            model.add(layers.Bidirectional(
                layers.LSTM(
                    units=hp.Int(f'units_{i}', 32, 256, step=32),  # Tunable number of units
                    activation='tanh',
                    return_sequences=True if i < hp.Int('lstm_layers', 1, 3) - 1 else False  # Return sequences if not the last layer
                )
            ))
        model.add(layers.Dropout(
            rate=hp.Float(f'dropout_{i}', 0.0, 0.5, step=0.1)  # Tunable dropout rate
        ))
    
    # Tunable dense layers
    for j in range(hp.Int('dense_layers', 1, 3)):  # Number of dense layers (1 to 3)
        model.add(layers.Dense(
            units=hp.Int(f'dense_units_{j}', 64, 512, step=32),  # Tunable number of units
            activation='selu'
        ))
        model.add(layers.Dropout(
            rate=hp.Float(f'dense_dropout_{j}', 0.0, 0.5, step=0.1)  # Tunable dropout rate
        ))
    
    model.add(layers.Dense(1, activation='linear'))  # Output layer
    
    # Tunable learning rate
    learning_rate = hp.Choice('learning_rate', values=[1e-5, 5e-4, 1e-4, 5e-3, 1e-3])
    optimizer = Adam(learning_rate=learning_rate)
    
    model.compile(
        optimizer=optimizer,
        loss='huber',
        metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')]
    )
    
    return model

## LSTM Model

In [None]:
def build_lstm_model(hp):
    """Build LSTM model with hyperparameter tuning"""
    model = Sequential()
    
    # Tunable LSTM layers
    for i in range(hp.Int('lstm_layers', 1, 3)):  # Number of LSTM layers (1 to 3)
        if i == 0:
            model.add(layers.LSTM(
                units=hp.Int(f'units_{i}', 32, 256, step=32),  # Tunable number of units
                activation='tanh',
                input_shape=(train_x.shape[1], train_x.shape[2]),  # Input shape for the first LSTM layer
                return_sequences=True if hp.Int('lstm_layers', 1, 3) > 1 else False  # Return sequences if more layers follow
            ))
        else:
            model.add(layers.LSTM(
                units=hp.Int(f'units_{i}', 32, 256, step=32),  # Tunable number of units
                activation='tanh',
                return_sequences=True if i < hp.Int('lstm_layers', 1, 3) - 1 else False  # Return sequences if not the last layer
            ))
        model.add(layers.Dropout(
            rate=hp.Float(f'dropout_{i}', 0.0, 0.5, step=0.1)  # Tunable dropout rate
        ))
    
    # Tunable dense layers
    for j in range(hp.Int('dense_layers', 1, 3)):  # Number of dense layers (1 to 3)
        model.add(layers.Dense(
            units=hp.Int(f'dense_units_{j}', 64, 512, step=32),  # Tunable number of units
            activation='selu'
        ))
        model.add(layers.Dropout(
            rate=hp.Float(f'dense_dropout_{j}', 0.0, 0.5, step=0.1)  # Tunable dropout rate
        ))
    
    model.add(layers.Dense(1, activation='linear'))  # Output layer
    
    # Tunable learning rate
    learning_rate = hp.Choice('learning_rate', values=[1e-5, 5e-4, 1e-4, 5e-3, 1e-3])
    optimizer = Adam(learning_rate=learning_rate)
    
    model.compile(
        optimizer=optimizer,
        loss='huber',
        metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')]
    )
    
    return model

## 1D CNN Model

In [None]:
def build_1d_cnn_model(hp):
    """Build 1D CNN model with hyperparameter tuning"""
    model = Sequential()
    
    # Tunable Conv1D layers
    for i in range(hp.Int('conv_layers', 1, 3)):
        if i == 0:
            model.add(layers.Conv1D(
                filters=hp.Int(f'filters_{i}', 32, 256, step=32),
                kernel_size=hp.Int(f'kernel_size_{i}', 2, 5),
                activation='selu',
                padding='same',
                input_shape=(train_x.shape[1], train_x.shape[2])
            ))
        else:
            model.add(layers.Conv1D(
                filters=hp.Int(f'filters_{i}', 32, 256, step=32),
                kernel_size=hp.Int(f'kernel_size_{i}', 2, 5),
                activation='selu',
                padding='same'
            ))
    
    model.add(layers.Flatten())
    
    # Tunable dense layers
    for j in range(hp.Int('dense_layers', 1, 3)):
        model.add(layers.Dense(
            units=hp.Int(f'dense_units_{j}', 64, 512, step=32),
            activation='selu'
        ))
        model.add(layers.Dropout(
            rate=hp.Float(f'dropout_{j}', 0.0, 0.5, step=0.1)
        ))
    
    model.add(layers.Dense(1, activation='linear'))
    
    # Tunable learning rate
    learning_rate = hp.Choice('learning_rate', values=[1e-5, 5e-4, 1e-4, 5e-3, 1e-3])
    optimizer = Adam(learning_rate=learning_rate)
    
    model.compile(
        optimizer=optimizer,
        loss='huber',
        metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')]
    )
    
    return model

# Hyperparameter Optimization and Training

In [None]:
# Configuration
EXPERIMENT = "lstm_soc_estimation"
experiment_name = time.strftime("%Y-%m-%d-%H-%M-%S") + '_' + EXPERIMENT
print(f"Experiment: {experiment_name}")

# Set GPU device (adjust according to your setup)
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
# Choose model architecture (uncomment the desired model)
build_model = build_lstm_model  # Default to LSTM
# build_model = build_bi_lstm_model  # Uncomment for Bidirectional LSTM
# build_model = build_1d_cnn_model  # Uncomment for 1D CNN

# Tuner configuration
tuner = BayesianOptimization(
    build_model,
    objective='val_loss',
    max_trials=20,
    executions_per_trial=1,
    directory='bayesian_optimization',
    project_name=experiment_name,
    overwrite=False
)

# Display tuner search space
tuner.search_space_summary()

In [None]:
# Start hyperparameter tuning
print("Starting hyperparameter tuning...")
tuner.search(
    train_x,
    train_y,
    epochs=100,
    batch_size=2048,
    validation_split=0.2,
    callbacks=[
        EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
    ],
    verbose=1
)

print("Hyperparameter tuning completed!")

# Model Evaluation

In [None]:
def evaluate_best_models(tuner, experiment_name, num_models=5):
    """Evaluate the best models from hyperparameter tuning"""
    best_models = tuner.get_best_models(num_models=num_models)
    results = []
    
    for i, model in enumerate(best_models, 1):
        print(f"Evaluating model {i}/{num_models}...")
        
        val_metrics = model.evaluate(train_x, train_y, verbose=0)
        test_metrics = model.evaluate(test_x, test_y, verbose=0)
        
        results.append({
            "Model": f"Best_Model_{i}",
            "Val Loss": val_metrics[0],
            "Val MSE": val_metrics[1],
            "Val MAE": val_metrics[2],
            "Val MAPE": val_metrics[3],
            "Val RMSE": val_metrics[4],
            "Test Loss": test_metrics[0],
            "Test MSE": test_metrics[1],
            "Test MAE": test_metrics[2],
            "Test MAPE": test_metrics[3],
            "Test RMSE": test_metrics[4]
        })
    
    results_df = pd.DataFrame(results)
    print("\nEvaluation Results:")
    print(results_df.to_string(index=False))
    
    # Save results
    results_file = f'{experiment_name}_evaluation_results.txt'
    with open(results_file, 'w') as f:
        f.write(f"Experiment: {experiment_name}\n")
        f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write("\nEvaluation Results:\n")
        f.write(results_df.to_string(index=False))
    
    # Select best model based on test RMSE
    best_idx = results_df['Test RMSE'].idxmin()
    best_model = best_models[best_idx]
    
    # Save best model
    model_file = f'best_soc_model_{experiment_name}.h5'
    best_model.save(model_file)
    print(f"\nBest model saved as: {model_file}")
    
    return best_model, results_df

# Execute evaluation
best_model, evaluation_results = evaluate_best_models(tuner, experiment_name)

In [None]:
# Get and save the best hyperparameters
best_hyperparameters = tuner.get_best_hyperparameters(num_trials=1)[0]

hyperparams_file = f'best_hyperparameters_{experiment_name}.txt'
with open(hyperparams_file, 'w') as f:
    f.write(f"Best Hyperparameters for {experiment_name}:\n")
    f.write(f"Date: {time.strftime('%Y-%m-%d %H:%M:%S')}\n\n")
    for key, value in best_hyperparameters.values.items():
        f.write(f"{key}: {value}\n")

print(f"Best hyperparameters saved to: {hyperparams_file}")
print("\nBest Hyperparameters:")
for key, value in best_hyperparameters.values.items():
    print(f"{key}: {value}")

In [None]:
# Display model architecture
print("\nBest Model Architecture:")
best_model.summary()

# Results Visualization

In [None]:
# Generate predictions
print("Generating predictions...")
train_predictions = best_model.predict(train_x)
test_predictions = best_model.predict(test_x)

print(f"Training predictions shape: {train_predictions.shape}")
print(f"Test predictions shape: {test_predictions.shape}")

In [None]:
# Training results visualization
cycle_num = 0
steps_num = min(8000, len(train_predictions))  # Ensure we don't exceed array bounds
step_index = np.arange(cycle_num*steps_num, min((cycle_num+1)*steps_num, len(train_predictions)))

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=step_index, 
    y=train_predictions.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
    mode='lines', 
    name='SoC Predicted',
    line=dict(color='red', width=2)
))
fig.add_trace(go.Scatter(
    x=step_index, 
    y=train_y.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
    mode='lines', 
    name='SoC Actual',
    line=dict(color='blue', width=2)
))
fig.update_layout(
    title=f'Training Results - {experiment_name}',
    xaxis_title='Time Steps',
    yaxis_title='SoC Percentage',
    width=1400,
    height=600,
    legend=dict(x=0.02, y=0.98)
)
fig.show()

In [None]:
# Test results visualization
cycle_num = 0
steps_num = min(1000, len(test_predictions))  # Ensure we don't exceed array bounds
step_index = np.arange(cycle_num*steps_num, min((cycle_num+1)*steps_num, len(test_predictions)))

fig = go.Figure()
fig.add_trace(go.Scatter(
    x=step_index, 
    y=test_predictions.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
    mode='lines', 
    name='SoC Predicted',
    line=dict(color='red', width=2)
))
fig.add_trace(go.Scatter(
    x=step_index, 
    y=test_y.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
    mode='lines', 
    name='SoC Actual',
    line=dict(color='blue', width=2)
))
fig.update_layout(
    title=f'Test Results - {experiment_name}',
    xaxis_title='Time Steps',
    yaxis_title='SoC Percentage',
    width=1400,
    height=600,
    legend=dict(x=0.02, y=0.98)
)
fig.show()

In [None]:
# Calculate and display final metrics
test_metrics = best_model.evaluate(test_x, test_y, verbose=0)
metric_names = ['Loss', 'MSE', 'MAE', 'MAPE', 'RMSE']

print("\n" + "="*50)
print(f"FINAL TEST RESULTS - {experiment_name}")
print("="*50)
for name, value in zip(metric_names, test_metrics):
    print(f"{name}: {value:.6f}")
print("="*50)