# Import libraries

In [1]:
import re
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from typing import Optional, Tuple, Dict, List
import plotly.graph_objects as go


# Ac side Hycon

## Read csv 

In [21]:
def load_hycon_hybrid_fast(file_path,
                           sep=';',
                           encoding='latin1',
                           strip_trailing_hyphen=True,
                           parse_timestamp_utc=False,
                           use_pyarrow=False):
    file_path = Path(file_path)

    # --- Read only the 6th and 8th lines quickly (no pandas) ---
    row6 = row8 = None
    with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
        for i, line in enumerate(f, start=1):  # 1-based line index
            if i == 6:
                row6 = line.rstrip('\n')
            elif i == 8:
                row8 = line.rstrip('\n')
                break  # we have both; stop reading early
    if row6 is None or row8 is None:
        raise ValueError("File is too short or missing required header lines 6 and 8.")

    # Split into header cells and combine cell-wise
    h6_parts = [p.strip() for p in row6.split(sep)]
    h8_parts = [p.strip() for p in row8.split(sep)]

    # Align lengths (pad/truncate) to avoid shape errors
    width = max(len(h6_parts), len(h8_parts))
    if len(h6_parts) < width:
        h6_parts += [''] * (width - len(h6_parts))
    if len(h8_parts) < width:
        h8_parts += [''] * (width - len(h8_parts))

    combined = [f"{a} {b}".strip() for a, b in zip(h6_parts, h8_parts)]
    # Clean whitespace
    combined = [pd.Series([c]).str.replace(r'\s+', ' ', regex=True).iloc[0].strip() for c in combined]
    if strip_trailing_hyphen:
        combined = [pd.Series([c]).str.replace(r'\s*-\s*$', '', regex=True).iloc[0] for c in combined]

    # First column becomes standard 'Timestamp'
    if combined:
        combined[0] = 'Timestamp'

    # --- Read data once with the prepared header ---
    read_kwargs = dict(filepath_or_buffer=file_path,sep=sep,skiprows=8,header=None,names=combined,encoding=encoding,low_memory=False,)
    if use_pyarrow:
        try:
            read_kwargs['dtype_backend'] = 'pyarrow'
        except TypeError:
            # Older pandas without dtype_backend support
            pass
    df = pd.read_csv(**read_kwargs)
    # Parse Timestamp (fast-path) — coerce invalid to NaT, optional UTC
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce', utc=parse_timestamp_utc)
    
    return df

# --- Usage ---
file_path = r'C:\Data_analysis\Decci\Capacity_test\Hybrid_Controller\LogDataFast_2024-02-28.csv'
df = load_hycon_hybrid_fast(
    file_path,
    parse_timestamp_utc=False,  # set True if you prefer UTC-aware timestamps
    use_pyarrow=False           # set True if you have PyArrow & want lower memory
)
print(df.shape)
display(df.head())

(84240, 46)


Unnamed: 0,Timestamp,OpStt,HybridSysState,HybridSysStateTrans,LoadPwrAtTot kW,LoadPwrRtTot kvar,PvBatMin kW,PvBatMax kW,SignalValidity,BatPwrAtTot kW,...,PwrAtLoLimPrlIn kW,PwrAtLoLimSrlIn kW,PwrAtRateMaxIn MW/min,PwrAtSpntOfsIn kW,PwrAtSpntPrlIn kW,PwrAtSpntSrlIn kW,PwrAtUpLimPrlIn kW,PwrAtUpLimSrlIn kW,PwrRtSpntIn kvar,VtgSpntIn V
0,2024-02-28 00:00:04,5: Operate,3: Grid MOP,0: Idle,48.08,-4.89,-24500.0,24500.0,0000 0000 0000 0000,0.0,...,150.0,600.0,7.28,0.0,0.0,500.0,100.0,600.0,0.0,400.0
1,2024-02-28 00:00:05,5: Operate,3: Grid MOP,0: Idle,48.06,-0.47,-24500.0,24500.0,0000 0000 0000 0000,0.0,...,150.0,600.0,7.28,0.0,0.0,500.0,100.0,600.0,0.0,400.0
2,2024-02-28 00:00:06,5: Operate,3: Grid MOP,0: Idle,49.86,11.08,-24500.0,24500.0,0000 0000 0000 0000,0.0,...,150.0,600.0,7.28,0.0,0.0,500.0,100.0,600.0,0.0,400.0
3,2024-02-28 00:00:07,5: Operate,3: Grid MOP,0: Idle,49.83,6.0,-24500.0,24500.0,0000 0000 0000 0000,0.0,...,150.0,600.0,7.28,0.0,0.0,500.0,100.0,600.0,0.0,400.0
4,2024-02-28 00:00:08,5: Operate,3: Grid MOP,0: Idle,50.74,15.36,-24500.0,24500.0,0000 0000 0000 0000,0.0,...,150.0,600.0,7.28,0.0,0.0,500.0,100.0,600.0,0.0,400.0


### Calculation Ac capacity

In [28]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import warnings
from typing import Optional, Dict, List, Tuple

# -------------------------------------------------------------------
# Helper Functions
# -------------------------------------------------------------------

def _sanitize_time_col(d: pd.DataFrame, time_col: str) -> pd.DataFrame:
    """Helper to clean and sort the time column."""
    d = d.copy()
    d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
    d = d.dropna(subset=[time_col])
    d = d.sort_values(time_col).reset_index(drop=True)
    return d


def _check_regular_cadence(dt_s: pd.Series,
                            expected_sampling_seconds: Optional[float],
                            rtol: float = 0.01,
                            atol: float = 0.5) -> Dict[str, float]:
    """Helper to check if time cadence is regular."""
    x = dt_s.dropna().to_numpy(dtype=float)
    x = x[x > 0]
    if x.size == 0:
        return dict(is_regular=True, dt_median=np.nan, frac_off=0.0, reason="insufficient intervals")

    dt_median = float(np.median(x))

    if expected_sampling_seconds is None or np.isnan(expected_sampling_seconds):
        tol = max(abs(dt_median) * rtol, atol)
        frac_off = float((np.abs(x - dt_median) > tol).mean())
        is_regular = frac_off <= 0.05
        return dict(is_regular=is_regular, dt_median=dt_median, frac_off=frac_off, reason="no expected cadence")

    tol = max(abs(expected_sampling_seconds) * rtol, atol)
    frac_off = float((np.abs(x - expected_sampling_seconds) > tol).mean())
    is_regular = frac_off <= 0.05 and (abs(dt_median - expected_sampling_seconds) <= tol)
    return dict(is_regular=is_regular, dt_median=dt_median, frac_off=frac_off, reason="expected cadence")

# -------------------------------------------------------------------
# Integrated Main Function
# -------------------------------------------------------------------

def compute_nominal_from_poi_plotly(
    df: pd.DataFrame,
    discharge_start,
    discharge_end,
    P_nom_kW: float,
    tol_pct: float = 5.0,
    required_minutes: Optional[float] = None,
    time_col: str = "Timestamp",
    power_col: str = "PoiPwrAt kW",
    title: str = "Discharge KPI at POI (Plotly)",
    drop_duplicate_timestamps: bool = True,
    
    # Ramp trim for discharge KPI window
    ramp_trim_seconds_discharge: int = 0,

    # Optional RTE parameters
    charge_start: Optional[str] = None,
    charge_end: Optional[str] = None,
    sampling_seconds: Optional[float] = None,
    discharge_positive: bool = True,
    ramp_trim_seconds_charge: int = 0,
    warn_irregular: bool = True,
    rte_min_charge_kWh: float = 0.01
) -> Dict:
    """
    Calculates Discharge KPI, optional RTE, total energy, and 
    cumulative energy (Calc-PoiEgyMtr) from charge_start to discharge_end.
    """
    ts_start_raw = pd.to_datetime(discharge_start, errors="raise")
    ts_end_raw = pd.to_datetime(discharge_end, errors="raise")
    if ts_end_raw <= ts_start_raw:
        raise ValueError("discharge_end must be after discharge_start")

    # --- KPI Window Trimming ---
    trim_dis = pd.to_timedelta(int(ramp_trim_seconds_discharge), unit="s")
    ts_start = ts_start_raw + trim_dis
    ts_end = ts_end_raw - trim_dis
    if ts_end <= ts_start:
        raise ValueError("Ramp trim too large for discharge window.")

    if time_col not in df.columns:
        raise KeyError(f"Time column '{time_col}' not in df")
    if power_col not in df.columns:
        raise KeyError(f"Power column '{power_col}' not in df")

    # --- Data Sanitization ---
    d = df[[time_col, power_col]].copy()
    d = _sanitize_time_col(d, time_col)

    if drop_duplicate_timestamps:
        d = d.groupby(time_col, as_index=False)[power_col].mean()

    if d.empty:
        raise ValueError("No valid rows after parsing timestamps.")

    # --- Total timeframe energy & avg power (Full File) ---
    d_full = d.copy()
    d_full["dt_s"] = d_full[time_col].diff().dt.total_seconds()
    d_full = d_full[d_full["dt_s"] > 0].copy()
    d_full["E_kWh_slice"] = d_full[power_col] * (d_full["dt_s"] / 3600.0)
    total_energy_kWh = float(d_full["E_kWh_slice"].sum())
    total_duration_h = float((d_full[time_col].max() - d_full[time_col].min()).total_seconds()) / 3600.0
    avg_power_kW = total_energy_kWh / total_duration_h if total_duration_h > 0 else np.nan

    # --- KPI discharge window selection --------------------------------
    dKPI = d[(d[time_col] >= ts_start) & (d[time_col] <= ts_end)].copy()
    if dKPI.empty:
        raise ValueError("No samples inside discharge window.")
    if len(dKPI) < 2:
        raise ValueError("Only one sample in KPI window.")

    dKPI["dt_s"] = dKPI[time_col].diff().dt.total_seconds()
    # Handle first dt_s
    if not dKPI.empty and np.isnan(dKPI.loc[dKPI.index[0], "dt_s"]) and len(dKPI) >= 2:
        dKPI.loc[dKPI.index[0], "dt_s"] = (
            dKPI.loc[dKPI.index[1], time_col] - dKPI.loc[dKPI.index[0], time_col]
        ).total_seconds()
    dKPI = dKPI[dKPI["dt_s"] > 0].copy()

    dKPI["E_kWh_slice"] = dKPI[power_col] * (dKPI["dt_s"] / 3600.0)
    actual_energy_kWh = float(dKPI["E_kWh_slice"].sum())

    band_low = float(P_nom_kW) * (1 - tol_pct / 100.0)
    band_high = float(P_nom_kW) * (1 + tol_pct / 100.0)
    dKPI["in_band"] = (dKPI[power_col] >= band_low) & (dKPI[power_col] <= band_high)

    inband_time_s_cum = float(dKPI.loc[dKPI["in_band"], "dt_s"].sum())
    E_nom_cum_kWh = float(P_nom_kW) * (inband_time_s_cum / 3600.0)

    # --- Continuous Segment Logic ---
    segs: List[Tuple[pd.Timestamp, pd.Timestamp, float]] = []
    in_seg = False
    acc_s = 0.0
    seg_start = None
    for i, row in dKPI.iterrows():
        if row["in_band"]:
            if not in_seg:
                in_seg = True
                seg_start = row[time_col]
                acc_s = row["dt_s"]
            else:
                acc_s += row["dt_s"]
        else:
            if in_seg:
                seg_end = row[time_col]
                segs.append((seg_start, seg_end, acc_s))
                in_seg = False
    if in_seg:
        seg_end = dKPI.iloc[-1][time_col]
        segs.append((seg_start, seg_end, acc_s))

    longest_s = max([s for *_, s in segs], default=0.0)
    E_nom_cont_kWh = float(P_nom_kW) * (longest_s / 3600.0)

    # --- Compliance ---
    compliance_cont = compliance_cum = None
    required_str = None
    if required_minutes is not None:
        req_s = float(required_minutes) * 60.0
        compliance_cont = longest_s >= req_s
        compliance_cum = inband_time_s_cum >= req_s
        required_str = f"{required_minutes:.0f} min"

    compliance_fraction = None
    window_time_s = float((ts_end - ts_start).total_seconds())
    if window_time_s > 0:
        compliance_fraction = inband_time_s_cum / window_time_s

    # --- Initialize Optional Outputs ---
    warnings_list: List[str] = []
    E_charge_kWh = E_discharge_kWh = np.nan
    RTE_pct = np.nan
    rte_method = None
    dt_expected = float(sampling_seconds) if sampling_seconds is not None else np.nan
    dt_median_charge = np.nan
    dt_median_discharge = np.nan
    df_calc_poi_egymtr = None # <-- NEW: Initialize cumulative energy df

    # --- Optional RTE & NEW Cumulative Energy Block ---------------------
    if (charge_start is not None) and (charge_end is not None):
        
        # --- NEW: Calc-PoiEgyMtr Calculation ---
        # Window is from raw charge_start to raw discharge_end
        c_start_raw = pd.to_datetime(charge_start, errors="raise")
        # ts_end_raw is already defined from discharge_end
        
        d_ce = d[(d[time_col] >= c_start_raw) & (d[time_col] <= ts_end_raw)].copy()
        
        if d_ce.empty:
            warnings_list.append("No data in Calc-PoiEgyMtr window (charge_start to discharge_end).")
            df_calc_poi_egymtr = pd.DataFrame(columns=[time_col, "Calc-PoiEgy", "Calc-PoiEgyMtr"])
        else:
            d_ce["dt_s"] = d_ce[time_col].diff().dt.total_seconds()
            if np.isnan(d_ce.loc[d_ce.index[0], "dt_s"]) and len(d_ce) >= 2:
                d_ce.loc[d_ce.index[0], "dt_s"] = (
                    d_ce.loc[d_ce.index[1], time_col] - d_ce.loc[d_ce.index[0], time_col]
                ).total_seconds()
            
            d_ce = d_ce[d_ce["dt_s"] > 0].copy()
            d_ce["Calc-PoiEgy"] = d_ce[power_col] * (d_ce["dt_s"] / 3600.0)
            d_ce["Calc-PoiEgyMtr"] = d_ce["Calc-PoiEgy"].cumsum()
            df_calc_poi_egymtr = d_ce[[time_col, "Calc-PoiEgy", "Calc-PoiEgyMtr"]].copy()

        # --- Existing RTE Calculation ---
        dd = d.copy() # Use the full sanitized data
        P = dd[power_col].to_numpy(dtype=float)
        if not discharge_positive:
            P = -P
        dd["P"] = P

        trim_ch = pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s")
        c_start_trimmed = pd.to_datetime(charge_start, errors="raise") + trim_ch
        c_end_trimmed = pd.to_datetime(charge_end, errors="raise") - trim_ch

        # Note: ts_start and ts_end are the *trimmed* discharge window
        d_charge = dd[(dd[time_col] >= c_start_trimmed) & (dd[time_col] <= c_end_trimmed)].copy()
        d_dis = dd[(dd[time_col] >= ts_start) & (dd[time_col] <= ts_end)].copy()

        for subset, name in [(d_charge, "charge"), (d_dis, "discharge")]:
            if not subset.empty:
                subset["dt_s"] = subset[time_col].diff().dt.total_seconds()
                if np.isnan(subset.loc[subset.index[0], "dt_s"]) and len(subset) >= 2:
                    subset.loc[subset.index[0], "dt_s"] = (
                        subset.loc[subset.index[1], time_col] - subset.loc[subset.index[0], time_col]
                    ).total_seconds()
                subset.dropna(subset=["dt_s"], inplace=True)
                subset = subset[subset["dt_s"] > 0]
                if name == "charge":
                    d_charge = subset
                else:
                    d_dis = subset

        reg_charge = _check_regular_cadence(
            d_charge["dt_s"] if not d_charge.empty else pd.Series([], dtype=float),
            expected_sampling_seconds=sampling_seconds)
        reg_dis = _check_regular_cadence(
            d_dis["dt_s"] if not d_dis.empty else pd.Series([], dtype=float),
            expected_sampling_seconds=sampling_seconds)
        
        dt_median_charge = reg_charge["dt_median"]
        dt_median_discharge = reg_dis["dt_median"]

        P_charge_star = (-d_charge["P"]).clip(lower=0.0).to_numpy() if not d_charge.empty else np.array([], dtype=float)
        P_dis_star = d_dis["P"].clip(lower=0.0).to_numpy() if not d_dis.empty else np.array([], dtype=float)

        if sampling_seconds is not None and reg_charge["is_regular"] and reg_dis["is_regular"]:
            dt_h = float(sampling_seconds) / 3600.0
            E_charge_kWh = float(P_charge_star.sum() * dt_h)
            E_discharge_kWh = float(P_dis_star.sum() * dt_h)
            rte_method = f"constant_dt({int(sampling_seconds)}s)"
        else:
            rte_method = "trapezoid_dt"
            msg = "Detected irregular cadence for RTE; using trapezoidal integration."
            if warn_irregular:
                warnings.warn(msg)
            warnings_list.append(msg)
            if not d_charge.empty:
                t_c = d_charge[time_col].astype("int64").to_numpy() / 1e9
                E_charge_kWh = float(np.trapz(P_charge_star, x=t_c) / 3600.0)
            else:
                E_charge_kWh = 0.0
            if not d_dis.empty:
                t_d = d_dis[time_col].astype("int64").to_numpy() / 1e9
                E_discharge_kWh = float(np.trapz(P_dis_star, x=t_d) / 3600.0)
            else:
                E_discharge_kWh = 0.0

        if E_charge_kWh > float(rte_min_charge_kWh):
            RTE_pct = 100.0 * E_discharge_kWh / E_charge_kWh
        else:
            RTE_pct = np.nan
            warnings_list.append(f"E_charge ({E_charge_kWh:.6f}) ≤ guard ({rte_min_charge_kWh:.6f}); RTE=NaN.")

    # --- Plotly Figure ------------------------------------------------
    fig = go.Figure()
    fig.add_trace(go.Scatter(x=dKPI[time_col], y=dKPI[power_col],
                                mode="lines", name=f"{power_col}",
                                line=dict(color="#1f77b4", width=2)))
    fig.add_trace(go.Scatter(x=[dKPI[time_col].min(), dKPI[time_col].max()],
                                y=[band_low, band_low], mode="lines", name=f"−{tol_pct:.0f}% band",
                                line=dict(color="#2ca02c", width=1.5, dash="dash")))
    fig.add_trace(go.Scatter(x=[dKPI[time_col].min(), dKPI[time_col].max()],
                                y=[band_high, band_high], mode="lines", name=f"+{tol_pct:.0f}% band",
                                line=dict(color="#2ca02c", width=1.5, dash="dash")))

    shapes = []
    for (s, e, dur_s) in segs:
        shapes.append(dict(type="rect", xref="x", yref="y",
                            x0=s, x1=e, y0=band_low, y1=band_high,
                            fillcolor="rgba(46,204,113,0.18)", line=dict(width=0),
                            layer="below"))
    fig.update_layout(shapes=shapes)

    subtitle = [
        f"P_nom={P_nom_kW:.0f} kW, tol=±{tol_pct:.1f}%",
        f"Actual={actual_energy_kWh:.1f} kWh",
        f"Nom(cum)={E_nom_cum_kWh:.1f} kWh",
        f"Nom(cont)={E_nom_cont_kWh:.1f} kWh"
    ]
    if required_minutes is not None:
        subtitle.append(f"Req={required_str}, pass(cont)={bool(compliance_cont)}, pass(cum)={bool(compliance_cum)}")
    if not np.isnan(RTE_pct):
        subtitle.append(f"RTE={RTE_pct:.2f}%")

    fig.update_layout(
        title=dict(text=f"{title}<br><sup>{' | '.join(subtitle)}</sup>", x=0.01),
        xaxis_title="Time",
        yaxis_title="Power (kW)",
        hovermode="x unified",
        template="plotly_white",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0.01),
        margin=dict(l=60, r=30, t=80, b=40),
    )

    # --- Summary table ------------------------------------------------
    metrics = [
        "Window start", "Window end",
        "P_nom (kW)", "Tolerance (%)", "Band low (kW)", "Band high (kW)",
        "Actual energy (kWh)",
        "In-band time (continuous, min)",
        "In-band time (cumulative, min)",
        "Nominal energy (continuous, kWh)",
        "Nominal energy (cumulative, kWh)",
        "Compliance required duration",
        "Compliance (continuous)",
        "Compliance (cumulative)",
        "Compliance (fraction)",
        "Ramp trim (discharge, s)",
        # Total stats
        "Total timeframe start",
        "Total timeframe end",
        "Total duration (h)",
        "Total energy (kWh)",
        "Average power (kW)"
    ]
    values = [
        ts_start, ts_end,
        float(P_nom_kW), float(tol_pct), float(band_low), float(band_high),
        round(actual_energy_kWh, 3),
        round(longest_s/60.0, 3),
        round(inband_time_s_cum/60.0, 3),
        round(E_nom_cont_kWh, 3),
        round(E_nom_cum_kWh, 3),
        required_str,
        compliance_cont,
        compliance_cum,
        round(compliance_fraction, 4) if compliance_fraction is not None else None,
        int(ramp_trim_seconds_discharge),
        d_full[time_col].min(),
        d_full[time_col].max(),
        round(total_duration_h, 3),
        round(total_energy_kWh, 3),
        round(avg_power_kW, 3) if not np.isnan(avg_power_kW) else None
    ]

    if (charge_start is not None) and (charge_end is not None):
        metrics += [
            "Charge window start", "Charge window end",
            "Ramp trim (charge, s)",
            "Sampling interval expected (s)",
            "Median dt (charge, s)", "Median dt (discharge, s)",
            "RTE method",
            "E_charge (kWh)", "E_discharge (kWh)", "RTE (%)"
        ]
        values += [
            pd.to_datetime(charge_start) + pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s"),
            pd.to_datetime(charge_end) - pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s"),
            int(ramp_trim_seconds_charge),
            dt_expected if not np.isnan(dt_expected) else None,
            None if np.isnan(dt_median_charge) else round(float(dt_median_charge), 6),
            None if np.isnan(dt_median_discharge) else round(float(dt_median_discharge), 6),
            rte_method,
            None if np.isnan(E_charge_kWh) else round(float(E_charge_kWh), 3),
            None if np.isnan(E_discharge_kWh) else round(float(E_discharge_kWh), 3),
            None if np.isnan(RTE_pct) else round(float(RTE_pct), 3),
        ]

    summary = pd.DataFrame({"metric": metrics, "value": values})

    # --- Final Output Dictionary ---
    return {
        "window_start": ts_start,
        "window_end": ts_end,
        "P_nom_kW": float(P_nom_kW),
        "tol_pct": float(tol_pct),
        "band_low_kW": float(band_low),
        "band_high_kW": float(band_high),
        "actual_energy_kWh": float(actual_energy_kWh),
        "inband_time_minutes_continuous": float(longest_s/60.0),
        "inband_time_minutes_cumulative": float(inband_time_s_cum/60.0),
        "nominal_energy_kWh_continuous": float(E_nom_cont_kWh),
        "nominal_energy_kWh_cumulative": float(E_nom_cum_kWh),
        "compliance_continuous": compliance_cont,
        "compliance_cumulative": compliance_cum,
        "compliance_fraction": compliance_fraction,
        "total_timeframe_start": d_full[time_col].min(),
        "total_timeframe_end": d_full[time_col].max(),
        "total_duration_h": float(total_duration_h),
        "total_energy_kWh": float(total_energy_kWh),
        "average_power_kW": None if np.isnan(avg_power_kW) else float(avg_power_kW),
        "rte_pct": None if np.isnan(RTE_pct) else float(RTE_pct),
        "E_charge_kWh": None if np.isnan(E_charge_kWh) else float(E_charge_kWh),
        "E_discharge_kWh": None if np.isnan(E_discharge_kWh) else float(E_discharge_kWh),
        "warnings": warnings_list,
        "figure": fig,
        "summary_table": summary,
        "df_calc_poi_egymtr": df_calc_poi_egymtr, # <-- NEW: Added to output
    }

In [29]:
# -------------------------------------------------------------------
# Example Usage
# -------------------------------------------------------------------
# Create a dummy DataFrame for demonstration
# In your real use, you would load your data (e.g., from CSV)
try:
    # Try to use the existing 'df' if it's in memory
    if df is None or df.empty:
        raise NameError("df not defined")
except NameError:
    print("Creating dummy 'df' for example usage...")
    
    # Generate time range for charge + discharge
    t_charge_start = pd.to_datetime("2024-02-28 10:30:00")
    t_charge_end = pd.to_datetime("2024-02-28 13:53:00")
    t_discharge_start = pd.to_datetime("2024-02-28 14:59:30")
    t_discharge_end = pd.to_datetime("2024-02-28 15:58:40")

    # Full time range from first charge to last discharge
    t_full_start = t_charge_start
    t_full_end = t_discharge_end
    
    # Create timestamps at 1-second intervals
    timestamps = pd.date_range(start=t_full_start, end=t_full_end, freq="1s")
    n = len(timestamps)
    power = np.zeros(n)
    
    # Create charge block
    idx_charge_start = timestamps.get_loc(t_charge_start)
    idx_charge_end = timestamps.get_loc(t_charge_end)
    power[idx_charge_start:idx_charge_end] = -25000 # Charging power
    
    # Create discharge block
    idx_dis_start = timestamps.get_loc(t_discharge_start)
    idx_dis_end = timestamps.get_loc(t_discharge_end)
    power[idx_dis_start:idx_dis_end] = 24500 # Discharging at P_nom
    
    # Add some noise
    power += np.random.normal(0, 50, n)
    
    df = pd.DataFrame({
        "Timestamp": timestamps,
        "PoiPwrAt kW": power
    })
    print("Dummy 'df' created.")


# --- Call the integrated function ---
out = compute_nominal_from_poi_plotly(
    df,

    # Discharge KPI window
    discharge_start="2024-02-28 14:59:30",
    discharge_end="2024-02-28 15:58:40",
    P_nom_kW=24500,
    tol_pct=1.0,
    required_minutes=58,
    time_col="Timestamp",
    power_col="PoiPwrAt kW",
    title="Integrated Discharge KPI & Energy Analysis",
    ramp_trim_seconds_discharge=0, 

    # Optional: Charge window for RTE & Calc-PoiEgyMtr
    charge_start="2024-02-28 10:30:00",
    charge_end="2024-02-28 13:53:00",
    sampling_seconds=1, 
    discharge_positive=True,
    ramp_trim_seconds_charge=0,
    warn_irregular=True
)

# --- View outputs ---
print("\n--- Summary Table ---")
# To display well in non-notebook environments:
with pd.option_context('display.max_rows', None, 'display.width', 1000):
    print(out["summary_table"])

print("\n--- Warnings ---")
if not out["warnings"]:
    print("No warnings.")
else:
    for w in out["warnings"]:
        print(f"Warning: {w}")

# --- NEW: Access the cumulative energy DataFrame ---
print("\n--- Cumulative Energy (Calc-PoiEgyMtr) ---")
df_energy_mtr = out["df_calc_poi_egymtr"]
if df_energy_mtr is not None:
    print(f"Calculation window: {df_energy_mtr['Timestamp'].min()} to {df_energy_mtr['Timestamp'].max()}")
    print("Tail of cumulative energy dataframe:")
    print(df_energy_mtr.tail())
else:
    print("Cumulative energy was not calculated (charge_start or charge_end not provided).")


# --- Interactive KPI plot ---
print("\nShowing interactive KPI plot...")
# In a script, this might open in a browser. 
# In a notebook (like Jupyter), it will display inline.
out["figure"].show()


--- Summary Table ---
                              metric                value
0                       Window start  2024-02-28 14:59:30
1                         Window end  2024-02-28 15:58:40
2                         P_nom (kW)              24500.0
3                      Tolerance (%)                  1.0
4                      Band low (kW)              24255.0
5                     Band high (kW)              24745.0
6                Actual energy (kWh)            24166.391
7     In-band time (continuous, min)               59.183
8     In-band time (cumulative, min)               59.183
9   Nominal energy (continuous, kWh)            24166.528
10  Nominal energy (cumulative, kWh)            24166.528
11      Compliance required duration               58 min
12           Compliance (continuous)                 True
13           Compliance (cumulative)                 True
14             Compliance (fraction)               1.0003
15          Ramp trim (discharge, s)             

In [32]:
import plotly.graph_objects as go
import pandas as pd
import warnings

def plot_calc_poi_egymtr(df_energy: pd.DataFrame, 
                         time_col: str = "Timestamp",
                         title: str = "Cumulative Energy (Calc-PoiEgyMtr)") -> go.Figure:
    """
    Plots the cumulative and incremental energy from the Calc-PoiEgyMtr dataframe.
    (Corrected version with valid Plotly layout properties)
    """
    if df_energy is None or df_energy.empty:
        warnings.warn("Cannot plot: 'df_calc_poi_egymtr' is empty or None.")
        return go.Figure()

    fig = go.Figure()

    # Plot cumulative energy (Calc-PoiEgyMtr)
    fig.add_trace(go.Scatter(
        x=df_energy[time_col],
        y=df_energy["Calc-PoiEgyMtr"],
        mode="lines",
        name="Cumulative Energy (kWh)",
        line=dict(color="#FF5733", width=2.5)
    ))

    # Plot incremental energy (Calc-PoiEgy)
    fig.add_trace(go.Scatter(
        x=df_energy[time_col],
        y=df_energy["Calc-PoiEgy"],
        mode="lines",
        name="Interval Energy (kWh)",
        line=dict(color="#337AFF", width=1, dash="dot"),
        yaxis="y2" # Plot on a secondary y-axis
    ))

    # --- CORRECTED LAYOUT ---
    fig.update_layout(
        title=title,
        xaxis_title="Time",
        
        # Corrected yaxis definition
        yaxis=dict(
            title=dict(
                text="<b>Cumulative Energy (kWh)</b>",
                font=dict(color="#FF5733") # Use 'font' inside 'title'
            ),
            tickfont=dict(color="#FF5733")
        ),
        
        # Corrected yaxis2 definition
        yaxis2=dict(
            title=dict(
                text="<b>Interval Energy (kWh)</b>",
                font=dict(color="#337AFF") # Use 'font' inside 'title'
            ),
            tickfont=dict(color="#337AFF"),
            overlaying="y",
            side="right",
            showgrid=False,
        ),
        
        template="plotly_white",
        hovermode="x unified",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0.01)
    )
    return fig

In [33]:
# 1. Extract the dataframe from the output dictionary
df_energy_mtr = out["df_calc_poi_egymtr"]

# 2. Call the (new) corrected plotting function
fig_energy = plot_calc_poi_egymtr(
    df_energy_mtr, 
    title="Cumulative Energy from Charge Start to Discharge End"
)

# 3. Show the plot
fig_energy.show()

# DC Side Hymon

## Integrated Dc side code

In [5]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
import re
from typing import Optional, Dict, List, Tuple, Set
from pathlib import Path


# =======================================================================
# SECTION 1: DC DATA LOADING FUNCTION
# =======================================================================

def load_and_prep_dc_data(file_path, sep=';', dayfirst=False) -> pd.DataFrame:
    """
    Loads and prepares the DC-side CSV file, handling timezone normalization.
    (This is your DC-side loading script)
    """
    if not Path(file_path).exists():
        raise FileNotFoundError(f"DC file not found: {file_path}")
        
    df = pd.read_csv(file_path, sep=sep, dtype=str, engine='python')

    df['Date'] = df['Date'].str.strip()
    df['Time'] = df['Time'].str.strip()
    df['TZ'] = df['TZ'].astype(str).str.strip()

    df['Datetime'] = pd.to_datetime(df['Date'] + ' ' + df['Time'],
                                    errors='coerce',
                                    dayfirst=dayfirst)

    def extract_tz_hours(tz_str):
        if pd.isna(tz_str): return pd.NA
        m = re.search(r'([+-]?\d{1,3})', tz_str)
        if not m: return pd.NA
        try: return int(m.group(1))
        except Exception: return pd.NA

    df['TZ_hours'] = df['TZ'].apply(extract_tz_hours)

    mask = df['TZ_hours'].notna() & df['Datetime'].notna()
    df.loc[mask, 'Datetime'] = df.loc[mask, 'Datetime'] + pd.to_timedelta(df.loc[mask, 'TZ_hours'], unit='h')

    df = df.drop(columns=['Date', 'Time', 'TZ', 'TZ_hours'])
    df = df.set_index('Datetime').sort_index()
    return df

# =======================================================================
# SECTION 2: CONSOLIDATED HELPER FUNCTIONS
# =======================================================================

def _check_cadence(dt_s: pd.Series,
                   expected_seconds: Optional[float],
                   rtol: float = 0.02,
                   atol: float = 0.5) -> dict:
    """Consolidated cadence checker for DC analysis."""
    x = dt_s.dropna().to_numpy(dtype=float)
    x = x[x > 0]
    if x.size == 0:
        return dict(is_regular=False, dt_median=np.nan, dt_p95=np.nan, frac_off=1.0)
    
    dt_median = float(np.median(x))
    dt_p95 = float(np.quantile(x, 0.95))
    
    if expected_seconds is None or np.isnan(expected_seconds):
        tol = max(abs(dt_median) * rtol, atol)
        frac_off = float((np.abs(x - dt_median) > tol).mean())
        is_regular = frac_off <= 0.05
    else:
        tol = max(abs(expected_seconds) * rtol, atol)
        frac_off = float((np.abs(x - expected_seconds) > tol).mean())
        is_regular = (frac_off <= 0.05) and (abs(dt_median - expected_seconds) <= tol)
        
    return dict(is_regular=is_regular, dt_median=dt_median, dt_p95=dt_p95, frac_off=frac_off)

# --- DC-Side Numeric Cleaning Helpers ---

def _strip_spaces(s: str) -> str:
    if not isinstance(s, str): return s
    return s.replace('\u00A0', '').replace('\u202F', '').replace(' ', '').strip()

def _classify_value(s: str):
    if s is None or s == '': return 'other'
    has_comma = ',' in s
    has_dot = '.' in s
    if has_comma and has_dot:
        return 'EU' if s.rfind(',') > s.rfind('.') else 'US'
    if has_comma: return 'comma_only'
    if has_dot: return 'dot_only'
    if re.fullmatch(r'[+-]?\d+', s): return 'int'
    return 'other'

def _convert_value(s: str, preference: str):
    if s is None or (isinstance(s, float) and pd.isna(s)): return np.nan
    if not isinstance(s, str): return s
    s0 = _strip_spaces(s)
    if s0 == '' or s0.lower() in ('nan', 'none', 'null'): return np.nan
    kind = _classify_value(s0)
    if kind == 'EU': # e.g., 1.234,56
        try: return float(s0.replace('.', '').replace(',', '.'))
        except Exception: return np.nan
    if kind == 'US': # e.g., 1,234.56
        try: return float(s0.replace(',', ''))
        except Exception: return np.nan
    if kind == 'comma_only':
        if preference == 'EU': # comma as decimal
            try: return float(s0.replace(',', '.'))
            except: return np.nan
        if preference == 'US': # comma as thousands
            try: return float(s0.replace(',', ''))
            except: return np.nan
        last_grp = s0.split(',')[-1]
        if last_grp.isdigit() and len(last_grp) == 3 and len(s0.split(',')) >= 2:
            try: return float(s0.replace(',', ''))
            except: return np.nan
        try: return float(s0.replace(',', '.'))
        except: return np.nan
    if kind == 'dot_only':
        if preference == 'US': # dot as decimal
            try: return float(s0)
            except: return np.nan
        if preference == 'EU': # dot as thousands
            try: return float(s0.replace('.', ''))
            except: return np.nan
        last_grp = s0.split('.')[-1]
        if last_grp.isdigit() and len(last_grp) == 3 and len(s0.split('.')) >= 2:
            try: return float(s0.replace('.', ''))
            except: return np.nan
        try: return float(s0)
        except: return np.nan
    if kind == 'int':
        try: return float(s0)
        except Exception: return np.nan
    return np.nan

def convert_mixed_numeric_columns(df_in: pd.DataFrame, exclude: set = None, verbose: bool = True) -> pd.DataFrame:
    """Robustly converts string columns to numeric, handling EU/US formats."""
    df_out = df_in.copy()
    exclude = set() if exclude is None else set(exclude)
    diagnostics = {}
    for col in df_out.columns:
        if col in exclude:
            continue
        if pd.api.types.is_numeric_dtype(df_out[col]):
            continue
        s = df_out[col].astype(str)
        if not s.str.contains(r'\d', regex=True).any():
            continue
        s_clean = s.map(_strip_spaces)
        kinds = s_clean.map(_classify_value)
        eu_votes = int((kinds == 'EU').sum())
        us_votes = int((kinds == 'US').sum())
        preference = 'EU' if eu_votes > us_votes else ('US' if us_votes > eu_votes else None)
        converted = s_clean.map(lambda x: _convert_value(x, preference))
        valid_ratio = np.isfinite(converted).sum() / max(len(converted), 1)
        if valid_ratio < 0.1:
            diagnostics[col] = f"Skipped (valid_ratio={valid_ratio:.2f} < 0.1)"
            continue
        df_out[col] = pd.Series(converted, index=df_out.index, dtype="Float64")
        diagnostics[col] = f"Converted (pref={preference}, valid_ratio={valid_ratio:.2f})"
    if verbose and diagnostics:
        print("\n[DC Numeric conversion diagnostics]")
        for c, info in diagnostics.items():
            print(f"- {c}: {info}")
    return df_out


# =======================================================================
# SECTION 3: THE INTEGRATED DC-ONLY ANALYZER CLASS
# =======================================================================

class DcCapacityTestAnalyzer:
    """
    Integrates all DC capacity test analyses into a single class.
    
    1. Instantiate with a DataFrame and a config dictionary.
    2. Call .run_analysis()
    3. Call .show_results() and .show_plots()
    """
    def __init__(self, master_config: dict, df_dc: pd.DataFrame):
        self.config = master_config
        self.df_dc = df_dc.copy()
        
        # --- Result properties ---
        self.dfs_by_device = None
        self.dc_rte_summary = None
        self.dc_rte_system_totals = None
        self.dc_system_cumulative_energy = None
        self.dc_system_soc = None
        
        # --- Plot properties ---
        self.dc_cumulative_energy_plot = None
        self.dc_soc_plot = None
        
        print("DcCapacityTestAnalyzer initialized.")
        print(f"DC data: {self.df_dc.shape[0]} rows.")

    def run_analysis(self):
        """Runs all DC analyses."""
        print("\n--- Starting Full DC Analysis ---")

        # --- 1. DC Analysis (Prep) ---
        try:
            print("Running DC-side data preparation...")
            self._clean_and_partition_dc_df()
            print(f"DC data partitioned into {len(self.dfs_by_device)} devices.")
        except Exception as e:
            print(f"*** ERROR in DC prep: {e} ***")
            warnings.warn(f"DC data prep failed: {e}")
            return

        # --- 2. DC Analysis (RTE) ---
        try:
            print("Running DC-side RTE analysis...")
            self._run_dc_rte_analysis()
            print("DC-side RTE analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC RTE analysis: {e} ***")
            warnings.warn(f"DC RTE analysis failed: {e}")

        # --- 3. DC Analysis (Cumulative Energy) ---
        try:
            print("Running DC-side Cumulative Energy analysis...")
            self._run_dc_cumulative_energy_analysis()
            print("DC-side Cumulative Energy analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC Cumulative Energy analysis: {e} ***")
            warnings.warn(f"DC Cumulative Energy analysis failed: {e}")

        # --- 4. DC Analysis (SOC) ---
        try:
            print("Running DC-side SOC analysis...")
            self._run_dc_soc_analysis()
            print("DC-side SOC analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC SOC analysis: {e} ***")
            warnings.warn(f"DC SOC analysis failed: {e}")
            
        print("--- Full DC Analysis Complete ---")

    def _clean_and_partition_dc_df(self):
        """Runs DC scripts #2 (clean) and #3 (partition)."""
        dc_device_col = self.config['dc_device_col']
        exclude_cols = {dc_device_col}
        
        self.df_dc = convert_mixed_numeric_columns(self.df_dc, exclude=exclude_cols, verbose=True)
        
        dc_power_col = self.config['dc_power_col']
        dc_soc_col = self.config['dc_soc_col']
        wanted_cols = [dc_power_col, dc_soc_col]
        
        available_cols = [c for c in wanted_cols if c in self.df_dc.columns]
        
        if dc_device_col not in self.df_dc.columns:
             raise KeyError(f"Required column '{dc_device_col}' not found in DC df.")
        
        dfs_by_device = {}
        for dev, g in self.df_dc.groupby(dc_device_col, sort=False):
            g2 = g[available_cols].sort_index()
            g2 = g2.dropna(how='all', subset=available_cols)
            dfs_by_device[dev] = g2
            
        self.dfs_by_device = dfs_by_device

    def _run_dc_rte_analysis(self):
        """Runs DC script #4 (DC RTE)."""
        per_device_rows = []
        
        CHARGE_START = pd.Timestamp(self.config['charge_start'])
        CHARGE_END = pd.Timestamp(self.config['charge_end'])
        DISCHARGE_START = pd.Timestamp(self.config['discharge_start'])
        DISCHARGE_END = pd.Timestamp(self.config['discharge_end'])
        POWER_COL = self.config['dc_power_col']
        DISCHARGE_POSITIVE = self.config.get('dc_discharge_positive', True)
        P_EPS_KW = self.config.get('dc_p_eps_kw', 0.0)
        SAMPLING_SECONDS = self.config.get('sampling_seconds')
        RTE_MIN_CHARGE_KWH = self.config.get('rte_min_charge_kwh', 0.01)
        
        def _prep_window(g: pd.DataFrame) -> pd.DataFrame:
            if g.empty: return g
            g["dt_s"] = g.index.to_series().diff().dt.total_seconds()
            if len(g) >= 2 and pd.isna(g.iloc[0]["dt_s"]):
                g.iloc[0, g.columns.get_loc("dt_s")] = (g.index[1] - g.index[0]).total_seconds()
            g = g.dropna(subset=["dt_s"])
            g = g[g["dt_s"] > 0]
            return g

        for dev, d in self.dfs_by_device.items():
            if d.empty or POWER_COL not in d.columns:
                continue
            
            dd = d.sort_index().copy()
            P = pd.to_numeric(dd[POWER_COL], errors='coerce').fillna(0.0).to_numpy(dtype=float)
            if not DISCHARGE_POSITIVE:
                P = -P
            if P_EPS_KW > 0:
                P = np.where(np.abs(P) < P_EPS_KW, 0.0, P)
            dd["P"] = P

            d_charge = dd[(dd.index >= CHARGE_START) & (dd.index <= CHARGE_END)].copy()
            d_dis = dd[(dd.index >= DISCHARGE_START) & (dd.index <= DISCHARGE_END)].copy()

            d_charge = _prep_window(d_charge)
            d_dis = _prep_window(d_dis)

            reg_charge = _check_cadence(d_charge["dt_s"] if not d_charge.empty else pd.Series([], dtype=float), SAMPLING_SECONDS)
            reg_dis = _check_cadence(d_dis["dt_s"] if not d_dis.empty else pd.Series([], dtype=float), SAMPLING_SECONDS)

            P_charge_star = (-d_charge["P"]).clip(lower=0.0).to_numpy(dtype=float) if not d_charge.empty else np.array([], dtype=float)
            P_dis_star = (d_dis["P"]).clip(lower=0.0).to_numpy(dtype=float) if not d_dis.empty else np.array([], dtype=float)

            E_charge_kWh = 0.0
            E_discharge_kWh = 0.0
            
            if SAMPLING_SECONDS is not None and reg_charge["is_regular"] and reg_dis["is_regular"]:
                dt_h = float(SAMPLING_SECONDS) / 3600.0
                E_charge_kWh = float(P_charge_star.sum() * dt_h) if P_charge_star.size else 0.0
                E_discharge_kWh = float(P_dis_star.sum() * dt_h) if P_dis_star.size else 0.0
                rte_method = f"constant_dt({int(SAMPLING_SECONDS)}s)"
            else:
                rte_method = "trapezoid_dt"
                if not d_charge.empty and P_charge_star.size:
                    t_c = d_charge.index.view("int64").to_numpy() / 1e9
                    E_charge_kWh = float(np.trapz(P_charge_star, x=t_c) / 3600.0)
                if not d_dis.empty and P_dis_star.size:
                    t_d = d_dis.index.view("int64").to_numpy() / 1e9
                    E_discharge_kWh = float(np.trapz(P_dis_star, x=t_d) / 3600.0)

            eta_dc = np.nan
            if E_charge_kWh > float(RTE_MIN_CHARGE_KWH):
                eta_dc = E_discharge_kWh / E_charge_kWh
            
            per_device_rows.append({
                "Device": dev, "E_dc_in_kWh": E_charge_kWh, "E_dc_out_kWh": E_discharge_kWh,
                "eta_dc": eta_dc, "method": rte_method,
                "dt_med_charge": reg_charge["dt_median"], "dt_p95_charge": reg_charge["dt_p95"],
                "dt_med_dis": reg_dis["dt_median"], "dt_p95_dis": reg_dis["dt_p95"],
            })

        self.dc_rte_summary = pd.DataFrame(per_device_rows).sort_values("Device", kind="stable")
        
        system_totals = {}
        system_totals["Total_E_dc_in_kWh"] = float(self.dc_rte_summary["E_dc_in_kWh"].sum())
        system_totals["Total_E_dc_out_kWh"] = float(self.dc_rte_summary["E_dc_out_kWh"].sum())
        system_totals["eta_dc_system"] = (
            system_totals["Total_E_dc_out_kWh"] / system_totals["Total_E_dc_in_kWh"]
            if system_totals["Total_E_dc_in_kWh"] > 0 else np.nan
        )
        self.dc_rte_system_totals = system_totals

    def _run_dc_cumulative_energy_analysis(self):
        """Runs DC script #5 (Cumulative Energy)."""
        
        def _prep_device_power(df: pd.DataFrame, cfg: dict) -> pd.Series:
            POWER_COL = cfg['dc_power_col']
            if df is None or df.empty or (POWER_COL not in df.columns):
                return pd.Series(dtype=float)
            P = pd.to_numeric(df[POWER_COL], errors="coerce").fillna(0.0).astype(float)
            P_kW = P / 1000.0 if cfg.get('dc_is_power_in_watts', False) else P
            if not cfg.get('dc_discharge_positive', True):
                P_kW = -P_kW
            P_EPS_KW = cfg.get('dc_p_eps_kw', 0.0)
            if P_EPS_KW and P_EPS_KW > 0:
                P_kW = P_kW.where(P_kW.abs() >= P_EPS_KW, 0.0)
            P_kW.name = "P_kW"
            return P_kW.sort_index()

        def _compute_dt_seconds(idx: pd.DatetimeIndex) -> np.ndarray:
            if len(idx) == 0: return np.array([], dtype=float)
            dt_s = np.diff(idx.view("int64")) / 1e9
            if len(dt_s) == 0: return np.array([0.0], dtype=float)
            first = dt_s[0]
            return np.concatenate([[first], dt_s])

        def _cumulative_energy_from_power(P_kW: pd.Series, cfg: dict) -> pd.Series:
            if P_kW.empty:
                return pd.Series(index=P_kW.index, data=[], dtype=float, name="E_system_cum_kWh")
            idx = P_kW.index
            dt_s_series = pd.Series(index=idx, data=_compute_dt_seconds(idx))
            
            SAMPLING_SECONDS = cfg.get('sampling_seconds')
            reg = _check_cadence(dt_s_series, SAMPLING_SECONDS)
            
            if SAMPLING_SECONDS is not None and reg["is_regular"]:
                dt_h = float(SAMPLING_SECONDS) / 3600.0
                E = np.cumsum(P_kW.values) * dt_h
            else:
                dt_h = dt_s_series.values / 3600.0
                P = P_kW.values.astype(float)
                inc = P * dt_h # Left-rectangle method
                E = np.cumsum(inc)
                
            return pd.Series(index=idx, data=E, name="E_system_cum_kWh")
        
        per_device_power = {}
        for dev, df in self.dfs_by_device.items():
            P_kW = _prep_device_power(df, self.config)
            if not P_kW.empty:
                per_device_power[dev] = P_kW

        pow_df = pd.DataFrame(per_device_power).sort_index()
        P_system_kW = pow_df.fillna(0.0).sum(axis=1)
        
        self.dc_system_cumulative_energy = _cumulative_energy_from_power(P_system_kW, self.config)

        plt.style.use("seaborn-v0_8-whitegrid")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(self.dc_system_cumulative_energy.index, self.dc_system_cumulative_energy.values, color="tab:blue", label="System")
        ax.set_title("System Cumulative DC Energy (signed, kWh)")
        ax.set_xlabel("Time")
        ax.set_ylabel("Energy (kWh)")
        ax.axhline(0, color="k", lw=0.8)
        ax.legend()
        plt.tight_layout()
        self.dc_cumulative_energy_plot = fig
        plt.close(fig) 

    def _run_dc_soc_analysis(self):
        """Runs DC script #6 (SOC analysis)."""
        SOC_COL = self.config['dc_soc_col']
        IS_SOC_PERCENT = self.config.get('dc_is_soc_percent', True)
        CLIP_MIN, CLIP_MAX = (0.0, 100.0) if IS_SOC_PERCENT else (0.0, 1.0)
        
        soc_by_device = {}
        for dev, df in self.dfs_by_device.items():
            if df is None or df.empty or (SOC_COL not in df.columns):
                continue
            s = pd.to_numeric(df[SOC_COL], errors="coerce").astype(float)
            s = s.clip(CLIP_MIN, CLIP_MAX)
            soc_by_device[dev] = s.sort_index()

        if not soc_by_device:
            warnings.warn("No device had a valid SOC series. Skipping SOC analysis.")
            return

        soc_df_union = pd.DataFrame(soc_by_device).sort_index()
        
        first_valid_per_dev = {c: soc_df_union[c].first_valid_index() for c in soc_df_union.columns}
        last_valid_per_dev  = {c: soc_df_union[c].last_valid_index() for c in soc_df_union.columns}
        drop_cols = [c for c in soc_df_union.columns if first_valid_per_dev[c] is None or last_valid_per_dev[c] is None]
        
        if drop_cols:
            soc_df_union = soc_df_union.drop(columns=drop_cols)
            first_valid_per_dev = {c: soc_df_union[c].first_valid_index() for c in soc_df_union.columns}
            last_valid_per_dev  = {c: soc_df_union[c].last_valid_index() for c in soc_df_union.columns}

        if soc_df_union.empty:
            warnings.warn("All devices dropped due to missing SOC. Skipping SOC analysis.")
            return

        common_start = max(first_valid_per_dev.values())
        common_end = min(last_valid_per_dev.values())
        if (common_start is None) or (common_end is None) or (common_start >= common_end):
            warnings.warn("No overlapping time window where all devices have SOC. Skipping SOC analysis.")
            return

        soc_df = soc_df_union[(soc_df_union.index >= common_start) & (soc_df_union.index <= common_end)]
        soc_df_ff = soc_df.ffill().clip(CLIP_MIN, CLIP_MAX)

        N = soc_df_ff.shape[1]
        self.dc_system_soc = soc_df_ff.sum(axis=1) / float(N)
        self.dc_system_soc.name = "SOC_total"

        plt.style.use("seaborn-v0_8-whitegrid")
        fig, ax = plt.subplots(figsize=(10, 4.5))
        ax.plot(self.dc_system_soc.index, self.dc_system_soc.values, color="tab:blue", label=f"System SOC (N={N})")
        ax.set_title("System State of Charge (Simple Mean Across Devices)")
        ax.set_xlabel("Time")
        ax.set_ylabel("SOC (%)" if IS_SOC_PERCENT else "SOC (fraction)")
        ax.set_ylim(CLIP_MIN - 1 if IS_SOC_PERCENT else -0.02, CLIP_MAX + 1 if IS_SOC_PERCENT else 1.02)
        ax.legend()
        plt.tight_layout()
        self.dc_soc_plot = fig
        plt.close(fig) 

    def show_results(self):
        """Prints all summary tables to the console."""
        print("\n\n" + "="*50)
        print("          BESS DC CAPACITY TEST RESULTS")
        print("="*50 + "\n")

        if self.dc_rte_summary is not None:
            print("--- DC-Side Per-Device RTE Summary ---")
            pd.set_option('display.float_format', lambda v: f"{v:,.6f}")
            print(self.dc_rte_summary)
            print("\n--- DC-Side System RTE Summary ---")
            for k, v in self.dc_rte_system_totals.items():
                print(f"- {k}: {v:,.6f}")
        else:
            print("\n\n--- DC-Side Analysis Not Run ---")
            
    def show_plots(self):
        """Displays all generated plots."""
        print("\n\n" + "="*50)
        print("          BESS DC CAPACITY TEST PLOTS")
        print("="*50 + "\n")
            
        if self.dc_cumulative_energy_plot:
            print("Displaying DC-Side System Cumulative Energy Plot...")
            self.dc_cumulative_energy_plot.show()
        else:
            print("DC-Side Cumulative Energy Plot not available.")
            
        if self.dc_soc_plot:
            print("Displaying DC-Side System SOC Plot...")
            self.dc_soc_plot.show()
        else:
            print("DC-Side System SOC Plot not available.")
            



In [6]:
# =======================================================================
# SECTION 4: EXAMPLE USAGE
# (This is the only part you need to edit)
# =======================================================================

if __name__ == "__main__":
    
    # --- 1. DEFINE FILE PATHS AND CONFIG ---
    
    DC_FILE_PATH = r'C:\Data_analysis\Decci\Capacity_test\Hybrid_Monitoring\data-2024-02-28(MVPS).csv'

    master_config = {
        # --- Time Windows (Critical) ---
        "charge_start": "2024-02-28 11:53:00",
        "charge_end": "2024-02-28 14:53:00",
        "discharge_start": "2024-02-28 14:55:30",
        "discharge_end": "2024-02-28 16:02:40",
        
        # --- General Settings ---
        "sampling_seconds": 1,
        "rte_min_charge_kwh": 0.01,
        
        # --- DC-Side Config (Check column names) ---
        "dc_device_col": "Device",
        "dc_power_col": "DcTotWatt",
        "dc_soc_col": "Bat.SOCTot",
        "dc_discharge_positive": True,
        "dc_is_power_in_watts": False, # Set True if 'DcTotWatt' is in W, False if kW
        "dc_p_eps_kw": 0.0,
        "dc_is_soc_percent": True,
    }

    # --- 2. LOAD DATA (OR CREATE DUMMY DATA) ---
    
    try:
        # Try to load real DC data
        print(f"Loading DC data from: {DC_FILE_PATH}")
        df_dc = load_and_prep_dc_data(DC_FILE_PATH)
        print(f"Loaded & Prepared DC DataFrame with {df_dc.shape} rows/cols.")
    except Exception as e:
        print(f"Warning: Could not load DC data ({e}). Creating dummy DC data.")
        # Create Dummy DC Data
        dc_time = pd.date_range("2024-02-28 10:00:00", "2024-02-28 17:00:00", freq="1s")
        def create_device_data(name, time, charge_p, discharge_p, soc_start, soc_end_charge, soc_end_dis):
            P = np.zeros(len(time))
            SOC = np.zeros(len(time))
            ch_mask = (time >= pd.Timestamp(master_config['charge_start'])) & (time < pd.Timestamp(master_config['charge_end']))
            dis_mask = (time >= pd.Timestamp(master_config['discharge_start'])) & (time < pd.Timestamp(master_config['discharge_end']))
            P[ch_mask] = -charge_p
            P[dis_mask] = discharge_p
            SOC[:] = soc_start
            SOC[ch_mask] = np.linspace(soc_start, soc_end_charge, ch_mask.sum())
            SOC[dis_mask] = np.linspace(soc_end_charge, soc_end_dis, dis_mask.sum())
            SOC[time > pd.Timestamp(master_config['discharge_end'])] = soc_end_dis
            return pd.DataFrame({
                "Datetime": time,
                master_config['dc_device_col']: name,
                master_config['dc_power_col']: P + np.random.normal(0, 50, len(time)),
                master_config['dc_soc_col']: SOC + np.random.normal(0, 0.1, len(time)),
            })
        df_dc1 = create_device_data("Inverter_1", dc_time, 13000, 12500, 10, 95, 15)
        df_dc2 = create_device_data("Inverter_2", dc_time, 13000, 12500, 10.2, 94.8, 15.1)
        df_dc = pd.concat([df_dc1, df_dc2]).set_index("Datetime").sort_index()


    # --- 3. RUN ANALYSIS ---
    
    # Check if dataframe is loaded before proceeding
    if 'df_dc' in locals():
        analyzer = DcCapacityTestAnalyzer(master_config, df_dc)
        
        # Run all DC calculations
        analyzer.run_analysis()

        # --- 4. SHOW RESULTS ---
        
        # Print all summary tables
        analyzer.show_results()

        # Show all plots
        analyzer.show_plots()
    else:
        print("\n*** Analysis failed: DC Data could not be loaded or created. ***")

Loading DC data from: C:\Data_analysis\Decci\Capacity_test\Hybrid_Monitoring\data-2024-02-28(MVPS).csv
Loaded & Prepared DC DataFrame with (146521, 32) rows/cols.
DcCapacityTestAnalyzer initialized.
DC data: 146521 rows.

--- Starting Full DC Analysis ---
Running DC-side data preparation...

[DC Numeric conversion diagnostics]
- DcTotVolt: Converted (pref=US, valid_ratio=1.00)
- DcTotAmp: Converted (pref=None, valid_ratio=1.00)
- DcTotWatt: Converted (pref=None, valid_ratio=1.00)
- GridTotAPhsA: Converted (pref=None, valid_ratio=1.00)
- GridTotAPhsB: Converted (pref=None, valid_ratio=1.00)
- GridTotAPhsC: Converted (pref=None, valid_ratio=1.00)
- InvTotVA: Converted (pref=None, valid_ratio=1.00)
- InvTotW: Converted (pref=None, valid_ratio=1.00)
- InvTotVAr: Converted (pref=None, valid_ratio=1.00)
- GridVoltagePhsAB: Converted (pref=None, valid_ratio=1.00)
- GridVoltagePhsBC: Converted (pref=None, valid_ratio=1.00)
- GridVoltagePhsCA: Converted (pref=None, valid_ratio=1.00)
- GridFrequ


FigureCanvasAgg is non-interactive, and thus cannot be shown


FigureCanvasAgg is non-interactive, and thus cannot be shown



In [7]:
import plotly.graph_objects as go

# Extract the data from the analyzer
dc_energy_data = analyzer.dc_system_cumulative_energy

if dc_energy_data is not None and not dc_energy_data.empty:
    fig_dc_energy = go.Figure()

    # Add the cumulative energy trace
    fig_dc_energy.add_trace(go.Scatter(
        x=dc_energy_data.index,
        y=dc_energy_data.values,
        mode='lines',
        name='System DC Cumulative Energy (kWh)',
        line=dict(color='blue', width=2)
    ))

    fig_dc_energy.update_layout(
        title="System Cumulative DC Energy (signed, kWh)",
        xaxis_title="Time",
        yaxis_title="Energy (kWh)",
        template="plotly_white",
        hovermode="x unified"
    )

    fig_dc_energy.show()
else:
    print("No DC cumulative energy data to plot.")

In [8]:
import plotly.graph_objects as go

# Extract the data from the analyzer
dc_soc_data = analyzer.dc_system_soc

if dc_soc_data is not None and not dc_soc_data.empty:
    fig_dc_soc = go.Figure()

    # Get SOC unit for the label
    is_percent = analyzer.config.get('dc_is_soc_percent', True)
    soc_unit = "(%)" if is_percent else "(fraction)"
    n_devices = len(analyzer.dfs_by_device) # Get number of devices

    # Add the SOC trace
    fig_dc_soc.add_trace(go.Scatter(
        x=dc_soc_data.index,
        y=dc_soc_data.values,
        mode='lines',
        name=f'System SOC (Avg. N={n_devices})',
        line=dict(color='green', width=2)
    ))

    fig_dc_soc.update_layout(
        title="System State of Charge (Simple Mean Across Devices)",
        xaxis_title="Time",
        yaxis_title=f"SOC {soc_unit}",
        template="plotly_white",
        hovermode="x unified"
    )

    fig_dc_soc.show()
else:
    print("No DC SOC data to plot (check for overlapping time windows).")

## Non integrated Dc 

In [14]:
# =======================================================================
# SECTION 1: CONSOLIDATED HELPER FUNCTIONS
# (Combined from all your scripts)
# =======================================================================

def _sanitize_time_col(d: pd.DataFrame, time_col: str) -> pd.DataFrame:
    """Helper to clean and sort the time column."""
    d = d.copy()
    d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
    d = d.dropna(subset=[time_col])
    d = d.sort_values(time_col).reset_index(drop=True)
    return d


def _check_cadence(dt_s: pd.Series,
                   expected_seconds: Optional[float],
                   rtol: float = 0.02,
                   atol: float = 0.5) -> dict:
    """
    Consolidated cadence checker for both AC and DC analysis.
    Checks median, p95, and fraction of outliers.
    """
    x = dt_s.dropna().to_numpy(dtype=float)
    x = x[x > 0]
    if x.size == 0:
        return dict(is_regular=False, dt_median=np.nan, dt_p95=np.nan, frac_off=1.0)
    
    dt_median = float(np.median(x))
    dt_p95 = float(np.quantile(x, 0.95))
    
    if expected_seconds is None or np.isnan(expected_seconds):
        # Use median as the reference if no expectation
        tol = max(abs(dt_median) * rtol, atol)
        frac_off = float((np.abs(x - dt_median) > tol).mean())
        is_regular = frac_off <= 0.05 # 5% or less are outliers
    else:
        # Use expected as the reference
        tol = max(abs(expected_seconds) * rtol, atol)
        frac_off = float((np.abs(x - expected_seconds) > tol).mean())
        # Regular if <5% outliers AND median is very close to expected
        is_regular = (frac_off <= 0.05) and (abs(dt_median - expected_seconds) <= tol)
        
    return dict(is_regular=is_regular, dt_median=dt_median, dt_p95=dt_p95, frac_off=frac_off)


# --- DC-Side Numeric Cleaning Helpers (from your script) ---

def _strip_spaces(s: str) -> str:
    if not isinstance(s, str):
        return s
    return s.replace('\u00A0', '').replace('\u202F', '').replace(' ', '').strip()

def _classify_value(s: str):
    if s is None or s == '':
        return 'other'
    has_comma = ',' in s
    has_dot = '.' in s
    if has_comma and has_dot:
        return 'EU' if s.rfind(',') > s.rfind('.') else 'US'
    if has_comma:
        return 'comma_only'
    if has_dot:
        return 'dot_only'
    if re.fullmatch(r'[+-]?\d+', s):
        return 'int'
    return 'other'

def _convert_value(s: str, preference: str):
    if s is None or (isinstance(s, float) and pd.isna(s)):
        return np.nan
    if not isinstance(s, str):
        return s
    s0 = _strip_spaces(s)
    if s0 == '' or s0.lower() in ('nan', 'none', 'null'):
        return np.nan
    kind = _classify_value(s0)
    if kind == 'EU': # e.g., 1.234,56
        try: return float(s0.replace('.', '').replace(',', '.'))
        except Exception: return np.nan
    if kind == 'US': # e.g., 1,234.56
        try: return float(s0.replace(',', ''))
        except Exception: return np.nan
    if kind == 'comma_only':
        if preference == 'EU': # comma as decimal
            try: return float(s0.replace(',', '.'))
            except: return np.nan
        if preference == 'US': # comma as thousands
            try: return float(s0.replace(',', ''))
            except: return np.nan
        last_grp = s0.split(',')[-1]
        if last_grp.isdigit() and len(last_grp) == 3 and len(s0.split(',')) >= 2:
            try: return float(s0.replace(',', ''))
            except: return np.nan
        try: return float(s0.replace(',', '.'))
        except: return np.nan
    if kind == 'dot_only':
        if preference == 'US': # dot as decimal
            try: return float(s0)
            except: return np.nan
        if preference == 'EU': # dot as thousands
            try: return float(s0.replace('.', ''))
            except: return np.nan
        last_grp = s0.split('.')[-1]
        if last_grp.isdigit() and len(last_grp) == 3 and len(s0.split('.')) >= 2:
            try: return float(s0.replace('.', ''))
            except: return np.nan
        try: return float(s0)
        except: return np.nan
    if kind == 'int':
        try: return float(s0)
        except Exception: return np.nan
    return np.nan

def convert_mixed_numeric_columns(df_in: pd.DataFrame, exclude: set = None, verbose: bool = True) -> pd.DataFrame:
    """Robustly converts string columns to numeric, handling EU/US formats."""
    df_out = df_in.copy()
    exclude = set() if exclude is None else set(exclude)
    diagnostics = {}
    for col in df_out.columns:
        if col in exclude:
            continue
        if pd.api.types.is_numeric_dtype(df_out[col]):
            continue
        s = df_out[col].astype(str)
        if not s.str.contains(r'\d', regex=True).any():
            continue
        s_clean = s.map(_strip_spaces)
        kinds = s_clean.map(_classify_value)
        eu_votes = int((kinds == 'EU').sum())
        us_votes = int((kinds == 'US').sum())
        preference = 'EU' if eu_votes > us_votes else ('US' if us_votes > eu_votes else None)
        converted = s_clean.map(lambda x: _convert_value(x, preference))
        valid_ratio = np.isfinite(converted).sum() / max(len(converted), 1)
        if valid_ratio < 0.1:
            diagnostics[col] = f"Skipped (valid_ratio={valid_ratio:.2f} < 0.1)"
            continue
        df_out[col] = pd.Series(converted, index=df_out.index, dtype="Float64")
        diagnostics[col] = f"Converted (pref={preference}, valid_ratio={valid_ratio:.2f})"
    if verbose and diagnostics:
        print("\n[DC Numeric conversion diagnostics]")
        for c, info in diagnostics.items():
            print(f"- {c}: {info}")
    return df_out


# =======================================================================
# SECTION 2: AC-SIDE ANALYSIS FUNCTION
# (From your first scripts, modified to use new _check_cadence)
# =======================================================================

def compute_nominal_from_poi_plotly(
    df: pd.DataFrame,
    discharge_start,
    discharge_end,
    P_nom_kW: float,
    tol_pct: float = 5.0,
    required_minutes: Optional[float] = None,
    time_col: str = "Timestamp",
    power_col: str = "PoiPwrAt kW",
    title: str = "Discharge KPI at POI (Plotly)",
    drop_duplicate_timestamps: bool = True,
    ramp_trim_seconds_discharge: int = 0,
    charge_start: Optional[str] = None,
    charge_end: Optional[str] = None,
    sampling_seconds: Optional[float] = None,
    discharge_positive: bool = True,
    ramp_trim_seconds_charge: int = 0,
    warn_irregular: bool = True,
    rte_min_charge_kWh: float = 0.01
) -> Dict:
    """Calculates AC-side KPI, RTE, and cumulative energy."""
    
    ts_start_raw = pd.to_datetime(discharge_start, errors="raise")
    ts_end_raw = pd.to_datetime(discharge_end, errors="raise")
    if ts_end_raw <= ts_start_raw:
        raise ValueError("discharge_end must be after discharge_start")

    trim_dis = pd.to_timedelta(int(ramp_trim_seconds_discharge), unit="s")
    ts_start = ts_start_raw + trim_dis
    ts_end = ts_end_raw - trim_dis
    if ts_end <= ts_start:
        raise ValueError("Ramp trim too large for discharge window.")

    if time_col not in df.columns:
        raise KeyError(f"Time column '{time_col}' not in df")
    if power_col not in df.columns:
        raise KeyError(f"Power column '{power_col}' not in df")

    d = df[[time_col, power_col]].copy()
    d = _sanitize_time_col(d, time_col)

    if drop_duplicate_timestamps:
        d = d.groupby(time_col, as_index=False)[power_col].mean()

    if d.empty:
        raise ValueError("No valid rows after parsing timestamps.")

    d_full = d.copy()
    d_full["dt_s"] = d_full[time_col].diff().dt.total_seconds()
    d_full = d_full[d_full["dt_s"] > 0].copy()
    d_full["E_kWh_slice"] = d_full[power_col] * (d_full["dt_s"] / 3600.0)
    total_energy_kWh = float(d_full["E_kWh_slice"].sum())
    total_duration_h = float((d_full[time_col].max() - d_full[time_col].min()).total_seconds()) / 3600.0
    avg_power_kW = total_energy_kWh / total_duration_h if total_duration_h > 0 else np.nan

    dKPI = d[(d[time_col] >= ts_start) & (d[time_col] <= ts_end)].copy()
    if dKPI.empty:
        raise ValueError("No samples inside discharge window.")
    if len(dKPI) < 2:
        raise ValueError("Only one sample in KPI window.")

    dKPI["dt_s"] = dKPI[time_col].diff().dt.total_seconds()
    if not dKPI.empty and np.isnan(dKPI.loc[dKPI.index[0], "dt_s"]) and len(dKPI) >= 2:
        dKPI.loc[dKPI.index[0], "dt_s"] = (
            dKPI.loc[dKPI.index[1], time_col] - dKPI.loc[dKPI.index[0], time_col]
        ).total_seconds()
    dKPI = dKPI[dKPI["dt_s"] > 0].copy()

    dKPI["E_kWh_slice"] = dKPI[power_col] * (dKPI["dt_s"] / 3600.0)
    actual_energy_kWh = float(dKPI["E_kWh_slice"].sum())

    band_low = float(P_nom_kW) * (1 - tol_pct / 100.0)
    band_high = float(P_nom_kW) * (1 + tol_pct / 100.0)
    dKPI["in_band"] = (dKPI[power_col] >= band_low) & (dKPI[power_col] <= band_high)

    inband_time_s_cum = float(dKPI.loc[dKPI["in_band"], "dt_s"].sum())
    E_nom_cum_kWh = float(P_nom_kW) * (inband_time_s_cum / 3600.0)

    segs: List[Tuple[pd.Timestamp, pd.Timestamp, float]] = []
    in_seg = False
    acc_s = 0.0
    seg_start = None
    for i, row in dKPI.iterrows():
        if row["in_band"]:
            if not in_seg:
                in_seg = True
                seg_start = row[time_col]
                acc_s = row["dt_s"]
            else:
                acc_s += row["dt_s"]
        else:
            if in_seg:
                seg_end = row[time_col]
                segs.append((seg_start, seg_end, acc_s))
                in_seg = False
    if in_seg:
        seg_end = dKPI.iloc[-1][time_col]
        segs.append((seg_start, seg_end, acc_s))

    longest_s = max([s for *_, s in segs], default=0.0)
    E_nom_cont_kWh = float(P_nom_kW) * (longest_s / 3600.0)

    compliance_cont = compliance_cum = None
    required_str = None
    if required_minutes is not None:
        req_s = float(required_minutes) * 60.0
        compliance_cont = longest_s >= req_s
        compliance_cum = inband_time_s_cum >= req_s
        required_str = f"{required_minutes:.0f} min"

    compliance_fraction = None
    window_time_s = float((ts_end - ts_start).total_seconds())
    if window_time_s > 0:
        compliance_fraction = inband_time_s_cum / window_time_s

    warnings_list: List[str] = []
    E_charge_kWh = E_discharge_kWh = np.nan
    RTE_pct = np.nan
    rte_method = None
    dt_expected = float(sampling_seconds) if sampling_seconds is not None else np.nan
    dt_median_charge = np.nan
    dt_median_discharge = np.nan
    df_calc_poi_egymtr = None 

    if (charge_start is not None) and (charge_end is not None):
        c_start_raw = pd.to_datetime(charge_start, errors="raise")
        
        d_ce = d[(d[time_col] >= c_start_raw) & (d[time_col] <= ts_end_raw)].copy()
        
        if d_ce.empty:
            warnings_list.append(f"No data in Calc-PoiEgyMtr window ({c_start_raw} to {ts_end_raw}).")
            df_calc_poi_egymtr = pd.DataFrame(columns=[time_col, "Calc-PoiEgy", "Calc-PoiEgyMtr"])
        else:
            d_ce["dt_s"] = d_ce[time_col].diff().dt.total_seconds()
            if np.isnan(d_ce.loc[d_ce.index[0], "dt_s"]) and len(d_ce) >= 2:
                d_ce.loc[d_ce.index[0], "dt_s"] = (
                    d_ce.loc[d_ce.index[1], time_col] - d_ce.loc[d_ce.index[0], time_col]
                ).total_seconds()
            
            d_ce = d_ce[d_ce["dt_s"] > 0].copy()
            d_ce["Calc-PoiEgy"] = d_ce[power_col] * (d_ce["dt_s"] / 3600.0)
            d_ce["Calc-PoiEgyMtr"] = d_ce["Calc-PoiEgy"].cumsum()
            df_calc_poi_egymtr = d_ce[[time_col, "Calc-PoiEgy", "Calc-PoiEgyMtr"]].copy()

        dd = d.copy() 
        P = dd[power_col].to_numpy(dtype=float)
        if not discharge_positive:
            P = -P
        dd["P"] = P

        trim_ch = pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s")
        c_start_trimmed = pd.to_datetime(charge_start, errors="raise") + trim_ch
        c_end_trimmed = pd.to_datetime(charge_end, errors="raise") - trim_ch

        d_charge = dd[(dd[time_col] >= c_start_trimmed) & (dd[time_col] <= c_end_trimmed)].copy()
        d_dis = dd[(dd[time_col] >= ts_start) & (dd[time_col] <= ts_end)].copy()

        for subset, name in [(d_charge, "charge"), (d_dis, "discharge")]:
            if not subset.empty:
                subset["dt_s"] = subset[time_col].diff().dt.total_seconds()
                if np.isnan(subset.loc[subset.index[0], "dt_s"]) and len(subset) >= 2:
                    subset.loc[subset.index[0], "dt_s"] = (
                        subset.loc[subset.index[1], time_col] - subset.loc[subset.index[0], time_col]
                    ).total_seconds()
                subset.dropna(subset=["dt_s"], inplace=True)
                subset = subset[subset["dt_s"] > 0]
                if name == "charge":
                    d_charge = subset
                else:
                    d_dis = subset

        # *** USE CONSOLIDATED CADENCE CHECKER ***
        reg_charge = _check_cadence(
            d_charge["dt_s"] if not d_charge.empty else pd.Series([], dtype=float),
            expected_seconds=sampling_seconds)
        reg_dis = _check_cadence(
            d_dis["dt_s"] if not d_dis.empty else pd.Series([], dtype=float),
            expected_seconds=sampling_seconds)
        
        dt_median_charge = reg_charge["dt_median"]
        dt_median_discharge = reg_dis["dt_median"]

        P_charge_star = (-d_charge["P"]).clip(lower=0.0).to_numpy() if not d_charge.empty else np.array([], dtype=float)
        P_dis_star = d_dis["P"].clip(lower=0.0).to_numpy() if not d_dis.empty else np.array([], dtype=float)

        if sampling_seconds is not None and reg_charge["is_regular"] and reg_dis["is_regular"]:
            dt_h = float(sampling_seconds) / 3600.0
            E_charge_kWh = float(P_charge_star.sum() * dt_h)
            E_discharge_kWh = float(P_dis_star.sum() * dt_h)
            rte_method = f"constant_dt({int(sampling_seconds)}s)"
        else:
            rte_method = "trapezoid_dt"
            msg = "Detected irregular cadence for AC-RTE; using trapezoidal integration."
            if warn_irregular:
                warnings.warn(msg)
            warnings_list.append(msg)
            if not d_charge.empty:
                t_c = d_charge[time_col].astype("int64").to_numpy() / 1e9
                E_charge_kWh = float(np.trapz(P_charge_star, x=t_c) / 3600.0)
            else:
                E_charge_kWh = 0.0
            if not d_dis.empty:
                t_d = d_dis[time_col].astype("int64").to_numpy() / 1e9
                E_discharge_kWh = float(np.trapz(P_dis_star, x=t_d) / 3600.0)
            else:
                E_discharge_kWh = 0.0

        if E_charge_kWh > float(rte_min_charge_kWh):
            RTE_pct = 100.0 * E_discharge_kWh / E_charge_kWh
        else:
            RTE_pct = np.nan
            warnings_list.append(f"E_charge ({E_charge_kWh:.6f}) ≤ guard ({rte_min_charge_kWh:.6f}); RTE=NaN.")

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=dKPI[time_col], y=dKPI[power_col],
                                mode="lines", name=f"{power_col}",
                                line=dict(color="#1f77b4", width=2)))
    fig.add_trace(go.Scatter(x=[dKPI[time_col].min(), dKPI[time_col].max()],
                                y=[band_low, band_low], mode="lines", name=f"−{tol_pct:.0f}% band",
                                line=dict(color="#2ca02c", width=1.5, dash="dash")))
    fig.add_trace(go.Scatter(x=[dKPI[time_col].min(), dKPI[time_col].max()],
                                y=[band_high, band_high], mode="lines", name=f"+{tol_pct:.0f}% band",
                                line=dict(color="#2ca0g02c", width=1.5, dash="dash")))

    shapes = []
    for (s, e, dur_s) in segs:
        shapes.append(dict(type="rect", xref="x", yref="y",
                            x0=s, x1=e, y0=band_low, y1=band_high,
                            fillcolor="rgba(46,204,113,0.18)", line=dict(width=0),
                            layer="below"))
    fig.update_layout(shapes=shapes)

    subtitle = [
        f"P_nom={P_nom_kW:.0f} kW, tol=±{tol_pct:.1f}%",
        f"Actual={actual_energy_kWh:.1f} kWh",
        f"Nom(cum)={E_nom_cum_kWh:.1f} kWh",
        f"Nom(cont)={E_nom_cont_kWh:.1f} kWh"
    ]
    if required_minutes is not None:
        subtitle.append(f"Req={required_str}, pass(cont)={bool(compliance_cont)}, pass(cum)={bool(compliance_cum)}")
    if not np.isnan(RTE_pct):
        subtitle.append(f"RTE={RTE_pct:.2f}%")

    fig.update_layout(
        title=dict(text=f"{title}<br><sup>{' | '.join(subtitle)}</sup>", x=0.01),
        xaxis_title="Time",
        yaxis_title="Power (kW)",
        hovermode="x unified",
        template="plotly_white",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0.01),
        margin=dict(l=60, r=30, t=80, b=40),
    )

    metrics = [
        "Window start", "Window end", "P_nom (kW)", "Tolerance (%)",
        "Actual energy (kWh)", "In-band time (continuous, min)", "In-band time (cumulative, min)",
        "Compliance required duration", "Compliance (continuous)", "Compliance (cumulative)",
        "Compliance (fraction)", "Ramp trim (discharge, s)", "Total timeframe start",
        "Total timeframe end", "Total duration (h)", "Total energy (kWh)", "Average power (kW)"
    ]
    values = [
        ts_start, ts_end, float(P_nom_kW), float(tol_pct),
        round(actual_energy_kWh, 3), round(longest_s/60.0, 3), round(inband_time_s_cum/60.0, 3),
        required_str, compliance_cont, compliance_cum,
        round(compliance_fraction, 4) if compliance_fraction is not None else None,
        int(ramp_trim_seconds_discharge), d_full[time_col].min(),
        d_full[time_col].max(), round(total_duration_h, 3),
        round(total_energy_kWh, 3), round(avg_power_kW, 3) if not np.isnan(avg_power_kW) else None
    ]

    if (charge_start is not None) and (charge_end is not None):
        metrics += [
            "Charge window start", "Charge window end", "Ramp trim (charge, s)",
            "Sampling interval expected (s)", "Median dt (charge, s)", "Median dt (discharge, s)",
            "RTE method", "E_charge (kWh)", "E_discharge (kWh)", "RTE (%)"
        ]
        values += [
            pd.to_datetime(charge_start) + pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s"),
            pd.to_datetime(charge_end) - pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s"),
            int(ramp_trim_seconds_charge),
            dt_expected if not np.isnan(dt_expected) else None,
            None if np.isnan(dt_median_charge) else round(float(dt_median_charge), 6),
            None if np.isnan(dt_median_discharge) else round(float(dt_median_discharge), 6),
            rte_method,
            None if np.isnan(E_charge_kWh) else round(float(E_charge_kWh), 3),
            None if np.isnan(E_discharge_kWh) else round(float(E_discharge_kWh), 3),
            None if np.isnan(RTE_pct) else round(float(RTE_pct), 3),
        ]
    summary = pd.DataFrame({"metric": metrics, "value": values})

    return {
        "summary_table": summary,
        "figure": fig,
        "df_calc_poi_egymtr": df_calc_poi_egymtr,
        "warnings": warnings_list,
        # (add any other specific KPIs you want to return)
    }

def plot_calc_poi_egymtr(df_energy: pd.DataFrame, 
                         time_col: str = "Timestamp",
                         title: str = "AC-Side Cumulative Energy (Calc-PoiEgyMtr)") -> go.Figure:
    """Plots the cumulative and incremental energy from the AC dataframe."""
    if df_energy is None or df_energy.empty:
        warnings.warn("Cannot plot AC cumulative energy: DataFrame is empty or None.")
        return go.Figure()
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df_energy[time_col], y=df_energy["Calc-PoiEgyMtr"],
        mode="lines", name="Cumulative Energy (kWh)",
        line=dict(color="#FF5733", width=2.5)
    ))
    fig.add_trace(go.Scatter(
        x=df_energy[time_col], y=df_energy["Calc-PoiEgy"],
        mode="lines", name="Interval Energy (kWh)",
        line=dict(color="#337AFF", width=1, dash="dot"), yaxis="y2"
    ))
    fig.update_layout(
        title=title, xaxis_title="Time",
        yaxis=dict(title=dict(text="<b>Cumulative Energy (kWh)</b>", font=dict(color="#FF5733")),
                   tickfont=dict(color="#FF5733")),
        yaxis2=dict(title=dict(text="<b>Interval Energy (kWh)</b>", font=dict(color="#337AFF")),
                    tickfont=dict(color="#337AFF"),
                    overlaying="y", side="right", showgrid=False),
        template="plotly_white", hovermode="x unified",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0.01)
    )
    return fig


# =======================================================================
# SECTION 3: THE INTEGRATED ANALYZER CLASS
# =======================================================================

class BessCapacityTestAnalyzer:
    """
    Integrates AC and DC capacity test analysis into a single class.
    
    1. Instantiate with DataFrames and a config dictionary.
    2. Call .run_analysis()
    3. Call .show_results() and .show_plots()
    """
    def __init__(self, master_config: dict, df_ac: pd.DataFrame, df_dc: pd.DataFrame):
        self.config = master_config
        self.df_ac = df_ac.copy()
        self.df_dc = df_dc.copy()
        
        # --- Result properties ---
        self.ac_results = None
        self.dfs_by_device = None
        self.dc_rte_summary = None
        self.dc_rte_system_totals = None
        self.dc_system_cumulative_energy = None
        self.dc_system_soc = None
        
        # --- Plot properties ---
        self.ac_kpi_plot = None
        self.ac_cumulative_plot = None
        self.dc_cumulative_energy_plot = None
        self.dc_soc_plot = None
        
        print("BessCapacityTestAnalyzer initialized.")
        print(f"AC data: {self.df_ac.shape[0]} rows. DC data: {self.df_dc.shape[0]} rows.")

    def run_analysis(self):
        """Runs all AC and DC analyses."""
        print("\n--- Starting Full Analysis ---")
        
        # --- 1. AC Analysis ---
        try:
            print("Running AC-side analysis...")
            self.ac_results = compute_nominal_from_poi_plotly(
                self.df_ac,
                discharge_start=self.config['discharge_start'],
                discharge_end=self.config['discharge_end'],
                P_nom_kW=self.config['ac_p_nom_kw'],
                tol_pct=self.config['ac_tol_pct'],
                required_minutes=self.config.get('ac_required_minutes'),
                time_col=self.config['ac_time_col'],
                power_col=self.config['ac_power_col'],
                title="AC-Side Discharge KPI",
                drop_duplicate_timestamps=self.config.get('ac_drop_duplicates', True),
                ramp_trim_seconds_discharge=self.config.get('ac_ramp_trim_discharge', 0),
                charge_start=self.config['charge_start'],
                charge_end=self.config['charge_end'],
                sampling_seconds=self.config.get('sampling_seconds'),
                discharge_positive=self.config.get('ac_discharge_positive', True),
                ramp_trim_seconds_charge=self.config.get('ac_ramp_trim_charge', 0),
                rte_min_charge_kWh=self.config.get('rte_min_charge_kwh', 0.01)
            )
            self.ac_kpi_plot = self.ac_results['figure']
            self.ac_cumulative_plot = plot_calc_poi_egymtr(self.ac_results['df_calc_poi_egymtr'])
            print("AC-side analysis complete.")
        except Exception as e:
            print(f"*** ERROR in AC analysis: {e} ***")
            warnings.warn(f"AC analysis failed: {e}")

        # --- 2. DC Analysis (Prep) ---
        try:
            print("Running DC-side data preparation...")
            self._clean_and_partition_dc_df()
            print(f"DC data partitioned into {len(self.dfs_by_device)} devices.")
        except Exception as e:
            print(f"*** ERROR in DC prep: {e} ***")
            warnings.warn(f"DC data prep failed: {e}")
            return

        # --- 3. DC Analysis (RTE) ---
        try:
            print("Running DC-side RTE analysis...")
            self._run_dc_rte_analysis()
            print("DC-side RTE analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC RTE analysis: {e} ***")
            warnings.warn(f"DC RTE analysis failed: {e}")

        # --- 4. DC Analysis (Cumulative Energy) ---
        try:
            print("Running DC-side Cumulative Energy analysis...")
            self._run_dc_cumulative_energy_analysis()
            print("DC-side Cumulative Energy analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC Cumulative Energy analysis: {e} ***")
            warnings.warn(f"DC Cumulative Energy analysis failed: {e}")

        # --- 5. DC Analysis (SOC) ---
        try:
            print("Running DC-side SOC analysis...")
            self._run_dc_soc_analysis()
            print("DC-side SOC analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC SOC analysis: {e} ***")
            warnings.warn(f"DC SOC analysis failed: {e}")
            
        print("--- Full Analysis Complete ---")

    def _clean_and_partition_dc_df(self):
        """Runs DC scripts #2 (clean) and #3 (partition)."""
        dc_device_col = self.config['dc_device_col']
        exclude_cols = {dc_device_col}
        
        # Run cleaner (from DC script #2)
        self.df_dc = convert_mixed_numeric_columns(self.df_dc, exclude=exclude_cols, verbose=True)
        
        # Run partitioner (from DC script #3)
        dc_power_col = self.config['dc_power_col']
        dc_soc_col = self.config['dc_soc_col']
        wanted_cols = [dc_power_col, dc_soc_col]
        
        available_cols = [c for c in wanted_cols if c in self.df_dc.columns]
        
        if dc_device_col not in self.df_dc.columns:
             raise KeyError(f"Required column '{dc_device_col}' not found in DC df.")
        
        dfs_by_device = {}
        for dev, g in self.df_dc.groupby(dc_device_col, sort=False):
            g2 = g[available_cols].sort_index()
            g2 = g2.dropna(how='all', subset=available_cols)
            dfs_by_device[dev] = g2
            
        self.dfs_by_device = dfs_by_device

    def _run_dc_rte_analysis(self):
        """Runs DC script #4 (DC RTE)."""
        per_device_rows = []
        
        # Get config
        CHARGE_START = pd.Timestamp(self.config['charge_start'])
        CHARGE_END = pd.Timestamp(self.config['charge_end'])
        DISCHARGE_START = pd.Timestamp(self.config['discharge_start'])
        DISCHARGE_END = pd.Timestamp(self.config['discharge_end'])
        POWER_COL = self.config['dc_power_col']
        DISCHARGE_POSITIVE = self.config.get('dc_discharge_positive', True)
        P_EPS_KW = self.config.get('dc_p_eps_kw', 0.0)
        SAMPLING_SECONDS = self.config.get('sampling_seconds')
        RTE_MIN_CHARGE_KWH = self.config.get('rte_min_charge_kwh', 0.01)
        
        def _prep_window(g: pd.DataFrame) -> pd.DataFrame:
            if g.empty: return g
            g["dt_s"] = g.index.to_series().diff().dt.total_seconds()
            if len(g) >= 2 and pd.isna(g.iloc[0]["dt_s"]):
                g.iloc[0, g.columns.get_loc("dt_s")] = (g.index[1] - g.index[0]).total_seconds()
            g = g.dropna(subset=["dt_s"])
            g = g[g["dt_s"] > 0]
            return g

        for dev, d in self.dfs_by_device.items():
            if d.empty or POWER_COL not in d.columns:
                continue
            
            dd = d.sort_index().copy()
            P = pd.to_numeric(dd[POWER_COL], errors='coerce').fillna(0.0).to_numpy(dtype=float)
            if not DISCHARGE_POSITIVE:
                P = -P
            if P_EPS_KW > 0:
                P = np.where(np.abs(P) < P_EPS_KW, 0.0, P)
            dd["P"] = P

            d_charge = dd[(dd.index >= CHARGE_START) & (dd.index <= CHARGE_END)].copy()
            d_dis = dd[(dd.index >= DISCHARGE_START) & (dd.index <= DISCHARGE_END)].copy()

            d_charge = _prep_window(d_charge)
            d_dis = _prep_window(d_dis)

            # *** USE CONSOLIDATED CADENCE CHECKER ***
            reg_charge = _check_cadence(d_charge["dt_s"] if not d_charge.empty else pd.Series([], dtype=float), SAMPLING_SECONDS)
            reg_dis = _check_cadence(d_dis["dt_s"] if not d_dis.empty else pd.Series([], dtype=float), SAMPLING_SECONDS)

            P_charge_star = (-d_charge["P"]).clip(lower=0.0).to_numpy(dtype=float) if not d_charge.empty else np.array([], dtype=float)
            P_dis_star = (d_dis["P"]).clip(lower=0.0).to_numpy(dtype=float) if not d_dis.empty else np.array([], dtype=float)

            E_charge_kWh = 0.0
            E_discharge_kWh = 0.0
            
            if SAMPLING_SECONDS is not None and reg_charge["is_regular"] and reg_dis["is_regular"]:
                dt_h = float(SAMPLING_SECONDS) / 3600.0
                E_charge_kWh = float(P_charge_star.sum() * dt_h) if P_charge_star.size else 0.0
                E_discharge_kWh = float(P_dis_star.sum() * dt_h) if P_dis_star.size else 0.0
                rte_method = f"constant_dt({int(SAMPLING_SECONDS)}s)"
            else:
                rte_method = "trapezoid_dt"
                if not d_charge.empty and P_charge_star.size:
                    t_c = d_charge.index.view("int64").to_numpy() / 1e9
                    E_charge_kWh = float(np.trapz(P_charge_star, x=t_c) / 3600.0)
                if not d_dis.empty and P_dis_star.size:
                    t_d = d_dis.index.view("int64").to_numpy() / 1e9
                    E_discharge_kWh = float(np.trapz(P_dis_star, x=t_d) / 3600.0)

            eta_dc = np.nan
            if E_charge_kWh > float(RTE_MIN_CHARGE_KWH):
                eta_dc = E_discharge_kWh / E_charge_kWh
            
            per_device_rows.append({
                "Device": dev, "E_dc_in_kWh": E_charge_kWh, "E_dc_out_kWh": E_discharge_kWh,
                "eta_dc": eta_dc, "method": rte_method,
                "dt_med_charge": reg_charge["dt_median"], "dt_p95_charge": reg_charge["dt_p95"],
                "dt_med_dis": reg_dis["dt_median"], "dt_p95_dis": reg_dis["dt_p95"],
            })

        self.dc_rte_summary = pd.DataFrame(per_device_rows).sort_values("Device", kind="stable")
        
        system_totals = {}
        system_totals["Total_E_dc_in_kWh"] = float(self.dc_rte_summary["E_dc_in_kWh"].sum())
        system_totals["Total_E_dc_out_kWh"] = float(self.dc_rte_summary["E_dc_out_kWh"].sum())
        system_totals["eta_dc_system"] = (
            system_totals["Total_E_dc_out_kWh"] / system_totals["Total_E_dc_in_kWh"]
            if system_totals["Total_E_dc_in_kWh"] > 0 else np.nan
        )
        self.dc_rte_system_totals = system_totals

    def _run_dc_cumulative_energy_analysis(self):
        """Runs DC script #5 (Cumulative Energy)."""
        
        # --- Helpers embedded from script #5 ---
        def _prep_device_power(df: pd.DataFrame, cfg: dict) -> pd.Series:
            POWER_COL = cfg['dc_power_col']
            if df is None or df.empty or (POWER_COL not in df.columns):
                return pd.Series(dtype=float)
            P = pd.to_numeric(df[POWER_COL], errors="coerce").fillna(0.0).astype(float)
            P_kW = P / 1000.0 if cfg.get('dc_is_power_in_watts', False) else P
            if not cfg.get('dc_discharge_positive', True):
                P_kW = -P_kW
            P_EPS_KW = cfg.get('dc_p_eps_kw', 0.0)
            if P_EPS_KW and P_EPS_KW > 0:
                P_kW = P_kW.where(P_kW.abs() >= P_EPS_KW, 0.0)
            P_kW.name = "P_kW"
            return P_kW.sort_index()

        def _compute_dt_seconds(idx: pd.DatetimeIndex) -> np.ndarray:
            if len(idx) == 0: return np.array([], dtype=float)
            dt_s = np.diff(idx.view("int64")) / 1e9
            if len(dt_s) == 0: return np.array([0.0], dtype=float)
            first = dt_s[0]
            return np.concatenate([[first], dt_s])

        def _cumulative_energy_from_power(P_kW: pd.Series, cfg: dict) -> pd.Series:
            if P_kW.empty:
                return pd.Series(index=P_kW.index, data=[], dtype=float, name="E_system_cum_kWh")
            idx = P_kW.index
            dt_s_series = pd.Series(index=idx, data=_compute_dt_seconds(idx))
            
            SAMPLING_SECONDS = cfg.get('sampling_seconds')
            reg = _check_cadence(dt_s_series, SAMPLING_SECONDS)
            
            if SAMPLING_SECONDS is not None and reg["is_regular"]:
                dt_h = float(SAMPLING_SECONDS) / 3600.0
                E = np.cumsum(P_kW.values) * dt_h
            else:
                dt_h = dt_s_series.values / 3600.0
                P = P_kW.values.astype(float)
                inc = P * dt_h # Left-rectangle method
                E = np.cumsum(inc)
                
            return pd.Series(index=idx, data=E, name="E_system_cum_kWh")
        # --- End embedded helpers ---

        per_device_power = {}
        for dev, df in self.dfs_by_device.items():
            P_kW = _prep_device_power(df, self.config)
            if not P_kW.empty:
                per_device_power[dev] = P_kW

        pow_df = pd.DataFrame(per_device_power).sort_index()
        P_system_kW = pow_df.fillna(0.0).sum(axis=1)
        
        self.dc_system_cumulative_energy = _cumulative_energy_from_power(P_system_kW, self.config)

        # Create plot and store it
        plt.style.use("seaborn-v0_8-whitegrid")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(self.dc_system_cumulative_energy.index, self.dc_system_cumulative_energy.values, color="tab:blue", label="System")
        ax.set_title("System Cumulative DC Energy (signed, kWh)")
        ax.set_xlabel("Time")
        ax.set_ylabel("Energy (kWh)")
        ax.axhline(0, color="k", lw=0.8)
        ax.legend()
        plt.tight_layout()
        self.dc_cumulative_energy_plot = fig
        plt.close(fig) # Close to prevent auto-display

    def _run_dc_soc_analysis(self):
        """Runs DC script #6 (SOC analysis)."""
        SOC_COL = self.config['dc_soc_col']
        IS_SOC_PERCENT = self.config.get('dc_is_soc_percent', True)
        CLIP_MIN, CLIP_MAX = (0.0, 100.0) if IS_SOC_PERCENT else (0.0, 1.0)
        
        soc_by_device = {}
        for dev, df in self.dfs_by_device.items():
            if df is None or df.empty or (SOC_COL not in df.columns):
                continue
            s = pd.to_numeric(df[SOC_COL], errors="coerce").astype(float)
            s = s.clip(CLIP_MIN, CLIP_MAX)
            soc_by_device[dev] = s.sort_index()

        if not soc_by_device:
            warnings.warn("No device had a valid SOC series. Skipping SOC analysis.")
            return

        soc_df_union = pd.DataFrame(soc_by_device).sort_index()
        
        first_valid_per_dev = {c: soc_df_union[c].first_valid_index() for c in soc_df_union.columns}
        last_valid_per_dev  = {c: soc_df_union[c].last_valid_index() for c in soc_df_union.columns}
        drop_cols = [c for c in soc_df_union.columns if first_valid_per_dev[c] is None or last_valid_per_dev[c] is None]
        
        if drop_cols:
            soc_df_union = soc_df_union.drop(columns=drop_cols)
            first_valid_per_dev = {c: soc_df_union[c].first_valid_index() for c in soc_df_union.columns}
            last_valid_per_dev  = {c: soc_df_union[c].last_valid_index() for c in soc_df_union.columns}

        if soc_df_union.empty:
            warnings.warn("All devices dropped due to missing SOC. Skipping SOC analysis.")
            return

        common_start = max(first_valid_per_dev.values())
        common_end = min(last_valid_per_dev.values())
        if (common_start is None) or (common_end is None) or (common_start >= common_end):
            warnings.warn("No overlapping time window where all devices have SOC. Skipping SOC analysis.")
            return

        soc_df = soc_df_union[(soc_df_union.index >= common_start) & (soc_df_union.index <= common_end)]
        soc_df_ff = soc_df.ffill().clip(CLIP_MIN, CLIP_MAX)

        N = soc_df_ff.shape[1]
        self.dc_system_soc = soc_df_ff.sum(axis=1) / float(N)
        self.dc_system_soc.name = "SOC_total"

        # Create plot and store it
        plt.style.use("seaborn-v0_8-whitegrid")
        fig, ax = plt.subplots(figsize=(10, 4.5))
        ax.plot(self.dc_system_soc.index, self.dc_system_soc.values, color="tab:blue", label=f"System SOC (N={N})")
        ax.set_title("System State of Charge (Simple Mean Across Devices)")
        ax.set_xlabel("Time")
        ax.set_ylabel("SOC (%)" if IS_SOC_PERCENT else "SOC (fraction)")
        ax.set_ylim(CLIP_MIN - 1 if IS_SOC_PERCENT else -0.02, CLIP_MAX + 1 if IS_SOC_PERCENT else 1.02)
        ax.legend()
        plt.tight_layout()
        self.dc_soc_plot = fig
        plt.close(fig) # Close to prevent auto-display

    def show_results(self):
        """Prints all summary tables to the console."""
        print("\n\n" + "="*50)
        print("          BESS CAPACITY TEST RESULTS")
        print("="*50 + "\n")
        
        if self.ac_results:
            print("--- AC-Side KPI & RTE Summary ---")
            with pd.option_context('display.max_rows', None, 'display.width', 1000):
                print(self.ac_results['summary_table'])
            print("\nAC-Side Warnings:")
            if self.ac_results['warnings']:
                for w in self.ac_results['warnings']: print(f"- {w}")
            else:
                print("- None")
        else:
            print("--- AC-Side Analysis Not Run ---")

        if self.dc_rte_summary is not None:
            print("\n\n--- DC-Side Per-Device RTE Summary ---")
            pd.set_option('display.float_format', lambda v: f"{v:,.6f}")
            print(self.dc_rte_summary)
            print("\n--- DC-Side System RTE Summary ---")
            for k, v in self.dc_rte_system_totals.items():
                print(f"- {k}: {v:,.6f}")
        else:
            print("\n\n--- DC-Side Analysis Not Run ---")
            
    def show_plots(self):
        """Displays all generated plots."""
        print("\n\n" + "="*50)
        print("          BESS CAPACITY TEST PLOTS")
        print("="*50 + "\n")
        
        if self.ac_kpi_plot:
            print("Displaying AC-Side KPI Plot...")
            self.ac_kpi_plot.show()
        else:
            print("AC-Side KPI Plot not available.")
            
        if self.ac_cumulative_plot:
            print("Displaying AC-Side Cumulative Energy Plot...")
            self.ac_cumulative_plot.show()
        else:
            print("AC-Side Cumulative Energy Plot not available.")
            
        if self.dc_cumulative_energy_plot:
            print("Displaying DC-Side System Cumulative Energy Plot...")
            self.dc_cumulative_energy_plot.show()
        else:
            print("DC-Side Cumulative Energy Plot not available.")
            
        if self.dc_soc_plot:
            print("Displaying DC-Side System SOC Plot...")
            self.dc_soc_plot.show()
        else:
            print("DC-Side System SOC Plot not available.")

In [15]:
# --- STEP 1: Load and Prep DC Data (from your script #1) ---
# (You would load your AC data normally)

dc_file_path = r'C:\Data_analysis\Decci\Capacity_test\Hybrid_Monitoring\data-2024-02-28(MVPS).csv'
df_dc_raw = pd.read_csv(dc_file_path, sep=';', dtype=str, engine='python')

# # Trim whitespace
df_dc_raw['Date'] = df_dc_raw['Date'].str.strip()
df_dc_raw['Time'] = df_dc_raw['Time'].str.strip()
df_dc_raw['TZ'] = df_dc_raw['TZ'].astype(str).str.strip()

# # Parse Datetime
df_dc_raw['Datetime'] = pd.to_datetime(df_dc_raw['Date'] + ' ' + df_dc_raw['Time'], errors='coerce', dayfirst=False)

def extract_tz_hours(tz_str):
    if pd.isna(tz_str): return pd.NA
    m = re.search(r'([+-]?\d{1,3})', tz_str)
    if not m: return pd.NA
    try: return int(m.group(1))
    except Exception: return pd.NA

df_dc_raw['TZ_hours'] = df_dc_raw['TZ'].apply(extract_tz_hours)
mask = df_dc_raw['TZ_hours'].notna() & df_dc_raw['Datetime'].notna()
df_dc_raw.loc[mask, 'Datetime'] = df_dc_raw.loc[mask, 'Datetime'] + pd.to_timedelta(df_dc_raw.loc[mask, 'TZ_hours'], unit='h')

# # Cleaned DC DataFrame
df_dc = df_dc_raw.drop(columns=['Date', 'Time', 'TZ', 'TZ_hours'])
df_dc = df_dc.set_index('Datetime').sort_index()

print(f"Prepared DC DataFrame with {df_dc.shape[0]} rows.")

# --- Assume df_ac is already loaded ---
# df_ac = pd.read_csv(...)
# print(f"Loaded AC DataFrame with {df_ac.shape[0]} rows.")

Prepared DC DataFrame with 146521 rows.


In [16]:
# --- STEP 2: Define your Master Configuration ---
master_config = {
    # --- Time Windows ---
    "charge_start": "2024-02-28 10:30:00",
    "charge_end": "2024-02-28 13:53:00",
    "discharge_start": "2024-02-28 14:59:30",
    "discharge_end": "2024-02-28 15:58:40",
    
    # --- General Settings ---
    "sampling_seconds": 1,
    "rte_min_charge_kwh": 0.01,
    
    # --- AC-Side Config ---
    "ac_time_col": "Timestamp",
    "ac_power_col": "PoiPwrAt kW",
    "ac_p_nom_kw": 24500,
    "ac_tol_pct": 5.0,
    "ac_required_minutes": 58,
    "ac_discharge_positive": True,
    "ac_ramp_trim_discharge": 0,
    "ac_ramp_trim_charge": 0,
    
    # --- DC-Side Config ---
    "dc_device_col": "Device",
    "dc_power_col": "DcTotWatt",
    "dc_soc_col": "Bat.SOCTot",
    "dc_discharge_positive": True,
    "dc_is_power_in_watts": False, # Set True if 'DcTotWatt' is in W, False if kW
    "dc_p_eps_kw": 0.0,
    "dc_is_soc_percent": True,
}

In [17]:
# --- STEP 3: Instantiate, Run, and View ---
analyzer = BessCapacityTestAnalyzer(master_config,df_ac, df_dc)

# Run all AC and DC calculations
analyzer.run_analysis()

# Print all summary tables
analyzer.show_results()

# Show all plots
analyzer.show_plots()

NameError: name 'df_ac' is not defined

# AC non functional + Dc

In [26]:
import numpy as np
import pandas as pd
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import warnings
import re
from typing import Optional, Dict, List, Tuple, Set
from pathlib import Path

# =======================================================================
# SECTION 1: DATA LOADING FUNCTIONS
# =======================================================================

def load_hycon_hybrid_fast(file_path,
                           sep=';',
                           encoding='latin1',
                           strip_trailing_hyphen=True,
                           parse_timestamp_utc=False,
                           use_pyarrow=False):
    """
    Loads the AC-side CSV file with the special 8-line header.
    (This is your AC-side loading script)
    """
    file_path = Path(file_path)
    if not file_path.exists():
        raise FileNotFoundError(f"AC file not found: {file_path}")

    # --- Read only the 6th and 8th lines quickly (no pandas) ---
    row6 = row8 = None
    with open(file_path, 'r', encoding=encoding, errors='ignore') as f:
        for i, line in enumerate(f, start=1):  # 1-based line index
            if i == 6:
                row6 = line.rstrip('\n')
            elif i == 8:
                row8 = line.rstrip('\n')
                break  # we have both; stop reading early
    if row6 is None or row8 is None:
        raise ValueError("File is too short or missing required header lines 6 and 8.")

    # Split into header cells and combine cell-wise
    h6_parts = [p.strip() for p in row6.split(sep)]
    h8_parts = [p.strip() for p in row8.split(sep)]

    # Align lengths
    width = max(len(h6_parts), len(h8_parts))
    if len(h6_parts) < width:
        h6_parts += [''] * (width - len(h6_parts))
    if len(h8_parts) < width:
        h8_parts += [''] * (width - len(h8_parts))

    combined = [f"{a} {b}".strip() for a, b in zip(h6_parts, h8_parts)]
    combined = [pd.Series([c]).str.replace(r'\s+', ' ', regex=True).iloc[0].strip() for c in combined]
    if strip_trailing_hyphen:
        combined = [pd.Series([c]).str.replace(r'\s*-\s*$', '', regex=True).iloc[0] for c in combined]

    if combined:
        combined[0] = 'Timestamp'

    # --- Read data once with the prepared header ---
    read_kwargs = dict(
        filepath_or_buffer=file_path,
        sep=sep,
        skiprows=8,
        header=None,
        names=combined,
        encoding=encoding,
        low_memory=False,
    )
    if use_pyarrow:
        try:
            read_kwargs['dtype_backend'] = 'pyarrow'
        except TypeError:
            pass # Older pandas
            
    df = pd.read_csv(**read_kwargs)
    df['Timestamp'] = pd.to_datetime(df['Timestamp'], errors='coerce', utc=parse_timestamp_utc)
    return df

def load_and_prep_dc_data(file_path, sep=';', dayfirst=False):
    """
    Loads and prepares the DC-side CSV file, handling timezone normalization.
    (This is your DC-side loading script)
    """
    if not Path(file_path).exists():
        raise FileNotFoundError(f"DC file not found: {file_path}")
        
    df = pd.read_csv(file_path, sep=sep, dtype=str, engine='python')

    df['Date'] = df['Date'].str.strip()
    df['Time'] = df['Time'].str.strip()
    df['TZ'] = df['TZ'].astype(str).str.strip()

    df['Datetime'] = pd.to_datetime(df['Date'] + ' ' + df['Time'],
                                    errors='coerce',
                                    dayfirst=dayfirst)

    def extract_tz_hours(tz_str):
        if pd.isna(tz_str): return pd.NA
        m = re.search(r'([+-]?\d{1,3})', tz_str)
        if not m: return pd.NA
        try: return int(m.group(1))
        except Exception: return pd.NA

    df['TZ_hours'] = df['TZ'].apply(extract_tz_hours)

    mask = df['TZ_hours'].notna() & df['Datetime'].notna()
    df.loc[mask, 'Datetime'] = df.loc[mask, 'Datetime'] + pd.to_timedelta(df.loc[mask, 'TZ_hours'], unit='h')

    df = df.drop(columns=['Date', 'Time', 'TZ', 'TZ_hours'])
    df = df.set_index('Datetime').sort_index()
    return df

# =======================================================================
# SECTION 2: CONSOLIDATED HELPER FUNCTIONS
# =======================================================================

def _sanitize_time_col(d: pd.DataFrame, time_col: str) -> pd.DataFrame:
    """Helper to clean and sort the time column."""
    d = d.copy()
    d[time_col] = pd.to_datetime(d[time_col], errors="coerce")
    d = d.dropna(subset=[time_col])
    d = d.sort_values(time_col).reset_index(drop=True)
    return d


def _check_cadence(dt_s: pd.Series,
                   expected_seconds: Optional[float],
                   rtol: float = 0.02,
                   atol: float = 0.5) -> dict:
    """Consolidated cadence checker for both AC and DC analysis."""
    x = dt_s.dropna().to_numpy(dtype=float)
    x = x[x > 0]
    if x.size == 0:
        return dict(is_regular=False, dt_median=np.nan, dt_p95=np.nan, frac_off=1.0)
    
    dt_median = float(np.median(x))
    dt_p95 = float(np.quantile(x, 0.95))
    
    if expected_seconds is None or np.isnan(expected_seconds):
        tol = max(abs(dt_median) * rtol, atol)
        frac_off = float((np.abs(x - dt_median) > tol).mean())
        is_regular = frac_off <= 0.05
    else:
        tol = max(abs(expected_seconds) * rtol, atol)
        frac_off = float((np.abs(x - expected_seconds) > tol).mean())
        is_regular = (frac_off <= 0.05) and (abs(dt_median - expected_seconds) <= tol)
        
    return dict(is_regular=is_regular, dt_median=dt_median, dt_p95=dt_p95, frac_off=frac_off)

# --- DC-Side Numeric Cleaning Helpers ---

def _strip_spaces(s: str) -> str:
    if not isinstance(s, str): return s
    return s.replace('\u00A0', '').replace('\u202F', '').replace(' ', '').strip()

def _classify_value(s: str):
    if s is None or s == '': return 'other'
    has_comma = ',' in s
    has_dot = '.' in s
    if has_comma and has_dot:
        return 'EU' if s.rfind(',') > s.rfind('.') else 'US'
    if has_comma: return 'comma_only'
    if has_dot: return 'dot_only'
    if re.fullmatch(r'[+-]?\d+', s): return 'int'
    return 'other'

def _convert_value(s: str, preference: str):
    if s is None or (isinstance(s, float) and pd.isna(s)): return np.nan
    if not isinstance(s, str): return s
    s0 = _strip_spaces(s)
    if s0 == '' or s0.lower() in ('nan', 'none', 'null'): return np.nan
    kind = _classify_value(s0)
    if kind == 'EU': # e.g., 1.234,56
        try: return float(s0.replace('.', '').replace(',', '.'))
        except Exception: return np.nan
    if kind == 'US': # e.g., 1,234.56
        try: return float(s0.replace(',', ''))
        except Exception: return np.nan
    if kind == 'comma_only':
        if preference == 'EU': # comma as decimal
            try: return float(s0.replace(',', '.'))
            except: return np.nan
        if preference == 'US': # comma as thousands
            try: return float(s0.replace(',', ''))
            except: return np.nan
        last_grp = s0.split(',')[-1]
        if last_grp.isdigit() and len(last_grp) == 3 and len(s0.split(',')) >= 2:
            try: return float(s0.replace(',', ''))
            except: return np.nan
        try: return float(s0.replace(',', '.'))
        except: return np.nan
    if kind == 'dot_only':
        if preference == 'US': # dot as decimal
            try: return float(s0)
            except: return np.nan
        if preference == 'EU': # dot as thousands
            try: return float(s0.replace('.', ''))
            except: return np.nan
        last_grp = s0.split('.')[-1]
        if last_grp.isdigit() and len(last_grp) == 3 and len(s0.split('.')) >= 2:
            try: return float(s0.replace('.', ''))
            except: return np.nan
        try: return float(s0)
        except: return np.nan
    if kind == 'int':
        try: return float(s0)
        except Exception: return np.nan
    return np.nan

def convert_mixed_numeric_columns(df_in: pd.DataFrame, exclude: set = None, verbose: bool = True) -> pd.DataFrame:
    """Robustly converts string columns to numeric, handling EU/US formats."""
    df_out = df_in.copy()
    exclude = set() if exclude is None else set(exclude)
    diagnostics = {}
    for col in df_out.columns:
        if col in exclude:
            continue
        if pd.api.types.is_numeric_dtype(df_out[col]):
            continue
        s = df_out[col].astype(str)
        if not s.str.contains(r'\d', regex=True).any():
            continue
        s_clean = s.map(_strip_spaces)
        kinds = s_clean.map(_classify_value)
        eu_votes = int((kinds == 'EU').sum())
        us_votes = int((kinds == 'US').sum())
        preference = 'EU' if eu_votes > us_votes else ('US' if us_votes > eu_votes else None)
        converted = s_clean.map(lambda x: _convert_value(x, preference))
        valid_ratio = np.isfinite(converted).sum() / max(len(converted), 1)
        if valid_ratio < 0.1:
            diagnostics[col] = f"Skipped (valid_ratio={valid_ratio:.2f} < 0.1)"
            continue
        df_out[col] = pd.Series(converted, index=df_out.index, dtype="Float64")
        diagnostics[col] = f"Converted (pref={preference}, valid_ratio={valid_ratio:.2f})"
    if verbose and diagnostics:
        print("\n[DC Numeric conversion diagnostics]")
        for c, info in diagnostics.items():
            print(f"- {c}: {info}")
    return df_out


# =======================================================================
# SECTION 3: AC-SIDE ANALYSIS FUNCTIONS
# =======================================================================

def compute_nominal_from_poi_plotly(
    df: pd.DataFrame,
    discharge_start,
    discharge_end,
    P_nom_kW: float,
    tol_pct: float = 5.0,
    required_minutes: Optional[float] = None,
    time_col: str = "Timestamp",
    power_col: str = "PoiPwrAt kW",
    title: str = "Discharge KPI at POI (Plotly)",
    drop_duplicate_timestamps: bool = True,
    ramp_trim_seconds_discharge: int = 0,
    charge_start: Optional[str] = None,
    charge_end: Optional[str] = None,
    sampling_seconds: Optional[float] = None,
    discharge_positive: bool = True,
    ramp_trim_seconds_charge: int = 0,
    warn_irregular: bool = True,
    rte_min_charge_kWh: float = 0.01
) -> Dict:
    """Calculates AC-side KPI, RTE, and cumulative energy."""
    
    ts_start_raw = pd.to_datetime(discharge_start, errors="raise")
    ts_end_raw = pd.to_datetime(discharge_end, errors="raise")
    if ts_end_raw <= ts_start_raw:
        raise ValueError("discharge_end must be after discharge_start")

    trim_dis = pd.to_timedelta(int(ramp_trim_seconds_discharge), unit="s")
    ts_start = ts_start_raw + trim_dis
    ts_end = ts_end_raw - trim_dis
    if ts_end <= ts_start:
        raise ValueError("Ramp trim too large for discharge window.")

    if time_col not in df.columns:
        raise KeyError(f"Time column '{time_col}' not in df")
    if power_col not in df.columns:
        raise KeyError(f"Power column '{power_col}' not in df")

    d = df[[time_col, power_col]].copy()
    d = _sanitize_time_col(d, time_col)

    if drop_duplicate_timestamps:
        d = d.groupby(time_col, as_index=False)[power_col].mean()

    if d.empty:
        raise ValueError("No valid rows after parsing timestamps.")

    d_full = d.copy()
    d_full["dt_s"] = d_full[time_col].diff().dt.total_seconds()
    d_full = d_full[d_full["dt_s"] > 0].copy()
    d_full["E_kWh_slice"] = d_full[power_col] * (d_full["dt_s"] / 3600.0)
    total_energy_kWh = float(d_full["E_kWh_slice"].sum())
    total_duration_h = float((d_full[time_col].max() - d_full[time_col].min()).total_seconds()) / 3600.0
    avg_power_kW = total_energy_kWh / total_duration_h if total_duration_h > 0 else np.nan

    dKPI = d[(d[time_col] >= ts_start) & (d[time_col] <= ts_end)].copy()
    if dKPI.empty:
        raise ValueError("No samples inside discharge window.")
    if len(dKPI) < 2:
        raise ValueError("Only one sample in KPI window.")

    dKPI["dt_s"] = dKPI[time_col].diff().dt.total_seconds()
    if not dKPI.empty and np.isnan(dKPI.loc[dKPI.index[0], "dt_s"]) and len(dKPI) >= 2:
        dKPI.loc[dKPI.index[0], "dt_s"] = (
            dKPI.loc[dKPI.index[1], time_col] - dKPI.loc[dKPI.index[0], time_col]
        ).total_seconds()
    dKPI = dKPI[dKPI["dt_s"] > 0].copy()

    dKPI["E_kWh_slice"] = dKPI[power_col] * (dKPI["dt_s"] / 3600.0)
    actual_energy_kWh = float(dKPI["E_kWh_slice"].sum())

    band_low = float(P_nom_kW) * (1 - tol_pct / 100.0)
    band_high = float(P_nom_kW) * (1 + tol_pct / 100.0)
    dKPI["in_band"] = (dKPI[power_col] >= band_low) & (dKPI[power_col] <= band_high)

    inband_time_s_cum = float(dKPI.loc[dKPI["in_band"], "dt_s"].sum())
    E_nom_cum_kWh = float(P_nom_kW) * (inband_time_s_cum / 3600.0)

    segs: List[Tuple[pd.Timestamp, pd.Timestamp, float]] = []
    in_seg = False
    acc_s = 0.0
    seg_start = None
    for i, row in dKPI.iterrows():
        if row["in_band"]:
            if not in_seg:
                in_seg = True
                seg_start = row[time_col]
                acc_s = row["dt_s"]
            else:
                acc_s += row["dt_s"]
        else:
            if in_seg:
                seg_end = row[time_col]
                segs.append((seg_start, seg_end, acc_s))
                in_seg = False
    if in_seg:
        seg_end = dKPI.iloc[-1][time_col]
        segs.append((seg_start, seg_end, acc_s))

    longest_s = max([s for *_, s in segs], default=0.0)
    E_nom_cont_kWh = float(P_nom_kW) * (longest_s / 3600.0)

    compliance_cont = compliance_cum = None
    required_str = None
    if required_minutes is not None:
        req_s = float(required_minutes) * 60.0
        compliance_cont = longest_s >= req_s
        compliance_cum = inband_time_s_cum >= req_s
        required_str = f"{required_minutes:.0f} min"

    compliance_fraction = None
    window_time_s = float((ts_end - ts_start).total_seconds())
    if window_time_s > 0:
        compliance_fraction = inband_time_s_cum / window_time_s

    warnings_list: List[str] = []
    E_charge_kWh = E_discharge_kWh = np.nan
    RTE_pct = np.nan
    rte_method = None
    dt_expected = float(sampling_seconds) if sampling_seconds is not None else np.nan
    dt_median_charge = np.nan
    dt_median_discharge = np.nan
    df_calc_poi_egymtr = None 

    if (charge_start is not None) and (charge_end is not None):
        c_start_raw = pd.to_datetime(charge_start, errors="raise")
        
        d_ce = d[(d[time_col] >= c_start_raw) & (d[time_col] <= ts_end_raw)].copy()
        
        if d_ce.empty:
            warnings_list.append(f"No data in Calc-PoiEgyMtr window ({c_start_raw} to {ts_end_raw}).")
            df_calc_poi_egymtr = pd.DataFrame(columns=[time_col, "Calc-PoiEgy", "Calc-PoiEgyMtr"])
        else:
            d_ce["dt_s"] = d_ce[time_col].diff().dt.total_seconds()
            if np.isnan(d_ce.loc[d_ce.index[0], "dt_s"]) and len(d_ce) >= 2:
                d_ce.loc[d_ce.index[0], "dt_s"] = (
                    d_ce.loc[d_ce.index[1], time_col] - d_ce.loc[d_ce.index[0], time_col]
                ).total_seconds()
            
            d_ce = d_ce[d_ce["dt_s"] > 0].copy()
            d_ce["Calc-PoiEgy"] = d_ce[power_col] * (d_ce["dt_s"] / 3600.0)
            d_ce["Calc-PoiEgyMtr"] = d_ce["Calc-PoiEgy"].cumsum()
            df_calc_poi_egymtr = d_ce[[time_col, "Calc-PoiEgy", "Calc-PoiEgyMtr"]].copy()

        dd = d.copy() 
        P = dd[power_col].to_numpy(dtype=float)
        if not discharge_positive:
            P = -P
        dd["P"] = P

        trim_ch = pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s")
        c_start_trimmed = pd.to_datetime(charge_start, errors="raise") + trim_ch
        c_end_trimmed = pd.to_datetime(charge_end, errors="raise") - trim_ch

        d_charge = dd[(dd[time_col] >= c_start_trimmed) & (dd[time_col] <= c_end_trimmed)].copy()
        d_dis = dd[(dd[time_col] >= ts_start) & (dd[time_col] <= ts_end)].copy()

        for subset, name in [(d_charge, "charge"), (d_dis, "discharge")]:
            if not subset.empty:
                subset["dt_s"] = subset[time_col].diff().dt.total_seconds()
                if np.isnan(subset.loc[subset.index[0], "dt_s"]) and len(subset) >= 2:
                    subset.loc[subset.index[0], "dt_s"] = (
                        subset.loc[subset.index[1], time_col] - subset.loc[subset.index[0], time_col]
                    ).total_seconds()
                subset.dropna(subset=["dt_s"], inplace=True)
                subset = subset[subset["dt_s"] > 0]
                if name == "charge":
                    d_charge = subset
                else:
                    d_dis = subset

        reg_charge = _check_cadence(
            d_charge["dt_s"] if not d_charge.empty else pd.Series([], dtype=float),
            expected_seconds=sampling_seconds)
        reg_dis = _check_cadence(
            d_dis["dt_s"] if not d_dis.empty else pd.Series([], dtype=float),
            expected_seconds=sampling_seconds)
        
        dt_median_charge = reg_charge["dt_median"]
        dt_median_discharge = reg_dis["dt_median"]

        P_charge_star = (-d_charge["P"]).clip(lower=0.0).to_numpy() if not d_charge.empty else np.array([], dtype=float)
        P_dis_star = d_dis["P"].clip(lower=0.0).to_numpy() if not d_dis.empty else np.array([], dtype=float)

        if sampling_seconds is not None and reg_charge["is_regular"] and reg_dis["is_regular"]:
            dt_h = float(sampling_seconds) / 3600.0
            E_charge_kWh = float(P_charge_star.sum() * dt_h)
            E_discharge_kWh = float(P_dis_star.sum() * dt_h)
            rte_method = f"constant_dt({int(sampling_seconds)}s)"
        else:
            rte_method = "trapezoid_dt"
            msg = "Detected irregular cadence for AC-RTE; using trapezoidal integration."
            if warn_irregular:
                warnings.warn(msg)
            warnings_list.append(msg)
            if not d_charge.empty:
                t_c = d_charge[time_col].astype("int64").to_numpy() / 1e9
                E_charge_kWh = float(np.trapz(P_charge_star, x=t_c) / 3600.0)
            else:
                E_charge_kWh = 0.0
            if not d_dis.empty:
                t_d = d_dis[time_col].astype("int64").to_numpy() / 1e9
                E_discharge_kWh = float(np.trapz(P_dis_star, x=t_d) / 3600.0)
            else:
                E_discharge_kWh = 0.0

        if E_charge_kWh > float(rte_min_charge_kWh):
            RTE_pct = 100.0 * E_discharge_kWh / E_charge_kWh
        else:
            RTE_pct = np.nan
            warnings_list.append(f"E_charge ({E_charge_kWh:.6f}) ≤ guard ({rte_min_charge_kWh:.6f}); RTE=NaN.")

    fig = go.Figure()
    fig.add_trace(go.Scatter(x=dKPI[time_col], y=dKPI[power_col],
                                mode="lines", name=f"{power_col}",
                                line=dict(color="#1f77b4", width=2)))
    fig.add_trace(go.Scatter(x=[dKPI[time_col].min(), dKPI[time_col].max()],
                                y=[band_low, band_low], mode="lines", name=f"−{tol_pct:.0f}% band",
                                line=dict(color="#2ca02c", width=1.5, dash="dash")))
    fig.add_trace(go.Scatter(x=[dKPI[time_col].min(), dKPI[time_col].max()],
                                y=[band_high, band_high], mode="lines", name=f"+{tol_pct:.0f}% band",
                                line=dict(color="#2ca0g02c", width=1.5, dash="dash")))

    shapes = []
    for (s, e, dur_s) in segs:
        shapes.append(dict(type="rect", xref="x", yref="y",
                            x0=s, x1=e, y0=band_low, y1=band_high,
                            fillcolor="rgba(46,204,113,0.18)", line=dict(width=0),
                            layer="below"))
    fig.update_layout(shapes=shapes)

    subtitle = [
        f"P_nom={P_nom_kW:.0f} kW, tol=±{tol_pct:.1f}%",
        f"Actual={actual_energy_kWh:.1f} kWh",
        f"Nom(cum)={E_nom_cum_kWh:.1f} kWh",
        f"Nom(cont)={E_nom_cont_kWh:.1f} kWh"
    ]
    if required_minutes is not None:
        subtitle.append(f"Req={required_str}, pass(cont)={bool(compliance_cont)}, pass(cum)={bool(compliance_cum)}")
    if not np.isnan(RTE_pct):
        subtitle.append(f"RTE={RTE_pct:.2f}%")

    fig.update_layout(
        title=dict(text=f"{title}<br><sup>{' | '.join(subtitle)}</sup>", x=0.01),
        xaxis_title="Time",
        yaxis_title="Power (kW)",
        hovermode="x unified",
        template="plotly_white",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0.01),
        margin=dict(l=60, r=30, t=80, b=40),
    )

    metrics = [
        "Window start", "Window end", "P_nom (kW)", "Tolerance (%)",
        "Actual energy (kWh)", "In-band time (continuous, min)", "In-band time (cumulative, min)",
        "Compliance required duration", "Compliance (continuous)", "Compliance (cumulative)",
        "Compliance (fraction)", "Ramp trim (discharge, s)", "Total timeframe start",
        "Total timeframe end", "Total duration (h)", "Total energy (kWh)", "Average power (kW)"
    ]
    values = [
        ts_start, ts_end, float(P_nom_kW), float(tol_pct),
        round(actual_energy_kWh, 3), round(longest_s/60.0, 3), round(inband_time_s_cum/60.0, 3),
        required_str, compliance_cont, compliance_cum,
        round(compliance_fraction, 4) if compliance_fraction is not None else None,
        int(ramp_trim_seconds_discharge), d_full[time_col].min(),
        d_full[time_col].max(), round(total_duration_h, 3),
        round(total_energy_kWh, 3), round(avg_power_kW, 3) if not np.isnan(avg_power_kW) else None
    ]

    if (charge_start is not None) and (charge_end is not None):
        metrics += [
            "Charge window start", "Charge window end", "Ramp trim (charge, s)",
            "Sampling interval expected (s)", "Median dt (charge, s)", "Median dt (discharge, s)",
            "RTE method", "E_charge (kWh)", "E_discharge (kWh)", "RTE (%)"
        ]
        values += [
            pd.to_datetime(charge_start) + pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s"),
            pd.to_datetime(charge_end) - pd.to_timedelta(int(ramp_trim_seconds_charge), unit="s"),
            int(ramp_trim_seconds_charge),
            dt_expected if not np.isnan(dt_expected) else None,
            None if np.isnan(dt_median_charge) else round(float(dt_median_charge), 6),
            None if np.isnan(dt_median_discharge) else round(float(dt_median_discharge), 6),
            rte_method,
            None if np.isnan(E_charge_kWh) else round(float(E_charge_kWh), 3),
            None if np.isnan(E_discharge_kWh) else round(float(E_discharge_kWh), 3),
            None if np.isnan(RTE_pct) else round(float(RTE_pct), 3),
        ]
    summary = pd.DataFrame({"metric": metrics, "value": values})

    return {
        "summary_table": summary,
        "figure": fig,
        "df_calc_poi_egymtr": df_calc_poi_egymtr,
        "warnings": warnings_list,
    }

def plot_calc_poi_egymtr(df_energy: pd.DataFrame, 
                         time_col: str = "Timestamp",
                         title: str = "AC-Side Cumulative Energy (Calc-PoiEgyMtr)") -> go.Figure:
    """Plots the cumulative and incremental energy from the AC dataframe."""
    if df_energy is None or df_energy.empty:
        warnings.warn("Cannot plot AC cumulative energy: DataFrame is empty or None.")
        return go.Figure()
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df_energy[time_col], y=df_energy["Calc-PoiEgyMtr"],
        mode="lines", name="Cumulative Energy (kWh)",
        line=dict(color="#FF5733", width=2.5)
    ))
    fig.add_trace(go.Scatter(
        x=df_energy[time_col], y=df_energy["Calc-PoiEgy"],
        mode="lines", name="Interval Energy (kWh)",
        line=dict(color="#337AFF", width=1, dash="dot"), yaxis="y2"
    ))
    fig.update_layout(
        title=title, xaxis_title="Time",
        yaxis=dict(title=dict(text="<b>Cumulative Energy (kWh)</b>", font=dict(color="#FF5733")),
                   tickfont=dict(color="#FF5733")),
        yaxis2=dict(title=dict(text="<b>Interval Energy (kWh)</b>", font=dict(color="#337AFF")),
                    tickfont=dict(color="#337AFF"),
                    overlaying="y", side="right", showgrid=False),
        template="plotly_white", hovermode="x unified",
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="left", x=0.01)
    )
    return fig


# =======================================================================
# SECTION 4: THE INTEGRATED ANALYZER CLASS
# (This class contains all your DC-side logic)
# =======================================================================

class BessCapacityTestAnalyzer:
    """
    Integrates AC and DC capacity test analysis into a single class.
    
    1. Instantiate with DataFrames and a config dictionary.
    2. Call .run_analysis()
    3. Call .show_results() and .show_plots()
    """
    def __init__(self, master_config: dict, df_ac: pd.DataFrame, df_dc: pd.DataFrame):
        self.config = master_config
        self.df_ac = df_ac.copy()
        self.df_dc = df_dc.copy()
        
        # --- Result properties ---
        self.ac_results = None
        self.dfs_by_device = None
        self.dc_rte_summary = None
        self.dc_rte_system_totals = None
        self.dc_system_cumulative_energy = None
        self.dc_system_soc = None
        
        # --- Plot properties ---
        self.ac_kpi_plot = None
        self.ac_cumulative_plot = None
        self.dc_cumulative_energy_plot = None
        self.dc_soc_plot = None
        
        print("BessCapacityTestAnalyzer initialized.")
        print(f"AC data: {self.df_ac.shape[0]} rows. DC data: {self.df_dc.shape[0]} rows.")

    def run_analysis(self):
        """Runs all AC and DC analyses."""
        print("\n--- Starting Full Analysis ---")
        
        # --- 1. AC Analysis ---
        try:
            print("Running AC-side analysis...")
            self.ac_results = compute_nominal_from_poi_plotly(
                self.df_ac,
                discharge_start=self.config['discharge_start'],
                discharge_end=self.config['discharge_end'],
                P_nom_kW=self.config['ac_p_nom_kw'],
                tol_pct=self.config['ac_tol_pct'],
                required_minutes=self.config.get('ac_required_minutes'),
                time_col=self.config['ac_time_col'],
                power_col=self.config['ac_power_col'],
                title="AC-Side Discharge KPI",
                drop_duplicate_timestamps=self.config.get('ac_drop_duplicates', True),
                ramp_trim_seconds_discharge=self.config.get('ac_ramp_trim_discharge', 0),
                charge_start=self.config['charge_start'],
                charge_end=self.config['charge_end'],
                sampling_seconds=self.config.get('sampling_seconds'),
                discharge_positive=self.config.get('ac_discharge_positive', True),
                ramp_trim_seconds_charge=self.config.get('ac_ramp_trim_charge', 0),
                rte_min_charge_kWh=self.config.get('rte_min_charge_kwh', 0.01)
            )
            self.ac_kpi_plot = self.ac_results['figure']
            self.ac_cumulative_plot = plot_calc_poi_egymtr(self.ac_results['df_calc_poi_egymtr'])
            print("AC-side analysis complete.")
        except Exception as e:
            print(f"*** ERROR in AC analysis: {e} ***")
            warnings.warn(f"AC analysis failed: {e}")

        # --- 2. DC Analysis (Prep) ---
        try:
            print("Running DC-side data preparation...")
            self._clean_and_partition_dc_df()
            print(f"DC data partitioned into {len(self.dfs_by_device)} devices.")
        except Exception as e:
            print(f"*** ERROR in DC prep: {e} ***")
            warnings.warn(f"DC data prep failed: {e}")
            return

        # --- 3. DC Analysis (RTE) ---
        try:
            print("Running DC-side RTE analysis...")
            self._run_dc_rte_analysis()
            print("DC-side RTE analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC RTE analysis: {e} ***")
            warnings.warn(f"DC RTE analysis failed: {e}")

        # --- 4. DC Analysis (Cumulative Energy) ---
        try:
            print("Running DC-side Cumulative Energy analysis...")
            self._run_dc_cumulative_energy_analysis()
            print("DC-side Cumulative Energy analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC Cumulative Energy analysis: {e} ***")
            warnings.warn(f"DC Cumulative Energy analysis failed: {e}")

        # --- 5. DC Analysis (SOC) ---
        try:
            print("Running DC-side SOC analysis...")
            self._run_dc_soc_analysis()
            print("DC-side SOC analysis complete.")
        except Exception as e:
            print(f"*** ERROR in DC SOC analysis: {e} ***")
            warnings.warn(f"DC SOC analysis failed: {e}")
            
        print("--- Full Analysis Complete ---")

    def _clean_and_partition_dc_df(self):
        """Runs DC scripts #2 (clean) and #3 (partition)."""
        dc_device_col = self.config['dc_device_col']
        exclude_cols = {dc_device_col}
        
        self.df_dc = convert_mixed_numeric_columns(self.df_dc, exclude=exclude_cols, verbose=True)
        
        dc_power_col = self.config['dc_power_col']
        dc_soc_col = self.config['dc_soc_col']
        wanted_cols = [dc_power_col, dc_soc_col]
        
        available_cols = [c for c in wanted_cols if c in self.df_dc.columns]
        
        if dc_device_col not in self.df_dc.columns:
             raise KeyError(f"Required column '{dc_device_col}' not found in DC df.")
        
        dfs_by_device = {}
        for dev, g in self.df_dc.groupby(dc_device_col, sort=False):
            g2 = g[available_cols].sort_index()
            g2 = g2.dropna(how='all', subset=available_cols)
            dfs_by_device[dev] = g2
            
        self.dfs_by_device = dfs_by_device

    def _run_dc_rte_analysis(self):
        """Runs DC script #4 (DC RTE)."""
        per_device_rows = []
        
        CHARGE_START = pd.Timestamp(self.config['charge_start'])
        CHARGE_END = pd.Timestamp(self.config['charge_end'])
        DISCHARGE_START = pd.Timestamp(self.config['discharge_start'])
        DISCHARGE_END = pd.Timestamp(self.config['discharge_end'])
        POWER_COL = self.config['dc_power_col']
        DISCHARGE_POSITIVE = self.config.get('dc_discharge_positive', True)
        P_EPS_KW = self.config.get('dc_p_eps_kw', 0.0)
        SAMPLING_SECONDS = self.config.get('sampling_seconds')
        RTE_MIN_CHARGE_KWH = self.config.get('rte_min_charge_kwh', 0.01)
        
        def _prep_window(g: pd.DataFrame) -> pd.DataFrame:
            if g.empty: return g
            g["dt_s"] = g.index.to_series().diff().dt.total_seconds()
            if len(g) >= 2 and pd.isna(g.iloc[0]["dt_s"]):
                g.iloc[0, g.columns.get_loc("dt_s")] = (g.index[1] - g.index[0]).total_seconds()
            g = g.dropna(subset=["dt_s"])
            g = g[g["dt_s"] > 0]
            return g

        for dev, d in self.dfs_by_device.items():
            if d.empty or POWER_COL not in d.columns:
                continue
            
            dd = d.sort_index().copy()
            P = pd.to_numeric(dd[POWER_COL], errors='coerce').fillna(0.0).to_numpy(dtype=float)
            if not DISCHARGE_POSITIVE:
                P = -P
            if P_EPS_KW > 0:
                P = np.where(np.abs(P) < P_EPS_KW, 0.0, P)
            dd["P"] = P

            d_charge = dd[(dd.index >= CHARGE_START) & (dd.index <= CHARGE_END)].copy()
            d_dis = dd[(dd.index >= DISCHARGE_START) & (dd.index <= DISCHARGE_END)].copy()

            d_charge = _prep_window(d_charge)
            d_dis = _prep_window(d_dis)

            reg_charge = _check_cadence(d_charge["dt_s"] if not d_charge.empty else pd.Series([], dtype=float), SAMPLING_SECONDS)
            reg_dis = _check_cadence(d_dis["dt_s"] if not d_dis.empty else pd.Series([], dtype=float), SAMPLING_SECONDS)

            P_charge_star = (-d_charge["P"]).clip(lower=0.0).to_numpy(dtype=float) if not d_charge.empty else np.array([], dtype=float)
            P_dis_star = (d_dis["P"]).clip(lower=0.0).to_numpy(dtype=float) if not d_dis.empty else np.array([], dtype=float)

            E_charge_kWh = 0.0
            E_discharge_kWh = 0.0
            
            if SAMPLING_SECONDS is not None and reg_charge["is_regular"] and reg_dis["is_regular"]:
                dt_h = float(SAMPLING_SECONDS) / 3600.0
                E_charge_kWh = float(P_charge_star.sum() * dt_h) if P_charge_star.size else 0.0
                E_discharge_kWh = float(P_dis_star.sum() * dt_h) if P_dis_star.size else 0.0
                rte_method = f"constant_dt({int(SAMPLING_SECONDS)}s)"
            else:
                rte_method = "trapezoid_dt"
                if not d_charge.empty and P_charge_star.size:
                    t_c = d_charge.index.view("int64").to_numpy() / 1e9
                    E_charge_kWh = float(np.trapz(P_charge_star, x=t_c) / 3600.0)
                if not d_dis.empty and P_dis_star.size:
                    t_d = d_dis.index.view("int64").to_numpy() / 1e9
                    E_discharge_kWh = float(np.trapz(P_dis_star, x=t_d) / 3600.0)

            eta_dc = np.nan
            if E_charge_kWh > float(RTE_MIN_CHARGE_KWH):
                eta_dc = E_discharge_kWh / E_charge_kWh
            
            per_device_rows.append({
                "Device": dev, "E_dc_in_kWh": E_charge_kWh, "E_dc_out_kWh": E_discharge_kWh,
                "eta_dc": eta_dc, "method": rte_method,
                "dt_med_charge": reg_charge["dt_median"], "dt_p95_charge": reg_charge["dt_p95"],
                "dt_med_dis": reg_dis["dt_median"], "dt_p95_dis": reg_dis["dt_p95"],
            })

        self.dc_rte_summary = pd.DataFrame(per_device_rows).sort_values("Device", kind="stable")
        
        system_totals = {}
        system_totals["Total_E_dc_in_kWh"] = float(self.dc_rte_summary["E_dc_in_kWh"].sum())
        system_totals["Total_E_dc_out_kWh"] = float(self.dc_rte_summary["E_dc_out_kWh"].sum())
        system_totals["eta_dc_system"] = (
            system_totals["Total_E_dc_out_kWh"] / system_totals["Total_E_dc_in_kWh"]
            if system_totals["Total_E_dc_in_kWh"] > 0 else np.nan
        )
        self.dc_rte_system_totals = system_totals

    def _run_dc_cumulative_energy_analysis(self):
        """Runs DC script #5 (Cumulative Energy)."""
        
        def _prep_device_power(df: pd.DataFrame, cfg: dict) -> pd.Series:
            POWER_COL = cfg['dc_power_col']
            if df is None or df.empty or (POWER_COL not in df.columns):
                return pd.Series(dtype=float)
            P = pd.to_numeric(df[POWER_COL], errors="coerce").fillna(0.0).astype(float)
            P_kW = P / 1000.0 if cfg.get('dc_is_power_in_watts', False) else P
            if not cfg.get('dc_discharge_positive', True):
                P_kW = -P_kW
            P_EPS_KW = cfg.get('dc_p_eps_kw', 0.0)
            if P_EPS_KW and P_EPS_KW > 0:
                P_kW = P_kW.where(P_kW.abs() >= P_EPS_KW, 0.0)
            P_kW.name = "P_kW"
            return P_kW.sort_index()

        def _compute_dt_seconds(idx: pd.DatetimeIndex) -> np.ndarray:
            if len(idx) == 0: return np.array([], dtype=float)
            dt_s = np.diff(idx.view("int64")) / 1e9
            if len(dt_s) == 0: return np.array([0.0], dtype=float)
            first = dt_s[0]
            return np.concatenate([[first], dt_s])

        def _cumulative_energy_from_power(P_kW: pd.Series, cfg: dict) -> pd.Series:
            if P_kW.empty:
                return pd.Series(index=P_kW.index, data=[], dtype=float, name="E_system_cum_kWh")
            idx = P_kW.index
            dt_s_series = pd.Series(index=idx, data=_compute_dt_seconds(idx))
            
            SAMPLING_SECONDS = cfg.get('sampling_seconds')
            reg = _check_cadence(dt_s_series, SAMPLING_SECONDS)
            
            if SAMPLING_SECONDS is not None and reg["is_regular"]:
                dt_h = float(SAMPLING_SECONDS) / 3600.0
                E = np.cumsum(P_kW.values) * dt_h
            else:
                dt_h = dt_s_series.values / 3600.0
                P = P_kW.values.astype(float)
                inc = P * dt_h # Left-rectangle method
                E = np.cumsum(inc)
                
            return pd.Series(index=idx, data=E, name="E_system_cum_kWh")
        
        per_device_power = {}
        for dev, df in self.dfs_by_device.items():
            P_kW = _prep_device_power(df, self.config)
            if not P_kW.empty:
                per_device_power[dev] = P_kW

        pow_df = pd.DataFrame(per_device_power).sort_index()
        P_system_kW = pow_df.fillna(0.0).sum(axis=1)
        
        self.dc_system_cumulative_energy = _cumulative_energy_from_power(P_system_kW, self.config)

        plt.style.use("seaborn-v0_8-whitegrid")
        fig, ax = plt.subplots(figsize=(10, 5))
        ax.plot(self.dc_system_cumulative_energy.index, self.dc_system_cumulative_energy.values, color="tab:blue", label="System")
        ax.set_title("System Cumulative DC Energy (signed, kWh)")
        ax.set_xlabel("Time")
        ax.set_ylabel("Energy (kWh)")
        ax.axhline(0, color="k", lw=0.8)
        ax.legend()
        plt.tight_layout()
        self.dc_cumulative_energy_plot = fig
        plt.close(fig) 

    def _run_dc_soc_analysis(self):
        """Runs DC script #6 (SOC analysis)."""
        SOC_COL = self.config['dc_soc_col']
        IS_SOC_PERCENT = self.config.get('dc_is_soc_percent', True)
        CLIP_MIN, CLIP_MAX = (0.0, 100.0) if IS_SOC_PERCENT else (0.0, 1.0)
        
        soc_by_device = {}
        for dev, df in self.dfs_by_device.items():
            if df is None or df.empty or (SOC_COL not in df.columns):
                continue
            s = pd.to_numeric(df[SOC_COL], errors="coerce").astype(float)
            s = s.clip(CLIP_MIN, CLIP_MAX)
            soc_by_device[dev] = s.sort_index()

        if not soc_by_device:
            warnings.warn("No device had a valid SOC series. Skipping SOC analysis.")
            return

        soc_df_union = pd.DataFrame(soc_by_device).sort_index()
        
        first_valid_per_dev = {c: soc_df_union[c].first_valid_index() for c in soc_df_union.columns}
        last_valid_per_dev  = {c: soc_df_union[c].last_valid_index() for c in soc_df_union.columns}
        drop_cols = [c for c in soc_df_union.columns if first_valid_per_dev[c] is None or last_valid_per_dev[c] is None]
        
        if drop_cols:
            soc_df_union = soc_df_union.drop(columns=drop_cols)
            first_valid_per_dev = {c: soc_df_union[c].first_valid_index() for c in soc_df_union.columns}
            last_valid_per_dev  = {c: soc_df_union[c].last_valid_index() for c in soc_df_union.columns}

        if soc_df_union.empty:
            warnings.warn("All devices dropped due to missing SOC. Skipping SOC analysis.")
            return

        common_start = max(first_valid_per_dev.values())
        common_end = min(last_valid_per_dev.values())
        if (common_start is None) or (common_end is None) or (common_start >= common_end):
            warnings.warn("No overlapping time window where all devices have SOC. Skipping SOC analysis.")
            return

        soc_df = soc_df_union[(soc_df_union.index >= common_start) & (soc_df_union.index <= common_end)]
        soc_df_ff = soc_df.ffill().clip(CLIP_MIN, CLIP_MAX)

        N = soc_df_ff.shape[1]
        self.dc_system_soc = soc_df_ff.sum(axis=1) / float(N)
        self.dc_system_soc.name = "SOC_total"

        plt.style.use("seaborn-v0_8-whitegrid")
        fig, ax = plt.subplots(figsize=(10, 4.5))
        ax.plot(self.dc_system_soc.index, self.dc_system_soc.values, color="tab:blue", label=f"System SOC (N={N})")
        ax.set_title("System State of Charge (Simple Mean Across Devices)")
        ax.set_xlabel("Time")
        ax.set_ylabel("SOC (%)" if IS_SOC_PERCENT else "SOC (fraction)")
        ax.set_ylim(CLIP_MIN - 1 if IS_SOC_PERCENT else -0.02, CLIP_MAX + 1 if IS_SOC_PERCENT else 1.02)
        ax.legend()
        plt.tight_layout()
        self.dc_soc_plot = fig
        plt.close(fig) 

    def show_results(self):
        """Prints all summary tables to the console."""
        print("\n\n" + "="*50)
        print("          BESS CAPACITY TEST RESULTS")
        print("="*50 + "\n")
        
        if self.ac_results:
            print("--- AC-Side KPI & RTE Summary ---")
            with pd.option_context('display.max_rows', None, 'display.width', 1000):
                print(self.ac_results['summary_table'])
            print("\nAC-Side Warnings:")
            if self.ac_results['warnings']:
                for w in self.ac_results['warnings']: print(f"- {w}")
            else:
                print("- None")
        else:
            print("--- AC-Side Analysis Not Run ---")

        if self.dc_rte_summary is not None:
            print("\n\n--- DC-Side Per-Device RTE Summary ---")
            pd.set_option('display.float_format', lambda v: f"{v:,.6f}")
            print(self.dc_rte_summary)
            print("\n--- DC-Side System RTE Summary ---")
            for k, v in self.dc_rte_system_totals.items():
                print(f"- {k}: {v:,.6f}")
        else:
            print("\n\n--- DC-Side Analysis Not Run ---")
            
    def show_plots(self):
        """Displays all generated plots."""
        print("\n\n" + "="*50)
        print("          BESS CAPACITY TEST PLOTS")
        print("="*50 + "\n")
        
        if self.ac_kpi_plot:
            print("Displaying AC-Side KPI Plot...")
            self.ac_kpi_plot.show()
        else:
            print("AC-Side KPI Plot not available.")
            
        if self.ac_cumulative_plot:
            print("Displaying AC-Side Cumulative Energy Plot...")
            self.ac_cumulative_plot.show()
        else:
            print("AC-Side Cumulative Energy Plot not available.")
            
        if self.dc_cumulative_energy_plot:
            print("Displaying DC-Side System Cumulative Energy Plot...")
            self.dc_cumulative_energy_plot.show()
        else:
            print("DC-Side Cumulative Energy Plot not available.")
            
        if self.dc_soc_plot:
            print("Displaying DC-Side System SOC Plot...")
            self.dc_soc_plot.show()
        else:
            print("DC-Side System SOC Plot not available.")


In [27]:
# =======================================================================
# SECTION 5: EXAMPLE USAGE
# (This is the only part you need to edit)
# =======================================================================

if __name__ == "__main__":
    
    # --- 1. DEFINE FILE PATHS AND CONFIG ---
    
    AC_FILE_PATH = r'C:\Data_analysis\Decci\Capacity_test\Hybrid_Controller\LogDataFast_2024-02-28.csv'
    DC_FILE_PATH = r'C:\Data_analysis\Decci\Capacity_test\Hybrid_Monitoring\data-2024-02-28(MVPS).csv'

    master_config = {
        # --- Time Windows (Critical) ---
        "charge_start": "2024-02-28 10:30:00",
        "charge_end": "2024-02-28 13:53:00",
        "discharge_start": "2024-02-28 14:59:30",
        "discharge_end": "2024-02-28 15:58:40",
        
        # --- General Settings ---
        "sampling_seconds": 1,
        "rte_min_charge_kwh": 0.01,
        
        # --- AC-Side Config (Check column names) ---
        "ac_time_col": "Timestamp",
        "ac_power_col": "PoiPwrAt kW",
        "ac_p_nom_kw": 24500,
        "ac_tol_pct": 5.0,
        "ac_required_minutes": 58,
        "ac_discharge_positive": True,
        "ac_ramp_trim_discharge": 0,
        "ac_ramp_trim_charge": 0,
        
        # --- DC-Side Config (Check column names) ---
        "dc_device_col": "Device",
        "dc_power_col": "DcTotWatt",
        "dc_soc_col": "Bat.SOCTot",
        "dc_discharge_positive": True,
        "dc_is_power_in_watts": False, # Set True if 'DcTotWatt' is in W, False if kW
        "dc_p_eps_kw": 0.0,
        "dc_is_soc_percent": True,
    }

    # --- 2. LOAD DATA (OR CREATE DUMMY DATA) ---
    
    try:
        # Try to load real AC data
        print(f"Loading AC data from: {AC_FILE_PATH}")
        df_ac = load_hycon_hybrid_fast(AC_FILE_PATH)
        print(f"Loaded AC DataFrame with {df_ac.shape} rows/cols.")
        
        # -----------------------------------------------------------------
        # >>>>>>>>>>>> NEW FIX IS HERE <<<<<<<<<<<<
        # -----------------------------------------------------------------
        # We must convert the AC data columns to numeric, just like the DC data
        print("Converting AC numeric columns...")
        # Define all known non-numeric columns in the AC file to exclude them
        ac_string_cols = {
            'Timestamp', 'OpStt', 'HybridSysState', 'HybridSysStateTrans', 
            'SignalValidity', 'CtrlModeCmdIn'
        }
        df_ac = convert_mixed_numeric_columns(df_ac, exclude=ac_string_cols, verbose=True)
        print("AC numeric conversion complete.")
        # -----------------------------------------------------------------
        # >>>>>>>>>>>> END OF FIX <<<<<<<<<<<<
        # -----------------------------------------------------------------

    except Exception as e:
        print(f"Warning: Could not load AC data ({e}). Creating dummy AC data.")
        # (Dummy data creation as fallback)
        ac_time = pd.date_range("2024-02-28 10:00:00", "2024-02-28 17:00:00", freq="1s")
        ac_power = np.zeros(len(ac_time))
        ac_power[(ac_time >= pd.Timestamp(master_config['charge_start'])) & (ac_time < pd.Timestamp(master_config['charge_end']))] = -25000
        ac_power[(ac_time >= pd.Timestamp(master_config['discharge_start'])) & (ac_time < pd.Timestamp(master_config['discharge_end']))] = 24500
        df_ac = pd.DataFrame({
            master_config['ac_time_col']: ac_time, 
            master_config['ac_power_col']: ac_power + np.random.normal(0, 50, len(ac_time))
        })

    try:
        # Try to load real DC data
        print(f"Loading DC data from: {DC_FILE_PATH}")
        df_dc = load_and_prep_dc_data(DC_FILE_PATH)
        print(f"Loaded & Prepared DC DataFrame with {df_dc.shape} rows/cols.")
    except Exception as e:
        print(f"Warning: Could not load DC data ({e}). Creating dummy DC data.")
        # (Dummy data creation as fallback)
        dc_time = pd.date_range("2024-02-28 10:00:00", "2024-02-28 17:00:00", freq="1s")
        def create_device_data(name, time, charge_p, discharge_p, soc_start, soc_end_charge, soc_end_dis):
            P = np.zeros(len(time))
            SOC = np.zeros(len(time))
            ch_mask = (time >= pd.Timestamp(master_config['charge_start'])) & (time < pd.Timestamp(master_config['charge_end']))
            dis_mask = (time >= pd.Timestamp(master_config['discharge_start'])) & (time < pd.Timestamp(master_config['discharge_end']))
            P[ch_mask] = -charge_p
            P[dis_mask] = discharge_p
            SOC[:] = soc_start
            SOC[ch_mask] = np.linspace(soc_start, soc_end_charge, ch_mask.sum())
            SOC[dis_mask] = np.linspace(soc_end_charge, soc_end_dis, dis_mask.sum())
            SOC[time > pd.Timestamp(master_config['discharge_end'])] = soc_end_dis
            return pd.DataFrame({
                "Datetime": time,
                master_config['dc_device_col']: name,
                master_config['dc_power_col']: P + np.random.normal(0, 50, len(time)),
                master_config['dc_soc_col']: SOC + np.random.normal(0, 0.1, len(time)),
            })
        df_dc1 = create_device_data("Inverter_1", dc_time, 13000, 12500, 10, 95, 15)
        df_dc2 = create_device_data("Inverter_2", dc_time, 13000, 12500, 10.2, 94.8, 15.1)
        df_dc = pd.concat([df_dc1, df_dc2]).set_index("Datetime").sort_index()


    # --- 3. RUN ANALYSIS ---
    
    # Check if dataframes are loaded before proceeding
    if 'df_ac' in locals() and 'df_dc' in locals():
        analyzer = BessCapacityTestAnalyzer(master_config, df_ac, df_dc)
        
        # Run all AC and DC calculations
        analyzer.run_analysis()

        # --- 4. SHOW RESULTS ---
        
        # Print all summary tables
        analyzer.show_results()

        # Show all plots
        analyzer.show_plots()
    else:
        print("\n*** Analysis failed: Data could not be loaded or created. ***")

Loading AC data from: C:\Data_analysis\Decci\Capacity_test\Hybrid_Controller\LogDataFast_2024-02-28.csv
Loaded AC DataFrame with (84240, 46) rows/cols.
Converting AC numeric columns...
AC numeric conversion complete.
Loading DC data from: C:\Data_analysis\Decci\Capacity_test\Hybrid_Monitoring\data-2024-02-28(MVPS).csv
Loaded & Prepared DC DataFrame with (146521, 32) rows/cols.
BessCapacityTestAnalyzer initialized.
AC data: 84240 rows. DC data: 146521 rows.

--- Starting Full Analysis ---
Running AC-side analysis...
*** ERROR in AC analysis: 
    Invalid value of type 'builtins.str' received for the 'color' property of scatter.line
        Received value: '#2ca0g02c'

    The 'color' property is a color and may be specified as:
      - A hex string (e.g. '#ff0000')
      - An rgb/rgba string (e.g. 'rgb(255,0,0)')
      - An hsl/hsla string (e.g. 'hsl(0,100%,50%)')
      - An hsv/hsva string (e.g. 'hsv(0,100%,100%)')
      - A named CSS color: see https://plotly.com/python/css-colors/ fo


AC analysis failed: 
    Invalid value of type 'builtins.str' received for the 'color' property of scatter.line
        Received value: '#2ca0g02c'

    The 'color' property is a color and may be specified as:
      - A hex string (e.g. '#ff0000')
      - An rgb/rgba string (e.g. 'rgb(255,0,0)')
      - An hsl/hsla string (e.g. 'hsl(0,100%,50%)')
      - An hsv/hsva string (e.g. 'hsv(0,100%,100%)')
      - A named CSS color: see https://plotly.com/python/css-colors/ for a list




[DC Numeric conversion diagnostics]
- DcTotVolt: Converted (pref=US, valid_ratio=1.00)
- DcTotAmp: Converted (pref=None, valid_ratio=1.00)
- DcTotWatt: Converted (pref=None, valid_ratio=1.00)
- GridTotAPhsA: Converted (pref=None, valid_ratio=1.00)
- GridTotAPhsB: Converted (pref=None, valid_ratio=1.00)
- GridTotAPhsC: Converted (pref=None, valid_ratio=1.00)
- InvTotVA: Converted (pref=None, valid_ratio=1.00)
- InvTotW: Converted (pref=None, valid_ratio=1.00)
- InvTotVAr: Converted (pref=None, valid_ratio=1.00)
- GridVoltagePhsAB: Converted (pref=None, valid_ratio=1.00)
- GridVoltagePhsBC: Converted (pref=None, valid_ratio=1.00)
- GridVoltagePhsCA: Converted (pref=None, valid_ratio=1.00)
- GridFrequency: Converted (pref=None, valid_ratio=1.00)
- DcSw1Stt: Converted (pref=None, valid_ratio=1.00)
- DcSw2Stt: Converted (pref=None, valid_ratio=1.00)
- DcSw3Stt: Converted (pref=None, valid_ratio=1.00)
- AcSwStt: Converted (pref=None, valid_ratio=1.00)
- DCEnergyDrawnTotal: Converted (pref=N


FigureCanvasAgg is non-interactive, and thus cannot be shown


FigureCanvasAgg is non-interactive, and thus cannot be shown



In [23]:
# ... after this line:
df_ac = load_hycon_hybrid_fast(AC_FILE_PATH)

# ADD THIS LINE:
print(f"AC Columns Loaded: {list(df_ac.columns)}")

AC Columns Loaded: ['Timestamp', 'OpStt', 'HybridSysState', 'HybridSysStateTrans', 'LoadPwrAtTot kW', 'LoadPwrRtTot kvar', 'PvBatMin kW', 'PvBatMax kW', 'SignalValidity', 'BatPwrAtTot kW', 'BatPwrRtTot kvar', 'NoOfActBat 1', 'SocAvg %', 'BatPwrAtSpnt kW', 'BatPwrRtSpnt kvar', 'BatPwrAtChrLimTot kW', 'BatPwrAtDisLimTot kW', 'SocCtrlPwrAtSpnt kW', 'EgyShiftPwrAtSpnt kW', 'RampRatePwrAtSpnt kW', 'GridPwrAtTot kW', 'GridPwrRtTot kvar', 'EstDomTm s', 'BatGfPwrAtSpntOfs kW', 'BatGfPwrRtSpntOfs kvar', 'BatGfAtNomTot kW', 'PoiPwrAt kW', 'PoiPwrRt kvar', 'PoiFrq Hz', 'PoiVtg V', 'FrqRespPwrAtSpnt kW', 'SocMgmtPwrAtSpnt kW', 'PwrRtSpntTot kvar', 'BatPwrAtSpntIn kW', 'CtrlModeCmdIn', 'FrqSpntIn Hz', 'PwrAtLoLimPrlIn kW', 'PwrAtLoLimSrlIn kW', 'PwrAtRateMaxIn MW/min', 'PwrAtSpntOfsIn kW', 'PwrAtSpntPrlIn kW', 'PwrAtSpntSrlIn kW', 'PwrAtUpLimPrlIn kW', 'PwrAtUpLimSrlIn kW', 'PwrRtSpntIn kvar', 'VtgSpntIn V']


In [24]:
print(f"ACTUAL AC COLUMNS ARE: {list(df_ac.columns)}")

ACTUAL AC COLUMNS ARE: ['Timestamp', 'OpStt', 'HybridSysState', 'HybridSysStateTrans', 'LoadPwrAtTot kW', 'LoadPwrRtTot kvar', 'PvBatMin kW', 'PvBatMax kW', 'SignalValidity', 'BatPwrAtTot kW', 'BatPwrRtTot kvar', 'NoOfActBat 1', 'SocAvg %', 'BatPwrAtSpnt kW', 'BatPwrRtSpnt kvar', 'BatPwrAtChrLimTot kW', 'BatPwrAtDisLimTot kW', 'SocCtrlPwrAtSpnt kW', 'EgyShiftPwrAtSpnt kW', 'RampRatePwrAtSpnt kW', 'GridPwrAtTot kW', 'GridPwrRtTot kvar', 'EstDomTm s', 'BatGfPwrAtSpntOfs kW', 'BatGfPwrRtSpntOfs kvar', 'BatGfAtNomTot kW', 'PoiPwrAt kW', 'PoiPwrRt kvar', 'PoiFrq Hz', 'PoiVtg V', 'FrqRespPwrAtSpnt kW', 'SocMgmtPwrAtSpnt kW', 'PwrRtSpntTot kvar', 'BatPwrAtSpntIn kW', 'CtrlModeCmdIn', 'FrqSpntIn Hz', 'PwrAtLoLimPrlIn kW', 'PwrAtLoLimSrlIn kW', 'PwrAtRateMaxIn MW/min', 'PwrAtSpntOfsIn kW', 'PwrAtSpntPrlIn kW', 'PwrAtSpntSrlIn kW', 'PwrAtUpLimPrlIn kW', 'PwrAtUpLimSrlIn kW', 'PwrRtSpntIn kvar', 'VtgSpntIn V']


In [25]:
# ... after this line:
df_ac = load_hycon_hybrid_fast(AC_FILE_PATH)
print(f"ACTUAL AC COLUMNS ARE: {list(df_ac.columns)}") 

# ADD THESE NEW LINES:
if "Timestamp" in df_ac.columns:
    # 1. Sanitize timestamps to be sure
    df_ac['Timestamp'] = pd.to_datetime(df_ac['Timestamp'], errors='coerce')
    df_ac = df_ac.dropna(subset=['Timestamp'])
    
    # 2. Print the time range
    if not df_ac.empty:
        print(f"AC Timestamp data type: {df_ac['Timestamp'].dtype}")
        print(f"AC data START time: {df_ac['Timestamp'].min()}")
        print(f"AC data END time: {df_ac['Timestamp'].max()}")
    else:
        print("AC data is EMPTY after timestamp conversion.")
else:
    print("ERROR: 'Timestamp' column not found in AC data.")

ACTUAL AC COLUMNS ARE: ['Timestamp', 'OpStt', 'HybridSysState', 'HybridSysStateTrans', 'LoadPwrAtTot kW', 'LoadPwrRtTot kvar', 'PvBatMin kW', 'PvBatMax kW', 'SignalValidity', 'BatPwrAtTot kW', 'BatPwrRtTot kvar', 'NoOfActBat 1', 'SocAvg %', 'BatPwrAtSpnt kW', 'BatPwrRtSpnt kvar', 'BatPwrAtChrLimTot kW', 'BatPwrAtDisLimTot kW', 'SocCtrlPwrAtSpnt kW', 'EgyShiftPwrAtSpnt kW', 'RampRatePwrAtSpnt kW', 'GridPwrAtTot kW', 'GridPwrRtTot kvar', 'EstDomTm s', 'BatGfPwrAtSpntOfs kW', 'BatGfPwrRtSpntOfs kvar', 'BatGfAtNomTot kW', 'PoiPwrAt kW', 'PoiPwrRt kvar', 'PoiFrq Hz', 'PoiVtg V', 'FrqRespPwrAtSpnt kW', 'SocMgmtPwrAtSpnt kW', 'PwrRtSpntTot kvar', 'BatPwrAtSpntIn kW', 'CtrlModeCmdIn', 'FrqSpntIn Hz', 'PwrAtLoLimPrlIn kW', 'PwrAtLoLimSrlIn kW', 'PwrAtRateMaxIn MW/min', 'PwrAtSpntOfsIn kW', 'PwrAtSpntPrlIn kW', 'PwrAtSpntSrlIn kW', 'PwrAtUpLimPrlIn kW', 'PwrAtUpLimSrlIn kW', 'PwrRtSpntIn kvar', 'VtgSpntIn V']
AC Timestamp data type: datetime64[ns]
AC data START time: 2024-02-28 00:00:04
AC data 