# Perform State of Health Estimation
The goal of this notebook is to estimate how the advanced state of health (ASOH) variables change given only the current and voltage over time.

In [5]:
%matplotlib inline
from matplotlib import pyplot as plt
from moirae.estimators.online.filters.distributions import MultivariateGaussian
from moirae.estimators.online.joint import JointEstimator
from moirae.models.ecm import EquivalentCircuitModel
from moirae.models.ecm.advancedSOH import ECMASOH
from moirae.models.ecm.transient import ECMTransientVector
from moirae.models.ecm.ins_outs import ECMInput, ECMMeasurement
from moirae.interface import run_online_estimate
from batdata.data import BatteryDataset
from pathlib import Path
from tqdm import tqdm
import numpy as np

Configuration

In [6]:
estimate_dir = Path('estimates')
estimate_dir.mkdir(exist_ok=True)

In [7]:
initial_asoh = ECMASOH.model_validate_json(Path('initial-asoh.json').read_text())
initial_asoh.ocv(0.5) # quick and dirty init for soc_pinpoints
# fix reference OCV
soc_pinpoints = [-0.1] + initial_asoh.ocv.ocv_ref.soc_pinpoints.flatten().tolist() + [1.1]
base_vals = [0.] + initial_asoh.ocv.ocv_ref.base_values.flatten().tolist() + [6.5]
initial_asoh.ocv.ocv_ref.base_values = np.array([base_vals])
initial_asoh.ocv.ocv_ref.soc_pinpoints = np.array(soc_pinpoints)
initial_asoh.ocv.ocv_ref.interpolation_style='linear'
initial_asoh.mark_updatable('r0.base_values')
initial_asoh.mark_updatable('q_t.base_values')
print(f'Preparing to update {initial_asoh.num_updatable} parameters from: {initial_asoh.updatable_names}')

Preparing to update 10 parameters from: ('q_t.base_values', 'r0.base_values')


## Find cells
Find all the runs then pull out one at example

In [8]:
all_cells = sorted(Path('synth-data').glob('*.hdf5'))
print(f'Found {len(all_cells)}. Will use {all_cells[0].name} as an example')

Found 64. Will use bilinear-0.hdf5 as an example


In [9]:
example_data = BatteryDataset.from_batdata_hdf(all_cells[0])
print(f'Loaded a cell with {len(example_data.raw_data)} current and voltage measurements.')

Loaded a cell with 1362215 current and voltage measurements.


## Prepare Estimation Function
We need a function to prepare the [estimator](https://rovi-org.github.io/auto-soh/estimator.html#online-estimators) used for tracking changes in parameters.

In [10]:
def create_estimator(dataset: BatteryDataset):
    """Generate an estimator based on initial parameter estimates

    Args:
        dataset: Dataset on which we are running the estimator. [Not being used for now]
    Returns:
        Estimator ready for use
    """

    # Uncertainties for the parameters
    # For A-SOH, assume 2*standard_dev is 0.5% of the value of the parameter
    asoh_covariance = [(2.5e-03 * initial_asoh.q_t.base_values.item()) ** 2] # +/- std_dev^2 Qt
    asoh_covariance += ((2.5e-03 * initial_asoh.r0.base_values.flatten()) ** 2).tolist() # +/- std_dev^2 of R0
    asoh_covariance = np.diag(asoh_covariance)
    # For the transients, assume SOC is a uniform random variable in [0,1], and hysteresis has 2*std_dev of 1 mV
    init_transients = ECMTransientVector.from_asoh(initial_asoh)
    init_transients.soc = np.atleast_2d(1.)
    tran_covariance = np.diag([1/12, 2.5e-07])

    # Make the noise terms
    #  Logic from: https://github.com/ROVI-org/auto-soh/blob/main/notebooks/demonstrate_joint_ukf.ipynb
    voltage_err = 1.0e-03 # mV voltage error
    noise_sensor = ((voltage_err / 2) ** 2) * np.eye(1)
    noise_asoh = 1.0e-10 * np.eye(asoh_covariance.shape[0])
    noise_tran = 1.0e-08 * np.eye(2)

    return JointEstimator.initialize_unscented_kalman_filter(
        cell_model=EquivalentCircuitModel(),
        initial_asoh=initial_asoh.model_copy(deep=True),
        initial_inputs=ECMInput(
            time=dataset.raw_data['test_time'].iloc[0], 
            current=dataset.raw_data['current'].iloc[0],
        ),
        initial_transients=init_transients,
        covariance_asoh=asoh_covariance,
        covariance_transient=tran_covariance,
        transient_covariance_process_noise=noise_tran,
        asoh_covariance_process_noise=noise_asoh,
        covariance_sensor_noise=noise_sensor
    )

## Loop over all cells
Generate the state estimates then save the estimates in a new HDF5 file

In [None]:
for path in tqdm(all_cells):
    out_path = estimate_dir / path.name
    if out_path.exists():
        continue

    # Read the cell
    cell = BatteryDataset.from_batdata_hdf(path)
    # cell.raw_data = cell.raw_data.query('cycle_number < 10')
    estimator = create_estimator(cell)
    try:
        output, _ = run_online_estimate(cell, estimator)

    except np.linalg.LinAlgError:
        print(f'Cell {path} failed due to linear algebra error')
        continue

    # Append the estimated parameters to the raw data
    cell.raw_data = cell.raw_data.join(output.rename(columns=lambda x: f'est_{x}'))

    # Average the estimated parameters by cycle
    output['cycle_number'] = cell.raw_data['cycle_number']
    by_cycle = output.groupby('cycle_number').mean().reset_index()

    # Add them to the cycle stats
    cell.cycle_stats = cell.cycle_stats.join(by_cycle.drop(columns=['cycle_number']).rename(columns=lambda x: f'est_{x}'))
    cell.to_batdata_hdf(out_path, complevel=9)

 61%|███████████████████████████████████████████████████████████████████████████████████████████████████▎                                                               | 39/64 [16:08:20<10:48:25, 1556.22s/it]

The output file includes the actual and estimated parameters in the cycle_stats

In [None]:
cell = BatteryDataset.from_batdata_hdf('estimates/bilinear-0.hdf5')
cell.cycle_stats.head()

Plot estimated compared to actual parameters

In [None]:
fig, axs = plt.subplots(3, 1, figsize=(3.5, 3.), sharex=True)

for ax, l in zip(axs, ['soc', 'hyst', 'voltage']):
    ax.plot(cell.raw_data['test_time'][-5000:] / 3600,
            cell.raw_data[l][-5000:], 'k--', label='Actual', zorder=1)
    # ax.set_ylim()
    ax.plot(cell.raw_data['test_time'][-5000:] / 3600,
            cell.raw_data[f'est_{l}' if l != "voltage" else "est_terminal_voltage"][-5000:], 
            'r', label='Estimated',
            zorder=0)

    ax.set_ylabel(l)

# axs[0].set_ylim(-0.1, 1.5)
axs[-1].set_xlabel('Time (hr)')

In [None]:
fig, axs = plt.subplots(10, 1, figsize=(3.5, 12.), sharex=True)

for ax, l in zip(axs, ['q_t.base_values',
                       'r0.base_values[0]',
                       'r0.base_values[1]',
                       'r0.base_values[2]',
                       'r0.base_values[3]',
                       'r0.base_values[4]',
                       'r0.base_values[5]',
                       'r0.base_values[6]',
                       'r0.base_values[7]',
                       'r0.base_values[8]',
                       ]):
    actual = cell.cycle_stats['actual_' + l]
#     ax.plot(cell.raw_data['test_time'] / 3600,
#             cell.cycle_stats['actual_' + l], 'k--', label='Actual', zorder=1)
    ax.plot(cell.raw_data['test_time'] / 3600, 
            actual[cell.raw_data['cycle_number'].to_numpy().astype(int)],
            'k--', label='Actual', zorder=2)
    # ax.set_ylim()
    ax.plot(cell.raw_data['test_time'] / 3600,
            cell.raw_data[f'est_{l}'], 'r', label='Estimated',
            zorder=1)
    ax.fill_between(cell.raw_data['test_time'] / 3600,
                    cell.raw_data[f'est_{l}'] + 2 * cell.raw_data[f'est_{l}_std'],
                    cell.raw_data[f'est_{l}'] - 2 * cell.raw_data[f'est_{l}_std'],
                    color='red', alpha=0.5, zorder=0)

    ax.set_ylabel(l)

axs[-1].set_xlabel('Time (hr)')