<a href="https://colab.research.google.com/github/LinOna/agroforestry/blob/main/notebooks/01_data_cleaning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Step 1: parse timestamp, standardize node, add station_type**
- Reads RawData180925.csv,

- Parses timestamp into a pandas datetime without timezone (naive),

- Keeps your node_id as-is and makes a lowercase copy node for grouping,

- Maps station_type exactly as specified (node_176 → open_field, node_189 → reference, else agroforestry),

- Writes versioned outputs (no overwrite),

- Prints concise diagnostics.

In [1]:
# =========================
# Step 1: parse timestamp, standardize node, add station_type
# GUARANTEES:
#  - No row drops. Adds a stable row_id for invariants.
#  - Timestamp fixed ONCE here; later stages should NOT re-parse free text.
#  - Writes both a canonical ISO string and an integer ts_ns for lossless reloads.
# =========================
import pandas as pd
import numpy as np
from pathlib import Path

# --- Config: exact column names (no autodetection), defines hard-coded names for the input file and
# two required columns ---
RAW_CSV_PATH  = "RawData180925.csv"   # adjust if your path differs
TIMESTAMP_COL = "timestamp"           # REQUIRED
NODE_ID_COL   = "node_id"             # REQUIRED

OUT_DIR = Path("af_clean_v01") # creates a versioned working directory (af_clean_v01/) for all putputs
OUT_DIR.mkdir(parents=True, exist_ok=True)

# finds the next unused file name like name, without overwriting existing files
def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    """Create <stem>_v001.csv, <stem>_v002.csv, ... without overwriting."""
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

# --- Load raw into Pandas Dataframe (unaltered) ---
df_raw = pd.read_csv(RAW_CSV_PATH, low_memory=False)

# Invariant 0: required columns present
missing_required = [c for c in (TIMESTAMP_COL, NODE_ID_COL) if c not in df_raw.columns]
if missing_required:
    raise ValueError(f"Missing required columns: {missing_required}. "
                     f"Available columns: {list(df_raw.columns)}")

# Save a small head preview (versioned, no overwrite, writes the first 20 rows of the raw data to a versioned csv
# in af_clean_v01/, then prints the path, for quick sanity checks
head_path = next_versioned_csv(OUT_DIR, "stage1_head")
df_raw.head(20).to_csv(head_path, index=False)
print("Wrote preview:", head_path)

# -Work on a copy, creates a seperate DataFrame df to perfom all subsequent transformations, keeping
# dr_raw untouched---
df = df_raw.copy()

# ---Step 1A: Add stable row_id (for invariants across all stages) ----------
df.insert(0, "row_id", np.arange(len(df), dtype=np.int64))

# ---Step 1B: Parse timestamp ONCE (keep NAIVE; no timezone) ----------
# Keep original for provenance
df["timestamp_raw"] = df[TIMESTAMP_COL].astype(str)

# Parse; if you know the exact format, you could pass format="%Y-%m-%d %H:%M:%S"
# Parses the string column timestamp_raw into a pandas datetime (datetime64[ns])
# It won’t convert or attach a UTC timezone; the result is intended to be timezone-naive.
ts = pd.to_datetime(df["timestamp_raw"], errors="coerce", utc=False)

# Summarizes parse success: total rows vs. how many became NaT.
n_total = len(df)
n_nat = int(ts.isna().sum())
print(f"Timestamp parsed: {n_total - n_nat}/{n_total} ({(1 - n_nat/n_total)*100:.2f}%).")

# timestamps are fixed once
if n_nat > 0:
    # Drop a small sample for inspection and STOP — timestamps must be fixed here.
    miss_ts_report = next_versioned_csv(OUT_DIR, "stage1_missing_timestamp_rows")
    df.loc[ts.isna()].head(200).to_csv(miss_ts_report, index=False)
    raise ValueError(f"{n_nat} timestamps failed to parse. Sample saved to {miss_ts_report}. "
                     "Fix timestamp issues in Stage 1 before proceeding.")

# Canonicalize: make an ISO-like string and a numeric epoch (ns) column
# Use second precision in ISO to keep files compact but keep nanoseconds in ts_ns for lossless reloads.
df["timestamp_iso"] = ts.dt.strftime("%Y-%m-%d %H:%M:%S")
df["ts_ns"] = ts.view("int64")  # nanoseconds since epoch (int64)
# All downstream joins, resamples, and deduplication can safely key on ts_ns.
# timestamp_iso is for files humans read; ts_ns is for machines


# For convenience within the in-memory DataFrame, keep a datetime column named 'timestamp'
# Adds a working datetime column for in-process operations (sorting, grouping, resampling)
# without re-parsing strings each time.
df["timestamp"] = ts  # datetime64[ns], NAIVE

# NOTE FOR LATER STAGES:
# - Do NOT call pd.to_datetime on free text again.
# - If you need datetime after reloading a CSV, reconstruct via:
#     ts = pd.to_datetime(df["ts_ns"].astype("int64"))

# ---- Step 1C: Standardize station ID -> 'node' (simple, explicit) ----
df["node"] = (
    df[NODE_ID_COL]
      .astype("string") # forsinc string dtype
      .str.strip() # trimming white space
      .str.lower() # lower casing all
)

# ----- Step 1D: station_type mapping (explicit rules), creates new column "station_type" ----
# be aware of this definitions when possibly adding more nodes
def map_station_type(node_label) -> object:
    if pd.isna(node_label):
        return pd.NA
    s = str(node_label)
    if s == "node_176":
        return "open_field"
    if s == "node_189":
        return "reference"
    return "agroforestry"

df["station_type"] = df["node"].apply(map_station_type)

# ------ Step 1E: Deterministic ordering ---
# Sorts rows first by station (node), then by time (timestamp), then by tie-breaker (row_id).
df.sort_values(["node", "timestamp", "row_id"], inplace=True, ignore_index=True)

# ------- Invariants & hygiene (no row loss; report duplicates, don’t drop) ----------
# Invariant 1: row count preserved
# Hard check that Stage 1 didn’t add/drop rows. If counts differ, the script stops here.
assert len(df) == len(df_raw), "Row count changed in Stage 1 — this should never happen."

# Hygiene: check duplicate keys (node, timestamp_iso) — just report, do not alter
dup_mask = df.duplicated(subset=["node", "timestamp_iso"], keep=False)
n_dups = int(dup_mask.sum())
if n_dups > 0:
    dup_report = next_versioned_csv(OUT_DIR, "stage1_duplicate_node_timestamp_rows")
    df.loc[dup_mask, ["node", "timestamp_iso", "row_id"]].to_csv(dup_report, index=False)
    print(f"NOTE: Found {n_dups} rows sharing the same (node, timestamp_iso). "
          f"Listed in: {dup_report}. No rows were dropped.")

# ---------- Save Stage 1 (versioned, no overwrite) ----------
stage1_path = next_versioned_csv(OUT_DIR, "stage1_parsed")
df.to_csv(stage1_path, index=False)
print("Wrote Stage 1:", stage1_path)

# ---------- Minimal diagnostics ----------
print("\nSummary")
print("  rows:", len(df)) # report total row count
print("  unique nodes:", sorted(df['node'].dropna().unique().tolist())) # lists the distinct node labels
print("  station_type counts:\n", df["station_type"].value_counts(dropna=False)) # counts per station_type,incl. miss.

# Dataset-wide time span (authoritative), shows earliest and latest parsed timestamps
print("  timestamp range:", df["timestamp"].min(), "→", df["timestamp"].max())

# Columns you’ll carry forward (documented)
carry = ["row_id", "timestamp_iso", "ts_ns", "timestamp", "node_id", "node", "station_type"]
print("\nCarry-forward columns:", carry)


Wrote preview: af_clean_v01/stage1_head_v001.csv
Timestamp parsed: 483787/483787 (100.00%).


  df["ts_ns"] = ts.view("int64")  # nanoseconds since epoch (int64)


NOTE: Found 2 rows sharing the same (node, timestamp_iso). Listed in: af_clean_v01/stage1_duplicate_node_timestamp_rows_v001.csv. No rows were dropped.
Wrote Stage 1: af_clean_v01/stage1_parsed_v001.csv

Summary
  rows: 483787
  unique nodes: ['node_176', 'node_179', 'node_181', 'node_182', 'node_183', 'node_184', 'node_186', 'node_187', 'node_189']
  station_type counts:
 station_type
agroforestry    369066
reference        69759
open_field       44962
Name: count, dtype: int64
  timestamp range: 2023-10-30 12:18:43 → 2025-09-18 10:29:42

Carry-forward columns: ['row_id', 'timestamp_iso', 'ts_ns', 'timestamp', 'node_id', 'node', 'station_type']


How/When to use which timestamp:

ts_ns (int64, nanoseconds since epoch) → the authoritative time. Use it for joins/merges, deduplication, exact equality checks, and persistent IDs in files. It’s lossless and immune to locale/format shenanigans.

timestamp (datetime64[ns], tz-naive) → the working time for analysis inside Python: sorting, rolling windows, resampling, plotting. It’s convenient and fast, but don’t rely on its string form for persistence.

timestamp_iso (string, to the second) → human-readable/export only (logs, previews). Don’t analyze with it; it drops sub-second precision.

Short version: compute with timestamp, key and persist with ts_ns, display with timestamp_iso.

**Step 2: Soil moisture Range QC** for ground_humidity_0 and ground_humidity_1
* Loads the latest Stage 1 file.
* Checks required columns exist (timestamp, node_id, node, station_type, ground_humidity_0, ground_humidity_1).
* Classifies each moisture value as: OK (within 0–100),WARN_boundary (tiny tolerance outside bounds: −0.5…0 or 100…100.5), CRIT_out_of_range (anything <−0.5 or >100.5).
* Builds rollups (qc_sm_anycrit_*, qc_sm_anywarn_*).
* Creates clean columns: *_clean (CRIT → NaN; WARN at bounds → clamped to 0 or 100; OK → unchanged).
* Saves a versioned Stage 2 file and a small summary CSV.

In [2]:
# =========================
# Step 2: Soil moisture Range QC (no unit inference)
# GUARANTEES:
#  - Reads the latest Stage 1 only
#  - Reconstructs timestamp from ts_ns (no free-text reparse)
#  - No row loss; preserves row_id
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob

# ---- Config (fixed column names; no autodetection), creates/ensures the working output directory ----
OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Defines the canonical column names expected from Stage 1.
# These constants are used to avoid typos and to make the schema explicit.
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns" # authoritative time key (used to rebuild timestamp).
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"      # in-memory datetime64[ns], reconstructed from ts_ns
NODE_ID_COL   = "node_id"
NODE_COL      = "node"
STATION_TYPE  = "station_type"

SM_COLS = ["ground_humidity_0", "ground_humidity_1"]  # soil moisture probes (VWC %)

# Range thresholds
LOWER_OK, UPPER_OK = 0.0, 100.0
LOWER_TOL, UPPER_TOL = -0.5, 100.5   # tolerance band treated as WARN_boundary

# versioned file name helper
def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    """Create <stem>_v001.csv, <stem>_v002.csv, ... without overwriting."""
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

# ---- Load input: latest Stage 1 file (only) ----
stage1_files = sorted(glob(str(OUT_DIR / "stage1_parsed_v*.csv"))) # lists all files matching, and sorts
assert stage1_files, "No Stage 1 file found (expected: af_clean_v01/stage1_parsed_v*.csv)."
in_path = stage1_files[-1] # picks the latest version (highest vNNN)
print("Reading:", in_path)

# loads the chosen step-1 file into a new DataFrame named df
df = pd.read_csv(in_path, low_memory=False)

# ---- Required columns must be present (including invariants from Stage 1) ----
required = {
    ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_ID_COL, NODE_COL, STATION_TYPE, *SM_COLS
}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns from Stage 1: {sorted(missing)}")

# ---- Reconstruct timestamp from ts_ns (authoritative); DO NOT parse free text ----
# Keep an in-memory datetime column for convenience; do not overwrite ts_ns or timestamp_iso.
df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))

# ---- Invariant: no row loss relative to Stage 1 input ----
n_in = len(df)
assert df[ROW_ID_COL].is_unique, "row_id should be unique from Stage 1."
# (Since we load Stage 1 directly here, this mainly asserts uniqueness.)

# Ensure numeric for moisture columns (raw columns remain unchanged)
for c in SM_COLS:
    df[c] = pd.to_numeric(df[c], errors="coerce")

# ---- Helper to classify and build clean series for one probe ----
# qc_sm_anycrit_<k>: boolean (True if that row is CRIT for this probe).
# qc_sm_anywarn_<k>: boolean (True if that row is WARN for this probe).


def range_qc_for_probe(df: pd.DataFrame, col: str):
    """
    Builds:
      - qc_sm_range_<k>   : 'OK' / 'WARN_boundary' / 'CRIT_out_of_range'
      - qc_sm_anycrit_<k> : boolean
      - qc_sm_anywarn_<k> : boolean
      - <col>_clean       : CRIT->NaN, WARN_boundary clamped to bounds, OK unchanged
    """
    # Suffix k is '0' or '1' inferred from the column name
    if col.endswith("_0"):
        k = "0"
    elif col.endswith("_1"):
        k = "1"
    else:
        k = col  # fallback

    val = df[col]

    # Start with all OK; NaNs remain OK here (they'll stay NaN in _clean)
    qc = pd.Series("OK", index=val.index, dtype="object")

    # Boundary tolerance (warn)
    warn_low  = val.ge(LOWER_TOL) & val.lt(LOWER_OK)   # [-0.5, 0)
    warn_high = val.gt(UPPER_OK) & val.le(UPPER_TOL)   # (100, 100.5]
    warn_mask = warn_low | warn_high

    # Critical (beyond tolerance)
    crit_low  = val.lt(LOWER_TOL)
    crit_high = val.gt(UPPER_TOL)
    crit_mask = crit_low | crit_high

    # Assign flags
    qc.loc[warn_mask] = "WARN_boundary"
    qc.loc[crit_mask] = "CRIT_out_of_range"

    # Build clean series
    clean = val.copy()
    clean.loc[crit_mask] = np.nan           # CRIT -> NaN
    clean.loc[warn_low]  = LOWER_OK         # WARN at low edge -> clamp
    clean.loc[warn_high] = UPPER_OK         # WARN at high edge -> clamp

    # Rollups (booleans)
    anycrit = qc.eq("CRIT_out_of_range")
    anywarn = qc.eq("WARN_boundary")

    # Attach to df, creating new columns
    df[f"qc_sm_range_{k}"]   = qc
    df[f"qc_sm_anycrit_{k}"] = anycrit
    df[f"qc_sm_anywarn_{k}"] = anywarn
    df[f"{col}_clean"]       = clean

    # Return some quick counts for the console
    return {
        "probe": col,
        "n_OK": int(qc.eq("OK").sum()),
        "n_WARN_boundary": int(anywarn.sum()),
        "n_CRIT_out_of_range": int(anycrit.sum()),
        "n_clamped_warn": int(warn_mask.sum()),
        "n_masked_crit": int(crit_mask.sum()),
    }

# ---- Apply QC to both soil moisture probes ----
summaries = []
for col in SM_COLS:
    summaries.append(range_qc_for_probe(df, col))

# Deterministic order
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)

# ---- Save outputs, writes the full DataFrame with new QC labels and
# <col>_clean columns to a versioned csv, no overwrite ----
out_main = next_versioned_csv(OUT_DIR, "stage2_soilmoisture_range")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# ---- Small summary table overall and per node/probe ----
summary_df = pd.DataFrame(summaries)
summary_path = next_versioned_csv(OUT_DIR, "stage2_sm_range_counts_overall")
summary_df.to_csv(summary_path, index=False)
print("Wrote overall counts:", summary_path)

# Per-node flag counts (useful to spot troublesome stations), produces the table
pernode = (
    df.melt(id_vars=[NODE_COL, TIMESTAMP_COL], value_vars=[f"qc_sm_range_0", f"qc_sm_range_1"],
            var_name="which", value_name="flag")
      .groupby([NODE_COL, "which", "flag"]).size().rename("n").reset_index()
      .sort_values([NODE_COL, "which", "flag"])
)
pernode_path = next_versioned_csv(OUT_DIR, "stage2_sm_range_counts_pernode")
pernode.to_csv(pernode_path, index=False)
print("Wrote per-node counts:", pernode_path)

# ---- Minimal diagnostics to the console, prints the table ----
print("\nSummary (overall):")
print(summary_df.to_string(index=False))

# Dataset-wide time span (authoritative, from ts_ns)
tmin = pd.to_datetime(df[TS_NS_COL].astype("int64")).min()
tmax = pd.to_datetime(df[TS_NS_COL].astype("int64")).max()
print("\nTime span (dataset, from ts_ns):", tmin, "→", tmax)

# Invariant echo, row preservation, repeats the row-count check
print("Rows in/out (should match Stage 1):", n_in, "→", len(df))


Reading: af_clean_v01/stage1_parsed_v001.csv
Wrote: af_clean_v01/stage2_soilmoisture_range_v001.csv
Wrote overall counts: af_clean_v01/stage2_sm_range_counts_overall_v001.csv
Wrote per-node counts: af_clean_v01/stage2_sm_range_counts_pernode_v001.csv

Summary (overall):
            probe   n_OK  n_WARN_boundary  n_CRIT_out_of_range  n_clamped_warn  n_masked_crit
ground_humidity_0 483787                0                    0               0              0
ground_humidity_1 483787                0                    0               0              0

Time span (dataset, from ts_ns): 2023-10-30 12:18:43 → 2025-09-18 10:29:42
Rows in/out (should match Stage 1): 483787 → 483787


**Step 2b — Soil moisture diagnostics (read-only)**
* Loads the latest Stage 2 (range-QC) table and parses timestamp; works with node, ground_humidity_0/1 raw and their *_clean versions.
* Ensures numeric types for the soil-moisture columns (raw & clean).
* Computes median sampling interval per node (minutes) to understand cadence and to scale later duration reasoning.
* Per node × probe (0 / 1), it summarizes:
* Availability: row count, % missing in raw and in clean.
* Distribution (clean): min, Q1, median, Q3, max, IQR, std.
* Edge occupancy (clean): % of values at/near 0% and 100% VWC (saturation/near-empty behavior).
* Step behavior (clean): median absolute step |Δ|, 95th percentile of |Δ|, and fraction of zero steps (exact repeats) as a quick flatness indicator.
* Daily structure: for each day, median moisture → std of daily medians (overall variability) and fraction of “low-change” day-to-day steps (simple stability proxy).
* Pair consistency within each node (probe 0 vs 1):
* Aligns timestamps, compares *_clean series.
* Reports median absolute difference, 95th percentile difference, fraction of large gaps (>|15%|), and Pearson correlation between the two probes.
* Outputs only summary tables (CSV):
* A per-node×probe diagnostic table (availability, distributions, step stats, daily stats).
* A per-node pair table (0 vs 1 comparison metrics).
* No masking, no flags changed: this stage is strictly observational to spot suspicious nodes/probes before applying flatline or step QC thresholds later.

In [3]:
# =========================
# Step 2b — Soil moisture diagnostics (read-only)
# Identifies nodes that look suspicious BEFORE flatline/step flags.
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ---- Inputs ----
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"     # in-memory only (reconstructed)
NODE_COL      = "node"

PROBES  = ["ground_humidity_0", "ground_humidity_1"]           # raw
CLEAN   = [f"{c}_clean" for c in PROBES]                        # clean (after Stage 2)

# ---- Locate latest Stage 2 file (range QC output) ----
stage2_files = sorted(glob(str(OUT_DIR / "stage2_soilmoisture_range_v*.csv")))
assert stage2_files, "Run Stage 2 (range QC) first (expected: af_clean_v01/stage2_soilmoisture_range_v*.csv)."
in_path = stage2_files[-1]
print("Reading:", in_path)

# ---- Load & reconstruct timestamp from ts_ns (authoritative) ----
df = pd.read_csv(in_path, low_memory=False)
required_cols = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL, *PROBES, *CLEAN}
missing = required_cols - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns in Stage 2 file: {sorted(missing)}")

# Reconstruct 'timestamp' (do NOT parse free-text)
df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))

# Echo invariants
print("Rows:", len(df), "| Nodes:", df[NODE_COL].nunique())
print("Dataset time span:", df[TIMESTAMP_COL].min(), "→", df[TIMESTAMP_COL].max())

# Ensure numeric for probe columns
for c in PROBES + CLEAN:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce")

# ---- Helper: median sampling interval (minutes) per node ----
def median_dt_minutes(s: pd.Series) -> float:
    if s.size < 2:
        return np.nan
    # s is datetime64[ns]; use numpy timedeltas
    d = (s.values[1:] - s.values[:-1]).astype("timedelta64[s]").astype(float)
    good = d[d > 0]
    return float(np.nanmedian(good) / 60.0) if good.size else np.nan

dt_per_node = (
    df.sort_values([NODE_COL, TIMESTAMP_COL])
      .groupby(NODE_COL, as_index=False)[TIMESTAMP_COL]
      .apply(lambda s: median_dt_minutes(s.reset_index(drop=True)))
      .rename(columns={TIMESTAMP_COL: "median_dt_min"})
)

# ---- Daily variability helper on a clean column ----
def daily_variability_metrics(g: pd.DataFrame, clean_col: str, std_thresh: float = 0.05):
    """Returns daily median std and fraction of small day-to-day change.
       std_thresh in % VWC (absolute)."""
    s = g[[TIMESTAMP_COL, clean_col]].dropna().copy()
    if s.empty:
        return pd.Series({"daily_std": np.nan, "frac_days_lowvar": np.nan})
    s["date"] = s[TIMESTAMP_COL].dt.floor("D")
    per_day = s.groupby("date")[clean_col].median()
    if per_day.size < 2:
        return pd.Series({"daily_std": np.nan, "frac_days_lowvar": np.nan})
    daily_std = float(per_day.std())
    # fraction of adjacent days with small change
    daydiff = per_day.diff().abs().fillna(0)
    frac_low = float((daydiff <= std_thresh).mean())
    return pd.Series({"daily_std": daily_std, "frac_days_lowvar": frac_low})

# ---- Per-node, per-probe summary (availability & variability on CLEAN) ----
rows = []
df_sorted = df.sort_values([NODE_COL, TIMESTAMP_COL])

for node_id, g in df_sorted.groupby(NODE_COL, dropna=False):
    # median dt for this node
    med_dt = float(dt_per_node.loc[dt_per_node[NODE_COL] == node_id, "median_dt_min"].values[0]
                   ) if (dt_per_node[NODE_COL] == node_id).any() else np.nan

    for raw_col, clean_col in zip(PROBES, CLEAN):
        s_raw   = g[raw_col]
        s_clean = g[clean_col]

        n = int(len(g))
        n_nan_raw   = int(s_raw.isna().sum())
        n_nan_clean = int(s_clean.isna().sum())

        # Clean value distribution
        v = s_clean.dropna()
        if not v.empty:
            q = v.quantile([0.0, 0.25, 0.5, 0.75, 1.0])
            v_min, v_q1, v_med, v_q3, v_max = [float(q.loc[x]) for x in [0.0, 0.25, 0.5, 0.75, 1.0]]
            v_iqr = float(v_q3 - v_q1)
            v_std = float(v.std())
            edge_lo = float((v <= 0.1).mean())
            edge_hi = float((v >= 99.9).mean())
        else:
            v_min=v_q1=v_med=v_q3=v_max=v_iqr=v_std=edge_lo=edge_hi=np.nan

        # Absolute step stats on CLEAN (not rate, just |Δ|)
        v_with_ts = g[[TIMESTAMP_COL]].copy()
        v_with_ts["val"] = s_clean.values
        v_with_ts = v_with_ts.dropna().sort_values(TIMESTAMP_COL)
        if len(v_with_ts) >= 2:
            dv = v_with_ts["val"].diff().abs().dropna()
            d_med = float(dv.median())
            d_p95 = float(dv.quantile(0.95))
            d_zero_frac = float((dv == 0).mean())
        else:
            d_med = d_p95 = d_zero_frac = np.nan

        # Daily stability metrics
        daily = daily_variability_metrics(g, clean_col, std_thresh=0.05)

        rows.append({
            "node": node_id,
            "probe": raw_col,
            "n_rows": n,
            "pct_missing_raw": round(100*n_nan_raw / n, 2) if n else np.nan,
            "pct_missing_clean": round(100*n_nan_clean / n, 2) if n else np.nan,
            "median_dt_min": med_dt,
            "min": v_min, "median": v_med, "max": v_max,
            "IQR": v_iqr, "std": v_std,
            "edge_lo_pct": round(100*edge_lo, 2) if pd.notna(edge_lo) else np.nan,
            "edge_hi_pct": round(100*edge_hi, 2) if pd.notna(edge_hi) else np.nan,
            "d_abs_median": d_med, "d_abs_p95": d_p95, "d_zero_frac": d_zero_frac,
            "daily_std": daily["daily_std"],
            "frac_days_lowvar": daily["frac_days_lowvar"],
        })

summary_node_probe = pd.DataFrame(rows)

# ---- Optional pair summary (0 vs 1), purely diagnostic ----
pairs = []
for node_id, g in df_sorted.groupby(NODE_COL, dropna=False):
    cols_needed = [TIMESTAMP_COL, CLEAN[0], CLEAN[1]]
    gg = g[cols_needed].dropna().sort_values(TIMESTAMP_COL)
    if len(gg) >= 3:
        diff = (gg[CLEAN[0]] - gg[CLEAN[1]]).abs()
        med_diff = float(diff.median())
        p95_diff = float(diff.quantile(0.95))
        big_gap_frac = float((diff > 15.0).mean())  # >15% absolute gap
        # Pearson correlation (guard against constant series)
        x = gg[CLEAN[0]].to_numpy()
        y = gg[CLEAN[1]].to_numpy()
        if np.isfinite(x).any() and np.isfinite(y).any() and (np.std(x) > 0) and (np.std(y) > 0):
            r = float(pd.Series(x).corr(pd.Series(y)))
        else:
            r = np.nan
        pairs.append({
            "node": node_id,
            "n_aligned": int(len(gg)),
            "median_abs_diff": med_diff,
            "p95_abs_diff": p95_diff,
            "frac_diff_gt_15pct": big_gap_frac,
            "corr_clean_0_vs_1": r,
        })
pair_summary = pd.DataFrame(pairs)

# ---- Save diagnostics (versioned, no overwrite) ----
def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

nodeprobe_path = next_versioned_csv(OUT_DIR, "stage2_sm_diagnostics_nodeprobe")
summary_node_probe.to_csv(nodeprobe_path, index=False)
print("Wrote per-node/probe diagnostics:", nodeprobe_path)

if not pair_summary.empty:
    pair_path = next_versioned_csv(OUT_DIR, "stage2_sm_diagnostics_pairs")
    pair_summary.to_csv(pair_path, index=False)
    print("Wrote pair diagnostics:", pair_path)
else:
    print("Pair diagnostics: not enough aligned data to compute.")

# ---- Quick console overview ----
print("\nTop nodes by % missing (clean):")
print(summary_node_probe.sort_values("pct_missing_clean", ascending=False)
      .head(10).to_string(index=False))

print("\nNodes with large p95 |Δ| on clean (possible jumpiness):")
print(summary_node_probe.sort_values("d_abs_p95", ascending=False)
      .head(10).to_string(index=False))

if not pair_summary.empty:
    print("\nPair summary (largest median_abs_diff):")
    print(pair_summary.sort_values("median_abs_diff", ascending=False)
          .head(10).to_string(index=False))


Reading: af_clean_v01/stage2_soilmoisture_range_v001.csv
Rows: 483787 | Nodes: 9
Dataset time span: 2023-10-30 12:18:43 → 2025-09-18 10:29:42
Wrote per-node/probe diagnostics: af_clean_v01/stage2_sm_diagnostics_nodeprobe_v001.csv
Wrote pair diagnostics: af_clean_v01/stage2_sm_diagnostics_pairs_v001.csv

Top nodes by % missing (clean):
    node             probe  n_rows  pct_missing_raw  pct_missing_clean  median_dt_min  min  median   max    IQR       std  edge_lo_pct  edge_hi_pct  d_abs_median  d_abs_p95  d_zero_frac  daily_std  frac_days_lowvar
node_176 ground_humidity_0   44962              0.0                0.0      15.483333 1.87    4.19  8.56  1.610  1.282132         0.00          0.0          0.04       0.16     0.088988   1.274833          0.527307
node_176 ground_humidity_1   44962              0.0                0.0      15.483333 3.59    7.33 25.90  1.190  2.035530         0.00          0.0          0.04       0.16     0.127889   1.925398          0.391714
node_179 ground_hu

**Step 3 — Soil moisture: known broken probes**
- Load your latest Stage 2 output.

- Add qc_sm_known_broken_0 and qc_sm_known_broken_1:

- From 2025-04-04 onward:

- node_184 → probe _0 is CRIT_known_broken
- node_186 → probe _1 is CRIT_known_broken
- node_187 → probe _0 is CRIT_known_broken
- Elsewhere: OK

- Update rollups (qc_sm_anycrit_*) by OR-ing with these new CRIT flags.

- Further mask ground_humidity_*_clean to NaN where known-broken is CRIT.

- Save a versioned Stage 3 file and small summary files.

In [4]:
# =========================
# Step 3: Known-broken probes (CRIT from FAIL_START onward)
# - Originally SM-only; now also propagates to soil temperature.
# - Keeps filename pattern: stage3_sm_knownbroken_v*.csv
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core columns
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"
NODE_COL      = "node"

# Ground sensor variables
SM0, SM1 = "ground_humidity_0", "ground_humidity_1"
ST0, ST1 = "ground_temp_0", "ground_temp_1"

SM0_CLEAN, SM1_CLEAN = f"{SM0}_clean", f"{SM1}_clean"
ST0_CLEAN, ST1_CLEAN = f"{ST0}_clean", f"{ST1}_clean"

# Rollups
QC_SM_ANYCRIT_0, QC_SM_ANYCRIT_1 = "qc_sm_anycrit_0", "qc_sm_anycrit_1"
QC_ST_ANYCRIT_0, QC_ST_ANYCRIT_1 = "qc_st_anycrit_0", "qc_st_anycrit_1"

# Known failure start (inclusive)
FAIL_START = pd.Timestamp("2025-04-04")

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

# ---- Load latest Stage 2 output (or later if you’ve chained steps) ----
stage2_files = sorted(glob(str(OUT_DIR / "stage2_soilmoisture_range_v*.csv")))
assert stage2_files, "Run Stage 2 first (expected: af_clean_v01/stage2_soilmoisture_range_v*.csv)."
in_path = stage2_files[-1]
print("Reading:", in_path)

df = pd.read_csv(in_path, low_memory=False)

# ---- Required base columns ----
required = {
    ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL,
    SM0, SM1, SM0_CLEAN, SM1_CLEAN,
    "qc_sm_range_0", QC_SM_ANYCRIT_0,
    "qc_sm_range_1", QC_SM_ANYCRIT_1,
    # temperature raws should be present from Stage 1; clean may not exist yet
    ST0, ST1,
}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns for Step 3: {sorted(missing)}")

# Authoritative timestamp
df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))

# Invariants
n_in = len(df)
assert df[ROW_ID_COL].is_unique, "row_id must be unique (from Stage 1)."

# Ensure numeric clean columns exist for temperature (pass-through from raw if absent)
if ST0_CLEAN not in df.columns:
    df[ST0_CLEAN] = pd.to_numeric(df[ST0], errors="coerce")
if ST1_CLEAN not in df.columns:
    df[ST1_CLEAN] = pd.to_numeric(df[ST1], errors="coerce")

# Ensure ST rollups exist (bool)
if QC_ST_ANYCRIT_0 not in df.columns:
    df[QC_ST_ANYCRIT_0] = False
if QC_ST_ANYCRIT_1 not in df.columns:
    df[QC_ST_ANYCRIT_1] = False

# Keep previous anycrit booleans for delta reporting
prev_sm_anycrit_0 = df[QC_SM_ANYCRIT_0].astype(bool).copy()
prev_sm_anycrit_1 = df[QC_SM_ANYCRIT_1].astype(bool).copy()
prev_st_anycrit_0 = df[QC_ST_ANYCRIT_0].astype(bool).copy()
prev_st_anycrit_1 = df[QC_ST_ANYCRIT_1].astype(bool).copy()

# Initialize known-broken flags (SM + ST)
df["qc_sm_known_broken_0"] = "OK"
df["qc_sm_known_broken_1"] = "OK"
df["qc_st_known_broken_0"] = "OK"
df["qc_st_known_broken_1"] = "OK"

# ---- Rules ----
# Existing:
mask_184_p0 = (df[NODE_COL] == "node_184") & (df[TIMESTAMP_COL] >= FAIL_START)  # probe 0
mask_186_p1 = (df[NODE_COL] == "node_186") & (df[TIMESTAMP_COL] >= FAIL_START)  # probe 1
# New request: node_187 probe 0 (both SM0 and ST0)
mask_187_p0 = (df[NODE_COL] == "node_187") & (df[TIMESTAMP_COL] >= FAIL_START) # probe 0

# Apply to SM flags
df.loc[mask_184_p0, "qc_sm_known_broken_0"] = "CRIT_known_broken"
df.loc[mask_186_p1, "qc_sm_known_broken_1"] = "CRIT_known_broken"
df.loc[mask_187_p0, "qc_sm_known_broken_0"] = "CRIT_known_broken"

# Apply to ST flags
df.loc[mask_184_p0, "qc_st_known_broken_0"] = "CRIT_known_broken"
df.loc[mask_186_p1, "qc_st_known_broken_1"] = "CRIT_known_broken"
df.loc[mask_187_p0, "qc_st_known_broken_0"] = "CRIT_known_broken"

# ---- Update rollups ----
kb_sm0 = df["qc_sm_known_broken_0"].eq("CRIT_known_broken")
kb_sm1 = df["qc_sm_known_broken_1"].eq("CRIT_known_broken")
df[QC_SM_ANYCRIT_0] = prev_sm_anycrit_0 | kb_sm0
df[QC_SM_ANYCRIT_1] = prev_sm_anycrit_1 | kb_sm1

kb_st0 = df["qc_st_known_broken_0"].eq("CRIT_known_broken")
kb_st1 = df["qc_st_known_broken_1"].eq("CRIT_known_broken")
df[QC_ST_ANYCRIT_0] = prev_st_anycrit_0 | kb_st0
df[QC_ST_ANYCRIT_1] = prev_st_anycrit_1 | kb_st1

# ---- Mask clean series on CRIT known-broken ----
# Soil moisture
df.loc[kb_sm0, SM0_CLEAN] = np.nan
df.loc[kb_sm1, SM1_CLEAN] = np.nan
# Soil temperature
df.loc[kb_st0, ST0_CLEAN] = np.nan
df.loc[kb_st1, ST1_CLEAN] = np.nan

# ---- Save with same Stage 3 name for downstream compatibility ----
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)
out_main = next_versioned_csv(OUT_DIR, "stage3_sm_knownbroken")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# ---- Summaries ----
summary = pd.DataFrame([
    {"family":"SM", "probe":"0",
     "n_CRIT_known_broken": int(kb_sm0.sum()),
     "new_anycrit_added":   int((~prev_sm_anycrit_0 & kb_sm0).sum())},
    {"family":"SM", "probe":"1",
     "n_CRIT_known_broken": int(kb_sm1.sum()),
     "new_anycrit_added":   int((~prev_sm_anycrit_1 & kb_sm1).sum())},
    {"family":"ST", "probe":"0",
     "n_CRIT_known_broken": int(kb_st0.sum()),
     "new_anycrit_added":   int((~prev_st_anycrit_0 & kb_st0).sum())},
    {"family":"ST", "probe":"1",
     "n_CRIT_known_broken": int(kb_st1.sum()),
     "new_anycrit_added":   int((~prev_st_anycrit_1 & kb_st1).sum())},
])
sum_path = next_versioned_csv(OUT_DIR, "stage3_knownbroken_counts_overall")
summary.to_csv(sum_path, index=False)
print("Wrote overall counts:", sum_path)
print(summary.to_string(index=False))

# Per-node breakdown (both families)
pernode = (
    pd.concat([
        df[[NODE_COL, "qc_sm_known_broken_0"]].rename(columns={"qc_sm_known_broken_0":"flag"}).assign(family="SM", probe="0"),
        df[[NODE_COL, "qc_sm_known_broken_1"]].rename(columns={"qc_sm_known_broken_1":"flag"}).assign(family="SM", probe="1"),
        df[[NODE_COL, "qc_st_known_broken_0"]].rename(columns={"qc_st_known_broken_0":"flag"}).assign(family="ST", probe="0"),
        df[[NODE_COL, "qc_st_known_broken_1"]].rename(columns={"qc_st_known_broken_1":"flag"}).assign(family="ST", probe="1"),
    ], ignore_index=True)
    .groupby([NODE_COL, "family", "probe", "flag"]).size().rename("n").reset_index()
    .sort_values([NODE_COL, "family", "probe", "flag"])
)
pernode_path = next_versioned_csv(OUT_DIR, "stage3_knownbroken_counts_pernode")
pernode.to_csv(pernode_path, index=False)
print("Wrote per-node counts:", pernode_path)

# Breadcrumbs
tmin = pd.to_datetime(df[TS_NS_COL].astype("int64")).min()
tmax = pd.to_datetime(df[TS_NS_COL].astype("int64")).max()
print("\nTime span:", tmin, "→", tmax, "| Rows:", n_in, "→", len(df))


Reading: af_clean_v01/stage2_soilmoisture_range_v001.csv
Wrote: af_clean_v01/stage3_sm_knownbroken_v001.csv
Wrote overall counts: af_clean_v01/stage3_knownbroken_counts_overall_v001.csv
family probe  n_CRIT_known_broken  new_anycrit_added
    SM     0                30230              30230
    SM     1                15620              15620
    ST     0                30230              30230
    ST     1                15620              15620
Wrote per-node counts: af_clean_v01/stage3_knownbroken_counts_pernode_v001.csv

Time span: 2023-10-30 12:18:43 → 2025-09-18 10:29:42 | Rows: 483787 → 483787


New columns: qc_sm_known_broken_0, qc_sm_known_broken_1 with OK or CRIT_known_broken.

Updated rollups: qc_sm_anycrit_0/1 now reflect these CRITs.

ground_humidity_0_clean and ground_humidity_1_clean set to NaN during the known-broken periods.

Summary CSVs with how many rows were flagged per probe and per node.

**Step 4: Soil moisture — flatline detection**
* resolution-aware exact repeats only (no arbitrary rolling std),
* time-based thresholds (WARN ≥ 24 h, CRIT ≥ 48 h),
* per-node sampling interval used for duration,
* **no** boosters yet (rain responsiveness / pair-consistency) — we can add them later, behind switches.

In [5]:
# =========================
# Step 4: Soil moisture — flatline detection (exact-repeat, resolution-aware)
# GUARANTEES:
#  - Reads latest Stage 3 (else Stage 2)
#  - Reconstructs timestamp from ts_ns (no free-text parsing)
#  - No row loss; preserves row_id
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob
import math

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core columns from Stage 1
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"   # in-memory datetime64[ns]
NODE_COL      = "node"

# Soil moisture
SM_COLS  = ["ground_humidity_0", "ground_humidity_1"]
SM_CLEAN = {c: f"{c}_clean" for c in SM_COLS}

# Thresholds (time-based)
WARN_HOURS = 24    # identical value for ≥ 24h -> WARN
CRIT_HOURS = 48    # identical value for ≥ 48h -> CRIT

# Optional boosters (keep OFF for now)
USE_RAIN_BOOSTER = False
RAIN_COL = "rain_amount"
RAIN_EVENT_MM = 2.0
RAIN_WINDOW_H = 6
NO_RESPONSE_THRESHOLD = 0.5

USE_PAIR_BOOSTER = False

# the previous block defines what counts as a flatline, how long it must
# persist to be WARN vs CRIT, and prepares the columns and optional heuristics
# the stage will use

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

# ---- Load latest prior stage (prefer Stage 3, else Stage 2) ----
cands = sorted(glob(str(OUT_DIR / "stage3_sm_knownbroken_v*.csv")))
if not cands:
    cands = sorted(glob(str(OUT_DIR / "stage2_soilmoisture_range_v*.csv")))
assert cands, "Run Stage 2 (and Stage 3 if available) first."
in_path = cands[-1]
print("Reading:", in_path)

df = pd.read_csv(in_path, low_memory=False)

# ---- Required columns must exist ----
required = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL, *SM_COLS, *SM_CLEAN.values()}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns: {sorted(missing)}")

# ---- Reconstruct timestamp from ts_ns (authoritative) ----
df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))

# Invariants (echo)
n_in = len(df)
assert df[ROW_ID_COL].is_unique, "row_id should be unique from Stage 1."

# Ensure numeric for moisture columns (raw; clean can be NaN already)
for c in SM_COLS:
    df[c] = pd.to_numeric(df[c], errors="coerce")

# ---- Helpers ----
def infer_resolution(x: pd.Series) -> float:
    """Infer numeric resolution from unique deltas; clamp to [1e-3, 1.0]."""
    vals = np.sort(x.dropna().unique())
    if len(vals) < 2:
        return 0.01
    diffs = np.diff(vals)
    pos = diffs[diffs > 0]
    res = float(np.nanmin(pos)) if len(pos) else 0.01
    return max(1e-3, min(res, 1.0))

def decimals_from_resolution(res: float) -> int:
    if res <= 0:
        return 2
    dec = max(0, int(round(-np.log10(res))))
    return min(dec, 6)

def median_sampling_minutes(ts: pd.Series) -> float:
    if ts.size < 2:
        return np.nan
    d = (ts.values[1:] - ts.values[:-1]).astype("timedelta64[s]").astype(float)
    med = np.nanmedian(d) if d.size else np.nan
    if not (med > 0):
        return np.nan
    return float(med) / 60.0

def flag_flatlines_for_probe(g: pd.DataFrame, col: str) -> pd.Series:
    """Return per-row 'OK' / 'WARN_flatline' / 'CRIT_flatline' for one node+probe group."""
    # Estimate sampling interval (per node)
    dt_min = median_sampling_minutes(g[TIMESTAMP_COL].reset_index(drop=True))
    if not (dt_min > 0):
        dt_min = 10.0  # fallback

    warn_needed = max(2, int(math.ceil((WARN_HOURS * 60.0) / dt_min)))
    crit_needed = max(2, int(math.ceil((CRIT_HOURS * 60.0) / dt_min)))

    # Infer resolution and round series to that grid
    res = infer_resolution(g[col])
    dec = decimals_from_resolution(res)
    vals = g[col].to_numpy(dtype=float)
    v_rounded = np.round(vals / res) * res
    v_rounded = np.round(v_rounded, dec)  # tidy FP noise

    # Label runs where the rounded value is identical
    same_as_prev = np.r_[True, v_rounded[1:] == v_rounded[:-1]]  # first row starts a run
    run_id = np.cumsum(~same_as_prev)  # increment when value changes

    # Run lengths in samples
    counts = np.bincount(run_id)
    run_len = counts[run_id]

    # Classify
    flag = np.full(len(g), "OK", dtype=object)
    warn_mask = (run_len >= warn_needed) & (run_len < crit_needed)
    crit_mask = (run_len >= crit_needed)
    flag[warn_mask] = "WARN_flatline"
    flag[crit_mask] = "CRIT_flatline"

    return pd.Series(flag, index=g.index, name=f"qc_sm_flatline_{'0' if col.endswith('_0') else '1'}")

# ---- Apply flatline detection per node for each probe ----
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True)
for col in SM_COLS:
    flag_name = f"qc_sm_flatline_{'0' if col.endswith('_0') else '1'}"
    df[flag_name] = "OK"

    # group by node; pass only needed columns to avoid deprecation warnings
    flags = (
        df.groupby(NODE_COL, group_keys=False)[[TIMESTAMP_COL, col]]
          .apply(lambda g: flag_flatlines_for_probe(g, col))
    )
    df.loc[flags.index, flag_name] = flags.values

# ---- Update rollups and clean series ----
for col in SM_COLS:
    suffix      = "0" if col.endswith("_0") else "1"
    flat_col    = f"qc_sm_flatline_{suffix}"
    anycrit_col = f"qc_sm_anycrit_{suffix}"
    anywarn_col = f"qc_sm_anywarn_{suffix}"
    clean_col   = SM_CLEAN[col]

    if anycrit_col not in df.columns:
        df[anycrit_col] = False
    if anywarn_col not in df.columns:
        df[anywarn_col] = False

    prev_anycrit = df[anycrit_col].astype(bool)
    prev_anywarn = df[anywarn_col].astype(bool)

    new_crit = df[flat_col].eq("CRIT_flatline")
    new_warn = df[flat_col].eq("WARN_flatline")

    updated_anycrit = prev_anycrit | new_crit
    updated_anywarn = (prev_anywarn | new_warn) & (~updated_anycrit)

    df[anycrit_col] = updated_anycrit
    df[anywarn_col] = updated_anywarn

    # Mask CRIT flatlines in the clean series (WARN stays visible)
    df.loc[new_crit, clean_col] = np.nan

# ---- Optional boosters (OFF) ----
if USE_RAIN_BOOSTER:
    print("RAIN booster placeholder (OFF).")
if USE_PAIR_BOOSTER:
    print("PAIR booster placeholder (OFF).")

# ---- Deterministic ordering & save ----
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)

out_main = next_versioned_csv(OUT_DIR, "stage4_sm_flatline")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# ---- Summaries ----
def summarize_flags(df, suffix):
    col = f"qc_sm_flatline_{suffix}"
    return (df[col].value_counts(dropna=False)
              .rename_axis("flag").reset_index(name=f"n_rows_{suffix}"))

sum0 = summarize_flags(df, "0")
sum1 = summarize_flags(df, "1")
summary = sum0.merge(sum1, on="flag", how="outer").fillna(0).sort_values("flag")
sum_path = next_versioned_csv(OUT_DIR, "stage4_sm_flatline_counts_overall")
summary.to_csv(sum_path, index=False)
print("Wrote overall counts:", sum_path)
print("\nOverall flatline counts:")
print(summary.to_string(index=False))

# Per-node duration (approx) of CRIT/WARN (in hours), using median dt per node
def pernode_duration_estimate(df, probe_col, flat_col):
    med_dt = (df.groupby(NODE_COL)[TIMESTAMP_COL]
                .apply(lambda s: median_sampling_minutes(s.sort_values()))
                .rename("dt_min"))
    tmp = df[[NODE_COL, flat_col]].copy()
    tmp["n"] = 1
    tmp = tmp.groupby([NODE_COL, flat_col])["n"].sum().reset_index()
    tmp = tmp.merge(med_dt, on=NODE_COL, how="left")
    tmp["hours"] = (tmp["n"] * tmp["dt_min"]) / 60.0
    tmp["probe"] = probe_col
    return tmp[[NODE_COL, "probe", flat_col, "n", "hours"]]

pernode0 = pernode_duration_estimate(df, "probe0", "qc_sm_flatline_0")
pernode1 = pernode_duration_estimate(df, "probe1", "qc_sm_flatline_1")
pernode = pd.concat([pernode0, pernode1], ignore_index=True)
pernode_path = next_versioned_csv(OUT_DIR, "stage4_sm_flatline_counts_pernode")
pernode.to_csv(pernode_path, index=False)
print("Wrote per-node duration estimates:", pernode_path)

# Minimal console diagnostics
tmin = pd.to_datetime(df[TS_NS_COL].astype("int64")).min()
tmax = pd.to_datetime(df[TS_NS_COL].astype("int64")).max()
print("\nTime span (dataset, from ts_ns):", tmin, "→", tmax)
print("Rows in/out (should match input):", n_in, "→", len(df))
print("Nodes:", sorted(df[NODE_COL].dropna().unique().tolist()))


Reading: af_clean_v01/stage3_sm_knownbroken_v001.csv
Wrote: af_clean_v01/stage4_sm_flatline_v001.csv
Wrote overall counts: af_clean_v01/stage4_sm_flatline_counts_overall_v001.csv

Overall flatline counts:
         flag  n_rows_0  n_rows_1
           OK    483690  483787.0
WARN_flatline        97       0.0
Wrote per-node duration estimates: af_clean_v01/stage4_sm_flatline_counts_pernode_v001.csv

Time span (dataset, from ts_ns): 2023-10-30 12:18:43 → 2025-09-18 10:29:42
Rows in/out (should match input): 483787 → 483787
Nodes: ['node_176', 'node_179', 'node_181', 'node_182', 'node_183', 'node_184', 'node_186', 'node_187', 'node_189']


**Step 5: Soil mositure**

* Reads the latest ≤Stage 4 input (never Step-5 output) → idempotent reruns.
* Parses time, coerces soil-moisture probes to numeric, sorts by node & time.
* Flags long ramps (monotonic rise/fall after median smoothing) as WARN; no masking.
* Flags prolonged high-volatility runaway segments (rolling STD) as CRIT candidates.
* Applies masks conservatively: keep earlier CRIT (range/known-broken/flatline); mask runaway only for (node_187, probe 0) and force-mask after 2025-04-01.
* Rebuilds fresh *_clean series from raw, then re-applies only the chosen CRIT masks.
* Writes a versioned CSV in af_clean_v01/ (no overwrite).
* Prints concise diagnostics (OK/WARN/CRIT counts per probe).

In [2]:
# =========================
# Step 5: Soil moisture — conservative regime QC (ramp/runaway)
# Idempotent: rebuilds *_clean from raw and re-applies ONLY chosen CRIT masks
# =========================
import pandas as pd, numpy as np
from pathlib import Path
from glob import glob

OUT_DIR = Path("af_clean_v01"); OUT_DIR.mkdir(parents=True, exist_ok=True)

ROW_ID_COL, TS_NS_COL, TS_ISO_COL = "row_id", "ts_ns", "timestamp_iso"
TIMESTAMP_COL, NODE_COL = "timestamp", "node"
SM_COLS = ["ground_humidity_0", "ground_humidity_1"]
SM_CLEAN = {c: f"{c}_clean" for c in SM_COLS}

# ---- tuning (conservative) ----
RAMP_MIN_AMP_VWC    = 10.0   # WARN only
RAMP_MIN_HOURS      = 24.0
RAMP_MIN_SIGN_SHARE = 0.85

RUNAWAY_STD_WIN_H   = 12.0   # CRIT (eligible for masking)
RUNAWAY_STD_MIN_VWC = 5.0
RUNAWAY_MIN_HOURS   = 24.0

SMOOTH_WIN_SAMPLES  = 5
MIN_DT_MINUTES      = 5.0

# mask policy: only mask runaway for (node_187, probe 0)
ALLOW_AUTOMASK = {("node_187","0"): True}
FORCE_MASK_AFTER = {("node_187","0"): pd.Timestamp("2025-04-01")}

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists(): return p
        i += 1

def pick_latest_by_priority(patterns):
    for pat in patterns:
        files = sorted(glob(str(OUT_DIR / pat)))
        if files: return files[-1]
    return None

# ---- read strictly from ≤ Stage 4 to avoid reusing Stage 5 outputs ----
in_path = pick_latest_by_priority([
    "stage4_sm_flatline_v*.csv",
    "stage3_sm_knownbroken_v*.csv",
    "stage2_soilmoisture_range_v*.csv",
    "stage1_parsed_v*.csv",
])
assert in_path is not None, "No Stage 1–4 file found."
print("Reading:", in_path)

df = pd.read_csv(in_path, low_memory=False)
n_in = len(df)

# basics
req = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL, *SM_COLS}
missing = req - set(df.columns)
if missing: raise ValueError(f"Missing required columns: {sorted(missing)}")

df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))
assert df[ROW_ID_COL].is_unique, "row_id must be unique."
for c in SM_COLS: df[c] = pd.to_numeric(df[c], errors="coerce")

# ---------- helpers ----------
def median_sampling_minutes(ts_np: np.ndarray) -> float:
    if ts_np.size < 2: return np.nan
    d = (ts_np[1:] - ts_np[:-1]).astype("timedelta64[s]").astype(float)
    med = np.nanmedian(d) if d.size else np.nan
    return float(med)/60.0 if med and med > 0 else np.nan

def smooth_median(x: np.ndarray, win: int) -> np.ndarray:
    if not (win >= 3 and win % 2 == 1): return x.astype(float)
    n = x.size; pad = win//2
    xp = np.pad(x.astype(float), (pad, pad), mode="edge")
    out = np.empty(n, dtype=float)
    for i in range(n):
        w = xp[i:i+win]; w = w[np.isfinite(w)]
        out[i] = np.nan if w.size == 0 else np.median(w)
    return out

def runs_from_bool(mask: np.ndarray):
    if mask.size == 0: return []
    edges = np.diff(np.r_[False, mask.astype(bool), False])
    s = np.where(edges == 1)[0]; e = np.where(edges == -1)[0] - 1
    return list(zip(s, e))

def detect_ramp_flags_group(x: np.ndarray, t: np.ndarray) -> np.ndarray:
    n = x.size
    if n < 3: return np.array(["OK"]*n, dtype=object)
    order = np.argsort(t); rev = np.empty_like(order); rev[order] = np.arange(n)
    xs = smooth_median(x[order].astype(float), SMOOTH_WIN_SAMPLES)
    ts = t[order]
    dt_min = (ts[1:] - ts[:-1]).astype("timedelta64[s]").astype(float)/60.0
    ok_dt  = np.r_[False, dt_min >= MIN_DT_MINUTES]
    dif    = np.full(n, np.nan); dif[1:][ok_dt[1:]] = xs[1:][ok_dt[1:]] - xs[:-1][ok_dt[1:]]
    mono   = np.where(np.abs(dif) < 1e-6, 0.0, np.sign(dif))
    flags_sorted = np.array(["OK"]*n, dtype=object)
    change = np.r_[True, mono[1:] != mono[:-1]] | (mono == 0)
    run_id = np.cumsum(change)
    for rid in np.unique(run_id[mono != 0]):
        mask = (run_id == rid) & (mono != 0)
        idx = np.where(mask)[0]
        if idx.size == 0: continue
        i0, i1 = idx.min(), idx.max()
        dur_min = np.nansum(dt_min[i0:i1]) if i1 > i0 else 0.0
        amp = abs(xs[i1] - xs[i0]) if np.isfinite(xs[i0]) and np.isfinite(xs[i1]) else np.nan
        nz = np.isfinite(dif[i0:i1+1]) & (np.abs(dif[i0:i1+1]) >= 1e-6)
        if nz.any():
            sign_share = np.mean(np.sign(dif[i0:i1+1][nz]) == np.sign(np.nansum(dif[i0:i1+1][nz])))
        else:
            sign_share = 0.0
        if (dur_min >= RAMP_MIN_HOURS*60.0) and (amp >= RAMP_MIN_AMP_VWC) and (sign_share >= RAMP_MIN_SIGN_SHARE):
            flags_sorted[i0:i1+1] = "WARN_ramp"
    return flags_sorted[rev]

def detect_runaway_flags_group(x: np.ndarray, t: np.ndarray) -> np.ndarray:
    n = x.size
    if n < 3: return np.array(["OK"]*n, dtype=object)
    order = np.argsort(t); rev = np.empty_like(order); rev[order] = np.arange(n)
    xs = smooth_median(x[order].astype(float), SMOOTH_WIN_SAMPLES)
    ts = t[order]
    dt = median_sampling_minutes(ts); dt = 10.0 if not (dt > 0) else dt
    win = max(5, int(round((RUNAWAY_STD_WIN_H*60.0)/dt)))
    rs = pd.Series(xs).rolling(win, min_periods=win//2).std().to_numpy()
    high_std = np.isfinite(rs) & (rs >= RUNAWAY_STD_MIN_VWC)
    flags_sorted = np.array(["OK"]*n, dtype=object)
    for s, e in runs_from_bool(high_std):
        dur_h = (ts[e] - ts[s]).astype("timedelta64[s]").astype(float) / 3600.0
        if dur_h >= RUNAWAY_MIN_HOURS:
            flags_sorted[s:e+1] = "CRIT_runaway"
    return flags_sorted[rev]

# ---------- initialize outputs ----------
for k in ("0","1"):
    df[f"qc_sm_regime_{k}"] = "OK"
    df[f"qc_sm_anycrit_{k}"] = False
    df[f"qc_sm_anywarn_{k}"] = False

# ---------- per-node loop (no groupby.apply) ----------
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True)
for col in SM_COLS:
    k = "0" if col.endswith("_0") else "1"
    regime_all = np.full(len(df), "OK", dtype=object)
    for node, g in df[[NODE_COL, TIMESTAMP_COL, col]].groupby(NODE_COL, sort=False):
        idx = g.index.values
        x = g[col].to_numpy(dtype=float); t = g[TIMESTAMP_COL].to_numpy()
        good = np.isfinite(x)
        if not good.any(): continue
        warn = detect_ramp_flags_group(x[good], t[good])
        crit = detect_runaway_flags_group(x[good], t[good])
        combined = np.where(crit == "CRIT_runaway", "CRIT_runaway",
                    np.where(warn == "WARN_ramp", "WARN_ramp", "OK"))
        regime_all[idx[good]] = combined
    df[f"qc_sm_regime_{k}"] = regime_all

# explicit force window(s)
for (node_id, k), tcut in FORCE_MASK_AFTER.items():
    ix = (df[NODE_COL] == node_id) & (df[TIMESTAMP_COL] >= pd.Timestamp(tcut))
    df.loc[ix, f"qc_sm_regime_{k}"] = "CRIT_runaway"

# ---------- REBUILD *_clean FROM RAW and re-apply ONLY desired CRIT masks ----------
for col in SM_COLS:
    k = "0" if col.endswith("_0") else "1"
    clean = df[col].astype(float).copy()

    def is_crit(colname: str, value_prefix="CRIT"):
        return df[colname].astype(str).str.startswith(value_prefix) if colname in df.columns else pd.Series(False, index=df.index)

    # keep Stage 2/3/4 CRIT masks
    crit_range = is_crit(f"qc_sm_range_{k}")                 # CRIT_out_of_range
    crit_kb    = df.get(f"qc_sm_known_broken_{k}", pd.Series("OK", index=df.index)).eq("CRIT_known_broken")
    crit_flat  = is_crit(f"qc_sm_flatline_{k}")              # CRIT_flatline

    # Step 5R runaway (but mask only for allowed node/probe)
    crit_run = df[f"qc_sm_regime_{k}"].eq("CRIT_runaway")
    may_mask = df[NODE_COL].map(lambda n: ALLOW_AUTOMASK.get((n, k), False)).fillna(False)
    crit_run = crit_run & may_mask

    mask_any = crit_range | crit_kb | crit_flat | crit_run
    clean[mask_any] = np.nan
    df[SM_CLEAN[col]] = clean

    # rollups (WARN from regime only; others are CRIT-only stages here)
    df[f"qc_sm_anycrit_{k}"] = mask_any
    df[f"qc_sm_anywarn_{k}"] = (df[f"qc_sm_regime_{k}"] == "WARN_ramp") & (~df[f"qc_sm_anycrit_{k}"])

# ---------- save ----------
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)
out_main = next_versioned_csv(OUT_DIR, "stage5r_sm_regimes")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# quick summary
for k in ("0","1"):
    s = df[f"qc_sm_regime_{k}"].value_counts(dropna=False)
    print(f"Probe {k} regimes:", dict(s))


AssertionError: No Stage 1–4 file found.

The following neighbors_manual defines who is neighbouring node (in the physical setting of the climate stations), to be able to use the neighbouring node as comparison for sanity checks.

In [7]:
# Write neighbors_manual.csv from your mapping (run once)
import pandas as pd

rows = [
    # node_id, sm0 list,                         sm1 list
    ("node_176", ["node_176_1"],                 ["node_176_0"]),
    ("node_182", ["node_182_1","node_187_0","node_187_1"], ["node_182_0","node_187_0","node_187_1"]),
    ("node_187", ["node_187_1","node_182_1","node_182_0"], ["node_187_0","node_179_0","node_179_1"]),
    ("node_179", ["node_179_1","node_187_1","node_187_0"], ["node_179_0","node_187_1","node_187_0"]),  # fixed 187_187_0→187_0
    ("node_183", ["node_183_1","node_179_1","node_181_0"], ["node_183_0","node_181_0","node_181_1"]),
    ("node_181", ["node_181_1","node_186_0","node_186_1"], ["node_181_0","node_186_0","node_186_1"]),
    ("node_186", ["node_186_1","node_181_1","node_181_0"], []),  # no sm1 list provided
    ("node_184", ["node_184_1","node_186_1","node_186_0"], []),  # no sm1 list provided
    ("node_189", ["node_189_1"],                 ["node_189_0"]),
]

df = pd.DataFrame({
    "node_id": [n for n,_,_ in rows],
    "sm0_neighbors": ["|".join(lst) for _,lst,_ in rows],
    "sm1_neighbors": ["|".join(lst) for _,_,lst in rows],
})

df.to_csv("neighbors_manual.csv", index=False)
print("Wrote neighbors_manual.csv:")
print(df)


Wrote neighbors_manual.csv:
    node_id                     sm0_neighbors  \
0  node_176                        node_176_1   
1  node_182  node_182_1|node_187_0|node_187_1   
2  node_187  node_187_1|node_182_1|node_182_0   
3  node_179  node_179_1|node_187_1|node_187_0   
4  node_183  node_183_1|node_179_1|node_181_0   
5  node_181  node_181_1|node_186_0|node_186_1   
6  node_186  node_186_1|node_181_1|node_181_0   
7  node_184  node_184_1|node_186_1|node_186_0   
8  node_189                        node_189_1   

                      sm1_neighbors  
0                        node_176_0  
1  node_182_0|node_187_0|node_187_1  
2  node_187_0|node_179_0|node_179_1  
3  node_179_0|node_187_1|node_187_0  
4  node_183_0|node_181_0|node_181_1  
5  node_181_0|node_186_0|node_186_1  
6                                    
7                                    
8                        node_189_0  


Manual neighbor composite

Purpose: Build a reference soil-moisture time series for a target node/probe from manually listed neighbor nodes—no GPS needed.

Inputs: Main dataframe with node, __t (UTC datetime), ground_humidity_0/1 (optionally …_clean), plus a neighbors_manual.csv with node_id, sm0_neighbors, sm1_neighbors.

Process (robust):
* Load neighbor IDs for the target node/probe.
* Extract duplicate-safe time series for target & neighbors (median if duplicate timestamps).
* Robust-scale each series using recent median & IQR (handles offsets/drift).
* Screen neighbors by Spearman correlation to the target (keeps those that co-move).
* Combine neighbors via median of z-scores, then map back to target units.
* Quality flag: Compute rolling correlation target vs. reference → label confidence “med” or “low.”
* Outputs: The composite reference series (same units as target) + metadata (source, confidence, number of neighbors, correlation).
* Why this way: Median/IQR + Spearman + median combine = outlier-resistant, works with baseline differences, and is idempotent (safe to rerun).

In [8]:
# ---- Manual neighbor composite (no lat/lon needed) ----
# Requires df with columns: node_col, "__t" (UTC datetime), ground_humidity_{0,1}, and *optionally* *_clean
import re

NEIGH_PATH = "neighbors_manual.csv"  # change if needed

def _load_manual_neighbors(path=NEIGH_PATH):
    try:
        nm = pd.read_csv(path)
    except FileNotFoundError:
        return None
    # normalize columns
    nm.columns = [c.strip().lower() for c in nm.columns]
    need = {"node_id","sm0_neighbors","sm1_neighbors"}
    if not need.issubset(set(nm.columns)):
        raise ValueError(f"neighbors_manual.csv is missing columns: {sorted(need - set(nm.columns))}")
    return nm

def _parse_ids(s):
    if pd.isna(s): return []
    items = [x.strip() for x in re.split(r"[|,;]+", str(s)) if x.strip()]
    return items

def _series_for(df_all, node_col, nid, probe, use_clean=False):
    """Return duplicate-safe time series for one neighbor & probe (as float, DatetimeIndex)."""
    sub = df_all[df_all[node_col].astype(str) == str(nid)]
    if sub.empty: return pd.Series(dtype="float64")
    sidx = pd.to_datetime(sub["__t"], utc=True, errors="coerce")
    col = f"ground_humidity_{probe}" + ("_clean" if use_clean and f"ground_humidity_{probe}_clean" in sub.columns else "")
    vals = pd.to_numeric(sub[col], errors="coerce")
    ser = pd.Series(vals.values, index=sidx).dropna()
    if ser.index.duplicated().any():
        ser = ser.groupby(level=0).median(numeric_only=True)
    return ser.sort_index()

def _robust_center_scale(s, horizon_days=90):
    """Return (median, IQR) over recent horizon for robust z-scaling; fall back to whole series."""
    if s.empty: return (np.nan, np.nan)
    tmax = s.index.max()
    recent = s[s.index >= (tmax - pd.Timedelta(days=horizon_days))]
    base = recent if recent.notna().sum() >= 48 else s
    med = base.median()
    iqr = base.quantile(0.75) - base.quantile(0.25)
    if not np.isfinite(iqr) or iqr == 0: iqr = np.nan
    return med, iqr

def build_manual_neighbor_ref(df_all, node_col, target_node, probe, t_index, neighbors_df, use_clean=False,
                              min_neighbors=2, corr_gate=0.5):
    """
    Build a 'virtual buddy' from manually listed neighbors for (target_node, probe).
    Returns (ref_series, src, conf, nref, roll_corr).
    conf ∈ {'med','low'}; src ∈ {'statistical'} for now.
    """
    if neighbors_df is None:
        return None, "none", "low", 0, np.nan

    row = neighbors_df[neighbors_df["node_id"].astype(str) == str(target_node)]
    if row.empty:
        return None, "none", "low", 0, np.nan

    neigh_ids = _parse_ids(row.iloc[0][f"sm{probe}_neighbors"])
    if not neigh_ids:
        return None, "none", "low", 0, np.nan

    # target series (raw preferred for divergence; you can switch to clean)
    tgt = _series_for(df_all, node_col, target_node, probe, use_clean=False).reindex(t_index)
    if tgt.notna().sum() < 100:
        return None, "none", "low", 0, np.nan

    # robust center/scale for mapping back to target units
    t_med, t_iqr = _robust_center_scale(tgt)
    if not np.isfinite(t_iqr):  # if fully flat or no variation, can't z-scale sensibly
        return None, "none", "low", 0, np.nan

    z_cols = {}
    weights = []
    used_ids = []
    for nid in neigh_ids:
        s = _series_for(df_all, node_col, nid, probe, use_clean=use_clean).reindex(t_index)
        if s.notna().sum() < 100:
            continue
        n_med, n_iqr = _robust_center_scale(s)
        if not np.isfinite(n_iqr):
            continue
        z = (s - n_med) / n_iqr
        # similarity via Spearman is a bit more robust to offsets
        ok = tgt.notna() & z.notna()
        if ok.sum() < 100:
            continue
        corr = pd.Series(tgt.values, index=t_index)[ok].corr(pd.Series(z.values, index=t_index)[ok], method="spearman")
        if not np.isfinite(corr) or corr < 0.2:  # very permissive at this stage; we’ll gate later
            continue
        z_cols[nid] = z
        weights.append(max(corr, 0))
        used_ids.append(nid)

    if len(z_cols) < min_neighbors:
        return None, "none", "low", len(z_cols), np.nan

    Z = pd.DataFrame(z_cols, index=t_index)
    z_ref = Z.median(axis=1, skipna=True)  # robust across neighbors
    ref = t_med + z_ref * t_iqr

    # quality: rolling correlation over last ~48h equivalent samples
    ok = tgt.notna() & ref.notna()
    roll_corr = (pd.Series(tgt[ok]).rolling(48, min_periods=24).corr(pd.Series(ref[ok]))).median()
    conf = "med" if (np.isfinite(roll_corr) and roll_corr >= corr_gate) else "low"
    return ref, "statistical", conf, len(z_cols), roll_corr


** Step 5PD: Soil moisture**
* Goal: Catch pair-divergence in soil moisture when a probe drifts away from its expected partner behavior, and mask those bad stretches in …_clean.
* Inputs: Latest QC’d soil-moisture file (auto-picked from Stage 2–5/AR), plus optional neighbors_manual.csv. Requires probe 0/1 columns and timestamps.
* Reference logic (per node, per probe):
* Prefer the sibling probe as reference when it’s present & not already CRIT.
* Otherwise build a manual-neighbor composite (robust median/IQR scaling, Spearman screening, median combine), and only trust it if correlation ≥ NEIGH_CORR_GATE.
* Divergence score: Compute residuals vs. reference, then a causal rolling median baseline and MAD-based σ; flag large & meaningful gaps with thresholds:
* Sibling: |z| ≥ Z_SIB and gap ≥ GAP_SIB
* Neighbor: |z| ≥ Z_NEI and gap ≥ GAP_NEI
* Run gating (fewer false positives): Require MIN_RUN consecutive flagged points; allow edge-relax (half length) if the run touches start/end of the series.
* Masking & flags: Map runs back to rows, set qc_sm_pairdiverge_{0,1} = "CRIT_pair_diverge", update qc_sm_anycrit_{0,1}, and mask in ground_humidity_{0,1}_clean.
* Robustness details: Duplicate timestamps → median; rolling stats use window = 7D with min periods to stabilize the tail; prior CRIT masks respected.
* Outputs: Versioned CSV af_clean_v01/stage5pd_sm_pairdiverge_vNNN.csv, masked-share summary, and provenance columns (…_src, …_nref, …_corr) for auditability.

In [9]:
# =========================
# Step 5PD: Soil moisture Pair-Divergence with Manual-Neighbor Fallback
# =========================
# (UNCHANGED preamble/inputs/outputs)

import os, re, glob
import numpy as np
import pandas as pd
from pathlib import Path

# ---- Config ----
IN_SEARCH = [
    "af_clean_v01/stage5r_sm_regimes_v*.csv",
    "af_clean_v01/stage4_sm_flatline_v*.csv",
    "af_clean_v01/stage3_sm_knownbroken_v*.csv",
    "af_clean_v01/stage2_soilmoisture_range_v*.csv",
    "af_clean_v01/analysis_ready_v*.csv",
    "analysis_ready_v*.csv"
]
OUT_DIR = Path("af_clean_v01"); OUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_STEM = "stage5pd_sm_pairdiverge_v"

# Gates
ROLL_WIN = "7D"
MIN_RUN  = 24
Z_SIB    = 6.0
GAP_SIB  = 2.0
Z_NEI    = 7.0
GAP_NEI  = 2.5
EPS_SIG  = 1e-6
NEIGH_CORR_GATE = 0.5

# --- NEW: make rolling robust at the edges and relax runs that touch edges
ROLL_CENTER = False                 # NEW: causal stats so tail has a baseline
ROLL_MINP   = max(12, MIN_RUN//2)   # NEW: enough history to be stable at edges
EDGE_RELAX  = 0.5                   # NEW: half-length ok if a run touches start/end

# ---- small I/O helpers ----
def _pick_latest():
    for pat in IN_SEARCH:
        hits = sorted(glob.glob(pat))
        if hits:
            def _ver(p):
                m = re.search(r"_v(\d+)\.csv$", os.path.basename(p))
                return int(m.group(1)) if m else -1
            best = max(hits, key=_ver)
            return best if _ver(best) >= 0 else hits[-1]
    raise FileNotFoundError("No SM stage file found for 5PD.")

def _next_versioned(stem):
    i = 1
    while True:
        p = OUT_DIR / f"{stem}{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

def _to_dt(df):
    if "timestamp" in df.columns:     return pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    if "timestamp_iso" in df.columns: return pd.to_datetime(df["timestamp_iso"], utc=True, errors="coerce")
    return pd.to_datetime(pd.to_numeric(df["ts_ns"], errors="coerce").astype("Int64"), utc=True, errors="coerce")

def _dup_median(series):
    s = series.dropna()
    if s.index.duplicated().any():
        s = s.groupby(level=0).median(numeric_only=True)
    return s.sort_index()

# --- CHANGED: causal rolling + smaller min_periods
def _roll_med(series, win=ROLL_WIN, minp=ROLL_MINP, center=ROLL_CENTER):
    return series.rolling(win, min_periods=minp, center=center).median()

def _roll_mad_sigma(diff, win=ROLL_WIN, minp=ROLL_MINP, center=ROLL_CENTER):
    dev = (diff - _roll_med(diff, win, minp, center)).abs()
    mad = dev.rolling(win, min_periods=minp, center=center).median()
    return 1.4826 * mad

# --- CHANGED: edge-relaxed contiguous run detector
def _contig_run_mask(mask_bool, min_len=MIN_RUN, edge_relax=EDGE_RELAX):
    m = pd.Series(mask_bool.astype(bool))
    if m.empty or not m.any():
        return pd.Series(False, index=m.index)
    idx = m.index
    vals = m.values
    n = len(vals)
    out = pd.Series(False, index=idx)
    i = 0
    while i < n:
        if vals[i]:
            j = i + 1
            while j < n and vals[j]:
                j += 1
            seg_len = j - i
            touches_edge = (i == 0) or (j == n)
            need = min_len if not touches_edge else int(np.ceil(edge_relax * min_len))
            if seg_len >= need:
                out.iloc[i:j] = True
            i = j
        else:
            i += 1
    return out

def _coerce_anycrit_bool(s):
    if s is None: return None
    s = pd.Series(s)
    if s.dtype == bool: return s.fillna(False)
    return s.astype(str).str.startswith("CRIT", na=False)

# ---- neighbor helpers must exist (unchanged guard)
if '_load_manual_neighbors' not in globals() or 'build_manual_neighbor_ref' not in globals():
    raise RuntimeError("Missing neighbor helpers. Run the helper cell defining `_load_manual_neighbors` and `build_manual_neighbor_ref` first.")

# ---- Load input (unchanged) ----
src = _pick_latest()
df  = pd.read_csv(src, low_memory=False)
print(f"[5PD] Input: {src} | rows={len(df)}")

# ---- Time & node columns (unchanged) ----
df["__t"] = _to_dt(df)
df = df.dropna(subset=["__t"]).copy()
node_col = None
for c in ["node_id","node","station","device","site"]:
    if c in df.columns: node_col = c; break
if node_col is None:
    df["__node"] = "__all__"; node_col = "__node"
print(f"[5PD] Using node column: {node_col}")

# ---- Ensure clean columns exist (unchanged) ----
for suf in ("0","1"):
    raw = f"ground_humidity_{suf}"
    clean = f"{raw}_clean"
    if clean not in df.columns:
        df[clean] = pd.to_numeric(df[raw], errors="coerce")

# ---- Load manual neighbors (unchanged) ----
neighbors_df = _load_manual_neighbors()

# ---- Pre-anycrit (unchanged) ----
def _pre_anycrit_bool(g, suf):
    idx = pd.to_datetime(g["__t"])
    def _crit_from(col):
        if col not in g.columns:
            return pd.Series(False, index=idx)
        s = g[col].astype(str)
        return pd.Series(s.str.startswith("CRIT", na=False).values, index=idx)
    cols = [f"qc_sm_range_{suf}", f"qc_sm_flatline_{suf}", f"qc_sm_known_broken_{suf}", f"qc_sm_regime_{suf}"]
    m = pd.Series(False, index=idx)
    for c in cols:
        m = m | _crit_from(c)
    if m.index.duplicated().any():
        m = m.groupby(level=0).max()
    return m

# ---- Prepare output columns (unchanged) ----
df["qc_sm_pairdiverge_0"] = "OK"; df["qc_sm_pairdiverge_1"] = "OK"
df["qc_sm_pairdiverge_src_0"] = ""; df["qc_sm_pairdiverge_src_1"] = ""
df["qc_sm_pairdiverge_nref_0"] = 0; df["qc_sm_pairdiverge_nref_1"] = 0
df["qc_sm_pairdiverge_corr_0"] = np.nan; df["qc_sm_pairdiverge_corr_1"] = np.nan
for suf in ("0","1"):
    col_any = f"qc_sm_anycrit_{suf}"
    if col_any in df.columns:
        df[col_any] = _coerce_anycrit_bool(df[col_any])
    else:
        df[col_any] = False

# ---- Per-node processing (unchanged logic apart from new gates) ----
masked_counts = {"0":0, "1":0}
raw_counts    = {"0":0, "1":0}

for node, g in df.sort_values([node_col, "__t"]).groupby(df[node_col].astype(str), sort=False):
    t_idx = pd.to_datetime(g["__t"])
    s0_raw = _dup_median(pd.Series(pd.to_numeric(g["ground_humidity_0"], errors="coerce").values, index=t_idx))
    s1_raw = _dup_median(pd.Series(pd.to_numeric(g["ground_humidity_1"], errors="coerce").values, index=t_idx))
    s0_clean = _dup_median(pd.Series(pd.to_numeric(g["ground_humidity_0_clean"], errors="coerce").values, index=t_idx))
    s1_clean = _dup_median(pd.Series(pd.to_numeric(g["ground_humidity_1_clean"], errors="coerce").values, index=t_idx))

    T = s0_raw.index.union(s1_raw.index).unique().sort_values()
    s0r = s0_raw.reindex(T)
    s1r = s1_raw.reindex(T)

    pre0 = _pre_anycrit_bool(g, "0").reindex(T).fillna(False)
    pre1 = _pre_anycrit_bool(g, "1").reindex(T).fillna(False)

    sib_ok_for_0 = s1r.notna() & (~pre1)
    sib_ok_for_1 = s0r.notna() & (~pre0)

    ref0_nei, src0, conf0, nref0, rcorr0 = build_manual_neighbor_ref(
        df_all=df, node_col=node_col, target_node=node, probe="0",
        t_index=T, neighbors_df=neighbors_df, use_clean=False,
        min_neighbors=2, corr_gate=NEIGH_CORR_GATE
    )
    ref1_nei, src1, conf1, nref1, rcorr1 = build_manual_neighbor_ref(
        df_all=df, node_col=node_col, target_node=node, probe="1",
        t_index=T, neighbors_df=neighbors_df, use_clean=False,
        min_neighbors=2, corr_gate=NEIGH_CORR_GATE
    )
    ref0_nei = ref0_nei if isinstance(ref0_nei, pd.Series) else pd.Series(np.nan, index=T)
    ref1_nei = ref1_nei if isinstance(ref1_nei, pd.Series) else pd.Series(np.nan, index=T)

    ref0 = s1r.where(sib_ok_for_0).combine_first(ref0_nei)
    ref1 = s0r.where(sib_ok_for_1).combine_first(ref1_nei)
    src0_series = pd.Series(np.where(sib_ok_for_0, "sibling",
                            np.where(ref0_nei.notna(), "statistical", "none")), index=T)
    src1_series = pd.Series(np.where(sib_ok_for_1, "sibling",
                            np.where(ref1_nei.notna(), "statistical", "none")), index=T)

    d0 = s0r - ref0; d1 = s1r - ref1
    med0 = _roll_med(d0); med1 = _roll_med(d1)
    sig0 = _roll_mad_sigma(d0).clip(lower=EPS_SIG)
    sig1 = _roll_mad_sigma(d1).clip(lower=EPS_SIG)
    z0 = (d0 - med0).abs() / sig0
    z1 = (d1 - med1).abs() / sig1
    gap0 = (d0 - med0).abs(); gap1 = (d1 - med1).abs()

    cand0_sib = (src0_series.eq("sibling")     & (z0 >= Z_SIB) & (gap0 >= GAP_SIB))
    cand0_nei = (src0_series.eq("statistical") & (conf0 == "med") & (z0 >= Z_NEI) & (gap0 >= GAP_NEI))
    cand1_sib = (src1_series.eq("sibling")     & (z1 >= Z_SIB) & (gap1 >= GAP_SIB))
    cand1_nei = (src1_series.eq("statistical") & (conf1 == "med") & (z1 >= Z_NEI) & (gap1 >= GAP_NEI))

    cand0 = (cand0_sib | cand0_nei).fillna(False)
    cand1 = (cand1_sib | cand1_nei).fillna(False)

    # --- CHANGED: edge-relaxed run gating
    crit0 = _contig_run_mask(cand0, MIN_RUN, EDGE_RELAX)
    crit1 = _contig_run_mask(cand1, MIN_RUN, EDGE_RELAX)

    # Map back to rows
    t_rows = pd.to_datetime(g["__t"])
    mask0_rows = pd.Series(crit0.values, index=cand0.index).reindex(t_rows).fillna(False).values
    mask1_rows = pd.Series(crit1.values, index=cand1.index).reindex(t_rows).fillna(False).values

    idx = g.index
    df.loc[idx, "qc_sm_pairdiverge_0"] = np.where(mask0_rows, "CRIT_pair_diverge", df.loc[idx, "qc_sm_pairdiverge_0"])
    df.loc[idx, "qc_sm_pairdiverge_1"] = np.where(mask1_rows, "CRIT_pair_diverge", df.loc[idx, "qc_sm_pairdiverge_1"])
    df.loc[idx, "ground_humidity_0_clean"] = pd.to_numeric(df.loc[idx, "ground_humidity_0_clean"], errors="coerce").mask(mask0_rows)
    df.loc[idx, "ground_humidity_1_clean"] = pd.to_numeric(df.loc[idx, "ground_humidity_1_clean"], errors="coerce").mask(mask1_rows)

    df.loc[idx, "qc_sm_anycrit_0"] = df.loc[idx, "qc_sm_anycrit_0"] | mask0_rows
    df.loc[idx, "qc_sm_anycrit_1"] = df.loc[idx, "qc_sm_anycrit_1"] | mask1_rows

    src0_rows = pd.Series(src0_series.values, index=cand0.index).reindex(t_rows).astype(str).values
    src1_rows = pd.Series(src1_series.values, index=cand1.index).reindex(t_rows).astype(str).values
    df.loc[idx, "qc_sm_pairdiverge_src_0"] = np.where(mask0_rows, src0_rows, df.loc[idx, "qc_sm_pairdiverge_src_0"])
    df.loc[idx, "qc_sm_pairdiverge_src_1"] = np.where(mask1_rows, src1_rows, df.loc[idx, "qc_sm_pairdiverge_src_1"])

    if nref0:
        df.loc[idx, "qc_sm_pairdiverge_nref_0"] = int(nref0)
        df.loc[idx, "qc_sm_pairdiverge_corr_0"] = float(rcorr0) if pd.notna(rcorr0) else np.nan
    if nref1:
        df.loc[idx, "qc_sm_pairdiverge_nref_1"] = int(nref1)
        df.loc[idx, "qc_sm_pairdiverge_corr_1"] = float(rcorr1) if pd.notna(rcorr1) else np.nan

    masked_counts["0"] += int(mask0_rows.sum())
    masked_counts["1"] += int(mask1_rows.sum())
    raw_counts["0"]    += int(pd.to_numeric(g["ground_humidity_0"], errors="coerce").notna().sum())
    raw_counts["1"]    += int(pd.to_numeric(g["ground_humidity_1"], errors="coerce").notna().sum())

# ---- Save & summary (unchanged) ----
out_path = _next_versioned(OUT_STEM)
df.to_csv(out_path, index=False)
print(f"[5PD] wrote: {out_path}")
for suf in ("0","1"):
    denom = raw_counts[suf] or 1
    print(f"[5PD] Masked share probe {suf}: {masked_counts[suf]}/{denom} = {masked_counts[suf]/denom:.2%}")
print("[5PD] Notes:",
      f" Z_SIB={Z_SIB}, GAP_SIB={GAP_SIB}, Z_NEI={Z_NEI}, GAP_NEI={GAP_NEI}, MIN_RUN={MIN_RUN}, ROLL_MINP={ROLL_MINP}, EDGE_RELAX={EDGE_RELAX}, NEIGH_CORR_GATE={NEIGH_CORR_GATE}")


[5PD] Input: af_clean_v01/stage5r_sm_regimes_v001.csv | rows=483787
[5PD] Using node column: node_id
[5PD] wrote: af_clean_v01/stage5pd_sm_pairdiverge_v001.csv
[5PD] Masked share probe 0: 8440/483787 = 1.74%
[5PD] Masked share probe 1: 7462/483787 = 1.54%
[5PD] Notes:  Z_SIB=6.0, GAP_SIB=2.0, Z_NEI=7.0, GAP_NEI=2.5, MIN_RUN=24, ROLL_MINP=12, EDGE_RELAX=0.5, NEIGH_CORR_GATE=0.5


**Step 6a: Soil temperature range QC**
* Loads your latest stage file (prefers the most recent; doesn’t overwrite anything).
* Parses timestamp (naive) and checks required columns exist: ground_temp_0, ground_temp_1, node, station_type.
* Casts soil temperature columns to numeric (bad parses → NaN), leaving the raw columns untouched.
* Applies physical bounds (°C): OK range: −20.0 to 50.0
* Tolerance band: [−20.5, −20.0) and (50.0, 60.5] → WARN_boundary
* Outside tolerance: < −20.5 or > 50.5 → CRIT_out_of_range
* Creates QC flag columns per probe: qc_st_range_0 and qc_st_range_1 with OK / WARN_boundary / CRIT_out_of_range.
* Adds rollups: qc_st_anycrit_* (True when CRIT), qc_st_anywarn_* (True when WARN and not CRIT).
* Builds clean series: ground_temp_{0|1}_clean
* CRIT → NaN
* WARN_boundary below/above limits → clamped to −20.0 / 50.0
* OK → value passes through unchanged
* Sorts deterministically by node, timestamp and saves a new versioned CSV: stage6a_soiltemp_range_v###.csv.
* Writes summaries: overall counts per flag and per-node counts to separate CSVs.
* Prints breadcrumbs: input used, time span, node list—so you can verify at a glance.

In [10]:
# =========================
# Step 6a: Soil temperature Range QC (no unit inference)
# GUARANTEES:
#  - Reads latest prior stage (prefers 6c→6b→6a→5PD→5R→4→3→2→1)
#  - Reconstructs timestamp from ts_ns (no free-text parsing)
#  - No row loss; preserves row_id
#  - Carries forward SM clean/QC from Step 5PD if present (so masks survive)
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob
import os, re

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core columns from Stage 1
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"     # in-memory datetime64[ns]
NODE_COL      = "node"
STATION_TYPE  = "station_type"

# Soil temperature columns
ST_COLS  = ["ground_temp_0", "ground_temp_1"]   # °C at ~35 cm
ST_CLEAN = {c: f"{c}_clean" for c in ST_COLS}

# Physical bounds
LOWER_OK, UPPER_OK   = -20.0, 50.0
LOWER_TOL, UPPER_TOL = -20.5, 50.5   # tolerance band → WARN_boundary

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

# ---- Load the latest available stage (prefer later) ----
def pick_latest_by_priority(patterns):
    for pat in patterns:
        files = sorted(glob(str(OUT_DIR / pat)))
        if files:
            return files[-1]
    return None

# ---- Carry-forward 5PD soil-moisture outputs (if present) ----
def carry_forward_sm_from_5pd(df_base: pd.DataFrame) -> pd.DataFrame:
    """
    If a stage5pd_sm_pairdiverge_v*.csv exists, bring its SM clean/QC columns
    into df_base (overwrite by row_id). This preserves 5PD masks through later steps.
    """
    hits = sorted(glob(str(OUT_DIR / "stage5pd_sm_pairdiverge_v*.csv")))
    if not hits:
        return df_base

    # prefer highest version number
    def _ver(path):
        m = re.search(r"_v(\d+)\.csv$", os.path.basename(path))
        return int(m.group(1)) if m else -1
    src = max(hits, key=_ver) if _ver(hits[-1]) >= 0 else hits[-1]

    want = [
        # Clean values (these carry the masks)
        "ground_humidity_0_clean","ground_humidity_1_clean",
        # 5PD divergence flags
        "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
        # Rollups and other SM QC families (safe no-op if missing)
        "qc_sm_anycrit_0","qc_sm_anycrit_1",
        "qc_sm_anywarn_0","qc_sm_anywarn_1",
        "qc_sm_range_0","qc_sm_range_1",
        "qc_sm_flatline_0","qc_sm_flatline_1",
        "qc_sm_known_broken_0","qc_sm_known_broken_1",
        "qc_sm_regime_0","qc_sm_regime_1",
    ]
    hdr = pd.read_csv(src, nrows=0, low_memory=False)
    have = [c for c in want if c in hdr.columns]
    if not have:
        return df_base

    add = pd.read_csv(src, usecols=[ROW_ID_COL] + have, low_memory=False)
    out = df_base.drop(columns=[c for c in have if c in df_base.columns], errors="ignore")
    out = out.merge(add, on=ROW_ID_COL, how="left")
    print(f"[carry] Brought {len(have)} SM cols from 5PD: {os.path.basename(src)}")
    return out

# ---- Choose input (prefer 5PD over 5R) ----
in_path = pick_latest_by_priority([
    "stage6c_soiltemp_step_v*.csv",
    "stage6b_soiltemp_flatline_v*.csv",
    "stage6a_soiltemp_range_v*.csv",
    "stage5pd_sm_pairdiverge_v*.csv",   # ← NEW: ensure we see 5PD outputs
    "stage5r_sm_regimes_v*.csv",
    "stage4_sm_flatline_v*.csv",
    "stage3_sm_knownbroken_v*.csv",
    "stage2_soilmoisture_range_v*.csv",
    "stage1_parsed_v*.csv",
])
assert in_path, f"No earlier stage files found in {OUT_DIR}."
print("Reading:", in_path)
df = pd.read_csv(in_path, low_memory=False)

# ---- Carry-forward SM masks/flags from 5PD (if present) ----
df = carry_forward_sm_from_5pd(df)

# ---- Sanity: required columns ----
required = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL, STATION_TYPE, *ST_COLS}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns for Stage 6a: {sorted(missing)}")

# Reconstruct timestamp from ts_ns (authoritative)
df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))

# Invariants
n_in = len(df)
assert df[ROW_ID_COL].is_unique, "row_id must be unique (from Stage 1)."

# Ensure numeric for temp columns (raw columns remain unchanged)
for c in ST_COLS:
    df[c] = pd.to_numeric(df[c], errors="coerce")

def temp_range_qc(df: pd.DataFrame, col: str):
    """
    Adds per-probe:
      - qc_st_range_<k> : 'OK' / 'WARN_boundary' / 'CRIT_out_of_range'
      - qc_st_anycrit_<k> (bool)
      - qc_st_anywarn_<k> (bool)
      - <col>_clean     : CRIT -> NaN; WARN_boundary clamped; OK passthrough
    """
    k = "0" if col.endswith("_0") else "1" if col.endswith("_1") else col

    v = df[col]
    qc = pd.Series("OK", index=v.index, dtype="object")

    warn_low  = v.ge(LOWER_TOL) & v.lt(LOWER_OK)    # [-20.5, -20.0)
    warn_high = v.gt(UPPER_OK) & v.le(UPPER_TOL)    # (50.0, 50.5]
    crit_low  = v.lt(LOWER_TOL)
    crit_high = v.gt(UPPER_TOL)

    qc.loc[warn_low | warn_high] = "WARN_boundary"
    qc.loc[crit_low | crit_high] = "CRIT_out_of_range"

    clean = v.copy()
    clean.loc[crit_low | crit_high] = np.nan
    clean.loc[warn_low]  = LOWER_OK
    clean.loc[warn_high] = UPPER_OK

    df[f"qc_st_range_{k}"]   = qc
    df[f"qc_st_anycrit_{k}"] = qc.eq("CRIT_out_of_range")
    df[f"qc_st_anywarn_{k}"] = qc.eq("WARN_boundary")
    df[f"{col}_clean"]       = clean

    return {
        "probe": col,
        "n_OK": int(qc.eq("OK").sum()),
        "n_WARN_boundary": int(qc.eq("WARN_boundary").sum()),
        "n_CRIT_out_of_range": int(qc.eq("CRIT_out_of_range").sum()),
    }

# Apply to both probes
summaries = [temp_range_qc(df, col) for col in ST_COLS]

# Deterministic sort & save
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)
out_main = next_versioned_csv(OUT_DIR, "stage6a_soiltemp_range")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# Summary CSVs
sum_df = pd.DataFrame(summaries)
sum_path = next_versioned_csv(OUT_DIR, "stage6a_soiltemp_range_counts_overall")
sum_df.to_csv(sum_path, index=False)
print("Wrote overall counts:", sum_path)
print("\nOverall temperature range QC:")
print(sum_df.to_string(index=False))

pernode = (
    df.melt(id_vars=[NODE_COL, TIMESTAMP_COL], value_vars=["qc_st_range_0", "qc_st_range_1"],
            var_name="which", value_name="flag")
      .groupby([NODE_COL, "which", "flag"]).size().rename("n").reset_index()
      .sort_values([NODE_COL, "which", "flag"])
)
pernode_path = next_versioned_csv(OUT_DIR, "stage6a_soiltemp_range_counts_pernode")
pernode.to_csv(pernode_path, index=False)
print("Wrote per-node counts:", pernode_path)

# Breadcrumbs
tmin = pd.to_datetime(df[TS_NS_COL].astype("int64")).min()
tmax = pd.to_datetime(df[TS_NS_COL].astype("int64")).max()
print("\nTime span (dataset, from ts_ns):", tmin, "→", tmax)
print("Rows in/out (should match input):", n_in, "→", len(df))
print("Nodes:", sorted(df[NODE_COL].dropna().unique().tolist()))


Reading: af_clean_v01/stage5pd_sm_pairdiverge_v001.csv
[carry] Brought 16 SM cols from 5PD: stage5pd_sm_pairdiverge_v001.csv
Wrote: af_clean_v01/stage6a_soiltemp_range_v001.csv
Wrote overall counts: af_clean_v01/stage6a_soiltemp_range_counts_overall_v001.csv

Overall temperature range QC:
        probe   n_OK  n_WARN_boundary  n_CRIT_out_of_range
ground_temp_0 455253              154                28380
ground_temp_1 467617               29                16141
Wrote per-node counts: af_clean_v01/stage6a_soiltemp_range_counts_pernode_v001.csv

Time span (dataset, from ts_ns): 2023-10-30 12:18:43 → 2025-09-18 10:29:42
Rows in/out (should match input): 483787 → 483787
Nodes: ['node_176', 'node_179', 'node_181', 'node_182', 'node_183', 'node_184', 'node_186', 'node_187', 'node_189']


**Stage 6b (Soil temperature — flatline detection)**
* Loaded the correct prior file: Picked the latest stage6a_soiltemp_range_v*.csv and rebuilt timestamp from ts_ns (no free-text parsing). Preserved row_id; verified no row loss.
* Prepared temperature series: Ensured numeric ground_temp_0/1 (raw). Used the already-created ground_temp_0_clean / ground_temp_1_clean for future masking.
* Estimated cadence per node: Computed a median sampling interval (minutes) for each node to translate “24 h” and “48 h” into the equivalent number of consecutive samples.
* Resolution-aware equality: In each node×probe, inferred the instrument’s numeric resolution (smallest positive step) and rounded values to that grid before comparing, so tiny floating-point jitter doesn’t break runs.
* Detected flatlines: Marked contiguous runs of identical (rounded) values and flagged per row:
* WARN_flatline if the run length ≥ 24 h but < 48 h,
* CRIT_flatline if the run length ≥ 48 h,
* otherwise OK.
* Updated rollups: For each probe (0/1), updated qc_st_anycrit_* and qc_st_anywarn_*:
* anycrit := previous anycrit OR new CRIT_flatline
* anywarn := (previous anywarn OR new WARN_flatline) but not rows that are now anycrit.
* Masked only where needed: Set ground_temp_*_clean = NaN only on rows flagged CRIT_flatline. WARN_flatline remains visible (no masking).
* Saved versioned outputs: Wrote the full table to stage6b_soiltemp_flatline_v###.csv.
* Produced summaries:
* Overall counts (OK / WARN_flatline / CRIT_flatline) per probe → stage6b_soiltemp_flatline_counts_overall_v###.csv.
* Per-node duration estimates (hours spent in each flatline state), computed from node-specific cadence → stage6b_soiltemp_flatline_counts_pernode_v###.csv.

In [11]:
# =========================
# Step 6b: Soil temperature — flatline detection (exact-repeat, resolution-aware)
# GUARANTEES:
#  - Reads latest Stage 6a (REQUIRED; needs *_clean columns)
#  - Reconstructs timestamp from ts_ns (no free-text parsing)
#  - No row loss; preserves row_id
#  - Flags: qc_st_flatline_{0|1} = OK / WARN_flatline / CRIT_flatline
#  - Rollups: qc_st_anycrit_{0|1}, qc_st_anywarn_{0|1} updated
#  - Masks: ground_temp_{0|1}_clean set to NaN where CRIT_flatline
#  - NEW: prefers 5PD in priority list and carries SM masks forward from 5PD
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob
import os, re, math

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core columns
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"   # in-memory datetime64[ns]
NODE_COL      = "node"

# Soil temperature
ST_COLS  = ["ground_temp_0", "ground_temp_1"]
ST_CLEAN = {c: f"{c}_clean" for c in ST_COLS}

# Thresholds (time-based)
WARN_HOURS = 24    # identical value for ≥ 24 h → WARN
CRIT_HOURS = 48    # identical value for ≥ 48 h → CRIT

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

def pick_latest_by_priority(patterns):
    """Return path from the FIRST pattern that has matches, taking its latest file."""
    for pat in patterns:
        files = sorted(glob(str(OUT_DIR / pat)))
        if files:
            return files[-1]
    return None

# ---- Carry-forward 5PD soil-moisture outputs (if present) ----
def carry_forward_sm_from_5pd(df_base: pd.DataFrame) -> pd.DataFrame:
    """
    If a stage5pd_sm_pairdiverge_v*.csv exists, bring its SM clean/QC columns
    into df_base (overwrite by row_id). This preserves 5PD masks through later steps.
    """
    hits = sorted(glob(str(OUT_DIR / "stage5pd_sm_pairdiverge_v*.csv")))
    if not hits:
        return df_base

    def _ver(path):
        m = re.search(r"_v(\d+)\.csv$", os.path.basename(path))
        return int(m.group(1)) if m else -1
    src = max(hits, key=_ver) if _ver(hits[-1]) >= 0 else hits[-1]

    want = [
        # Clean values (carry masks)
        "ground_humidity_0_clean","ground_humidity_1_clean",
        # 5PD divergence flags
        "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
        # Rollups / other SM QC (safe if missing)
        "qc_sm_anycrit_0","qc_sm_anycrit_1",
        "qc_sm_anywarn_0","qc_sm_anywarn_1",
        "qc_sm_range_0","qc_sm_range_1",
        "qc_sm_flatline_0","qc_sm_flatline_1",
        "qc_sm_known_broken_0","qc_sm_known_broken_1",
        "qc_sm_regime_0","qc_sm_regime_1",
    ]
    hdr = pd.read_csv(src, nrows=0, low_memory=False)
    have = [c for c in want if c in hdr.columns]
    if not have:
        return df_base

    add = pd.read_csv(src, usecols=[ROW_ID_COL] + have, low_memory=False)
    out = df_base.drop(columns=[c for c in have if c in df_base.columns], errors="ignore")
    out = out.merge(add, on=ROW_ID_COL, how="left")
    print(f"[carry] Brought {len(have)} SM cols from 5PD: {os.path.basename(src)}")
    return out

# ---- Load latest (prefer 6b→6a→5PD→5R→4→3→2→1) ----
in_path = pick_latest_by_priority([
    "stage6b_soiltemp_flatline_v*.csv",  # if re-running
    "stage6a_soiltemp_range_v*.csv",     # REQUIRED normally (provides *_clean)
    "stage5pd_sm_pairdiverge_v*.csv",    # NEW: ensure 5PD is seen if 6a missing
    "stage5r_sm_regimes_v*.csv",
    "stage4_sm_flatline_v*.csv",
    "stage3_sm_knownbroken_v*.csv",
    "stage2_soilmoisture_range_v*.csv",
    "stage1_parsed_v*.csv",
])
assert in_path is not None, "No earlier stage files found."
print("Reading:", in_path)
df = pd.read_csv(in_path, low_memory=False)

# ---- Carry-forward SM masks/flags from 5PD (if present) ----
df = carry_forward_sm_from_5pd(df)

# ---- Required columns ----
required = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL, *ST_COLS, *ST_CLEAN.values()}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns for Stage 6b: {sorted(missing)}")

# Reconstruct timestamp from ts_ns (authoritative)
df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))

# Invariants
n_in = len(df)
assert df[ROW_ID_COL].is_unique, "row_id should be unique from Stage 1."

# Ensure numeric for BOTH raw and clean
for c in ST_COLS:
    df[c] = pd.to_numeric(df[c], errors="coerce")
    df[ST_CLEAN[c]] = pd.to_numeric(df[ST_CLEAN[c]], errors="coerce")

# ---- Helpers ----
def infer_resolution(x: pd.Series) -> float:
    """Infer numeric resolution from unique positive deltas; clamp to [1e-3, 1.0]."""
    vals = np.sort(x.dropna().unique())
    if len(vals) < 2:
        return 0.01
    diffs = np.diff(vals)
    pos = diffs[diffs > 0]
    res = float(np.nanmin(pos)) if len(pos) else 0.01
    return max(1e-3, min(res, 1.0))

def decimals_from_resolution(res: float) -> int:
    if res <= 0:
        return 2
    dec = max(0, int(round(-np.log10(res))))
    return min(dec, 6)

def median_sampling_minutes(ts: pd.Series) -> float:
    """Estimate typical sampling interval (minutes) for the node."""
    if ts.size < 2:
        return np.nan
    d = (ts.values[1:] - ts.values[:-1]).astype("timedelta64[s]").astype(float)
    med = np.nanmedian(d) if d.size else np.nan
    if not (med > 0):
        return np.nan
    return float(med) / 60.0

def flag_flatlines_for_probe(g: pd.DataFrame, clean_col: str, suffix: str) -> pd.Series:
    """
    Return per-row 'OK' / 'WARN_flatline' / 'CRIT_flatline' for one node+probe group,
    operating on the CLEAN column.
    """
    dt_min = median_sampling_minutes(g[TIMESTAMP_COL].reset_index(drop=True))
    if not (dt_min > 0):
        dt_min = 10.0

    warn_needed = max(2, int(math.ceil((WARN_HOURS * 60.0) / dt_min)))
    crit_needed = max(2, int(math.ceil((CRIT_HOURS * 60.0) / dt_min)))

    # Infer resolution from CLEAN values and quantize before equality test
    vals = pd.to_numeric(g[clean_col], errors="coerce")
    res = infer_resolution(vals)
    dec = decimals_from_resolution(res)

    v = vals.to_numpy(dtype=float)
    v_rounded = np.round(v / res) * res
    v_rounded = np.round(v_rounded, dec)

    same_as_prev = np.r_[True, v_rounded[1:] == v_rounded[:-1]]
    run_id = np.cumsum(~same_as_prev)
    counts = np.bincount(run_id)
    run_len = counts[run_id]

    flag = np.full(len(g), "OK", dtype=object)
    warn_mask = (run_len >= warn_needed) & (run_len < crit_needed)
    crit_mask = (run_len >= crit_needed)
    flag[warn_mask] = "WARN_flatline"
    flag[crit_mask] = "CRIT_flatline"

    return pd.Series(flag, index=g.index, name=f"qc_st_flatline_{suffix}")

# ---- Apply flatline detection per node for each probe (use *_clean) ----
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True)

for col in ST_COLS:
    suffix    = "0" if col.endswith("_0") else "1"
    clean_col = ST_CLEAN[col]
    flag_name = f"qc_st_flatline_{suffix}"
    df[flag_name] = "OK"
    flags = (
        df.groupby(NODE_COL, group_keys=False)[[TIMESTAMP_COL, clean_col]]
          .apply(lambda g: flag_flatlines_for_probe(g, clean_col, suffix))
    )
    df.loc[flags.index, flag_name] = flags.values

# ---- Update rollups and clean series ----
for col in ST_COLS:
    suffix      = "0" if col.endswith("_0") else "1"
    flat_col    = f"qc_st_flatline_{suffix}"
    anycrit_col = f"qc_st_anycrit_{suffix}"
    anywarn_col = f"qc_st_anywarn_{suffix}"
    clean_col   = ST_CLEAN[col]

    if anycrit_col not in df.columns:
        df[anycrit_col] = False
    if anywarn_col not in df.columns:
        df[anywarn_col] = False

    prev_anycrit = df[anycrit_col].astype(bool)
    prev_anywarn = df[anywarn_col].astype(bool)

    new_crit = df[flat_col].eq("CRIT_flatline")
    new_warn = df[flat_col].eq("WARN_flatline")

    updated_anycrit = prev_anycrit | new_crit
    updated_anywarn = (prev_anywarn | new_warn) & (~updated_anycrit)

    df[anycrit_col] = updated_anycrit
    df[anywarn_col] = updated_anywarn

    # Mask CRIT flatlines in the clean series (WARN stays visible)
    df.loc[new_crit, clean_col] = np.nan

# ---- Deterministic ordering & save ----
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)

out_main = next_versioned_csv(OUT_DIR, "stage6b_soiltemp_flatline")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# ---- Summaries ----
def summarize_flags(df, suffix):
    col = f"qc_st_flatline_{suffix}"
    return (df[col].value_counts(dropna=False)
              .rename_axis("flag").reset_index(name=f"n_rows_{suffix}"))

sum0 = summarize_flags(df, "0")
sum1 = summarize_flags(df, "1")
summary = sum0.merge(sum1, on="flag", how="outer").fillna(0).sort_values("flag")
sum_path = next_versioned_csv(OUT_DIR, "stage6b_soiltemp_flatline_counts_overall")
summary.to_csv(sum_path, index=False)
print("Wrote overall counts:", sum_path)
print("\nOverall soil-temp flatline counts:")
print(summary.to_string(index=False))

# Per-node duration estimate (hours), using median dt per node
def pernode_duration_estimate(df, probe_col, flat_col):
    med_dt = (df.groupby(NODE_COL)[TIMESTAMP_COL]
                .apply(lambda s: median_sampling_minutes(s.sort_values()))
                .rename("dt_min"))
    tmp = df[[NODE_COL, flat_col]].copy()
    tmp["n"] = 1
    tmp = tmp.groupby([NODE_COL, flat_col])["n"].sum().reset_index()
    tmp = tmp.merge(med_dt, on=NODE_COL, how="left")
    tmp["hours"] = (tmp["n"] * tmp["dt_min"]) / 60.0
    tmp["probe"] = probe_col
    return tmp[[NODE_COL, "probe", flat_col, "n", "hours"]]

pernode0 = pernode_duration_estimate(df, "probe0", "qc_st_flatline_0")
pernode1 = pernode_duration_estimate(df, "probe1", "qc_st_flatline_1")
pernode = pd.concat([pernode0, pernode1], ignore_index=True)
pernode_path = next_versioned_csv(OUT_DIR, "stage6b_soiltemp_flatline_counts_pernode")
pernode.to_csv(pernode_path, index=False)
print("Wrote per-node duration estimates:", pernode_path)

# Breadcrumbs
tmin = pd.to_datetime(df[TS_NS_COL].astype("int64")).min()
tmax = pd.to_datetime(df[TS_NS_COL].astype("int64")).max()
print("\nTime span (dataset, from ts_ns):", tmin, "→", tmax)
print("Rows in/out (should match input):", n_in, "→", len(df))
print("Nodes:", sorted(df[NODE_COL].dropna().unique().tolist()))


Reading: af_clean_v01/stage6a_soiltemp_range_v001.csv
[carry] Brought 16 SM cols from 5PD: stage5pd_sm_pairdiverge_v001.csv
Wrote: af_clean_v01/stage6b_soiltemp_flatline_v001.csv
Wrote overall counts: af_clean_v01/stage6b_soiltemp_flatline_counts_overall_v001.csv

Overall soil-temp flatline counts:
flag  n_rows_0  n_rows_1
  OK    483787    483787
Wrote per-node duration estimates: af_clean_v01/stage6b_soiltemp_flatline_counts_pernode_v001.csv

Time span (dataset, from ts_ns): 2023-10-30 12:18:43 → 2025-09-18 10:29:42
Rows in/out (should match input): 483787 → 483787
Nodes: ['node_176', 'node_179', 'node_181', 'node_182', 'node_183', 'node_184', 'node_186', 'node_187', 'node_189']


**Step 6PD — Soil Temperature Pair-Divergence**
* Goal: Detect and mask soil-temperature spikes/divergences (both probes) using sibling/neighbor references and self-based checks if refs are weak/missing.
* Inputs: Latest earlier-stage CSV (auto-picked) + optional neighbors_manual.csv; expects ground_temp_{0,1} (and _clean if present) with timestamps.
* Reference choice (per node, per probe):
* Prefer sibling probe when it’s present and not previously CRIT.
* Else build a statistical neighbor composite (robust median/IQR scaling, Spearman screen, median combine), require min neighbors + corr gate.
* Anomaly signals (any one can trigger):
* Ref z+gap: big residual vs reference (stricter for sibling; slightly looser for neighbors).
* Spikes: fast per-sample jumps + large gap (uses ref gap, falls back to self gap).
* Self anomalies: large z and gap vs own rolling median, or large rate vs self.
* Cross-depth conflict: unusual ST0–ST1 difference; votes the farther probe as outlier.
* Robust stats: Causal 7-day rolling median + MAD-σ; duplicate timestamps collapsed by median; zeros in σ guarded with small EPS.
* Consolidation: Require consecutive runs (edge-relaxed), add a window cover rule, then fill tiny gaps and dilate slightly to avoid pepper noise.
* Index safety: Keeps a DatetimeIndex through all steps so masks align with rows (prevents silent all-False).
* Mask & flags: Write CRIT_pair_diverge to qc_st_pairdiverge_{0,1}, mask ground_temp_{0,1}_clean, and update qc_st_anycrit_{0,1}; store provenance (src, nref, corr).
* *Outputs*: Versioned CSV af_clean_v01/stage6pd_soiltemp_pairdiverge_vNNN.csv + per-probe masked share summary and gate settings for audit.

In [32]:
# =========================
# Step 6PD — Soil Temperature Pair-Divergence (AGGRESSIVE + self fallback, index-safe)
# =========================
# Masks spikes/divergences even when refs are missing by using self-based anomaly checks.
# Maintains DatetimeIndex across all steps to avoid silent "all False" outcomes.

import os, re, glob
import numpy as np
import pandas as pd
from pathlib import Path

OUT_DIR  = Path("af_clean_v01"); OUT_DIR.mkdir(parents=True, exist_ok=True)
OUT_STEM = "stage6pd_soiltemp_pairdiverge_v"

IN_SEARCH = [
    "af_clean_v01/stage6b_soiltemp_flatline_v*.csv",
    "af_clean_v01/stage6a_soiltemp_range_v*.csv",
    "af_clean_v01/stage5pd_sm_pairdiverge_v*.csv",
    "af_clean_v01/stage5r_sm_regimes_v*.csv",
    "af_clean_v01/stage4_sm_flatline_v*.csv",
    "af_clean_v01/stage3_sm_knownbroken_v*.csv",
    "af_clean_v01/stage2_soilmoisture_range_v*.csv",
    "af_clean_v01/stage1_parsed_v*.csv",
    "analysis_ready_v*.csv",
]

# -------- Aggressive gates (tuned to "mask most of it") --------
ROLL_WIN, ROLL_MINP, ROLL_CENTER = "7D", 12, False
EPS_SIG = 1e-6

# z+gap vs sibling / neighbor
Z_SIB, GAP_SIB = 3.5, 1.6
Z_NEI, GAP_NEI = 4.5, 2.0

# spike (fast bursts)
SPIKE_RATE = 2.5       # °C per sample
SPIKE_GAP  = 2.0       # °C vs ref (or self baseline if ref missing)

# cross-depth (difference between ST0 and ST1)
Z_DIFF, GAP_DIFF = 3.5, 1.2

# self-based anomalies (fallback path when refs are weak/missing)
SELF_Z    = 3.0        # z vs own rolling median
SELF_GAP  = 1.8        # °C vs own rolling median
SELF_RATE = 2.2        # °C per sample

# consolidation
MIN_RUN     = 6
EDGE_RELAX  = 0.3
WIN_SIZE    = 24
WIN_COVER   = 0.25
FILL_GAP    = 4
DILATE_STEPS= 1

NEIGH_CORR_GATE, USE_NEIGH_CLEAN = 0.4, True

# -------- helpers (index-preserving) --------
def _pick_latest():
    for pat in IN_SEARCH:
        hits = sorted(glob.glob(pat))
        if hits:
            def _ver(p):
                m = re.search(r"_v(\d+)\.csv$", os.path.basename(p))
                return int(m.group(1)) if m else -1
            best = max(hits, key=_ver)
            return best if _ver(best) >= 0 else hits[-1]
    raise FileNotFoundError("No earlier stage file found for 6PD.")

def _next_versioned(stem):
    i = 1
    while True:
        p = OUT_DIR / f"{stem}{i:03d}.csv"
        if not p.exists(): return p
        i += 1

def _to_dt(df):
    if "timestamp" in df.columns:     return pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    if "timestamp_iso" in df.columns: return pd.to_datetime(df["timestamp_iso"], utc=True, errors="coerce")
    return pd.to_datetime(pd.to_numeric(df.get("ts_ns", np.nan), errors="coerce").astype("Int64"), utc=True, errors="coerce")

def _dup_med(series):
    s = pd.Series(series).dropna()
    if s.index.duplicated().any():
        s = s.groupby(level=0).median(numeric_only=True)
    return s.sort_index()

def _roll_med(s):
    s = pd.Series(s)
    return s.rolling(ROLL_WIN, min_periods=ROLL_MINP, center=ROLL_CENTER).median()

def _roll_mad_sigma(s):
    s = pd.Series(s)
    dev = (s - _roll_med(s)).abs()
    mad = dev.rolling(ROLL_WIN, min_periods=ROLL_MINP, center=ROLL_CENTER).median()
    sig = 1.4826 * mad
    return sig.replace(0, np.nan)

def _as_bool(x):
    s = pd.Series(x)
    if s.dtype != bool:
        s = s.astype("boolean").fillna(False).astype(bool)
    return s

def _contig(mask, min_len=MIN_RUN, edge_relax=EDGE_RELAX):
    m = _as_bool(mask)
    idx = m.index; v = m.values; n = len(v)
    out = np.zeros(n, dtype=bool); i = 0
    while i < n:
        if v[i]:
            j = i + 1
            while j < n and v[j]: j += 1
            seg = j - i
            need = min_len if (i>0 and j<n) else int(np.ceil(edge_relax * min_len))
            if seg >= need: out[i:j] = True
            i = j
        else:
            i += 1
    return pd.Series(out, index=idx)

def _window(mask, win=WIN_SIZE, cover=WIN_COVER):
    m = _as_bool(mask)
    hits = m.astype(int).rolling(win, min_periods=max(2, win//4)).sum()
    res = (hits >= max(1, int(np.ceil(cover * win))))
    return res.reindex(m.index).fillna(False).astype(bool)

def _fill_gaps(mask, max_gap=FILL_GAP):
    m = _as_bool(mask); idx = m.index
    v = m.values; out = v.copy()
    n = len(v); i = 0
    while i < n:
        if v[i]:
            j = i + 1
            while j < n and v[j]: j += 1
            k = j
            while k < n and (not v[k]) and (k - j) <= max_gap: k += 1
            if k < n and v[k] and (k - j) <= max_gap: out[j:k] = True; i = k
            else: i = j
        else:
            i += 1
    return pd.Series(out, index=idx)

def _dilate(mask, steps=DILATE_STEPS):
    m = _as_bool(mask); idx = m.index
    v = m.values
    if steps <= 0: return m
    out = v.copy()
    for _ in range(steps):
        out = np.logical_or(out, np.r_[False, out[:-1]] | np.r_[out[1:], False])
    return pd.Series(out, index=idx)

# -------- neighbors --------
NEIGH_PATH = "neighbors_manual.csv"
def _load_neigh():
    try:
        nm = pd.read_csv(NEIGH_PATH)
    except FileNotFoundError:
        return None, False
    nm.columns = [c.strip().lower() for c in nm.columns]
    have_st = {"node_id","st0_neighbors","st1_neighbors"}.issubset(nm.columns)
    have_sm = {"node_id","sm0_neighbors","sm1_neighbors"}.issubset(nm.columns)
    if not have_st and not have_sm:
        raise ValueError("neighbors_manual.csv needs st0/st1 or sm0/sm1 neighbor lists.")
    return nm, have_st

def _parse_ids(s):
    if pd.isna(s): return []
    return [x.strip() for x in re.split(r"[|,;]+", str(s)) if x.strip()]

def _series_for(df_all, node_col, nid, probe, use_clean=True):
    sub = df_all[df_all[node_col].astype(str) == str(nid)]
    if sub.empty: return pd.Series(dtype="float64")
    idx = pd.to_datetime(sub["__t"], utc=True, errors="coerce")
    base = f"ground_temp_{probe}"
    col  = base + ("_clean" if use_clean and f"{base}_clean" in sub.columns else "")
    vals = pd.to_numeric(sub[col], errors="coerce")
    ser  = pd.Series(vals.values, index=idx).dropna()
    if ser.index.duplicated().any():
        ser = ser.groupby(level=0).median(numeric_only=True)
    return ser.sort_index()

def _robust_center_scale(s, horizon_days=90):
    if s.empty: return (np.nan, np.nan)
    tmax = s.index.max()
    recent = s[s.index >= (tmax - pd.Timedelta(days=horizon_days))]
    base = recent if recent.notna().sum() >= 48 else s
    med = base.median()
    iqr = base.quantile(0.75) - base.quantile(0.25)
    if not np.isfinite(iqr) or iqr == 0: iqr = np.nan
    return med, iqr

def build_neighbor_ref(df_all, node_col, target_node, probe, t_index, neigh_tuple,
                       prefer_st=True, use_clean=USE_NEIGH_CLEAN, min_neighbors=2, corr_gate=NEIGH_CORR_GATE):
    if neigh_tuple[0] is None: return None, "none", "low", 0, np.nan
    nm, have_st = neigh_tuple
    row = nm[nm["node_id"].astype(str) == str(target_node)]
    if row.empty: return None, "none", "low", 0, np.nan
    field = ("st0_neighbors" if probe=="0" else "st1_neighbors") if (prefer_st and have_st) \
            else ("sm0_neighbors" if probe=="0" else "sm1_neighbors")
    neigh_ids = _parse_ids(row.iloc[0].get(field, np.nan))
    if not neigh_ids: return None, "none", "low", 0, np.nan

    tgt = _series_for(df_all, node_col, target_node, probe, use_clean=False).reindex(t_index)
    if tgt.notna().sum() < 80: return None, "none", "low", 0, np.nan

    t_med, t_iqr = _robust_center_scale(tgt)
    if not np.isfinite(t_iqr): return None, "none", "low", 0, np.nan

    z_cols = {}
    for nid in neigh_ids:
        s = _series_for(df_all, node_col, nid, probe, use_clean=use_clean).reindex(t_index)
        if s.notna().sum() < 80: continue
        n_med, n_iqr = _robust_center_scale(s)
        if not np.isfinite(n_iqr): continue
        z = (s - n_med) / n_iqr
        ok = tgt.notna() & z.notna()
        if ok.sum() < 60: continue
        corr = pd.Series(tgt[ok]).corr(pd.Series(z[ok]), method="spearman")
        if not np.isfinite(corr) or corr < 0.2: continue
        z_cols[nid] = z

    if len(z_cols) < min_neighbors: return None, "none", "low", len(z_cols), np.nan

    Z = pd.DataFrame(z_cols, index=t_index)
    z_ref = Z.median(axis=1, skipna=True)
    ref   = t_med + z_ref * t_iqr

    ok = tgt.notna() & ref.notna()
    roll_corr = (pd.Series(tgt[ok]).rolling(48, min_periods=24).corr(pd.Series(ref[ok]))).median()
    conf = "med" if (np.isfinite(roll_corr) and roll_corr >= corr_gate) else "low"
    return ref, "statistical", conf, len(z_cols), roll_corr

# -------- load input --------
src = _pick_latest()
df  = pd.read_csv(src, low_memory=False)
print(f"[6PD-ST AGG] Input: {src} | rows={len(df):,}")

df["__t"] = _to_dt(df)
df = df.dropna(subset=["__t"]).copy()

node_col = None
for c in ["node_id","node","station","device","site"]:
    if c in df.columns: node_col = c; break
if node_col is None:
    df["__node"] = "__all__"; node_col = "__node"

# ensure *_clean exist
for suf in ("0","1"):
    base = f"ground_temp_{suf}"
    if f"{base}_clean" not in df.columns:
        df[f"{base}_clean"] = pd.to_numeric(df.get(base, np.nan), errors="coerce")

neigh_tuple = _load_neigh()

# init outputs
for suf in ("0","1"):
    df[f"qc_st_pairdiverge_{suf}"]     = "OK"
    df[f"qc_st_pairdiverge_src_{suf}"] = ""
    df[f"qc_st_pairdiverge_nref_{suf}"]= 0
    df[f"qc_st_pairdiverge_corr_{suf}"]= np.nan
    anyc = f"qc_st_anycrit_{suf}"
    df[anyc] = _as_bool(df[anyc]) if anyc in df.columns else False

masked, raw = {"0":0,"1":0}, {"0":0,"1":0}

# -------- per-node pass --------
for node, g in df.sort_values([node_col, "__t"]).groupby(df[node_col].astype(str), sort=False):
    t_rows = pd.to_datetime(g["__t"])
    t_idx  = pd.Index(t_rows.sort_values().unique())

    # raw series
    t0r = _dup_med(pd.Series(pd.to_numeric(g.get("ground_temp_0"), errors="coerce").values, index=t_rows)).reindex(t_idx)
    t1r = _dup_med(pd.Series(pd.to_numeric(g.get("ground_temp_1"), errors="coerce").values, index=t_rows)).reindex(t_idx)
    if (t0r.notna().sum() < 50) and (t1r.notna().sum() < 50):
        continue

    # don't trust sibling if earlier CRIT
    def _pre_anycrit(sub, suf):
        idx = pd.to_datetime(sub["__t"])
        def f(col):
            if col not in sub.columns: return pd.Series(False, index=idx)
            s = sub[col].astype(str)
            return pd.Series(s.str.startswith("CRIT", na=False).values, index=idx)
        cols = [f"qc_st_range_{suf}", f"qc_st_flatline_{suf}", f"qc_st_known_broken_{suf}"]
        m = pd.Series(False, index=idx)
        for c in cols: m = m | f(c)
        if m.index.duplicated().any(): m = m.groupby(level=0).max()
        return m

    pre0 = _pre_anycrit(g, "0").reindex(t_idx).fillna(False)
    pre1 = _pre_anycrit(g, "1").reindex(t_idx).fillna(False)
    sib_ok_for_0 = t1r.notna() & (~pre1)
    sib_ok_for_1 = t0r.notna() & (~pre0)

    # neighbors
    ref0_nei, _, conf0, nref0, rcorr0 = build_neighbor_ref(df, node_col, node, "0", t_idx, neigh_tuple, prefer_st=True)
    ref1_nei, _, conf1, nref1, rcorr1 = build_neighbor_ref(df, node_col, node, "1", t_idx, neigh_tuple, prefer_st=True)
    ref0_nei = ref0_nei if isinstance(ref0_nei, pd.Series) else pd.Series(np.nan, index=t_idx)
    ref1_nei = ref1_nei if isinstance(ref1_nei, pd.Series) else pd.Series(np.nan, index=t_idx)

    # preferred refs
    ref0 = t1r.where(sib_ok_for_0).combine_first(ref0_nei)
    ref1 = t0r.where(sib_ok_for_1).combine_first(ref1_nei)
    src0_series = pd.Series(np.where(sib_ok_for_0, "sibling", np.where(ref0_nei.notna(), "statistical", "none")), index=t_idx)
    src1_series = pd.Series(np.where(sib_ok_for_1, "sibling", np.where(ref1_nei.notna(), "statistical", "none")), index=t_idx)

    # diffs vs ref
    d0 = (t0r - ref0)
    d1 = (t1r - ref1)

    # robust z+gap vs ref (fill sig NaNs; keep med NaNs)
    med0 = _roll_med(d0); sig0 = _roll_mad_sigma(d0).fillna(EPS_SIG)
    med1 = _roll_med(d1); sig1 = _roll_mad_sigma(d1).fillna(EPS_SIG)
    z0   = (d0 - med0).abs()/sig0; gap0 = (d0 - med0).abs()
    z1   = (d1 - med1).abs()/sig1; gap1 = (d1 - med1).abs()

    base0 = ((src0_series.eq("sibling")     & (z0 >= Z_SIB) & (gap0 >= GAP_SIB)) |
             (src0_series.eq("statistical") & (conf0 == "med") & (z0 >= Z_NEI) & (gap0 >= GAP_NEI)))
    base1 = ((src1_series.eq("sibling")     & (z1 >= Z_SIB) & (gap1 >= GAP_SIB)) |
             (src1_series.eq("statistical") & (conf1 == "med") & (z1 >= Z_NEI) & (gap1 >= GAP_NEI)))

    base0 = _as_bool(base0); base1 = _as_bool(base1)

    # own baseline (self) stats
    self_med0 = _roll_med(t0r); self_sig0 = _roll_mad_sigma(t0r).fillna(EPS_SIG)
    self_med1 = _roll_med(t1r); self_sig1 = _roll_mad_sigma(t1r).fillna(EPS_SIG)
    self_gap0 = (t0r - self_med0).abs();  self_z0 = self_gap0 / self_sig0
    self_gap1 = (t1r - self_med1).abs();  self_z1 = self_gap1 / self_sig1
    r0 = pd.Series(t0r).diff().abs();     r1 = pd.Series(t1r).diff().abs()

    # spike: prefer ref gap; if ref missing, fall back to self gap
    spike_gap0 = (d0.abs()).fillna(self_gap0)
    spike_gap1 = (d1.abs()).fillna(self_gap1)
    spike0 = _as_bool((r0 >= SPIKE_RATE) & (spike_gap0 >= SPIKE_GAP))
    spike1 = _as_bool((r1 >= SPIKE_RATE) & (spike_gap1 >= SPIKE_GAP))

    # strong self anomalies (even if ref missing)
    self_anom0 = _as_bool((self_z0 >= SELF_Z) & (self_gap0 >= SELF_GAP) | (r0 >= SELF_RATE) & (self_gap0 >= SELF_GAP))
    self_anom1 = _as_bool((self_z1 >= SELF_Z) & (self_gap1 >= SELF_GAP) | (r1 >= SELF_RATE) & (self_gap1 >= SELF_GAP))

    # cross-depth: assign outlier to the farther probe
    diff01 = t0r - t1r
    mD = _roll_med(diff01); sD = _roll_mad_sigma(diff01).fillna(EPS_SIG)
    bigD = _as_bool(((diff01 - mD).abs()/sD >= Z_DIFF) & ((diff01 - mD).abs() >= GAP_DIFF))
    vote0 = _as_bool(bigD & (spike_gap0 >= spike_gap1))
    vote1 = _as_bool(bigD & (spike_gap1 >  spike_gap0))

    # AGGRESSIVE OR: base OR spike OR self_anom OR cross-depth
    cand0 = _as_bool(base0 | spike0 | self_anom0 | vote0)
    cand1 = _as_bool(base1 | spike1 | self_anom1 | vote1)

    # consolidate
    crit0 = _contig(cand0);   crit1 = _contig(cand1)
    win0  = _window(cand0);   win1  = _window(cand1)
    final0 = _dilate(_fill_gaps(crit0 | win0, FILL_GAP), DILATE_STEPS)
    final1 = _dilate(_fill_gaps(crit1 | win1, FILL_GAP), DILATE_STEPS)

    # map to rows (DatetimeIndex preserved)
    m0_rows = final0.reindex(t_rows).fillna(False).to_numpy()
    m1_rows = final1.reindex(t_rows).fillna(False).to_numpy()
    idx_rows = g.index

    # write masks + flags
    df.loc[idx_rows, "qc_st_pairdiverge_0"] = np.where(m0_rows, "CRIT_pair_diverge", df.loc[idx_rows, "qc_st_pairdiverge_0"])
    df.loc[idx_rows, "qc_st_pairdiverge_1"] = np.where(m1_rows, "CRIT_pair_diverge", df.loc[idx_rows, "qc_st_pairdiverge_1"])
    df.loc[idx_rows, "ground_temp_0_clean"] = pd.to_numeric(df.loc[idx_rows, "ground_temp_0_clean"], errors="coerce").mask(m0_rows)
    df.loc[idx_rows, "ground_temp_1_clean"] = pd.to_numeric(df.loc[idx_rows, "ground_temp_1_clean"], errors="coerce").mask(m1_rows)

    df.loc[idx_rows, "qc_st_anycrit_0"] = _as_bool(df.loc[idx_rows, "qc_st_anycrit_0"] | m0_rows)
    df.loc[idx_rows, "qc_st_anycrit_1"] = _as_bool(df.loc[idx_rows, "qc_st_anycrit_1"] | m1_rows)

    # lightweight per-node debug summary
    n0, n1 = int(cand0.sum()), int(cand1.sum())
    f0, f1 = int(final0.sum()), int(final1.sum())
    print(f"[node {node}] cand0={n0} final0={f0} | cand1={n1} final1={f1}")

    masked["0"] += int(m0_rows.sum()); raw["0"] += int(pd.to_numeric(g.get("ground_temp_0"), errors="coerce").notna().sum())
    masked["1"] += int(m1_rows.sum()); raw["1"] += int(pd.to_numeric(g.get("ground_temp_1"), errors="coerce").notna().sum())

# -------- save --------
out = _next_versioned(OUT_STEM)
df.to_csv(out, index=False)
print(f"[6PD-ST AGG] wrote: {out}")
for k in ("0","1"):
    print(f"[6PD-ST AGG] probe{k}: masked {masked[k]}/{raw[k]} = {masked[k]/max(raw[k],1):.2%}")
print("[6PD-ST AGG] Gates:",
      f"Z_SIB={Z_SIB}, GAP_SIB={GAP_SIB}; Z_NEI={Z_NEI}, GAP_NEI={GAP_NEI};",
      f"SPIKE_RATE={SPIKE_RATE}, SPIKE_GAP={SPIKE_GAP}; SELF_Z={SELF_Z}, SELF_GAP={SELF_GAP}, SELF_RATE={SELF_RATE};",
      f"Z_DIFF={Z_DIFF}, GAP_DIFF={GAP_DIFF}; MIN_RUN={MIN_RUN}, WIN_COVER={WIN_COVER}, FILL_GAP={FILL_GAP}, DILATE_STEPS={DILATE_STEPS}")


[6PD-ST AGG] Input: af_clean_v01/stage6b_soiltemp_flatline_v001.csv | rows=483,787
[node node_176] cand0=1381 final0=1772 | cand1=1799 final1=2170
[node node_179] cand0=2668 final0=3456 | cand1=1657 final1=2119
[node node_181] cand0=2256 final0=3086 | cand1=3261 final1=4633
[node node_182] cand0=1456 final0=1962 | cand1=1281 final1=1850
[node node_183] cand0=1172 final0=1488 | cand1=1228 final1=1581
[node node_184] cand0=6919 final0=11511 | cand1=961 final1=1207
[node node_186] cand0=1323 final0=1719 | cand1=2924 final1=3635
[node node_187] cand0=8048 final0=11817 | cand1=1297 final1=1741
[node node_189] cand0=3104 final0=3765 | cand1=2869 final1=2691
[6PD-ST AGG] wrote: af_clean_v01/stage6pd_soiltemp_pairdiverge_v008.csv
[6PD-ST AGG] probe0: masked 40576/483787 = 8.39%
[6PD-ST AGG] probe1: masked 21627/483787 = 4.47%
[6PD-ST AGG] Gates: Z_SIB=3.5, GAP_SIB=1.6; Z_NEI=4.5, GAP_NEI=2.0; SPIKE_RATE=2.5, SPIKE_GAP=2.0; SELF_Z=3.0, SELF_GAP=1.8, SELF_RATE=2.2; Z_DIFF=3.5, GAP_DIFF=1.2; MIN_

**Step 7 (Air QC (self-comparison for T/RH, farm-wide for p))**
* Loads latest table (prefers Stage 6c/7/8 as before), rebuilds timestamp from ts_ns.
* Air temperature & humidity (per node):
* For each node, compute a 24-hour rolling median baseline. * Flag a row as an outlier when its absolute deviation from that baseline exceeds a strict absolute threshold AND a robust relative threshold (k×rolling MAD, with fallback). Promote to WARN_outlier only if this condition is sustained ≥ 60 minutes continuously for that node.
* Pressure (farm-wide): Compare each row to the same-time farm median; flag WARN_outlier if deviation ≥ 4 hPa is sustained ≥ 30 minutes.
* No masking. Only adds: qc_air_selfcheck_temp, qc_air_selfcheck_hum, qc_air_crosscheck_pres ∈ {OK, WARN_outlier}.
* Saves a versioned file stage7r_air_selfqc_v###.csv + summaries.

In [34]:
# ============== Step 7a (hardened): Air Temperature QC + masking ==============
# - Picks an input that REALLY has 'temperature' (skips *counts/*summary/*audit*)
# - WARN if T in [-25, -20) or (45, 50]; CRIT if T < -25 or T > 50; missing -> CRIT_missing
# - Flatline: ≥12 identical consecutive samples per node -> WARN_flatline
# - Time: use timestamp/timestamp_iso/ts_ns if present; else backfill via row_id from a donor; else synthesize.
# - NEW: carry forward soil MOISTURE (5PD) and soil TEMP (6PD) clean/flags so masks persist.

import os, glob, re
import numpy as np
import pandas as pd

OUT_DIR, OUT_STEM = "af_clean_v01", "stage7a_air_temp_qc_v"
MIN_FLAT_RUN = 12  # 12 consecutive identical samples

# Prefer later stages but FILTER to files that actually contain 'temperature'
CANDIDATE_PATTERNS = [
    # anything after air QC that might still contain temperature
    "af_clean_v01/stage8_*v*.csv",
    # air chain
    "af_clean_v01/stage7c_air_pres_qc_v*.csv",
    "af_clean_v01/stage7b_air_hum_qc_v*.csv",
    # step 6 soil-temp (now includes all earlier cols in your pipeline)
    "af_clean_v01/stage6pd_soiltemp_pairdiverge_v*.csv",  # ← NEW: prefer 6PD outputs if they carry temperature
    "af_clean_v01/stage6c_*v*.csv",
    "af_clean_v01/stage6b_*v*.csv",
    "af_clean_v01/stage6a_*v*.csv",
    # soil moisture stages, may still carry 'temperature' in many dumps
    "af_clean_v01/stage5pd_sm_pairdiverge_v*.csv",
    "af_clean_v01/stage5r_sm_regimes_v*.csv",
    # fallbacks
    "af_clean_v01/stage1_parsed_v*.csv",
    "af_clean_v01/analysis_ready_v*.csv",
    "analysis_ready_v*.csv",
]
DONOR_PATTERNS = [
    "af_clean_v01/stage8_*v*.csv",
    "af_clean_v01/stage7*_*v*.csv",
    "af_clean_v01/stage6*_*v*.csv",
    "af_clean_v01/stage5pd_*v*.csv",
    "af_clean_v01/stage5r_*v*.csv",
    "af_clean_v01/analysis_ready_v*.csv",
    "analysis_ready_v*.csv",
]

def _ver(path):
    m = re.search(r"_v(\d+)\.csv$", os.path.basename(path))
    return int(m.group(1)) if m else -1

def _skip_name(p):
    name = os.path.basename(p).lower()
    return any(k in name for k in ["count", "counts", "summary", "overall", "audit"])

def _pick_input(required_cols=("temperature",)):
    for pat in CANDIDATE_PATTERNS:
        files = [p for p in sorted(glob.glob(pat), key=_ver, reverse=True) if not _skip_name(p)]
        for p in files:
            try:
                hdr = pd.read_csv(p, nrows=0, low_memory=False)
            except Exception:
                continue
            if all(c in hdr.columns for c in required_cols):
                print(f"[7a] Input picked: {p}")
                return p
    raise FileNotFoundError("[7a] No suitable input that contains 'temperature'.")

def _pick_donor(exclude=None):
    for pat in DONOR_PATTERNS:
        files = [p for p in sorted(glob.glob(pat), key=_ver, reverse=True) if p != exclude and not _skip_name(p)]
        for p in files:
            try:
                hdr = pd.read_csv(p, nrows=0, low_memory=False)
            except Exception:
                continue
            if "row_id" in hdr.columns and any(c in hdr.columns for c in ["timestamp","timestamp_iso","ts_ns"]):
                return p
    return None

def _pick_node_col(df):
    for c in ["node_id","node","station_id","station","device_id","device","logger_id","site"]:
        if c in df.columns:
            print(f"[7a] Using node column: {c!r}")
            return c
    df["__node_synth"] = "__all__"
    print("[7a] No node column found → using synthetic group '__node_synth'.")
    return "__node_synth"

def _ensure_time(df, current_path):
    if "timestamp" in df.columns:
        print("[7a] Found 'timestamp'.")
        return pd.to_datetime(df["timestamp"], utc=True, errors="coerce"), False
    if "timestamp_iso" in df.columns:
        print("[7a] Found 'timestamp_iso'.")
        return pd.to_datetime(df["timestamp_iso"], utc=True, errors="coerce"), False
    if "ts_ns" in df.columns:
        print("[7a] Found 'ts_ns'.")
        return pd.to_datetime(pd.to_numeric(df["ts_ns"], errors="coerce").astype("Int64"), utc=True, errors="coerce"), False

    # backfill from donor by row_id
    if "row_id" in df.columns:
        donor = _pick_donor(exclude=current_path)
        if donor:
            hdr = pd.read_csv(donor, nrows=0, low_memory=False)
            take = ["row_id"] + [c for c in ["timestamp","timestamp_iso","ts_ns"] if c in hdr.columns]
            don = pd.read_csv(donor, usecols=take, low_memory=False)
            merged = df.merge(don, on="row_id", how="left")
            if "timestamp" in merged.columns:
                print(f"[7a] Backfilled time from donor: {donor} (timestamp).")
                return pd.to_datetime(merged["timestamp"], utc=True, errors="coerce"), False
            if "timestamp_iso" in merged.columns:
                print(f"[7a] Backfilled time from donor: {donor} (timestamp_iso).")
                return pd.to_datetime(merged["timestamp_iso"], utc=True, errors="coerce"), False
            if "ts_ns" in merged.columns:
                print(f"[7a] Backfilled time from donor: {donor} (ts_ns).")
                return pd.to_datetime(pd.to_numeric(merged["ts_ns"], errors="coerce").astype("Int64"), utc=True, errors="coerce"), False

    # synthesize (last resort)
    ts = pd.date_range("2000-01-01", periods=len(df), freq="s", tz="UTC")
    print("[7a] WARNING: No time available; using synthetic monotonic timestamps.")
    return pd.Series(ts, index=df.index), True

def _flatline(df, node_col, val_col, minrun=MIN_FLAT_RUN):
    """Flag sequences of >= minrun identical values (per node)."""
    if val_col not in df.columns:
        return pd.Series(False, index=df.index)
    out = pd.Series(False, index=df.index)
    for _, gg in df.groupby(node_col, sort=False):
        v = gg[val_col]
        valid = v.notna()
        # segment id increases whenever value changes or is NaN
        gid = ((~valid) | (v != v.shift())).cumsum()
        runlen = gid.groupby(gid).transform("size")
        out.loc[gg.index] = valid & (runlen >= minrun)
    return out

# ---- Carry-forward helpers ----
def _carry_forward_from(src_pattern, want_cols, tag):
    """Merge selected columns from the latest file matching pattern, by row_id."""
    hits = sorted(glob.glob(os.path.join(OUT_DIR, src_pattern)), key=_ver, reverse=True)
    if not hits:
        return None
    src = hits[0]
    try:
        hdr = pd.read_csv(src, nrows=0, low_memory=False)
    except Exception:
        return None
    have = [c for c in want_cols if c in hdr.columns]
    if not have:
        return None
    add = pd.read_csv(src, usecols=["row_id"] + have, low_memory=False)
    print(f"[7a] Carry-forward from {tag}: {os.path.basename(src)} ({len(have)} cols)")
    return add

def _carry_forward_masks(df_base: pd.DataFrame) -> pd.DataFrame:
    """Bring over SM (5PD) + ST (6PD) clean/flags so they persist through Step 7 outputs."""
    out = df_base.copy()
    if "row_id" not in out.columns:
        return out

    # Soil MOISTURE (5PD)
    sm_want = [
        "ground_humidity_0_clean","ground_humidity_1_clean",
        "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
        "qc_sm_anycrit_0","qc_sm_anycrit_1",
        "qc_sm_anywarn_0","qc_sm_anywarn_1",
        "qc_sm_range_0","qc_sm_range_1",
        "qc_sm_flatline_0","qc_sm_flatline_1",
        "qc_sm_known_broken_0","qc_sm_known_broken_1",
        "qc_sm_regime_0","qc_sm_regime_1",
    ]
    sm_add = _carry_forward_from("stage5pd_sm_pairdiverge_v*.csv", sm_want, "5PD (SM)")
    if sm_add is not None:
        out = out.drop(columns=[c for c in sm_want if c in out.columns], errors="ignore").merge(sm_add, on="row_id", how="left")

    # Soil TEMP (6PD) — includes earlier 6a/6b flags if they existed when 6PD ran
    st_want = [
        "ground_temp_0_clean","ground_temp_1_clean",
        "qc_st_pairdiverge_0","qc_st_pairdiverge_1",
        "qc_st_anycrit_0","qc_st_anycrit_1",
        "qc_st_anywarn_0","qc_st_anywarn_1",
        "qc_st_range_0","qc_st_range_1",
        "qc_st_flatline_0","qc_st_flatline_1",
        "qc_st_known_broken_0","qc_st_known_broken_1",
    ]
    st_add = _carry_forward_from("stage6pd_soiltemp_pairdiverge_v*.csv", st_want, "6PD (ST)")
    if st_add is not None:
        out = out.drop(columns=[c for c in st_want if c in out.columns], errors="ignore").merge(st_add, on="row_id", how="left")

    return out

# ---- run
src = _pick_input(required_cols=("temperature",))
df  = pd.read_csv(src, low_memory=False)

# carry-forward SM (5PD) + ST (6PD) columns (if any)
df = _carry_forward_masks(df)

node_col = _pick_node_col(df)
df["__timestamp"], used_synth_time = _ensure_time(df, current_path=src)

sort_keys = [node_col, "__timestamp"] + (["row_id"] if "row_id" in df.columns else [])
df = df.sort_values(sort_keys, kind="stable").reset_index(drop=True)

# ---- QC + masking (fixed thresholds)
t = pd.to_numeric(df["temperature"], errors="coerce")

qc = pd.Series("OK", index=df.index, dtype="object")
miss = t.isna()
crit = (~miss) & ((t < -25) | (t > 50))
warn = (~miss) & (~crit) & (((t >= -25) & (t < -20)) | ((t > 45) & (t <= 50)))

qc[miss] = "CRIT_missing"
qc[crit] = "CRIT_out_of_range"
qc[warn] = "WARN_out_of_range"

# Flatline (node-wise)
flat = _flatline(df, node_col, "temperature", MIN_FLAT_RUN)

# Consolidate air-temp flags
is_crit = qc.str.startswith("CRIT")
is_ok   = qc.eq("OK")
qc[~is_crit & is_ok & flat] = "WARN_flatline"

df["qc_air_selfcheck_temp"] = qc.astype("category")

# Preferred series: mask WARN/CRIT (keep OK)
df["temperature__pref"] = t.mask(qc.str.startswith(("WARN","CRIT"), na=False)).astype("float64")

# Rollup booleans (safe OR with any existing columns)
def _as_bool(s, default=False):
    if isinstance(s, pd.Series):
        if s.dtype == bool:
            return s
        return s.astype(str).str.lower().isin(["true","1","t","y","yes"]).fillna(default)
    return pd.Series(default, index=df.index)

df["ok_temperature"]       = df["temperature__pref"].notna()

df["qc_air_anywarn"]       = (_as_bool(df.get("qc_air_anywarn", False)) | qc.str.startswith("WARN", na=False)).astype(bool)
df["qc_air_anycrit"]       = (_as_bool(df.get("qc_air_anycrit", False)) | qc.str.startswith("CRIT", na=False)).astype(bool)
df["qc_air_flatline_any"]  = (_as_bool(df.get("qc_air_flatline_any", False)) | flat).astype(bool)

# ---- save
os.makedirs(OUT_DIR, exist_ok=True)
i = 1
while True:
    out = os.path.join(OUT_DIR, f"{OUT_STEM}{i:03d}.csv")
    if not os.path.exists(out): break
    i += 1
df.to_csv(out, index=False)
print(f"[7a] wrote: {out}")

# ---- audit
s = df["qc_air_selfcheck_temp"].astype(str)
print(f"[7a] temp QC counts — OK={int((s=='OK').sum())}  WARN={int(s.str.startswith('WARN').sum())}  CRIT={int(s.str.startswith('CRIT').sum())}")
raw_present  = t.notna()
pref_missing = df["temperature__pref"].isna()
denom = int(raw_present.sum()) or 1
mshare = (pref_missing & raw_present).sum()/denom
print(f"[7a] masked share (temp): {mshare:.2%}")
if used_synth_time:
    print("[7a] NOTE: Used synthetic time; flatline = 12 consecutive samples.")


[7a] Input picked: af_clean_v01/stage6pd_soiltemp_pairdiverge_v008.csv
[7a] Carry-forward from 5PD (SM): stage5pd_sm_pairdiverge_v001.csv (16 cols)
[7a] Carry-forward from 6PD (ST): stage6pd_soiltemp_pairdiverge_v008.csv (14 cols)
[7a] Using node column: 'node_id'
[7a] Found 'timestamp'.
[7a] wrote: af_clean_v01/stage7a_air_temp_qc_v001.csv
[7a] temp QC counts — OK=469158  WARN=4272  CRIT=10357
[7a] masked share (temp): 3.02%


In [35]:
# ============== Step 7b (LENIENT): Air Humidity QC + masking with context (daylight + duration) ==============
# One preferred series: humidity__pref (lenient)
# Mask CRIT, and only those flatline/sticky runs that are implausible (long or in daylight).
import os, glob, re
import numpy as np
import pandas as pd

# ---- knobs (tune here) ----
MIN_FLAT_RUN       = 12          # samples for exact-value flatline detection (>= -> flagged)
STICKY_MINRUN      = 8           # samples for sticky-high detection (>= -> flagged)
STICKY_MIN         = 99.0        # %RH threshold for sticky-high candidates
STICKY_SPREAD      = 0.5         # %RH max (max-min) within a sticky run
LONG_RUN_SAMPLES   = 48          # >= -> implausible (≈12 h at 15-min cadence)
DAYLIGHT_RAD_WM2   = 200.0       # radiation threshold for "daylight"
RH_CRIT_HIGH       = 102.0       # > -> CRIT_out_of_range; (100, RH_CRIT_HIGH] -> WARN_minor_high

# ---- file picking ----
SEARCH_INPUT = [
    "af_clean_v01/stage7a_air_temp_qc_v*.csv",
    "af_clean_v01/stage6pd_soiltemp_pairdiverge_v*.csv",   # ← NEW: include ST-6PD in the chain
    "af_clean_v01/stage8_*v*.csv",
    "af_clean_v01/stage5pd_*v*.csv",
    "af_clean_v01/stage5r_*v*.csv",
    "af_clean_v01/stage6c_*v*.csv", "af_clean_v01/stage6b_*v*.csv", "af_clean_v01/stage6a_*v*.csv",
    "af_clean_v01/stage1_parsed_v*.csv",
    "af_clean_v01/analysis_ready_v*.csv", "analysis_ready_v*.csv",
]
OUT_DIR, OUT_STEM = "af_clean_v01", "stage7b_air_hum_qc_v"

def _ver(path):
    m = re.search(r"_v(\d+)\.csv$", os.path.basename(path)); return int(m.group(1)) if m else -1

def _pick_input():
    def _skip(n): n=n.lower(); return any(k in n for k in ["count","counts","summary","overall","audit"])
    for pat in SEARCH_INPUT:
        for p in sorted(glob.glob(pat), key=_ver, reverse=True):
            if _skip(os.path.basename(p)): continue
            try:
                hdr = pd.read_csv(p, nrows=0, low_memory=False)
            except Exception:
                continue
            if "humidity" in hdr.columns:
                print(f"[7b] Input picked: {p}")
                return p
    raise FileNotFoundError("[7b] No input found that contains 'humidity'.")

def _next_out():
    os.makedirs(OUT_DIR, exist_ok=True); i=1
    while True:
        out = os.path.join(OUT_DIR, f"{OUT_STEM}{i:03d}.csv")
        if not os.path.exists(out): return out
        i += 1

def _to_time(df):
    if "timestamp" in df.columns:     return pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    if "timestamp_iso" in df.columns: return pd.to_datetime(df["timestamp_iso"], utc=True, errors="coerce")
    if "ts_ns" in df.columns:         return pd.to_datetime(pd.to_numeric(df["ts_ns"], errors="coerce").astype("Int64"), utc=True, errors="coerce")
    return pd.date_range("2000-01-01", periods=len(df), freq="s", tz="UTC")

# ---- Carry-forward helpers (SM from 5PD + ST from 6PD) ----
def _carry_forward_from(src_pattern, want_cols, tag):
    hits = sorted(glob.glob(os.path.join(OUT_DIR, src_pattern)), key=_ver, reverse=True)
    if not hits: return None
    src = hits[0]
    try:
        hdr = pd.read_csv(src, nrows=0, low_memory=False)
    except Exception:
        return None
    have = [c for c in want_cols if c in hdr.columns]
    if not have: return None
    add = pd.read_csv(src, usecols=["row_id"] + have, low_memory=False)
    print(f"[7b] Carry-forward from {tag}: {os.path.basename(src)} ({len(have)} cols)")
    return add

def _carry_forward_masks(df_base: pd.DataFrame) -> pd.DataFrame:
    out = df_base.copy()
    if "row_id" not in out.columns:  # nothing to merge on
        return out

    # Soil MOISTURE (5PD)
    sm_want = [
        "ground_humidity_0_clean","ground_humidity_1_clean",
        "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
        "qc_sm_anycrit_0","qc_sm_anycrit_1",
        "qc_sm_anywarn_0","qc_sm_anywarn_1",
        "qc_sm_range_0","qc_sm_range_1",
        "qc_sm_flatline_0","qc_sm_flatline_1",
        "qc_sm_known_broken_0","qc_sm_known_broken_1",
        "qc_sm_regime_0","qc_sm_regime_1",
    ]
    sm_add = _carry_forward_from("stage5pd_sm_pairdiverge_v*.csv", sm_want, "5PD (SM)")
    if sm_add is not None:
        out = out.drop(columns=[c for c in sm_want if c in out.columns], errors="ignore").merge(sm_add, on="row_id", how="left")

    # Soil TEMP (6PD)
    st_want = [
        "ground_temp_0_clean","ground_temp_1_clean",
        "qc_st_pairdiverge_0","qc_st_pairdiverge_1",
        "qc_st_anycrit_0","qc_st_anycrit_1",
        "qc_st_anywarn_0","qc_st_anywarn_1",
        "qc_st_range_0","qc_st_range_1",
        "qc_st_flatline_0","qc_st_flatline_1",
        "qc_st_known_broken_0","qc_st_known_broken_1",
    ]
    st_add = _carry_forward_from("stage6pd_soiltemp_pairdiverge_v*.csv", st_want, "6PD (ST)")
    if st_add is not None:
        out = out.drop(columns=[c for c in st_want if c in out.columns], errors="ignore").merge(st_add, on="row_id", how="left")

    return out

def _run_ids(mask_bool: pd.Series) -> pd.Series:
    m = mask_bool.fillna(False)
    return (m != m.shift()).cumsum()

def _flatline_flag(series: pd.Series, minrun: int) -> pd.Series:
    v = pd.to_numeric(series, errors="coerce")
    valid = v.notna()
    gid = ((~valid) | (v != v.shift())).cumsum()
    runlen = gid.groupby(gid).transform("size")
    return valid & (runlen >= minrun)

def _sticky_flags(group_df: pd.DataFrame, minrun:int, thr:float, spread:float) -> pd.Series:
    h = pd.to_numeric(group_df["humidity"], errors="coerce")
    cand = h.notna() & (h >= thr)
    rid = _run_ids(cand)
    lengths = rid.groupby(rid).transform("size")
    if cand.any():
        run_spread = group_df.loc[cand].groupby(rid.loc[cand]).agg(rmin=("humidity","min"), rmax=("humidity","max"))
        spread_map = (run_spread["rmax"] - run_spread["rmin"])
        spread_b = rid.map(spread_map).reindex(group_df.index, fill_value=np.nan)
    else:
        spread_b = pd.Series(np.nan, index=group_df.index)
    return cand & (lengths >= minrun) & (spread_b <= spread)

def _as_bool(s, default=False, idx=None):
    if isinstance(s, pd.Series):
        if s.dtype == bool: return s
        return s.astype(str).str.lower().isin(["true","1","t","y","yes"]).fillna(default)
    return pd.Series(default, index=idx)

# ---- run 7b (lenient) ----
src = _pick_input()
df  = pd.read_csv(src, low_memory=False)

# carry-forward SM (5PD) + ST (6PD) columns so masks persist
df = _carry_forward_masks(df)

# node/time
node_col = "node_id" if "node_id" in df.columns else ("node" if "node" in df.columns else None)
if node_col is None:
    df["__node_synth"] = "__all__"; node_col="__node_synth"; print("[7b] No node_id/node -> using '__node_synth'.")

df["__timestamp"] = _to_time(df)
sort_cols = [node_col, "__timestamp"] + (["row_id"] if "row_id" in df.columns else [])
df = df.sort_values(sort_cols, kind="stable").reset_index(drop=True)

# daylight context (accept more column names)
rad_full = None
for c in ["global_radiation__pref", "global_radiation_clean", "global_radiation"]:
    if c in df.columns:
        rad_full = pd.to_numeric(df[c], errors="coerce")
        break

if "humidity" not in df.columns:
    print("[7b] No 'humidity' column; 7b is a no-op.")
else:
    h  = pd.to_numeric(df["humidity"], errors="coerce")
    qc = pd.Series("OK", index=df.index, dtype="object")

    # Base physics thresholds
    miss = h.isna()
    crit = (~miss) & ((h < 0) | (h > RH_CRIT_HIGH))
    warn_minor_high = (~miss) & (~crit) & (h > 100)

    qc[miss] = "CRIT_missing"
    qc[crit] = "CRIT_out_of_range"
    qc[warn_minor_high] = "WARN_minor_high"  # kept (not masked unless implausible context)

    # Containers for context-based masks
    mask_flat_impl = pd.Series(False, index=df.index)
    mask_sticky_impl = pd.Series(False, index=df.index)

    # Per-node processing for flatline & sticky with context
    for node, g in df.groupby(df[node_col].astype(str), sort=False):
        idx = g.index

        # flatline candidates (flag all)
        flat_flag = _flatline_flag(g["humidity"], MIN_FLAT_RUN)
        qc.loc[idx[flat_flag]] = np.where(qc.loc[idx[flat_flag]].eq("OK"),
                                          "WARN_flatline", qc.loc[idx[flat_flag]])

        # sticky-high candidates (flag all; separate daylight tag)
        sticky_flag = _sticky_flags(g, STICKY_MINRUN, STICKY_MIN, STICKY_SPREAD)

        has_day = pd.Series(False, index=g.index)
        if rad_full is not None:
            has_day = (rad_full.loc[idx] > DAYLIGHT_RAD_WM2).fillna(False)

        # run-level daylight and duration for both flat & sticky
        rid_flat   = _run_ids(flat_flag)
        rid_sticky = _run_ids(sticky_flag)

        dur_flat   = rid_flat.groupby(rid_flat).transform("size").where(flat_flag, other=0)
        dur_sticky = rid_sticky.groupby(rid_sticky).transform("size").where(sticky_flag, other=0)

        anyday_flat   = rid_flat.groupby(rid_flat).transform(lambda r: has_day.loc[r.index].max()).astype(bool)
        anyday_sticky = rid_sticky.groupby(rid_sticky).transform(lambda r: has_day.loc[r.index].max()).astype(bool)

        # implausibility tests (these will be masked)
        impl_flat   = flat_flag   & (anyday_flat | (dur_flat   >= LONG_RUN_SAMPLES))
        impl_sticky = sticky_flag & (anyday_sticky | (dur_sticky >= LONG_RUN_SAMPLES))

        mask_flat_impl.loc[idx]   = impl_flat
        mask_sticky_impl.loc[idx] = impl_sticky

        # QC labels for sticky variants
        qc.loc[idx[sticky_flag & anyday_sticky]] = np.where(qc.loc[idx[sticky_flag & anyday_sticky]].eq("OK"),
                                                            "WARN_sticky_high_daylight", qc.loc[idx[sticky_flag & anyday_sticky]])
        qc.loc[idx[sticky_flag & ~anyday_sticky]] = np.where(qc.loc[idx[sticky_flag & ~anyday_sticky]].eq("OK"),
                                                             "WARN_sticky_high", qc.loc[idx[sticky_flag & ~anyday_sticky]])

    # Build lenient mask: CRIT + implausible flat/sticky + missing
    mask_lenient = miss | crit | mask_flat_impl | mask_sticky_impl
    df["humidity__pref"] = h.mask(mask_lenient).astype("float64")
    df["ok_humidity"] = df["humidity__pref"].notna()

    # Persist QC column from this step
    df["qc_air_selfcheck_hum"] = qc.astype("category")

    # Rollups (safe OR)
    df["qc_air_anywarn"] = (_as_bool(df.get("qc_air_anywarn"), False, idx=df.index) | qc.str.startswith("WARN", na=False)).astype(bool)
    df["qc_air_anycrit"] = (_as_bool(df.get("qc_air_anycrit"), False, idx=df.index) | qc.str.startswith("CRIT", na=False)).astype(bool)

    # Optional: compact reason for mask (handy for audits/plots)
    reason = pd.Series("", index=df.index, dtype="object")
    reason[miss]                 = "CRIT_missing"
    reason[crit & ~miss]         = "CRIT_out_of_range"
    reason[mask_flat_impl]       = "MASK_flatline_implausible"
    reason[mask_sticky_impl]     = "MASK_sticky_implausible"
    df["humidity_mask_reason"]   = reason.mask(~mask_lenient, other="")

# ---- save ----
out_path = _next_out()
df.to_csv(out_path, index=False)
print(f"[7b lenient] wrote: {out_path}")

# ---- audit print ----
if "humidity" in df.columns:
    s = df["qc_air_selfcheck_hum"].astype(str) if "qc_air_selfcheck_hum" in df.columns else qc.astype(str)
    raw_present  = pd.to_numeric(df["humidity"], errors="coerce").notna()
    pref_missing = pd.to_numeric(df["humidity__pref"], errors="coerce").isna()
    denom = int(raw_present.sum()) or 1
    mshare = float((raw_present & pref_missing).sum())/denom
    print(f"[7b lenient] masked share: {mshare:.2%}")
    if "humidity_mask_reason" in df.columns:
        rb = df.loc[pref_missing & raw_present, "humidity_mask_reason"].value_counts()
        if len(rb):
            print("[7b lenient] mask reasons:")
            for k,v in rb.items():
                print(f"   {k:>28s}: {v}")


[7b] Input picked: af_clean_v01/stage7a_air_temp_qc_v001.csv
[7b] Carry-forward from 5PD (SM): stage5pd_sm_pairdiverge_v001.csv (16 cols)
[7b] Carry-forward from 6PD (ST): stage6pd_soiltemp_pairdiverge_v008.csv (14 cols)
[7b lenient] wrote: af_clean_v01/stage7b_air_hum_qc_v001.csv
[7b lenient] masked share: 58.98%
[7b lenient] mask reasons:
        MASK_sticky_implausible: 230650
      MASK_flatline_implausible: 52184
              CRIT_out_of_range: 2511


**Step 7c: Air Pressure QC + masking**

Goal:
* Build a lenient, usable humidity series (humidity__pref) by masking only clear problems and implausible artifacts.
* Input picking: Auto-selects the latest upstream CSV that contains humidity (prefers recent QC stages).
* Carry-forward: Pulls over soil moisture & soil temperature clean columns + QC flags so earlier masks persist.
* Time & grouping: Normalizes timestamps, sorts by node × time.
* Daylight context: Uses available radiation to tag daylight (>~200 W/m²) for plausibility checks.

QC rules:
* CRIT: missing, <0, or >102% RH → masked.
* WARN (kept unless implausible): >100% RH, flatlines (exact repeats), sticky-high runs near 99–100% with tiny spread.
* Context gating (lenient masking): Flatline/sticky runs are only masked if they (a) include daylight or (b) last very long (≥ ~12 h at 15-min cadence). Otherwise they stay as WARN (visible, not masked).


Outputs:
* humidity__pref (leniently masked humidity) and ok_humidity (not-NaN).
* qc_air_selfcheck_hum with detailed WARN/CRIT labels.
* Rollups: qc_air_anywarn, qc_air_anycrit.
* Optional humidity_mask_reason for audits.

Saving & audit: Writes versioned CSV to af_clean_v01/; prints total masked share and a breakdown of mask reasons.



In [14]:
# ============== Step 7c: Air Pressure QC + masking (independent) ==============
# Thresholds: WARN if <940 or >1050; CRIT if <920 or >1060; missing -> CRIT_missing
# Flatline ≥6 -> WARN_flatline
import os, glob, re
import numpy as np
import pandas as pd

IN_SEARCH = [
    "af_clean_v01/stage7b_air_hum_qc_v*.csv",  # prefer just-run 7b
    "af_clean_v01/stage7a_air_temp_qc_v*.csv",
    "af_clean_v01/stage5pd_*v*.csv",           # <-- NEW: include 5PD so its masks can ride along
    "af_clean_v01/stage6c_*.csv","af_clean_v01/stage6b_*.csv","af_clean_v01/stage6a_*.csv",
    "af_clean_v01/stage5r_*.csv",
    "af_clean_v01/analysis_ready_v*.csv","analysis_ready_v*.csv",
]
OUT_DIR, OUT_STEM = "af_clean_v01", "stage7c_air_pres_qc_v"
MIN_FLAT_RUN = 6

def _pick():
    for pat in IN_SEARCH:
        hits = sorted(glob.glob(pat))
        if hits:
            def _ver(p):
                m = re.search(r"_v(\d+)\.csv$", os.path.basename(p)); return int(m.group(1)) if m else -1
            best = max(hits, key=_ver); return best if _ver(best)>=0 else hits[-1]
    raise FileNotFoundError("No input for 7c.")

def _next(stem):
    os.makedirs(OUT_DIR, exist_ok=True); i=1
    while True:
        p = os.path.join(OUT_DIR, f"{stem}{i:03d}.csv")
        if not os.path.exists(p): return p
        i+=1

def _dt(df):
    if "timestamp" in df.columns:     return pd.to_datetime(df["timestamp"], utc=True, errors="coerce")
    if "timestamp_iso" in df.columns: return pd.to_datetime(df["timestamp_iso"], utc=True, errors="coerce")
    return pd.to_datetime(pd.to_numeric(df["ts_ns"], errors="coerce").astype("Int64"), utc=True, errors="coerce")

def _flatline(df, node_col, val_col, minrun=6):
    if val_col not in df.columns: return pd.Series(False, index=df.index)
    out = pd.Series(False, index=df.index)
    for _, gg in df.groupby(node_col, sort=False):
        v = gg[val_col]; valid = v.notna()
        gid = ((~valid) | (v != v.shift())).cumsum()
        runlen = gid.groupby(gid).transform("size")
        out.loc[gg.index] = valid & (runlen >= minrun)
    return out

# --- carry-forward SM from 5PD (so masks persist downstream) ---
def _carry_forward_sm_from_5pd(df_base: pd.DataFrame) -> pd.DataFrame:
    hits = sorted(glob.glob(os.path.join(OUT_DIR, "stage5pd_sm_pairdiverge_v*.csv")))
    if not hits or "row_id" not in df_base.columns:
        return df_base
    def _ver(p):
        m = re.search(r"_v(\d+)\.csv$", os.path.basename(p)); return int(m.group(1)) if m else -1
    src = max(hits, key=_ver) if _ver(hits[-1]) >= 0 else hits[-1]
    want = [
        "ground_humidity_0_clean","ground_humidity_1_clean",
        "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
        "qc_sm_anycrit_0","qc_sm_anycrit_1","qc_sm_anywarn_0","qc_sm_anywarn_1",
        "qc_sm_range_0","qc_sm_range_1","qc_sm_flatline_0","qc_sm_flatline_1",
        "qc_sm_known_broken_0","qc_sm_known_broken_1","qc_sm_regime_0","qc_sm_regime_1",
    ]
    try:
        hdr = pd.read_csv(src, nrows=0, low_memory=False)
    except Exception:
        return df_base
    have = [c for c in want if c in hdr.columns]
    if not have:
        return df_base
    add = pd.read_csv(src, usecols=["row_id"]+have, low_memory=False)
    out = df_base.drop(columns=[c for c in have if c in df_base.columns], errors="ignore").merge(add, on="row_id", how="left")
    print(f"[7c] Carry-forward from 5PD: {os.path.basename(src)} ({len(have)} cols)")
    return out

# ---- run
src = _pick()
df = pd.read_csv(src, low_memory=False)
df = _carry_forward_sm_from_5pd(df)  # keep 5PD masks

node_col = "node_id" if "node_id" in df.columns else ("node" if "node" in df.columns else None)
assert node_col is not None, "No node_id/node"
df["__timestamp"] = _dt(df)
df = df.sort_values([node_col, "__timestamp", "row_id" if "row_id" in df.columns else "__timestamp"],
                    kind="stable").reset_index(drop=True)

if "pressure" in df.columns:
    qc = pd.Series("OK", index=df.index, dtype="object")
    p = pd.to_numeric(df["pressure"], errors="coerce")

    miss = p.isna()
    crit = (~miss) & ((p < 920) | (p > 1060))
    warn = (~miss) & (~crit) & ((p < 940) | (p > 1050))

    qc[miss] = "CRIT_missing"
    qc[crit] = "CRIT_out_of_range"
    qc[warn] = "WARN_out_of_range"

    flat = _flatline(df, node_col, "pressure", MIN_FLAT_RUN)
    is_crit = qc.str.startswith("CRIT"); is_ok = qc.eq("OK")
    qc[~is_crit & is_ok & flat] = "WARN_flatline"

    # Persist QC column
    df["qc_air_crosscheck_pres"] = qc.astype("category")

    # ---- IMPORTANT: mask ONLY CRIT (incl. missing) for pressure; WARN stays visible
    pref = p.mask(qc.str.startswith("CRIT", na=False))
    df["pressure__pref"] = pref.astype("float64")
    df["ok_pressure"] = df["pressure__pref"].notna()

    # Rollups (boolean-safe)
    prev_warn = pd.Series(df.get("qc_air_anywarn", False), index=df.index).astype(bool)
    prev_crit = pd.Series(df.get("qc_air_anycrit", False), index=df.index).astype(bool)
    df["qc_air_anywarn"]      = (prev_warn | qc.str.startswith("WARN", na=False)).astype(bool)
    df["qc_air_anycrit"]      = (prev_crit | qc.str.startswith("CRIT", na=False)).astype(bool)
    df["qc_air_flatline_any"] = (pd.Series(df.get("qc_air_flatline_any", False), index=df.index).astype(bool) | flat).astype(bool)
else:
    print("No 'pressure' column; 7c is a no-op.")

out = _next(OUT_STEM)
df.to_csv(out, index=False)
print(f"7c wrote: {out}")

# ---- audit ----
if "pressure" in df.columns:
    s = df["qc_air_crosscheck_pres"].astype(str)
    print(f"[7c] pressure QC counts — OK={int((s=='OK').sum())}  "
          f"WARN={int(s.str.startswith('WARN').sum())}  "
          f"CRIT={int(s.str.startswith('CRIT').sum())}")
    raw_present  = pd.to_numeric(df["pressure"], errors="coerce").notna()
    pref_missing = pd.to_numeric(df["pressure__pref"], errors="coerce").isna()
    denom = int(raw_present.sum()) or 1
    mshare = float((raw_present & pref_missing).sum())/denom
    print(f"[7c] masked share (pressure): {mshare:.2%}")


[7c] Carry-forward from 5PD: stage5pd_sm_pairdiverge_v001.csv (16 cols)
7c wrote: af_clean_v01/stage7c_air_pres_qc_v001.csv
[7c] pressure QC counts — OK=65276  WARN=405052  CRIT=13459
[7c] masked share (pressure): 2.78%


**Stage 8: Radiation & Wind range QC**

* Loads the latest pipeline file (prefers Stage 7 → 6c → …) and rebuilds timestamp from ts_ns. Preserves row_id; asserts no row loss.
* Ensures numeric for:
* global_radiation (W/m²)
* wind_speed_mean, wind_speed_min, wind_speed_max (km/h)
* Applies range sanity with conservative WARN bands and CRIT outside:
* Radiation: OK in [0, 1200]; WARN in [-5, 0) low and (1200, 1400] high; CRIT < -5 or > 1400.
* Wind speeds: OK in [0, 100]; WARN in [-0.5, 0) low and (100, 150] high; CRIT < -0.5 or > 150.
* Writes _clean versions (global_radiation_clean, wind_speed_*_clean):
* CRIT → NaN (masked),
* WARN → clamped to the nearest OK bound (e.g., −0.2 → 0; 145 → 100 for wind mean, 1205 → 1200 for radiation),
* OK → passthrough.
* Adds per-variable QC flags (qc_rad_range, qc_wind_*_range) plus boolean rollups (*_anycrit, *_anywarn) for easy filtering later.
* Adds a wind ordering check (qc_wind_order): flags WARN_inconsistent where min > mean or mean > max. (Diagnostic only; no masking.)
* Saves a versioned output and summary CSVs (overall + per-node), prints time span and node list.



In [36]:
# =========================
# Stage 8: Radiation & Wind — range QC (mask CRIT only; clamp WARN)
# GUARANTEES:
#  - Reads latest prior stage (prefer 7c→7b→7a → 6PD → 6c → 6b → 6a → 5PD → 5R → 4 → 3 → 2 → 1)
#  - Skips summary/audit files
#  - Reconstructs timestamp from ts_ns (no free-text parsing)
#  - Carries forward ST (6PD), SM (5PD), and Air (7a/7b/7c) cols by row_id (no overwrite)
#  - No row loss; preserves row_id
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob
import os, re

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core columns
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"   # in-memory datetime64[ns]

# Node column will be detected dynamically
def _pick_node_col(df):
    for c in ["node", "node_id", "station_id", "station", "device_id", "device", "logger_id", "site"]:
        if c in df.columns:
            print(f"[Stage 8] Using node column: {c!r}")
            return c
    df["__node_synth"] = "__all__"
    print("[Stage 8] No node column found → using synthetic group '__node_synth'.")
    return "__node_synth"

# Variables (exact column names in your file)
RAD_COL   = "global_radiation"     # W/m²
W_MEAN    = "wind_speed_mean"      # km/h
W_MIN     = "wind_speed_min"       # km/h
W_MAX     = "wind_speed_max"       # km/h

# Clean column names to write
RAD_CLEAN = f"{RAD_COL}_clean"
W_MEAN_C  = f"{W_MEAN}_clean"
W_MIN_C   = f"{W_MIN}_clean"
W_MAX_C   = f"{W_MAX}_clean"

# ---------------- Range thresholds ----------------
# Radiation (W/m²)
RAD_LOWER_OK, RAD_UPPER_OK   = 0.0, 1200.0
RAD_LOWER_TOL, RAD_UPPER_TOL = -5.0, 1400.0   # WARN bands near edges

# Wind speed (km/h)
W_LOWER_OK, W_UPPER_OK   = 0.0, 100.0
W_LOWER_TOL, W_UPPER_TOL = -0.5, 150.0

def _ver(path):
    m = re.search(r"_v(\d+)\.csv$", os.path.basename(path)); return int(m.group(1)) if m else -1

def _skip_name(p):
    name = os.path.basename(p).lower()
    return any(k in name for k in ["count", "counts", "summary", "overall", "audit"])

def _has_cols(path, needed):
    try:
        hdr = pd.read_csv(path, nrows=0, low_memory=False)
        return all(c in hdr.columns for c in needed)
    except Exception:
        return False

def _pick_latest_input():
    patterns = [
        # Prefer 7 outputs if they contain RAD/WIND
        "af_clean_v01/stage7c_air_pres_qc_v*.csv",
        "af_clean_v01/stage7b_air_hum_qc_v*.csv",
        "af_clean_v01/stage7a_air_temp_qc_v*.csv",
        # Then 6PD (soil temp diverge stage carrying all earlier cols)
        "af_clean_v01/stage6pd_soiltemp_pairdiverge_v*.csv",  # ← NEW
        # Then classic 6x, 5PD, 5R, …
        "af_clean_v01/stage6c_*v*.csv",
        "af_clean_v01/stage6b_*v*.csv",
        "af_clean_v01/stage6a_*v*.csv",
        "af_clean_v01/stage5pd_*v*.csv",
        "af_clean_v01/stage5r_*v*.csv",
        "af_clean_v01/stage4_*v*.csv",
        "af_clean_v01/stage3_*v*.csv",
        "af_clean_v01/stage2_*v*.csv",
        "af_clean_v01/stage1_parsed_v*.csv",
        # fallbacks
        "af_clean_v01/analysis_ready_v*.csv",
        "analysis_ready_v*.csv",
    ]
    needed = [ROW_ID_COL, TS_NS_COL, TS_ISO_COL, RAD_COL, W_MEAN, W_MIN, W_MAX]
    for pat in patterns:
        files = [p for p in sorted(glob(str(pat)), key=_ver, reverse=True) if not _skip_name(p)]
        for p in files:
            if _has_cols(p, needed):
                print("[Stage 8] Reading:", p)
                return p
    raise FileNotFoundError("[Stage 8] No prior stage contains all radiation & wind columns.")

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

# ---- Load chosen input
in_path = _pick_latest_input()
df = pd.read_csv(in_path, low_memory=False)
n_in = len(df)

# Detect node column (after load)
NODE_COL = _pick_node_col(df)

# ---- Carry-forward helpers (merge by row_id without overwriting present cols)
def _carry_forward(df_base: pd.DataFrame, src_glob: str, want_cols: list, tag: str) -> pd.DataFrame:
    hits = sorted(glob(src_glob), key=_ver, reverse=True)
    if not hits or ROW_ID_COL not in df_base.columns:
        return df_base
    src = hits[0]
    try:
        hdr = pd.read_csv(src, nrows=0, low_memory=False)
    except Exception:
        return df_base
    have = [c for c in want_cols if c in hdr.columns]
    if not have:
        return df_base
    add = pd.read_csv(src, usecols=[ROW_ID_COL] + have, low_memory=False)
    keep_cols = [c for c in have if c not in df_base.columns]
    if not keep_cols:
        return df_base
    print(f"[Stage 8] Carry-forward {len(keep_cols)} cols from {tag}: {os.path.basename(src)}")
    return df_base.merge(add[[ROW_ID_COL] + keep_cols], on=ROW_ID_COL, how="left")

# Carry forward SM (5PD)
SM_WANT = [
    "ground_humidity_0_clean","ground_humidity_1_clean",
    "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
    "qc_sm_anycrit_0","qc_sm_anycrit_1","qc_sm_anywarn_0","qc_sm_anywarn_1",
    "qc_sm_range_0","qc_sm_range_1","qc_sm_flatline_0","qc_sm_flatline_1",
    "qc_sm_known_broken_0","qc_sm_known_broken_1","qc_sm_regime_0","qc_sm_regime_1",
]
df = _carry_forward(df, str(OUT_DIR / "stage5pd_sm_pairdiverge_v*.csv"), SM_WANT, "5PD (SM)")

# Carry forward ST (6PD)  ← NEW
ST_WANT = [
    "ground_temp_0_clean","ground_temp_1_clean",
    "qc_st_pairdiverge_0","qc_st_pairdiverge_1",
    "qc_st_anycrit_0","qc_st_anycrit_1",
    "qc_st_anywarn_0","qc_st_anywarn_1",
    "qc_st_range_0","qc_st_range_1",
    "qc_st_flatline_0","qc_st_flatline_1",
    "qc_st_known_broken_0","qc_st_known_broken_1",
]
df = _carry_forward(df, str(OUT_DIR / "stage6pd_soiltemp_pairdiverge_v*.csv"), ST_WANT, "6PD (ST)")

# Carry forward Air (7a/7b/7c)
AIR7A_WANT = ["temperature__pref","qc_air_selfcheck_temp","ok_temperature"]
AIR7B_WANT = ["humidity__pref","qc_air_selfcheck_hum","humidity_mask_reason","ok_humidity"]
AIR7C_WANT = ["pressure__pref","qc_air_crosscheck_pres","ok_pressure"]
df = _carry_forward(df, str(OUT_DIR / "stage7a_air_temp_qc_v*.csv"), AIR7A_WANT, "7a")
df = _carry_forward(df, str(OUT_DIR / "stage7b_air_hum_qc_v*.csv"), AIR7B_WANT, "7b")
df = _carry_forward(df, str(OUT_DIR / "stage7c_air_pres_qc_v*.csv"), AIR7C_WANT, "7c")

# ---- Sanity: required columns ----
required = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, RAD_COL, W_MEAN, W_MIN, W_MAX}
missing = required - set(df.columns)
if missing:
    raise ValueError(f"Missing required columns for Stage 8: {sorted(missing)}")

# Reconstruct timestamp from ts_ns (authoritative)
df[TIMESTAMP_COL] = pd.to_datetime(pd.to_numeric(df[TS_NS_COL], errors="coerce").astype("Int64"), utc=True, errors="coerce")

assert df[ROW_ID_COL].is_unique, "row_id must be unique from Stage 1."

# Ensure numeric
for c in [RAD_COL, W_MEAN, W_MIN, W_MAX]:
    df[c] = pd.to_numeric(df[c], errors="coerce")

# ---------------- Helpers to apply range QC consistently ----------------
def apply_range_qc(series: pd.Series, lower_ok, upper_ok, lower_tol, upper_tol,
                   flag_col: str, clean_col: str, df_target: pd.DataFrame):
    """
    Adds:
      - flag_col : 'OK' / 'WARN_boundary' / 'CRIT_out_of_range'
      - <*_anycrit>, <*_anywarn> booleans
      - clean_col: CRIT -> NaN; WARN clamped to [lower_ok, upper_ok]; OK passthrough.
    """
    v = series
    qc = pd.Series("OK", index=v.index, dtype="object")

    warn_low  = v.ge(lower_tol) & v.lt(lower_ok)
    warn_high = v.gt(upper_ok) & v.le(upper_tol)
    crit_low  = v.lt(lower_tol)
    crit_high = v.gt(upper_tol)

    qc.loc[warn_low | warn_high] = "WARN_boundary"
    qc.loc[crit_low | crit_high] = "CRIT_out_of_range"

    clean = v.copy()
    clean.loc[crit_low | crit_high] = np.nan
    clean.loc[warn_low]  = lower_ok
    clean.loc[warn_high] = upper_ok

    df_target[flag_col] = qc
    df_target[flag_col.replace("_range","_anycrit")] = qc.eq("CRIT_out_of_range")
    df_target[flag_col.replace("_range","_anywarn")] = qc.eq("WARN_boundary")
    df_target[clean_col] = pd.to_numeric(clean, errors="coerce")

    # Masked share (CRIT only): raw present & clean missing
    raw_present = v.notna()
    masked_share = float((raw_present & df_target[clean_col].isna()).sum()) / (int(raw_present.sum()) or 1)
    return {
        "var": series.name,
        "n_OK": int((qc == "OK").sum()),
        "n_WARN_boundary": int((qc == "WARN_boundary").sum()),
        "n_CRIT_out_of_range": int((qc == "CRIT_out_of_range").sum()),
        "masked_share": masked_share,
    }

# ---------------- Apply range QC ----------------
summaries = []
summaries.append(apply_range_qc(
    df[RAD_COL], RAD_LOWER_OK, RAD_UPPER_OK, RAD_LOWER_TOL, RAD_UPPER_TOL,
    flag_col="qc_rad_range", clean_col=RAD_CLEAN, df_target=df))

summaries.append(apply_range_qc(
    df[W_MEAN], W_LOWER_OK, W_UPPER_OK, W_LOWER_TOL, W_UPPER_TOL,
    flag_col="qc_wind_mean_range", clean_col=W_MEAN_C, df_target=df))

summaries.append(apply_range_qc(
    df[W_MIN], W_LOWER_OK, W_UPPER_OK, W_LOWER_TOL, W_UPPER_TOL,
    flag_col="qc_wind_min_range", clean_col=W_MIN_C, df_target=df))

summaries.append(apply_range_qc(
    df[W_MAX], W_LOWER_OK, W_UPPER_OK, W_LOWER_TOL, W_UPPER_TOL,
    flag_col="qc_wind_max_range", clean_col=W_MAX_C, df_target=df))

# ---------------- Wind internal consistency (diagnostic only) ----------------
# min <= mean <= max; otherwise WARN_inconsistent
order_ok = (
    (df[[W_MIN, W_MEAN, W_MAX]].notna().all(axis=1)) &
    (df[W_MIN] <= df[W_MEAN]) & (df[W_MEAN] <= df[W_MAX])
)
df["qc_wind_order"] = np.where(order_ok | df[[W_MIN, W_MEAN, W_MAX]].isna().any(axis=1),
                               "OK", "WARN_inconsistent")

# ---------------- Save outputs ----------------
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL] if NODE_COL in df.columns else [TS_NS_COL, ROW_ID_COL],
               inplace=True, ignore_index=True)

out_main = next_versioned_csv(OUT_DIR, "stage8_rad_wind_range")
df.to_csv(out_main, index=False)
print("Wrote:", out_main)

# Overall summary CSV
sum_df = pd.DataFrame(summaries)
sum_path = next_versioned_csv(OUT_DIR, "stage8_rad_wind_range_counts_overall")
sum_df.to_csv(sum_path, index=False)
print("Wrote overall counts:", sum_path)
print("\nOverall range QC (radiation & wind):")
print(sum_df.to_string(index=False))

# Per-node summaries
def pernode_counts(flag_col: str, label: str):
    key = NODE_COL if NODE_COL in df.columns else None
    if key is None:
        return pd.DataFrame(columns=["node","which","flag","n"])
    tmp = (df.groupby([key, flag_col]).size()
             .rename("n").reset_index()
             .sort_values([key, flag_col]))
    tmp.insert(1, "which", label)
    tmp.rename(columns={key: "node"}, inplace=True)
    return tmp

pernode = pd.concat([
    pernode_counts("qc_rad_range", "radiation"),
    pernode_counts("qc_wind_mean_range", "wind_mean"),
    pernode_counts("qc_wind_min_range",  "wind_min"),
    pernode_counts("qc_wind_max_range",  "wind_max"),
    pernode_counts("qc_wind_order",      "wind_order"),
], ignore_index=True)

pernode_path = next_versioned_csv(OUT_DIR, "stage8_rad_wind_range_counts_pernode")
pernode.to_csv(pernode_path, index=False)
print("Wrote per-node counts:", pernode_path)

# Breadcrumbs
tmin = pd.to_datetime(pd.to_numeric(df[TS_NS_COL], errors="coerce").astype("Int64"), utc=True, errors="coerce").min()
tmax = pd.to_datetime(pd.to_numeric(df[TS_NS_COL], errors="coerce").astype("Int64"), utc=True, errors="coerce").max()
print("\nTime span (dataset, from ts_ns):", tmin, "→", tmax)
print("Rows in/out (should match input):", n_in, "→", len(df))
if NODE_COL in df.columns:
    print("Nodes:", sorted(pd.Series(df[NODE_COL]).dropna().astype(str).unique().tolist()))


[Stage 8] Reading: af_clean_v01/stage7b_air_hum_qc_v001.csv
[Stage 8] Using node column: 'node'
Wrote: af_clean_v01/stage8_rad_wind_range_v001.csv
Wrote overall counts: af_clean_v01/stage8_rad_wind_range_counts_overall_v001.csv

Overall range QC (radiation & wind):
             var   n_OK  n_WARN_boundary  n_CRIT_out_of_range  masked_share
global_radiation 482121              793                  873      0.001805
 wind_speed_mean 483787                0                    0      0.000000
  wind_speed_min 483787                0                    0      0.000000
  wind_speed_max 483787                0                    0      0.000000
Wrote per-node counts: af_clean_v01/stage8_rad_wind_range_counts_pernode_v001.csv

Time span (dataset, from ts_ns): 2023-10-30 12:18:43+00:00 → 2025-09-18 10:29:42+00:00
Rows in/out (should match input): 483787 → 483787
Nodes: ['node_176', 'node_179', 'node_181', 'node_182', 'node_183', 'node_184', 'node_186', 'node_187', 'node_189']


**Step 9: assemble “analysis_ready” (no ST ramp)**

* Pick input: Loads the most advanced prior stage it can find (prefers Stage 8, then 7, 6, 5…).
* Core setup: Verifies required IDs/time columns, builds a canonical timestamp from ts_ns, drops stale soil-temp ramp flags.
* Soil moisture: Prefers Step 5PD outputs (pair-diverge) to fill *_clean + all SM QC flags; otherwise falls back to 5R.
* Known-broken backfill: Pulls Stage 3 “known broken” flags for SM/ST if any are missing.
* Soil temperature: Backfills ST *_clean + QC from 6a/6b/6c if missing; *masks _clean where known-broken and updates qc_st_anycrit.
* Radiation & wind: Backfills *_clean + range/QC from Stage 8 (and sets benign defaults if still absent).
* Air (T/RH/Pres): Imports __pref and QC from 7a/7b/7c when available; otherwise constructs __pref from raw masked by the step-7 QC labels; adds ok_temperature/ok_humidity/ok_pressure.
* Preferred series: Creates __pref for SM, ST, radiation, wind from their *_clean (or raw if clean missing).
* OK booleans: Builds ok_* for SM/ST from *_anycrit, and for RAD/W from range flags.
* Row rollups: Maintains qc_air_anywarn/anycrit, row_has_warn/crit.
* Export (reproducible): Selects a fixed column order: IDs/time → all __pref → all ok_* → all QC → key *_clean → raw columns; writes a versioned CSV (analysis_ready_v###.csv) and a Parquet copy if possible; includes quoting fallback + console summary.

In [16]:
# =========================
# Step 9: Assemble analysis-ready dataset (no 6d/ramp)
# - Respects Step 5PD (SM pair-diverge) and Step 7 (air masking)
# - Exports *_clean (SM/ST/RAD/W), all __pref, all QC flags, ok_* booleans
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob
import csv

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core identity/time columns
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"   # in-memory datetime64[ns]
NODE_COL      = "node"
NODE_ID_COL   = "node_id"
STATION_TYPE  = "station_type"

# Variable families
SM_COLS  = ["ground_humidity_0", "ground_humidity_1"]
ST_COLS  = ["ground_temp_0", "ground_temp_1"]
RAD_COL  = "global_radiation"
W_COLS   = ["wind_speed_min", "wind_speed_mean", "wind_speed_max"]
AIR_COLS = ["temperature", "humidity", "pressure"]  # Step 7 provides __pref + QC

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

def pick_latest_by_priority(patterns):
    """Return latest path from the FIRST pattern that has matches."""
    for pat in patterns:
        files = sorted(glob(str(OUT_DIR / pat)))
        if files:
            print(f"[pick] {pat} -> {files[-1]}")
            return files[-1]
        else:
            print(f"[pick] {pat} -> (no match)")
    return None

def merge_missing_cols_on_row_id(df_base: pd.DataFrame, src_path: str, want_cols: list) -> pd.DataFrame:
    """Only add columns that are currently missing; avoids duplicate merges."""
    if not want_cols or src_path is None:
        return df_base
    hdr = pd.read_csv(src_path, nrows=0, low_memory=False)
    have = [c for c in want_cols if (c in hdr.columns) and (c not in df_base.columns)]
    if not have:
        return df_base
    usecols = [ROW_ID_COL] + have
    add = pd.read_csv(src_path, usecols=usecols, low_memory=False)
    return df_base.merge(add, on=ROW_ID_COL, how="left")

def replace_cols_on_row_id(df_base: pd.DataFrame, src_path: str, want_cols: list) -> pd.DataFrame:
    """Overwrite df_base[want_cols] using src_path (matched by row_id)."""
    if not want_cols or src_path is None:
        return df_base
    hdr = pd.read_csv(src_path, nrows=0, low_memory=False)
    have = [c for c in want_cols if c in hdr.columns]
    if not have:
        return df_base
    usecols = [ROW_ID_COL] + have
    add = pd.read_csv(src_path, usecols=usecols, low_memory=False)
    df_out = df_base.drop(columns=[c for c in have if c in df_base.columns], errors="ignore")
    return df_out.merge(add, on=ROW_ID_COL, how="left")

# ---- Load the most advanced stage available (prefer Stage 8) ----
in_path = pick_latest_by_priority([
    "stage8_rad_wind_range_v*.csv",
    "stage7c_air_pres_qc_v*.csv",
    "stage7b_air_hum_qc_v*.csv",
    "stage7a_air_temp_qc_v*.csv",
    "stage6c_soiltemp_step_v*.csv",
    "stage6b_soiltemp_flatline_v*.csv",
    "stage6a_soiltemp_range_v*.csv",
    "stage5pd_sm_pairdiverge_v*.csv",
    "stage5r_sm_regimes_v*.csv",
    "stage4_sm_flatline_v*.csv",
    "stage3_sm_knownbroken_v*.csv",
    "stage2_soilmoisture_range_v*.csv",
    "stage1_parsed_v*.csv",
])
assert in_path is not None, "No prior stage files found."
print("Stage 9 INPUT:", in_path)

df = pd.read_csv(in_path, low_memory=False)
n_in = len(df)

# ---- Core checks & authoritative timestamp ----
required_core = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, NODE_COL, NODE_ID_COL, STATION_TYPE}
missing_core = required_core - set(df.columns)
if missing_core:
    raise ValueError(f"Missing core columns: {sorted(missing_core)}")

df[TIMESTAMP_COL] = pd.to_datetime(df[TS_NS_COL].astype("int64"))
assert df[ROW_ID_COL].is_unique, "row_id should be unique from Stage 1."

# ---- Drop stale soil-temp ramp flags (6d removed) ----
ramp_cols = [c for c in df.columns if c.startswith("qc_st_ramp_")]
if ramp_cols:
    print(f"[Stage 9] Dropping stale ramp flags: {ramp_cols}")
    df.drop(columns=ramp_cols, inplace=True, errors="ignore")

# ========= Soil moisture: prefer 5PD (pair-diverge) over 5R =========
sm_5pd_src = pick_latest_by_priority(["stage5pd_sm_pairdiverge_v*.csv"])
SM_FROM_5PD = [
    "ground_humidity_0_clean","ground_humidity_1_clean",
    "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
    "qc_sm_range_0","qc_sm_range_1",
    "qc_sm_flatline_0","qc_sm_flatline_1",
    "qc_sm_known_broken_0","qc_sm_known_broken_1",
    "qc_sm_anycrit_0","qc_sm_anycrit_1",
    "qc_sm_anywarn_0","qc_sm_anywarn_1",
    "qc_sm_regime_0","qc_sm_regime_1"
]
if sm_5pd_src:
    df = replace_cols_on_row_id(df, sm_5pd_src, SM_FROM_5PD)
    print(f"[Stage 9] Replaced soil-moisture clean/QC from 5PD: {sm_5pd_src}")
else:
    step5r_src = pick_latest_by_priority(["stage5r_sm_regimes_v*.csv"])
    if step5r_src:
        SM_FROM_5R = [c for c in SM_FROM_5PD if not c.startswith("qc_sm_pairdiverge_")]
        df = replace_cols_on_row_id(df, step5r_src, SM_FROM_5R)
        print(f"[Stage 9] Replaced soil-moisture clean/QC from 5R: {step5r_src}")
    else:
        print("[Stage 9] WARNING: No 5PD/5R file found; SM clean/QC may be stale.")

# ---------- Bring in Stage 3 known-broken (SM + ST) if any columns missing ----------
ST3_SRC = pick_latest_by_priority([
    "stage3_sm_knownbroken_v*.csv",
    "stage3_*knownbroken_v*.csv",
])
KB_WANT = [
    "qc_sm_known_broken_0","qc_sm_known_broken_1",
    "qc_st_known_broken_0","qc_st_known_broken_1",
]
missing_kb = [c for c in KB_WANT if c not in df.columns]
if ST3_SRC and missing_kb:
    df = merge_missing_cols_on_row_id(df, ST3_SRC, missing_kb)
    print(f"[Stage 9] Backfilled known-broken flags from: {ST3_SRC}")

# ---------- Backfill (only if missing) for Soil Temperature (no ramp) ----------
ST_QC_WANT = [
    "qc_st_range_0","qc_st_range_1",
    "qc_st_flatline_0","qc_st_flatline_1",
    "qc_st_step_0","qc_st_step_1",
    "qc_st_anycrit_0","qc_st_anycrit_1",
    "qc_st_anywarn_0","qc_st_anywarn_1",
    "ground_temp_0_clean","ground_temp_1_clean",
]
st_missing = [c for c in ST_QC_WANT if c not in df.columns]
if st_missing:
    st_src = pick_latest_by_priority([
        "stage6c_soiltemp_step_v*.csv",
        "stage6b_soiltemp_flatline_v*.csv",
        "stage6a_soiltemp_range_v*.csv",
    ])
    if st_src:
        df = merge_missing_cols_on_row_id(df, st_src, st_missing)
        print(f"[Stage 9] Backfilled ST clean/QC from: {st_src}")

# ---------- ENFORCE: soil-temp known-broken masking & anycrit ----------
def coerce_bool_from_anycrit_like(s: pd.Series) -> pd.Series:
    """Accept bool series or strings like 'CRIT_*'/'OK' and return boolean."""
    if s.dtype == bool:
        return s
    return s.astype(str).str.startswith("CRIT")

for suf in ("0","1"):
    kbcol_st = f"qc_st_known_broken_{suf}"
    clean    = f"ground_temp_{suf}_clean"
    anycrit  = f"qc_st_anycrit_{suf}"
    raw      = f"ground_temp_{suf}"
    if clean not in df.columns and raw in df.columns:
        df[clean] = pd.to_numeric(df[raw], errors="coerce")
    if kbcol_st in df.columns:
        kb = df[kbcol_st].astype(str).str.startswith("CRIT")
        if clean in df.columns:
            df.loc[kb, clean] = np.nan
        if anycrit in df.columns:
            df[anycrit] = coerce_bool_from_anycrit_like(df[anycrit]) | kb
        else:
            df[anycrit] = kb

# ---------- Backfill (only if missing) for Radiation & Wind ----------
RW_QC_WANT = [
    "qc_rad_range", "qc_rad_anycrit", "qc_rad_anywarn", f"{RAD_COL}_clean",
    "qc_wind_mean_range", "qc_wind_mean_anycrit", "qc_wind_mean_anywarn", f"{W_COLS[1]}_clean",
    "qc_wind_min_range",  "qc_wind_min_anycrit",  "qc_wind_min_anywarn",  f"{W_COLS[0]}_clean",
    "qc_wind_max_range",  "qc_wind_max_anycrit",  "qc_wind_max_anywarn",  f"{W_COLS[2]}_clean",
    "qc_wind_order",
]
rw_missing = [c for c in RW_QC_WANT if c not in df.columns]
if rw_missing:
    rw_src = pick_latest_by_priority(["stage8_rad_wind_range_v*.csv"])
    if rw_src:
        df = merge_missing_cols_on_row_id(df, rw_src, rw_missing)
        print(f"[Stage 9] Backfilled RAD/WIND clean/QC from: {rw_src}")
# Default OKs if still absent
for c in ["qc_rad_range","qc_wind_mean_range","qc_wind_min_range","qc_wind_max_range","qc_wind_order"]:
    if c not in df.columns:
        df[c] = "OK"

# ---------- AIR: bring QC + inherit masked __pref if present; else rebuild from QC ----------
AIR_QC_WANT = ["qc_air_selfcheck_temp","qc_air_selfcheck_hum","qc_air_crosscheck_pres"]
for pat in ["stage7c_air_pres_qc_v*.csv", "stage7b_air_hum_qc_v*.csv", "stage7a_air_temp_qc_v*.csv"]:
    air_src = pick_latest_by_priority([pat])
    if air_src:
        df = merge_missing_cols_on_row_id(df, air_src, AIR_QC_WANT + ["temperature__pref","humidity__pref","pressure__pref","humidity_mask_reason"])
# normalize legacy names (rare)
if "qc_air_selfcheck_temp" not in df.columns and "qc_air_crosscheck_temp" in df.columns:
    df["qc_air_selfcheck_temp"] = df["qc_air_crosscheck_temp"]
if "qc_air_selfcheck_hum" not in df.columns and "qc_air_crosscheck_hum" in df.columns:
    df["qc_air_selfcheck_hum"] = df["qc_air_crosscheck_hum"]
for c in ["qc_air_crosscheck_temp","qc_air_crosscheck_hum"]:
    if c in df.columns:
        df.drop(columns=c, inplace=True)

# --- Build preferred series ---
def preferred(df_: pd.DataFrame, raw: str) -> pd.Series:
    c = f"{raw}_clean"
    return pd.to_numeric(df_[c], errors="coerce") if c in df_.columns else pd.to_numeric(df_[raw], errors="coerce")

# SM/ST/RAD/W from *_clean when present
pref = {}
for c in SM_COLS + ST_COLS:
    pref[c] = preferred(df, c)
pref[RAD_COL] = preferred(df, RAD_COL)
for c in W_COLS:
    pref[c] = preferred(df, c)

# Assign these __pref
for name, series in pref.items():
    df[f"{name}__pref"] = series

# AIR: if __pref already imported from Step 7, keep it; otherwise build from QC
for base, qc in [
    ("temperature", "qc_air_selfcheck_temp"),
    ("humidity",    "qc_air_selfcheck_hum"),
    ("pressure",    "qc_air_crosscheck_pres"),
]:
    if base not in df.columns:
        continue
    pref_col = f"{base}__pref"
    if pref_col in df.columns:
        df[pref_col] = pd.to_numeric(df[pref_col], errors="coerce")
    else:
        raw = pd.to_numeric(df[base], errors="coerce")
        if qc in df.columns:
            bad = df[qc].astype(str).str.startswith(("WARN","CRIT"), na=False)
            df[pref_col] = raw.mask(bad)
        else:
            df[pref_col] = raw
    df[f"ok_{base}"] = df[pref_col].notna()

def anycrit_bool(df_: pd.DataFrame, col: str) -> pd.Series:
    if col in df_.columns:
        s = df_[col]
        return s if s.dtype == bool else s.astype(str).str.startswith("CRIT")
    return pd.Series(False, index=df_.index, dtype=bool)

# ok_* for SM/ST from anycrit (legacy convention)
df["ok_sm0"] = ~anycrit_bool(df, "qc_sm_anycrit_0")
df["ok_sm1"] = ~anycrit_bool(df, "qc_sm_anycrit_1")
df["ok_st0"] = ~anycrit_bool(df, "qc_st_anycrit_0")
df["ok_st1"] = ~anycrit_bool(df, "qc_st_anycrit_1")

# Radiation & wind OK from explicit range flags (CRIT only masks)
df["ok_rad"]   = ~df["qc_rad_range"].astype(str).eq("CRIT_out_of_range")
df["ok_wmin"]  = ~df["qc_wind_min_range"].astype(str).eq("CRIT_out_of_range")
df["ok_wmean"] = ~df["qc_wind_mean_range"].astype(str).eq("CRIT_out_of_range")
df["ok_wmax"]  = ~df["qc_wind_max_range"].astype(str).eq("CRIT_out_of_range")

# Row-level rollups for air (optional, retained for continuity)
def _startswith(col, prefix):
    return df[col].astype(str).str.startswith(prefix, na=False) if col in df.columns else pd.Series(False, index=df.index)

air_warn = (_startswith("qc_air_selfcheck_temp","WARN") |
            _startswith("qc_air_selfcheck_hum","WARN")  |
            _startswith("qc_air_crosscheck_pres","WARN"))
air_crit = (_startswith("qc_air_selfcheck_temp","CRIT") |
            _startswith("qc_air_selfcheck_hum","CRIT")  |
            _startswith("qc_air_crosscheck_pres","CRIT"))
df["qc_air_anywarn"] = df.get("qc_air_anywarn", False) | air_warn
df["qc_air_anycrit"] = df.get("qc_air_anycrit", False) | air_crit
df["row_has_warn"] = df.get("row_has_warn", False) | df["qc_air_anywarn"]
df["row_has_crit"] = df.get("row_has_crit", False) | df["qc_air_anycrit"]

# =============== Choose & order columns for export ===============
id_cols   = [ROW_ID_COL, NODE_ID_COL, NODE_COL, STATION_TYPE]
time_cols = [TS_NS_COL, TS_ISO_COL, TIMESTAMP_COL]

pref_cols = (
    [f"{c}__pref" for c in SM_COLS] +
    [f"{c}__pref" for c in ST_COLS] +
    [f"{RAD_COL}__pref"] +
    [f"{c}__pref" for c in W_COLS] +
    [f"{c}__pref" for c in AIR_COLS]
)

ok_cols = [
    "ok_sm0","ok_sm1","ok_st0","ok_st1",
    "ok_rad","ok_wmin","ok_wmean","ok_wmax",
    "ok_temperature","ok_humidity","ok_pressure",
]

# Keep *_clean for SM/ST/RAD/W so plotting can compare raw vs clean directly
CLEAN_KEEP = [
    "ground_humidity_0_clean","ground_humidity_1_clean",
    "ground_temp_0_clean","ground_temp_1_clean",
    "global_radiation_clean",
    "wind_speed_min_clean","wind_speed_mean_clean","wind_speed_max_clean",
]
clean_present = [c for c in CLEAN_KEEP if c in df.columns]

# Keep air mask reason if present (from 7b lenient)
AIR_META_KEEP = [c for c in ["humidity_mask_reason"] if c in df.columns]

# all QC flags
qc_cols = [c for c in df.columns if c.startswith("qc_")]

# raw columns for traceability
raw_cols = list({*SM_COLS, *ST_COLS, RAD_COL, *W_COLS, *AIR_COLS})

export_cols = id_cols + time_cols + pref_cols + ok_cols + qc_cols + clean_present + AIR_META_KEEP
raw_cols = [c for c in raw_cols if c not in export_cols]
export_cols += raw_cols

# Deterministic order & write
df.sort_values([NODE_COL, TS_NS_COL, ROW_ID_COL], inplace=True, ignore_index=True)
out_df = df[export_cols].copy()

# Clean object columns for CSV
obj_cols = out_df.select_dtypes(include="object").columns
if len(obj_cols):
    out_df[obj_cols] = out_df[obj_cols].replace({r'[\r\n]': ' ', '"': '""'}, regex=True)

out_main = next_versioned_csv(OUT_DIR, "analysis_ready")

# First attempt: minimal quoting
out_df.to_csv(out_main, index=False, encoding="utf-8", quoting=csv.QUOTE_MINIMAL, lineterminator="\n")

# Verify; if mismatch, rewrite with QUOTE_ALL
_ok = True
try:
    _test = pd.read_csv(out_main, engine="c", low_memory=False)
    if list(_test.columns) != list(out_df.columns):
        _ok = False
except Exception as e:
    print("[Step 9] Re-read failed:", e)
    _ok = False

if not _ok:
    print("[Step 9] Rewriting with QUOTE_ALL for robustness…")
    out_df.to_csv(out_main, index=False, encoding="utf-8",
                  quoting=csv.QUOTE_ALL, lineterminator="\n")
    _test = pd.read_csv(out_main, engine="python", low_memory=False)
    assert list(_test.columns) == list(out_df.columns), "Post-rewrite verification failed."

print("Wrote analysis-ready file:", out_main)

# Optional: Parquet copy
try:
    out_parq = out_main.with_suffix(".parquet")
    out_df.to_parquet(out_parq, index=False)
    print("Wrote Parquet copy:", out_parq)
except Exception as e:
    print("Parquet write skipped:", e)

# ---- Small console summary
print("\nPreview of columns included:")
print(export_cols[:30], "...")
print(f"Total columns exported: {len(export_cols)}")
tmin = pd.to_datetime(df[TS_NS_COL].astype("int64")).min()
tmax = pd.to_datetime(df[TS_NS_COL].astype("int64")).max()
print("Time span (from ts_ns):", tmin, "→", tmax)
print("Rows in/out (should match input):", n_in, "→", len(df))

# Post-save sanity: quick presence checks
hdr_ar = pd.read_csv(out_main, nrows=0)
print("Has SM *_clean in AR?",
      all([(c in hdr_ar.columns) for c in ["ground_humidity_0_clean","ground_humidity_1_clean"] if c in clean_present]) )
print("Has air __pref in AR?",
      all([(f"{c}__pref" in hdr_ar.columns) for c in AIR_COLS]))


[pick] stage8_rad_wind_range_v*.csv -> af_clean_v01/stage8_rad_wind_range_v001.csv
Stage 9 INPUT: af_clean_v01/stage8_rad_wind_range_v001.csv
[pick] stage5pd_sm_pairdiverge_v*.csv -> af_clean_v01/stage5pd_sm_pairdiverge_v001.csv
[Stage 9] Replaced soil-moisture clean/QC from 5PD: af_clean_v01/stage5pd_sm_pairdiverge_v001.csv
[pick] stage3_sm_knownbroken_v*.csv -> af_clean_v01/stage3_sm_knownbroken_v001.csv
[pick] stage6c_soiltemp_step_v*.csv -> (no match)
[pick] stage6b_soiltemp_flatline_v*.csv -> af_clean_v01/stage6b_soiltemp_flatline_v001.csv
[Stage 9] Backfilled ST clean/QC from: af_clean_v01/stage6b_soiltemp_flatline_v001.csv
[pick] stage7c_air_pres_qc_v*.csv -> af_clean_v01/stage7c_air_pres_qc_v001.csv
[pick] stage7b_air_hum_qc_v*.csv -> af_clean_v01/stage7b_air_hum_qc_v001.csv
[pick] stage7a_air_temp_qc_v*.csv -> af_clean_v01/stage7a_air_temp_qc_v001.csv
Wrote analysis-ready file: af_clean_v01/analysis_ready_v001.csv
Wrote Parquet copy: af_clean_v01/analysis_ready_v001.parquet

P

**Stage 9 (analysis_ready)**
* Loaded the latest pipeline table (preferring Step 8) and rebuilt timestamp from ts_ns; verified no row loss and that row_id is unique.
* Chose a “preferred” value for each variable family, creating __pref columns:
* Uses *_clean if present (i.e., after masking CRIT and clamping WARN), otherwise falls back to the raw column.
* Built for: soil moisture (ground_humidity_0/1), soil temperature (ground_temp_0/1), radiation, wind (min/mean/max), and air triad (temperature/humidity/pressure—raw as there’s no masking step for air).
* Kept all QC flags (every column starting with qc_) so you can filter by them during analysis.
* Added convenience booleans that are True when the variable passed our CRIT masks:
* ok_sm0, ok_sm1, ok_st0, ok_st1, ok_rad, ok_wmin, ok_wmean, ok_wmax.
* Preserved provenance columns: IDs (row_id, node_id, node, station_type) and time (ts_ns, timestamp_iso, reconstructed timestamp).
* Optionally retained all raw measurement columns alongside the __pref series for traceability.
* Sorted deterministically by node, ts_ns, row_id and wrote a versioned analysis_ready_v###.csv.
* Printed breadcrumbs: preview of exported columns, total columns, min/max time (from ts_ns), and row-in/out equality.

In [48]:
# =========================
# Step 9: Assemble analysis-ready dataset (no 6d/ramp)
# - Prefers 6PD (soil-temp pair-diverge) and 5PD (soil-moisture pair-diverge)
# - Preserves Step 7 air masks; lenient humidity fallback baked-in
# - Exports *_clean (SM/ST/RAD/W), all __pref, all QC flags, ok_* booleans
# =========================
import pandas as pd
import numpy as np
from pathlib import Path
from glob import glob
import csv, os, re

OUT_DIR = Path("af_clean_v01")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Core identity/time columns
ROW_ID_COL    = "row_id"
TS_NS_COL     = "ts_ns"
TS_ISO_COL    = "timestamp_iso"
TIMESTAMP_COL = "timestamp"   # in-memory datetime64[ns]
NODE_COL      = "node"
NODE_ID_COL   = "node_id"
STATION_TYPE  = "station_type"

# Variable families
SM_COLS  = ["ground_humidity_0", "ground_humidity_1"]
ST_COLS  = ["ground_temp_0", "ground_temp_1"]
RAD_COL  = "global_radiation"
W_COLS   = ["wind_speed_min", "wind_speed_mean", "wind_speed_max"]
AIR_COLS = ["temperature", "humidity", "pressure"]  # Step 7 provides __pref + QC

def next_versioned_csv(out_dir: Path, stem: str) -> Path:
    i = 1
    while True:
        p = out_dir / f"{stem}_v{i:03d}.csv"
        if not p.exists():
            return p
        i += 1

def pick_latest_by_priority(patterns):
    """Return latest path from the FIRST pattern that has matches (by filename order)."""
    for pat in patterns:
        files = sorted(glob(str(OUT_DIR / pat)))
        if files:
            print(f"[pick] {pat} -> {files[-1]}")
            return files[-1]
        else:
            print(f"[pick] {pat} -> (no match)")
    return None

def merge_missing_cols_on_row_id(df_base: pd.DataFrame, src_path: str, want_cols: list) -> pd.DataFrame:
    """Only add columns that are currently missing; avoids duplicate merges."""
    if not want_cols or src_path is None:
        return df_base
    hdr = pd.read_csv(src_path, nrows=0, low_memory=False)
    have = [c for c in want_cols if (c in hdr.columns) and (c not in df_base.columns)]
    if not have:
        return df_base
    usecols = [ROW_ID_COL] + have
    add = pd.read_csv(src_path, usecols=usecols, low_memory=False)
    return df_base.merge(add, on=ROW_ID_COL, how="left")

def replace_cols_on_row_id(df_base: pd.DataFrame, src_path: str, want_cols: list) -> pd.DataFrame:
    """Overwrite df_base[want_cols] using src_path (matched by row_id)."""
    if not want_cols or src_path is None:
        return df_base
    hdr = pd.read_csv(src_path, nrows=0, low_memory=False)
    have = [c for c in want_cols if c in hdr.columns]
    if not have:
        return df_base
    usecols = [ROW_ID_COL] + have
    add = pd.read_csv(src_path, usecols=usecols, low_memory=False)
    df_out = df_base.drop(columns=[c for c in have if c in df_base.columns], errors="ignore")
    return df_out.merge(add, on=ROW_ID_COL, how="left")

# ---- Load the most advanced stage available (prefer Stage 8, then 7, then 6PD, …) ----
in_path = pick_latest_by_priority([
    "stage8_rad_wind_range_v*.csv",
    "stage7c_air_pres_qc_v*.csv",
    "stage7b_air_hum_qc_v*.csv",
    "stage7a_air_temp_qc_v*.csv",
    "stage6pd_soiltemp_pairdiverge_v*.csv",   # ← NEW: recognize 6PD
    "stage6c_soiltemp_step_v*.csv",
    "stage6b_soiltemp_flatline_v*.csv",
    "stage6a_soiltemp_range_v*.csv",
    "stage5pd_sm_pairdiverge_v*.csv",
    "stage5r_sm_regimes_v*.csv",
    "stage4_sm_flatline_v*.csv",
    "stage3_sm_knownbroken_v*.csv",
    "stage2_soilmoisture_range_v*.csv",
    "stage1_parsed_v*.csv",
])
assert in_path is not None, "No prior stage files found."
print("Stage 9 INPUT:", in_path)

df = pd.read_csv(in_path, low_memory=False)
n_in = len(df)

# ---- Core checks & authoritative timestamp ----
required_core = {ROW_ID_COL, TS_NS_COL, TS_ISO_COL, STATION_TYPE}
missing_core = required_core - set(df.columns)
if missing_core:
    raise ValueError(f"Missing core columns: {sorted(missing_core)}")

# Authoritative time from ts_ns (tolerant to nulls)
df[TIMESTAMP_COL] = pd.to_datetime(pd.to_numeric(df[TS_NS_COL], errors="coerce").astype("Int64"), utc=True, errors="coerce")

# ---- Drop stale soil-temp ramp flags (6d removed) ----
ramp_cols = [c for c in df.columns if c.startswith("qc_st_ramp_")]
if ramp_cols:
    print(f"[Stage 9] Dropping stale ramp flags: {ramp_cols}")
    df.drop(columns=ramp_cols, inplace=True, errors="ignore")

# ========= Soil moisture: prefer 5PD (pair-diverge) over 5R =========
sm_5pd_src = pick_latest_by_priority(["stage5pd_sm_pairdiverge_v*.csv"])
SM_FROM_5PD = [
    "ground_humidity_0_clean","ground_humidity_1_clean",
    "qc_sm_pairdiverge_0","qc_sm_pairdiverge_1",
    "qc_sm_pairdiverge_src_0","qc_sm_pairdiverge_src_1",
    "qc_sm_pairdiverge_nref_0","qc_sm_pairdiverge_nref_1",
    "qc_sm_pairdiverge_corr_0","qc_sm_pairdiverge_corr_1",
    "qc_sm_range_0","qc_sm_range_1",
    "qc_sm_flatline_0","qc_sm_flatline_1",
    "qc_sm_known_broken_0","qc_sm_known_broken_1",
    "qc_sm_anycrit_0","qc_sm_anycrit_1",
    "qc_sm_anywarn_0","qc_sm_anywarn_1",
    "qc_sm_regime_0","qc_sm_regime_1",
]
if sm_5pd_src:
    df = replace_cols_on_row_id(df, sm_5pd_src, SM_FROM_5PD)
    print(f"[Stage 9] Replaced soil-moisture clean/QC from 5PD: {sm_5pd_src}")
else:
    step5r_src = pick_latest_by_priority(["stage5r_sm_regimes_v*.csv"])
    if step5r_src:
        SM_FROM_5R = [c for c in SM_FROM_5PD if not c.startswith("qc_sm_pairdiverge")]
        df = replace_cols_on_row_id(df, step5r_src, SM_FROM_5R)
        print(f"[Stage 9] Replaced soil-moisture clean/QC from 5R: {step5r_src}")
    else:
        print("[Stage 9] WARNING: No 5PD/5R file found; SM clean/QC may be stale.")

# ---------- Bring in Stage 3 known-broken (SM + ST) if any columns missing ----------
ST3_SRC = pick_latest_by_priority([
    "stage3_sm_knownbroken_v*.csv",
    "stage3_*knownbroken_v*.csv",
])
KB_WANT = [
    "qc_sm_known_broken_0","qc_sm_known_broken_1",
    "qc_st_known_broken_0","qc_st_known_broken_1",
]
missing_kb = [c for c in KB_WANT if c not in df.columns]
if ST3_SRC and missing_kb:
    df = merge_missing_cols_on_row_id(df, ST3_SRC, missing_kb)
    print(f"[Stage 9] Backfilled known-broken flags from: {ST3_SRC}")

# ---------- Soil Temperature: PREFER 6PD, else backfill from 6c/6b/6a ----------
ST_FROM_6PD = [
    "ground_temp_0_clean","ground_temp_1_clean",
    "qc_st_pairdiverge_0","qc_st_pairdiverge_1",
    "qc_st_pairdiverge_src_0","qc_st_pairdiverge_src_1",
    "qc_st_pairdiverge_nref_0","qc_st_pairdiverge_nref_1",
    "qc_st_pairdiverge_corr_0","qc_st_pairdiverge_corr_1",
    "qc_st_range_0","qc_st_range_1",
    "qc_st_flatline_0","qc_st_flatline_1",
    "qc_st_known_broken_0","qc_st_known_broken_1",
    "qc_st_anycrit_0","qc_st_anycrit_1",
    "qc_st_anywarn_0","qc_st_anywarn_1",
]
st_6pd_src = pick_latest_by_priority(["stage6pd_soiltemp_pairdiverge_v*.csv"])
if st_6pd_src:
    df = replace_cols_on_row_id(df, st_6pd_src, ST_FROM_6PD)
    print(f"[Stage 9] Replaced soil-temp clean/QC from 6PD: {st_6pd_src}")
else:
    ST_QC_WANT = [
        "qc_st_range_0","qc_st_range_1",
        "qc_st_flatline_0","qc_st_flatline_1",
        "qc_st_step_0","qc_st_step_1",
        "qc_st_anycrit_0","qc_st_anycrit_1",
        "qc_st_anywarn_0","qc_st_anywarn_1",
        "ground_temp_0_clean","ground_temp_1_clean",
    ]
    st_missing = [c for c in ST_QC_WANT if c not in df.columns]
    if st_missing:
        st_src = pick_latest_by_priority([
            "stage6c_soiltemp_step_v*.csv",
            "stage6b_soiltemp_flatline_v*.csv",
            "stage6a_soiltemp_range_v*.csv",
        ])
        if st_src:
            df = merge_missing_cols_on_row_id(df, st_src, st_missing)
            print(f"[Stage 9] Backfilled ST clean/QC from: {st_src}")

# ---------- ENFORCE: soil-temp anycrit to include pair-diverge & known-broken ----------
def _as_bool(s, idx):
    if isinstance(s, pd.Series):
        if s.dtype == bool: return s.fillna(False)
        return s.astype(str).str.lower().isin(["true","1","t","y","yes"]).reindex(idx, fill_value=False)
    return pd.Series(False, index=idx)

for suf in ("0","1"):
    kbcol   = f"qc_st_known_broken_{suf}"
    anycrit = f"qc_st_anycrit_{suf}"
    pdv     = f"qc_st_pairdiverge_{suf}"
    clean   = f"ground_temp_{suf}_clean"
    raw     = f"ground_temp_{suf}"

    if clean not in df.columns and raw in df.columns:
        df[clean] = pd.to_numeric(df[raw], errors="coerce")

    kb_mask   = df[kbcol].astype(str).str.startswith("CRIT", na=False) if kbcol in df.columns else pd.Series(False, index=df.index)
    pdv_mask  = df[pdv].astype(str).str.startswith("CRIT", na=False) if pdv in df.columns else pd.Series(False, index=df.index)
    anycrit_s = _as_bool(df.get(anycrit, False), df.index) | kb_mask | pdv_mask
    df[anycrit] = anycrit_s
    if clean in df.columns:
        df.loc[anycrit_s, clean] = np.nan

# ---------- Backfill (only if missing) for Radiation & Wind ----------
RW_QC_WANT = [
    "qc_rad_range", "qc_rad_anycrit", "qc_rad_anywarn", f"{RAD_COL}_clean",
    "qc_wind_mean_range", "qc_wind_mean_anycrit", "qc_wind_mean_anywarn", f"{W_COLS[1]}_clean",
    "qc_wind_min_range",  "qc_wind_min_anycrit",  "qc_wind_min_anywarn",  f"{W_COLS[0]}_clean",
    "qc_wind_max_range",  "qc_wind_max_anycrit",  "qc_wind_max_anywarn",  f"{W_COLS[2]}_clean",
    "qc_wind_order",
]
rw_missing = [c for c in RW_QC_WANT if c not in df.columns]
if rw_missing:
    rw_src = pick_latest_by_priority(["stage8_rad_wind_range_v*.csv"])
    if rw_src:
        df = merge_missing_cols_on_row_id(df, rw_src, rw_missing)
        print(f"[Stage 9] Backfilled RAD/WIND clean/QC from: {rw_src}")
for c in ["qc_rad_range","qc_wind_mean_range","qc_wind_min_range","qc_wind_max_range","qc_wind_order"]:
    if c not in df.columns:
        df[c] = "OK"

# ---------- AIR: bring QC + existing __pref + reason from Step 7 ----------
AIR_QC_WANT   = ["qc_air_selfcheck_temp","qc_air_selfcheck_hum","qc_air_crosscheck_pres"]
AIR_PREF_WANT = ["temperature__pref","humidity__pref","pressure__pref","humidity_mask_reason"]

for pat in ["stage7c_air_pres_qc_v*.csv", "stage7b_air_hum_qc_v*.csv", "stage7a_air_temp_qc_v*.csv"]:
    air_src = pick_latest_by_priority([pat])
    if air_src:
        # QC fields: add if missing
        df = merge_missing_cols_on_row_id(df, air_src, AIR_QC_WANT)
        # __pref + reason: ALWAYS REPLACE from Step 7 artifacts
        df = replace_cols_on_row_id(df, air_src, AIR_PREF_WANT)

# --- Build preferred series for SM/ST/RAD/W from *_clean when present ---
def preferred(df_: pd.DataFrame, raw: str) -> pd.Series:
    c = f"{raw}_clean"
    return pd.to_numeric(df_[c], errors="coerce") if c in df_.columns else pd.to_numeric(df_[raw], errors="coerce")

pref = {}
for c in SM_COLS + ST_COLS:
    if c in df.columns:
        pref[c] = preferred(df, c)
if RAD_COL in df.columns:
    pref[RAD_COL] = preferred(df, RAD_COL)
for c in W_COLS:
    if c in df.columns:
        pref[c] = preferred(df, c)
for name, series in pref.items():
    df[f"{name}__pref"] = series

# --- Build preferred series for AIR (preserve Step 7 values; humidity lenient fallback) ---
for base, qc in [
    ("temperature", "qc_air_selfcheck_temp"),
    ("humidity",    "qc_air_selfcheck_hum"),
    ("pressure",    "qc_air_crosscheck_pres"),
]:
    if base not in df.columns:
        continue
    pref_col = f"{base}__pref"

    if pref_col in df.columns:
        df[pref_col] = pd.to_numeric(df[pref_col], errors="coerce")  # keep Step 7 result
    else:
        raw = pd.to_numeric(df[base], errors="coerce")
        if base == "humidity":
            # LENIENT policy: mask if CRIT or reason != ""
            qc_series = df.get(qc, pd.Series("", index=df.index)).astype(str)
            crit = qc_series.str.startswith("CRIT", na=False)
            reason = df.get("humidity_mask_reason", pd.Series("", index=df.index)).astype(str)
            masked_needed = crit | (reason != "")
            df[pref_col] = raw.mask(masked_needed)
        else:
            # Temperature & pressure: mask WARN/CRIT
            if qc in df.columns:
                bad = df[qc].astype(str).str.startswith(("WARN","CRIT"), na=False)
                df[pref_col] = raw.mask(bad)
            else:
                df[pref_col] = raw

    df[f"ok_{base}"] = df[pref_col].notna()


# ---- Enforce lenient humidity (CRIT or reason != "" -> mask) ----
need = {"humidity","humidity__pref","qc_air_selfcheck_hum","humidity_mask_reason"}
if need.issubset(df.columns):
    _raw    = pd.to_numeric(df["humidity"], errors="coerce")
    _qc     = df["qc_air_selfcheck_hum"].astype(str)
    # FIX: treat missing reason as empty, and strip whitespace
    _reason = df["humidity_mask_reason"].fillna("").astype(str).str.strip()
    _need   = _qc.str.startswith("CRIT", na=False) | (_reason != "")
    df["humidity__pref"] = _raw.mask(_need)
    df["ok_humidity"]    = df["humidity__pref"].notna()


def anycrit_bool(df_: pd.DataFrame, col: str) -> pd.Series:
    if col in df_.columns:
        s = df_[col]
        return s if s.dtype == bool else s.astype(str).str.startswith("CRIT", na=False)
    return pd.Series(False, index=df_.index, dtype=bool)

# ok_* for SM/ST from anycrit (includes 6PD via enforcement above)
df["ok_sm0"] = ~anycrit_bool(df, "qc_sm_anycrit_0") if "qc_sm_anycrit_0" in df.columns else False
df["ok_sm1"] = ~anycrit_bool(df, "qc_sm_anycrit_1") if "qc_sm_anycrit_1" in df.columns else False
df["ok_st0"] = ~anycrit_bool(df, "qc_st_anycrit_0") if "qc_st_anycrit_0" in df.columns else False
df["ok_st1"] = ~anycrit_bool(df, "qc_st_anycrit_1") if "qc_st_anycrit_1" in df.columns else False

# Radiation & wind OK from explicit range flags (CRIT only masks)
if "qc_rad_range" in df.columns:
    df["ok_rad"]   = ~df["qc_rad_range"].astype(str).eq("CRIT_out_of_range")
for w,label in [("min","ok_wmin"),("mean","ok_wmean"),("max","ok_wmax")]:
    qc = f"qc_wind_{w}_range"
    if qc in df.columns:
        df[label] = ~df[qc].astype(str).eq("CRIT_out_of_range")

# Row-level air rollups (retain for continuity)
def _startswith(col, prefix):
    return df[col].astype(str).str.startswith(prefix, na=False) if col in df.columns else pd.Series(False, index=df.index)

air_warn = (_startswith("qc_air_selfcheck_temp","WARN") |
            _startswith("qc_air_selfcheck_hum","WARN")  |
            _startswith("qc_air_crosscheck_pres","WARN"))
air_crit = (_startswith("qc_air_selfcheck_temp","CRIT") |
            _startswith("qc_air_selfcheck_hum","CRIT")  |
            _startswith("qc_air_crosscheck_pres","CRIT"))
df["qc_air_anywarn"] = (anycrit_bool(df, "qc_air_anywarn") | air_warn).astype(bool)
df["qc_air_anycrit"] = (anycrit_bool(df, "qc_air_anycrit") | air_crit).astype(bool)
df["row_has_warn"] = (anycrit_bool(df, "row_has_warn") | df["qc_air_anywarn"]).astype(bool)
df["row_has_crit"] = (anycrit_bool(df, "row_has_crit") | df["qc_air_anycrit"]).astype(bool)

# =============== Choose & order columns for export ===============
id_cols   = [c for c in [ROW_ID_COL, NODE_ID_COL, NODE_COL, STATION_TYPE] if c in df.columns]
time_cols = [c for c in [TS_NS_COL, TS_ISO_COL, TIMESTAMP_COL] if c in df.columns]

pref_cols = (
    [f"{c}__pref" for c in SM_COLS if f"{c}__pref" in df.columns] +
    [f"{c}__pref" for c in ST_COLS if f"{c}__pref" in df.columns] +
    ([f"{RAD_COL}__pref"] if f"{RAD_COL}__pref" in df.columns else []) +
    [f"{c}__pref" for c in W_COLS if f"{c}__pref" in df.columns] +
    [f"{c}__pref" for c in AIR_COLS if f"{c}__pref" in df.columns]
)

ok_cols = [c for c in [
    "ok_sm0","ok_sm1","ok_st0","ok_st1",
    "ok_rad","ok_wmin","ok_wmean","ok_wmax",
    "ok_temperature","ok_humidity","ok_pressure",
] if c in df.columns]

# Keep *_clean for SM/ST/RAD/W so plotting can compare raw vs clean directly
CLEAN_KEEP = [
    "ground_humidity_0_clean","ground_humidity_1_clean",
    "ground_temp_0_clean","ground_temp_1_clean",
    "global_radiation_clean",
    "wind_speed_min_clean","wind_speed_mean_clean","wind_speed_max_clean",
]
clean_present = [c for c in CLEAN_KEEP if c in df.columns]

# Keep air mask reason if present (from 7b lenient)
AIR_META_KEEP = [c for c in ["humidity_mask_reason"] if c in df.columns]

# all QC flags
qc_cols = [c for c in df.columns if c.startswith("qc_")]

# raw columns for traceability (only those that exist)
raw_cols = [c for c in { *SM_COLS, *ST_COLS, RAD_COL, *W_COLS, *AIR_COLS } if c in df.columns]

export_cols = id_cols + time_cols + pref_cols + ok_cols + qc_cols + clean_present + AIR_META_KEEP
raw_cols = [c for c in raw_cols if c not in export_cols]
export_cols += raw_cols

# Deterministic order & write
node_sort = NODE_COL if NODE_COL in df.columns else (NODE_ID_COL if NODE_ID_COL in df.columns else None)
sort_keys = ([node_sort] if node_sort else []) + [TS_NS_COL, ROW_ID_COL]
sort_keys = [k for k in sort_keys if k in df.columns]
df.sort_values(sort_keys, inplace=True, ignore_index=True)
out_df = df[export_cols].copy()

# Clean object columns for CSV
obj_cols = out_df.select_dtypes(include="object").columns
if len(obj_cols):
    out_df[obj_cols] = out_df[obj_cols].replace({r'[\r\n]': ' ', '"': '""'}, regex=True)

out_main = next_versioned_csv(OUT_DIR, "analysis_ready")

# First attempt: minimal quoting
out_df.to_csv(out_main, index=False, encoding="utf-8", quoting=csv.QUOTE_MINIMAL, lineterminator="\n")

# Verify; if mismatch, rewrite with QUOTE_ALL
_ok = True
try:
    _test = pd.read_csv(out_main, engine="c", low_memory=False)
    if list(_test.columns) != list(out_df.columns):
        _ok = False
except Exception as e:
    print("[Step 9] Re-read failed:", e)
    _ok = False

if not _ok:
    print("[Step 9] Rewriting with QUOTE_ALL for robustness…")
    out_df.to_csv(out_main, index=False, encoding="utf-8",
                  quoting=csv.QUOTE_ALL, lineterminator="\n")
    _test = pd.read_csv(out_main, engine="python", low_memory=False)
    assert list(_test.columns) == list(out_df.columns), "Post-rewrite verification failed."

print("Wrote analysis-ready file:", out_main)

# Optional: Parquet copy
try:
    out_parq = Path(str(out_main)).with_suffix(".parquet")
    out_df.to_parquet(out_parq, index=False)
    print("Wrote Parquet copy:", out_parq)
except Exception as e:
    print("Parquet write skipped:", e)

# ---- Small console summary
print("\nPreview of columns included:")
print(export_cols[:30], "...")
print(f"Total columns exported: {len(export_cols)}")
tmin = pd.to_datetime(pd.to_numeric(df[TS_NS_COL], errors="coerce").astype("Int64"), utc=True, errors="coerce").min()
tmax = pd.to_datetime(pd.to_numeric(df[TS_NS_COL], errors="coerce").astype("Int64"), utc=True, errors="coerce").max()
print("Time span (from ts_ns):", tmin, "→", tmax)
print("Rows in/out (should match input):", n_in, "→", len(df))

# ---- Guardrail: lenient humidity must hold if columns exist
need = {"humidity","humidity__pref","qc_air_selfcheck_hum","humidity_mask_reason"}
if need.issubset(df.columns):
    _raw    = pd.to_numeric(df["humidity"], errors="coerce")
    _pref   = pd.to_numeric(df["humidity__pref"], errors="coerce")
    _qc     = df["qc_air_selfcheck_hum"].astype(str)
    # FIX here too
    _reason = df["humidity_mask_reason"].fillna("").astype(str).str.strip()
    _need   = _qc.str.startswith("CRIT", na=False) | (_reason != "")
    viol = int(((_need & _raw.notna()) & _pref.notna()).sum())
    assert viol == 0, f"lenient humidity violated for {viol} rows"


[pick] stage8_rad_wind_range_v*.csv -> af_clean_v01/stage8_rad_wind_range_v001.csv
Stage 9 INPUT: af_clean_v01/stage8_rad_wind_range_v001.csv
[pick] stage5pd_sm_pairdiverge_v*.csv -> af_clean_v01/stage5pd_sm_pairdiverge_v001.csv
[Stage 9] Replaced soil-moisture clean/QC from 5PD: af_clean_v01/stage5pd_sm_pairdiverge_v001.csv
[pick] stage3_sm_knownbroken_v*.csv -> af_clean_v01/stage3_sm_knownbroken_v001.csv
[pick] stage6pd_soiltemp_pairdiverge_v*.csv -> af_clean_v01/stage6pd_soiltemp_pairdiverge_v008.csv
[Stage 9] Replaced soil-temp clean/QC from 6PD: af_clean_v01/stage6pd_soiltemp_pairdiverge_v008.csv
[pick] stage7c_air_pres_qc_v*.csv -> (no match)
[pick] stage7b_air_hum_qc_v*.csv -> af_clean_v01/stage7b_air_hum_qc_v001.csv
[pick] stage7a_air_temp_qc_v*.csv -> af_clean_v01/stage7a_air_temp_qc_v001.csv
Wrote analysis-ready file: af_clean_v01/analysis_ready_v006.csv
Wrote Parquet copy: af_clean_v01/analysis_ready_v006.parquet

Preview of columns included:
['row_id', 'node_id', 'node', 's

In [49]:
# === Analysis-Ready (Step 9) — Sanity Audit v2 ===
# Verifies *_clean / __pref vs QC rules, lenient humidity, ok_* consistency.
import pandas as pd, numpy as np, glob, os, re

# --- locate latest AR ---
ARs = sorted(glob.glob("af_clean_v01/analysis_ready_v*.csv"),
             key=lambda p: int(re.search(r"_v(\d+)\.csv$", os.path.basename(p)).group(1)) if re.search(r"_v(\d+)\.csv$", os.path.basename(p)) else 0)
if not ARs:
    ARs = sorted(glob.glob("analysis_ready_v*.csv"))
assert ARs, "No analysis_ready_v*.csv found."
AR = ARs[-1]
df = pd.read_csv(AR, low_memory=False)
print(f"[AR] {AR}  rows={len(df):,}")

# ---------- helpers ----------
def pct(a, b):
    b = int(b) or 1
    return f"{(100.0*int(a)/b):5.2f}%"

def pbool(s):
    if isinstance(s, pd.Series):
        if s.dtype == bool: return s.fillna(False)
        return s.astype(str).str.lower().isin(["true","1","t","y","yes"])
    return pd.Series(False, index=range(len(df)))

def passfail(name, ok, extra=""):
    print(f"[{name:<28}] {'PASS' if ok else 'FAIL'} {extra}")

def exists(*cols):
    missing = [c for c in cols if c not in df.columns]
    return (len(missing) == 0, missing)

# node/time basics
node_col = "node" if "node" in df.columns else ("node_id" if "node_id" in df.columns else None)

# --- core columns present? ---
core_ok, core_missing = exists("row_id","ts_ns","timestamp_iso","timestamp","station_type")
passfail("core columns", core_ok, f"missing={core_missing}" if core_missing else "")

# --- row_id uniqueness ---
passfail("row_id unique", df["row_id"].is_unique)

# --- time monotonic within node (or globally if no node col) ---
t = pd.to_datetime(pd.to_numeric(df["ts_ns"], errors="coerce").astype("Int64"), utc=True, errors="coerce")
if node_col:
    bad_nodes = []
    for n, g in df.assign(__t=t).groupby(df[node_col].astype(str), sort=False):
        if not g["__t"].is_monotonic_increasing:
            bad_nodes.append(n)
    passfail("time monotonic by node", len(bad_nodes)==0, f"nonmonotonic_nodes={bad_nodes[:5]}{'...' if len(bad_nodes)>5 else ''}")
else:
    passfail("time monotonic", t.is_monotonic_increasing)

# ---------------- SM & ST consistency (anycrit → clean is NaN; clean NaN→anycrit) ----------------
def check_probe(raw, clean, anycrit, label):
    if not all(c in df.columns for c in [raw, clean, anycrit]):
        passfail(label, True, "skipped (cols missing)")
        return
    r = pd.to_numeric(df[raw], errors="coerce")
    c = pd.to_numeric(df[clean], errors="coerce")
    ac = pbool(df[anycrit])
    fail1 = int(((ac & r.notna()) & c.notna()).sum())     # anycrit but clean not NaN
    fail2 = int(((r.notna() & c.isna()) & ~ac).sum())     # clean NaN but not anycrit
    masked_share = pct((r.notna() & c.isna()).sum(), r.notna().sum())
    passfail(label, (fail1==0 and fail2==0), f"masked={masked_share}  f1={fail1} f2={fail2}")

for suf in ("0","1"):
    check_probe(f"ground_humidity_{suf}", f"ground_humidity_{suf}_clean", f"qc_sm_anycrit_{suf}", f"SM{suf} anycrit→NaN")
    check_probe(f"ground_temp_{suf}",     f"ground_temp_{suf}_clean",     f"qc_st_anycrit_{suf}", f"ST{suf} anycrit→NaN")

# ---------------- Radiation/Wind range rules ----------------
# WARN → clamped to boundary; CRIT → NaN
RAD_LOWER_OK, RAD_UPPER_OK = 0.0, 1200.0
W_LOWER_OK,   W_UPPER_OK   = 0.0, 100.0

def check_range(raw, clean, qc, low_ok, hi_ok, label):
    if not all(c in df.columns for c in [raw, clean, qc]):
        passfail(f"{label} range qc", True, "skipped (cols missing)")
        return
    v  = pd.to_numeric(df[raw], errors="coerce")
    cl = pd.to_numeric(df[clean], errors="coerce")
    q  = df[qc].astype(str)

    crit = q.eq("CRIT_out_of_range") & v.notna()
    f1   = int((crit & cl.notna()).sum())  # CRIT must be NaN

    warn = q.eq("WARN_boundary") & v.notna() & cl.notna()
    f2   = int((warn & ~(np.isclose(cl, low_ok) | np.isclose(cl, hi_ok))).sum())  # WARN must clamp

    ok   = q.eq("OK") & v.notna()
    f3   = int((ok & cl.isna()).sum())  # OK should not be NaN if raw present (soft)

    masked_share = pct((v.notna() & cl.isna()).sum(), v.notna().sum())
    passfail(f"{label} range qc", (f1==0 and f2==0), f"masked={masked_share}  crit→not-NaN={f1}  warn→not-clamped={f2}  ok→NaN={f3}")

check_range("global_radiation","global_radiation_clean","qc_rad_range",
            RAD_LOWER_OK, RAD_UPPER_OK, "Radiation")
check_range("wind_speed_min","wind_speed_min_clean","qc_wind_min_range",
            W_LOWER_OK, W_UPPER_OK, "Wind min")
check_range("wind_speed_mean","wind_speed_mean_clean","qc_wind_mean_range",
            W_LOWER_OK, W_UPPER_OK, "Wind mean")
check_range("wind_speed_max","wind_speed_max_clean","qc_wind_max_range",
            W_LOWER_OK, W_UPPER_OK, "Wind max")

# ---------------- Air variables ----------------
def check_air_mask_temp_pressure(base, qc_col):
    label = f"{base} mask(WARN/CRIT)"
    if not all(c in df.columns for c in [base, qc_col]):
        passfail(label, True, "skipped (cols missing)"); return
    pref_col = f"{base}__pref"
    if pref_col not in df.columns:
        passfail(label, True, "skipped (pref missing)"); return
    raw  = pd.to_numeric(df[base], errors="coerce")
    pref = pd.to_numeric(df[pref_col], errors="coerce")
    qc   = df[qc_col].astype(str)
    bad  = qc.str.startswith(("WARN","CRIT"), na=False)
    f1 = int(((bad & raw.notna()) & pref.notna()).sum())      # should be masked but isn't
    f2 = int(((raw.notna() & pref.isna()) & ~bad).sum())      # masked though QC is OK
    passfail(label, f1==0 and f2==0, f"viol_warncrit={f1}  masked_but_ok={f2}")

def check_air_mask_humidity_lenient():
    label = "humidity lenient mask"
    need = ["humidity","humidity__pref","qc_air_selfcheck_hum","humidity_mask_reason"]
    if not all(c in df.columns for c in need):
        passfail(label, True, "skipped (cols missing)"); return
    raw    = pd.to_numeric(df["humidity"], errors="coerce")
    pref   = pd.to_numeric(df["humidity__pref"], errors="coerce")
    qc     = df["qc_air_selfcheck_hum"].astype(str)
    reason = df["humidity_mask_reason"].fillna("").astype(str).str.strip()
    crit = qc.str.startswith("CRIT", na=False)
    masked_needed = crit | (reason != "")
    viol_needed = int(((masked_needed & raw.notna()) & pref.notna()).sum())
    masked_but_not_needed = int(((raw.notna() & pref.isna()) & ~masked_needed).sum())
    passfail(label, viol_needed==0 and masked_but_not_needed==0,
             f"viol_needed={viol_needed}  masked_but_not_needed={masked_but_not_needed}")

check_air_mask_temp_pressure("temperature", "qc_air_selfcheck_temp")
# pressure QC might not exist yet; safe skip if missing
if "pressure" in df.columns:
    check_air_mask_temp_pressure("pressure", "qc_air_crosscheck_pres")
check_air_mask_humidity_lenient()

# ---------------- ok_* booleans ----------------
def same_bool(a, b):
    A = pbool(a) if not (isinstance(a, pd.Series) and a.dtype == bool) else a
    B = pbool(b) if not (isinstance(b, pd.Series) and b.dtype == bool) else b
    return (A.fillna(False) == B.fillna(False))

def expect_ok_flag(flag_col, expect_series, label):
    if flag_col not in df.columns:
        passfail(label, True, "skipped (flag missing)"); return
    ok = pbool(df[flag_col]) if df[flag_col].dtype != bool else df[flag_col]
    cmp = same_bool(ok, expect_series)
    bad = int((~cmp).sum())
    passfail(label, bad==0, f"mismatches={bad}")

# SM/ST ok_* (should be ~ !anycrit)
for suf, flag in [("0","ok_sm0"),("1","ok_sm1")]:
    if f"qc_sm_anycrit_{suf}" in df.columns and flag in df.columns:
        expect_ok_flag(flag, ~pbool(df[f"qc_sm_anycrit_{suf}"]), f"{flag} ~ !anycrit")
for suf, flag in [("0","ok_st0"),("1","ok_st1")]:
    if f"qc_st_anycrit_{suf}" in df.columns and flag in df.columns:
        expect_ok_flag(flag, ~pbool(df[f"qc_st_anycrit_{suf}"]), f"{flag} ~ !anycrit")

# Radiation & wind ok_* from CRIT status
if all(c in df.columns for c in ["qc_rad_range","ok_rad"]):
    expect_ok_flag("ok_rad",  ~df["qc_rad_range"].astype(str).eq("CRIT_out_of_range"), "ok_rad")
for w,label in [("min","ok_wmin"),("mean","ok_wmean"),("max","ok_wmax")]:
    qc = f"qc_wind_{w}_range"
    if all(c in df.columns for c in [qc,label]):
        expect_ok_flag(label, ~df[qc].astype(str).eq("CRIT_out_of_range"), label)

# Air ok_* from __pref notna
for base in ["temperature","humidity","pressure"]:
    flag = f"ok_{base}"; pref = f"{base}__pref"
    if all(c in df.columns for c in [flag,pref]):
        expect_ok_flag(flag, df[pref].notna(), f"{flag} == pref.notna()")

# -------------- masked-share summary --------------
def masked_share(series_raw, series_pref):
    r = pd.to_numeric(series_raw, errors="coerce")
    p = pd.to_numeric(series_pref, errors="coerce")
    den = int(r.notna().sum()) or 1
    return (int((r.notna() & p.isna()).sum()), den, pct((r.notna() & p.isna()).sum(), den))

rows = []
pairs = [
    ("ground_humidity_0","ground_humidity_0__pref"),
    ("ground_humidity_1","ground_humidity_1__pref"),
    ("ground_temp_0","ground_temp_0__pref"),
    ("ground_temp_1","ground_temp_1__pref"),
    ("global_radiation","global_radiation__pref"),
    ("wind_speed_min","wind_speed_min__pref"),
    ("wind_speed_mean","wind_speed_mean__pref"),
    ("wind_speed_max","wind_speed_max__pref"),
    ("temperature","temperature__pref"),
    ("humidity","humidity__pref"),
    ("pressure","pressure__pref"),
]
for raw, pref in pairs:
    if raw in df.columns and pref in df.columns:
        nmask, den, shar = masked_share(df[raw], df[pref])
        rows.append((pref, nmask, den, shar))

if rows:
    print("\nMasked share (raw present → __pref NaN):")
    for name, nmask, den, share in rows:
        print(f"  {name:28s} {nmask:8d}/{den:8d} = {share}")

print("\nSanity audit complete.")


[AR] af_clean_v01/analysis_ready_v006.csv  rows=483,787
[core columns                ] PASS 
[row_id unique               ] PASS 
[time monotonic by node      ] PASS nonmonotonic_nodes=[]
[SM0 anycrit→NaN             ] PASS masked= 7.74%  f1=0 f2=0
[ST0 anycrit→NaN             ] PASS masked=10.32%  f1=0 f2=0
[SM1 anycrit→NaN             ] PASS masked= 4.66%  f1=0 f2=0
[ST1 anycrit→NaN             ] PASS masked= 7.26%  f1=0 f2=0
[Radiation range qc          ] PASS masked= 0.18%  crit→not-NaN=0  warn→not-clamped=0  ok→NaN=0
[Wind min range qc           ] PASS masked= 0.00%  crit→not-NaN=0  warn→not-clamped=0  ok→NaN=0
[Wind mean range qc          ] PASS masked= 0.00%  crit→not-NaN=0  warn→not-clamped=0  ok→NaN=0
[Wind max range qc           ] PASS masked= 0.00%  crit→not-NaN=0  warn→not-clamped=0  ok→NaN=0
[temperature mask(WARN/CRIT) ] PASS viol_warncrit=0  masked_but_ok=0
[pressure mask(WARN/CRIT)    ] PASS skipped (cols missing)
[humidity lenient mask       ] PASS viol_needed=0  maske

In [50]:
import pandas as pd, numpy as np

AR = "af_clean_v01/analysis_ready_v006.csv"
df = pd.read_csv(AR, low_memory=False)

# 1) Why is humidity masked? (top reasons)
m = pd.to_numeric(df["humidity"], errors="coerce").notna() & pd.to_numeric(df["humidity__pref"], errors="coerce").isna()
reason = df.loc[m, "humidity_mask_reason"].fillna("").astype(str).str.strip()
print("Top humidity mask reasons:")
print(reason.value_counts().head(10))

# 2) How much of the masked humidity occurs in daylight?
rad = pd.to_numeric(df.get("global_radiation__pref", df.get("global_radiation", np.nan)), errors="coerce")
day = rad > 200
print("Masked-in-daylight share:", (m & day).sum() / (m.sum() or 1))

# 3) Node/site hotspots (who’s driving the masking?)
site_col = "station_type" if "station_type" in df.columns else ("_site" if "_site" in df.columns else None)
node_col = "node" if "node" in df.columns else ("node_id" if "node_id" in df.columns else None)

if node_col:
    node_stats = (
        df.assign(_raw_h = pd.to_numeric(df["humidity"], errors="coerce"),
                  _pref_h = pd.to_numeric(df["humidity__pref"], errors="coerce"))
          .groupby(node_col, as_index=False)
          .apply(lambda g: pd.Series({
              "raw_present": int(g["_raw_h"].notna().sum()),
              "masked": int((g["_raw_h"].notna() & g["_pref_h"].isna()).sum())
          }))
          .assign(mask_share=lambda x: x["masked"]/(x["raw_present"].replace(0, np.nan)))
          .sort_values("mask_share", ascending=False)
    )
    print("\nNodes with highest humidity mask share:")
    print(node_stats.head(10))
if site_col:
    site_stats = (
        df.assign(_raw_h = pd.to_numeric(df["humidity"], errors="coerce"),
                  _pref_h = pd.to_numeric(df["humidity__pref"], errors="coerce"))
          .groupby(site_col, as_index=False)
          .apply(lambda g: pd.Series({
              "raw_present": int(g["_raw_h"].notna().sum()),
              "masked": int((g["_raw_h"].notna() & g["_pref_h"].isna()).sum())
          }))
          .assign(mask_share=lambda x: x["masked"]/(x["raw_present"].replace(0, np.nan)))
          .sort_values("mask_share", ascending=False)
    )
    print("\nMask share by site:")
    print(site_stats)


Top humidity mask reasons:
humidity_mask_reason
MASK_sticky_implausible      230650
MASK_flatline_implausible     52184
CRIT_out_of_range              2511
Name: count, dtype: int64
Masked-in-daylight share: 0.22028421735092607


  .apply(lambda g: pd.Series({



Nodes with highest humidity mask share:
       node  raw_present  masked  mask_share
6  node_186        52294   45441    0.868952
8  node_189        69757   52395    0.751107
4  node_183        52831   33145    0.627378
7  node_187        56338   35215    0.625067
3  node_182        50424   30846    0.611733
1  node_179        54277   32374    0.596459
5  node_184        49095   24567    0.500397
2  node_181        53798   24692    0.458976
0  node_176        44962    6670    0.148347

Mask share by site:
   station_type  raw_present  masked  mask_share
2     reference        69757   52395    0.751107
0  agroforestry       369057  226280    0.613130
1    open_field        44962    6670    0.148347


  .apply(lambda g: pd.Series({
