# Training a CNN to Predict Battery Cycle Life
*Noah Paulson, Argonne National Laboratory*


In this notebook we will collect data, visualize the machine learning features, define a convolutional neural network architecture, train it, and evaluate the results.

### Import packages, define data loader

In [None]:
import h5py
import time
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
from tensorflow import keras
from urllib import request

In [None]:
def get_data(logy=True, output=False):

    remote_url = 'https://drive.google.com/uc?export=download&id=1orXDHcq2TzRLhGkHyWVtsuGn7_10pSAn'
    fname = 'energy_capacity.h5'
    request.urlretrieve(remote_url, fname)
    
    # load the raw data from hdf5 file
    f = h5py.File(fname, 'r')
    
    # evaluated cycles and voltages
    cyceval = f['evaluated cycles']
    veval = f['evaluated voltages']
    
    Xin = []  # feature array
    cycfailV = np.array([])  # failure cycle array
    batchV = np.array([])  # batch array

    anameL = f['cell data'].keys()
    ncell = len(anameL)
    for aname in anameL:

        g = f['cell data'][aname]

        cycfail = g['cycles to 90pct capacity'][0]
        cycfailV = np.append(cycfailV, cycfail)

        VvC = g['discharging voltage vs capacity']

        # our features are based on the difference in capacity between cycles
        farr = VvC[1:, :] - VvC[0, :]  
        Xin += [farr]

        if 'Batch1' in aname:
            batchV = np.append(batchV, 1)
        elif 'Batch2' in aname:
            batchV = np.append(batchV, 2)
        elif 'Batch3' in aname:
            batchV = np.append(batchV, 3)
        else:
            batchV = np.append(batchV, 0)

    nfeat = farr.size
    ncyc = len(cyceval)

    Xin = np.stack(Xin)
    if output:
        print('number of cells:', ncell)
        print('number of features:', nfeat)
        print(Xin.shape)

    # create the train, validation, and test sets

    dsetL = ['trn', 'val', 'tst']

    iD = {}

    # batch 3 is reserved for test
    iD['tst'] = batchV == 3
    ntst = np.sum(iD['tst'])

    irst = np.invert(iD['tst'])
    nrst = ncell - ntst

    # validation and training sets are pulled from batches 1 and 2
    nval = np.int16(0.2*nrst)

    irnd = np.random.rand(ncell)
    irnd[iD['tst']] = 0
    top_nval = np.argsort(irnd)[-nval:]

    iD['val'] = np.zeros((ncell,), dtype='bool')
    iD['val'][top_nval] = True
    iD['trn'] = np.invert(iD['val'])*np.invert(iD['tst'])

    X = {}  # features dictionary
    y = {}  # failure cycles dictionary
    n = {}  # number of cells dictionary
    for dset in dsetL:
        X[dset] = Xin[iD[dset], ..., None]
        if logy:
            y[dset] = np.log10(cycfailV[iD[dset]])
        else:
            y[dset] = cycfailV[iD[dset]]
        n[dset] = np.sum(iD[dset])

    # define weights for each cell based on the relative number of
    # cells available for that cycle-life bin. For example, there
    # are many low cycle-life cells so those have low weights, and
    # there are relatively few high cycle-life cells so those have
    # higher weights.
    bins = 4
    hist, bin_edges = np.histogram(y['trn'], bins=bins, density=True)

    invhist = np.mean(hist)/hist

    weights = np.ones(y['trn'].shape)
    for ii in range(bins):
        lwr = bin_edges[ii]
        upr = bin_edges[ii+1]
        if ii == 0:
            sel = (lwr <= y['trn'])*(y['trn'] <= upr)
        else:
            sel = (lwr < y['trn'])*(y['trn'] <= upr)
        weights[sel] = invhist[ii]


    if output:
        print('bin_edges:', bin_edges)
        print('original training densities:', hist)
        print('category weights:', invhist)
        print('y_trn weights:', weights)
        histw, bew = np.histogram(
            y['trn'], bins=bins, density=True, weights=weights)
        print('new densities:', histw)

    return X, y, n, iD, weights, cyceval, veval

### Define key variables for the fitting and extract the data

In [None]:
epochs = 200
logy=True  # apply a log transform to the cycle lives as a normalization scheme
output=True
batch_size=32
dsetL = ['trn', 'val', 'tst']

sns.set()
np.random.seed(1)

X, y, n, iD, weights, cyceval, veval = get_data(logy, output)

### Visualize features for machine learning

In this work, we define the features as the difference in capacity at different voltages during discharge between cycle X and cycle 10. Cycle X can be in the set [20, 30, 40, 50, 60 ,70, 80, 90, 100]. There are a total of 180 features.

In [None]:
arr = X['trn'][0, ..., 0]
f, ax = plt.subplots()
img = ax.imshow(arr)
plt.colorbar(img, ax=ax, label='Q(Voltage, Cycle_X) - \nQ(Voltage, Cycle_10)   (Ah)')
ax.set_xticks(np.arange(arr.shape[1])[::2])
ax.set_yticks(np.arange(arr.shape[0]))
yticklabels = cyceval[()][1:]
yticklabels = yticklabels.astype(str)
ax.set_xticklabels(np.round(veval[::2], 1))
ax.set_yticklabels(yticklabels)
ax.grid(False)
plt.xlabel('Voltage')
plt.ylabel('Cycle')

### Define the model

We define the convolutional network with two convolutional layers and one dense layer. Feel free to explore the number of filters/neurons, the shape of the filters, and the number of different types of layers. It is also interesting to see the effect of learning rate and loss metric on fitting and performance. We print a summary of the model that shows the types of layers, the shape of their outputs, and the number of parameters associated with the layer.

In [None]:
inputs = keras.Input(shape=(X['trn'].shape[1], X['trn'].shape[2], 1), name='farr')
x = keras.layers.Conv2D(16, (3, 3), activation='relu')(inputs)
x = keras.layers.Conv2D(8, (3, 3), activation='relu')(inputs)
x = keras.layers.Flatten()(x)
# x = keras.layers.Dropout(0.2)(x)
# x = keras.layers.BatchNormalization()(x)
x = keras.layers.Dense(16, activation='relu')(x)
outputs = keras.layers.Dense(1)(x)
model = keras.Model(inputs=inputs, outputs=outputs, name='functional_1')

model.summary()

model.compile(optimizer=tf.optimizers.Adam(learning_rate=0.01, amsgrad=False),
              loss='MSE',
              metrics=[])

### Train the model

In [None]:
cb2 = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=50,
    restore_best_weights=True)

st = time.time()

history = model.fit(X['trn'], y['trn'],
    epochs=epochs, batch_size=batch_size,
    sample_weight=weights, callbacks=[cb2],
    validation_data=(X['val'], y['val']))

print('fit time:', np.round(time.time()-st))

### Make predictions

In [None]:
ypred = {}
for dset in dsetL:
    ypred[dset] = np.squeeze(model.predict(X[dset]))

### Plot loss versus epoch

In [None]:
plt.figure('metrics_vs_epoch')
nhist = len(history.history['loss'])
plt.semilogy(range(1, nhist+1), history.history['loss'],
             'b-', alpha=0.5, label='training')
plt.semilogy(range(1, nhist+1), history.history['val_loss'],
             'r-', alpha=0.5, label='validation')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend()
plt.tight_layout()

### Parity Plot

The parity plot shows the experimental cycle lives versus the predicted cycle lives. Accurate predictions are close to the 45 degree centerline.

In [None]:
minL = []
maxL = []
cL = sns.color_palette('cubehelix', 5)[1:-1]
mL = ['o', 's', 'D']
lblL = ['train', 'validate', 'test']
plt.figure(num='parity-plot', figsize=(5.5, 5))
for dset, c, m, lbl in zip(dsetL, cL, mL, lblL):
    if logy:
        yP = 10**ypred[dset]
        yT = 10**y[dset]
    else:
        yP = ypred[dset]
        yT = y[dset]

    minL += [np.min(yP), np.min(yT)]
    maxL += [np.max(yP), np.max(yT)]

    plt.plot(
        yT, yP,
        color=c, marker=m,
        ls='', ms=5, alpha=0.6, label=lbl)

ymin = np.min(minL)
ymax = np.max(maxL)
yrng = ymax - ymin
lwr = ymin - .05*yrng
upr = ymax + .05*yrng

plt.plot([lwr, upr], [lwr, upr], 'k-')
plt.xlim([lwr, upr])
plt.ylim([lwr, upr])
plt.xlabel('experimental cycles to failure')
plt.ylabel('predicted cycles to failure')
plt.legend()
plt.gca().set_aspect('equal', adjustable='box')
plt.tight_layout()

### Error Metrics

In [None]:
# calculate errors and load into dictionary
metricD = {}
for dset in dsetL:

    metricD[dset + '_loss'] = np.mean((ypred[dset]-y[dset])**2)

    if logy:
        mapre = 100*np.mean(np.abs(10**ypred[dset]-10**y[dset])/10**y[dset])
        mape = 100*np.mean(np.abs(10**ypred[dset]-10**y[dset])/10**y[dset].max())
    else:
        mapre = 100*np.mean(np.abs(ypred[dset]-y[dset])/y[dset])
        mape = 100*np.mean(np.abs(ypred[dset]-y[dset])/y[dset].max())

    metricD[dset + '_mapre'] = mapre 
    metricD[dset + '_mape'] = mape

# print results
for dset in dsetL:
    print('\n')
    print(dset, 'mean squared error:',
          np.round(metricD[dset + '_loss'], 4))
    print(dset, 'mean absolute percent relative error:',
          np.round(metricD[dset + '_mapre'], 1))
    print(dset, 'mean absolute percent error:',
          np.round(metricD[dset + '_mape'], 1))