In [None]:
"""
Purpose
-------
Plot repeatability across multiple experimental trials by showing:
  1) Prefilter pressure drop (ΔP) vs time (Trial 1 ± 1 SD across trials)
  2) Face velocity vs time (Trial 1 ± 1 SD across trials)
  3) Gross pressure drop normalized to a constant reference velocity (Trial 1 ± 1 SD)

What this script does
---------------------
- Reads 5 CSV "trials" pasted into raw1...raw5 (same time grid required).
- Smooths each trial's signals (ΔP and velocity) with a rolling mean.
- Computes gross ΔP = dp1 + dp2 and normalizes it to a constant face velocity v_ref by:
    media(t) = gross_raw(t) - K * v(t)^n      (clipped to >= 0 when CLIP_NEGATIVES is True)
    gross(t) = media(t) + K * v_ref^n
  where n = N_GROSS and K is fit from the early portion of the run (EARLY_FRACTION).
- Stacks the 5 cleaned trials, computes pointwise standard deviation (SD) vs time,
  and overlays ±1 SD bands around Trial 1 on a 1×3 figure.
- Exports PNG + SVG into ./fig_exports

Input CSV format (per raw block)
--------------------------------
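Columns required (header row required):
time,dp1,dp2,velocity
dp2 is the signal plotted as prefilter ΔP; dp1 + dp2 is treated as gross ΔP.
Units are up to you, but be consistent. Note that the axis labels are fixed to
"Time (s)", "Pa" and "m/s", so edit them if you use other units.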

Notes
-----
- All trials must share the exact same time array (same values in same order).
  If not, resample/align before using this script.
"""

from __future__ import annotations

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from io import StringIO
from pathlib import Path

# =============================
# USER SETTINGS
# =============================
# Everything in this section is read as a module-level global by the
# processing and plotting code further down.
V_REF_USER = 0.75        # reference velocity for normalization (m/s)
N_GROSS = 1              # exponent in gross ΔP ~ v^n
ROLL_WIN = 1             # rolling-mean smoothing window for signals (>=2 smooths)
SD_SMOOTH_WIN = 1        # rolling-mean smoothing window for SD envelopes
MIN_VEL = 1e-6           # avoid divide-by-zero / v^n issues
LINE_WIDTH = 1           # line width of the main (Trial 1) trace in every panel
DPI = 300                # resolution used when saving the PNG export
OUTDIR = Path("fig_exports")  # export directory; created if it does not exist

EARLY_FRACTION = 0.10    # fraction of earliest points used to fit K_gross
CLIP_NEGATIVES = True    # clip media term to >= 0 after subtraction
PRINT_SUMMARY = True     # print per-trial normalization summary stats

# =============================
# Colors
# =============================
# One color per panel; the same color is used for the line and its SD band.
C_PREF = "#ff7f0e"       # prefilter ΔP panel
C_GROSS = "#2ca02c"      # normalized gross ΔP panel
C_VEL = "#17becf"        # velocity panel

# =============================
# Labels & Legends
# =============================
# Y-axis labels (these are plain unit strings).
YL_PRESSURE = "Pa"
YL_VELOCITY = "m/s"

# Panel titles.
TITLE_PREF_COMPACT = "Prefilter ΔP — Experimental"
TITLE_VEL_COMPACT = "Velocity — Experimental"
TITLE_GROSS_CV = "Gross ΔP — Constant Face Velocity"

# Legend entries for the main trace of each panel.
LEG_PREF_MAIN = "Prefilter ΔP"
LEG_VEL_MAIN = "Velocity"
# Built once at import time, so it reflects V_REF_USER as set above.
LEG_GROSS_MAIN = f"Gross ΔP @ v_ref = {V_REF_USER:.3f} m/s"

# =============================
# DATA: paste FIVE CSV trials
# =============================
# Paste the data rows of each trial below its header line. The header row
# must stay in place; it names the columns the loader looks for.
raw1 = """
time,dp1,dp2,velocity

"""

raw2 = """
time,dp1,dp2,velocity

"""

raw3 = """
time,dp1,dp2,velocity

"""

raw4 = """
time,dp1,dp2,velocity

"""

raw5 = """
time,dp1,dp2,velocity

"""

# Surrounding blank lines are stripped so each entry starts at its header row.
raw_list = [raw1.strip(), raw2.strip(), raw3.strip(), raw4.strip(), raw5.strip()]

# =============================
# Helper functions
# =============================
def smooth(arr: np.ndarray, win: int) -> np.ndarray:
    """Return ``arr`` as a float array, optionally smoothed.

    Smoothing is a centered rolling mean of width ``win``. Near the ends
    the mean is taken over however many samples fall inside the window,
    so the output always has the same length as the input. A window
    smaller than two samples (including ``0`` or ``None``) turns the
    smoothing off and only the float conversion is applied.
    """
    values = np.asarray(arr, dtype=float)

    smoothing_enabled = bool(win) and win >= 2
    if smoothing_enabled:
        windowed = pd.Series(values).rolling(win, center=True, min_periods=1)
        return windowed.mean().to_numpy()

    return values

def load_trial_csv(raw: str) -> pd.DataFrame:
    """Parse a raw CSV string into a cleaned, numeric DataFrame.

    Lines starting with ``#`` are treated as comments. Header names are
    stripped of surrounding whitespace, so ``time, dp1 , dp2, velocity``
    is accepted. Only the four required columns are kept; every value is
    coerced to a number and any row with a non-numeric or missing value
    is dropped. The result is sorted by ``time`` with a fresh 0..N-1 index.

    Parameters
    ----------
    raw : str
        CSV text with a header row containing at least
        ``time``, ``dp1``, ``dp2`` and ``velocity``.

    Returns
    -------
    pandas.DataFrame
        Columns ``time, dp1, dp2, velocity``, numeric, sorted by time.

    Raises
    ------
    ValueError
        If a required column is missing, or if no numeric data rows remain
        after cleaning (for example when only the header row was pasted).
    """
    required = ["time", "dp1", "dp2", "velocity"]

    # skipinitialspace handles "a, b" style separators for header and data.
    df = pd.read_csv(StringIO(raw), comment="#", skipinitialspace=True)
    # Trailing padding in header names is not covered by skipinitialspace.
    df.columns = [str(col).strip() for col in df.columns]

    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")

    df = (
        df[required]
        .apply(pd.to_numeric, errors="coerce")
        .dropna()
        .sort_values("time")
        .reset_index(drop=True)
    )

    # A header-only block would otherwise flow through the whole pipeline
    # and silently produce an empty figure.
    if df.empty:
        raise ValueError(
            "No numeric data rows found. Paste CSV data below the header row."
        )
    return df

def fit_K_gross(dp_gross: np.ndarray, v: np.ndarray, n: float, frac: float) -> float:
    """Estimate the coefficient K of the model ``dp_gross ≈ K * v**n``.

    The fit is a least-squares line through the origin, computed over the
    first ``frac`` of the samples (never fewer than three). The result is
    floored at zero; zero is also returned when the regressor has no
    energy (``sum(v**(2n)) <= 0``).
    """
    n_early = int(len(v) * frac)
    if n_early < 3:
        n_early = 3

    regressor = np.power(v[:n_early], n)
    response = dp_gross[:n_early]

    sum_sq = np.dot(regressor, regressor)
    if sum_sq <= 0:
        return 0.0

    slope = float(np.dot(response, regressor) / sum_sq)
    return 0.0 if slope < 0.0 else slope

def gross_to_const_velocity(
    dp_gross: np.ndarray,
    v: np.ndarray,
    vref: float,
    n: float,
    K: float,
    clip_negatives: bool = True,
) -> tuple[np.ndarray, np.ndarray]:
    """
    Re-express a gross ΔP trace as if it had been measured at velocity ``vref``.

    The measured signal is modeled as ``dp_gross = media + K * v**n``. The
    velocity-dependent part is removed to obtain ``media`` and then added
    back evaluated at ``vref``.

    Returns
    -------
    (gross_const, media)
        ``media`` is ``dp_gross - K * v**n`` (floored at 0 when
        ``clip_negatives`` is true) and ``gross_const`` is
        ``media + K * vref**n``.
    """
    velocity_term = K * np.power(v, n)
    reference_term = K * (vref ** n)

    media = dp_gross - velocity_term
    if clip_negatives:
        media = np.maximum(media, 0.0)

    return media + reference_term, media

def summarize(name: str, x: np.ndarray) -> str:
    """Format the minimum, maximum and peak-to-peak range of ``x`` on one line."""
    lowest = np.min(x)
    highest = np.max(x)
    span = np.ptp(x)
    return f"{name}: min={lowest:.2f}, max={highest:.2f}, Δ={span:.2f}"

# =============================
# Load and process each trial
# =============================
# Accumulators filled by the loop below; one entry per trial, in raw_list
# order, so index 0 is always Trial 1.
t_ref: np.ndarray | None = None       # time grid of the first trial
dp2_s_list: list[np.ndarray] = []     # smoothed dp2 per trial
v_s_list: list[np.ndarray] = []       # smoothed velocity per trial
gross_c_list: list[np.ndarray] = []   # smoothed gross ΔP @ v_ref per trial

for i, raw in enumerate(raw_list, start=1):
    # NOTE: raw_list holds stripped text, so this only fires when the block
    # is completely blank. A block that still contains just the header row
    # is non-empty and passes this check.
    if not raw:
        raise ValueError(f"raw{i} is empty. Paste CSV data into raw{i}.")

    df = load_trial_csv(raw)

    t = df["time"].to_numpy(dtype=float)
    # Floor the velocity at MIN_VEL before it is used in v^n.
    v = np.maximum(df["velocity"].to_numpy(dtype=float), MIN_VEL)
    dp1 = df["dp1"].to_numpy(dtype=float)
    dp2 = df["dp2"].to_numpy(dtype=float)

    # Enforce identical time arrays across trials
    # (exact element-wise equality against the first trial's grid).
    if t_ref is None:
        t_ref = t
    else:
        if not np.array_equal(t_ref, t):
            raise ValueError(
                "Time arrays differ between trials. Resample/align all trials "
                "to the same time grid before using this script."
            )

    # Smooth "cleaned" signals
    dp2_clean = smooth(dp2, ROLL_WIN)
    v_clean = smooth(v, ROLL_WIN)

    # Gross normalization to v_ref
    # K is fit on the unsmoothed gross signal and the floored, unsmoothed
    # velocity; smoothing is applied only to the normalized result.
    gross_raw = dp1 + dp2
    K_gross = fit_K_gross(gross_raw, v, n=N_GROSS, frac=EARLY_FRACTION)
    gross_const, _gross_media = gross_to_const_velocity(
        gross_raw, v, vref=V_REF_USER, n=N_GROSS, K=K_gross, clip_negatives=CLIP_NEGATIVES
    )
    gross_clean = smooth(gross_const, ROLL_WIN)

    # The summary reports the unsmoothed raw and normalized gross signals.
    if PRINT_SUMMARY:
        print(f"\n=== Trial {i} gross-only normalization summary ===")
        print(f"K_gross={K_gross:.4g}, n={N_GROSS:g}")
        print(summarize("Gross raw ΔP", gross_raw))
        print(summarize("Gross ΔP @ v_ref", gross_const))

    dp2_s_list.append(dp2_clean)
    v_s_list.append(v_clean)
    gross_c_list.append(gross_clean)

# From here on `t` is the shared time grid used as the x-axis of every panel.
t = t_ref  # type: ignore[assignment]

# Arrange every signal as a 2-D array with one row per time sample and one
# column per trial.
dp2_stack = np.column_stack(dp2_s_list)
v_stack = np.column_stack(v_s_list)
gross_stack = np.column_stack(gross_c_list)

# Sample standard deviation (ddof=1) across the trial axis, per time sample.
dp2_std_raw = np.std(dp2_stack, axis=1, ddof=1)
v_std_raw = np.std(v_stack, axis=1, ddof=1)
gross_std_raw = np.std(gross_stack, axis=1, ddof=1)

# Optional extra smoothing of the SD envelopes (off when SD_SMOOTH_WIN < 2).
dp2_std, v_std, gross_std = (
    smooth(dp2_std_raw, SD_SMOOTH_WIN),
    smooth(v_std_raw, SD_SMOOTH_WIN),
    smooth(gross_std_raw, SD_SMOOTH_WIN),
)

# Trial 1 (first list entry) is the trace the SD bands are drawn around.
main_dp2, main_v, main_gross = dp2_s_list[0], v_s_list[0], gross_c_list[0]

# =============================
# 1×3 Compact Figure
# =============================
def _draw_sd_panel(ax, x, trace, sd, color, trace_label, sd_label,
                   title, ylabel, legend_loc):
    """Draw one panel: the main trace, its ±SD band, labels, grid and legend."""
    ax.plot(x, trace, color=color, lw=LINE_WIDTH, label=trace_label)
    ax.fill_between(x, trace - sd, trace + sd,
                    color=color, alpha=0.20, linewidth=0, label=sd_label)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xlabel("Time (s)")
    ax.grid(True, ls="--", alpha=0.35)
    ax.legend(loc=legend_loc, frameon=True)


OUTDIR.mkdir(parents=True, exist_ok=True)

fig, (ax0, ax1, ax2) = plt.subplots(1, 3, figsize=(14, 4.5), sharex=False)

band_label = "±1 SD (5 trials)"

# Panel 1 — Prefilter ΔP (Trial 1 ± SD)
_draw_sd_panel(ax0, t, main_dp2, dp2_std, C_PREF, LEG_PREF_MAIN, band_label,
               TITLE_PREF_COMPACT, YL_PRESSURE, "upper left")

# Panel 2 — Velocity (Trial 1 ± SD)
_draw_sd_panel(ax1, t, main_v, v_std, C_VEL, LEG_VEL_MAIN, band_label,
               TITLE_VEL_COMPACT, YL_VELOCITY, "lower left")

# Panel 3 — Gross ΔP @ constant v_ref (Trial 1 ± SD)
_draw_sd_panel(ax2, t, main_gross, gross_std, C_GROSS, LEG_GROSS_MAIN, band_label,
               TITLE_GROSS_CV, YL_PRESSURE, "lower right")

fig.tight_layout()

# Export the same figure as a raster (PNG at DPI) and a vector (SVG) file.
png = OUTDIR / "prefilter_test_row_trial1_sd.png"
svg = OUTDIR / "prefilter_test_row_trial1_sd.svg"
fig.savefig(png, dpi=DPI)
fig.savefig(svg)
plt.close(fig)

print(f"Saved: {png}  |  {svg}")

In [None]:
"""
Script: hepa_velocity_normalization_and_sd.py

Purpose
-------
Processes multiple experimental wind-tunnel trials to:
1) Smooth HEPA differential pressure (ΔP) and airflow velocity signals
2) Normalize gross system pressure drop to a constant face velocity
3) Compute trial-to-trial variability (±1 SD)
4) Generate a compact 1×3 figure showing:
   - HEPA ΔP (experimental)
   - Airflow velocity (experimental)
   - Gross ΔP normalized to constant face velocity

This script is used to produce figures for the ASCE journal manuscript
on prefilter performance and HEPA loading behavior.

Environment
-----------
Designed for Google Colab, Jupyter Notebook, or standard Python with:
  - pandas
  - numpy
  - matplotlib

Input
-----
Five CSV-formatted trials, each with columns:
  time, dp1, dp2, velocity

Where:
  dp1 = HEPA ΔP
  dp2 = prefilter ΔP
  velocity = duct / face velocity

Output
------
Saves:
  fig_exports/hepa_test_row_trial1_sd.png
  fig_exports/hepa_test_row_trial1_sd.svg
"""

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from io import StringIO
from pathlib import Path


# ============================================================
# USER SETTINGS
# ============================================================
# Everything in this section is read as a module-level global by the
# helper functions and the processing/plotting code further down.
V_REF_USER     = 0.75    # reference velocity for normalization (m/s)
V_INIT_TARGET  = 0.75    # enforced starting velocity for each trial (m/s)
N_GROSS        = 1       # exponent for ΔP ~ v^n (viscous regime)
ROLL_WIN       = 5       # smoothing window for signals (0 disables; so does any value < 2)
SD_SMOOTH_WIN  = 15      # smoothing window for SD bands
MIN_VEL        = 1e-6    # lower bound applied to the rescaled velocity
DPI            = 300     # resolution used when saving the PNG export
OUTDIR         = Path("fig_exports")  # export directory; created if missing

EARLY_FRACTION = 0.10    # early fraction used to fit K_gross
CLIP_NEGATIVES = True    # floor the media term at 0 during normalization
PRINT_SUMMARY  = True    # print the fitted K_gross for each trial


# ============================================================
# COLORS
# ============================================================
# One color per panel; the same color is used for the line and its SD band.
C_HEPA  = "#ff7f0e"      # HEPA ΔP panel
C_GROSS = "#2ca02c"      # normalized gross ΔP panel
C_VEL   = "#17becf"      # velocity panel


# ============================================================
# LABELS
# ============================================================
# Y-axis labels (these are plain unit strings).
YL_PRESSURE = "Pa"
YL_VELOCITY = "m/s"

# Panel titles.
TITLE_HEPA  = "HEPA ΔP — Experimental"
TITLE_VEL   = "Velocity — Experimental"
TITLE_GROSS = "Gross ΔP — Constant Face Velocity"

# Legend entries for the main trace of each panel.
LEG_HEPA  = "HEPA ΔP"
LEG_VEL   = "Velocity"
# Built once at import time, so it reflects V_REF_USER as set above.
LEG_GROSS = f"Gross ΔP @ v_ref = {V_REF_USER:.2f} m/s"


# ============================================================
# RAW DATA (FIVE TRIALS)
# ============================================================
# Each block must contain:
# time,dp1,dp2,velocity
# (header row first, then one data row per line). Per the module docstring,
# dp1 is the HEPA ΔP and dp2 the prefilter ΔP.

raw1 = """time,dp1,dp2,velocity"""
raw2 = """time,dp1,dp2,velocity"""
raw3 = """time,dp1,dp2,velocity"""
raw4 = """time,dp1,dp2,velocity"""
raw5 = """time,dp1,dp2,velocity"""

# Order matters: the first entry is the trial drawn as the main trace.
raw_list = [raw1, raw2, raw3, raw4, raw5]


# ============================================================
# HELPER FUNCTIONS
# ============================================================
def smooth(arr, win):
    """Centered rolling mean smoothing.

    Parameters
    ----------
    arr : array-like
        1-D numeric signal.
    win : int
        Window length in samples. ``0``, ``None`` or any value below 2
        disables smoothing.

    Returns
    -------
    numpy.ndarray
        Float array with the same length as ``arr``. The dtype is float
        whether or not smoothing is enabled, so callers get a consistent
        type from both paths.
    """
    values = np.asarray(arr, dtype=float)
    if not win or win < 2:
        return values
    # min_periods=1 keeps the edges defined: the mean there is taken over
    # the samples that actually fall inside the window.
    return (
        pd.Series(values)
        .rolling(win, center=True, min_periods=1)
        .mean()
        .to_numpy()
    )


def fit_K_gross(dp_gross, v, n, frac):
    """
    Least-squares estimate of K in ΔP ≈ K · v^n (line through the origin).

    Only the first ``frac`` of the samples is used, with a minimum of
    three, to approximate clean-media behavior. The estimate is floored
    at zero, and zero is returned outright when the regressor v^n has no
    energy over the fitting window.
    """
    n_fit = max(3, int(len(v) * frac))

    powered = np.power(v[:n_fit], n)
    energy = np.dot(powered, powered)
    if energy <= 0:
        return 0.0

    slope = np.dot(dp_gross[:n_fit], powered) / energy
    return max(slope, 0.0)


def gross_to_const_velocity(dp_gross, v, vref, n, K, clip_negatives=None):
    """Normalize gross ΔP to constant face velocity.

    The measured signal is modeled as ``dp_gross = media + K * v**n``. The
    velocity-dependent term is removed to obtain ``media`` and then added
    back evaluated at ``vref``.

    Parameters
    ----------
    dp_gross, v : array-like
        Gross ΔP and velocity, sample for sample.
    vref : float
        Velocity the result is referred to.
    n : float
        Exponent of the velocity term.
    K : float
        Coefficient of the velocity term.
    clip_negatives : bool, optional
        Whether to floor ``media`` at 0. When omitted (``None``) the
        module-level ``CLIP_NEGATIVES`` setting is used, which is the
        behavior existing callers rely on.

    Returns
    -------
    numpy.ndarray
        ``media + K * vref**n``.
    """
    # Resolve the setting lazily so an explicit argument never needs the global.
    if clip_negatives is None:
        clip_negatives = CLIP_NEGATIVES

    media = dp_gross - K * np.power(v, n)
    if clip_negatives:
        media = np.maximum(media, 0.0)
    return media + K * (vref ** n)


def scale_velocity_to_target(v, t, target, min_vel=None):
    """Scale velocity so the initial timepoint matches target.

    The initial velocity is the mean of ``v`` over every sample that
    shares the first timestamp ``t[0]``. The whole trace is multiplied by
    ``target / v0`` and then floored at ``min_vel``.

    Parameters
    ----------
    v, t : array-like
        Velocity and time, sample for sample.
    target : float
        Desired velocity at the first timestamp.
    min_vel : float, optional
        Lower bound for the returned velocities. When omitted (``None``)
        the module-level ``MIN_VEL`` setting is used.

    Returns
    -------
    numpy.ndarray
        Float array. Empty input is returned as an empty array. If the
        initial velocity is not positive the trace cannot be rescaled; it
        is returned unscaled but still floored at ``min_vel`` so that both
        paths give strictly usable velocities.
    """
    floor = MIN_VEL if min_vel is None else min_vel
    v = np.asarray(v, dtype=float)
    t = np.asarray(t)

    # Nothing to scale; also avoids indexing t[0] on an empty array.
    if v.size == 0:
        return v

    v0 = np.mean(v[t == t[0]])
    if not v0 > 0:  # also catches a NaN initial velocity
        return np.maximum(v, floor)
    return np.maximum(v * (target / v0), floor)


# ============================================================
# LOAD & PROCESS TRIALS
# ============================================================
# Accumulators filled by the loop below; one entry per trial, in raw_list
# order, so index 0 is always Trial 1.
t_ref = None          # time grid of the first trial
hepa_trials  = []     # smoothed dp1 per trial
vel_trials   = []     # smoothed, rescaled velocity per trial
gross_trials = []     # smoothed gross ΔP normalized to V_REF_USER per trial

required_cols = ["time", "dp1", "dp2", "velocity"]

for i, raw in enumerate(raw_list, start=1):
    if not raw.strip():
        raise ValueError(f"Trial {i} data block is empty.")

    df = pd.read_csv(StringIO(raw))

    # Fail with a clear message instead of a bare KeyError on bad headers.
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Trial {i} is missing required columns: {missing}")

    # Keep only the needed columns, coerce to numbers, drop unusable rows.
    df = df[required_cols].apply(
        pd.to_numeric, errors="coerce"
    ).dropna()

    # A header-only block is not blank, so it passes the first check; stop
    # here rather than crashing later when the first timestamp is indexed.
    if df.empty:
        raise ValueError(
            f"Trial {i} has no numeric data rows. Paste CSV data below the header row."
        )
    df = df.sort_values("time")

    t   = df["time"].to_numpy()
    # Rescale so every trial starts at V_INIT_TARGET.
    v   = scale_velocity_to_target(df["velocity"].to_numpy(), t, V_INIT_TARGET)
    dp1 = df["dp1"].to_numpy()
    dp2 = df["dp2"].to_numpy()

    # All trials must share the first trial's time grid exactly, because
    # the SD is computed sample-by-sample across trials.
    if t_ref is None:
        t_ref = t
    elif not np.array_equal(t_ref, t):
        raise ValueError("Time arrays differ between trials.")

    hepa_s = smooth(dp1, ROLL_WIN)
    vel_s  = smooth(v,   ROLL_WIN)

    # K is fit on the unsmoothed gross signal and the rescaled, unsmoothed
    # velocity; smoothing is applied only to the normalized result.
    gross_raw = dp1 + dp2
    K_gross   = fit_K_gross(gross_raw, v, N_GROSS, EARLY_FRACTION)
    gross_c   = smooth(
        gross_to_const_velocity(gross_raw, v, V_REF_USER, N_GROSS, K_gross),
        ROLL_WIN
    )

    if PRINT_SUMMARY:
        print(f"\n=== Trial {i} summary ===")
        print(f"K_gross = {K_gross:.4g}")

    hepa_trials.append(hepa_s)
    vel_trials.append(vel_s)
    gross_trials.append(gross_c)


# ============================================================
# STACK & COMPUTE SD
# ============================================================
# One row per time sample, one column per trial.
hepa_stack, vel_stack, gross_stack = (
    np.column_stack(hepa_trials),
    np.column_stack(vel_trials),
    np.column_stack(gross_trials),
)

# Sample standard deviation (ddof=1) across trials at each time sample,
# followed by the SD-envelope smoothing.
hepa_sd = smooth(np.std(hepa_stack, axis=1, ddof=1), SD_SMOOTH_WIN)
vel_sd = smooth(np.std(vel_stack, axis=1, ddof=1), SD_SMOOTH_WIN)
gross_sd = smooth(np.std(gross_stack, axis=1, ddof=1), SD_SMOOTH_WIN)

# Trial 1 (first list entry) is the trace the SD bands are drawn around.
main_hepa, main_vel, main_gross = hepa_trials[0], vel_trials[0], gross_trials[0]


# ============================================================
# PLOT (1×3 COMPACT)
# ============================================================
# parents=True so a nested OUTDIR (e.g. "out/figs") can be created as well.
OUTDIR.mkdir(parents=True, exist_ok=True)

fig, (ax0, ax1, ax2) = plt.subplots(1, 3, figsize=(14, 4.5))

# Legend entry for the shaded band. It is passed to every fill_between call
# below; without it the band would be missing from the legends.
sd_label = "±1 SD (5 trials)"

# Panel 1 — HEPA ΔP (Trial 1 ± SD)
ax0.plot(t_ref, main_hepa, color=C_HEPA, label=LEG_HEPA)
ax0.fill_between(t_ref, main_hepa - hepa_sd, main_hepa + hepa_sd,
                 color=C_HEPA, alpha=0.20, label=sd_label)
ax0.set(title=TITLE_HEPA, ylabel=YL_PRESSURE, xlabel="Time (s)")
ax0.grid(True, ls="--", alpha=0.35)
ax0.legend()

# Panel 2 — Velocity (Trial 1 ± SD)
ax1.plot(t_ref, main_vel, color=C_VEL, label=LEG_VEL)
ax1.fill_between(t_ref, main_vel - vel_sd, main_vel + vel_sd,
                 color=C_VEL, alpha=0.20, label=sd_label)
ax1.set(title=TITLE_VEL, ylabel=YL_VELOCITY, xlabel="Time (s)")
ax1.grid(True, ls="--", alpha=0.35)
ax1.legend()

# Panel 3 — Gross ΔP normalized to V_REF_USER (Trial 1 ± SD)
ax2.plot(t_ref, main_gross, color=C_GROSS, label=LEG_GROSS)
ax2.fill_between(t_ref, main_gross - gross_sd, main_gross + gross_sd,
                 color=C_GROSS, alpha=0.20, label=sd_label)
ax2.set(title=TITLE_GROSS, ylabel=YL_PRESSURE, xlabel="Time (s)")
ax2.grid(True, ls="--", alpha=0.35)
ax2.legend()

fig.tight_layout()

# Export the same figure as a raster (PNG at DPI) and a vector (SVG) file.
fig.savefig(OUTDIR / "hepa_test_row_trial1_sd.png", dpi=DPI)
fig.savefig(OUTDIR / "hepa_test_row_trial1_sd.svg")
plt.close(fig)

# Report the directory actually used rather than a hard-coded name.
print(f"Saved figure to {OUTDIR}/")