In [1]:
# EKF + Coulomb counting SoC estimator
# This script includes the EKF & Coulomb-counting formulas in comments
# and stores intermediate EKF variables in the dataframe.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# ---------------------------
# Load data
# ---------------------------
csv_file = "ev_true_soc_output.csv"    # update path if needed
df = pd.read_csv(csv_file)

# ensure expected columns exist
required_cols = ["time_s","current_A","temperature_C","voltage_V","True_SoC"]
for c in required_cols:
    if c not in df.columns:
        raise ValueError(f"Missing column: {c}")

# ---------------------------
# Battery / simulation params
# ---------------------------
capacity_Ah = 50.0                   # battery capacity used to generate dataset (Ah)
Q_coulombs = capacity_Ah * 3600.0    # convert Ah to Coulombs
dt = 1.0                              # time step in seconds (dataset is 1s)
N = len(df)

# ---------------------------
# 1) Coulomb counting (recompute True SoC)
# Formula: SOC_k = SOC_{k-1} - (I_{k-1} * dt) / Q_coulombs
# ---------------------------
soc_cc = np.zeros(N)
soc_cc[0] = 0.995   # same initial SOC used to generate dataset
for k in range(1, N):
    I = df.loc[k-1, "current_A"]
    soc_cc[k] = soc_cc[k-1] - (I * dt) / Q_coulombs
    soc_cc[k] = np.clip(soc_cc[k], 0.0, 1.0)

df["True_SoC_Recomputed"] = np.round(soc_cc, 6)

# ---------------------------
# 2) Fit an OCV(SOC) curve (nonlinear measurement model)
# Measurement model used in EKF:
#   z_k = OCV(x_k) - R_int * I_k + v_k
# where x_k = SOC_k
# ---------------------------
soc_for_fit = df["True_SoC"].values
volt_for_fit = df["voltage_V"].values

poly_deg = 3
p_coeff = np.polyfit(soc_for_fit, volt_for_fit, poly_deg)

def ocv_from_soc(soc):
    soc = np.clip(np.asarray(soc), 0.0, 1.0)
    return np.polyval(p_coeff, soc)

p_der = np.polyder(p_coeff)
def docv_dsoc(soc):
    soc = np.clip(np.asarray(soc), 0.0, 1.0)
    return np.polyval(p_der, soc)

# Estimate R_int from residuals: residual = voltage - OCV(soc) â‰ˆ -R_int * I + noise
residuals = volt_for_fit - ocv_from_soc(soc_for_fit)
I_arr = df["current_A"].values.reshape(-1,1)
A = -I_arr
R_int_est, *_ = np.linalg.lstsq(A, residuals.reshape(-1,1), rcond=None)
R_int = float(R_int_est[0])
if R_int <= 0 or np.isnan(R_int):
    R_int = 0.05

print(f"Fitted OCV poly deg {poly_deg}. Estimated R_internal = {R_int:.5f} ohm")

# ---------------------------
# 3) EKF setup for 1D state (SOC)
# State: x_k = SOC_k
# Process (time update/prediction):
#   x_pred = x_{k-1} - (I_{k-1} * dt) / Q_coulombs
#   P_pred = P_{k-1} + Q_proc
#
# Measurement (correction):
#   z_k = OCV(x_k) - R_int * I_k + v_k
# Jacobian: H = d(OCV)/d(SOC) evaluated at x_pred
# Innovation: y = z_k - (OCV(x_pred) - R_int * I_k)
# Kalman gain (scalar): K = (P_pred * H) / (H * P_pred * H + R_meas)
# Update:
#   x_upd = x_pred + K * y
#   P_upd = (1 - K * H) * P_pred
# ---------------------------

# allocate arrays and dataframe columns for intermediate values
x_est = np.zeros(N)
P = np.zeros(N)
x_pred_arr = np.zeros(N)
P_pred_arr = np.zeros(N)
K_arr = np.zeros(N)
y_arr = np.zeros(N)
h_x_arr = np.zeros(N)
H_arr = np.zeros(N)

# initial state
x_est[0] = 0.995
P[0] = 1e-6

# noise covariances (tuneable)
Q_proc = 1e-8
R_meas = 1e-4

# EKF loop
for k in range(1, N):
    # time update (prediction)
    I_prev = df.loc[k-1, "current_A"]
    x_pred = x_est[k-1] - (I_prev * dt) / Q_coulombs
    x_pred = np.clip(x_pred, 0.0, 1.0)
    P_pred = P[k-1] + Q_proc

    # measurement update (correction)
    z_meas = df.loc[k, "voltage_V"]
    I_k = df.loc[k, "current_A"]
    h_x = ocv_from_soc(x_pred) - R_int * I_k
    H = docv_dsoc(x_pred)   # derivative dOCV/dSOC

    S = H * P_pred * H + R_meas
    K = 0.0
    if S > 0:
        K = (P_pred * H) / S

    y = z_meas - h_x
    x_upd = x_pred + K * y
    x_upd = np.clip(x_upd, 0.0, 1.0)
    P_upd = (1 - K * H) * P_pred
    if P_upd < 0:
        P_upd = abs(P_upd)

    # save values
    x_pred_arr[k] = x_pred
    P_pred_arr[k] = P_pred
    h_x_arr[k] = h_x
    H_arr[k] = H
    K_arr[k] = K
    y_arr[k] = y

    x_est[k] = x_upd
    P[k] = P_upd

# save into dataframe (rounded for readability)
df["True_SoC_Recomputed"] = np.round(soc_cc, 6)
df["EKF_SoC"] = np.round(x_est, 6)
df["EKF_P"] = P
df["EKF_x_pred"] = np.round(x_pred_arr, 6)
df["EKF_P_pred"] = np.round(P_pred_arr, 8)
df["EKF_h_x"] = np.round(h_x_arr, 6)
df["EKF_H"] = np.round(H_arr, 6)
df["EKF_K"] = np.round(K_arr, 8)
df["EKF_innovation"] = np.round(y_arr, 6)

# ---------------------------
# 4) Save outputs
# ---------------------------
out_csv = "ev_coulomb_and_ekf_output.csv"
df.to_csv(out_csv, index=False)
print(f"Saved CSV: {out_csv}")

# Try to save to Excel; if openpyxl missing, fall back to CSV (already saved)
out_xlsx = "ev_coulomb_and_ekf_output.xlsx"
try:
    df.to_excel(out_xlsx, index=False)
    print(f"Saved Excel: {out_xlsx}")
except Exception as e:
    print(f"Excel save failed (likely openpyxl not installed). Exception: {e}")
    print("CSV is available and can be opened with Excel.")

# ---------------------------
# 5) Optional plot: SoC comparison
# ---------------------------
plt.figure(figsize=(10,4))
plt.plot(df["time_s"], df["True_SoC"], label="Provided True_SoC", linewidth=1)
plt.plot(df["time_s"], df["True_SoC_Recomputed"], "--", label="Coulomb Counting (recomputed)", linewidth=1)
plt.plot(df["time_s"], df["EKF_SoC"], label="EKF_SOC", linewidth=1)
plt.xlabel("time (s)")
plt.ylabel("SoC")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# Print first 12 rows for quick inspection
df_out = df[["time_s","current_A","voltage_V","True_SoC","True_SoC_Recomputed","EKF_SoC","EKF_K"]]
print(df_out.head(12).to_string(index=False))


FileNotFoundError: [Errno 2] No such file or directory: 'ev_true_soc_output.csv'