In [2]:
import numpy as np
import pandas as pd
import scipy.io
import math
import os
import ntpath
import sys
import logging
import time
import sys
import random

from importlib import reload
import plotly.graph_objects as go

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, regularizers

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation
from tensorflow.keras.optimizers import SGD, Adam
from tensorflow.keras.layers import LSTM, Embedding, RepeatVector, TimeDistributed, Masking, Bidirectional, Lambda, LayerNormalization, Flatten, MultiHeadAttention
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, LambdaCallback
from tensorflow.keras import layers


In [3]:
# Checking environment
IS_COLAB = True
IS_TRAINING = True
MODEL = "LSTM"
RESULT_NAME = ""
data_path = "/content/drive/My Drive/battery-rul-estimation/" if IS_COLAB else "../../"

# If on Colab, mount the drive
if IS_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')

# Appending to system path
sys.path.append(data_path)

# Custom modules
from data_processing.nasa_random_data import NasaRandomizedData
from data_processing.prepare_rul_data import RulHandler


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
# Logging configuration
reload(logging)
logging.basicConfig(format='%(asctime)s [%(levelname)s]: %(message)s', level=logging.DEBUG, datefmt='%Y/%m/%d %H:%M:%S')

# Data parameters
train_names = [
        'Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW1',
        'Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW2',
        'Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW7',

        #'Battery_Uniform_Distribution_Discharge_Room_Temp_DataSet_2Post/data/Matlab/RW3',
        'Battery_Uniform_Distribution_Discharge_Room_Temp_DataSet_2Post/data/Matlab/RW4',
        'Battery_Uniform_Distribution_Discharge_Room_Temp_DataSet_2Post/data/Matlab/RW5',

        #'Battery_Uniform_Distribution_Charge_Discharge_DataSet_2Post/data/Matlab/RW9',
        #'Battery_Uniform_Distribution_Charge_Discharge_DataSet_2Post/data/Matlab/RW10',
        #'Battery_Uniform_Distribution_Charge_Discharge_DataSet_2Post/data/Matlab/RW11',

        'RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW13',
        'RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW14',
        'RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW15',

        'RW_Skewed_High_Room_Temp_DataSet_2Post/data/Matlab/RW17',
        'RW_Skewed_High_Room_Temp_DataSet_2Post/data/Matlab/RW18',
        'RW_Skewed_High_Room_Temp_DataSet_2Post/data/Matlab/RW19',

        'RW_Skewed_Low_40C_DataSet_2Post/data/Matlab/RW21',
        'RW_Skewed_Low_40C_DataSet_2Post/data/Matlab/RW22',
        'RW_Skewed_Low_40C_DataSet_2Post/data/Matlab/RW23',

        'RW_Skewed_High_40C_DataSet_2Post/data/Matlab/RW25',
        'RW_Skewed_High_40C_DataSet_2Post/data/Matlab/RW26',
        'RW_Skewed_High_40C_DataSet_2Post/data/Matlab/RW27',
]

test_names = [
        'Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW8',
        #'Battery_Uniform_Distribution_Discharge_Room_Temp_DataSet_2Post/data/Matlab/RW6',
        #'Battery_Uniform_Distribution_Charge_Discharge_DataSet_2Post/data/Matlab/RW12',
        'RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW16',
        'RW_Skewed_High_Room_Temp_DataSet_2Post/data/Matlab/RW20',
        'RW_Skewed_Low_40C_DataSet_2Post/data/Matlab/RW24',
        'RW_Skewed_High_40C_DataSet_2Post/data/Matlab/RW28',
]

In [5]:
# Data handlers
nasa_data_handler = NasaRandomizedData(data_path)
rul_handler = RulHandler()


In [None]:
CAPACITY_THRESHOLDS = None
NOMINAL_CAPACITY = 2.2
N_CYCLE = 50
WARMUP_TRAIN = 15
WARMUP_TEST = 30

(train_x, train_y_soh, test_x, test_y_soh,
  train_battery_range, test_battery_range,
  time_train, time_test, current_train, current_test) = nasa_data_handler.get_discharge_whole_cycle_future(train_names, test_names)

train_y = rul_handler.prepare_y_future(train_names, train_battery_range, train_y_soh, current_train, time_train, CAPACITY_THRESHOLDS, capacity=NOMINAL_CAPACITY)
del globals()["current_train"]
del globals()["time_train"]
test_y = rul_handler.prepare_y_future(test_names, test_battery_range, test_y_soh, current_test, time_test, CAPACITY_THRESHOLDS, capacity=NOMINAL_CAPACITY)
del globals()["current_test"]
del globals()["time_test"]
train_x, test_x = rul_handler.compress_cycle(train_x, test_x)

x_norm = rul_handler.Normalization()
train_x, test_x = x_norm.fit_and_normalize(train_x, test_x)

train_x = rul_handler.battery_life_to_time_series(train_x, N_CYCLE, train_battery_range)
test_x = rul_handler.battery_life_to_time_series(test_x, N_CYCLE, test_battery_range)

train_x, train_y, train_battery_range, train_y_soh = rul_handler.delete_initial(train_x, train_y, train_battery_range, train_y_soh, WARMUP_TRAIN)
test_x, test_y, test_battery_range, test_y_soh = rul_handler.delete_initial(test_x, test_y, test_battery_range, test_y_soh, WARMUP_TEST)

train_x, train_y, train_battery_range, train_y_soh = rul_handler.limit_zeros(train_x, train_y, train_battery_range, train_y_soh)
test_x, test_y, test_battery_range, test_y_soh = rul_handler.limit_zeros(test_x, test_y, test_battery_range, test_y_soh)

# first one is SOH, we keep only RUL
train_y = train_y[:,1]
test_y = test_y[:,1]

2023/09/27 20:05:16 [INFO]: Loading train data...
2023/09/27 20:05:16 [INFO]: Processing file Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW1
2023/09/27 20:05:25 [INFO]: Processing file Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW2
2023/09/27 20:05:34 [INFO]: Processing file Battery_Uniform_Distribution_Variable_Charge_Room_Temp_DataSet_2Post/data/Matlab/RW7
2023/09/27 20:05:41 [INFO]: Processing file Battery_Uniform_Distribution_Discharge_Room_Temp_DataSet_2Post/data/Matlab/RW4
2023/09/27 20:05:47 [INFO]: Processing file Battery_Uniform_Distribution_Discharge_Room_Temp_DataSet_2Post/data/Matlab/RW5
2023/09/27 20:05:52 [INFO]: Processing file RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW13
2023/09/27 20:06:08 [INFO]: Processing file RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW14
2023/09/27 20:06:22 [INFO]: Processing file RW_Skewed_Low_Room_Temp_DataSet_2Post/data/Matlab/RW15
2023/09/27 20:06:40 

In [6]:
if IS_TRAINING and MODEL == "transformer":
    EXPERIMENT = "transformer_rul_nasa_randomized"
    experiment_name = time.strftime("%Y-%m-%d-%H-%M-%S") + '_' + EXPERIMENT
    print(experiment_name)

    class MultiHeadAttention(tf.keras.layers.Layer):
        def __init__(self, d_model, num_heads):
            super(MultiHeadAttention, self).__init__()
            self.num_heads = num_heads
            self.d_model = d_model

            assert d_model % self.num_heads == 0

            self.depth = d_model // self.num_heads

            self.wq = tf.keras.layers.Dense(d_model)
            self.wk = tf.keras.layers.Dense(d_model)
            self.wv = tf.keras.layers.Dense(d_model)

            self.dense = tf.keras.layers.Dense(d_model)

        def split_heads(self, x, batch_size):
            """Split the last dimension into (num_heads, depth)."""
            x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
            return tf.transpose(x, perm=[0, 2, 1, 3])

        def call(self, v, k, q, mask):
            batch_size = tf.shape(q)[0]

            q = self.wq(q)
            k = self.wk(k)
            v = self.wv(v)

            q = self.split_heads(q, batch_size)
            k = self.split_heads(k, batch_size)
            v = self.split_heads(v, batch_size)

            scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v, mask)

            scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
            concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

            output = self.dense(concat_attention)
            return output

        def scaled_dot_product_attention(self, q, k, v, mask):
            matmul_qk = tf.matmul(q, k, transpose_b=True)

            d_k = tf.cast(tf.shape(k)[-1], tf.float32)
            scaled_attention_logits = matmul_qk / tf.math.sqrt(d_k)

            if mask is not None:
                scaled_attention_logits += (mask * -1e9)

            attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
            output = tf.matmul(attention_weights, v)

            return output, attention_weights

    class EncoderLayer(tf.keras.layers.Layer):
        def __init__(self, d_model, num_heads, dff, rate=0.1):
            super(EncoderLayer, self).__init__()

            self.mha = MultiHeadAttention(d_model, num_heads)
            self.ffn = self.point_wise_feed_forward_network(d_model, dff)

            self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
            self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)

            self.dropout1 = tf.keras.layers.Dropout(rate)
            self.dropout2 = tf.keras.layers.Dropout(rate)

        def call(self, x, training, mask=None):
            attn_output = self.mha(x, x, x, mask)
            attn_output = self.dropout1(attn_output, training=training)
            out1 = self.layernorm1(x + attn_output)

            ffn_output = self.ffn(out1)
            ffn_output = self.dropout2(ffn_output, training=training)
            out2 = self.layernorm2(out1 + ffn_output)

            return out2

        def point_wise_feed_forward_network(self, d_model, dff):
            return tf.keras.Sequential([
                tf.keras.layers.Dense(dff, activation='relu'),
                tf.keras.layers.Dense(d_model)
            ])

    def build_transformer_model(input_shape, d_model, num_heads, dff, rate=0.1):
        inputs = tf.keras.Input(shape=input_shape)

        x = EncoderLayer(d_model, num_heads, dff, rate)(inputs)
        x = tf.keras.layers.GlobalAveragePooling1D()(x)
        x = tf.keras.layers.Dense(64, activation='selu', kernel_regularizer=regularizers.l2(0.0002))(x)
        x = tf.keras.layers.Dense(32, activation='selu', kernel_regularizer=regularizers.l2(0.0002))(x)
        outputs = tf.keras.layers.Dense(1, activation='linear')(x)

        model = tf.keras.Model(inputs=inputs, outputs=outputs)
        return model

    model = build_transformer_model((train_x.shape[1], train_x.shape[2]), 6, 2, 512)
    opt = tf.keras.optimizers.Adam(lr=0.000003)
    model.compile(optimizer=opt, loss='huber', metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')])
    model.summary()

In [7]:
# Model definition and training
if IS_TRAINING and MODEL == "LSTM":
    EXPERIMENT = "lstm_rul_nasa_randomized"
    experiment_name = time.strftime("%Y-%m-%d-%H-%M-%S") + '_' + EXPERIMENT
    print(experiment_name)

    model = tf.keras.models.Sequential([
        tf.keras.layers.Masking(input_shape=(train_x.shape[1], train_x.shape[2])),
        tf.keras.layers.LSTM(128, activation='selu', return_sequences=True, kernel_regularizer=tf.keras.regularizers.l2(0.0002)),
        tf.keras.layers.LSTM(64, activation='selu', return_sequences=False, kernel_regularizer=tf.keras.regularizers.l2(0.0002)),
        tf.keras.layers.Dense(64, activation='selu', kernel_regularizer=tf.keras.regularizers.l2(0.0002)),
        tf.keras.layers.Dense(32, activation='selu', kernel_regularizer=tf.keras.regularizers.l2(0.0002)),
        tf.keras.layers.Dense(1, activation='linear')
    ])

    opt = tf.keras.optimizers.Adam(lr=0.000003)
    model.compile(optimizer=opt, loss='huber', metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')])
    model.summary()

    # Model training
    history = model.fit(train_x, train_y, epochs=50, batch_size=32, verbose=1, validation_split=0)

    # Save model and history
    model.save(f'{data_path}results/trained_model/{experiment_name}.h5')
    hist_df = pd.DataFrame(history.history)
    hist_df.to_csv(f'{data_path}results/trained_model/{experiment_name}_history.csv')

# If not training, load model and print history
else:
    model = tf.keras.models.load_model(f'{data_path}results/trained_model/{RESULT_NAME}.h5')
    model.summary()

    history = pd.read_csv(f'{data_path}results/trained_model/{RESULT_NAME}_history.csv')
    with pd.option_context('display.max_rows', None, 'display.max_columns', None):
        print(history)



2023-09-27-20-15-41_lstm_rul_nasa_randomized


NameError: ignored

In [8]:
# Evaluate the model
results = model.evaluate(test_x, test_y, return_dict=True)
print(results)

NameError: ignored

In [None]:
# Compute max RMSE
max_rmse = max([model.evaluate(np.array([test_x[index, :, :]]), np.array([test_y[index]]), return_dict=True, verbose=0)['rmse'] for index in range(test_x.shape[0])])
print(f"Max rmse: {max_rmse}")

In [None]:
# Plotting loss trend
fig = go.Figure()
fig.add_trace(go.Scatter(y=history['loss'], mode='lines', name='train'))
if 'val_loss' in history:
    fig.add_trace(go.Scatter(y=history['val_loss'], mode='lines', name='validation'))
fig.update_layout(title='Loss trend', xaxis_title='epoch', yaxis_title='loss', width=1400, height=600)
fig.show()