In [None]:
%pip install -q numpy pandas matplotlib seaborn scikit-learn tensorflow h5py tensorflow-gpu

In [None]:
import os
# os.add_dll_directory("C:/Program Files/NVIDIA GPU Computing Toolkit/CUDA/v11.7/bin") // path to nvidia dlls, must have CUDA drivers installed
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

from tensorflow import keras
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold

from keras.models import Sequential
from keras.layers import Dense
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import mixed_precision

### GPU Acceleration

** IMPORTANT NOTE **
 - You must have CUDA driver and CUDA toolkit installed on your machine. To do so, follow the instructions here:
https://www.tensorflow.org/install/pip

- Enable GPU Acceleration if you have a NVIDIA GPU with compute capability >= 7.0

In [None]:
# print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))
# sess = tf.compat.v1.Session(config=tf.compat.v1.ConfigProto(log_device_placement=True))

# ## ENABLE THIS IF YOU HAVE RTX GPU WITH COMPUTE CAPABILITY 7.0 or higher
# policy = mixed_precision.Policy('mixed_float16')
# mixed_precision.set_global_policy(policy)
# print('Compute dtype: %s' % policy.compute_dtype)
# print('Variable dtype: %s' % policy.variable_dtype)

### Loading Data

In [None]:
fp = "./features_combined.csv"
batch_pd = pd.read_csv(fp, index_col=False)
dataset = batch_pd.copy()
dataset.sort_values(by=['policy'], ascending=True, inplace=True)

# dataset
dataset.isna().sum()
dataset = dataset.dropna().drop(columns=['policy', 'barcode'])

In [None]:
## Global Model Config
EPOCHS = 2500
UNITS = 1
LEARNING_RATE = 0.01
CALLBACK = tf.keras.callbacks.EarlyStopping(monitor='mae', patience=15, min_delta=0.01)

## Data split
Split by policy fast charge first(5C - 8C), and then by policy slow charge (1C - 4C)


In [None]:
normal_charge_dataset = dataset.iloc[0:29, :] # 29 data points
fast_charge_dataset = dataset.iloc[29:, :] # 51 data points 
print(normal_charge_dataset.shape, fast_charge_dataset.shape)

## Fast-Charge Test-Train split
Selecting alternate batteries for training and testing

In [None]:
fast_charge_train_ds = fast_charge_dataset.iloc[0::2, :]
fast_charge_test_ds = fast_charge_dataset.iloc[1::2, :]
fast_charge_train_ds.describe().transpose()

In [None]:
fastcharge_train_features = fast_charge_train_ds.copy()
fastcharge_test_features = fast_charge_test_ds.copy()

train_labels = fastcharge_train_features.pop('cycle_life')
test_labels = fastcharge_test_features.pop('cycle_life')


# Linear Regress
### Layering and Build Model

In [None]:
# Normalize layer
QDiffLinVar = np.array(fastcharge_train_features['QDiffLinVar'])
QDiffLinVar_normalizer = layers.Normalization(input_shape=[1,], axis=None)
QDiffLinVar_normalizer.adapt(QDiffLinVar)

# Input and output layers
variance_model_fast_charge = tf.keras.Sequential([
    QDiffLinVar_normalizer,
    layers.Dense(UNITS, input_dim=1 ,activation='relu' ),
    layers.Dense(1, activation='linear', dtype='float32', name='predictions')
])
variance_model_fast_charge.summary()

print("Number of weights after calling the model:", len(variance_model_fast_charge.weights))
print("weights:", len(variance_model_fast_charge.weights))
print("trainable_weights:", len(variance_model_fast_charge.trainable_weights))
print("non_trainable_weights:", len(variance_model_fast_charge.non_trainable_weights))

## Compile the model and make a prediction
variance_model_fast_charge.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=['mae']
    )

## Making a prediction on first 10 samples
variance_model_fast_charge.predict(QDiffLinVar[:10])

### Train the Model

In [None]:
%%time
tf.debugging.set_log_device_placement(True)
history = variance_model_fast_charge.fit(
    fastcharge_train_features['QDiffLinVar'],
    train_labels,
    epochs=EPOCHS,
    verbose=2,
    # callbacks=[CALLBACK],
    validation_data=(fastcharge_test_features['QDiffLinVar'], test_labels)
    )

In [None]:
def plot_loss(history):
  plt.figure("BaseMode", figsize=(5,5), dpi=100, facecolor='w', edgecolor='k')
  plt.plot(np.sqrt(history.history['loss']), label='loss')
  plt.plot(np.sqrt(history.history['val_loss']), label='val_loss')
  # plt.ylim([200, 130])
  plt.ylim([10, 800])
  plt.xlabel('Epoch')
  plt.ylabel('Error [cycles]')
  plt.legend()
  plt.grid(True)
  
plot_loss(history)

## Display model's loss and accuracy history
hist = pd.DataFrame(history.history)
hist = hist.pow(0.5) # Power 1/2 is the same as square root
hist['epoch'] = history.epoch
hist

### Evaluate the Model and Save the Results for comparison

In [None]:
test_results = {}
test_results['fast_charge_variance_model'] = variance_model_fast_charge.evaluate(
    fastcharge_test_features['QDiffLinVar'],
    test_labels, verbose=1) #sqrt for mse
## Based on Author's calculation for MSE
test_results['fast_charge_variance_model'][0] = test_results['fast_charge_variance_model'][0] ** 0.5
pd.DataFrame(test_results, index=['MSE', 'MAE']).T

### Make Predictions

In [None]:
def plot_prediction(y_train, y_test):
  plt.figure("BaseModelPrediction", figsize=(5,5), dpi=100, facecolor='w', edgecolor='k')
  plt.axes(aspect='equal')
  plt.scatter(y_train, train_labels, label='Predictions (train)')
  plt.scatter(y_test, test_labels, label='Predictions (test)')
  lims = [0, 2000]
  plt.xlim(lims)
  plt.ylim(lims)
  plt.plot(lims, lims, 'k', )
  plt.xlabel('Predicted Cycle life')
  plt.ylabel('Actual Cycle life')
  plt.legend()

train_prediction = variance_model_fast_charge.predict(fast_charge_train_ds['QDiffLinVar'])
test_prediction = variance_model_fast_charge.predict(fast_charge_test_ds['QDiffLinVar'])
train_prediction
plot_prediction(train_prediction, test_prediction)

# Transfer Learning

### Normal-charge Data Train-Test split

In [None]:
normal_charge_train_ds = normal_charge_dataset.iloc[0::2, :]
normal_charge_test_ds = normal_charge_dataset.iloc[1::2, :]

normalcharge_train_features = normal_charge_train_ds.copy()
normalcharge_test_features = normal_charge_test_ds.copy()

normal_train_labels = normalcharge_train_features.pop('cycle_life')
normal_test_labels = normalcharge_test_features.pop('cycle_life')

### Model Building
Transfer Learning Model

In [None]:
## Make a copy of base model
variance_model_normal_charge_tl = variance_model_fast_charge
## Freezing every layer except the last layer
for layer in variance_model_normal_charge_tl.layers[:-1]:
  layer.trainable = False
variance_model_normal_charge_tl.summary()

variance_model_normal_charge_tl.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=['mae']
    )

Traditional Model

In [None]:
variance_model_normal_charge_traditional = tf.keras.Sequential([
    QDiffLinVar_normalizer,
    layers.Dense(UNITS, input_dim=1, activation='relu'),
    layers.Dense(1, activation='linear', dtype='float32', name='predictions')
])
variance_model_normal_charge_traditional.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=LEARNING_RATE),
    loss=tf.keras.losses.MeanSquaredError(),
    metrics=['mae']
    )


### Training the Models

In [None]:
%%time
history_tl = variance_model_normal_charge_tl.fit(
    normalcharge_train_features['QDiffLinVar'],
    normal_train_labels,
    epochs=EPOCHS,
    verbose=2,
    validation_data=(normalcharge_test_features['QDiffLinVar'], normal_test_labels),
    callbacks=[CALLBACK]
    )

In [None]:
%%time
history_traditional = variance_model_normal_charge_traditional.fit(
    normalcharge_train_features['QDiffLinVar'],
    normal_train_labels,
    epochs=EPOCHS,
    verbose=2,
    # callbacks=[CALLBACK],
    validation_data=(normalcharge_test_features['QDiffLinVar'], normal_test_labels))

### Plot loss graph and Evaluate

In [None]:
def plot_loss(hist_trad, hist_tl):
  plt.figure("TransferLearning vs Traditional", figsize=(5,5), dpi=100, facecolor='w', edgecolor='k')
  plt.plot(np.sqrt(hist_trad.history['loss']), label='loss_traditional')
  plt.plot(np.sqrt(hist_trad.history['val_loss']), label='val_loss_traditional')
  plt.plot(np.sqrt(hist_tl.history['loss']), label='loss_tl')
  plt.plot(np.sqrt(hist_tl.history['val_loss']), label='val_loss_tl')
  # plt.ylim([200, 130])
  plt.ylim([10, 800])
  plt.xlabel('Epoch')
  plt.ylabel('Error [cycles]')
  plt.legend()
  plt.grid(True)

plot_loss(history_traditional, history_tl)

In [None]:
test_results['normal_charge_variance_model_tl'] = variance_model_normal_charge_tl.evaluate(
    normalcharge_test_features['QDiffLinVar'],
    normal_test_labels, verbose=1) #sqrt for mse
test_results['normal_charge_variance_model_traditional'] = variance_model_normal_charge_traditional.evaluate(
    normalcharge_test_features['QDiffLinVar'],
    normal_test_labels, verbose=1) #sqrt for mse

### Make Predictions

In [None]:
def plot_prediction(y_train, y_test):
  plt.figure("Transfer Learning vs Traditional", figsize=(5,5), dpi=100, facecolor='w', edgecolor='k')
  plt.axes(aspect='equal')
  plt.scatter(y_train, normal_train_labels, label='Predictions (train)')
  plt.scatter(y_test, normal_test_labels, label='Predictions (test)')
  lims = [0, 2000]
  plt.xlim(lims)
  plt.ylim(lims)
  plt.plot(lims, lims, 'k', )
  plt.xlabel('Predicted Cycle life')
  plt.ylabel('Actual Cycle life')
  plt.legend()

normal_train_prediction = variance_model_normal_charge_tl.predict(normal_charge_train_ds['QDiffLinVar'])
normal_test_prediction = variance_model_normal_charge_tl.predict(normal_charge_test_ds['QDiffLinVar'])
plot_prediction(normal_train_prediction, normal_test_prediction)

In [None]:
normal_train_prediction2 = variance_model_normal_charge_traditional.predict(normal_charge_train_ds['QDiffLinVar'])
normal_test_prediction2 = variance_model_normal_charge_traditional.predict(normal_charge_test_ds['QDiffLinVar'])
plot_prediction(normal_train_prediction2, normal_test_prediction2)

### Final Results and conclusions

In [None]:
test_results['normal_charge_variance_model_tl'][0] = test_results['normal_charge_variance_model_tl'][0] ** 0.5
test_results['normal_charge_variance_model_traditional'][0] = test_results['normal_charge_variance_model_traditional'][0] ** 0.5
pd.DataFrame(test_results, index=['RMSE', 'MAE']).T