In [1]:
from pathlib import Path
from LHC_FillingPattern import LHCFillingPattern 
import matplotlib.pyplot as plt
import numpy as np
import dask.dataframe as dd
import pathlib
import pandas as pd
from pprint import pprint
import math
import pyarrow.dataset as ds
from scipy.stats import linregress
from pathlib import Path
from lmfit import Model 

In [2]:
# user settings -- edit here to point the notebook at another beam / IP / data area
beam, ip = "B1", "1"

# Root of the per-fill raw data (one "HX:FILLN=<fill>" sub-directory is read per fill)
RAWDATA = Path("/eos/project/l/lhc-lumimod/LuminosityFollowUp/2025/rawdata")

# Directory holding the fill / beam-mode table, which is read once here
FILLINFO = Path("/eos/project/l/lhc-lumimod/LuminosityFollowUp/2025/fills-info")
fbmodes = pd.read_parquet(FILLINFO.joinpath("fills_and_bmodes_2025.parquet"))

In [3]:

# Names of the logged variables (PVs) used in this notebook

# Per-bunch intensity; the BSRT device location depends on the beam.
int_var   = f"LHC.BCTFR.B6R4.{beam}:BUNCH_INTENSITY"
bsrt_loc  = '5R4' if beam.upper() == 'B1' else '5L4'
emitH_var = f"LHC.BSRT.{bsrt_loc}.{beam}:BUNCH_EMITTANCE_H"
emitV_var = f"LHC.BSRT.{bsrt_loc}.{beam}:BUNCH_EMITTANCE_V"

# Per-bunch effective cross-section (losses), one PV per beam
loss_b1 = 'UCAP.LHC.LUMI.LOSSES:EffectiveCrossSectionPerBunch:effectiveCrossSectionB1'
loss_b2 = "UCAP.LHC.LUMI.LOSSES:EffectiveCrossSectionPerBunch:effectiveCrossSectionB2"

# NOTE(review): the three PVs below are hard-wired to IP1 and do not follow the
# `ip` user setting -- confirm this is intended before analysing another IP.
lev_pv  = "LHC.LUMISERVER:LumiLevelingIP1:Enable"
beta_pv = "HX:BETASTAR_IP1"
xing_pv = "LhcStateTracker:LHCBEAM:IP1-XING-H-MURAD:value"

# Fills analysed by the cells below (order is preserved as given)
fills = np.array([10665,10666,10671,10673,10675,10676,10685,10689,10690,10701,10717,10721,10732,10709])

In [4]:
# ------------------------------------------------------------------
# Build Families for ONE fill
# ------------------------------------------------------------------
def build_families_for_fill(
    fillno: int,
    beam: str,
    ip: str,
    RAWDATA: Path,
    long_gap_set=(32, 63),
    small_gap=8,
    F4_offset_after_long=6,    # 7th bunch (bid_first + 6)
    F6_offset_before_small=26, # 27th bunch (bid_first + 26)
    enforce_filled=True,
    intensity_threshold=None,
    F5_offset_after_small=14,  # 15th bunch (bid_first + 14)
) -> dict:
    """
    Select the bunch-slot "families" of one fill from its filling pattern.

    Parameters
    ----------
    fillno : int
        Fill number handed to ``LHCFillingPattern``.
    beam : str
        "B1" (case-insensitive) selects the beam-1 train table; any other value
        selects beam 2.  The string is also used verbatim as the key into
        ``fpat.lrencounters`` and in the intensity PV name.
    ip : str
        IP label; the long-range encounter table is read as ``f"ip{ip}"``.
    RAWDATA : Path
        Root directory of the raw data.
    long_gap_set : iterable of int
        Values of the train table's ``gap`` column that count as a long gap.
    small_gap : int
        Value of ``gap`` that counts as a small gap.
    F4_offset_after_long, F6_offset_before_small, F5_offset_after_small : int
        Offsets (in slots from ``bid_first``) used for Family_4, Family_6 and
        Family_5 respectively.
    enforce_filled : bool
        Keep only slots listed as filled for the selected beam.
    intensity_threshold : float or None
        If given, keep only slots whose last recorded bunch intensity is at
        least this value.

    Returns
    -------
    dict
        ``{"Family_1": ndarray, ..., "Family_8": ndarray}`` of slot numbers:

        * Family_1 - first bunch of trains whose ``gap`` is in `long_gap_set`
        * Family_2 - last bunch of trains followed by at least 31 empty slots
        * Family_3 - per train, first slot with the maximum long-range
          encounter value
        * Family_4 - ``bid_first + F4_offset_after_long`` in long-gap trains
        * Family_5 - ``bid_first + F5_offset_after_small`` in small-gap trains
        * Family_6 - ``bid_first + F6_offset_before_small`` in the train that
          precedes a small-gap train
        * Family_7 - first bunch of small-gap trains
        * Family_8 - last bunch of the train that precedes a small-gap train

    Notes
    -----
    Offset-based families (4, 5, 6) are only taken from trains that are long
    enough to contain the requested bunch, so an offset can never spill into
    the following gap or train.
    """
    # --- Fetch filling pattern object ---
    fpat = LHCFillingPattern(fillno, RAWDATA)

    # beam-specific train table & list of filled slots
    if beam.upper() == "B1":
        trains = fpat.bunchtrainsDF_b1.copy()
        filled_slots = fpat.bunches_b1
    else:
        trains = fpat.bunchtrainsDF_b2.copy()
        filled_slots = fpat.bunches_b2

    # Rows with id == 0 are discarded (presumably not real trains -- TODO confirm);
    # the rest is ordered by first slot so that shift(+/-1) means previous/next train.
    trains = (
        trains.loc[trains["id"] != 0]
              .sort_values("bid_first")
              .reset_index(drop=True)
    )

    orbit_len = 3564

    # --- optional BUNCH_INTENSITY for filtering ---
    if intensity_threshold is not None:
        int_var = f"LHC.BCTFR.B6R4.{beam}:BUNCH_INTENSITY"
        ser_int = load_series(int_var, RAWDATA, fillno=fillno)
        if ser_int.empty:
            # no data: every slot fails a positive threshold
            arr_intensity = np.zeros(orbit_len, dtype=float)
        else:
            last_vals = np.asarray(ser_int.iloc[-1])
            if last_vals.shape[0] != orbit_len:
                # pad / truncate the last sample to exactly one value per slot
                tmp = np.zeros(orbit_len, dtype=float)
                n = min(orbit_len, last_vals.shape[0])
                tmp[:n] = last_vals[:n]
                last_vals = tmp
            arr_intensity = last_vals
    else:
        arr_intensity = None

    # ---- filter helpers ------------------------------------------------
    filled_mask = np.zeros(orbit_len, dtype=bool)
    filled_mask[filled_slots] = True

    def _filter(arr: np.ndarray) -> np.ndarray:
        """Apply the slot-range, filled-slot and intensity cuts to a slot array."""
        if arr.size == 0:
            return arr
        arr = arr.astype(int)
        # Slots outside [0, orbit_len) cannot index the per-slot masks: they would
        # raise IndexError (too large) or silently wrap around (negative).
        arr = arr[(arr >= 0) & (arr < orbit_len)]
        if enforce_filled:
            arr = arr[filled_mask[arr]]
        if arr_intensity is not None:
            arr = arr[arr_intensity[arr] >= intensity_threshold]
        return arr

    # ------------------------------------------------------------------
    # Family_1: first bunch of trains whose *preceding* gap is long
    # ------------------------------------------------------------------
    mask_long = trains["gap"].isin(long_gap_set)
    Family_1 = trains.loc[mask_long, "bid_first"].to_numpy(int)

    # ------------------------------------------------------------------
    # Family_2: last bunch of a train that is followed by a long (>= 31) gap.
    # The last train is measured against the end of the orbit.
    # ------------------------------------------------------------------
    next_start = trains["bid_first"].shift(-1).fillna(orbit_len).astype(int)
    trains["gap_after"] = next_start - trains["bid_last"] - 1
    mask_big_gap = trains["gap_after"] >= 31
    Family_2 = trains.loc[mask_big_gap, "bid_last"].to_numpy(int)

    # ------------------------------------------------------------------
    # Family_3: first slot of max LR encounter per train
    # ------------------------------------------------------------------
    lr = fpat.lrencounters[beam][f"ip{ip}"]
    peak_slots = []
    for _, row in trains.iterrows():
        bids = np.asarray(row["bids"], dtype=int)
        if bids.size == 0:
            continue  # a train without slots has no maximum to pick
        # argmax returns the first position of the maximum
        peak_slots.append(bids[int(np.argmax(lr[bids]))])
    Family_3 = np.asarray(peak_slots, dtype=int)

    # ------------------------------------------------------------------
    # Family_4: (F4_offset_after_long + 1)-th bunch in long-gap trains.
    # Only trains long enough to contain that bunch are used, in line with
    # the length checks of Family_5 and Family_6.
    # ------------------------------------------------------------------
    mask_F4 = mask_long & (trains["nbunches"] >= F4_offset_after_long + 1)
    Family_4 = (trains.loc[mask_F4, "bid_first"] + F4_offset_after_long).to_numpy(int)

    # ------------------------------------------------------------------
    # Family_5: (F5_offset_after_small + 1)-th bunch in trains whose
    # *preceding* gap == small_gap
    # ------------------------------------------------------------------
    mask_F5 = (trains["gap"] == small_gap) & (trains["nbunches"] >= F5_offset_after_small + 1)
    cand_F5 = (trains.loc[mask_F5, "bid_first"] + F5_offset_after_small).to_numpy(int)
    # clip to each selected train's end (safety)
    last_F5 = trains.loc[mask_F5, "bid_last"].to_numpy(int)
    Family_5 = cand_F5[cand_F5 <= last_F5]

    # ------------------------------------------------------------------
    # Families 6-8 are all defined relative to small-gap trains.
    # `prev_train` holds, for each small-gap train, the row of the train just
    # before it (shift(1)); rows made incomplete by the shift are dropped.
    # ------------------------------------------------------------------
    rows_small_gap = trains["gap"] == small_gap
    prev_train = trains.shift(1).loc[rows_small_gap].dropna()

    # Family_6: (F6_offset_before_small + 1)-th bunch of the preceding train
    long_enough = prev_train["nbunches"] >= (F6_offset_before_small + 1)
    Family_6 = (prev_train.loc[long_enough, "bid_first"] + F6_offset_before_small).to_numpy(int)

    # Family_7: first bunch AFTER the small gap (the small-gap train itself)
    Family_7 = trains.loc[rows_small_gap, "bid_first"].to_numpy(int)

    # Family_8: last bunch BEFORE the small gap (last bunch of the preceding train)
    Family_8 = prev_train["bid_last"].astype(int).to_numpy()

    # ------------------------------------------------------------------
    # apply filters
    # ------------------------------------------------------------------
    candidates = (Family_1, Family_2, Family_3, Family_4,
                  Family_5, Family_6, Family_7, Family_8)
    return {f"Family_{i}": _filter(slots) for i, slots in enumerate(candidates, start=1)}


# ------------------------------------------------------------------
# Minimal load_series helper (same as in larger module)
# ------------------------------------------------------------------
def load_series(var: str, RAWDATA: Path, fillno: int) -> pd.Series:
    """
    Collect one logged variable of one fill into a single time-ordered Series.

    Every ``*.parquet`` file below ``RAWDATA / "HX:FILLN=<fillno>"`` is tried.
    Files that cannot be read with the single column `var` are skipped
    silently (best effort).  The index is parsed as UTC timestamps
    (unparsable entries become NaT) and NaN values are dropped.

    Returns an empty float Series named `var` when nothing could be loaded.
    """
    fill_dir = RAWDATA / f"HX:FILLN={fillno}"

    chunks = []
    for parquet_path in fill_dir.rglob("*.parquet"):
        try:
            frame = pd.read_parquet(parquet_path, columns=[var])
        except Exception:
            # unreadable file or column not present in this file: try the next one
            continue
        stamps = pd.to_datetime(frame.index, utc=True, errors="coerce")
        chunk = pd.Series(frame[var].to_numpy(), index=stamps, name=var)
        chunks.append(chunk.dropna())

    if chunks:
        return pd.concat(chunks).sort_index()
    return pd.Series(dtype=float, name=var)


In [5]:
fams_by_fill = {}
records = []   # reserved for a long-format table; nothing in this cell fills it

# Families of every fill, keyed by fill number.  All gap / offset / filter
# options of build_families_for_fill are left at their defaults.
for fillno in fills:
    fams_by_fill[fillno] = build_families_for_fill(fillno, beam, ip, RAWDATA)

# ------------------------------------------------------------------
# One row per fill, one column per family.  The slot arrays differ in
# length, so each cell holds a whole array (object column).
wide_rows = []
for fillno, fams in fams_by_fill.items():
    family_columns = {name: np.array(members, dtype=int) for name, members in fams.items()}
    wide_rows.append({"fill": fillno, **family_columns})

df_families_wide = pd.DataFrame(wide_rows)
print("df_families_wide shape:", df_families_wide.shape)
display(df_families_wide)

df_families_wide shape: (14, 9)


Unnamed: 0,fill,Family_1,Family_2,Family_3,Family_4,Family_5,Family_6,Family_7,Family_8
0,10665,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
1,10666,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
2,10671,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
3,10673,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
4,10675,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
5,10676,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
6,10685,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
7,10689,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
8,10690,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."
9,10701,"[55, 208, 404, 600, 796, 949, 1102, 1298, 1494...","[176, 372, 568, 764, 917, 1070, 1266, 1462, 16...","[71, 114, 157, 224, 267, 310, 353, 420, 463, 5...","[61, 214, 410, 606, 802, 955, 1108, 1304, 1500...","[112, 155, 265, 308, 351, 461, 504, 547, 657, ...","[81, 124, 234, 277, 320, 430, 473, 516, 626, 6...","[98, 141, 251, 294, 337, 447, 490, 533, 643, 6...","[90, 133, 243, 286, 329, 439, 482, 525, 635, 6..."


In [6]:
def _ip_collision_sets_for_fill(fillno: int, beam: str, RAWDATA: Path):
    """
    Look up, for one fill and one beam, which slots collide at each IP class.

    The result is ``{'15': set(...), '2': set(...), '8': set(...)}``, where
    '15' comes from ``fpat.collbid_ip15`` (the combined IP1+IP5 list), '2' from
    ``fpat.collbid_ip2`` and '8' from ``fpat.collbid_ip8``.  Any `beam` other
    than "B1" (case-insensitive) is treated as beam 2.
    """
    fpat = LHCFillingPattern(fillno, RAWDATA)
    side = "B1" if beam.upper() == "B1" else "B2"

    tables = (
        ("15", fpat.collbid_ip15),
        ("2", fpat.collbid_ip2),
        ("8", fpat.collbid_ip8),
    )
    return {label: set(np.asarray(table[side], dtype=int)) for label, table in tables}

# Classify a numpy array of slots into exclusive IP groups

def _classify_slots_by_ip(slots: np.ndarray, set15, set2, set8):
    """
    Given 1D int array `slots` and three sets (IP15, IP2, IP8),
    return dict {ip_group_label: np.ndarray([...])}.
    """
    slots = np.asarray(slots, dtype=int)

    # for each slot, boolean membership
    in15 = np.array([s in set15 for s in slots], dtype=bool)
    in2  = np.array([s in set2  for s in slots], dtype=bool)
    in8  = np.array([s in set8  for s in slots], dtype=bool)

    # masks
    m15_only   =  in15 & ~in2 & ~in8
    m2_only    = ~in15 &  in2 & ~in8
    m8_only    = ~in15 & ~in2 &  in8

    m15_2      =  in15 &  in2 & ~in8
    m15_8      =  in15 & ~in2 &  in8
    m2_8       = ~in15 &  in2 &  in8

    m15_2_8    =  in15 &  in2 &  in8

    # any that did not match any of above (should be none or "non colliding")
    mnone      = ~(m15_only | m2_only | m8_only | m15_2 | m15_8 | m2_8 | m15_2_8)

    return {
        "15_only":   slots[m15_only],
        "2_only":    slots[m2_only],
        "8_only":    slots[m8_only],
        "15_2":      slots[m15_2],
        "15_8":      slots[m15_8],
        "2_8":       slots[m2_8],
        "15_2_8":    slots[m15_2_8],
        "none":      slots[mnone],
    }

# Main: explode fams_by_fill into IP-partitioned long & summary DFs
def build_family_ip_partitions(
    fams_by_fill: dict,
    RAWDATA: Path,
    beam: str,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Break every family of every fill down by IP collision group.

    Parameters
    ----------
    fams_by_fill : dict
        ``{fill: {"Family_1": array([...]), ...}}``
    RAWDATA : Path
        Root raw-data path, passed on to the filling-pattern lookup.
    beam : str
        'B1' or 'B2'

    Returns
    -------
    df_families_ip_long : DataFrame
        One row per slot with columns fill, family, ip_group, slot; sorted by
        those four columns.
    df_families_ip_summary : DataFrame
        One row per (fill, family) with the number of slots in each IP group
        (15_only, 2_only, 8_only, 15_2, 15_8, 2_8, 15_2_8, none).

    With no input, both frames are empty; the summary then only carries the
    columns fill and family.
    """
    slot_rows = []
    count_rows = []

    for fillno, fams in fams_by_fill.items():
        # collision sets are looked up once per fill and reused for all families
        collisions = _ip_collision_sets_for_fill(fillno, beam, RAWDATA)

        for fam_name, fam_slots in fams.items():
            groups = _classify_slots_by_ip(
                np.asarray(fam_slots, dtype=int),
                collisions["15"], collisions["2"], collisions["8"],
            )

            # long form: one record per slot
            slot_rows.extend(
                {"fill": fillno, "family": fam_name, "ip_group": ip_group, "slot": int(s)}
                for ip_group, members in groups.items()
                for s in members
            )

            # summary: one record per family with a count per group
            counts = {ip_group: int(len(members)) for ip_group, members in groups.items()}
            count_rows.append({"fill": fillno, "family": fam_name, **counts})

    long_keys = ["fill", "family", "ip_group", "slot"]
    if not slot_rows:
        df_families_ip_long = pd.DataFrame(columns=long_keys)
    else:
        df_families_ip_long = pd.DataFrame.from_records(slot_rows)
        df_families_ip_long = df_families_ip_long.sort_values(long_keys).reset_index(drop=True)

    summary_keys = ["fill", "family"]
    if not count_rows:
        df_families_ip_summary = pd.DataFrame(columns=summary_keys)
    else:
        df_families_ip_summary = pd.DataFrame.from_records(count_rows)
        df_families_ip_summary = df_families_ip_summary.sort_values(summary_keys).reset_index(drop=True)

    return df_families_ip_long, df_families_ip_summary

# Partition the families built above by IP group (same beam as used to build them).
df_families_ip_long, df_families_ip_summary = build_family_ip_partitions(
    fams_by_fill, RAWDATA, beam
)

# Long table: one row per (fill, family, ip_group, slot) -- preview only
print("df_families_ip_long shape:", df_families_ip_long.shape)
display(df_families_ip_long.head())

# Summary table: slot counts per IP group -- the first nine rows are shown
print("df_families_ip_summary shape:", df_families_ip_summary.shape)
display(df_families_ip_summary.head(9))

df_families_ip_long shape: (4494, 4)


Unnamed: 0,fill,family,ip_group,slot
0,10665,Family_1,15_2_8,1843
1,10665,Family_1,15_2_8,1996
2,10665,Family_1,15_2_8,2192
3,10665,Family_1,15_2_8,2388
4,10665,Family_1,15_8,55


df_families_ip_summary shape: (112, 10)


Unnamed: 0,fill,family,15_only,2_only,8_only,15_2,15_8,2_8,15_2_8,none
0,10665,Family_1,5,0,0,0,10,0,4,0
1,10665,Family_2,0,0,0,1,6,0,12,0
2,10665,Family_3,0,0,0,2,3,0,63,0
3,10665,Family_4,0,0,0,5,1,0,13,0
4,10665,Family_5,0,0,0,1,2,0,46,0
5,10665,Family_6,0,0,0,2,1,0,46,0
6,10665,Family_7,2,0,0,0,36,0,11,0
7,10665,Family_8,0,0,0,2,1,0,46,0
8,10666,Family_1,5,0,0,0,10,0,4,0


In [None]:
#!/usr/bin/env python
"""
ip_partition_family_slopes.py
=============================

Robust bunch-emittance growth-rate analysis:

1.  Build families-by-IP partitions for every fill.
2.  Fit weighted, robust slopes per slot.
3.  Aggregate to family-level, then average over fills.
4.  Make one figure per IP-group with families on the x-axis.

Replace the placeholders in `__main__` with your real data handles
(fbmodes DataFrame, RAWDATA path, list/iterable of fills, etc.)
and just run `python ip_partition_family_slopes.py`.
"""
from __future__ import annotations

# --------------------------------------------------------------------------
# Imports & compatibility
# --------------------------------------------------------------------------
import warnings
from pathlib import Path
from typing import Iterable, Mapping, Sequence

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Model

# -------- Python < 3.11: supply a minimal StrEnum -------------------------
try:
    from enum import StrEnum          # Py 3.11+
except ImportError:                    # Py ≤ 3.10
    from enum import Enum

    class StrEnum(str, Enum):  # type: ignore
        """
        Fallback for Python versions lacking enum.StrEnum.

        Mirrors the two behaviours of the standard-library class that differ
        from a plain ``(str, Enum)`` mix-in, so results do not depend on the
        interpreter version:

        * ``auto()`` yields the lower-cased member name;
        * ``str(member)`` (and therefore ``format`` / f-strings) yields the
          member's value instead of ``"ClassName.MEMBER"``.
        """

        # Deliberately a plain function (no @staticmethod): older enum
        # implementations call the object stored in the class body directly.
        def _generate_next_value_(name, start, count, last_values):
            return name.lower()

        def __str__(self):
            return str(self.value)


# --------------------------------------------------------------------------
# CONFIG — tweak to taste
# --------------------------------------------------------------------------
THRESHOLD = 1.0e11      # default bunch-intensity cut; the first sample at or above it defines t0
PLANE = "H"             # default plane for summary plots
FIGSIZE = (9, 3)        # default figure size of the per-slot fit plot

# --- robust line fit ---
F_SCALE = 0.1           # default f_scale passed to the soft_l1 loss
REL_ERR = 0.05          # error assigned to each sample, relative part (5 %)
ABS_ERR_FLOOR = 0.02    # lower bound on the assigned error, in emittance units
MIN_ERR = 1.0e-12       # last-resort lower bound so the assigned error is never zero
MAX_WEIGHT = 1.0e4      # upper bound on the fit weight (1 / assigned error)

# --- which segments are worth fitting ---
MIN_PTS = 5             # minimum number of samples
MIN_DT_H = 0.15         # minimum time span in hours (9 min)

# --------------------------------------------------------------------------
# 0.  LHCFillingPattern → IP-collision sets
# --------------------------------------------------------------------------
def _ip_collision_sets_for_fill(fillno: int, beam: str, rawdata: Path
) -> dict[str, set[int]]:
    """
    Collect, for one fill and one beam, the colliding bunch slots per IP class.

    Returns a dict of sets of plain ``int`` slot numbers with the keys

        "15" - taken from `fpat.collbid_ip15` (combined IP1 + IP5)
        "2"  - taken from `fpat.collbid_ip2`
        "8"  - taken from `fpat.collbid_ip8`

    Any `beam` other than "B1" (case-insensitive) is read as beam 2.
    """
    fpat = LHCFillingPattern(fillno, rawdata)

    side = "B2"
    if beam.upper() == "B1":
        side = "B1"

    per_class = (
        ("15", fpat.collbid_ip15),
        ("2", fpat.collbid_ip2),
        ("8", fpat.collbid_ip8),
    )
    return {label: {int(slot) for slot in table[side]} for label, table in per_class}


# --------------------------------------------------------------------------
# 1.  Classify arbitrary slot arrays into mutually-exclusive IP groups
# --------------------------------------------------------------------------
def _classify_slots_by_ip(
    slots: Sequence[int],
    set15: set[int], set2: set[int], set8: set[int],
) -> dict[str, np.ndarray]:
    """
    Parameters
    ----------
    slots : 1-D iterable of integers
    set15,set2,set8 : sets of ints returned by `_ip_collision_sets_for_fill`

    Returns
    -------
    dict[label → ndarray[int]]
        Arrays are *sub-views* of `slots` in the original order.
    """
    slots = np.asarray(slots, dtype=int)

    # fast vectorised membership
    in15, in2, in8 = (np.isin(slots, list(s)) for s in (set15, set2, set8))

    m15_only   =  in15 & ~in2 & ~in8
    m2_only    = ~in15 &  in2 & ~in8
    m8_only    = ~in15 & ~in2 &  in8
    m15_2      =  in15 &  in2 & ~in8
    m15_8      =  in15 & ~in2 &  in8
    m2_8       = ~in15 &  in2 &  in8
    m15_2_8    =  in15 &  in2 &  in8
    mnone      = ~(in15 | in2 | in8)

    return {
        "15_only": slots[m15_only],
        "2_only":  slots[m2_only],
        "8_only":  slots[m8_only],
        "15_2":    slots[m15_2],
        "15_8":    slots[m15_8],
        "2_8":     slots[m2_8],
        "15_2_8":  slots[m15_2_8],
        "none":    slots[mnone],
    }


# --------------------------------------------------------------------------
# 2.  Main: explode `fams_by_fill` into IP-partitioned DataFrames
# --------------------------------------------------------------------------
def build_family_ip_partitions(
    fams_by_fill: Mapping[int, Mapping[str, Sequence[int]]],
    rawdata: Path,
    beam: str,
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Break every family of every fill down by IP collision group.

    Parameters
    ----------
    fams_by_fill : mapping ``{fill: {family_name: slots}}``
    rawdata : root raw-data path, passed on to the filling-pattern lookup
    beam : 'B1' or 'B2'

    Returns
    -------
    df_ip_long : columns (fill, family, ip_group, slot), one row per slot,
        sorted by those columns
    df_ip_summary : columns (fill, family, …group counts…), one row per
        (fill, family), sorted by fill and family

    When there is nothing to report (no fills, no families, or no slots) the
    corresponding frame is empty but still carries its key columns, instead
    of failing while sorting a column-less frame.
    """
    long_cols = ["fill", "family", "ip_group", "slot"]
    summary_keys = ["fill", "family"]
    long_records, summary_records = [], []

    for fillno, fams in fams_by_fill.items():
        # collision sets are looked up once per fill and reused for all families
        ip_sets = _ip_collision_sets_for_fill(fillno, beam, rawdata)
        set15, set2, set8 = ip_sets["15"], ip_sets["2"], ip_sets["8"]

        for fam, slots in fams.items():
            slots = np.asarray(slots, dtype=int)
            partitions = _classify_slots_by_ip(slots, set15, set2, set8)

            # long form: one record per slot
            long_records.extend(
                {"fill": fillno, "family": fam, "ip_group": ipg, "slot": int(s)}
                for ipg, arr in partitions.items()
                for s in arr
            )
            # summary counts: one record per family
            rec = {"fill": fillno, "family": fam}
            rec.update({ipg: len(arr) for ipg, arr in partitions.items()})
            summary_records.append(rec)

    # Passing `columns` keeps the schema even when there are no records.
    df_long = (pd.DataFrame(long_records, columns=long_cols)
                 .sort_values(long_cols)
                 .reset_index(drop=True))

    if summary_records:
        df_sum = pd.DataFrame(summary_records)
    else:
        # group-count columns are only known once at least one family was classified
        df_sum = pd.DataFrame(columns=summary_keys)
    df_sum = df_sum.sort_values(summary_keys).reset_index(drop=True)

    return df_long, df_sum


# --------------------------------------------------------------------------
# 3.  Robust weighted line fit
# --------------------------------------------------------------------------
# Straight line y = m * x + b; the fit parameters are named after the arguments.
_line_model = Model(lambda x, m, b: m * x + b)

def robust_line(x_hours: np.ndarray, y: np.ndarray, f_scale=F_SCALE):
    """
    Fit a weighted straight line to (x_hours, y) with a soft-L1 loss.

    Each sample is assigned an error of ``REL_ERR * |y|``, but never less than
    ABS_ERR_FLOOR or MIN_ERR; the fit weight is the inverse of that error,
    capped at MAX_WEIGHT.  The fit starts from a flat line through the first
    sample (m = 0, b = y[0]).  RuntimeWarnings raised from lmfit are muted
    for the duration of the fit.

    Returns
    -------
    (slope, intercept, slope_err, result)
        `slope_err` is NaN whenever the fit reports no (or a zero) standard
        error for the slope; `result` is the object returned by the fit.
    """
    sigma = np.maximum(REL_ERR * np.abs(y), max(ABS_ERR_FLOOR, MIN_ERR))
    weights = np.minimum(1.0 / sigma, MAX_WEIGHT)
    solver_options = {"loss": "soft_l1", "f_scale": f_scale}

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning, module="lmfit")
        result = _line_model.fit(
            y,
            x=x_hours,
            m=0.0,
            b=y[0],
            weights=weights,
            method="least_squares",
            fit_kws=solver_options,
        )

    slope_par = result.params["m"]
    intercept_par = result.params["b"]
    slope_err = slope_par.stderr if slope_par.stderr else np.nan
    return slope_par.value, intercept_par.value, slope_err, result


# --------------------------------------------------------------------------
# 4.  Per-fill family-level robust slopes
# --------------------------------------------------------------------------
def _wmean_and_ci(pairs: Sequence[tuple[float, float]]
) -> tuple[float, float, float, float, float, float, int]:
    """Weighted mean ± CI, plus spread stats, from (value, sigma) pairs."""
    if not pairs:
        return (np.nan,) * 6 + (0,)

    vals, sigs = map(np.asarray, zip(*pairs))
    ok = np.isfinite(vals) & np.isfinite(sigs) & (sigs > 0)
    if ok.sum() == 0:
        return (np.nan,) * 6 + (0,)

    w = 1.0 / sigs[ok] ** 2
    mean = np.sum(w * vals[ok]) / np.sum(w)
    sem  = (1.0 / np.sum(w)) ** 0.5
    ci95 = 1.96 * sem
    std  = np.std(vals[ok], ddof=1) if ok.sum() > 1 else np.nan
    mad  = 1.4826 * np.median(np.abs(vals[ok] - np.median(vals[ok]))) \
           if ok.sum() > 1 else np.nan
    return mean, sem, mean - ci95, mean + ci95, std, mad, int(ok.sum())


def one_fill_family_rates_robust(
    fno: int,
    beam: str,
    families: Mapping[str, Sequence[int]],
    fbmodes: pd.DataFrame,
    rawdata: Path,
    threshold: float = THRESHOLD,
) -> tuple[pd.DataFrame, dict]:
    """
    Fit per-slot emittance slopes for one fill and aggregate them per family.

    For every slot of every family, the H and V bunch emittances recorded in
    the selected beam-mode window are fitted with `robust_line` against the
    time in hours since the slot's intensity first reached `threshold`.  The
    per-slot slopes are then combined per family with `_wmean_and_ci`.

    Parameters
    ----------
    fno : fill number
    beam : "B1" (case-insensitive) selects BSRT location 5R4, anything else
        5L4; the string is also inserted verbatim into the PV names
    families : mapping ``{family_name: slots}``
    fbmodes : beam-mode table; must provide the columns BMODE, tsStart, tsEnd
    rawdata : root raw-data path (files are read from ``HX:FILLN=<fno>`` below it)
    threshold : intensity a slot must reach for its fit window to start

    Returns
    -------
    df : one row per family, indexed by (fill, family), with H_*/V_* statistics
    fits : ``fits[fam][slot] = {"H": (slope, intercept, slope_err, t0, series),
        "V": (...)}`` for slots that were fitted successfully (for drill-downs)

    Raises
    ------
    RuntimeError
        If no file under the fill directory yields data for one of the PVs.
    """
    bsrt_loc = "5R4" if beam.upper() == "B1" else "5L4"

    int_var = f"LHC.BCTFR.B6R4.{beam}:BUNCH_INTENSITY"
    H_var = f"LHC.BSRT.{bsrt_loc}.{beam}:BUNCH_EMITTANCE_H"
    V_var = f"LHC.BSRT.{bsrt_loc}.{beam}:BUNCH_EMITTANCE_V"

    def load_INJPHYS(var: str) -> pd.Series:
        """Load `var` for this fill, restricted to the first INJPHYS window."""
        # assumes fbmodes is indexed by fill number and that .loc[fno] yields
        # a DataFrame (one row per beam mode) -- TODO confirm
        sub = fbmodes.loc[fno]
        rows = sub.query("BMODE=='INJPHYS'").sort_values("tsStart")
        if rows.empty:
            # no INJPHYS entry: fall back to the earliest beam mode of the fill
            rows = sub.sort_values("tsStart").iloc[[0]]
        # only the first (earliest) matching row defines the time window
        t0, t1 = map(pd.to_datetime, (rows["tsStart"].iloc[0],
                                      rows["tsEnd"].iloc[0]))
        parts = []
        pq_dir = rawdata / f"HX:FILLN={fno}"
        for pq in pq_dir.rglob("*.parquet"):
            try:
                df = pd.read_parquet(pq, columns=[var])
            except Exception as exc:
                # best effort: report the file and carry on with the others
                warnings.warn(f"Skipping {pq} ({exc})", UserWarning)
                continue
            ser = df[var]
            # NOTE(review): parsed without utc=True, unlike load_series; the
            # slice below needs t0/t1 and this index to agree on tz -- confirm
            ser.index = pd.to_datetime(df.index, errors="coerce")
            parts.append(ser.dropna())
        if not parts:
            raise RuntimeError(f"No data for PV {var!r} in fill {fno}")
        full = pd.concat(parts).sort_index()
        return full.loc[t0:t1]

    def extract_slot(ser: pd.Series, slot: int) -> pd.Series:
        """Pick element `slot` from every sample; NaN if the sample is too short or has no length."""
        return ser.apply(
            lambda a: a[slot] if hasattr(a, "__len__") and len(a) > slot else np.nan
        )

    def segment_ok(seg: pd.Series) -> bool:
        """True if the segment has at least MIN_PTS samples spanning at least MIN_DT_H hours."""
        if seg is None or seg.empty or len(seg) < MIN_PTS:
            return False
        dt_h = (seg.index[-1] - seg.index[0]).total_seconds() / 3600.0
        return dt_h >= MIN_DT_H

    # Each call scans the fill directory again (once per PV).
    I  = load_INJPHYS(int_var)
    EH = load_INJPHYS(H_var)
    EV = load_INJPHYS(V_var)

    # slot -> (slope, slope_err); keyed by slot only, so a slot that belongs
    # to several families is fitted again and overwrites its own entry
    per_h: dict[int, tuple[float, float]] = {}
    per_v: dict[int, tuple[float, float]] = {}
    fits: dict[str, dict[int, dict[str, tuple]]] = {}

    for fam, slots in families.items():
        fits[fam] = {}
        for slot in slots:
            Ii = extract_slot(I, slot)
            above = Ii[Ii >= threshold]
            if above.empty:
                # intensity never reached the threshold: nothing to fit
                per_h[slot] = per_v[slot] = (np.nan, np.nan)
                continue
            # fit window starts at the first sample at or above the threshold
            t0 = above.index[0]
            Hs = extract_slot(EH, slot).loc[t0:].dropna()
            Vs = extract_slot(EV, slot).loc[t0:].dropna()

            # both planes must qualify; otherwise neither is fitted
            if not (segment_ok(Hs) and segment_ok(Vs)):
                per_h[slot] = per_v[slot] = (np.nan, np.nan)
                continue

            # time axis in hours since t0
            xh = (Hs.index - t0).total_seconds() / 3600.0
            xv = (Vs.index - t0).total_seconds() / 3600.0
            sh, ih, sh_err, _ = robust_line(xh, Hs.values)
            sv, iv, sv_err, _ = robust_line(xv, Vs.values)
            per_h[slot] = (sh, sh_err)
            per_v[slot] = (sv, sv_err)
            fits[fam][slot] = {"H": (sh, ih, sh_err, t0, Hs),
                               "V": (sv, iv, sv_err, t0, Vs)}

    # Family-level aggregation of the per-slot slopes
    rows = []
    for fam, slots in families.items():
        h_pairs = [per_h[s] for s in slots if s in per_h]
        v_pairs = [per_v[s] for s in slots if s in per_v]

        H_mean, H_sem, H_lo95, H_hi95, H_std, H_mad, H_n = _wmean_and_ci(h_pairs)
        V_mean, V_sem, V_lo95, V_hi95, V_std, V_mad, V_n = _wmean_and_ci(v_pairs)

        rows.append({
            "fill": fno,
            "family": fam,
            "H_rate_mean_w": H_mean,
            "H_err_family":  H_sem,
            "H_lo95":        H_lo95,
            "H_hi95":        H_hi95,
            "H_std_slots":   H_std,
            "H_mad_slots":   H_mad,
            "H_n_slots":     H_n,
            "V_rate_mean_w": V_mean,
            "V_err_family":  V_sem,
            "V_lo95":        V_lo95,
            "V_hi95":        V_hi95,
            "V_std_slots":   V_std,
            "V_mad_slots":   V_mad,
            "V_n_slots":     V_n,
        })

    df = pd.DataFrame(rows).set_index(["fill", "family"])
    return df, fits


# --------------------------------------------------------------------------
# 5.  Plot helpers
# --------------------------------------------------------------------------
def plot_slot_with_fit(slot_fit_tuple, plane="H", figsize=FIGSIZE, label=None):
    """
    Draw one slot's emittance samples together with its fitted straight line.

    Parameters
    ----------
    slot_fit_tuple : tuple
        ``(slope, intercept, slope_err, t0, series)``; the third entry is
        not used here.  ``series`` must carry a datetime index.
    plane : str
        Only used for the y-axis label.
    figsize : tuple
        Figure size handed to matplotlib.
    label : str or None
        Figure title; falls back to ``"Slot"``.
    """
    slope, intercept, _, t0, ser = slot_fit_tuple

    # The line is evaluated in hours elapsed since t0.
    hours_since_t0 = (ser.index - t0).total_seconds() / 3600.0
    fitted = intercept + slope * hours_since_t0

    fig, ax = plt.subplots(figsize=figsize)
    ax.plot(ser.index, ser.values, label="Raw", alpha=0.6)
    ax.plot(ser.index, fitted, label="Weighted robust fit", linewidth=2)
    ax.set_xlabel("Time")
    ax.set_ylabel(f"Emittance {plane}")
    ax.set_title(label or "Slot")
    ax.legend()
    fig.tight_layout()
    plt.show()


def plot_ip_group(
    df_mean: pd.DataFrame,
    plane: str = "H",
    ipg: str = "15_only",
    err: str = "sem",
    figsize: tuple[int, int] = (11, 3),
):
    """
    One error-bar panel for a given IP-group over all families.

    Parameters
    ----------
    df_mean : DataFrame
        Must contain the columns ``ip_group``, ``family``,
        ``{plane}_mean`` and ``{plane}_sem``.
    plane : str
        Column prefix to plot (e.g. ``"H"`` or ``"V"``).
    ipg : str
        Value of ``ip_group`` selecting the rows to draw.
    err : str
        ``"sem"`` (case-insensitive) draws ±1 SEM bars; any other value
        draws ±1.96·SEM bars labelled as a 95 % interval.
    figsize : tuple
        Figure size handed to matplotlib.
    """
    sub = df_mean.query("ip_group == @ipg")
    fams = sub["family"].to_numpy()
    means = sub[f"{plane}_mean"].to_numpy()
    sem = sub[f"{plane}_sem"].to_numpy()

    if err.lower() == "sem":
        yerr = sem
        label = "±1 SEM over fills"
    else:
        yerr = 1.96 * sem  # symmetric bars, so a 1-D array is sufficient
        label = "±95 % CI"

    # Plot against explicit integer positions and pin the ticks before
    # labelling them: set_xticklabels on un-fixed ticks can misplace labels.
    xpos = np.arange(len(fams))

    fig, ax = plt.subplots(figsize=figsize)
    ax.errorbar(xpos, means, yerr=yerr, fmt="o", capsize=3,
                elinewidth=1.5, label=label)
    ax.set_xticks(xpos)
    ax.set_xticklabels(fams, rotation=45, ha="right")
    ax.set_ylabel(f"{plane}-plane growth rate")
    ax.set_title(f"IP-group {ipg} ({plane})")
    ax.legend()
    fig.tight_layout()
    plt.show()


# --------------------------------------------------------------------------
# 6.  High-level driver: analyse *and* plot
# --------------------------------------------------------------------------
def analyse_and_plot(
    fills: Iterable[int],
    beam: str,
    ip: int | str,
    rawdata: Path,
    fbmodes: pd.DataFrame,
    build_families_for_fill,   # user-provided
    plane: str = PLANE,
    threshold: float = THRESHOLD,
):
    """
    Full pipeline entry-point: build families, fit per-slot growth rates,
    average them over fills and plot one panel per IP-group.

    Parameters
    ----------
    fills : iterable of int
        Fill numbers to process.  Any iterable is accepted (it is
        materialised once, so one-shot generators work too).
    beam, ip, rawdata
        Forwarded to ``build_families_for_fill`` / the rate fitter.
    fbmodes : DataFrame
        Forwarded to ``one_fill_family_rates_robust``.
    build_families_for_fill : callable
        Called as ``build_families_for_fill(fno, beam=..., ip=..., RAWDATA=...)``.
    plane : str
        Plane shown in the summary plots.
    threshold : float
        Forwarded to ``one_fill_family_rates_robust``.

    Returns
    -------
    df_mean : DataFrame
        One row per (ip_group, family) with H/V mean and SEM over fills
        and ``n_fills`` (row count per group, NaN rates included).
    ip_fits : dict
        ``{ip_group: {fill: fits}}`` for drill-down plots.
    """
    # `fills` is walked twice below; freeze it so a generator is not
    # silently exhausted after step A.
    fills = list(fills)

    # --------------------------------------------
    # A. build plain families → fams_by_fill
    # --------------------------------------------
    fams_by_fill = {
        fno: build_families_for_fill(fno, beam=beam, ip=ip, RAWDATA=rawdata)
        for fno in fills
    }

    # --------------------------------------------
    # B. explode into IP-partitioned slot table
    # --------------------------------------------
    df_ip_long, _ = build_family_ip_partitions(
        fams_by_fill=fams_by_fill, rawdata=rawdata, beam=beam
    )

    # --------------------------------------------
    # C. per-(fill × IP) robust rates
    # --------------------------------------------
    ip_tables, ip_fits = [], {}
    for fno in fills:
        df_fill = df_ip_long.query("fill == @fno")
        for ipg, grp in df_fill.groupby("ip_group"):
            fams_ip = {fam: g["slot"].to_numpy()
                       for fam, g in grp.groupby("family")}
            df_rates, fits_this = one_fill_family_rates_robust(
                fno, beam, fams_ip, fbmodes, rawdata, threshold
            )
            df_rates = df_rates.reset_index()
            df_rates["ip_group"] = ipg
            ip_tables.append(df_rates)
            ip_fits.setdefault(ipg, {})[fno] = fits_this

    # --------------------------------------------
    # D. average over fills
    # --------------------------------------------
    df_all = (pd.concat(ip_tables, ignore_index=True)
                .set_index(["ip_group", "family", "fill"])
                .sort_index())

    df_mean = (df_all
                 .groupby(["ip_group", "family"])
                 .agg(H_mean=("H_rate_mean_w", "mean"),
                      H_sem =("H_rate_mean_w", "sem"),
                      V_mean=("V_rate_mean_w", "mean"),
                      V_sem =("V_rate_mean_w", "sem"),
                      n_fills=("H_rate_mean_w", "size"))
                 .reset_index())

    # --------------------------------------------
    # E. plotting
    # --------------------------------------------
    for ipg in df_mean["ip_group"].unique():
        plot_ip_group(df_mean, plane=plane, ipg=ipg, err="sem")

    return df_mean, ip_fits


if __name__ == "__main__":
    # Run the full pipeline exactly once.  The previous version invoked
    # analyse_and_plot twice with identical arguments (once guarded, once
    # unguarded), doubling the I/O, the fits and the figures.
    #
    # Example inputs:
    #
    # from your_project import fbmodes, build_families_for_fill
    # RAWDATA = Path("/path/to/rawdata/root")
    # fills   = [8450, 8451, 8452, ...]
    # beam    = "B1"
    # ip      = 1   # or however your build_families_for_fill uses it
    #
    df_mean, fits = analyse_and_plot(
        fills=fills,
        beam=beam,
        ip=ip,
        rawdata=RAWDATA,
        fbmodes=fbmodes,
        build_families_for_fill=build_families_for_fill,
    )
    # Both historical file names are still written so that anything reading
    # either CSV keeps working.
    df_mean.to_csv("ip_family_slopes.csv")
    df_mean.to_csv("family_ip_growth_rates.csv")


# STABLE-beam window analysis

In [None]:
#!/usr/bin/env python
"""
stable_window_ip_groups.py
--------------------------

Cumulative INJPHYS/STABLE emittance‑growth analysis **per IP‑group**.

External objects you need to provide in your notebook/script before
calling `main()` or running the __main__ block:

    fbmodes : DataFrame with fill‑mode metadata (columns tsStart/tsEnd/BMODE)
    fills   : list[int]   – fills to process
    beam    : "B1" | "B2"
    build_families_for_fill(fill, beam, ip, RAWDATA) -> {family:[slots]}
    build_family_ip_partitions(fams_by_fill, RAWDATA, beam)
                              -> df_ip_long , df_ip_summary

Required python packages: numpy, pandas, matplotlib, lmfit
"""

from __future__ import annotations

# ---------------------------------------------------------------------
# CONFIG  – adjust paths/PVs once
# ---------------------------------------------------------------------
from pathlib import Path
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from lmfit import Model

# `ip` is forwarded to build_families_for_fill() by main().
ip              = 1
# Root directory searched by load_series() (sub-folder "HX:FILLN=<fill>").
RAWDATA         = Path("/eos/project/l/lhc-lumimod/LuminosityFollowUp/2025/rawdata")

# PV (column) names read by phase_markers() / leveling_on_time().
BETA_PV         = "HX:BETASTAR_IP1"
XING_PV         = "LhcStateTracker:LHCBEAM:IP1-XING-H-MURAD:value"
LEV_PV          = "LHC.LUMISERVER:LumiLevelingIP1:Enable"
# phase_markers() marks "ca_dip" at the first XING_PV sample <= this value.
THRESH_CA       = 120.0          # μrad

# Fit settings used by robust_line():
#   per-point error = max(REL_ERR * |y|, ABS_ERR_FLOOR, MIN_ERR)
#   weight          = min(1 / error, MAX_WEIGHT)
#   F_SCALE is passed as `f_scale` of the soft_l1 loss.
F_SCALE         = 0.1
REL_ERR         = 0.05
ABS_ERR_FLOOR   = 0.02
MIN_ERR         = 1e-12
# Minimum number of samples a slot segment needs before it is fitted.
MIN_PTS         = 5
# NOTE(review): MIN_DT_H is not referenced by the code visible in this cell —
# presumably a minimum window length in hours; confirm or remove.
MIN_DT_H        = 0.15
MAX_WEIGHT      = 1e4

MAKE_HOURLY     = True           # include 1h,2h,… cumulative windows

# NOTE(review): FIGSIZE_ERR is not referenced by the code visible in this cell.
FIGSIZE_ERR     = (11, 4)
# Line colour per family, looked up in plot_stable_avg_split_lines().
FAMILY_COLORS   = {
    "Family_1": "#1f77b4", "Family_2": "#ff7f0e", "Family_3": "#2ca02c",
    "Family_4": "#d62728", "Family_5": "#9467bd", "Family_6": "#8c564b",
    "Family_7": "#e377c2", "Family_8": "#7f7f7f",
}

# ---------------------------------------------------------------------
# 0.  SMALL HELPERS  (generic I/O + fitting)
# ---------------------------------------------------------------------
def bsrt_loc_for_beam(b: str) -> str:
    """Return the BSRT location tag: "5R4" for beam "B1" (any case), else "5L4"."""
    if b.upper() == "B1":
        return "5R4"
    return "5L4"

def _to_tz(ts, tz):
    return ts if ts is None else (ts if ts.tzinfo else ts.tz_localize(tz))

def load_series(pv: str, fno: int, root: Path) -> pd.Series:
    """
    Load one PV for an entire fill (no beam-mode cut).

    Every ``*.parquet`` file below ``root / "HX:FILLN=<fno>"`` is read for
    the single column ``pv``; the pieces are concatenated into one Series
    whose index is a UTC datetime index.

    Parameters
    ----------
    pv : str
        Column name to read from each parquet file.
    fno : int
        Fill number (selects the sub-directory).
    root : Path
        Raw-data root directory.

    Returns
    -------
    Series
        Non-null samples in chronological order; an empty float Series if
        no file provided the column.
    """
    parts = []
    # sorted(): deterministic file order, independent of the file system.
    for pq in sorted((root / f"HX:FILLN={fno}").rglob("*.parquet")):
        try:
            df = pd.read_parquet(pq, columns=[pv])
        except Exception:
            # Deliberate best-effort: files that lack the column or cannot
            # be read are skipped rather than aborting the whole fill.
            continue
        idx = pd.to_datetime(df.index, utc=True, errors="coerce")
        parts.append(pd.Series(df[pv].values, index=idx).dropna())

    if not parts:
        return pd.Series(dtype=float)

    ser = pd.concat(parts)
    # Index labels that could not be parsed became NaT (errors="coerce");
    # they cannot be placed in time and would break label slicing.
    ser = ser[ser.index.notna()]
    # Files arrive in arbitrary order, so the concatenation is generally not
    # chronological.  Callers slice by time and take diff(), which both
    # need a sorted index.  Stable sort keeps duplicate stamps in file order.
    return ser.sort_index(kind="stable")

def load_STABLE_series(pv: str, fno: int) -> pd.Series:
    """
    Return the part of a PV recorded between the start of the first and the
    end of the last "STABLE" beam-mode entry of a fill.

    Reads the module-level ``fbmodes`` table and ``RAWDATA`` root.
    NOTE(review): ``fbmodes.loc[fno]`` presumes ``fbmodes`` is indexed by
    fill number with several rows per fill — confirm against the loader.

    Raises
    ------
    RuntimeError
        If the fill has no row whose ``BMODE`` equals "STABLE"
        (case-insensitive).
    """
    sub = fbmodes.loc[fno].sort_values("tsStart")
    stable = sub[sub["BMODE"].str.upper() == "STABLE"]
    if stable.empty:
        raise RuntimeError(f"No STABLE period for fill {fno}")
    t0, t1 = map(pd.to_datetime, (stable["tsStart"].iloc[0],
                                  stable["tsEnd"]  .iloc[-1]))
    ser = load_series(pv, fno, RAWDATA)
    # Drop repeated stamps first (keeps the first occurrence in load order).
    ser = ser[~ser.index.duplicated(keep="first")]
    # Label slicing with bounds that are not exact index members raises
    # KeyError on a non-monotonic DatetimeIndex (pandas >= 2.0), and NaT
    # entries also make the index non-monotonic — so clean and sort first.
    ser = ser[ser.index.notna()].sort_index()
    return ser.loc[t0:t1]

def extract_slot(ser: pd.Series, slot: int) -> pd.Series:
    """
    Pick element ``slot`` out of every array-like entry of ``ser``.

    Entries without ``__len__`` or with ``len(entry) <= slot`` become NaN.
    The index of ``ser`` is preserved.
    """
    def _pick(entry):
        if hasattr(entry, "__len__") and len(entry) > slot:
            return entry[slot]
        return np.nan

    return ser.map(_pick)

# robust linear fit ----------------------------------------------------
# Straight-line lmfit model y = m * x + b, shared by every call below.
_line_model = Model(lambda x, m, b: m * x + b)

def robust_line(xh: np.ndarray, y: np.ndarray):
    """
    Weighted straight-line fit with a soft-L1 loss.

    Each point gets an error ``max(REL_ERR * |y|, ABS_ERR_FLOOR, MIN_ERR)``
    and a weight ``min(1 / error, MAX_WEIGHT)``.  The fit starts from slope
    0 and intercept ``y[0]``; RuntimeWarnings raised during the fit are
    silenced.

    Returns
    -------
    (slope, intercept, slope_stderr)
        ``slope_stderr`` is NaN when lmfit reports no (or a zero) stderr.
    """
    sigma = np.maximum(REL_ERR * np.abs(y), ABS_ERR_FLOOR)
    sigma = np.maximum(sigma, MIN_ERR)
    weights = np.minimum(1.0 / sigma, MAX_WEIGHT)

    loss_options = {"loss": "soft_l1", "f_scale": F_SCALE}
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        result = _line_model.fit(
            y,
            x=xh,
            m=0.0,
            b=y[0],
            weights=weights,
            method="least_squares",
            fit_kws=loss_options,
        )

    slope_par = result.params["m"]
    intercept_par = result.params["b"]
    slope_err = slope_par.stderr or np.nan
    return slope_par.value, intercept_par.value, slope_err

# weighted mean + sem --------------------------------------------------
def _wmean_sem(pairs):
    if not pairs: return np.nan, np.nan, 0
    vals, errs = map(np.asarray, zip(*pairs))
    ok = np.isfinite(vals) & np.isfinite(errs) & (errs > 0)
    if ok.sum() == 0: return np.nan, np.nan, 0
    w = 1.0 / errs[ok] ** 2
    mean = np.sum(w * vals[ok]) / np.sum(w)
    sem  = (1.0 / np.sum(w)) ** 0.5
    return mean, sem, int(ok.sum())

# ---------------------------------------------------------------------
# 1.  MARKERS & WINDOWS  (β*, crab‑cavity etc.)
# ---------------------------------------------------------------------
def phase_markers(fno: int):
    """
    Locate the timestamps that delimit the analysis windows of one fill.

    Returns a dict with
      beta_final : start of the longest run of constant BETA_PV values
      ca_start   : first XING_PV sample that differs from the initial value
                   (last sample if it never changes)
      ca_dip     : first XING_PV sample after ``ca_start`` that is
                   <= THRESH_CA (last sample if there is none)
      lvl_on     : result of ``leveling_on_time(fno)``
    Both PVs are restricted to the STABLE segment by load_STABLE_series().
    """
    beta_ser = load_STABLE_series(BETA_PV, fno)
    xing_ser = load_STABLE_series(XING_PV, fno)
    initial_angle = xing_ser.iloc[0]

    # Walk through β* and cut a new plateau whenever the value changes.
    plateaus = []
    plateau_begin = beta_ser.index[0]
    plateau_value = beta_ser.iloc[0]
    for stamp, value in beta_ser.iloc[1:].items():
        if value == plateau_value:
            continue
        plateaus.append((plateau_begin, stamp))
        plateau_begin, plateau_value = stamp, value
    plateaus.append((plateau_begin, beta_ser.index[-1]))

    def _duration(span):
        return span[1] - span[0]

    t_beta_final = max(plateaus, key=_duration)[0]

    # Crossing angle: first departure from the initial value ...
    last_stamp = xing_ser.index[-1]
    left_initial = np.where(xing_ser.values != initial_angle)[0]
    if left_initial.size:
        t_ca_start = xing_ser.index[left_initial[0]]
    else:
        t_ca_start = last_stamp

    # ... and the first sample at or below the threshold after that.
    below = (xing_ser.values <= THRESH_CA) & (xing_ser.index > t_ca_start)
    dipped = np.where(below)[0]
    if dipped.size:
        t_ca_dip = xing_ser.index[dipped[0]]
    else:
        t_ca_dip = last_stamp

    t_lvl_on = leveling_on_time(fno)

    return {
        "beta_final": t_beta_final,
        "ca_start": t_ca_start,
        "ca_dip": t_ca_dip,
        "lvl_on": t_lvl_on,
    }

def leveling_on_time(fno: int) -> pd.Timestamp:
    """
    Return the earliest time at which LEV_PV is switched on in a fill.

    Two candidates are considered: the first rising edge of the signal and
    the first sample with a value > 0 (this covers a signal that is already
    on at its first sample).  The earlier one is returned.

    Raises
    ------
    ValueError
        If the signal is missing or never becomes positive.
    """
    lev = load_series(LEV_PV, fno, RAWDATA).astype(float).dropna()
    # diff() is only meaningful on chronologically ordered samples; the
    # loader concatenates files without guaranteeing that order.
    lev = lev.sort_index()

    d = lev.diff().fillna(0)
    t_edge = d[d > 0].index.min()
    t_val  = lev[lev > 0].index.min()

    candidates = [t for t in (t_edge, t_val) if pd.notna(t)]
    if not candidates:
        # Same exception type as the former bare min([]) failure, but with
        # a message that says what actually went wrong.
        raise ValueError(
            f"Leveling signal {LEV_PV!r} is never enabled in fill {fno}"
        )
    return min(candidates)

def build_windows(t0, markers, ser_H, make_hourly=True):
    """
    Build the list of cumulative analysis windows ``[(label, t_end), ...]``.

    Every window starts at ``t0`` (leveling-on time); only its end differs.

    Parameters
    ----------
    t0 : Timestamp
        Common window start.
    markers : dict
        Needs the keys ``beta_final``, ``ca_start`` and ``ca_dip``.
    ser_H : Series
        Only its index maximum is used, as the latest allowed window end.
    make_hourly : bool
        If true, add windows ending 1 h, 2 h, ... after ``t0`` as long as
        the end is no later than one hour before ``markers["beta_final"]``.

    Returns
    -------
    list[tuple[str, Timestamp]]
        Hourly windows first, then "up to β*", "up to ca start",
        "up to ca dip".  Ends are clipped to the last data stamp and
        windows not longer than one second are dropped.  Windows that end
        at the same instant after clipping are all kept.
    """
    eps = pd.Timedelta(seconds=1)
    one_hour = pd.Timedelta(hours=1)
    tend_data = ser_H.index.max()

    wins = []
    if make_hourly:
        last = markers["beta_final"] - one_hour
        hour, current = 1, t0
        while current + one_hour <= last:
            current += one_hour
            wins.append((f"{hour}h since lvl-on", current))
            hour += 1

    for lab in ("beta_final", "ca_start", "ca_dip"):
        tend = markers[lab]
        if tend - t0 > eps:
            name = "β*" if lab == "beta_final" else lab.replace("_", " ")
            wins.append((f"up to {name}", tend))

    # Clip every end to the available data and drop windows that collapse.
    clipped = [(lab, min(t1, tend_data)) for lab, t1 in wins]
    return [(lab, t1c) for lab, t1c in clipped if t1c - t0 > eps]

# ---------------------------------------------------------------------
# 2.  PER‑FILL, PER‑IP‑GROUP cumulative slopes
# ---------------------------------------------------------------------
def cumulative_rates_one_fill(
    fno: int,
    beam: str,
    fams_ip: dict[str, list[int]],
) -> pd.DataFrame:
    """
    Fit cumulative emittance growth rates for one fill.

    For every window returned by ``build_windows`` and every family, each
    slot's H and V emittance between leveling-on and the window end is
    fitted with ``robust_line`` (x in hours since leveling-on); the slot
    slopes are then combined with ``_wmean_sem``.

    Parameters
    ----------
    fno : int
        Fill number.
    beam : str
        Beam name, used to build the BSRT PV names.
    fams_ip : dict
        ``{family: iterable of slot indices}``.

    Returns
    -------
    DataFrame
        Indexed by ``(window, family)`` with columns ``H_mean``, ``H_sem``,
        ``V_mean``, ``V_sem``.  Empty (but with that layout) when the fill
        yields no window.
    """
    markers = phase_markers(fno)
    t0      = markers["lvl_on"]

    bsrt_loc = bsrt_loc_for_beam(beam)
    ser_H = load_STABLE_series(f"LHC.BSRT.{bsrt_loc}.{beam}:BUNCH_EMITTANCE_H", fno)
    ser_V = load_STABLE_series(f"LHC.BSRT.{bsrt_loc}.{beam}:BUNCH_EMITTANCE_V", fno)

    wins = build_windows(t0, markers, ser_H, MAKE_HOURLY)

    # Per-slot series are extracted once and reused for every window.
    all_slots = {s for slots in fams_ip.values() for s in slots}
    rawH = {s: extract_slot(ser_H, s).sort_index() for s in all_slots}
    rawV = {s: extract_slot(ser_V, s).sort_index() for s in all_slots}

    def _slope_pair(seg):
        """Return (slope, slope_err) for a segment, or None if unusable."""
        if len(seg) < MIN_PTS:
            return None
        hours = (seg.index - t0).total_seconds() / 3600.0
        slope, _, slope_err = robust_line(hours, seg.values)
        if np.isfinite(slope_err) and slope_err > 0:
            return (slope, slope_err)
        return None

    recs = []
    for label, t_end in wins:
        for fam, slots in fams_ip.items():
            H_pairs, V_pairs = [], []
            for s in slots:
                pair_h = _slope_pair(rawH[s].loc[t0:t_end].dropna())
                if pair_h is not None:
                    H_pairs.append(pair_h)
                pair_v = _slope_pair(rawV[s].loc[t0:t_end].dropna())
                if pair_v is not None:
                    V_pairs.append(pair_v)

            H_m, H_s, _ = _wmean_sem(H_pairs)
            V_m, V_s, _ = _wmean_sem(V_pairs)

            recs.append(dict(window=label, family=fam,
                             H_mean=H_m, H_sem=H_s,
                             V_mean=V_m, V_sem=V_s))

    # Explicit columns: with no records the frame would otherwise have no
    # "window"/"family" columns and set_index would raise KeyError.
    columns = ["window", "family", "H_mean", "H_sem", "V_mean", "V_sem"]
    df = (pd.DataFrame(recs, columns=columns)
            .set_index(["window", "family"])
            .sort_index())
    return df

# ---------------------------------------------------------------------
# 3.  AGGREGATION OVER ALL FILLS  (averaging)
# ---------------------------------------------------------------------
def aggregate_over_fills(
    df_tables: list[pd.DataFrame],
) -> pd.DataFrame:
    """
    Average per-fill window tables over fills.

    Parameters
    ----------
    df_tables : list of DataFrame
        One table per fill, indexed by ``(window, family)`` with at least
        the columns ``H_mean`` and ``V_mean``.  The per-fill ``*_sem``
        columns are not used.

    Returns
    -------
    DataFrame
        Indexed by window, with 3-level columns ``(family, plane, stat)``
        where ``stat`` is ``"mean"`` (mean of the per-fill means) or
        ``"sem"`` (standard error of those means across fills; NaN when a
        window has a single fill).
    """
    big = pd.concat(df_tables, keys=range(len(df_tables)), names=["fill"])

    cols = {}
    for fam in big.index.get_level_values("family").unique():
        fam_rows = big.xs(fam, level="family")
        for plane in ("H", "V"):
            # Series.mean(level=...) / Series.sem(level=...) were removed
            # in pandas 2.0; groupby(level=...) is the supported spelling.
            per_window = fam_rows[f"{plane}_mean"].groupby(level="window")
            cols[(fam, plane, "mean")] = per_window.mean()
            cols[(fam, plane, "sem")] = per_window.sem()

    df_out = pd.concat(cols, axis=1).sort_index(axis=1)
    return df_out

# ---------------------------------------------------------------------
# 4.  PLOT (same style the user had)
# ---------------------------------------------------------------------
def split_windows_names(stable_avg: pd.DataFrame):
    """
    Split the window labels of ``stable_avg.index`` into two plot groups.

    Returns
    -------
    (hourly, optics)
        ``hourly`` – labels containing "h since", ordered by their leading
        hour number (1h, 2h, ..., 10h) rather than alphabetically.
        ``optics`` – the labels for β*, crossing-angle start and
        crossing-angle dip, in that order, as they appear in the index.
        They are matched case-insensitively because ``build_windows``
        writes "ca" in lower case.
    """
    def _hour_number(label):
        # "12h since lvl-on" -> 12; unparsable labels sort last.
        head = label.split("h", 1)[0]
        return int(head) if head.isdigit() else float("inf")

    hourly = sorted(
        (w for w in stable_avg.index if "h since" in w), key=_hour_number
    )

    present = {str(w).lower(): w for w in stable_avg.index}
    wanted = ("up to β*", "up to ca start", "up to ca dip")
    optics = [present[key] for key in wanted if key in present]
    return hourly, optics

def common_ylim(stable_avg, plane, pad=0.12):
    """
    Padded (low, high) y-limits covering every finite "mean" value of a plane.

    Columns are expected to be ``(family, plane, stat)`` tuples.  The
    padding is ``pad`` times the value span (a span of 1.0 is used when all
    values coincide).  Returns ``None`` if there is no finite value.
    """
    mean_cols = [
        col for col in stable_avg.columns
        if col[1] == plane and col[2] == "mean"
    ]
    flat = stable_avg[mean_cols].to_numpy().ravel()
    finite = flat[np.isfinite(flat)]
    if finite.size == 0:
        return None

    lo, hi = finite.min(), finite.max()
    if hi > lo:
        span = hi - lo
    else:
        span = 1.0
    return lo - pad * span, hi + pad * span

def draw_mean_band(ax, data, frac=0.25, color="k", alpha=0.08):
    """
    Mark the NaN-ignoring mean of ``data`` on ``ax`` with a horizontal line
    and shade a band of ± ``frac`` · |mean| around it.
    """
    centre = np.nanmean(data)
    half_width = frac * abs(centre)
    lower, upper = centre - half_width, centre + half_width

    ax.axhline(centre, color=color, lw=1.2, alpha=0.35)
    ax.axhspan(lower, upper, color=color, alpha=alpha, zorder=0)

def plot_stable_avg_split_lines(stable_avg: pd.DataFrame,
                                plane="H", pct_band=0.25,
                                group_spacing=1.2, figsize=(13, 5)):
    """
    Two side-by-side panels (hourly windows | optics phases) showing, per
    family, the fill-averaged growth rate with its SEM as error bars.

    ``stable_avg`` is indexed by window label with ``(family, plane, stat)``
    columns.  Both panels share the y-axis; each gets a mean line and a
    ± ``pct_band`` band over all plotted means.  ``group_spacing`` is the
    x-distance between consecutive windows.
    """
    hourly, optics = split_windows_names(stable_avg)
    fams = sorted({f for f, pl, st in stable_avg.columns if pl == plane})

    fig, axes = plt.subplots(1, 2, figsize=figsize, sharey=True)
    ylim = common_ylim(stable_avg, plane)

    panels = (
        (hourly, "Hourly windows"),
        (optics, "Optics phases"),
    )
    for panel_no, (ax, (wins, title)) in enumerate(zip(axes, panels)):
        is_first_panel = panel_no == 0
        x0 = np.arange(len(wins)) * group_spacing
        all_means = []

        for fam in fams:
            m = stable_avg.loc[wins, (fam, plane, "mean")].to_numpy()
            e = stable_avg.loc[wins, (fam, plane, "sem")].to_numpy()
            # Only the first panel contributes legend entries.
            legend_label = fam if is_first_panel else None
            ax.errorbar(x0, m, yerr=e, fmt="-o", capsize=3, ms=5,
                        color=FAMILY_COLORS.get(fam), label=legend_label)
            all_means.append(m)

        if all_means:
            draw_mean_band(ax, np.concatenate(all_means), pct_band)

        ax.set_xticks(x0)
        ax.set_xticklabels(wins, rotation=45, ha="right")
        ax.set_title(title)
        ax.grid(axis="y", alpha=0.3)
        if ylim:
            ax.set_ylim(*ylim)

    first_ax = axes[0]
    first_ax.set_ylabel(f"Growth rate ({plane})")
    first_ax.legend(bbox_to_anchor=(1.02, 1), loc="upper left", fontsize="small")
    fig.suptitle(f"STABLE averages – {plane}-plane – Families across windows")
    fig.tight_layout()
    plt.show()

# ---------------------------------------------------------------------
# 5.  MAIN DRIVER
# ---------------------------------------------------------------------
def main():
    """
    Run the STABLE-window analysis for every fill in the global ``fills``
    and show, per IP-group, the head of the averaged table and the
    H-plane plot.

    Relies on the globals ``fills``, ``beam``, ``ip``, ``RAWDATA``,
    ``build_families_for_fill`` and ``build_family_ip_partitions``.
    """
    # Step A: build families for every fill
    fams_by_fill = {}
    for f in fills:
        fams_by_fill[f] = build_families_for_fill(
            f, beam=beam, ip=ip, RAWDATA=RAWDATA
        )

    # Step B: explode into slot‑wise IP table
    df_ip_long, _ = build_family_ip_partitions(fams_by_fill, RAWDATA, beam)

    # Step C: per‑(fill × IP) cumulative tables, collected per IP-group
    per_ip_tables = {}
    for fno in fills:
        df_fill = df_ip_long.query("fill == @fno")
        for ipg, grp in df_fill.groupby("ip_group"):
            fams_ip = {}
            for fam, g in grp.groupby("family"):
                fams_ip[fam] = g["slot"].to_numpy()
            df_one = cumulative_rates_one_fill(fno, beam, fams_ip)
            per_ip_tables.setdefault(ipg, []).append(df_one)

    # Step D: average over fills + plot
    for ipg, tbls in per_ip_tables.items():
        stable_avg = aggregate_over_fills(tbls)
        print(f"\n=== IP‑group {ipg}  ({len(tbls)} fills) ===")
        display(stable_avg.head())
        plot_stable_avg_split_lines(stable_avg, plane="H")   # change to 'V' if needed

# ---------------------------------------------------------------------
# Entry point: check that the objects this cell expects from earlier cells
# are defined, then run the analysis.
if __name__ == "__main__":
    # Ensure the external objects exist:
    # (a bare name is an expression; evaluating an undefined one raises
    # NameError, which is turned into a more helpful message below)
    try:
        fbmodes
        fills
        beam
        build_families_for_fill
        build_family_ip_partitions
    except NameError as exc:
        raise RuntimeError(
            "Before running this script you must define "
            "`fbmodes`, `fills`, `beam`, "
            "`build_families_for_fill`, and `build_family_ip_partitions`."
        ) from exc
    main()
