### LSTM (Long-short time memory) model for SOC prediction with using LG 18650HG2 Li-ion Battery 

In [1]:
import os
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import optuna
import time
from optuna.visualization import plot_optimization_history
from tqdm import tqdm
from sklearn.metrics import mean_squared_error, mean_absolute_error

In [3]:
PROCESSED_DATA_DIR = '../../datasets/LG_dataset/LG_HG2_processed'
FEATURE_COLS = ['Voltage [V]', 'Current [A]', 'Temperature [degC]', 'Power [W]', 'Cumulative_Capacity_Ah']
LABEL_COL = 'SOC [-]'
BATCH_SIZE = 128
SEQUENCE_LENGTH = 20  
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [4]:
# Function to load data
def load_data(directory, temperatures):
    frames = []    
    for temp_folder in os.listdir(directory):
        if temp_folder in temperatures:
            temp_path = os.path.join(directory, temp_folder)
            for file in os.listdir(temp_path):
                if 'Charge' in file or 'Dis' in file:
                    continue  # Skip constant charge and discharge files
                if file.endswith('.csv'):
                    df = pd.read_csv(os.path.join(temp_path, file))
                    df['SourceFile'] = file

                    # Calculate power
                    df['Power [W]'] = df['Voltage [V]'] * df['Current [A]']
                    
                    frames.append(df)
    return pd.concat(frames, ignore_index=True)

In [5]:
# Custom dataset class for LSTM
class BatteryDatasetLSTM(Dataset):
    def __init__(self, data_tensor, labels_tensor, sequence_length=50, filenames=None, times=None):
        self.sequence_length = sequence_length
        self.features = data_tensor
        self.labels = labels_tensor
        self.filenames = filenames 
        self.times = times 

    def __len__(self):
        return len(self.features) - self.sequence_length

    def __getitem__(self, idx):
        idx_end = idx + self.sequence_length
        sequence = self.features[idx:idx_end]
        label = self.labels[idx_end - 1]
        filename = self.filenames[idx_end - 1]
        time = self.times[idx_end - 1]  

        sequence = sequence.clone().detach()
        label = label.clone().detach()

        return sequence, label, filename, time
    
    def get_unique_filenames(self):
        return set(self.filenames)
    
    def get_times(self):
        return self.times

In [6]:
# SoCLSTM Model
class SoCLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(SoCLSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, dtype=x.dtype, device=x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size, dtype=x.dtype, device=x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

In [39]:
# Training loop with validation
def train_and_validate(model, criterion, optimizer, train_loader, val_loader, epochs, device, patience=5, min_delta=0.0001, flag=False):
    history = {'train_loss': [], 'val_loss': []}

    best_val_loss = float('inf')
    epochs_no_improve = 0
    
    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        epoch_start_time = time.time()
        for _, (sequences, labels, _, _) in enumerate(tqdm(train_loader, desc=f'Epoch: {epoch}/{epochs}')):  
            sequences, labels = sequences.to(device), labels.to(device)
            labels = labels.unsqueeze(1) 
            optimizer.zero_grad()
            outputs = model(sequences)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        
        epoch_end_time = time.time()
        epoch_time = epoch_end_time - epoch_start_time
        train_loss /= len(train_loader)
        history['train_loss'].append(train_loss)

        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for sequences, labels, _, _ in val_loader:  
                sequences, labels = sequences.to(device), labels.to(device)
                labels = labels.unsqueeze(1)  
                outputs = model(sequences)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        val_loss /= len(val_loader)
        history['val_loss'].append(val_loss)

        # Early stopping logic
        if val_loss < best_val_loss - min_delta:
            best_val_loss = val_loss
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1

        print(f'Epoch {epoch + 1}/{epochs}, Train Loss: {train_loss}, Validation Loss: {val_loss}')
        print(f'Time taken for epoch: {epoch_time:.8f} seconds')
        
        if flag:
            model_path = "soc_lstm_model.pth"
            torch.save({'model_state_dict': model.state_dict(), 'input_size': len(FEATURE_COLS)}, model_path)
        if epochs_no_improve >= patience:
            print('Early stopping triggered')
            #break

    return history

In [8]:
temperatures_to_process = [folder for folder in os.listdir(PROCESSED_DATA_DIR) if 'degC' in folder]

In [9]:
data = load_data(PROCESSED_DATA_DIR, temperatures_to_process)
data

Unnamed: 0,Timestamp,Time [min],Time [s],Voltage [V],Current [A],Temperature [degC],Capacity [Ah],Time_diff,Cumulative_Capacity_Ah,SOC [-],Rounded_Time,SourceFile,Power [W]
0,2018-11-08 09:53:14,0.000000,0.000,4.18936,-0.05108,23.34519,0.00000,0.000000,0.000000,1.005569,0,562_Mixed6_processed.csv,-0.213993
1,2018-11-08 09:53:14,0.010033,0.602,4.18784,-0.09450,23.34519,-0.00001,0.000029,-0.000015,0.999995,1,562_Mixed6_processed.csv,-0.395751
2,2018-11-08 09:53:15,0.025017,1.501,4.18767,-0.09450,23.34519,-0.00004,0.000028,-0.000039,0.999987,2,562_Mixed6_processed.csv,-0.395735
3,2018-11-08 09:53:16,0.041700,2.502,4.18750,-0.09450,23.34519,-0.00006,0.000028,-0.000065,0.999977,3,562_Mixed6_processed.csv,-0.395719
4,2018-11-08 09:53:17,0.059983,3.599,4.18734,-0.09195,23.34519,-0.00009,0.000028,-0.000093,0.999966,4,562_Mixed6_processed.csv,-0.385026
...,...,...,...,...,...,...,...,...,...,...,...,...,...
532257,2018-12-18 18:24:55,112.126150,6727.569,3.48709,0.00000,-9.77974,-2.10860,0.000027,-2.112200,0.170802,6728,604_Mixed8_processed.csv,0.000000
532258,2018-12-18 18:24:56,112.142883,6728.573,3.48726,0.00000,-9.77974,-2.10860,0.000028,-2.112200,0.170802,6729,604_Mixed8_processed.csv,0.000000
532259,2018-12-18 18:24:57,112.159483,6729.569,3.48726,0.00000,-9.77974,-2.10860,0.000027,-2.112200,0.170802,6730,604_Mixed8_processed.csv,0.000000
532260,2018-12-18 18:24:58,112.176183,6730.571,3.48726,0.00000,-9.77974,-2.10860,0.000029,-2.112200,0.170802,6731,604_Mixed8_processed.csv,0.000000


In [10]:
scaler = StandardScaler()
data[FEATURE_COLS] = scaler.fit_transform(data[FEATURE_COLS])
data

Unnamed: 0,Timestamp,Time [min],Time [s],Voltage [V],Current [A],Temperature [degC],Capacity [Ah],Time_diff,Cumulative_Capacity_Ah,SOC [-],Rounded_Time,SourceFile,Power [W]
0,2018-11-08 09:53:14,0.000000,0.000,1.693907,0.421254,0.821057,0.00000,0.000000,1.531129,1.005569,0,562_Mixed6_processed.csv,0.410519
1,2018-11-08 09:53:14,0.010033,0.602,1.688779,0.402139,0.821057,-0.00001,0.000029,1.531109,0.999995,1,562_Mixed6_processed.csv,0.387702
2,2018-11-08 09:53:15,0.025017,1.501,1.688205,0.402139,0.821057,-0.00004,0.000028,1.531078,0.999987,2,562_Mixed6_processed.csv,0.387704
3,2018-11-08 09:53:16,0.041700,2.502,1.687632,0.402139,0.821057,-0.00006,0.000028,1.531044,0.999977,3,562_Mixed6_processed.csv,0.387707
4,2018-11-08 09:53:17,0.059983,3.599,1.687092,0.403262,0.821057,-0.00009,0.000028,1.531006,0.999966,4,562_Mixed6_processed.csv,0.389049
...,...,...,...,...,...,...,...,...,...,...,...,...,...
532257,2018-12-18 18:24:55,112.126150,6727.569,-0.675525,0.443741,-0.992532,-2.10860,0.000027,-1.259410,0.170802,6728,604_Mixed8_processed.csv,0.437382
532258,2018-12-18 18:24:56,112.142883,6728.573,-0.674951,0.443741,-0.992532,-2.10860,0.000028,-1.259410,0.170802,6729,604_Mixed8_processed.csv,0.437382
532259,2018-12-18 18:24:57,112.159483,6729.569,-0.674951,0.443741,-0.992532,-2.10860,0.000027,-1.259410,0.170802,6730,604_Mixed8_processed.csv,0.437382
532260,2018-12-18 18:24:58,112.176183,6730.571,-0.674951,0.443741,-0.992532,-2.10860,0.000029,-1.259410,0.170802,6731,604_Mixed8_processed.csv,0.437382


In [31]:
unique_files = np.array(list(set(data['SourceFile'])))
train_files, temp_files = train_test_split(unique_files, test_size=0.2, random_state=52)
val_files, test_files = train_test_split(temp_files, test_size=0.5, random_state=52)

In [32]:
def filter_data_by_filenames(df, filenames):
    return df[df['SourceFile'].isin(filenames)]

# Filter data for each set
train_data = filter_data_by_filenames(data, train_files)
val_data = filter_data_by_filenames(data, val_files)
test_data = filter_data_by_filenames(data, test_files)

In [33]:
# Convert to tensors
train_tensor = torch.tensor(train_data[FEATURE_COLS].values, dtype=torch.float32).to(device)
train_labels = torch.tensor(train_data[LABEL_COL].values, dtype=torch.float32).to(device)

val_tensor = torch.tensor(val_data[FEATURE_COLS].values, dtype=torch.float32).to(device)
val_labels = torch.tensor(val_data[LABEL_COL].values, dtype=torch.float32).to(device)

test_tensor = torch.tensor(test_data[FEATURE_COLS].values, dtype=torch.float32).to(device)
test_labels = torch.tensor(test_data[LABEL_COL].values, dtype=torch.float32).to(device)

In [34]:
# Convert filtered data to tensors and create dataset instances
train_dataset = BatteryDatasetLSTM(
    torch.tensor(train_data[FEATURE_COLS].values, dtype=torch.float32).to(device),
    torch.tensor(train_data[LABEL_COL].values, dtype=torch.float32).to(device),
    SEQUENCE_LENGTH,
    train_data['SourceFile'].values,
    train_data['Time [s]'].values  
)

val_dataset = BatteryDatasetLSTM(
    torch.tensor(val_data[FEATURE_COLS].values, dtype=torch.float32).to(device),
    torch.tensor(val_data[LABEL_COL].values, dtype=torch.float32).to(device),
    SEQUENCE_LENGTH,
    val_data['SourceFile'].values,
    val_data['Time [s]'].values  
)

test_dataset = BatteryDatasetLSTM(
    torch.tensor(test_data[FEATURE_COLS].values, dtype=torch.float32).to(device),
    torch.tensor(test_data[LABEL_COL].values, dtype=torch.float32).to(device),
    SEQUENCE_LENGTH,
    test_data['SourceFile'].values,
    test_data['Time [s]'].values  
)

In [35]:
# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True) 
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [36]:
# Print file names used in training, validation, and testing
train_files = train_dataset.get_unique_filenames()
val_files = val_dataset.get_unique_filenames()
test_files = test_dataset.get_unique_filenames()

train_files_sorted = sorted(train_files)
val_files_sorted = sorted(val_files)
test_files_sorted = sorted(test_files)

print("Training files:", train_files)
print("\nValidation files:", val_files)
print("\nTesting files:", test_files)

Training files: {'610_HWFET_processed.csv', '562_Mixed4_processed.csv', '610_UDDS_processed.csv', '575_HPPC_processed.csv', '576_UDDS_processed.csv', '567_Mixed2_processed.csv', '602_Mixed4_processed.csv', '590_PausCycl_processed.csv', '552_Mixed4_processed.csv', '596_HWFET_processed.csv', '589_UDDS_processed.csv', '610_US06_processed.csv', '557_Cap_1C_processed.csv', '610_Cap_1C_processed.csv', '562_Mixed6_processed.csv', '557_Mixed3_processed.csv', '589_Mixed2_processed.csv', '611_Cap_1C_processed.csv', '551_LA92_processed.csv', '571_Mixed6_processed.csv', '562_Mixed5_processed.csv', '601_US06_processed.csv', '555_HPPC_processed.csv', '551_Mixed2_processed.csv', '556_HWFET_processed.csv', '602_Cap_1C_processed.csv', '604_PausCycl_processed.csv', '604_Mixed8_processed.csv', '611_Mixed8_processed.csv', '556_Mixed2_processed.csv', '551_UDDS_processed.csv', '604_Mixed6_processed.csv', '589_Mixed1_processed.csv', '556_Mixed1_processed.csv', '571_Mixed7_processed.csv', '556_UDDS_processed.

In [37]:
print("Train features shape:", train_tensor.shape)
print("Test features shape:", test_tensor .shape)
print("Train labels shape:", train_labels.shape)
print("Test labels shape:", test_labels.shape)

Train features shape: torch.Size([428005, 5])
Test features shape: torch.Size([47112, 5])
Train labels shape: torch.Size([428005])
Test labels shape: torch.Size([47112])


## Hyperparameter tuning

In [38]:
# Define Optuna objective function
EPOCHS = 10
def objective(trial):
    # Suggest hyperparameters
    hidden_size = trial.suggest_int('hidden_size', 16, 128)
    num_layers = trial.suggest_int('num_layers', 1, 5)
    learning_rate = trial.suggest_float('learning_rate', 1e-5, 1e-1, log=True)

    # Instantiate model with suggested hyperparameters
    model = SoCLSTM(input_size=len(FEATURE_COLS), hidden_size=hidden_size, num_layers=num_layers).type(torch.float32).to(device)

    # Define your loss function and optimizer with suggested hyperparameters
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Call your train and validate function
    history = train_and_validate(model, criterion, optimizer, train_loader, val_loader, EPOCHS, device)

    # Extract last validation loss
    last_val_loss = history['val_loss'][-1]
    return last_val_loss


In [None]:
# Optuna study
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

# Extract best trial
best_trial = study.best_trial
print(f"Best trial: {best_trial}")

best_hyperparams = study.best_trial.params
print('Best hyperparameters:', best_hyperparams)

# Plot optimization history
optimization_history = plot_optimization_history(study)
optimization_history.show()

In [None]:
# Initialize the model, loss function, and optimizer
hidden_size = best_hyperparams['hidden_size']
num_layers = best_hyperparams['num_layers']
lr = best_hyperparams['learning_rate']

EPOCHS = 20

model = SoCLSTM(input_size=len(FEATURE_COLS), hidden_size=hidden_size, num_layers=num_layers)
model.to(device).type(torch.float32)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.MSELoss()

print(model)
total_params = sum(p.numel() for p in model.parameters())
print(f"Trainable parameters: {total_params}")

In [None]:
# Train and validate the model
history = train_and_validate(model, criterion, optimizer, train_loader, val_loader, EPOCHS, device, flag=True)

# Plot training history
plt.figure(figsize=(10, 5))
plt.plot(history['train_loss'], label='Train Loss')
plt.plot(history['val_loss'], label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training and Validation Loss Over Epochs')
plt.legend()
plt.grid(True)
plt.show()

# Save the model
model_path = "soc_lstm_model.pth"
torch.save({'model_state_dict': model.state_dict(), 'input_size': len(FEATURE_COLS)}, model_path)

In [None]:
model_path = "soc_lstm_model.pth"
    
def load_lstm_model(model_path, input_size, hidden_size, num_layers):
    model = SoCLSTM(input_size=len(FEATURE_COLS), hidden_size=hidden_size, num_layers=num_layers).to(device).type(torch.float32)
    model.load_state_dict(torch.load(model_path, map_location=device)['model_state_dict'])
    model.to(device)
    model.eval()
    return model

loaded_model = load_lstm_model(model_path, input_size=len(FEATURE_COLS), hidden_size=hidden_size, num_layers=num_layers)

In [None]:
def test_model(model, test_loader, device):
    model.eval()
    test_predictions = []
    test_labels = []

    with torch.no_grad():
        for inputs, labels, _, _ in test_loader: 
            outputs = model(inputs)
            test_predictions.extend(outputs.cpu().view(-1).tolist())
            test_labels.extend(labels.cpu().view(-1).tolist())

    return test_predictions, test_labels

# Evaluate the model
test_predictions, test_labels = test_model(loaded_model, test_loader, device)

# Convert predictions and labels to numpy arrays for error calculation
test_predictions_np = np.array(test_predictions)
test_labels_np = np.array(test_labels)

In [None]:
# Calculate metrics MAE, MSE, STD
mse = mean_squared_error(test_labels_np, test_predictions_np)
mae = mean_absolute_error(test_labels_np, test_predictions_np)
stddev = np.std(test_labels_np - test_predictions_np)

print(f"Test MSE: {mse:.6f}")
print(f"Test MAE: {mae:.6f}")
print(f"Test StdDev: {stddev:.6f}")

In [None]:
plt.figure(figsize=(8, 8))
plt.scatter(test_labels, test_predictions, alpha=0.5)
plt.xlabel('True Values [SOC]')
plt.ylabel('Predictions [SOC]')
plt.axis('equal')
plt.axis('square')
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.plot([0, 1], [0, 1], color='red') 
plt.title('Predicted SOC vs True SOC')
plt.show()

In [None]:
def test_model(model, test_loader, device):
    model.eval()
    test_results = {}

    with torch.no_grad():
        for inputs, labels, filenames, times in test_loader: 
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            predictions = outputs.cpu().view(-1).numpy()
            labels = labels.cpu().view(-1).numpy()

            for filename, time, pred, label in zip(filenames, times, predictions, labels):
                if filename not in test_results:
                    test_results[filename] = {'times': [], 'predictions': [], 'labels': []}
                test_results[filename]['times'].append(time)
                test_results[filename]['predictions'].append(pred)
                test_results[filename]['labels'].append(label)

    return test_results

def plot_soc_over_time(test_results):
    for filename, data in test_results.items():
        times = data['times']
        predictions = data['predictions']
        labels = data['labels']

        plt.figure(figsize=(12, 6))
        plt.plot(times, labels, label='True SOC', color='blue')
        plt.plot(times, predictions, label='Predicted SOC', color='red')
        plt.title(f'Test File: {filename}')
        plt.xlabel('Time [s]')
        plt.ylabel('SOC')
        plt.legend()
        plt.show()

# Evaluate the model on the test set
test_results = test_model(loaded_model, test_loader, device)

# Plot the SOC over time for each test file
plot_soc_over_time(test_results)