# Main notebook for battery state estimation

In [None]:
import numpy as np
import pandas as pd
import scipy.io
import math
import os
import ntpath
import sys
import logging
import time
import sys

from importlib import reload
import plotly.graph_objects as go

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD, Adam
from keras.utils import np_utils
from keras.layers import LSTM, Embedding, RepeatVector, TimeDistributed, Masking
from keras.callbacks import EarlyStopping, ModelCheckpoint, LambdaCallback


IS_COLAB = True

if IS_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    data_path = "/content/drive/My Drive/battery-state-estimation/battery-state-estimation/"
else:
    data_path = "../"

sys.path.append(data_path)
from data_processing.unibo_powertools_data import UniboPowertoolsData, CycleCols
from data_processing.model_data_handler import ModelDataHandler

### Config logging

In [None]:
reload(logging)
logging.basicConfig(format='%(asctime)s [%(levelname)s]: %(message)s', level=logging.DEBUG, datefmt='%Y/%m/%d %H:%M:%S')

# Load Data

### Initial the data object

Load the cycle and capacity data to memory based on the specified chunk size

In [None]:
dataset = UniboPowertoolsData(
    test_types=['S'],
    chunk_size=1000000,
    lines=[37, 40],
    charge_line=37,
    discharge_line=40,
    base_path=data_path
)

### Determine the training and testing name

Prepare the training and testing data for model data handler to load the model input and output data.

In [None]:
train_data_test_names = [
    '000-DM-3.0-4019-S', 
    '001-DM-3.0-4019-S', 
    '002-DM-3.0-4019-S', 
    '006-EE-2.85-0820-S', 
    '007-EE-2.85-0820-S', 
    '018-DP-2.00-1320-S', 
    '019-DP-2.00-1320-S',
    '036-DP-2.00-1720-S', 
    '037-DP-2.00-1720-S', 
    '038-DP-2.00-2420-S', 
    '040-DM-4.00-2320-S',
    '042-EE-2.85-0820-S', 
    '045-BE-2.75-2019-S'
]

test_data_test_names = [
    '003-DM-3.0-4019-S',
    '008-EE-2.85-0820-S',
    '039-DP-2.00-2420-S', 
    '041-DM-4.00-2320-S',    
]

dataset.prepare_data(train_data_test_names, test_data_test_names)

### Initial the model data handler

Model data handler will be used to get the model input and output data for further training purpose.

In [None]:
mdh = ModelDataHandler(dataset, [
    CycleCols.VOLTAGE,
    CycleCols.CURRENT,
    CycleCols.TEMPERATURE
])

# Model training

In [None]:
train_x, train_y, test_x, test_y = mdh.get_discharge_whole_cycle(soh = False, output_capacity = False)


In [None]:
train_y = mdh.keep_only_capacity(train_y, is_multiple_output = True)
test_y = mdh.keep_only_capacity(test_y, is_multiple_output = True)

In [None]:
EXPERIMENT = "lstm_soc_percentage"

experiment_name = time.strftime("%Y-%m-%d-%H-%M-%S") + '_' + EXPERIMENT
print(experiment_name)

# Model definition

opt = tf.keras.optimizers.Adam(lr=0.00003)

model = Sequential()
model.add(LSTM(256, activation='selu',
                return_sequences=True,
                input_shape=(train_x.shape[1], train_x.shape[2])))
model.add(LSTM(256, activation='selu', return_sequences=True))
model.add(LSTM(128, activation='selu', return_sequences=True))
model.add(Dense(64, activation='selu'))
model.add(Dense(1, activation='linear'))
model.summary()

model.compile(optimizer=opt, loss='huber', metrics=['mse', 'mae', 'mape', tf.keras.metrics.RootMeanSquaredError(name='rmse')])

es = EarlyStopping(monitor='val_loss', patience=50)
mc = ModelCheckpoint(data_path + 'results/trained_model/%s_best.h5' % experiment_name, 
                             save_best_only=True, 
                             monitor='val_loss')

In [None]:
history = model.fit(train_x, train_y, 
                                epochs=30, 
                                batch_size=32, 
                                verbose=1,
                                validation_split=0.2,
                                callbacks = [es, mc]
                               )

In [None]:
model.save(data_path + 'results/trained_model/%s.h5' % experiment_name)

hist_df = pd.DataFrame(history.history)
hist_csv_file = data_path + 'results/trained_model/%s_history.csv' % experiment_name
with open(hist_csv_file, mode='w') as f:
    hist_df.to_csv(f)

### Testing

In [None]:
results = model.evaluate(test_x, test_y)
print(results)

# Data Visualization

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=history.history['loss'],
                    mode='lines', name='train'))
fig.add_trace(go.Scatter(y=history.history['val_loss'],
                    mode='lines', name='validation'))
fig.update_layout(title='Loss trend',
                  xaxis_title='epoch',
                  yaxis_title='loss',
                  width=1400,
                  height=600)
fig.show()

In [None]:
train_predictions = model.predict(train_x)

In [None]:
cycle_num = 0
steps_num = 8000
step_index = np.arange(cycle_num*steps_num, (cycle_num+1)*steps_num)

fig = go.Figure()
fig.add_trace(go.Scatter(x=step_index, y=train_predictions.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
                    mode='lines', name='SoC predicted'))
fig.add_trace(go.Scatter(x=step_index, y=train_y.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
                    mode='lines', name='SoC actual'))
fig.update_layout(title='Results on training',
                  xaxis_title='Cycle',
                  yaxis_title='SoC percentage',
                  width=1400,
                  height=600)
fig.show()

In [None]:
test_predictions = model.predict(test_x)

In [None]:
cycle_num = 0
steps_num = 1000
step_index = np.arange(cycle_num*steps_num, (cycle_num+1)*steps_num)

fig = go.Figure()
fig.add_trace(go.Scatter(x=step_index, y=test_predictions.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
                    mode='lines', name='SoC predicted'))
fig.add_trace(go.Scatter(x=step_index, y=test_y.flatten()[cycle_num*steps_num:(cycle_num+1)*steps_num],
                    mode='lines', name='SoC actual'))
fig.update_layout(title='Results on testing',
                  xaxis_title='Cycle',
                  yaxis_title='SoC percentage',
                  width=1400,
                  height=600)
fig.show()