# OpenFDD-style Fault Rules (YAML) — Standalone Pandas Demo

This notebook shows a **standalone** workflow for running HVAC fault rules using:

- a **messy CSV** (raw BAS point names)
- a **manual mapping dict** from raw point names → **Brick-style names**
- **YAML rule files** (like `ahu_fc1.yaml`, `sensor_bounds.yaml`, etc.)
- pure **pandas + numpy** (no platform services required)

You can adapt this pattern to any building data export as long as you can map raw columns to the Brick names referenced by the YAML.


In [None]:
# Uncomment to install dependencies (most notebook environments already have them):
# %pip install pandas numpy pyyaml matplotlib

import pandas as pd
import numpy as np
import yaml
from pathlib import Path


## 1) Load YAML rule files

In your own repo/project, point these to wherever your rule YAML files live.

In the ChatGPT sandbox where this notebook was built, the YAML files were uploaded to:

- `/mnt/data/ahu_fc1.yaml`
- `/mnt/data/ahu_fc2.yaml`
- `/mnt/data/ahu_fc3.yaml`
- `/mnt/data/ahu_fc4.yaml`
- `/mnt/data/sensor_bounds.yaml`
- `/mnt/data/sensor_flatline.yaml`


In [None]:
rule_paths = [
    Path("/mnt/data/ahu_fc1.yaml"),
    Path("/mnt/data/ahu_fc2.yaml"),
    Path("/mnt/data/ahu_fc3.yaml"),
    Path("/mnt/data/ahu_fc4.yaml"),
    Path("/mnt/data/sensor_bounds.yaml"),
    Path("/mnt/data/sensor_flatline.yaml"),
]

rules = []
for p in rule_paths:
    with p.open("r") as f:
        rules.append(yaml.safe_load(f))

# Quick peek
[(r["name"], r["type"], r["flag"]) for r in rules]
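The rule files themselves are not reproduced in this notebook. Judging only from how the engine in section 4 reads them, each rule needs `name`, `type`, `flag`, and an `inputs` mapping, and expression rules also need an `expression` string. The sketch below checks a parsed rule dict for those keys before it is run. `check_rule_schema` and the sample rule are hypothetical, and the real rule files may carry more fields than this check knows about.

```python
REQUIRED_KEYS = ("name", "type", "flag", "inputs")
SUPPORTED_TYPES = ("expression", "bounds", "flatline", "hunting")

def check_rule_schema(rule: dict) -> list:
    """Return a list of problems; an empty list means the rule looks usable."""
    problems = [f"missing key: {k}" for k in REQUIRED_KEYS if k not in rule]
    rule_type = rule.get("type")
    if rule_type is not None and rule_type not in SUPPORTED_TYPES:
        problems.append(f"unsupported type: {rule_type}")
    if rule_type == "expression" and not rule.get("expression"):
        problems.append("expression rule has no 'expression'")
    return problems

# Hypothetical rule, shaped like the dict yaml.safe_load would return.
sample_rule = {
    "name": "example_low_static",
    "type": "expression",
    "flag": "example_flag",
    "inputs": {"Supply_Fan_Speed_Command": {}},
    "params": {"fan_max": 0.9},
    "expression": "Supply_Fan_Speed_Command >= fan_max",
}

print(check_rule_schema(sample_rule))
print(check_rule_schema({"name": "x", "type": "pid"}))
```

Running a check like this right after loading surfaces a malformed file at load time instead of as a `KeyError` halfway through the rule loop.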


## 2) Make fake HVAC-ish data + save a messy CSV

We'll generate two days of data at 5-minute intervals (576 rows).

The raw columns are intentionally **messy** (typical BAS exports).

In [None]:
# Fake time index: 2 days at 5-minute intervals
idx = pd.date_range("2026-02-20", periods=2*24*12, freq="5min", tz="America/New_York")
n = len(idx)

rng = np.random.default_rng(42)

# Outside air temp: sinusoid + noise (°F)
oa = 25 + 15*np.sin(np.linspace(0, 4*np.pi, n)) + rng.normal(0, 1.2, n)

# Return air temp: around 72°F with slight noise
ra = 72 + rng.normal(0, 0.6, n)

# Supply fan speed command (0..1)
fan = np.clip(0.6 + 0.2*np.sin(np.linspace(0, 6*np.pi, n)) + rng.normal(0, 0.04, n), 0, 1)

# Duct static setpoint (in w.c.)
dsp_sp = 1.5 + 0.1*np.sin(np.linspace(0, 2*np.pi, n))

# Duct static actual: usually near setpoint but with a low-static fault window
dsp = dsp_sp + rng.normal(0, 0.07, n)

# Inject a "low static at max fan" segment (to trigger FC1)
fault_slice = slice(int(n*0.35), int(n*0.45))
fan[fault_slice] = 0.98  # near max
dsp[fault_slice] = dsp_sp[fault_slice] - 0.35  # too low

# Mixed air temp: should be between oa and ra, but we inject too-low & too-high segments
ma = 0.6*ra + 0.4*oa + rng.normal(0, 0.5, n)

too_low = slice(int(n*0.55), int(n*0.60))
ma[too_low] = np.minimum(ra[too_low], oa[too_low]) - 6  # too low

too_high = slice(int(n*0.70), int(n*0.75))
ma[too_high] = np.maximum(ra[too_high], oa[too_high]) + 6  # too high

# Supply air temp (°F)
sa = 55 + rng.normal(0, 0.7, n)

# Damper and valves (0..1) for a simple "hunting" example (FC4 style)
oa_damper = np.clip(0.3 + 0.25*np.sin(np.linspace(0, 10*np.pi, n)) + rng.normal(0, 0.05, n), 0, 1)
htg_vlv = np.clip(0.1 + 0.1*np.sin(np.linspace(0, 8*np.pi, n)) + rng.normal(0, 0.03, n), 0, 1)
clg_vlv = np.clip(0.2 + 0.1*np.cos(np.linspace(0, 7*np.pi, n)) + rng.normal(0, 0.03, n), 0, 1)

# Inject a "hunting" region: rapid toggling between heating/cooling-ish states
hunt = slice(int(n*0.80), int(n*0.88))
htg_vlv[hunt] = (np.sign(np.sin(np.linspace(0, 50*np.pi, hunt.stop-hunt.start))) > 0).astype(float) * 0.9
clg_vlv[hunt] = (np.sign(np.cos(np.linspace(0, 50*np.pi, hunt.stop-hunt.start))) > 0).astype(float) * 0.9
oa_damper[hunt] = np.clip(0.5 + 0.4*np.sin(np.linspace(0, 50*np.pi, hunt.stop-hunt.start)), 0, 1)

# Inject a stuck sensor (flatline) on SA-T during a window
flat = slice(int(n*0.15), int(n*0.20))
sa[flat] = sa[flat.start]  # perfectly flat

# Inject an out-of-bounds sensor: MA-T goes crazy briefly
oob = slice(int(n*0.10), int(n*0.11))
ma[oob] = 170  # out of range for typical mixed air temp

# Build a "messy" raw dataframe (these names are the BAS-style exports)
raw_df = pd.DataFrame(
    {
        "ts": idx,  # keep a timestamp column like many exports
        "AHU1:SA-T (°F)": sa,
        "AHU1:MA-T": ma,
        "AHU1:RA-T": ra,
        "Weather_OAT_F": oa,
        "AHU1:DSP (inwc)": dsp,
        "AHU1:DSP_SP": dsp_sp,
        "AHU1:SF_VFD_SPD_CMD": fan,
        "AHU1:OA_DAMP_CMD_%": oa_damper,
        "AHU1:HTG_VLV_CMD_%": htg_vlv,
        "AHU1:CLG_VLV_CMD_%": clg_vlv,
    }
)

raw_csv_path = Path("fake_raw_ahu_export.csv")
raw_df.to_csv(raw_csv_path, index=False)
raw_csv_path


## 3) Load the CSV and map raw point names → Brick-style names

This is the key step when you want to run a single rule in a "standalone" way.

- Left side: your raw CSV column names
- Right side: the Brick-style names referenced in the YAML under `inputs:`

Tip: if you later replace this dict with an ordered table of patterns, a **first match wins** policy keeps the mapping deterministic: each raw column takes the Brick name of the first pattern it matches.
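A first-match-wins table could look roughly like the sketch below. The patterns and the `map_column` helper are illustrative assumptions, not a vetted naming convention. Note that the more specific `DSP_SP` pattern has to come before `DSP`, otherwise the setpoint column would be claimed by the sensor pattern.

```python
import re

# Ordered (pattern, Brick name) pairs; earlier entries take priority.
MAPPING_TABLE = [
    (r"DSP_SP", "Supply_Air_Static_Pressure_Setpoint"),
    (r"DSP", "Supply_Air_Static_Pressure_Sensor"),
    (r"SA-T", "Supply_Air_Temperature_Sensor"),
    (r"MA-T", "Mixed_Air_Temperature_Sensor"),
]

def map_column(raw_name, table=MAPPING_TABLE):
    """Return the Brick name of the first pattern found in raw_name, else None."""
    for pattern, brick_name in table:
        if re.search(pattern, raw_name):
            return brick_name
    return None

raw_columns = ["AHU1:DSP (inwc)", "AHU1:DSP_SP", "AHU1:SA-T (°F)", "AHU1:RA-T"]
mapping = {c: map_column(c) for c in raw_columns}

for raw, brick in mapping.items():
    print(f"{raw!r:22} -> {brick}")
```

Columns that match nothing come back as `None`, which makes unmapped points easy to list and review by hand.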


In [None]:
df_raw = pd.read_csv(raw_csv_path, parse_dates=["ts"])
df_raw.head(3)
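A caveat when loading real exports: the demo CSV stores timestamps with a UTC offset, and a file that spans a daylight-saving change will contain two different offsets. One hedged way to handle that is to parse everything to UTC first and then convert to the site's time zone. The sketch uses a two-row inline CSV with made-up values rather than the file above.

```python
import io
import pandas as pd

# Two hypothetical rows straddling the 2026-03-08 spring-forward in New York.
csv_text = (
    "ts,AHU1:RA-T\n"
    "2026-03-08 01:55:00-05:00,72.1\n"
    "2026-03-08 03:00:00-04:00,72.3\n"
)

df_demo = pd.read_csv(io.StringIO(csv_text))

# Parse every offset to UTC, then convert to local time.
ts = pd.to_datetime(df_demo["ts"], utc=True).dt.tz_convert("America/New_York")
df_demo = df_demo.assign(ts=ts).set_index("ts").sort_index()

# The rows are five real minutes apart even though the wall clock jumps an hour.
print(df_demo.index[1] - df_demo.index[0])
```

Keeping the index timezone-aware matters for the rolling-window rules, because a naive local index would show a one-hour gap (or overlap) at each clock change.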


In [None]:
# Manual mapping dict (raw export column -> Brick name used by rules)
# You would edit this per site/system.
raw_to_brick = {
    "AHU1:SA-T (°F)": "Supply_Air_Temperature_Sensor",
    "AHU1:MA-T": "Mixed_Air_Temperature_Sensor",
    "AHU1:RA-T": "Return_Air_Temperature_Sensor",
    "Weather_OAT_F": "Outside_Air_Temperature_Sensor",
    "AHU1:DSP (inwc)": "Supply_Air_Static_Pressure_Sensor",
    "AHU1:DSP_SP": "Supply_Air_Static_Pressure_Setpoint",
    "AHU1:SF_VFD_SPD_CMD": "Supply_Fan_Speed_Command",
    "AHU1:OA_DAMP_CMD_%": "Damper_Position_Command",
    "AHU1:HTG_VLV_CMD_%": "Heating_Valve_Command",
    "AHU1:CLG_VLV_CMD_%": "Cooling_Valve_Command",
}

# Apply mapping: rename columns, keep ts as index
df = df_raw.rename(columns=raw_to_brick).set_index("ts").sort_index()

# Confirm which Brick columns we now have
sorted(df.columns)
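Before running any rules it can help to audit the mapping itself. The sketch below reports raw columns with no mapping, Brick names that are targeted more than once, and required inputs that nothing maps to. `mapping_report` and the sample inputs are hypothetical. Duplicates deserve attention because renaming two raw columns to the same name leaves two identically named columns, and `df[name]` then returns a DataFrame instead of a Series.

```python
def mapping_report(raw_columns, raw_to_brick, required_inputs):
    """Summarize gaps and collisions in a raw -> Brick column mapping."""
    unmapped = [c for c in raw_columns if c not in raw_to_brick]
    mapped = [raw_to_brick[c] for c in raw_columns if c in raw_to_brick]
    duplicates = sorted({b for b in mapped if mapped.count(b) > 1})
    missing = sorted(set(required_inputs) - set(mapped))
    return {"unmapped": unmapped, "duplicates": duplicates, "missing": missing}

# Hypothetical export with a redundant supply air temperature point.
raw_columns = ["ts", "AHU1:SA-T", "AHU1:SAT_2", "AHU1:RA-T"]
demo_mapping = {
    "AHU1:SA-T": "Supply_Air_Temperature_Sensor",
    "AHU1:SAT_2": "Supply_Air_Temperature_Sensor",
    "AHU1:RA-T": "Return_Air_Temperature_Sensor",
}
required = ["Supply_Air_Temperature_Sensor", "Mixed_Air_Temperature_Sensor"]

report = mapping_report(raw_columns, demo_mapping, required)
print(report)
```

Here `ts` shows up as unmapped on purpose, since it becomes the index rather than a rule input.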


## 4) Minimal rule engine (expression / bounds / flatline / hunting)

This is intentionally **simple** and designed for notebooks.

To make this production-grade later, you could:

- validate inputs more thoroughly (`ensure_inputs` below only checks that the columns exist)
- standardize units
- support multiple equipment instances
- add resampling logic for rules that need it
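As one example of unit standardization: the raw command columns in this demo end in `_%` but hold 0..1 fractions, while a real export may hold 0..100. The sketch below rescales a command signal to 0..1 using a simple heuristic. `to_fraction` and its threshold are assumptions for illustration only.

```python
import pandas as pd

def to_fraction(s: pd.Series, threshold: float = 1.5) -> pd.Series:
    """Rescale a command signal to 0..1 if it looks like it is in percent.

    Heuristic: any value above `threshold` means the series is on a 0..100 scale.
    """
    if s.max() > threshold:
        return s / 100.0
    return s

pct = pd.Series([0.0, 35.0, 100.0])   # 0..100 style export
frac = pd.Series([0.0, 0.35, 1.0])    # already 0..1

print(to_fraction(pct).tolist())
print(to_fraction(frac).tolist())
```

The heuristic misclassifies a percent signal that never rises above 1.5 %, so an explicit per-point unit table is the more reliable choice when one is available.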


In [None]:
def ensure_inputs(df: pd.DataFrame, rule: dict) -> None:
    missing = []
    for input_name in rule.get("inputs", {}):
        if input_name not in df.columns:
            missing.append(input_name)
    if missing:
        raise KeyError(f"Missing inputs for rule '{rule.get('name')}': {missing}")

def run_expression_rule(df: pd.DataFrame, rule: dict) -> pd.Series:
    # Expression references columns by their Brick names and params by key name.
    ensure_inputs(df, rule)
    params = rule.get("params", {}).copy()

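    # Restricted eval namespace: numpy (as `np`) plus the rule params.
    # Emptying __builtins__ limits what an expression can reach, but it is not
    # a sandbox; only evaluate rule files you trust.
    local_ns = {"np": np, **params}
    # Expose each input column as a variable (names must be valid identifiers).
    local_ns.update({c: df[c] for c in rule["inputs"]})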

    expr = rule["expression"].strip()
    out = eval(expr, {"__builtins__": {}}, local_ns)
    return out.astype(bool)

def run_bounds_rule(df: pd.DataFrame, rule: dict) -> pd.Series:
    ensure_inputs(df, rule)
    units = rule.get("params", {}).get("units", "imperial")
    flags = pd.Series(False, index=df.index)

    for input_name, meta in rule["inputs"].items():
        bounds = meta.get("bounds", {}).get(units)
        if bounds is None:
            continue
        lo, hi = bounds
        v = df[input_name]
        flags = flags | (v < lo) | (v > hi)

    return flags.astype(bool)

def run_flatline_rule(df: pd.DataFrame, rule: dict) -> pd.Series:
    ensure_inputs(df, rule)
    window = int(rule.get("params", {}).get("window", 40))
    tol = float(rule.get("params", {}).get("tolerance", 1e-6))

    flags = pd.Series(False, index=df.index)

    for input_name in rule["inputs"].keys():
        s = df[input_name]
        # rolling range
        r = (s.rolling(window=window, min_periods=window).max()
             - s.rolling(window=window, min_periods=window).min())
        flags = flags | (r <= tol)

    return flags.fillna(False).astype(bool)

def run_hunting_rule(df: pd.DataFrame, rule: dict) -> pd.Series:
    """A simple 'hunting' detector:
    1) Convert key commands to a coarse AHU operating state
    2) Count state changes in a rolling window
    3) Flag if changes exceed threshold while fan is on and OA damper is at or above min
    """
    ensure_inputs(df, rule)
    p = rule.get("params", {})
    window = int(p.get("window", 60))
    delta_os_max = int(p.get("delta_os_max", 10))
    min_oa_dpr = float(p.get("ahu_min_oa_dpr", 0.1))

    damper = df["Damper_Position_Command"]
    fan = df["Supply_Fan_Speed_Command"]
    htg = df["Heating_Valve_Command"]
    clg = df["Cooling_Valve_Command"]

    # Discretize into an operating state id
    # 0=off, 1=vent, 2=heating, 3=cooling, 4=both (bad/transition)
    state = pd.Series(0, index=df.index)
    on = fan > 0.01
    state.loc[on] = 1
    state.loc[on & (htg > 0.2) & (clg <= 0.2)] = 2
    state.loc[on & (clg > 0.2) & (htg <= 0.2)] = 3
    state.loc[on & (clg > 0.2) & (htg > 0.2)] = 4

    # Count changes: 1 when state differs from the previous sample.
    # diff() is NaN on the first row, so fill with 0 to avoid a spurious change there.
    changes = state.diff().fillna(0).ne(0).astype(int)

    # Rolling sum of changes
    change_count = changes.rolling(window=window, min_periods=window).sum()

    flags = (change_count > delta_os_max) & (on) & (damper >= min_oa_dpr)
    return flags.fillna(False).astype(bool)

def run_rule(df: pd.DataFrame, rule: dict) -> pd.Series:
    t = rule.get("type")
    if t == "expression":
        return run_expression_rule(df, rule)
    if t == "bounds":
        return run_bounds_rule(df, rule)
    if t == "flatline":
        return run_flatline_rule(df, rule)
    if t == "hunting":
        return run_hunting_rule(df, rule)
    raise ValueError(f"Unsupported rule type: {t}")
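To see how the rolling-range test in `run_flatline_rule` behaves, here is the same technique on a tiny series with made-up values. The flag turns on only once the whole window is flat, so it lags the start of the stuck period by `window - 1` samples.

```python
import pandas as pd

# Values are flat (3.0) from position 2 through position 5.
s = pd.Series([1.0, 2.0, 3.0, 3.0, 3.0, 3.0, 4.0])
window, tol = 3, 1e-6

rolling = s.rolling(window=window, min_periods=window)
value_range = rolling.max() - rolling.min()

# NaN ranges (incomplete windows) compare as False.
flags = (value_range <= tol).fillna(False)

print(flags.tolist())
# → [False, False, False, False, True, True, False]
```

The same lag applies to the demo data: the injected SA-T flatline covers 29 samples (rows 86 to 114), so whether it trips the rule depends on the `window` value in `sensor_flatline.yaml`. With the fallback of 40 used when the YAML sets no window, a 29-sample flat run would never fill the window.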


## 5) Run the rules and append fault flags to the dataframe

In [None]:
results = df.copy()

for rule in rules:
    flag_col = rule["flag"]
    results[flag_col] = run_rule(results, rule)

# Show a quick summary
summary = (results[[r["flag"] for r in rules]]
           .sum()
           .sort_values(ascending=False)
           .to_frame("fault_true_count"))
summary


## 6) Inspect where faults happened

In [None]:
# Show the first 25 rows where any fault flag is True, with the key signals alongside
flag_cols = [r["flag"] for r in rules]
any_fault = results[flag_cols].any(axis=1)

results.loc[any_fault, flag_cols + [
    "Supply_Fan_Speed_Command",
    "Supply_Air_Static_Pressure_Sensor",
    "Supply_Air_Static_Pressure_Setpoint",
    "Mixed_Air_Temperature_Sensor",
    "Return_Air_Temperature_Sensor",
    "Outside_Air_Temperature_Sensor",
    "Supply_Air_Temperature_Sensor",
]].head(25)
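Row-level flags are easier to review once they are collapsed into events. The sketch below groups consecutive `True` samples of one flag into start/end rows. `flag_events` is a hypothetical helper, shown on a small made-up series rather than on `results`.

```python
import pandas as pd

def flag_events(flag: pd.Series) -> pd.DataFrame:
    """Collapse a boolean flag series into one row per contiguous True run."""
    flag = flag.astype(bool)
    # A new run id starts every time the flag value changes.
    run_id = flag.ne(flag.shift()).cumsum()
    rows = []
    for _, run in flag[flag].groupby(run_id[flag]):
        rows.append({"start": run.index[0], "end": run.index[-1], "samples": len(run)})
    return pd.DataFrame(rows, columns=["start", "end", "samples"])

idx_demo = pd.date_range("2026-02-20", periods=8, freq="5min")
flag_demo = pd.Series([False, True, True, False, False, True, True, True], index=idx_demo)

events = flag_events(flag_demo)
print(events)
```

Applied to a real flag column, the `samples` count times the sampling interval gives a rough fault duration for each event.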


## 7) Optional plot (quick sanity check)

These plots show duct static pressure against its setpoint, the three air temperatures, and one fault flag as a 0/1 trace.


In [None]:
import matplotlib.pyplot as plt

plot_df = results.copy()

fig, ax = plt.subplots(figsize=(12, 4))
plot_df["Supply_Air_Static_Pressure_Sensor"].plot(ax=ax, label="DSP")
plot_df["Supply_Air_Static_Pressure_Setpoint"].plot(ax=ax, label="DSP_SP")
ax.set_title("Duct Static Pressure and Setpoint (fake data)")
ax.set_ylabel("in. w.c.")
ax.legend()
plt.show()

fig, ax = plt.subplots(figsize=(12, 4))
plot_df["Mixed_Air_Temperature_Sensor"].plot(ax=ax, label="MA-T")
plot_df["Return_Air_Temperature_Sensor"].plot(ax=ax, label="RA-T")
plot_df["Outside_Air_Temperature_Sensor"].plot(ax=ax, label="OA-T")
ax.set_title("Mixed / Return / Outside Air Temps (fake data)")
ax.set_ylabel("°F")
ax.legend()
plt.show()

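# Visualize a fault flag as 0/1.
# rules[0] was loaded from ahu_fc1.yaml, so read its flag column name from the
# rule rather than hard-coding it.
fc1_flag = rules[0]["flag"]
fig, ax = plt.subplots(figsize=(12, 2.5))
plot_df[fc1_flag].astype(int).plot(ax=ax, label="FC1: low static at max fan")
ax.set_title("FC1 flag (1 = fault)")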
ax.set_ylim(-0.1, 1.1)
ax.legend()
plt.show()


## 8) Adapting this to your real CSV

1) Replace `raw_csv_path` with your exported timeseries CSV.

2) Edit `raw_to_brick` until the Brick names match the YAML `inputs:` keys.

3) Run the rule(s) you care about and write results back to disk.
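Real exports rarely arrive on a perfectly regular grid, and the rolling-window rules above count rows rather than minutes. A hedged way to prepare such data is to average it into fixed bins and forward-fill only short gaps. `to_regular_grid`, its defaults, and the sample values are assumptions for illustration.

```python
import pandas as pd

def to_regular_grid(df: pd.DataFrame, freq: str = "5min", max_gap: int = 1) -> pd.DataFrame:
    """Average samples into fixed bins, then forward-fill at most `max_gap` empty bins."""
    regular = df.sort_index().resample(freq).mean()
    return regular.ffill(limit=max_gap)

# Hypothetical irregular samples: two in the first bin, then a ten-minute hole.
irregular = pd.DataFrame(
    {"Supply_Air_Temperature_Sensor": [54.0, 56.0, 58.0]},
    index=pd.to_datetime(
        ["2026-02-20 00:00:10", "2026-02-20 00:04:50", "2026-02-20 00:15:05"]
    ),
)

regular = to_regular_grid(irregular)
print(regular)
```

Capping the forward-fill keeps long outages visible as NaN. Filling them completely would manufacture flat runs that a flatline rule could then flag as a stuck sensor.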

### Tip: run only one rule
If you want *just* FC2, for example:

```python
fc2 = next(r for r in rules if r['name'] == 'mix_temp_too_low')
results['fc2_flag'] = run_rule(results, fc2)
```


In [None]:
# Save an output CSV with fault flags added
out_path = Path("fake_with_fault_flags.csv")
results.reset_index().to_csv(out_path, index=False)
out_path
