'''DEM-ECM model: 
OCP_ecm : OCP simulation from the ECM model---only based on the test data
OCP_test: OCP from the test data--pseudo OCP
OCP_dem:  predicted OCP based on DEM model, with time setting
OCP_decm: predicted OCP based on DEM-ECM model, with extend data (time setting)
OCP_conv: converge OCP based on DEM model, with threshold setting, time changalbe
'''

In [1]:
# -*- coding: utf-8 -*-
import numpy as np
import torch.nn.functional as F
import math
import pandas as pd 
import matplotlib.pyplot as plt 
from sklearn.metrics import mean_squared_error,r2_score
from scipy.integrate import cumulative_trapezoid
from scipy.ndimage import gaussian_filter
import time
import scienceplots as sp
import csv
import warnings
plt.style.use(['science','nature','no-latex'])

In [None]:
''' load the raw csv file'''
file = ['.../test_3_7_20C_10min10min_20250218.csv',]

'''load the analysised data from the DE'''
 # taking 1h extrapolation data as an example
file_ex = '.../test_3_7_20C_10min10min_20250218_1hpred.csv'   #(extrapolation data)
vol_pred = pd.read_csv(file_ex)
file_conv_OCP = '.../test_3_7_20C_10min10min_20250218_3hconv.csv' #(convergence data)
OCP_conv = pd.read_csv(file_conv_OCP)

'''save path'''
save_path = ''


3000


In [None]:
warnings.filterwarnings("ignore")
data = pd.concat([pd.read_csv(file) for file in file],ignore_index=True)  
file_name = file[0].split('/')[-1].split('.')[0]
def file_read (file,data):
    '''Read every column data'''
    column_dict = {column_name: list(column_data) for column_name, column_data in data.items()}
    '''find split index of every pulse'''
    diff_ = np.diff(data['Steps'])
    split_index = np.where(diff_ != 0)[0] +1   #the first index of every step
    cccv_index = split_index[1]  # end point of CCCV
    '''trans time to seconds'''
    each_t = [int(x.split(":")[0])*60*60+int(x.split(":")[1])*60+int(x.split(":")[2].split(".")[0]) for x in data['Relative Time(h:min:s.ms)'].values]
    '''export current and voltage data'''
    current = data['Current(A)'].values
    voltage = data['Voltage(V)'].values
    return column_dict, split_index, cccv_index, each_t, current, voltage
column_dict, split_index, cccv_index, each_t, current, voltage = file_read(file,data)

In [None]:
'''get the every current / voltage segment——  mian used in ECM model'''

'define a function to find the cycle including charge/discharge and rest, ie. only and each GITT cycle'
def GITT_cycle(split_index, voltage,current,each_t):   
  """
    extract the GITT data, including the volatge, current and time of charge/discharge and rest.
  Returns:
      tuple: 
          - Cycle: the voltahe data of each GITT cycle.
          - GITT_time: the time data of each GITT cycle.
          - each_current:  the current data of each GITT cycle.
          - CCCV: the CCCV data.
  """
  Cycle, GITT_time, CCCV,each_current= [], [], [],[]
  for i in range(len(split_index)):
    a = split_index[i-1]
    b = split_index[i]
    if i == 0:
      CCCV.append(voltage[:split_index[i+1]])
    elif i % 2 ==0 and i < len(split_index)-1:
      c = split_index[i+1]
      Cycle.append(voltage[a:c])
      each_current.append(current[a:c])
      max_step1 = max(each_t[a:b])
      step_time = each_t[a:b] + [x + max_step1 for x in each_t[b:c]]
      GITT_time.append(step_time)

    elif i == len(split_index)-1:
      Cycle.append(voltage[a:])
      each_current.append(current[a:])
      max_step1 = max(each_t[a:b])
      step_time = each_t[a:b] + [x + max_step1 for x in each_t[b:]]
      GITT_time.append(step_time)
  return Cycle, GITT_time, each_current,CCCV

GITT_voltage, GITT_time, GITT_current, CCCV= GITT_cycle(split_index, voltage, current,each_t)   # only including the GITT


In [None]:

'''ECM_OCV model to get the OCV
a 2-RC ECM constructed and combined with the FRLS and FEKF to 
dynamically estimate the OCV under GITT testing.
more details can be founded in the reference:
1. Yang, Z. & Wang, X. An improved parameter identification method considering multi-timescale characteristics of lithium-ion batteries. 
Journal of Energy Storage 59, 106462 (2023).
2. Ji, S. et al. Continuous physics-informed learning expedites universal battery mechanism decoupling. 
Preprint at https://doi.org/10.26434/chemrxiv-2025-14zsv (2025).  
https://github.com/SLJME42/2RCECM
'''
def RFF_KF_3U(I,U,Ts,OCV_ini):
    U0_vec=[]
    U1_vec=[]
    U2_vec=[]
    Uoc_vec=[]
    term_vec=[]
    R2_vec=[]
    tal2_vec=[]
#####Initializing fast dynamic
    c1=10/(Ts+20)#tal1/(tal1+Ts)
    c2=50+(Ts*0.1)/(Ts+10)#R0+Ts*R1/(Ts+tal1)     
    c3=-500/(Ts+7)#-R0*tal1/(Ts+tal1)   #-500
    thita_ls=np.array([[c1,c2,c3]]).T#c1,c2,c3
    P = 10**(-4)*np.eye(3)
    lamda = 1  # forgetting factor
    Ufd_k_1=1
    Ufd_k=1.1
    U1=0.1  
    phi = np.array([Ufd_k_1, I[1], I[0]]).reshape(1, 3)
    #K = (P @ (phi.T)) / (lamda + phi * P @ (phi.T))
    J=0
    D=[[1,0,0],[0,0,0],[0,0,0]]
#####Initializing slow dynamic
    #w_k=10**(6)*np.eye(4)
    w_k=np.diag([10**(-8),10**(-6),10**(-8),10**(-6)])
    v_k=np.array([[10**(-4)]])
    X_k=np.array([[OCV_ini,0.01,200,0.01]]).reshape(-1,1)  #OCV,R2,tal2,U2  = OCV_ini,0.01,200,0.01
    P_k=10**(-4)*np.eye(4)

    time_cos = []
    for k in range(1,len(I)):
###################FRLS Fast Dynamic#############
        start_time=time.time()
        phi = np.array([[Ufd_k_1, I[k], I[k-1]]])
        K = (P @ (phi.T)) / (lamda + phi * P @ (phi.T))
        P = (P - K @ phi @ P) / lamda
        J=J+((Ufd_k-phi@thita_ls)**2)/(lamda + phi * P @ (phi.T))
        thita_ls = thita_ls + K @ (Ufd_k - phi @ thita_ls)
        #print(thita_ls)

        #update variables
        tal1=thita_ls[0]*Ts/(1-thita_ls[0])
        R0=(-thita_ls[2]*(Ts+tal1))/tal1
        R1=(thita_ls[1]-R0)*(Ts+tal1)/Ts
        U0=R0*I[k]
        U1=np.exp(-Ts/tal1)*U1+R1*(1-np.exp(-Ts/tal1))*I[k]
        Ufde=phi @ thita_ls
        #print([thita_ls[0],thita_ls[1]-R0])
        if isinstance(Ufd_k, (int, float)):
            Ufd_k_1=Ufd_k
        else:
            Ufd_k_1=Ufd_k[0]

        Usd_k=U[k]-Ufde
        U0_vec.append(U0)
        U1_vec.append(U1)
####################EKF Slow dynamic###############
        a=np.exp(-Ts/X_k[2][0])
        b=(1-a)*I[k]

        A_k = np.array([[1, 0, 0, 0], [0, 1, 0, 0],
             [0, 0, 1, 0], [0,  b, 0, a]])
        C_k = np.array([[1, 0, 0, 1]])
        ckt=C_k.T
        # Prior prediction
        X_k = A_k@X_k
        P_k = A_k@P_k@A_k.T + w_k
        # Kalman gain computation
        SSS=C_k@P_k@C_k.T+v_k
        K_k = P_k@C_k.T@np.linalg.inv(SSS)
        # Posteriori update


        er = Usd_k - C_k @ X_k
        bb = K_k @ (Usd_k - C_k @ X_k)
        X_k = X_k + K_k@(Usd_k - C_k@X_k)
        #if I[k] == 0:
        #    X_k[0] = Uoc_vec[-1]
        nn= np.eye(4, dtype=float)-K_k@C_k.reshape(1,4)
        P_k = np.dot((np.eye(4, dtype=float) - K_k@C_k.reshape(1,4)), P_k)
        Usd_k=np.array(C_k@X_k)[0]


        Uoc_vec.append(X_k[0])
        R2_vec.append(X_k[1])
        tal2_vec.append(X_k[2])
        U2_vec.append(X_k[3])
        term_vec.append(Ufd_k + Usd_k)

        #w_k = 0.9 * w_k + 0.1 * np.dot(bb.T, bb)
        #v_k = 0.9 * v_k + 0.1 * np.dot(er.T, er)

        Ufd_k = U[k] - Usd_k
        end_time=time.time()
        time_cos.append(end_time - start_time)
    print(f"consume time: {np.mean(time_cos):.6f} 秒")

    return np.array(Uoc_vec),np.array(U2_vec)


In [None]:
'''extract the test OCV and real_simulated OCV only based on ECM model'''
Ts = 1          # test time interval
OCV_ini = 4.2   # initial OCV

'''extract all ocv_ecm and ocv_test  (without any prediction)'''
OCP_ecm = []    # from the ECM model
OCP_test = []   # from the test data
for i in range(len(GITT_current)):
    I = GITT_current[i]
    V = GITT_voltage[i]
    ocv_test = V[-1]
    OCP_test.append(ocv_test)
    Uoc_vec,U2_vec = RFF_KF_3U(I, V ,Ts,OCV_ini)
    ocv_ecm = Uoc_vec[-1]
    OCP_ecm.append(ocv_ecm)

OCP_test = np.array(OCP_test)
OCP_ecm = np.array(OCP_ecm)

'''define a func to calculate the errors'''
def errs_measure(y_test,y_pred):
    MSE = mean_squared_error(y_test, y_pred)
    RMSE = np.sqrt(MSE)
    R2 = r2_score(y_test, y_pred)
    return RMSE,MSE,R2

RMSE,MSE,R2 = errs_measure(OCP_test,OCP_ecm)
print(f"RMSE: {RMSE:.6f}, MSE: {MSE:.6f}, R2: {R2:.6f}")

In [None]:
'''load the extrap results from the DEM model---predicted voltage'''
'''define a func to extend I and V, then,using the ECM model to predict the OCV(extrapolation part)'''
def OCP_DEM_ECM(GITT_current, GITT_voltage, vol_pred, Ts, OCV_ini):
    OCP_decm = []   
    OCP_dem = []
    ext_len = len(vol_pred.columns) - 1         # the length of the extrapolation prediction
    for i in range(len(GITT_current)):
        I_test = GITT_current[i]
        V_test = GITT_voltage[i]
        I_extend = np.concatenate([I_test, np.zeros(ext_len)])  
        pred_V = vol_pred.iloc[i, 1:].tolist()
        V_extend = np.concatenate([V_test, pred_V])
        Uoc_vec,U2_vec = RFF_KF_3U(I_extend, V_extend ,Ts,OCV_ini)
        OCP_decm.append(Uoc_vec[-1])
    for j in range(len(vol_pred)):
        pred_V = vol_pred.iloc[j, 1:].tolist()
        OCP_dem.append(pred_V[-1])
    return OCP_decm, OCP_dem
OCP_decm, OCP_dem= OCP_DEM_ECM(GITT_current, GITT_voltage, vol_pred, Ts, OCV_ini)


In [None]:
'''flatten data to the 1 demiension, then save the data'''
OCP_ecm = np.array(OCP_ecm).flatten().tolist()  
OCP_decm = np.array(OCP_decm).flatten().tolist()

'''define a func to save the data'''
def save_csv (OCP_test,OCP_dem, OCP_conv,OCP_ecm,OCP_decm):
    with open (save_path +file_name +'.csv','w') as fc:  # name changalbe (default 1h)
        writer = csv.writer(fc)
        writer.writerow(['OCP_test', 'OCP_dem', 'OCP_conv','OCP_ecm','OCP_decm'])  
        for i in range(len(OCP_test)):
            writer.writerow([OCP_test[i], OCP_dem[i], OCP_conv[i],OCP_ecm[i],OCP_decm[i]])  
    fc.close()
    print('csv file has been saved')
save_csv(OCP_test, OCP_dem, OCP_conv, OCP_ecm, OCP_decm)