In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
from mvf_bto.data_loading import load_data
from mvf_bto.constants import * 
from mvf_bto.models.baseline_lstm import BaselineLSTM
from mvf_bto.preprocessing import create_discharge_inputs
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.metrics import MeanSquaredError
from scipy.interpolate import interp1d

import numpy as np
import pandas as pd
import plotly
import plotly.graph_objects as go

## Loading Data

In [None]:
data_path = "/Users/anoushkabhutani/PycharmProjects/10701-mvf-bto/data/2017-05-12_batchdata_updated_struct_errorcorrect.mat"

In [None]:
data = load_data(file_path=data_path, num_cells=9)

## Preprocessing to create model inputs and targets

In [None]:
train_split = 0.7
test_split = 0.2
# by default uses validation_split = 1 - (train_split + test_split)


In [None]:
datasets = create_discharge_inputs(data, train_split, test_split, forecast_horizon=2, history_window=4)

## Train Model

In [None]:
window_length = datasets["X_train"].shape[1]
n_features = datasets["X_train"].shape[2]
batch_input_shape = (datasets["batch_size"], window_length, n_features)
n_outputs = datasets["y_train"].shape[-1]
nf_steps = datasets["y_train"].shape[1]
y = datasets["y_train"][:, 0, 0]
idx = y < 2.9
weights = np.ones_like(y) * 1
weights[idx] = 2

In [None]:
model = BaselineLSTM(batch_input_shape=batch_input_shape, n_outputs=n_outputs, nf_steps=nf_steps)

In [None]:
model.compile(optimizer="adam", loss="mse", metrics=[MeanSquaredError()])

es = EarlyStopping(
    monitor="val_mean_squared_error",
    min_delta=0,
    patience=10,
    verbose=1,
    mode="auto",
    restore_best_weights=True,
)

history = model.fit(
    datasets["X_train"],
    datasets["y_train"],
    validation_data=(datasets["X_val"], datasets["y_val"]),
    epochs=250,
    batch_size=datasets["batch_size"],
    shuffle=False,
    callbacks=[es],
    verbose=1,
    sample_weight=weights

)

In [None]:
fig = go.Figure()
fig.add_trace(
    go.Scatter(
        x=np.linspace(1, 50),
        y=history.history["loss"],
        showlegend=False,
        mode="markers+lines",
    )
)
fig.update_xaxes(title="Epochs")
fig.update_yaxes(title="Loss (MSE)")

## Parity Plot of Training Error

In [None]:
# random plotting traing error at some interval = skip to not make the plot rendering too slow
batch_size = datasets["batch_size"]
skip = 70

fig = go.Figure()
fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], showlegend=False, mode="markers+lines"))
for i in range(0, len(datasets["X_train"]), batch_size * skip):
    df_pred = pd.DataFrame(model.predict(datasets["X_train"][i : i + batch_size], verbose=0)[:, :, 0])
    df_train = pd.DataFrame(datasets["y_train"][i : i + batch_size][:, :, 0])
    fig.add_trace(
        go.Scatter(
            x=df_pred[0].values,
            y=df_train[0].values,
            showlegend=False,
            mode="markers+lines",
        )
    )

fig.update_yaxes(title="Normalized Voltage Target")
fig.update_xaxes(title="Normalized Voltage Prediction")
fig.update_layout(template="simple_white")

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], showlegend=False, mode="markers+lines"))
for i in range(0, len(datasets["X_train"]), batch_size * skip):
    df_pred = pd.DataFrame(model.predict(datasets["X_train"][i : i + batch_size], verbose=0)[:, :, 0])
    df_train = pd.DataFrame(datasets["y_train"][i : i + batch_size][:, :, 0])
    fig.add_trace(
        go.Scatter(
            x=df_pred[0].values,
            y=df_train[0].values,
            showlegend=False,
            mode="markers+lines",
        )
    )

fig.update_yaxes(title="Normalized Temperature Target")
fig.update_xaxes(title="Normalized Temperature Prediction")
fig.update_layout(template="simple_white")

## Parity Plot of Test Error

In [None]:
skip = 20

fig = go.Figure()
fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], showlegend=False, mode="markers+lines"))
for i in range(0, len(datasets["X_test"]), batch_size * skip):
    df_pred = pd.DataFrame(model.predict(datasets["X_test"][i : i + batch_size], verbose=0)[:, :, 0])
    df_train = pd.DataFrame(datasets["y_test"][i : i + batch_size][:, :, 0])
    fig.add_trace(
        go.Scatter(
            x=df_pred[0].values,
            y=df_train[0].values,
            showlegend=False,
            mode="markers+lines",
        )
    )

fig.update_yaxes(title="Normalized Voltage Target")
fig.update_xaxes(title="Normalized Voltage Prediction")
fig.update_layout(template="simple_white")

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(x=[0, 1], y=[0, 1], showlegend=False, mode="markers+lines"))
for i in range(0, len(datasets["X_test"]), batch_size * skip):
    df_pred = pd.DataFrame(model.predict(datasets["X_test"][i : i + batch_size], verbose=0)[:, :, 1])
    df_train = pd.DataFrame(datasets["y_test"][i : i + batch_size][:, :, 1])
    fig.add_trace(
        go.Scatter(
            x=df_pred[0].values,
            y=df_train[0].values,
            showlegend=False,
            mode="markers+lines",
        )
    )

fig.update_yaxes(title="Normalized Temperature Target")
fig.update_xaxes(title="Normalized Temperature Prediction")
fig.update_layout(template="simple_white")

## True vs Predicted Traces (Train Set)

In [None]:
train_cell_ids = datasets['original_train']['Cell'].unique()
train_cell_ids

In [None]:
symbol_list = ["circle-open", "circle", "triangle-up"]
pallete = plotly.colors.qualitative.Dark24 + plotly.colors.qualitative.T10
pallete = pallete*70000

In [None]:
skip = 500
train_cell_id_idx = 0
last_cycle = 1
current_cycle = 1
opacity_list = [1, 0.6, 0.3]
fig = go.Figure()
for i in range(0, len(datasets["X_train"]), batch_size * skip):
    df_true = pd.DataFrame(datasets["y_train"][i : i + batch_size][:, 0, 0])
    
    last_cycle = current_cycle
    current_cycle = int(datasets["X_train"][i : i + batch_size][0][0][-1]*MAX_CYCLE)
    if current_cycle>740:
        continue
    if last_cycle> current_cycle:
        train_cell_id_idx += 1
        
    cell_id = train_cell_ids[train_cell_id_idx]
    original_df = datasets["original_train"]

    original_df= original_df[original_df.Cycle==current_cycle]
    original_df= original_df[original_df.Cell==cell_id]
    original_df = original_df[original_df.I < MAX_DISCHARGE_CURRENT]
    original_df = original_df[original_df.I > MIN_DISCHARGE_CURRENT]

    original_df['Qd'] = (original_df['Qd']-original_df['Qd'].min())/(original_df['Qd'].max()-original_df['Qd'].min())

    if len(original_df)< 2:
        continue
    q_new = original_df.Qd.values
    t_new = original_df['t'].values
    time_interpolator = interp1d(x = q_new,  y = t_new, fill_value="extrapolate")
    
    
    for j in range(nf_steps):
        df_pred = pd.DataFrame(model.predict(datasets["X_train"][i : i + batch_size], verbose=0)[:, j, 0])
    
        ref_capacities_wrt_nf = REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1]

        prediction_interpolator = interp1d(x = ref_capacities_wrt_nf, 
                                           y = df_pred[0].values*(VOLTAGE_MAX - VOLTAGE_MIN) + VOLTAGE_MIN,
                                           
                                          )
        
        q_j = q_new[q_new>= min(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        q_j = q_j[q_j<= max(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        t_interp = time_interpolator(REFERENCE_CAPACITIES[window_length:-nf_steps-1])
        if j==0:
            q_j0 = q_j
        V_pred = prediction_interpolator(q_j)
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(q_j),
                y=V_pred,
                showlegend=True,
                mode="lines",
                line_dash="dash",
                name = "Interpolated Predictions",
                marker_color=pallete[i],
                opacity=opacity_list[j]
            )
        )
        
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(ref_capacities_wrt_nf),
                y=df_pred[0].values*(VOLTAGE_MAX - VOLTAGE_MIN) + VOLTAGE_MIN,
                showlegend=True,
                mode="markers",
                name = f"Predictions Forecast Horizon {j+1}",
                marker_color=pallete[i],
                marker_symbol = symbol_list[j],
                marker_size=8,
#                 marker_opacity=opacity_list[j]
            )
        )


    odf = original_df[original_df.Qd > q_j0.min()]
    odf = odf[odf.Qd < q_j.max()]
    fig.add_trace(
        go.Scatter(
            x=odf['t'],
            y=odf['V'],
            showlegend=True,
            mode="lines",
            name = f"{cell_id} Cycle {current_cycle}",
            line_color=pallete[i]
        )
    )


fig.update_yaxes(title="Voltage [V]", showgrid=True)
fig.update_xaxes(title="Times (min)", showgrid=True)
fig.update_layout(template="simple_white")

In [None]:
train_cell_id_idx = 0
last_cycle = 1
current_cycle = 1

fig = go.Figure()
for i in range(0, len(datasets["X_train"]), batch_size * skip):
    df_true = pd.DataFrame(datasets["y_train"][i : i + batch_size][:, 0, 1])
    
    last_cycle = current_cycle
    current_cycle = int(datasets["X_train"][i : i + batch_size][0][0][-1]*MAX_CYCLE)
    if current_cycle>740:
        continue
    if last_cycle> current_cycle:
        train_cell_id_idx += 1
        
    cell_id = train_cell_ids[train_cell_id_idx]
    original_df = datasets["original_train"]

    original_df= original_df[original_df.Cycle==current_cycle]
    original_df= original_df[original_df.Cell==cell_id]
    original_df = original_df[original_df.I < MAX_DISCHARGE_CURRENT]
    original_df = original_df[original_df.I > MIN_DISCHARGE_CURRENT]

    original_df['Qd'] = (original_df['Qd']-original_df['Qd'].min())/(original_df['Qd'].max()-original_df['Qd'].min())

    if len(original_df)< 2:
        continue
    q_new = original_df.Qd.values
    t_new = original_df['t'].values
    time_interpolator = interp1d(x = q_new,  y = t_new, fill_value="extrapolate")
    
    
    for j in range(nf_steps):
        df_pred = pd.DataFrame(model.predict(datasets["X_train"][i : i + batch_size], verbose=0)[:, j, 1])
    
        ref_capacities_wrt_nf = REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1]

        prediction_interpolator = interp1d(x = ref_capacities_wrt_nf, 
                                           y = df_pred[0].values*(TEMPERATURE_MAX - TEMPERATURE_MIN) + TEMPERATURE_MIN,
                                           
                                          )
        
        q_j = q_new[q_new>= min(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        q_j = q_j[q_j<= max(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        t_interp = time_interpolator(REFERENCE_CAPACITIES[window_length:-nf_steps-1])
        if j==0:
            q_j0 = q_j
        V_pred = prediction_interpolator(q_j)
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(q_j),
                y=V_pred,
                showlegend=True,
                mode="lines",
                line_dash="dash",
                name = "Interpolated Predictions",
                marker_color=pallete[i//skip],
                opacity=opacity_list[j]
            )
        )
        
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(ref_capacities_wrt_nf),
                y=df_pred[0].values*(TEMPERATURE_MAX - TEMPERATURE_MIN) + TEMPERATURE_MIN,
                showlegend=True,
                mode="markers",
                name = f"Predictions Forecast Horizon {j+1}",
                marker_color=pallete[i//skip],
                marker_symbol = symbol_list[j],
                marker_size=8,
            )
        )


    odf = original_df[original_df.Qd > q_j0.min()]
    odf = odf[odf.Qd < q_j.max()]
    fig.add_trace(
        go.Scatter(
            x=odf['t'],
            y=odf['temp'],
            showlegend=True,
            mode="lines",
            name = f"{cell_id} Cycle {current_cycle}",
            line_color=pallete[i//skip]
        )
    )


fig.update_yaxes(title="Temperature [°C]", showgrid=True)
fig.update_xaxes(title="Times (min)", showgrid=True)
fig.update_layout(template="simple_white")

## True vs Predicted Traces (Test Set)

In [None]:
test_cell_ids = datasets['original_test']['Cell'].unique()
test_cell_ids
len(datasets["X_test"])

In [None]:
skip = 300
test_cell_id_idx = 0
last_cycle = 1
current_cycle = 1
opacity_list = [1, 0.6, 0.3]
fig = go.Figure()
for i in range(0, len(datasets["X_test"]), batch_size * skip):
    df_true = pd.DataFrame(datasets["y_test"][i : i + batch_size][:, 0, 0])
    
    last_cycle = current_cycle
    current_cycle = int(datasets["X_test"][i : i + batch_size][0][0][-1]*MAX_CYCLE)

    if last_cycle> current_cycle:
        test_cell_id_idx += 1
        
    cell_id = test_cell_ids[test_cell_id_idx]
    original_df = datasets["original_test"]

    original_df= original_df[original_df.Cycle==current_cycle]
    original_df= original_df[original_df.Cell==cell_id]
    original_df = original_df[original_df.I < MAX_DISCHARGE_CURRENT]
    original_df = original_df[original_df.I > MIN_DISCHARGE_CURRENT]

    original_df['Qd'] = (original_df['Qd']-original_df['Qd'].min())/(original_df['Qd'].max()-original_df['Qd'].min())

    if len(original_df)< 2:
        continue
    q_new = original_df.Qd.values
    t_new = original_df['t'].values
    time_interpolator = interp1d(x = q_new,  y = t_new, fill_value="extrapolate")
    
    
    for j in range(nf_steps):
        df_pred = pd.DataFrame(model.predict(datasets["X_test"][i : i + batch_size], verbose=0)[:, j, 0])
    
        ref_capacities_wrt_nf = REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1]

        prediction_interpolator = interp1d(x = ref_capacities_wrt_nf, 
                                           y = df_pred[0].values*(VOLTAGE_MAX - VOLTAGE_MIN) + VOLTAGE_MIN,
                                           
                                          )
        
        q_j = q_new[q_new>= min(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        q_j = q_j[q_j<= max(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        t_interp = time_interpolator(REFERENCE_CAPACITIES[window_length:-nf_steps-1])
        if j==0:
            q_j0 = q_j
        V_pred = prediction_interpolator(q_j)
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(q_j),
                y=V_pred,
                showlegend=True,
                mode="lines",
                line_dash="dash",
                name = "Interpolated Predictions",
                marker_color=pallete[i//skip],
                opacity=opacity_list[j]
            )
        )
        
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(ref_capacities_wrt_nf),
                y=df_pred[0].values*(VOLTAGE_MAX - VOLTAGE_MIN) + VOLTAGE_MIN,
                showlegend=True,
                mode="markers",
                name = f"Predictions Forecast Horizon {j+1}",
                marker_color=pallete[i//skip],
                marker_symbol = symbol_list[j],
                marker_size=8,
#                 marker_opacity=opacity_list[j]
            )
        )


    odf = original_df[original_df.Qd > q_j0.min()]
    odf = odf[odf.Qd < q_j.max()]
    fig.add_trace(
        go.Scatter(
            x=odf['t'],
            y=odf['V'],
            showlegend=True,
            mode="lines",
            name = f"{cell_id} Cycle {current_cycle}",
            line_color=pallete[i//skip]
        )
    )


fig.update_yaxes(title="Voltage [V]", showgrid=True)
fig.update_xaxes(title="Times (min)", showgrid=True)
fig.update_layout(template="simple_white")

In [None]:
test_cell_id_idx = 0
last_cycle = 1
current_cycle = 1

fig = go.Figure()
for i in range(0, len(datasets["X_test"]), batch_size * skip):
    
    last_cycle = current_cycle
    current_cycle = int(datasets["X_test"][i : i + batch_size][0][0][-1]*MAX_CYCLE)
    
    if last_cycle> current_cycle:
        test_cell_id_idx += 1
        
    cell_id = test_cell_ids[test_cell_id_idx]
    original_df = datasets["original_test"]

    original_df= original_df[original_df.Cycle==current_cycle]
    original_df= original_df[original_df.Cell==cell_id]
    original_df = original_df[original_df.I < MAX_DISCHARGE_CURRENT]
    original_df = original_df[original_df.I > MIN_DISCHARGE_CURRENT]

    original_df['Qd'] = (original_df['Qd']-original_df['Qd'].min())/(original_df['Qd'].max()-original_df['Qd'].min())

    if len(original_df)< 2:
        continue
    q_new = original_df.Qd.values
    t_new = original_df['t'].values
    time_interpolator = interp1d(x = q_new,  y = t_new, fill_value="extrapolate")
    
    
    for j in range(nf_steps):
        df_pred = pd.DataFrame(model.predict(datasets["X_test"][i : i + batch_size], verbose=0)[:, j, 1])
    
        ref_capacities_wrt_nf = REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1]

        prediction_interpolator = interp1d(x = ref_capacities_wrt_nf, 
                                           y = df_pred[0].values*(TEMPERATURE_MAX - TEMPERATURE_MIN) + TEMPERATURE_MIN,
                                           
                                          )
        
        q_j = q_new[q_new>= min(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        q_j = q_j[q_j<= max(REFERENCE_CAPACITIES[window_length+j:-nf_steps+j-1])]
        t_interp = time_interpolator(REFERENCE_CAPACITIES[window_length:-nf_steps-1])
        if j==0:
            q_j0 = q_j
        V_pred = prediction_interpolator(q_j)
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(q_j),
                y=V_pred,
                showlegend=True,
                mode="lines",
                line_dash="dash",
                name = "Interpolated Predictions",
                marker_color=pallete[i//skip],
                opacity=opacity_list[j]
            )
        )
        
        fig.add_trace(
            go.Scatter(
                x=time_interpolator(ref_capacities_wrt_nf),
                y=df_pred[0].values*(TEMPERATURE_MAX - TEMPERATURE_MIN) + TEMPERATURE_MIN,
                showlegend=True,
                mode="markers",
                name = f"Predictions Forecast Horizon {j+1}",
                marker_color=pallete[i//skip],
                marker_symbol = symbol_list[j],
                marker_size=8,
#                 marker_opacity=opacity_list[j]
            )
        )


    odf = original_df[original_df.Qd > q_j0.min()]
    odf = odf[odf.Qd < q_j.max()]
    fig.add_trace(
        go.Scatter(
            x=odf['t'],
            y=odf['temp'],
            showlegend=True,
            mode="lines",
            name = f"{cell_id} Cycle {current_cycle}",
            line_color=pallete[i//skip]
        )
    )


fig.update_yaxes(title="Temperature [°C]", showgrid=True)
fig.update_xaxes(title="Times (min)", showgrid=True)
fig.update_layout(template="simple_white")

## Metrics

In [None]:
def error_calculation(model, datasets, error_function):
    results = {}
    batch_size = datasets["batch_size"]
    n_outputs = datasets["y_train"].shape[-1]
    nf_steps = datasets["y_train"].shape[1]
    for dset in ["train", "test"]:
        skip = 200 if dset == "train"  else 50
        for output in range(n_outputs):
            for step in range(nf_steps):
                collector = []
                for i in range(0, len(datasets[f"X_{dset}"]), batch_size*skip):
                    true = datasets[f"y_{dset}"][i : i + batch_size][:, j, output]
                    pred = model.predict(datasets[f"X_{dset}"][i : i + batch_size], verbose=0)[:, j, output]
                    error = error_function(true, pred)
                    collector.append(error)
                results[f"{dset}_output{output}_forecasthorizon{step}"] =sum(collector)/len(collector)
                         
    return results

In [None]:
root_mean_square_error = lambda y_true, y_pred : np.sqrt(((y_true - y_pred)**2).sum()/len(y_true))
mean_absolute_error = lambda y_true, y_pred : abs(y_true - y_pred).sum()/len(y_true)
mean_absolute_percent_error = lambda y_true, y_pred : (abs(y_true - y_pred)/y_true).sum()/len(y_true)

In [None]:
error_calculation(model, datasets, error_function=mean_absolute_error)