In [3]:
# %% ANALÍTICA AVANZADA POR COMEDERO (LS1..LS6) A PARTIR DE PARQUETS "split"
# Requisitos: pandas, numpy, matplotlib (instalados habitualmente con Anaconda)
# Estructura de entrada esperada: data_drive/_parquet_export/split/ls1.parquet, ls2.parquet, ...
# Columnas esperadas en cada parquet: mac (str), time (datetime/str/epoch), vlx (mm, numérico), source_file (str)
#
# Qué hace este notebook:
# 1) Carga todos los .parquet "split" y valida esquema.
# 2) Limpieza robusta: orden temporal, duplicados, outliers (Hampel), suavizado (mediana rodante).
# 3) Métricas de calidad (QoS): frecuencia muestreo, lag, huecos, cobertura, valores planos, etc.
# 4) Detección de eventos:
#    - "Ocupación" (posible hocico del cerdo) por dips rápidos vs baseline (y alternativa por Otsu si hay bimodalidad).
#    - "Rellenos" (refill) por saltos descendentes grandes en la distancia (si el sensor está sobre el nivel de pienso).
# 5) Agregados diarios y resúmenes por comedero.
# 6) Gráficas clave y export de artefactos en data_drive/_analysis/<lsX>/
#
# NOTA IMPORTANTE: sin calibración geométrica/peso, las inferencias de consumo son PROXIES (variaciones de distancia),
# no kg reales. Si se conoce la geometría, puede añadirse una curva nivel->volumen/peso y el código contempla un hook.
from __future__ import annotations

import json
import math
from dataclasses import dataclass
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path


# --------------------------- CONFIGURACIÓN ---------------------------

IN_DIR = Path("data_drive/_parquet_export/split")
OUT_ROOT = Path("data_drive/_analysis")

# Zona horaria y muestreo
TIMEZONE = "Europe/Madrid"   # si tus timestamps son locales; si ya son UTC, cambia a "UTC"
ASSUME_LOCALTIME = True      # si True, parsea 'time' como naive y lo localiza a TIMEZONE

# Ventanas (se adaptan en minutos estimados a partir del muestreo real)
ROLL_MED_MINUTES = 2.0       # mediana rodante de suavizado principal
BASELINE_MINUTES = 30.0      # baseline lento para aislar dips (ocupación)
Hampel_Window_Minutes = 3.0  # ventana para Hampel
Hampel_k = 3.0               # sensibilidad Hampel (3-5 habitual)

# Detección de eventos
MIN_OCCUPANCY_SECONDS = 5.0      # duración mínima de un evento de ocupación
GAP_MAX_SECONDS = 10.0           # máximo hueco entre puntos para considerar continuidad en eventos
REFILL_JUMP_MM = None            # umbral fijo mm para refill; si None, se estima a -5*std(diff) o p1 neg
REFILL_MIN_SEP_MIN = 10.0        # separación mínima entre recargas detectadas (anti-duplicados)
BIMODAL_USE_OTSU_IF_TRUE = True  # si el histograma es claramente bimodal, usa Otsu como apoyo a dips

# Export
SAVE_PLOTS = True
SAVE_EVENTS = True
SAVE_DAILIES = True
SAVE_QC = True

# Conversión nivel->kg (hook). Si se conoce conversión, rellena esta función.
def mm_to_kg(mm_delta: float) -> float:
    # Placeholder: sin calibración, devolvemos 0.0. Sustituir según geometría del comedero.
    return 0.0


# --------------------------- UTILIDADES ---------------------------

def ensure_columns(df: pd.DataFrame, required=("mac","time","vlx")):
    missing = [c for c in required if c not in df.columns]
    if missing:
        raise ValueError(f"Faltan columnas requeridas: {missing}. Presentes: {df.columns.tolist()}")

def parse_time_series(s: pd.Series) -> pd.Series:
    """
    Convierte 'time' robustamente a datetime.
    - Si es numérico grande puede ser epoch (s o ms).
    - Si es string usa to_datetime(infer).
    - Localiza a TIMEZONE si ASSUME_LOCALTIME=True y está naive.
    """
    # Detectar tipo numérico con pandas, no con numpy (evita error con string[python])
    if pd.api.types.is_numeric_dtype(s):
        m = s.median()
        if m > 1e12:  # epoch en ms
            dt = pd.to_datetime(s, unit="ms", errors="coerce")
        elif m > 1e9:  # epoch en s
            dt = pd.to_datetime(s, unit="s", errors="coerce")
        else:
            dt = pd.to_datetime(s, errors="coerce", infer_datetime_format=True)
    else:
        dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)

    # Localización de zona horaria
    if ASSUME_LOCALTIME:
        if dt.dt.tz is None:
            dt = dt.dt.tz_localize(TIMEZONE, nonexistent="NaT", ambiguous="NaT")
        else:
            dt = dt.dt.tz_convert(TIMEZONE)
    return dt

def hampel_filter(x: pd.Series, window: int, k: float = 3.0) -> pd.Series:
    """
    Filtro Hampel (mediana +- k*1.4826*MAD). Ventana mínima = 3.
    """
    window = max(3, int(window))
    med = x.rolling(window, center=True, min_periods=window).median()
    mad = (x - med).abs().rolling(window, center=True, min_periods=window).median()
    sigma = 1.4826 * mad
    outliers = (x - med).abs() > (k * sigma)
    x_filt = x.where(~outliers, med)
    return x_filt.astype(float)


def estimate_sampling_seconds(t: pd.Series) -> float:
    dt = t.sort_values().diff().dropna().dt.total_seconds()
    if dt.empty:
        return 1.0
    return float(dt.median())

def minutes_to_points(minutes: float, sampling_s: float) -> int:
    return max(1, int(round((minutes * 60.0) / max(0.5, sampling_s))))

def otsu_threshold(values: np.ndarray, nbins: int = 256) -> Optional[float]:
    """
    Umbral Otsu sobre valores 1D. Devuelve None si no hay dispersión.
    """
    v = values[np.isfinite(values)]
    if v.size < 10:
        return None
    if np.nanstd(v) < 1e-9:
        return None
    hist, bin_edges = np.histogram(v, bins=nbins)
    weight1 = np.cumsum(hist)
    weight2 = v.size - weight1
    mean1 = np.cumsum(hist * ((bin_edges[:-1] + bin_edges[1:])/2))
    mean2 = mean1[-1] - mean1
    # evitar división por cero
    valid = (weight1 > 0) & (weight2 > 0)
    if not valid.any():
        return None
    variance12 = (mean1[valid] / weight1[valid] - mean2[valid] / weight2[valid]) ** 2 * (weight1[valid] * weight2[valid])
    idx = np.argmax(variance12)
    thr = ((bin_edges[:-1] + bin_edges[1:])/2)[np.where(valid)[0][idx]]
    return float(thr)

def label_runs(bool_series: pd.Series, time_index: pd.Series, gap_max_seconds: float, min_duration_seconds: float) -> pd.DataFrame:
    """
    Etiqueta runs contiguos (True) como eventos. Rompe eventos cuando hay huecos > gap_max_seconds.
    Devuelve DataFrame con start, end, duration_s.
    """
    s = bool_series.fillna(False).astype(bool)
    t = pd.to_datetime(time_index)

    # Cortamos por huecos grandes
    gaps = t.diff().dt.total_seconds().fillna(0.0)
    # start de nuevo bloque si gap mayor a máximo o si cambia de False->True
    block = (gaps > gap_max_seconds).cumsum()

    # Dentro de cada bloque, detectamos runs True
    events = []
    for b, idx in s.groupby(block).groups.items():
        sb = s.loc[idx]
        tb = t.loc[idx]
        change = sb.ne(sb.shift(1))
        run_id = change.cumsum()
        for rid, ids in sb.groupby(run_id).groups.items():
            if not sb.loc[ids].iloc[0]:
                continue
            start = tb.loc[ids].iloc[0]
            end = tb.loc[ids].iloc[-1]
            dur = (end - start).total_seconds()
            if dur >= min_duration_seconds:
                events.append((start, end, dur))
    if not events:
        return pd.DataFrame(columns=["start","end","duration_s"])
    out = pd.DataFrame(events, columns=["start","end","duration_s"])
    return out

def summarize_daily(df: pd.DataFrame, col: str, tz: str = TIMEZONE) -> pd.DataFrame:
    """
    Agregados diarios de la columna 'col' y de sus diferencias (derivadas).
    """
    tmp = df.set_index("time").copy()
    daily = pd.DataFrame()
    daily["n_points"] = tmp[col].resample("1D").size()
    daily["vlx_mm_med"] = tmp[col].resample("1D").median()
    daily["vlx_mm_mean"] = tmp[col].resample("1D").mean()
    daily["vlx_mm_std"] = tmp[col].resample("1D").std()
    # drift (consumo proxy, depende de montaje; signo orientativo)
    daily["net_diff_mm"] = tmp[col].resample("1D").apply(lambda s: float(s.iloc[-1] - s.iloc[0]) if len(s)>1 else np.nan)
    daily["max_drop_mm"] = tmp[col].resample("1D").apply(lambda s: np.nanmin(np.diff(s.values)) if len(s)>1 else np.nan)
    daily["max_rise_mm"] = tmp[col].resample("1D").apply(lambda s: np.nanmax(np.diff(s.values)) if len(s)>1 else np.nan)
    # conversión a kg (si hay calibración)
    daily["net_delta_kg"] = daily["net_diff_mm"].apply(mm_to_kg)
    return daily

def quality_metrics(df: pd.DataFrame) -> Dict:
    """
    Métricas de calidad/operación del sensor.
    """
    n = len(df)
    if n == 0:
        return {"n": 0}
    sampling_s = estimate_sampling_seconds(df["time"])
    coverage_h = (df["time"].max() - df["time"].min()).total_seconds() / 3600.0
    duplicates = int(df.duplicated(subset=["time"]).sum())
    # Huecos
    gaps_s = df["time"].sort_values().diff().dt.total_seconds().dropna()
    big_gaps = int((gaps_s > GAP_MAX_SECONDS).sum())
    pct_missing_time = float((gaps_s[gaps_s > GAP_MAX_SECONDS].sum()) / (coverage_h * 3600.0 + 1e-9))
    # Valores planos prolongados
    flat_runs = (df["vlx_smooth"].round(3).diff().abs() < 1e-6).astype(int)
    flat_len = flat_runs.groupby((flat_runs!=flat_runs.shift()).cumsum()).transform('size')
    long_flat = int((flat_len >= minutes_to_points(5.0, sampling_s)).sum())
    return {
        "n_points": int(n),
        "time_start": str(df["time"].min()),
        "time_end": str(df["time"].max()),
        "coverage_hours": round(coverage_h, 3),
        "sampling_seconds_median": round(sampling_s, 3),
        "duplicates_by_time": duplicates,
        "num_gaps_gt_threshold": big_gaps,
        "pct_time_missing_est": round(100*pct_missing_time, 3),
        "flat_values_long_count": long_flat,
        "vlx_mm_min": float(df["vlx_mm"].min()),
        "vlx_mm_max": float(df["vlx_mm"].max()),
    }

def plot_overview(df: pd.DataFrame, name: str, out_dir: Path,
                  events_occ: Optional[pd.DataFrame], events_refill: Optional[pd.DataFrame]):
    if not SAVE_PLOTS:
        return
    out_dir.mkdir(parents=True, exist_ok=True)
    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(df["time"], df["vlx_smooth"], label="vlx_smooth (mm)")
    ax.set_title(f"{name} - Distancia suavizada (mm)")
    ax.set_xlabel("Tiempo")
    ax.set_ylabel("mm (menor = más cerca)")
    # Marca eventos
    if events_occ is not None and len(events_occ) > 0:
        for _, r in events_occ.iterrows():
            ax.axvspan(r["start"], r["end"], alpha=0.15, label="ocupación" if _==0 else None, color="tab:orange")
    if events_refill is not None and len(events_refill) > 0:
        for _, r in events_refill.iterrows():
            ax.axvline(r["time"], alpha=0.4, label="refill" if _==0 else None)
    ax.legend(loc="upper right")
    fig.tight_layout()
    p = out_dir / "overview_vlx.png"
    fig.savefig(p, dpi=150)
    plt.close(fig)

def _smooth_vlx(df: pd.DataFrame, sampling_s: float) -> pd.Series:
    from math import ceil
    # calcula tamaño de ventana por puntos pero con mínimo 3
    w_med = max(3, int(round((ROLL_MED_MINUTES * 60.0) / max(0.5, sampling_s))))
    return (
        df["vlx_filt"]
        .rolling(w_med, center=True, min_periods=w_med)
        .median()
    )


def detect_occupancy(df: pd.DataFrame, sampling_s: float):
    """
    Calcula baseline lento con ventana temporal 'BASELINE_MINUTES' y marca dips significativos.
    Devuelve (serie_booleana, umbral_otsu_opcional).
    """
    # Aseguramos que vlx_smooth existe; si no, lo generamos con el helper
    if "vlx_smooth" not in df.columns or df["vlx_smooth"].isna().all():
        df["vlx_smooth"] = _smooth_vlx(df, sampling_s)

    ts = df.set_index("time").sort_index()
    window_str = f"{max(1, int(round(BASELINE_MINUTES)))}T"  # ej. "30T"
    # baseline y sigma robusta con min_periods=1 para evitar errores
    baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
    residual = ts["vlx_smooth"] - baseline
    mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()
    sigma = 1.4826 * mad
    z = residual / sigma.mask(sigma == 0, np.nan)
    occ_dips = (z < -2.5).fillna(False)  # booleano por índice temporal

    # Opción Otsu si hay bimodalidad
    otsu_thr = None
    if BIMODAL_USE_OTSU_IF_TRUE:
        thr = otsu_threshold(df["vlx_smooth"].to_numpy())
        if thr is not None and np.isfinite(thr):
            otsu_thr = float(thr)
            occ_dips = occ_dips | (ts["vlx_smooth"] < thr)

    # Devolvemos alineado a df (mismo orden/longitud)
    occ_bool = pd.Series(occ_dips.values, index=df.index)
    return occ_bool, otsu_thr

def detect_refills(df: pd.DataFrame, sampling_s: float) -> pd.DataFrame:
    """
    Detecta "refills" como saltos descendentes pronunciados en vlx_smooth (más pienso -> menor distancia).
    Umbral automático si REFILL_JUMP_MM es None.
    Devuelve DataFrame con time y jump_mm (negativo).
    """
    d = df["vlx_smooth"].diff()
    neg = d.dropna()[d.dropna() < 0.0]
    if REFILL_JUMP_MM is None:
        if len(neg) == 0:
            thr = -np.inf
        else:
            thr = min(neg.quantile(0.01), -5.0 * float(d.std() or 0.0))  # muy conservador
            # aseguramos que sea negativo y con magnitud relevante
            thr = min(thr, - np.nanpercentile(abs(d.values), 75))
    else:
        thr = -abs(REFILL_JUMP_MM)

    cand = d[d <= thr]
    if cand.empty:
        return pd.DataFrame(columns=["time","jump_mm"])

    # anti-duplicados: separar en el tiempo
    min_sep_points = minutes_to_points(REFILL_MIN_SEP_MIN, sampling_s)
    idx = cand.index
    keep = []
    last_i = None
    for i in idx:
        if last_i is None or (i - last_i) >= min_sep_points:
            keep.append(i)
            last_i = i
    events = pd.DataFrame({
        "time": df.loc[keep, "time"].values,
        "jump_mm": d.loc[keep].values
    })
    return events

def process_one(df_in: pd.DataFrame, name: str, out_dir: Path) -> Dict:
    """
    Pipeline por comedero.
    """
    ensure_columns(df_in)
    df = df_in.copy()

    # Tipos
    df["mac"] = df["mac"].astype("string").str.strip()
    df["time"] = parse_time_series(df["time"])
    df["vlx_mm"] = pd.to_numeric(df["vlx"], errors="coerce")
    # source opcional
    if "source_file" in df.columns:
        df["source_file"] = df["source_file"].astype("string")
    # Limpieza básica
    df = df.dropna(subset=["time"]).sort_values("time").drop_duplicates(subset=["time"], keep="last")
    sampling_s = estimate_sampling_seconds(df["time"])
    if len(df) == 0:
        return {"name": name, "error": "sin datos tras limpieza"}

    # Hampel -> suavizado
        # Hampel -> suavizado
    w_hampel = minutes_to_points(Hampel_Window_Minutes, sampling_s)
    w_hampel = max(3, w_hampel)
    df["vlx_filt"] = hampel_filter(df["vlx_mm"], window=w_hampel, k=Hampel_k)

    # Mediana rodante principal
    w_med = minutes_to_points(ROLL_MED_MINUTES, sampling_s)
    w_med = max(3, w_med)
    df["vlx_smooth"] = (
        df["vlx_filt"]
        .rolling(w_med, center=True, min_periods=w_med)
        .median()
    )

    # Métricas de calidad
    qc = quality_metrics(df)

    # Ocupación
    occ_bool, otsu_thr = detect_occupancy(df, sampling_s)
    events_occ = label_runs(occ_bool, df["time"], gap_max_seconds=GAP_MAX_SECONDS, min_duration_seconds=MIN_OCCUPANCY_SECONDS)

    # Refill
    events_refill = detect_refills(df, sampling_s)

    # Agregados diarios
    daily = summarize_daily(df, "vlx_smooth")

    # Export
    com_dir = out_dir / name
    com_dir.mkdir(parents=True, exist_ok=True)
    if SAVE_EVENTS:
        events_occ.to_csv(com_dir / "events_occupancy.csv", index=False)
        events_refill.to_csv(com_dir / "events_refill.csv", index=False)
    if SAVE_DAILIES:
        daily.to_csv(com_dir / "daily_summary.csv", index=True)
    if SAVE_QC:
        qc_out = qc.copy()
        if otsu_thr is not None:
            qc_out["otsu_threshold_mm"] = float(otsu_thr)
        (com_dir / "qc.json").write_text(json.dumps(qc_out, indent=2, ensure_ascii=False), encoding="utf-8")

    # Plot
    plot_overview(df, name, com_dir, events_occ, events_refill)

    # KPIs de ocupación (tiempo total/día)
    if len(events_occ) > 0:
        occ_df = events_occ.copy()
        occ_df["date"] = occ_df["start"].dt.tz_localize(TIMEZONE).dt.date
        occ_daily = occ_df.groupby("date")["duration_s"].sum().rename("occ_seconds_day")
        occ_daily.to_csv(com_dir / "occupancy_daily_seconds.csv", index=True)
    else:
        occ_daily = pd.Series(dtype=float)

    # KPI recargas por día
    if len(events_refill) > 0:
        re_df = events_refill.copy()
        re_df["date"] = pd.to_datetime(re_df["time"]).dt.tz_localize(TIMEZONE).dt.date
        refill_daily = re_df.groupby("date")["jump_mm"].count().rename("refill_count_day")
        refill_daily.to_csv(com_dir / "refill_daily_count.csv", index=True)
    else:
        refill_daily = pd.Series(dtype=float)

    # Resultado resumen
    return {
        "name": name,
        "mac": df["mac"].mode().iloc[0] if not df["mac"].isna().all() else "",
        "sampling_s_median": qc["sampling_seconds_median"],
        "coverage_h": qc["coverage_hours"],
        "n_points": qc["n_points"],
        "n_occ_events": int(len(events_occ)),
        "n_refill_events": int(len(events_refill)),
        "occ_total_h": float(events_occ["duration_s"].sum()/3600.0) if len(events_occ)>0 else 0.0,
        "plots_overview": str((out_dir / name / "overview_vlx.png").resolve()) if SAVE_PLOTS else "",
        "path_daily": str((out_dir / name / "daily_summary.csv").resolve()) if SAVE_DAILIES else "",
        "path_qc": str((out_dir / name / "qc.json").resolve()) if SAVE_QC else "",
    }


# --------------------------- CARGA Y EJECUCIÓN ---------------------------

parquets = sorted(IN_DIR.glob("*.parquet"))
if not parquets:
    raise SystemExit(f"No se encontraron .parquet en {IN_DIR.resolve()}")

OUT_ROOT.mkdir(parents=True, exist_ok=True)

dfs: Dict[str, pd.DataFrame] = {}
for p in parquets:
    name = p.stem.lower()  # ej: ls1
    df = pd.read_parquet(p)
    # Validación mínima y casting
    ensure_columns(df, ("mac","time","vlx"))
    dfs[name] = df[["mac","time","vlx"] + ([c for c in df.columns if c=="source_file"])]

# Procesa cada comedero
summaries = []
for name, df in dfs.items():
    print(f"==> Procesando {name} ...")
    res = process_one(df, name=name, out_dir=OUT_ROOT)
    summaries.append(res)

# Resumen global
summary_df = pd.DataFrame(summaries).sort_values("name")
summary_df.to_csv(OUT_ROOT / "summary_all_feeders.csv", index=False)
print("\n=== Resumen global ===")
print(summary_df)

# Ayuda adicional: mostrar dónde está todo
print(f"\nArtefactos guardados en: {OUT_ROOT.resolve()}")
print(" - Por comedero: <lsX>/events_*.csv, daily_summary.csv, qc.json, overview_vlx.png")
print(" - Global: summary_all_feeders.csv")


# --------------------------- NOTAS DE INTERPRETACIÓN ---------------------------
# - "vlx_smooth" es la distancia suavizada. Si el sensor está encima del pienso:
#     * Distancias MENORES -> más pienso (nivel alto) o intrusión (hocico).
#     * Distancias MAYORES -> menos pienso (nivel bajo).
# - Ocupación: se estima como episodios de dips rápidos (z<-2.5 respecto a baseline lento) y/o por Otsu si bimodalidad.
# - Refill: saltos negativos grandes (distancia cae bruscamente) y espaciados ≥ REFILL_MIN_SEP_MIN.
# - Daily net_diff_mm: diferencia final-inicial del día (signo depende del montaje).
#   Para consumo real, sustituir mm_to_kg() con la conversión geométrica adecuada.


==> Procesando ls1 ...


  dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)
  baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
  mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()


==> Procesando ls2 ...


  dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)
  baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
  mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()


==> Procesando ls3 ...


  dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)
  baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
  mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()


==> Procesando ls4 ...
==> Procesando ls5 ...


  dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)
  baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
  mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()
  daily["max_drop_mm"] = tmp[col].resample("1D").apply(lambda s: np.nanmin(np.diff(s.values)) if len(s)>1 else np.nan)
  daily["max_rise_mm"] = tmp[col].resample("1D").apply(lambda s: np.nanmax(np.diff(s.values)) if len(s)>1 else np.nan)
  dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)
  baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
  mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()


==> Procesando ls6 ...

=== Resumen global ===
  name                mac  sampling_s_median  coverage_h  n_points  \
0  ls1  40:22:D8:F1:E3:70             1802.0     527.331      1058   
1  ls2  40:22:D8:F1:E2:CC             1802.0     527.324       902   
2  ls3  40:22:D8:F1:E3:80             1802.0     526.952      1056   
3  ls4  B8:D6:1A:60:95:30                1.0    1272.051       144   
4  ls5  B8:D6:1A:60:94:1C             1802.0     527.784      1058   
5  ls6  D8:13:2A:D2:36:B4              723.5     528.014        15   

   n_occ_events  n_refill_events  occ_total_h  \
0             0                2          0.0   
1             0                1          0.0   
2             0                3          0.0   
3             0                0          0.0   
4             0                3          0.0   
5             0                0          0.0   

                                      plots_overview  \
0  D:\ls_feed\data_drive\_analysis\ls1\overview_v...   
1  D:\

  dt = pd.to_datetime(s.astype(str), errors="coerce", infer_datetime_format=True)
  baseline = ts["vlx_smooth"].rolling(window=window_str, center=True, min_periods=1).median()
  mad = residual.abs().rolling(window=window_str, center=True, min_periods=1).median()
  daily["max_drop_mm"] = tmp[col].resample("1D").apply(lambda s: np.nanmin(np.diff(s.values)) if len(s)>1 else np.nan)
  daily["max_rise_mm"] = tmp[col].resample("1D").apply(lambda s: np.nanmax(np.diff(s.values)) if len(s)>1 else np.nan)
