In [1]:
# Parameters
run_date = "2026-01-28"
output_dir = "C:\\datum-api-examples-main\\OriON\\signals"


In [2]:
# Parameters
run_date = "2026-01-01"  # papermill replacement
import os
output_dir = os.environ.get("ORION_SIGNALS_DIR", "../signals")
config_path = os.environ.get("DATUM_API_CONFIG_PATH", "../ops/datum_api_config.json")
dry_run = False

# ensure output exists
os.makedirs(output_dir, exist_ok=True)


In [3]:
# Import basic modules
import pandas as pd
from datum_api_client import DatumApi
import datetime
from datetime import timedelta
import gzip
from pathlib import Path


# Import warnings
import warnings
warnings.filterwarnings("ignore")
# pip install xlrd
# pip install openpyxl
import os
from pathlib import Path

In [4]:
from __future__ import annotations


def devsig_stream_stats_v12_exporter(
    input_path: str,
    *,
    # === outputs ===
    output_onefile_jsonl: str = "ARBITRAGE/onefile.jsonl",
    output_summary_csv: str = "ARBITRAGE/summary.csv",
    output_best_params_jsonl: str = "ARBITRAGE/best_params.jsonl",
    # === include heavy parts ===
    include_events_pre: bool = False,
    include_events_intra: bool = False,
    include_events_post: bool = False,
    max_events_per_ticker: int = 500,
    # === thresholds ===
    dev_thr: float = 0.30,      # trigger (abs(dev_sig) >= dev_thr)
    norm_thr: float = 0.10,     # HARD normalization threshold (abs(dev_sig) <= norm_thr)
    soft_ratio: float = 3.0,    # SOFT: abs(dev_sig) <= peak_abs / soft_ratio
    # === best params selection rules (kept; we ADD simpler ANY windows) ===
    best_rules: "dict|None" = None,
    # === reading ===
    assume_sorted: bool = True,
    parquet_use_pyarrow: bool = True,
    parquet_iter_batches: bool = True,   # ✅ optional speedup (Step 4)
    parquet_batch_size: int = 1_000_000, # ✅ batch size for iter_batches
    csv_chunksize: int = 500_000,
    log_every_n_chunks: int = 5,
    # === bins ===
    sigma_bin_min: float = 0.2,
    sigma_bin_max: float = 2.7,
    sigma_bin_step: float = 0.1,
    bench_bin_min: float = -3.0,
    bench_bin_max: float = 3.0,
    bench_bin_step: float = 0.1,
    # === time bands ===
    start_band_minutes: int = 30,
    norm_band_minutes: int = 30,
    # === numeric fields stored in data ===
    BENCH_NUM_FIELD: str = "Bench%",
    STOCK_NUM_FIELD: str = "Stack%",
    # === global filter for ALL outputs ===
    min_events_per_ticker: int = 10,
    # === open series ===
    open_series_downsample_seconds: int = 60,  # 60s => 1 point / minute
):
    """
    v12 exporter UPDATED with BLUE + POST and strict "parallel class checks" semantics:

    ✅ Classes:
      - PRE classes: BLUE, ARK, PRINT, OPEN (all evaluated in parallel for the same PRE event)
      - GLOBAL = priority selector over {BLUE, ARK, PRINT, OPEN} (POST NOT included)
      - INTRA class (10:00–12:00)
      - POST class (16:01–19:59) (separate event stream, not in GLOBAL)

    ✅ GLOBAL priority:
      BLUE_HARD > ARK_HARD > PRINT_HARD > BLUE_SOFT > ARK_SOFT > PRINT_SOFT > OPEN_HARD > OPEN_SOFT > NONE

    ✅ BEST_PARAMS:
      - best_windows_any stitched for ALL classes:
        blue/ark/print/open/global/intra/post, each per sign (pos/neg)
      - uses ANY = hard+soft normalization ratio, thresholds total>=4, rate>=0.6

    ✅ IMPORTANT SEMANTICS:
      - BLUE/ARK/PRINT/OPEN do NOT mute each other. They all get their own hard/soft/none outcome.
      - PRE event is finalized after OPEN window (same as before).
      - BLUE has its OWN peak (frozen until 03:59) and soft is evaluated vs BLUE peak.
      - ARK/PRINT/OPEN use PRE peak frozen until 09:29.

    ✅ PERFORMANCE PATCHES (no semantic changes):
      - gzip outputs: .jsonl -> .jsonl.gz (if path endswith .gz)
      - vectorized dt/dev parsing + ignored window filtering before loop
      - avoid parse_dt()/hhmm()/is_ignored_time() per row
      - reduce gc.collect() frequency
      - optional pyarrow iter_batches() for parquet
    """
    import os, gc, json, time, math, gzip
    from collections import deque, defaultdict, Counter
    from datetime import datetime, timedelta
    import numpy as np
    import pandas as pd

    # ---------------- defaults for best rules (kept) ----------------
    if best_rules is None:
        best_rules = {
            "sigma_any":  {"min_rate": 0.60, "min_total": 20, "top_n": 3},
            "sigma_hard": {"min_rate": 0.55, "min_total": 20, "top_n": 3},
            "sigma_soft": {"min_rate": 0.60, "min_total": 15, "top_n": 3},

            "bench_any":  {"min_rate": 0.58, "min_total": 25, "top_n": 3},
            "bench_hard": {"min_rate": 0.52, "min_total": 25, "top_n": 3},
            "bench_soft": {"min_rate": 0.60, "min_total": 20, "top_n": 3},

            "soft_peak_rate": {"min_rate": 0.55, "min_total": 15, "top_n": 3},
            "soft_low_rate":  {"min_rate": 0.55, "min_total": 15, "top_n": 3},

            "start_band_any": {"min_rate": 0.60, "min_total": 20, "top_n": 3},
        }

    # ---------------- gzip-aware open ----------------
    def _open_text(path: str, mode: str = "wt"):
        # mode expected: "wt" or "at"
        if str(path).lower().endswith(".gz"):
            return gzip.open(
                path,
                mode,
                encoding="utf-8",
                newline="\n",
                compresslevel=6,  # баланс швидкість/розмір
            )
        return open(path, mode.replace("t", ""), encoding="utf-8", newline="\n")

    Path(output_onefile_jsonl).parent.mkdir(parents=True, exist_ok=True)
    Path(output_summary_csv).parent.mkdir(parents=True, exist_ok=True)
    Path(output_best_params_jsonl).parent.mkdir(parents=True, exist_ok=True)


    onefile_f = _open_text(output_onefile_jsonl, "wt")

    summary_cols = [
        "ticker", "bench", "events_total",
        "events_pre_total", "events_intra_total", "events_post_total",

        "blue_any_rate", "blue_hard_rate", "blue_soft_rate",
        "ark_any_rate", "ark_hard_rate", "ark_soft_rate",
        "print_any_rate", "print_hard_rate", "print_soft_rate",
        "open_any_rate", "open_hard_rate", "open_soft_rate",
        "global_any_rate", "global_hard_rate", "global_soft_rate",
        "intra_any_rate", "intra_hard_rate", "intra_soft_rate",
        "post_any_rate", "post_hard_rate", "post_soft_rate",

        "corr", "beta", "sigma",
    ]
    pd.DataFrame(columns=summary_cols).to_csv(output_summary_csv, index=False, mode="w")

    best_params_f = _open_text(output_best_params_jsonl, "wt")
    best_params_f.write(json.dumps({
        "meta": {"version": "v12+blue+post", "generated_at": datetime.utcnow().isoformat() + "Z"}
    }, ensure_ascii=False) + "\n")

    # ---------------- helpers ----------------
    def _json_safe(x):
        if x is None:
            return None
        if isinstance(x, (np.floating, float)):
            if np.isnan(x) or np.isinf(x):
                return None
            return float(x)
        if isinstance(x, (np.integer, int)):
            return int(x)
        if isinstance(x, (np.bool_, bool)):
            return bool(x)
        if isinstance(x, (pd.Timestamp,)):
            return x.isoformat()
        if isinstance(x, (datetime,)):
            return x.isoformat()
        return x

    def is_finite_num(x) -> bool:
        try:
            return np.isfinite(float(x))
        except Exception:
            return False

    def as_float_or_nan(x) -> float:
        try:
            return float(x)
        except Exception:
            return np.nan

    def hhmm(dt_obj):
        return (dt_obj.hour, dt_obj.minute) if isinstance(dt_obj, datetime) else None

    def in_range(t, a, b):
        return (t is not None) and (a <= t <= b)

    def _dt_iso(x):
        return x.isoformat() if isinstance(x, datetime) else None

    def floor_to_band(dt_obj: datetime, minutes: int) -> str:
        if not isinstance(dt_obj, datetime):
            return None
        m = (dt_obj.minute // minutes) * minutes
        start = dt_obj.replace(minute=m, second=0, microsecond=0)
        end = start + timedelta(minutes=minutes)
        return f"{start.hour:02d}:{start.minute:02d}-{end.hour:02d}:{end.minute:02d}"

    # ---------------- time windows ----------------
    BLUE_FROM = (0, 1)
    BLUE_TO   = (3, 59)

    ARK_FROM = (0, 5)
    ARK_TO   = (9, 29)

    PRINT_FROM = (9, 30)
    PRINT_TO   = (9, 35)

    OPEN_FROM  = (9, 31)
    OPEN_TO    = (9, 40)

    INTRA_FROM = (10, 0)
    INTRA_TO   = (12, 0)

    POST_FROM  = (16, 1)
    POST_TO    = (19, 59)

    # ✅ ignored windows (vectorized filtering uses these exact bounds, inclusive)
    IGNORE_WINDOWS = [((3, 58), (4, 5)), ((7, 58), (8, 5))]

    def is_ignored_time(t):
        return any(in_range(t, a, b) for a, b in IGNORE_WINDOWS)

    # ---------------- binning ----------------
    def _clamp(v, lo, hi):
        return max(lo, min(hi, v))

    def sigma_bin(abs_sigma):
        if not is_finite_num(abs_sigma):
            return None
        v = float(abs_sigma)
        v = _clamp(v, sigma_bin_min, sigma_bin_max)
        b = round(np.floor(v / sigma_bin_step) * sigma_bin_step, 1)
        return f"{b:.1f}"

    def bench_bin(val):
        if not is_finite_num(val):
            return None
        v = float(val)
        v = _clamp(v, bench_bin_min, bench_bin_max)
        b = round(np.floor(v / bench_bin_step) * bench_bin_step, 1)
        return f"{b:.1f}"

    def _score(rate: float, total: int) -> float:
        return float(rate) * math.log1p(int(total))

    # ---- “simple ANY windows” selection (rate>=0.6 & total>=4) ----
    def stitch_numeric_bin_intervals_from_any(
        bin_counts: dict, *, step: float, min_total: int = 4, min_rate: float = 0.6
    ):
        eligible = []
        for b_str, st in (bin_counts or {}).items():
            try:
                b = float(b_str)
            except Exception:
                continue
            total = int(st.get("total", 0))
            if total < min_total:
                continue
            any_ = int(st.get("hard", 0)) + int(st.get("soft", 0))
            rate = any_ / total if total else 0.0
            if rate >= min_rate:
                eligible.append(b)

        eligible.sort()
        if not eligible:
            return []

        intervals = []
        lo = hi = eligible[0]
        for b in eligible[1:]:
            if abs(b - (hi + step)) <= 1e-9:
                hi = b
            else:
                intervals.append((lo, hi))
                lo = hi = b
        intervals.append((lo, hi))

        out = []
        for lo, hi in intervals:
            tot = hard = soft = none = 0
            k = lo
            while k <= hi + 1e-9:
                ks = f"{k:.1f}"
                st = (bin_counts or {}).get(ks)
                if st:
                    tot += int(st.get("total", 0))
                    hard += int(st.get("hard", 0))
                    soft += int(st.get("soft", 0))
                    none += int(st.get("none", 0))
                k = round(k + step, 10)
            if tot <= 0:
                continue
            rate = (hard + soft) / tot
            if tot >= min_total and rate >= min_rate:
                out.append({
                    "lo": round(lo, 2),
                    "hi": round(hi, 2),
                    "total": int(tot),
                    "hard": int(hard),
                    "soft": int(soft),
                    "none": int(none),
                    "rate": float(rate),
                    "score": _score(rate, tot),
                })

        out.sort(key=lambda x: (x["score"], x["total"]), reverse=True)
        return out

    def stitch_timeband_intervals_from_any(
        band_counts: dict, *, min_total: int = 4, min_rate: float = 0.6
    ):
        def band_key_to_minutes(k: str):
            try:
                a, _b = k.split("-")
                h1, m1 = map(int, a.split(":"))
                return h1 * 60 + m1
            except Exception:
                return None

        items = []
        for k, st in (band_counts or {}).items():
            tot = int(st.get("total", 0))
            if tot < min_total:
                continue
            any_ = int(st.get("hard", 0)) + int(st.get("soft", 0))
            rate = any_ / tot if tot else 0.0
            if rate >= min_rate:
                km = band_key_to_minutes(k)
                if km is not None:
                    items.append((km, k))
        items.sort()
        if not items:
            return []

        stitched_groups = []
        cur = [items[0][1]]
        for _, k in items[1:]:
            prev = cur[-1]
            try:
                prev_end = prev.split("-")[1]
                k_start = k.split("-")[0]
                if prev_end == k_start:
                    cur.append(k)
                else:
                    stitched_groups.append(cur)
                    cur = [k]
            except Exception:
                stitched_groups.append(cur)
                cur = [k]
        stitched_groups.append(cur)

        stitched = []
        for bands in stitched_groups:
            tot = hard = soft = none = 0
            for b in bands:
                st = (band_counts or {}).get(b, {})
                tot += int(st.get("total", 0))
                hard += int(st.get("hard", 0))
                soft += int(st.get("soft", 0))
                none += int(st.get("none", 0))
            if tot <= 0:
                continue
            rate = (hard + soft) / tot
            if tot >= min_total and rate >= min_rate:
                stitched.append({
                    "from": bands[0],
                    "to": bands[-1],
                    "bands": bands,
                    "total": int(tot),
                    "hard": int(hard),
                    "soft": int(soft),
                    "none": int(none),
                    "rate": float(rate),
                    "score": _score(rate, tot),
                })
        stitched.sort(key=lambda x: (x["score"], x["total"]), reverse=True)
        return stitched

    # ---------------- global label priority (GLOBAL only) ----------------
    GLOBAL_PRIORITY = [
        "BLUE_HARD",
        "ARK_HARD",
        "PRINT_HARD",
        "BLUE_SOFT",
        "ARK_SOFT",
        "PRINT_SOFT",
        "OPEN_HARD",
        "OPEN_SOFT",
        "NONE",
    ]

    def compute_global_label(blue_status, ark_status, print_status, open_status):
        if blue_status == "hard":
            return "BLUE_HARD"
        if ark_status == "hard":
            return "ARK_HARD"
        if print_status == "hard":
            return "PRINT_HARD"
        if blue_status == "soft":
            return "BLUE_SOFT"
        if ark_status == "soft":
            return "ARK_SOFT"
        if print_status == "soft":
            return "PRINT_SOFT"
        if open_status == "hard":
            return "OPEN_HARD"
        if open_status == "soft":
            return "OPEN_SOFT"
        return "NONE"

    # ---------------- per-ticker state ----------------
    cur_ticker = None
    cur_day = None

    bench_name_seen = None
    static_triplet_set = False
    corr_static = beta_static = sigma_static = None

    # optional heavy buffers
    pre_events_buf = deque(maxlen=max_events_per_ticker)
    intra_events_buf = deque(maxlen=max_events_per_ticker)
    post_events_buf = deque(maxlen=max_events_per_ticker)

    # counts per class
    counts_pre = {
        "blue": Counter(),
        "ark": Counter(),
        "print": Counter(),
        "open": Counter(),
        "global": Counter(),
    }
    global_labels_counter = Counter()
    counts_intra = {"intra": Counter()}
    counts_post = {"post": Counter()}

    # sigma bins: class -> sign -> bin -> Counter(total/hard/soft/none)
    def make_sigma_bins_map(classes):
        m = {}
        for c in classes:
            m[c] = {"pos": defaultdict(lambda: Counter()), "neg": defaultdict(lambda: Counter())}
        return m

    sigma_bins_pre = make_sigma_bins_map(["blue", "ark", "print", "open", "global"])
    sigma_bins_intra = make_sigma_bins_map(["intra"])
    sigma_bins_post = make_sigma_bins_map(["post"])

    # bench bins
    def make_bench_bins_map(classes):
        out = {}
        for c in classes:
            out[c] = {
                "start": {"pos": defaultdict(lambda: Counter()), "neg": defaultdict(lambda: Counter())},
                "peak":  {"pos": defaultdict(lambda: Counter()), "neg": defaultdict(lambda: Counter())},
                "norm":  {"pos": defaultdict(lambda: Counter()), "neg": defaultdict(lambda: Counter())},
            }
        return out

    bench_bins_pre = make_bench_bins_map(["blue", "ark", "print", "open", "global"])
    bench_bins_intra = make_bench_bins_map(["intra"])
    bench_bins_post = make_bench_bins_map(["post"])

    # time bands (OLD ones kept) — PRE/INTRA/POST each
    start_bands_pre_total = Counter()
    start_bands_pre_any   = Counter()
    start_bands_pre_hard  = Counter()
    start_bands_pre_soft  = Counter()
    norm_bands_pre_any   = Counter()
    norm_bands_pre_hard  = Counter()
    norm_bands_pre_soft  = Counter()

    start_bands_intra_total = Counter()
    start_bands_intra_any   = Counter()
    start_bands_intra_hard  = Counter()
    start_bands_intra_soft  = Counter()
    norm_bands_intra_any   = Counter()
    norm_bands_intra_hard  = Counter()
    norm_bands_intra_soft  = Counter()

    start_bands_post_total = Counter()
    start_bands_post_any   = Counter()
    start_bands_post_hard  = Counter()
    start_bands_post_soft  = Counter()
    norm_bands_post_any   = Counter()
    norm_bands_post_hard  = Counter()
    norm_bands_post_soft  = Counter()

    # NEW: time bands per class+sign with total/hard/soft/none
    def make_timeband_map(classes):
        out = {}
        for c in classes:
            out[c] = {
                "start": {"pos": defaultdict(lambda: Counter()), "neg": defaultdict(lambda: Counter())},
                "norm":  {"pos": defaultdict(lambda: Counter()), "neg": defaultdict(lambda: Counter())},
            }
        return out

    timebands_pre_by_class_sign = make_timeband_map(["blue", "ark", "print", "open", "global"])
    timebands_intra_by_class_sign = make_timeband_map(["intra"])
    timebands_post_by_class_sign = make_timeband_map(["post"])

    # ✅ last3 examples per class AND per sign
    def make_last3_map(classes):
        return {c: {"pos": deque(maxlen=3), "neg": deque(maxlen=3)} for c in classes}

    last3_examples = make_last3_map(["blue", "ark", "print", "open", "global", "intra", "post"])

    # RECENT by DAYS (kept)
    recent_days = deque(maxlen=10)  # day strings YYYY-MM-DD
    recent_by_day = {}  # day -> {"print":..., "peak":...}

    last5_print_days_pos = deque(maxlen=5)
    last5_print_days_neg = deque(maxlen=5)

    last5_peak_days_pos = deque(maxlen=5)
    last5_peak_days_neg = deque(maxlen=5)

    # HARD delays (kept + blue/post)
    hard_delay_sum = Counter()
    hard_delay_cnt = Counter()

    # mean peak_abs for globally normalized events (kept) — now picks BLUE peak if global is BLUE
    global_norm_peak_sum = {"pos": 0.0, "neg": 0.0}
    global_norm_peak_cnt = {"pos": 0, "neg": 0}

    # OPEN dev_sig series for last10 days (downsample by seconds)
    open_series_by_day = {}

    # ---------------- reset ticker ----------------
    def reset_ticker_state():
        nonlocal bench_name_seen, static_triplet_set, corr_static, beta_static, sigma_static
        nonlocal pre_events_buf, intra_events_buf, post_events_buf
        nonlocal counts_pre, global_labels_counter, counts_intra, counts_post
        nonlocal sigma_bins_pre, sigma_bins_intra, sigma_bins_post
        nonlocal bench_bins_pre, bench_bins_intra, bench_bins_post
        nonlocal start_bands_pre_total, start_bands_pre_any, start_bands_pre_hard, start_bands_pre_soft
        nonlocal norm_bands_pre_any, norm_bands_pre_hard, norm_bands_pre_soft
        nonlocal start_bands_intra_total, start_bands_intra_any, start_bands_intra_hard, start_bands_intra_soft
        nonlocal norm_bands_intra_any, norm_bands_intra_hard, norm_bands_intra_soft
        nonlocal start_bands_post_total, start_bands_post_any, start_bands_post_hard, start_bands_post_soft
        nonlocal norm_bands_post_any, norm_bands_post_hard, norm_bands_post_soft
        nonlocal timebands_pre_by_class_sign, timebands_intra_by_class_sign, timebands_post_by_class_sign
        nonlocal last3_examples
        nonlocal recent_days, recent_by_day, last5_print_days_pos, last5_print_days_neg
        nonlocal last5_peak_days_pos, last5_peak_days_neg
        nonlocal hard_delay_sum, hard_delay_cnt
        nonlocal global_norm_peak_sum, global_norm_peak_cnt
        nonlocal open_series_by_day

        bench_name_seen = None
        static_triplet_set = False
        corr_static = beta_static = sigma_static = None

        pre_events_buf = deque(maxlen=max_events_per_ticker)
        intra_events_buf = deque(maxlen=max_events_per_ticker)
        post_events_buf = deque(maxlen=max_events_per_ticker)

        counts_pre = {"blue": Counter(), "ark": Counter(), "print": Counter(), "open": Counter(), "global": Counter()}
        global_labels_counter = Counter()
        counts_intra = {"intra": Counter()}
        counts_post = {"post": Counter()}

        sigma_bins_pre = make_sigma_bins_map(["blue", "ark", "print", "open", "global"])
        sigma_bins_intra = make_sigma_bins_map(["intra"])
        sigma_bins_post = make_sigma_bins_map(["post"])

        bench_bins_pre = make_bench_bins_map(["blue", "ark", "print", "open", "global"])
        bench_bins_intra = make_bench_bins_map(["intra"])
        bench_bins_post = make_bench_bins_map(["post"])

        start_bands_pre_total = Counter()
        start_bands_pre_any   = Counter()
        start_bands_pre_hard  = Counter()
        start_bands_pre_soft  = Counter()
        norm_bands_pre_any   = Counter()
        norm_bands_pre_hard  = Counter()
        norm_bands_pre_soft  = Counter()

        start_bands_intra_total = Counter()
        start_bands_intra_any   = Counter()
        start_bands_intra_hard  = Counter()
        start_bands_intra_soft  = Counter()
        norm_bands_intra_any   = Counter()
        norm_bands_intra_hard  = Counter()
        norm_bands_intra_soft  = Counter()

        start_bands_post_total = Counter()
        start_bands_post_any   = Counter()
        start_bands_post_hard  = Counter()
        start_bands_post_soft  = Counter()
        norm_bands_post_any   = Counter()
        norm_bands_post_hard  = Counter()
        norm_bands_post_soft  = Counter()

        timebands_pre_by_class_sign = make_timeband_map(["blue", "ark", "print", "open", "global"])
        timebands_intra_by_class_sign = make_timeband_map(["intra"])
        timebands_post_by_class_sign = make_timeband_map(["post"])

        last3_examples = make_last3_map(["blue", "ark", "print", "open", "global", "intra", "post"])

        recent_days = deque(maxlen=10)
        recent_by_day = {}
        last5_print_days_pos = deque(maxlen=5)
        last5_print_days_neg = deque(maxlen=5)
        last5_peak_days_pos = deque(maxlen=5)
        last5_peak_days_neg = deque(maxlen=5)

        hard_delay_sum = Counter()
        hard_delay_cnt = Counter()

        global_norm_peak_sum = {"pos": 0.0, "neg": 0.0}
        global_norm_peak_cnt = {"pos": 0, "neg": 0}

        open_series_by_day = {}

    # ---------------- common utils ----------------

    def parse_dt(x):
        """Уніфіковано привести вхід до datetime або None.
        Підтримує: datetime, pd.Timestamp, ISO-рядки; повертає Python datetime (може бути tz-aware) або None.
        """
        try:
            if x is None:
                return None
            if isinstance(x, datetime):
                return x
            # pandas поверне pd.Timestamp; встановлюємо utc=True щоб уникнути неоднозначностей
            ts = pd.to_datetime(x, errors="coerce", utc=True)
            if pd.isna(ts):
                return None
            return ts.to_pydatetime()
        except Exception:
            return None

    
    def push_last3_example(class_key, sign_key, kind, start_dt, end_dt, start_dev, end_dev, peak_dev,
                           start_stock, end_stock, start_bench, end_bench, start_time=None, end_time=None):
        d = start_dt.date().isoformat() if isinstance(start_dt, datetime) else None
        last3_examples[class_key][sign_key].appendleft({
            "date": d,
            "dt": _dt_iso(start_dt),
            "kind": kind,  # "hard"/"soft"
            "start_time": start_time,
            "end_time": end_time,
            "start_dev": _json_safe(start_dev),
            "peak_dev": _json_safe(peak_dev),
            "end_dev": _json_safe(end_dev),
            "stock_start": _json_safe(start_stock),
            "stock_end": _json_safe(end_stock),
            "bench_start": _json_safe(start_bench),
            "bench_end": _json_safe(end_bench),
        })

    def update_sigma_bins(map_ref, class_key, sign_key, abs_peak_sigma, outcome_kind):
        b = sigma_bin(abs_peak_sigma)
        if b is None:
            return
        st = map_ref[class_key][sign_key][b]
        st["total"] += 1
        if outcome_kind not in ("hard", "soft", "none"):
            outcome_kind = "none"
        st[outcome_kind] += 1

    def update_bench_bins(map_ref, class_key, which, sign_key, bench_value, outcome_kind):
        b = bench_bin(bench_value)
        if b is None:
            return
        st = map_ref[class_key][which][sign_key][b]
        st["total"] += 1
        if outcome_kind not in ("hard", "soft", "none"):
            outcome_kind = "none"
        st[outcome_kind] += 1

    def update_timeband_by_class_sign(map_ref, class_key, which, sign_key, band_key, outcome_kind):
        if not band_key:
            return
        st = map_ref[class_key][which][sign_key][band_key]
        st["total"] += 1
        if outcome_kind not in ("hard", "soft", "none"):
            outcome_kind = "none"
        st[outcome_kind] += 1
        _ = st["hard"]; _ = st["soft"]; _ = st["none"]

    def class_rates(counter: Counter):
        total = int(sum(counter.values()))
        hard = int(counter.get("hard", 0))
        soft = int(counter.get("soft", 0))
        none = int(counter.get("none", 0))
        any_ = hard + soft
        return {
            "total": total,
            "hard": hard,
            "soft": soft,
            "none": none,
            "rate_any": (any_ / total) if total else None,
            "rate_hard": (hard / total) if total else None,
            "rate_soft": (soft / total) if total else None,
            "hard_share_in_norm": (hard / (hard + soft)) if (hard + soft) else None,
        }

    def add_hard_delay(key: str, start_dt: datetime, hard_dt: datetime):
        if isinstance(start_dt, datetime) and isinstance(hard_dt, datetime) and hard_dt >= start_dt:
            hard_delay_sum[key] += (hard_dt - start_dt).total_seconds()
            hard_delay_cnt[key] += 1

    def avg_hard_delay(key: str):
        c = int(hard_delay_cnt.get(key, 0))
        if c <= 0:
            return None
        return float(hard_delay_sum.get(key, 0.0)) / c

    # ---------------- PRE event state ----------------
    pre_active = False
    pre_id = 0

    pre_start_dt = None
    pre_start_dev = np.nan
    pre_start_sign = 0
    pre_start_stock = np.nan
    pre_start_bench = np.nan

    # PRE peak frozen until 09:29 (for ARK/PRINT/OPEN/GLOBAL)
    pre_peak_abs = 0.0
    pre_peak_signed = 0.0
    pre_peak_dt = None
    pre_peak_stock = np.nan
    pre_peak_bench = np.nan

    pre_post_peak_low_abs = np.inf

    # BLUE peak frozen until 03:59 (for BLUE soft)
    blue_peak_abs = 0.0
    blue_peak_signed = 0.0
    blue_peak_dt = None
    blue_peak_stock = np.nan
    blue_peak_bench = np.nan

    blue_hard_dt = None
    blue_hard_val = np.nan
    blue_hard_stock = np.nan
    blue_hard_bench = np.nan

    blue_soft_found = False
    blue_soft_dt = None
    blue_soft_val = np.nan
    blue_soft_stock = np.nan
    blue_soft_bench = np.nan

    ark_hard_dt = None
    ark_hard_val = np.nan
    ark_hard_stock = np.nan
    ark_hard_bench = np.nan

    ark_soft_found = False
    ark_soft_dt = None
    ark_soft_val = np.nan
    ark_soft_stock = np.nan
    ark_soft_bench = np.nan

    print_first_dt = None
    print_first_val = np.nan
    print_first_stock = np.nan
    print_first_bench = np.nan

    open_hard_dt = None
    open_hard_val = np.nan
    open_hard_stock = np.nan
    open_hard_bench = np.nan

    open_soft_found = False
    open_soft_dt = None
    open_soft_val = np.nan
    open_soft_stock = np.nan
    open_soft_bench = np.nan

    def reset_pre_event():
        nonlocal pre_active, pre_start_dt, pre_start_dev, pre_start_sign, pre_start_stock, pre_start_bench
        nonlocal pre_peak_abs, pre_peak_signed, pre_peak_dt, pre_peak_stock, pre_peak_bench
        nonlocal pre_post_peak_low_abs
        nonlocal blue_peak_abs, blue_peak_signed, blue_peak_dt, blue_peak_stock, blue_peak_bench
        nonlocal blue_hard_dt, blue_hard_val, blue_hard_stock, blue_hard_bench
        nonlocal blue_soft_found, blue_soft_dt, blue_soft_val, blue_soft_stock, blue_soft_bench
        nonlocal ark_hard_dt, ark_hard_val, ark_hard_stock, ark_hard_bench
        nonlocal ark_soft_found, ark_soft_dt, ark_soft_val, ark_soft_stock, ark_soft_bench
        nonlocal print_first_dt, print_first_val, print_first_stock, print_first_bench
        nonlocal open_hard_dt, open_hard_val, open_hard_stock, open_hard_bench
        nonlocal open_soft_found, open_soft_dt, open_soft_val, open_soft_stock, open_soft_bench

        pre_active = False
        pre_start_dt = None
        pre_start_dev = np.nan
        pre_start_sign = 0
        pre_start_stock = np.nan
        pre_start_bench = np.nan

        pre_peak_abs = 0.0
        pre_peak_signed = 0.0
        pre_peak_dt = None
        pre_peak_stock = np.nan
        pre_peak_bench = np.nan

        pre_post_peak_low_abs = np.inf

        blue_peak_abs = 0.0
        blue_peak_signed = 0.0
        blue_peak_dt = None
        blue_peak_stock = np.nan
        blue_peak_bench = np.nan

        blue_hard_dt = None
        blue_hard_val = np.nan
        blue_hard_stock = np.nan
        blue_hard_bench = np.nan

        blue_soft_found = False
        blue_soft_dt = None
        blue_soft_val = np.nan
        blue_soft_stock = np.nan
        blue_soft_bench = np.nan

        ark_hard_dt = None
        ark_hard_val = np.nan
        ark_hard_stock = np.nan
        ark_hard_bench = np.nan

        ark_soft_found = False
        ark_soft_dt = None
        ark_soft_val = np.nan
        ark_soft_stock = np.nan
        ark_soft_bench = np.nan

        print_first_dt = None
        print_first_val = np.nan
        print_first_stock = np.nan
        print_first_bench = np.nan

        open_hard_dt = None
        open_hard_val = np.nan
        open_hard_stock = np.nan
        open_hard_bench = np.nan

        open_soft_found = False
        open_soft_dt = None
        open_soft_val = np.nan
        open_soft_stock = np.nan
        open_soft_bench = np.nan

    def start_pre_event(dt_now, dev_now, stock_pct, bench_pct):
        nonlocal pre_active, pre_start_dt, pre_start_dev, pre_start_sign, pre_start_stock, pre_start_bench
        nonlocal pre_peak_abs, pre_peak_signed, pre_peak_dt, pre_peak_stock, pre_peak_bench
        nonlocal blue_peak_abs, blue_peak_signed, blue_peak_dt, blue_peak_stock, blue_peak_bench

        pre_active = True
        pre_start_dt = dt_now
        pre_start_dev = float(dev_now)
        pre_start_sign = 1 if float(dev_now) >= 0 else -1
        pre_start_stock = stock_pct
        pre_start_bench = bench_pct

        # PRE peak init (shared)
        pre_peak_abs = abs(float(dev_now))
        pre_peak_signed = float(dev_now)
        pre_peak_dt = dt_now
        pre_peak_stock = stock_pct
        pre_peak_bench = bench_pct

        # BLUE peak init (so BLUE soft works even if event starts in BLUE)
        blue_peak_abs = abs(float(dev_now))
        blue_peak_signed = float(dev_now)
        blue_peak_dt = dt_now
        blue_peak_stock = stock_pct
        blue_peak_bench = bench_pct

    def pre_sign_key():
        return "pos" if pre_start_sign > 0 else "neg"

    def classify_print_with_frozen_peak(first_val):
        if not is_finite_num(first_val):
            return "none"
        a = abs(float(first_val))
        if a <= norm_thr:
            return "hard"
        if pre_peak_abs > 0 and a <= (float(pre_peak_abs) / float(soft_ratio)):
            return "soft"
        return "none"

    def classify_blue():
        if blue_hard_dt is not None and is_finite_num(blue_hard_val):
            return "hard"
        if blue_soft_found and blue_soft_dt is not None and is_finite_num(blue_soft_val):
            return "soft"
        return "none"

    def classify_ark():
        if ark_hard_dt is not None and is_finite_num(ark_hard_val):
            return "hard"
        if ark_soft_found and ark_soft_dt is not None and is_finite_num(ark_soft_val):
            return "soft"
        return "none"

    def classify_open():
        if open_hard_dt is not None and is_finite_num(open_hard_val):
            return "hard"
        if open_soft_found and open_soft_dt is not None and is_finite_num(open_soft_val):
            return "soft"
        return "none"

    def capture_open_series(dt_now: datetime, dev_now: float):
        if not isinstance(dt_now, datetime):
            return
        t = (dt_now.hour, dt_now.minute)
        if not in_range(t, OPEN_FROM, OPEN_TO):
            return

        day_str = dt_now.date().isoformat()
        store = open_series_by_day.get(day_str)
        if store is None:
            store = {}
            open_series_by_day[day_str] = store

        sec = max(1, int(open_series_downsample_seconds))
        bucket_epoch = int(dt_now.timestamp() // sec) * sec
        # use pandas to create tz-aware Timestamp consistently
        bucket_dt = pd.Timestamp(bucket_epoch, unit="s", tz=dt_now.tzinfo).to_pydatetime()
        store[bucket_dt.isoformat()] = float(dev_now)

    def pre_process_tick(dt_now, dev_now, stock_pct, bench_pct):
        nonlocal pre_peak_abs, pre_peak_signed, pre_peak_dt, pre_peak_stock, pre_peak_bench
        nonlocal pre_post_peak_low_abs

        nonlocal blue_peak_abs, blue_peak_signed, blue_peak_dt, blue_peak_stock, blue_peak_bench
        nonlocal blue_hard_dt, blue_hard_val, blue_hard_stock, blue_hard_bench
        nonlocal blue_soft_found, blue_soft_dt, blue_soft_val, blue_soft_stock, blue_soft_bench

        nonlocal ark_hard_dt, ark_hard_val, ark_hard_stock, ark_hard_bench
        nonlocal ark_soft_found, ark_soft_dt, ark_soft_val, ark_soft_stock, ark_soft_bench

        nonlocal print_first_dt, print_first_val, print_first_stock, print_first_bench

        nonlocal open_hard_dt, open_hard_val, open_hard_stock, open_hard_bench
        nonlocal open_soft_found, open_soft_dt, open_soft_val, open_soft_stock, open_soft_bench

        t = (dt_now.hour, dt_now.minute)
        cur_abs = abs(float(dev_now))

        # capture open series (kept)
        capture_open_series(dt_now, float(dev_now))

        # ---------- BLUE processing inside 00:01–03:59 (parallel) ----------
        if in_range(t, BLUE_FROM, BLUE_TO):
            # BLUE peak frozen until 03:59
            if cur_abs > float(blue_peak_abs):
                blue_peak_abs = float(cur_abs)
                blue_peak_signed = float(dev_now)
                blue_peak_dt = dt_now
                blue_peak_stock = stock_pct
                blue_peak_bench = bench_pct
                if blue_hard_dt is None:
                    blue_soft_found = False
                    blue_soft_dt = None
                    blue_soft_val = np.nan
                    blue_soft_stock = np.nan
                    blue_soft_bench = np.nan

            # BLUE HARD
            if blue_hard_dt is None and cur_abs <= norm_thr:
                blue_hard_dt = dt_now
                blue_hard_val = float(dev_now)
                blue_hard_stock = stock_pct
                blue_hard_bench = bench_pct

            # BLUE SOFT only if hard not yet, after BLUE peak
            if (blue_hard_dt is None) and isinstance(blue_peak_dt, datetime) and (dt_now >= blue_peak_dt) and blue_peak_abs > 0:
                if cur_abs <= (float(blue_peak_abs) / float(soft_ratio)):
                    if (not blue_soft_found) or (dt_now < blue_soft_dt):
                        blue_soft_found = True
                        blue_soft_dt = dt_now
                        blue_soft_val = float(dev_now)
                        blue_soft_stock = stock_pct
                        blue_soft_bench = bench_pct

        # ---------- PRE peak (for ARK/PRINT/OPEN/GLOBAL) frozen until 09:29 ----------
        if t <= ARK_TO:
            if cur_abs > float(pre_peak_abs):
                pre_peak_abs = float(cur_abs)
                pre_peak_signed = float(dev_now)
                pre_peak_dt = dt_now
                pre_peak_stock = stock_pct
                pre_peak_bench = bench_pct
                if ark_hard_dt is None:
                    ark_soft_found = False
                    ark_soft_dt = None
                    ark_soft_val = np.nan
                    ark_soft_stock = np.nan
                    ark_soft_bench = np.nan

        # post-peak low abs after peak (kept)
        if isinstance(pre_peak_dt, datetime) and dt_now >= pre_peak_dt:
            if cur_abs < pre_post_peak_low_abs:
                pre_post_peak_low_abs = cur_abs

        # ARK HARD in [start..09:29]
        if (ark_hard_dt is None) and (t <= ARK_TO):
            if cur_abs <= norm_thr:
                ark_hard_dt = dt_now
                ark_hard_val = float(dev_now)
                ark_hard_stock = stock_pct
                ark_hard_bench = bench_pct

        # ARK SOFT only if hard failed, in [peak_dt..09:29]
        if (ark_hard_dt is None) and (t <= ARK_TO) and isinstance(pre_peak_dt, datetime):
            if dt_now >= pre_peak_dt and pre_peak_abs > 0:
                if cur_abs <= (float(pre_peak_abs) / float(soft_ratio)):
                    if (not ark_soft_found) or (dt_now < ark_soft_dt):
                        ark_soft_found = True
                        ark_soft_dt = dt_now
                        ark_soft_val = float(dev_now)
                        ark_soft_stock = stock_pct
                        ark_soft_bench = bench_pct

        # PRINT first tick only in [09:30..09:35]
        if (print_first_dt is None) and in_range(t, PRINT_FROM, PRINT_TO):
            print_first_dt = dt_now
            print_first_val = float(dev_now)
            print_first_stock = stock_pct
            print_first_bench = bench_pct

        # OPEN scan in [09:31..09:40]
        if in_range(t, OPEN_FROM, OPEN_TO):
            if open_hard_dt is None and cur_abs <= norm_thr:
                open_hard_dt = dt_now
                open_hard_val = float(dev_now)
                open_hard_stock = stock_pct
                open_hard_bench = bench_pct

            if open_hard_dt is None and pre_peak_abs > 0:
                gate_dt = pre_peak_dt if isinstance(pre_peak_dt, datetime) else dt_now
                open_gate = dt_now.replace(hour=OPEN_FROM[0], minute=OPEN_FROM[1], second=0, microsecond=0)
                if gate_dt < open_gate:
                    gate_dt = open_gate
                if dt_now >= gate_dt:
                    if cur_abs <= (float(pre_peak_abs) / float(soft_ratio)):
                        if (not open_soft_found) or (dt_now < open_soft_dt):
                            open_soft_found = True
                            open_soft_dt = dt_now
                            open_soft_val = float(dev_now)
                            open_soft_stock = stock_pct
                            open_soft_bench = bench_pct

    def _ensure_recent_day(day_str: str):
        if day_str not in recent_by_day:
            recent_by_day[day_str] = {"print": None, "peak": None}
        if (len(recent_days) == 0) or (recent_days[0] != day_str):
            if day_str in recent_days:
                recent_days.remove(day_str)
            recent_days.appendleft(day_str)

    def _update_recent_daily_print(day_str: str, print_dt: datetime, print_dev: float, peak_abs: float, peak_signed: float, first_sign: int):
        snap = recent_by_day.get(day_str)
        if snap is None:
            recent_by_day[day_str] = {"print": None, "peak": None}
            snap = recent_by_day[day_str]
        if snap["print"] is None:
            snap["print"] = {
                "dt": _dt_iso(print_dt),
                "dev": _json_safe(print_dev),
                "pre_peak_abs": _json_safe(peak_abs),
                "pre_peak_signed": _json_safe(peak_signed),
                "first_sign": int(first_sign),
            }

    def _update_recent_daily_peak(day_str: str, peak_dt: datetime, peak_abs: float, peak_signed: float, first_sign: int):
        snap = recent_by_day.get(day_str)
        if snap is None:
            recent_by_day[day_str] = {"print": None, "peak": None}
            snap = recent_by_day[day_str]
        cur = snap["peak"]
        if cur is None or (is_finite_num(peak_abs) and float(peak_abs) > float(cur.get("sigma_abs", 0.0) or 0.0)):
            snap["peak"] = {
                "dt": _dt_iso(peak_dt),
                "sigma_abs": _json_safe(peak_abs),
                "sigma_signed": _json_safe(peak_signed),
                "first_sign": int(first_sign),
            }

    def finalize_pre_event(reason="window_end"):
        nonlocal pre_active, pre_id
        nonlocal global_norm_peak_sum, global_norm_peak_cnt

        if not pre_active:
            return

        blue_status  = classify_blue()
        ark_status   = classify_ark()
        print_status = classify_print_with_frozen_peak(print_first_val)
        open_status  = classify_open()

        global_label = compute_global_label(blue_status, ark_status, print_status, open_status)
        if global_label.endswith("_HARD"):
            global_kind = "hard"
        elif global_label.endswith("_SOFT"):
            global_kind = "soft"
        else:
            global_kind = "none"

        sk = pre_sign_key()

        # OLD start band counters (kept) — any/hard/soft across all pre classes (OR semantics)
        sb_total = floor_to_band(pre_start_dt, start_band_minutes)
        if sb_total:
            start_bands_pre_total[sb_total] += 1
            if (blue_status != "none") or (ark_status != "none") or (print_status != "none") or (open_status != "none"):
                start_bands_pre_any[sb_total] += 1
            if (blue_status == "hard") or (ark_status == "hard") or (print_status == "hard") or (open_status == "hard"):
                start_bands_pre_hard[sb_total] += 1
            if (blue_status == "soft") or (ark_status == "soft") or (print_status == "soft") or (open_status == "soft"):
                start_bands_pre_soft[sb_total] += 1

        # NEW per-class-sign start band counters
        if sb_total:
            update_timeband_by_class_sign(timebands_pre_by_class_sign, "blue", "start", sk, sb_total, blue_status)
            update_timeband_by_class_sign(timebands_pre_by_class_sign, "ark", "start", sk, sb_total, ark_status)
            update_timeband_by_class_sign(timebands_pre_by_class_sign, "print", "start", sk, sb_total, print_status)
            update_timeband_by_class_sign(timebands_pre_by_class_sign, "open", "start", sk, sb_total, open_status)
            update_timeband_by_class_sign(timebands_pre_by_class_sign, "global", "start", sk, sb_total, global_kind)

        # counts (classes independent)
        counts_pre["blue"][blue_status] += 1
        counts_pre["ark"][ark_status] += 1
        counts_pre["print"][print_status] += 1
        counts_pre["open"][open_status] += 1
        counts_pre["global"][global_kind] += 1
        global_labels_counter[global_label] += 1

        # HARD delays (kept + blue)
        if blue_status == "hard":
            add_hard_delay("blue", pre_start_dt, blue_hard_dt)
        if ark_status == "hard":
            add_hard_delay("ark", pre_start_dt, ark_hard_dt)
        if print_status == "hard":
            add_hard_delay("print", pre_start_dt, print_first_dt)
        if open_status == "hard":
            add_hard_delay("open", pre_start_dt, open_hard_dt)
        if global_kind == "hard":
            # delay is delay to the winning hard in GLOBAL
            if global_label.startswith("BLUE_"):
                add_hard_delay("global", pre_start_dt, blue_hard_dt)
            elif global_label.startswith("ARK_"):
                add_hard_delay("global", pre_start_dt, ark_hard_dt)
            elif global_label.startswith("PRINT_"):
                add_hard_delay("global", pre_start_dt, print_first_dt)
            elif global_label.startswith("OPEN_"):
                add_hard_delay("global", pre_start_dt, open_hard_dt)

        # sigma bins:
        # - BLUE uses BLUE peak (frozen to 03:59)
        # - others use PRE peak (frozen to 09:29)
        update_sigma_bins(sigma_bins_pre, "blue", sk, blue_peak_abs, blue_status)
        update_sigma_bins(sigma_bins_pre, "ark", sk, pre_peak_abs, ark_status)
        update_sigma_bins(sigma_bins_pre, "print", sk, pre_peak_abs, print_status)
        update_sigma_bins(sigma_bins_pre, "open", sk, pre_peak_abs, open_status)
        update_sigma_bins(sigma_bins_pre, "global", sk, pre_peak_abs, global_kind)  # global is modeled on PRE peak scale

        # bench bins start+peak
        update_bench_bins(bench_bins_pre, "blue", "start", sk, pre_start_bench, blue_status)
        update_bench_bins(bench_bins_pre, "blue", "peak",  sk, blue_peak_bench, blue_status)

        for cls, status in (("ark", ark_status), ("print", print_status), ("open", open_status), ("global", global_kind)):
            update_bench_bins(bench_bins_pre, cls, "start", sk, pre_start_bench, status)
            update_bench_bins(bench_bins_pre, cls, "peak",  sk, pre_peak_bench, status)

        # norm dt per class (for norm bench bins + norm timebands)
        def cls_norm_dt(cls):
            if cls == "blue":
                return blue_hard_dt if blue_status == "hard" else (blue_soft_dt if blue_status == "soft" else None)
            if cls == "ark":
                return ark_hard_dt if ark_status == "hard" else (ark_soft_dt if ark_status == "soft" else None)
            if cls == "print":
                return print_first_dt if print_status != "none" else None
            if cls == "open":
                return open_hard_dt if open_status == "hard" else (open_soft_dt if open_status == "soft" else None)
            if cls == "global":
                if global_label.startswith("BLUE_"):
                    return cls_norm_dt("blue")
                if global_label.startswith("ARK_"):
                    return cls_norm_dt("ark")
                if global_label.startswith("PRINT_"):
                    return cls_norm_dt("print")
                if global_label.startswith("OPEN_"):
                    return cls_norm_dt("open")
                return None
            return None

        def cls_outcome(cls):
            if cls == "blue": return blue_status
            if cls == "ark": return ark_status
            if cls == "print": return print_status
            if cls == "open": return open_status
            if cls == "global": return global_kind
            return "none"

        def cls_peak_signed(cls):
            if cls == "blue": return blue_peak_signed
            return pre_peak_signed

        def cls_end_fields(cls, status):
            if cls == "blue":
                if status == "hard":
                    return blue_hard_val, blue_hard_stock, blue_hard_bench
                if status == "soft":
                    return blue_soft_val, blue_soft_stock, blue_soft_bench
                return np.nan, np.nan, np.nan
            if cls == "ark":
                if status == "hard":
                    return ark_hard_val, ark_hard_stock, ark_hard_bench
                if status == "soft":
                    return ark_soft_val, ark_soft_stock, ark_soft_bench
                return np.nan, np.nan, np.nan
            if cls == "print":
                if status != "none":
                    return print_first_val, print_first_stock, print_first_bench
                return np.nan, np.nan, np.nan
            if cls == "open":
                if status == "hard":
                    return open_hard_val, open_hard_stock, open_hard_bench
                if status == "soft":
                    return open_soft_val, open_soft_stock, open_soft_bench
                return np.nan, np.nan, np.nan
            if cls == "global":
                if global_label.startswith("BLUE_"):
                    return cls_end_fields("blue", blue_status)
                if global_label.startswith("ARK_"):
                    return cls_end_fields("ark", ark_status)
                if global_label.startswith("PRINT_"):
                    return cls_end_fields("print", print_status)
                if global_label.startswith("OPEN_"):
                    return cls_end_fields("open", open_status)
                return np.nan, np.nan, np.nan
            return np.nan, np.nan, np.nan

        # norm bins + bench norm + ✅ last3 pushes (hard/soft only)
        for cls in ("blue", "ark", "print", "open", "global"):
            ndt = cls_norm_dt(cls)
            status = cls_outcome(cls)
            if isinstance(ndt, datetime):
                b = floor_to_band(ndt, norm_band_minutes)
                if b:
                    # OLD norm counters kept (any/hard/soft only)
                    if status != "none":
                        norm_bands_pre_any[b] += 1
                    if status == "hard":
                        norm_bands_pre_hard[b] += 1
                    if status == "soft":
                        norm_bands_pre_soft[b] += 1

                    # NEW per-class-sign norm bins
                    update_timeband_by_class_sign(timebands_pre_by_class_sign, cls, "norm", sk, b, status)

                    # bench norm bin value:
                    if cls == "blue":
                        end_bench_val = blue_hard_bench if status == "hard" else (blue_soft_bench if status == "soft" else np.nan)
                        update_bench_bins(bench_bins_pre, cls, "norm", sk, end_bench_val, status)
                    else:
                        update_bench_bins(bench_bins_pre, cls, "norm", sk, pre_peak_bench, status)

                # last3 only for normalized statuses
                if status in ("hard", "soft"):
                    end_dev, end_stock, end_bench = cls_end_fields(cls, status)
                    push_last3_example(
                        cls, sk, status,
                        pre_start_dt, ndt,
                        pre_start_dev, end_dev, cls_peak_signed(cls),
                        pre_start_stock, end_stock,
                        pre_start_bench, end_bench,
                        start_time=pre_start_dt.strftime("%H:%M") if isinstance(pre_start_dt, datetime) else None,
                        end_time=ndt.strftime("%H:%M") if isinstance(ndt, datetime) else None,
                    )

        # global mean peak for normalized — use BLUE peak if global label is BLUE
        if global_kind != "none":
            if global_label.startswith("BLUE_") and is_finite_num(blue_peak_abs):
                global_norm_peak_sum[sk] += float(blue_peak_abs)
                global_norm_peak_cnt[sk] += 1
            elif is_finite_num(pre_peak_abs):
                global_norm_peak_sum[sk] += float(pre_peak_abs)
                global_norm_peak_cnt[sk] += 1

        # DAILY RECENT (kept): based on PRE peak (09:29-frozen)
        if isinstance(pre_start_dt, datetime):
            day_str = pre_start_dt.date().isoformat()
            _ensure_recent_day(day_str)
            _update_recent_daily_peak(day_str, pre_peak_dt, pre_peak_abs, pre_peak_signed, int(pre_start_sign))
            if isinstance(print_first_dt, datetime) and is_finite_num(print_first_val):
                _update_recent_daily_print(
                    day_str,
                    print_first_dt,
                    float(print_first_val),
                    float(pre_peak_abs),
                    float(pre_peak_signed),
                    int(pre_start_sign),
                )

        # rebuild last5 deques from recent_days (kept)
        last5_print_days_pos.clear()
        last5_print_days_neg.clear()
        last5_peak_days_pos.clear()
        last5_peak_days_neg.clear()
        for d in list(recent_days):
            snap = recent_by_day.get(d)
            if not snap:
                continue
            pk = snap.get("peak")
            pr = snap.get("print")
            if pk and is_finite_num(pk.get("sigma_abs")):
                if int(pk.get("first_sign", 1)) > 0:
                    last5_peak_days_pos.append(float(pk["sigma_abs"]))
                else:
                    last5_peak_days_neg.append(float(pk["sigma_abs"]))
            if pr and is_finite_num(pr.get("dev")):
                if int(pr.get("first_sign", 1)) > 0:
                    last5_print_days_pos.append(float(pr["dev"]))
                else:
                    last5_print_days_neg.append(float(pr["dev"]))
            if (
                len(last5_print_days_pos) >= 5
                and len(last5_print_days_neg) >= 5
                and len(last5_peak_days_pos) >= 5
                and len(last5_peak_days_neg) >= 5
            ):
                break

        if include_events_pre:
            pre_events_buf.append({
                "pre_id": int(pre_id),
                "reason_finalized": reason,
                "start": {"dt": _dt_iso(pre_start_dt), "dev": _json_safe(pre_start_dev), "sign": int(pre_start_sign),
                          "stock_pct": _json_safe(pre_start_stock), "bench_pct": _json_safe(pre_start_bench)},
                "pre_peak_frozen": {"dt": _dt_iso(pre_peak_dt), "abs": _json_safe(pre_peak_abs), "signed": _json_safe(pre_peak_signed),
                                    "bin": sigma_bin(pre_peak_abs),
                                    "stock_pct": _json_safe(pre_peak_stock), "bench_pct": _json_safe(pre_peak_bench)},
                "blue_peak_frozen": {"dt": _dt_iso(blue_peak_dt), "abs": _json_safe(blue_peak_abs), "signed": _json_safe(blue_peak_signed),
                                     "bin": sigma_bin(blue_peak_abs),
                                     "stock_pct": _json_safe(blue_peak_stock), "bench_pct": _json_safe(blue_peak_bench)},
                "blue": {"status": blue_status},
                "ark": {"status": ark_status},
                "print": {"status": print_status},
                "open": {"status": open_status},
                "global": {"label": global_label, "kind": global_kind},
            })

        pre_id += 1
        reset_pre_event()

    # ---------------- INTRA event state ----------------
    intra_active = False
    intra_id = 0

    intra_start_dt = None
    intra_start_dev = np.nan
    intra_start_sign = 0
    intra_start_stock = np.nan
    intra_start_bench = np.nan

    intra_peak_abs = 0.0
    intra_peak_signed = 0.0
    intra_peak_dt = None
    intra_peak_stock = np.nan
    intra_peak_bench = np.nan

    intra_hard_dt = None
    intra_hard_val = np.nan
    intra_hard_stock = np.nan
    intra_hard_bench = np.nan

    intra_soft_found = False
    intra_soft_dt = None
    intra_soft_val = np.nan
    intra_soft_stock = np.nan
    intra_soft_bench = np.nan

    def reset_intra_event():
        nonlocal intra_active, intra_start_dt, intra_start_dev, intra_start_sign, intra_start_stock, intra_start_bench
        nonlocal intra_peak_abs, intra_peak_signed, intra_peak_dt, intra_peak_stock, intra_peak_bench
        nonlocal intra_hard_dt, intra_hard_val, intra_hard_stock, intra_hard_bench
        nonlocal intra_soft_found, intra_soft_dt, intra_soft_val, intra_soft_stock, intra_soft_bench

        intra_active = False
        intra_start_dt = None
        intra_start_dev = np.nan
        intra_start_sign = 0
        intra_start_stock = np.nan
        intra_start_bench = np.nan

        intra_peak_abs = 0.0
        intra_peak_signed = 0.0
        intra_peak_dt = None
        intra_peak_stock = np.nan
        intra_peak_bench = np.nan

        intra_hard_dt = None
        intra_hard_val = np.nan
        intra_hard_stock = np.nan
        intra_hard_bench = np.nan

        intra_soft_found = False
        intra_soft_dt = None
        intra_soft_val = np.nan
        intra_soft_stock = np.nan
        intra_soft_bench = np.nan

    def start_intra_event(dt_now, dev_now, stock_pct, bench_pct):
        nonlocal intra_active, intra_start_dt, intra_start_dev, intra_start_sign, intra_start_stock, intra_start_bench
        nonlocal intra_peak_abs, intra_peak_signed, intra_peak_dt, intra_peak_stock, intra_peak_bench

        intra_active = True
        intra_start_dt = dt_now
        intra_start_dev = float(dev_now)
        intra_start_sign = 1 if float(dev_now) >= 0 else -1
        intra_start_stock = stock_pct
        intra_start_bench = bench_pct

        intra_peak_abs = abs(float(dev_now))
        intra_peak_signed = float(dev_now)
        intra_peak_dt = dt_now
        intra_peak_stock = stock_pct
        intra_peak_bench = bench_pct

    def intra_sign_key():
        return "pos" if intra_start_sign > 0 else "neg"

    def intra_process_tick(dt_now, dev_now, stock_pct, bench_pct):
        nonlocal intra_peak_abs, intra_peak_signed, intra_peak_dt, intra_peak_stock, intra_peak_bench
        nonlocal intra_hard_dt, intra_hard_val, intra_hard_stock, intra_hard_bench
        nonlocal intra_soft_found, intra_soft_dt, intra_soft_val, intra_soft_stock, intra_soft_bench

        cur_abs = abs(float(dev_now))

        if cur_abs > float(intra_peak_abs):
            intra_peak_abs = float(cur_abs)
            intra_peak_signed = float(dev_now)
            intra_peak_dt = dt_now
            intra_peak_stock = stock_pct
            intra_peak_bench = bench_pct
            if intra_hard_dt is None:
                intra_soft_found = False
                intra_soft_dt = None
                intra_soft_val = np.nan
                intra_soft_stock = np.nan
                intra_soft_bench = np.nan

        t = (dt_now.hour, dt_now.minute)
        if not in_range(t, INTRA_FROM, INTRA_TO):
            return

        if intra_hard_dt is None and cur_abs <= norm_thr:
            intra_hard_dt = dt_now
            intra_hard_val = float(dev_now)
            intra_hard_stock = stock_pct
            intra_hard_bench = bench_pct

        if intra_hard_dt is None and isinstance(intra_peak_dt, datetime) and intra_peak_abs > 0:
            if dt_now >= intra_peak_dt and cur_abs <= (float(intra_peak_abs) / float(soft_ratio)):
                if (not intra_soft_found) or (dt_now < intra_soft_dt):
                    intra_soft_found = True
                    intra_soft_dt = dt_now
                    intra_soft_val = float(dev_now)
                    intra_soft_stock = stock_pct
                    intra_soft_bench = bench_pct

    def finalize_intra_event(reason="window_end"):
        nonlocal intra_active, intra_id
        if not intra_active:
            return

        if intra_hard_dt is not None:
            status = "hard"
            end_dt = intra_hard_dt
            add_hard_delay("intra", intra_start_dt, intra_hard_dt)
        elif intra_soft_found and intra_soft_dt is not None:
            status = "soft"
            end_dt = intra_soft_dt
        else:
            status = "none"
            end_dt = None

        sk = intra_sign_key()

        sb = floor_to_band(intra_start_dt, start_band_minutes)
        if sb:
            start_bands_intra_total[sb] += 1
            if status != "none":
                start_bands_intra_any[sb] += 1
            if status == "hard":
                start_bands_intra_hard[sb] += 1
            if status == "soft":
                start_bands_intra_soft[sb] += 1

            update_timeband_by_class_sign(timebands_intra_by_class_sign, "intra", "start", sk, sb, status)

        counts_intra["intra"][status] += 1

        update_sigma_bins(sigma_bins_intra, "intra", sk, intra_peak_abs, status)
        update_bench_bins(bench_bins_intra, "intra", "start", sk, intra_start_bench, status)
        update_bench_bins(bench_bins_intra, "intra", "peak",  sk, intra_peak_bench, status)

        if status != "none" and isinstance(end_dt, datetime):
            b = floor_to_band(end_dt, norm_band_minutes)
            if b:
                norm_bands_intra_any[b] += 1
                if status == "hard":
                    norm_bands_intra_hard[b] += 1
                if status == "soft":
                    norm_bands_intra_soft[b] += 1

                update_timeband_by_class_sign(timebands_intra_by_class_sign, "intra", "norm", sk, b, status)
                update_bench_bins(bench_bins_intra, "intra", "norm", sk, (intra_hard_bench if status == "hard" else intra_soft_bench), status)

            push_last3_example(
                "intra", sk, status,
                intra_start_dt, end_dt,
                intra_start_dev, (intra_hard_val if status == "hard" else intra_soft_val),
                intra_peak_signed,
                intra_start_stock, (intra_hard_stock if status == "hard" else intra_soft_stock),
                intra_start_bench, (intra_hard_bench if status == "hard" else intra_soft_bench),
                start_time=intra_start_dt.strftime("%H:%M") if isinstance(intra_start_dt, datetime) else None,
                end_time=end_dt.strftime("%H:%M") if isinstance(end_dt, datetime) else None,
            )

        if include_events_intra:
            intra_events_buf.append({
                "intra_id": int(intra_id),
                "reason_finalized": reason,
                "start": {"dt": _dt_iso(intra_start_dt), "dev": _json_safe(intra_start_dev), "sign": int(intra_start_sign)},
                "peak":  {"dt": _dt_iso(intra_peak_dt), "abs": _json_safe(intra_peak_abs), "signed": _json_safe(intra_peak_signed)},
                "status": status,
            })

        intra_id += 1
        reset_intra_event()

    # ---------------- POST event state ----------------
    post_active = False
    post_id = 0

    post_start_dt = None
    post_start_dev = np.nan
    post_start_sign = 0
    post_start_stock = np.nan
    post_start_bench = np.nan

    post_peak_abs = 0.0
    post_peak_signed = 0.0
    post_peak_dt = None
    post_peak_stock = np.nan
    post_peak_bench = np.nan

    post_hard_dt = None
    post_hard_val = np.nan
    post_hard_stock = np.nan
    post_hard_bench = np.nan

    post_soft_found = False
    post_soft_dt = None
    post_soft_val = np.nan
    post_soft_stock = np.nan
    post_soft_bench = np.nan

    def reset_post_event():
        nonlocal post_active, post_start_dt, post_start_dev, post_start_sign, post_start_stock, post_start_bench
        nonlocal post_peak_abs, post_peak_signed, post_peak_dt, post_peak_stock, post_peak_bench
        nonlocal post_hard_dt, post_hard_val, post_hard_stock, post_hard_bench
        nonlocal post_soft_found, post_soft_dt, post_soft_val, post_soft_stock, post_soft_bench

        post_active = False
        post_start_dt = None
        post_start_dev = np.nan
        post_start_sign = 0
        post_start_stock = np.nan
        post_start_bench = np.nan

        post_peak_abs = 0.0
        post_peak_signed = 0.0
        post_peak_dt = None
        post_peak_stock = np.nan
        post_peak_bench = np.nan

        post_hard_dt = None
        post_hard_val = np.nan
        post_hard_stock = np.nan
        post_hard_bench = np.nan

        post_soft_found = False
        post_soft_dt = None
        post_soft_val = np.nan
        post_soft_stock = np.nan
        post_soft_bench = np.nan

    def start_post_event(dt_now, dev_now, stock_pct, bench_pct):
        nonlocal post_active, post_start_dt, post_start_dev, post_start_sign, post_start_stock, post_start_bench
        nonlocal post_peak_abs, post_peak_signed, post_peak_dt, post_peak_stock, post_peak_bench

        post_active = True
        post_start_dt = dt_now
        post_start_dev = float(dev_now)
        post_start_sign = 1 if float(dev_now) >= 0 else -1
        post_start_stock = stock_pct
        post_start_bench = bench_pct

        post_peak_abs = abs(float(dev_now))
        post_peak_signed = float(dev_now)
        post_peak_dt = dt_now
        post_peak_stock = stock_pct
        post_peak_bench = bench_pct

    def post_sign_key():
        return "pos" if post_start_sign > 0 else "neg"

    def post_process_tick(dt_now, dev_now, stock_pct, bench_pct):
        nonlocal post_peak_abs, post_peak_signed, post_peak_dt, post_peak_stock, post_peak_bench
        nonlocal post_hard_dt, post_hard_val, post_hard_stock, post_hard_bench
        nonlocal post_soft_found, post_soft_dt, post_soft_val, post_soft_stock, post_soft_bench

        cur_abs = abs(float(dev_now))

        if cur_abs > float(post_peak_abs):
            post_peak_abs = float(cur_abs)
            post_peak_signed = float(dev_now)
            post_peak_dt = dt_now
            post_peak_stock = stock_pct
            post_peak_bench = bench_pct
            if post_hard_dt is None:
                post_soft_found = False
                post_soft_dt = None
                post_soft_val = np.nan
                post_soft_stock = np.nan
                post_soft_bench = np.nan

        t = (dt_now.hour, dt_now.minute)
        if not in_range(t, POST_FROM, POST_TO):
            return

        if post_hard_dt is None and cur_abs <= norm_thr:
            post_hard_dt = dt_now
            post_hard_val = float(dev_now)
            post_hard_stock = stock_pct
            post_hard_bench = bench_pct

        if post_hard_dt is None and isinstance(post_peak_dt, datetime) and post_peak_abs > 0:
            if dt_now >= post_peak_dt and cur_abs <= (float(post_peak_abs) / float(soft_ratio)):
                if (not post_soft_found) or (dt_now < post_soft_dt):
                    post_soft_found = True
                    post_soft_dt = dt_now
                    post_soft_val = float(dev_now)
                    post_soft_stock = stock_pct
                    post_soft_bench = bench_pct

    def finalize_post_event(reason="window_end"):
        nonlocal post_active, post_id
        if not post_active:
            return

        if post_hard_dt is not None:
            status = "hard"
            end_dt = post_hard_dt
            add_hard_delay("post", post_start_dt, post_hard_dt)
        elif post_soft_found and post_soft_dt is not None:
            status = "soft"
            end_dt = post_soft_dt
        else:
            status = "none"
            end_dt = None

        sk = post_sign_key()

        sb = floor_to_band(post_start_dt, start_band_minutes)
        if sb:
            start_bands_post_total[sb] += 1
            if status != "none":
                start_bands_post_any[sb] += 1
            if status == "hard":
                start_bands_post_hard[sb] += 1
            if status == "soft":
                start_bands_post_soft[sb] += 1

            update_timeband_by_class_sign(timebands_post_by_class_sign, "post", "start", sk, sb, status)

        counts_post["post"][status] += 1

        update_sigma_bins(sigma_bins_post, "post", sk, post_peak_abs, status)
        update_bench_bins(bench_bins_post, "post", "start", sk, post_start_bench, status)
        update_bench_bins(bench_bins_post, "post", "peak",  sk, post_peak_bench, status)

        if status != "none" and isinstance(end_dt, datetime):
            b = floor_to_band(end_dt, norm_band_minutes)
            if b:
                norm_bands_post_any[b] += 1
                if status == "hard":
                    norm_bands_post_hard[b] += 1
                if status == "soft":
                    norm_bands_post_soft[b] += 1

                update_timeband_by_class_sign(timebands_post_by_class_sign, "post", "norm", sk, b, status)
                update_bench_bins(bench_bins_post, "post", "norm", sk, (post_hard_bench if status == "hard" else post_soft_bench), status)

            push_last3_example(
                "post", sk, status,
                post_start_dt, end_dt,
                post_start_dev, (post_hard_val if status == "hard" else post_soft_val),
                post_peak_signed,
                post_start_stock, (post_hard_stock if status == "hard" else post_soft_stock),
                post_start_bench, (post_hard_bench if status == "hard" else post_soft_bench),
                start_time=post_start_dt.strftime("%H:%M") if isinstance(post_start_dt, datetime) else None,
                end_time=end_dt.strftime("%H:%M") if isinstance(end_dt, datetime) else None,
            )

        if include_events_post:
            post_events_buf.append({
                "post_id": int(post_id),
                "reason_finalized": reason,
                "start": {"dt": _dt_iso(post_start_dt), "dev": _json_safe(post_start_dev), "sign": int(post_start_sign)},
                "peak":  {"dt": _dt_iso(post_peak_dt), "abs": _json_safe(post_peak_abs), "signed": _json_safe(post_peak_signed)},
                "status": status,
            })

        post_id += 1
        reset_post_event()

    # ---------------- day boundary helpers ----------------
    def on_new_day():
        if pre_active:
            finalize_pre_event(reason="day_boundary")
        if intra_active:
            finalize_intra_event(reason="day_boundary")
        if post_active:
            finalize_post_event(reason="day_boundary")

    # ---------------- dictify helpers ----------------
    def dictify_sigma_bins(m):
        return {
            "pos": {b: dict(c) for b, c in m["pos"].items()},
            "neg": {b: dict(c) for b, c in m["neg"].items()},
        }

    def dictify_bench_bins(m):
        out = {}
        for which in ("start", "peak", "norm"):
            out[which] = {
                "pos": {b: dict(c) for b, c in m[which]["pos"].items()},
                "neg": {b: dict(c) for b, c in m[which]["neg"].items()},
            }
        return out

    def dictify_timebands_by_class_sign(m):
        out = {}
        for cls, blk in m.items():
            out[cls] = {}
            for which in ("start", "norm"):
                out[cls][which] = {
                    "pos": {band: dict(c) for band, c in blk[which]["pos"].items()},
                    "neg": {band: dict(c) for band, c in blk[which]["neg"].items()},
                }
        return out

    def dictify_last3(last3_map):
        out = {}
        for cls, by_sign in last3_map.items():
            out[cls] = {"pos": list(by_sign["pos"]), "neg": list(by_sign["neg"])}
        return out

    # ---------------- flush ticker (write files) ----------------
    def flush_current_ticker():
        nonlocal cur_ticker, cur_day
        nonlocal bench_name_seen, corr_static, beta_static, sigma_static

        if cur_ticker is None:
            return

        if pre_active:
            finalize_pre_event(reason="ticker_end")
        if intra_active:
            finalize_intra_event(reason="ticker_end")
        if post_active:
            finalize_post_event(reason="ticker_end")

        blue_r = class_rates(counts_pre["blue"])
        ark_r = class_rates(counts_pre["ark"])
        pr_r  = class_rates(counts_pre["print"])
        op_r  = class_rates(counts_pre["open"])
        gl_r  = class_rates(counts_pre["global"])
        intra_r = class_rates(counts_intra["intra"])
        post_r  = class_rates(counts_post["post"])

        events_pre_total = int(gl_r["total"])  # pre event count aligns with global counter
        events_intra_total = int(intra_r["total"])
        events_post_total  = int(post_r["total"])
        events_total = events_pre_total + events_intra_total + events_post_total

        # ✅ global filter for ALL outputs
        if events_total < int(min_events_per_ticker):
            reset_ticker_state()
            return

        # last10
        last10_print_days = []
        last10_peak_days = []
        for d in list(recent_days):
            snap = recent_by_day.get(d)
            if not snap:
                continue
            if snap.get("print") is not None:
                last10_print_days.append(snap["print"])
            if snap.get("peak") is not None:
                last10_peak_days.append(snap["peak"])

        pos_vals = list(last5_print_days_pos)
        neg_vals = list(last5_print_days_neg)

        # open series last10
        open_series_last10 = []
        for d in list(recent_days):
            series_map = open_series_by_day.get(d)
            if not series_map:
                continue
            pts = sorted(series_map.items(), key=lambda kv: kv[0])
            open_series_last10.append({
                "date": d,
                "points": [[dt_iso, _json_safe(val)] for dt_iso, val in pts],
            })

        payload = {
            "ticker": cur_ticker,
            "bench": bench_name_seen,
            "static": {"corr": _json_safe(corr_static), "beta": _json_safe(beta_static), "sigma": _json_safe(sigma_static)},
            "params": {
                "dev_thr": float(dev_thr), "norm_thr": float(norm_thr), "soft_ratio": float(soft_ratio),
                "windows": {
                    "blue": "00:01-03:59 (trigger allowed; peak frozen to 03:59; hard/soft within BLUE)",
                    "fixation_window": "00:05-09:29 (ARK peak frozen; ARK hard/soft within)",
                    "ignored_gaps": ["03:58-04:05", "07:58-08:05"],
                    "frozen_peak_until": "09:29",
                    "print_first": "09:30-09:35 (first tick only)",
                    "open_scan": "09:31-09:40 (scan + open dev series)",
                    "intra": "10:00-12:00 (trigger+normalize within)",
                    "post": "16:01-19:59 (trigger+normalize within)",
                    "global_priority": GLOBAL_PRIORITY,
                },
                "bins": {
                    "sigma": {"min": sigma_bin_min, "max": sigma_bin_max, "step": sigma_bin_step},
                    "bench": {"min": bench_bin_min, "max": bench_bin_max, "step": bench_bin_step},
                },
                "time_bands": {"start_band_minutes": start_band_minutes, "norm_band_minutes": norm_band_minutes},
                "best_rules": best_rules,
                "min_events_per_ticker": int(min_events_per_ticker),
                "open_series_downsample_seconds": int(open_series_downsample_seconds),
            },
      "stats": {
                "events_total": int(events_total),
                "pre": {
                    "events_total": int(events_pre_total),
                    "blue": blue_r,
                    "ark": ark_r,
                    "print": pr_r,
                    "open": op_r,
                    "global": {
                        **gl_r,
                        "labels": dict(global_labels_counter),
                        "best_label": global_labels_counter.most_common(1)[0][0] if global_labels_counter else None,
                    },
                    "hard_delay_avg_sec": {
                        "blue": _json_safe(avg_hard_delay("blue")),
                        "ark": _json_safe(avg_hard_delay("ark")),
                        "print": _json_safe(avg_hard_delay("print")),
                        "open": _json_safe(avg_hard_delay("open")),
                        "global": _json_safe(avg_hard_delay("global")),
                    },
                    "global_mean_peak_abs_when_normalized": {
                        "pos": _json_safe((global_norm_peak_sum["pos"] / global_norm_peak_cnt["pos"]) if global_norm_peak_cnt["pos"] else None),
                        "neg": _json_safe((global_norm_peak_sum["neg"] / global_norm_peak_cnt["neg"]) if global_norm_peak_cnt["neg"] else None),
                    },
                },
                "intra": {
                    "events_total": int(events_intra_total),
                    "intra": intra_r,
                    "hard_delay_avg_sec": {"intra": _json_safe(avg_hard_delay("intra"))},
                },
                "post": {
                    "events_total": int(events_post_total),
                    "post": post_r,
                    "hard_delay_avg_sec": {"post": _json_safe(avg_hard_delay("post"))},
                },
            },
            "time_bands": {
                "pre": {
                    "start_total": dict(start_bands_pre_total),
                    "start_any": dict(start_bands_pre_any),
                    "start_hard": dict(start_bands_pre_hard),
                    "start_soft": dict(start_bands_pre_soft),
                    "norm_any": dict(norm_bands_pre_any),
                    "norm_hard": dict(norm_bands_pre_hard),
                    "norm_soft": dict(norm_bands_pre_soft),
                },
                "intra": {
                    "start_total": dict(start_bands_intra_total),
                    "start_any": dict(start_bands_intra_any),
                    "start_hard": dict(start_bands_intra_hard),
                    "start_soft": dict(start_bands_intra_soft),
                    "norm_any": dict(norm_bands_intra_any),
                    "norm_hard": dict(norm_bands_intra_hard),
                    "norm_soft": dict(norm_bands_intra_soft),
                },
                "post": {
                    "start_total": dict(start_bands_post_total),
                    "start_any": dict(start_bands_post_any),
                    "start_hard": dict(start_bands_post_hard),
                    "start_soft": dict(start_bands_post_soft),
                    "norm_any": dict(norm_bands_post_any),
                    "norm_hard": dict(norm_bands_post_hard),
                    "norm_soft": dict(norm_bands_post_soft),
                },
                "pre_by_class_sign": dictify_timebands_by_class_sign(timebands_pre_by_class_sign),
                "intra_by_class_sign": dictify_timebands_by_class_sign(timebands_intra_by_class_sign),
                "post_by_class_sign": dictify_timebands_by_class_sign(timebands_post_by_class_sign),
            },
            "recent": {
                "last10_days": list(recent_days),
                "last10_print": last10_print_days,
                "last10_pre_peak_sigma": last10_peak_days,
                "last10_open_dev_series": open_series_last10,
                "last5_print": {
                    "pos": {
                        "values": pos_vals,
                        "mean": _json_safe(float(np.mean(pos_vals)) if pos_vals else None),
                        "median": _json_safe(float(np.median(pos_vals)) if pos_vals else None),
                    },
                    "neg": {
                        "values": neg_vals,
                        "mean": _json_safe(float(np.mean(neg_vals)) if neg_vals else None),
                        "median": _json_safe(float(np.median(neg_vals)) if neg_vals else None),
                    },
                },
            },

            "examples_last3_normalized": dictify_last3(last3_examples),

            "bins": {
                "sigma": {
                    "pre": {
                        "blue": dictify_sigma_bins(sigma_bins_pre["blue"]),
                        "ark": dictify_sigma_bins(sigma_bins_pre["ark"]),
                        "print": dictify_sigma_bins(sigma_bins_pre["print"]),
                        "open": dictify_sigma_bins(sigma_bins_pre["open"]),
                        "global": dictify_sigma_bins(sigma_bins_pre["global"]),
                    },
                    "intra": {"intra": dictify_sigma_bins(sigma_bins_intra["intra"])},
                    "post": {"post": dictify_sigma_bins(sigma_bins_post["post"])},
                },
                "bench": {
                    "pre": {
                        "blue": dictify_bench_bins(bench_bins_pre["blue"]),
                        "ark": dictify_bench_bins(bench_bins_pre["ark"]),
                        "print": dictify_bench_bins(bench_bins_pre["print"]),
                        "open": dictify_bench_bins(bench_bins_pre["open"]),
                        "global": dictify_bench_bins(bench_bins_pre["global"]),
                    },
                    "intra": {"intra": dictify_bench_bins(bench_bins_intra["intra"])},
                    "post": {"post": dictify_bench_bins(bench_bins_post["post"])},
                },
            },
        }

        if include_events_pre:
            payload["events_pre"] = list(pre_events_buf)
        if include_events_intra:
            payload["events_intra"] = list(intra_events_buf)
        if include_events_post:
            payload["events_post"] = list(post_events_buf)

        onefile_f.write(json.dumps(payload, ensure_ascii=False) + "\n")

        # SUMMARY row
        row = {
            "ticker": cur_ticker,
            "bench": bench_name_seen,
            "events_total": int(events_total),
            "events_pre_total": int(events_pre_total),
            "events_intra_total": int(events_intra_total),
            "events_post_total": int(events_post_total),

            "blue_any_rate": _json_safe(blue_r["rate_any"]),
            "blue_hard_rate": _json_safe(blue_r["rate_hard"]),
            "blue_soft_rate": _json_safe(blue_r["rate_soft"]),

            "ark_any_rate": _json_safe(ark_r["rate_any"]),
            "ark_hard_rate": _json_safe(ark_r["rate_hard"]),
            "ark_soft_rate": _json_safe(ark_r["rate_soft"]),

            "print_any_rate": _json_safe(pr_r["rate_any"]),
            "print_hard_rate": _json_safe(pr_r["rate_hard"]),
            "print_soft_rate": _json_safe(pr_r["rate_soft"]),

            "open_any_rate": _json_safe(op_r["rate_any"]),
            "open_hard_rate": _json_safe(op_r["rate_hard"]),
            "open_soft_rate": _json_safe(op_r["rate_soft"]),

            "global_any_rate": _json_safe(gl_r["rate_any"]),
            "global_hard_rate": _json_safe(gl_r["rate_hard"]),
            "global_soft_rate": _json_safe(gl_r["rate_soft"]),

            "intra_any_rate": _json_safe(intra_r["rate_any"]),
            "intra_hard_rate": _json_safe(intra_r["rate_hard"]),
            "intra_soft_rate": _json_safe(intra_r["rate_soft"]),

            "post_any_rate": _json_safe(post_r["rate_any"]),
            "post_hard_rate": _json_safe(post_r["rate_hard"]),
            "post_soft_rate": _json_safe(post_r["rate_soft"]),

            "corr": _json_safe(corr_static),
            "beta": _json_safe(beta_static),
            "sigma": _json_safe(sigma_static),
        }
        pd.DataFrame([row], columns=summary_cols).to_csv(output_summary_csv, mode="a", header=False, index=False)

        # BEST PARAMS: keep + ADD best_windows_any for ALL classes
        def median_or_none(arr):
            arr = list(arr)
            return _json_safe(float(np.median(arr)) if arr else None)

        best_windows_any = {
            "sigma_peak_bins": {
                "blue":  {"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["blue"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["blue"]["neg"], step=sigma_bin_step)},
                "ark":   {"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["ark"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["ark"]["neg"], step=sigma_bin_step)},
                "print": {"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["print"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["print"]["neg"], step=sigma_bin_step)},
                "open":  {"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["open"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["open"]["neg"], step=sigma_bin_step)},
                "global":{"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["global"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_pre["global"]["neg"], step=sigma_bin_step)},
                "intra": {"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_intra["intra"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_intra["intra"]["neg"], step=sigma_bin_step)},
                "post":  {"pos": stitch_numeric_bin_intervals_from_any(sigma_bins_post["post"]["pos"], step=sigma_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(sigma_bins_post["post"]["neg"], step=sigma_bin_step)},
            },
            "bench_peak_bins": {
                "blue":  {"pos": stitch_numeric_bin_intervals_from_any(bench_bins_pre["blue"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_pre["blue"]["peak"]["neg"], step=bench_bin_step)},
                "ark":   {"pos": stitch_numeric_bin_intervals_from_any(bench_bins_pre["ark"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_pre["ark"]["peak"]["neg"], step=bench_bin_step)},
                "print": {"pos": stitch_numeric_bin_intervals_from_any(bench_bins_pre["print"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_pre["print"]["peak"]["neg"], step=bench_bin_step)},
                "open":  {"pos": stitch_numeric_bin_intervals_from_any(bench_bins_pre["open"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_pre["open"]["peak"]["neg"], step=bench_bin_step)},
                "global":{"pos": stitch_numeric_bin_intervals_from_any(bench_bins_pre["global"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_pre["global"]["peak"]["neg"], step=bench_bin_step)},
                "intra": {"pos": stitch_numeric_bin_intervals_from_any(bench_bins_intra["intra"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_intra["intra"]["peak"]["neg"], step=bench_bin_step)},
                "post":  {"pos": stitch_numeric_bin_intervals_from_any(bench_bins_post["post"]["peak"]["pos"], step=bench_bin_step),
                          "neg": stitch_numeric_bin_intervals_from_any(bench_bins_post["post"]["peak"]["neg"], step=bench_bin_step)},
            },
            "time_start_bands": {
                "blue": {
                    "pos": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["blue"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["blue"]["start"]["neg"]),
                },
                "ark": {
                    "pos": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["ark"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["ark"]["start"]["neg"]),
                },
                "print": {
                    "pos": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["print"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["print"]["start"]["neg"]),
                },
                "open": {
                    "pos": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["open"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["open"]["start"]["neg"]),
                },
                "global": {
                    "pos": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["global"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_pre_by_class_sign["global"]["start"]["neg"]),
                },
                "intra": {
                    "pos": stitch_timeband_intervals_from_any(timebands_intra_by_class_sign["intra"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_intra_by_class_sign["intra"]["start"]["neg"]),
                },
                "post": {
                    "pos": stitch_timeband_intervals_from_any(timebands_post_by_class_sign["post"]["start"]["pos"]),
                    "neg": stitch_timeband_intervals_from_any(timebands_post_by_class_sign["post"]["start"]["neg"]),
                },
            }
        }

        best = {
            "ticker": cur_ticker,
            "bench": bench_name_seen,
            "static": {"corr": _json_safe(corr_static), "beta": _json_safe(beta_static), "sigma": _json_safe(sigma_static)},

            "dev_print_last5_median": {
                "pos": median_or_none(last5_print_days_pos),
                "neg": median_or_none(last5_print_days_neg),
            },

            "totals": {
                "events_total": int(events_total),
                "pre_total": int(events_pre_total),
                "intra_total": int(events_intra_total),
                "post_total": int(events_post_total),
            },

            "ratings": {
                "blue": _json_safe(blue_r["rate_any"]),
                "ark": _json_safe(ark_r["rate_any"]),
                "print": _json_safe(pr_r["rate_any"]),
                "open": _json_safe(op_r["rate_any"]),
                "intra": _json_safe(intra_r["rate_any"]),
                "post": _json_safe(post_r["rate_any"]),
                "global": _json_safe(gl_r["rate_any"]),
            },

            "hard_soft_share": {
                "blue":  {"hard": int(blue_r["hard"]), "soft": int(blue_r["soft"]), "hard_share": _json_safe(blue_r["hard_share_in_norm"])},
                "ark":   {"hard": int(ark_r["hard"]),  "soft": int(ark_r["soft"]),  "hard_share": _json_safe(ark_r["hard_share_in_norm"])},
                "print": {"hard": int(pr_r["hard"]),   "soft": int(pr_r["soft"]),   "hard_share": _json_safe(pr_r["hard_share_in_norm"])},
                "open":  {"hard": int(op_r["hard"]),   "soft": int(op_r["soft"]),   "hard_share": _json_safe(op_r["hard_share_in_norm"])},
                "intra": {"hard": int(intra_r["hard"]), "soft": int(intra_r["soft"]), "hard_share": _json_safe(intra_r["hard_share_in_norm"])},
                "post":  {"hard": int(post_r["hard"]),  "soft": int(post_r["soft"]),  "hard_share": _json_safe(post_r["hard_share_in_norm"])},
                "global":{"hard": int(gl_r["hard"]),   "soft": int(gl_r["soft"]),   "hard_share": _json_safe(gl_r["hard_share_in_norm"])},
            },

            "avg_hard_delay_sec": {
                "blue": _json_safe(avg_hard_delay("blue")),
                "ark": _json_safe(avg_hard_delay("ark")),
                "print": _json_safe(avg_hard_delay("print")),
                "open": _json_safe(avg_hard_delay("open")),
                "intra": _json_safe(avg_hard_delay("intra")),
                "post": _json_safe(avg_hard_delay("post")),
                "global": _json_safe(avg_hard_delay("global")),
            },

            "best_windows_any": {
                "rule": {"min_rate": 0.60, "min_total": 4, "rate": "(hard+soft)/total"},
                "stitched": best_windows_any,
            },

            "params": {"dev_thr": float(dev_thr), "norm_thr": float(norm_thr), "soft_ratio": float(soft_ratio)},
        }

        best_params_f.write(json.dumps(best, ensure_ascii=False) + "\n")
        reset_ticker_state()
        
    # ---------------- processing chunks ----------------
    def process_chunk(chunk: "pd.DataFrame", ci: int):
        nonlocal cur_ticker, cur_day
        nonlocal bench_name_seen, static_triplet_set, corr_static, beta_static, sigma_static
        nonlocal pre_active, intra_active, post_active

        req = {"ticker", "date", "dt", "dev_sig"}
        if not req.issubset(chunk.columns):
            raise KeyError(f"Input must contain columns: {sorted(req)}")

        # If not sorted — make it sorted once (vectorized), no per-row parse_dt
        if not assume_sorted:
            chunk["dt"] = pd.to_datetime(chunk["dt"], errors="coerce", utc=True)
            chunk.sort_values(["ticker", "date", "dt"], inplace=True)

        def col(name):
            return chunk[name] if name in chunk.columns else pd.Series(np.nan, index=chunk.index)

        s_ticker = col("ticker")
        s_date   = col("date")

        # ✅ Vectorized datetime + numeric parse
        s_dt_ts  = pd.to_datetime(col("dt"), errors="coerce", utc=True)  # tz-aware Timestamp (UTC)
        s_dev    = pd.to_numeric(col("dev_sig"), errors="coerce")

        s_bench_name = col("bench")
        s_corr  = col("corr")
        s_beta  = col("beta")
        s_sigma = col("sigma")

        s_bench_num = (
            pd.to_numeric(col(BENCH_NUM_FIELD), errors="coerce")
            if BENCH_NUM_FIELD in chunk.columns
            else pd.Series(np.nan, index=chunk.index)
        )
        s_stock_pct = (
            pd.to_numeric(col(STOCK_NUM_FIELD), errors="coerce")
            if STOCK_NUM_FIELD in chunk.columns
            else pd.Series(np.nan, index=chunk.index)
        )

        # ✅ Build a fast mask upfront (drop bad dt/dev and ignored windows) BEFORE loop
        dt_ok  = s_dt_ts.notna()
        dev_ok = np.isfinite(s_dev.to_numpy(dtype="float64", copy=False))
        mask = (dt_ok.to_numpy(copy=False) & dev_ok)

        if mask.any():
            # ignored windows mask using hour/min (vectorized)
            dt2 = s_dt_ts[mask]
            h = dt2.dt.hour.to_numpy(dtype="int16", copy=False)
            m = dt2.dt.minute.to_numpy(dtype="int16", copy=False)

            # vectorized ignored windows:
            # IGNORE_WINDOWS = [((3, 58), (4, 5)), ((7, 58), (8, 5))]
            # condition: a <= (h,m) <= b for any window
            def _in_win(h, m, a, b):
                ah, am = a
                bh, bm = b
                return ((h > ah) | ((h == ah) & (m >= am))) & ((h < bh) | ((h == bh) & (m <= bm)))

            ig = np.zeros_like(h, dtype=bool)
            for a, b in IGNORE_WINDOWS:
                ig |= _in_win(h, m, a, b)

            # apply back to global mask
            idx_mask = np.flatnonzero(mask)
            mask[idx_mask] = ~ig

        # nothing useful in chunk
        if not mask.any():
            return

        # ✅ set bench_name_seen once per chunk (first non-null)
        if bench_name_seen is None and "bench" in chunk.columns:
            bn = s_bench_name[mask]
            if len(bn) > 0:
                first_bn = bn.dropna()
                if len(first_bn) > 0:
                    bench_name_seen = first_bn.iloc[0]

        # ✅ set static triplet once per chunk (first row with corr/beta/sigma present)
        if not static_triplet_set and all(x in chunk.columns for x in ("corr", "beta", "sigma")):
            cc = s_corr[mask]
            bb = s_beta[mask]
            ss = s_sigma[mask]
            ok = (cc.notna() & bb.notna() & ss.notna())
            if ok.any():
                j = ok.idxmax()  # first True index
                corr_static, beta_static, sigma_static = cc.loc[j], bb.loc[j], ss.loc[j]
                static_triplet_set = True

        # ✅ Build compact arrays (keep original order)
        # Convert to python datetime ONLY for the filtered rows (cheap vs per-row parse_dt)
        dt_py = s_dt_ts[mask].dt.to_pydatetime()  # ndarray of datetime (tz-aware)
        tk_arr = s_ticker[mask].to_numpy(copy=False)
        ds_arr = s_date[mask].to_numpy(copy=False)
        dev_arr = s_dev[mask].to_numpy(dtype="float64", copy=False)
        bench_arr = s_bench_num[mask].to_numpy(dtype="float64", copy=False)
        stock_arr = s_stock_pct[mask].to_numpy(dtype="float64", copy=False)

        # Precompute hour/min for fast tuple t
        dt2 = s_dt_ts[mask]
        h_arr = dt2.dt.hour.to_numpy(dtype="int16", copy=False)
        m_arr = dt2.dt.minute.to_numpy(dtype="int16", copy=False)

        n = len(dev_arr)
        for i in range(n):
            tk = tk_arr[i]
            ds = ds_arr[i]
            dt_now = dt_py[i]         # datetime with tzinfo
            dev_now = dev_arr[i]      # float
            bench_num = bench_arr[i]  # float (may be nan)
            stock_pct = stock_arr[i]  # float (may be nan)

            # cheap time tuple
            t = (int(h_arr[i]), int(m_arr[i]))

            # ticker boundary
            if cur_ticker is not None and tk != cur_ticker:
                flush_current_ticker()
                cur_ticker, cur_day = tk, ds
                on_new_day()

            if cur_ticker is None:
                cur_ticker, cur_day = tk, ds
                on_new_day()

            # day boundary
            if ds != cur_day:
                cur_day = ds
                on_new_day()

            # finalize PRE after OPEN window
            if pre_active and (t > OPEN_TO):
                finalize_pre_event(reason="passed_open_window")

            # start PRE in BLUE or ARK window
            if (not pre_active) and (in_range(t, BLUE_FROM, BLUE_TO) or in_range(t, ARK_FROM, ARK_TO)):
                if abs(float(dev_now)) >= dev_thr:
                    start_pre_event(dt_now, dev_now, stock_pct, bench_num)

            if pre_active:
                pre_process_tick(dt_now, dev_now, stock_pct, bench_num)

            # finalize INTRA after window
            if intra_active and (t > INTRA_TO):
                finalize_intra_event(reason="passed_intra_window")

            # start INTRA inside 10:00-12:00
            if (not intra_active) and in_range(t, INTRA_FROM, INTRA_TO):
                if abs(float(dev_now)) >= dev_thr:
                    start_intra_event(dt_now, dev_now, stock_pct, bench_num)

            if intra_active and in_range(t, INTRA_FROM, INTRA_TO):
                intra_process_tick(dt_now, dev_now, stock_pct, bench_num)

            # finalize POST after window
            if post_active and (t > POST_TO):
                finalize_post_event(reason="passed_post_window")

            # start POST inside 16:01-19:59
            if (not post_active) and in_range(t, POST_FROM, POST_TO):
                if abs(float(dev_now)) >= dev_thr:
                    start_post_event(dt_now, dev_now, stock_pct, bench_num)

            if post_active and in_range(t, POST_FROM, POST_TO):
                post_process_tick(dt_now, dev_now, stock_pct, bench_num)


    # ---------------- main read loop ----------------
    t0 = time.time()
    total_rows = 0
    last_rows = 0
    last_ts = t0

    is_parquet = str(input_path).lower().endswith((".parquet", ".pq", ".parq"))
    print(
        f"▶️ START v12 exporter+BLUE+POST file={input_path} parquet={is_parquet} "
        f"dev_thr={dev_thr} norm_thr={norm_thr} soft_ratio={soft_ratio} min_events={min_events_per_ticker}"
    )

    try:
        if is_parquet and parquet_use_pyarrow:
            import pyarrow.parquet as pq
            pf = pq.ParquetFile(input_path)

            wanted = ["ticker", "date", "dt", "dev_sig", "bench", "corr", "beta", "sigma", STOCK_NUM_FIELD, BENCH_NUM_FIELD]
            cols = [c for c in wanted if c in pf.schema.names]

            for ci in range(pf.num_row_groups):
                chunk = pf.read_row_group(ci, columns=cols).to_pandas()
                process_chunk(chunk, ci + 1)
                total_rows += len(chunk)

                if (ci + 1) % log_every_n_chunks == 0:
                    now = time.time()
                    rps = (total_rows - last_rows) / max(now - last_ts, 1e-6)
                    print(f"[rg {ci+1:>4}/{pf.num_row_groups}] rows={total_rows:,} speed={rps:,.0f}/s elapsed={now-t0:,.1f}s")
                    last_rows, last_ts = total_rows, now

                # cheaper cleanup (avoid gc.collect each chunk)
                del chunk
                if (ci + 1) % max(10, log_every_n_chunks * 2) == 0:
                    gc.collect()

        elif not is_parquet:
            reader = pd.read_csv(input_path, compression="infer", low_memory=False, chunksize=csv_chunksize)
            for ci, chunk in enumerate(reader, 1):
                process_chunk(chunk, ci)
                total_rows += len(chunk)

                if ci % log_every_n_chunks == 0:
                    now = time.time()
                    rps =qar = (total_rows - last_rows) / max(now - last_ts, 1e-6)
                    print(f"[chunk {ci:>5}] rows={total_rows:,} speed={rps:,.0f}/s elapsed={now-t0:,.1f}s")
                    last_rows, last_ts = total_rows, now

                del chunk
                if ci % max(10, log_every_n_chunks * 2) == 0:
                    gc.collect()

        else:
            df = pd.read_parquet(input_path)
            step = 1_000_000
            for ci, start in enumerate(range(0, len(df), step), 1):
                chunk = df.iloc[start:start + step]
                process_chunk(chunk, ci)
                total_rows += len(chunk)

                if ci % log_every_n_chunks == 0:
                    now = time.time()
                    rps = (total_rows - last_rows) / max(now - last_ts, 1e-6)
                    print(f"[chunk {ci:>5}] rows={total_rows:,} speed={rps:,.0f}/s elapsed={now-t0:,.1f}s")
                    last_rows, last_ts = total_rows, now

                del chunk
                if ci % max(10, log_every_n_chunks * 2) == 0:
                    gc.collect()

        flush_current_ticker()
        print(f"🏁 DONE rows={total_rows:,} -> onefile={output_onefile_jsonl} summary={output_summary_csv} best_params={output_best_params_jsonl}")

    finally:
        onefile_f.close()
        best_params_f.close()



In [5]:
# devsig_stream_stats_v12_exporter(
#     input_path="ARBITRAGE/final_filtered.parquet",
#     output_onefile_jsonl="ARBITRAGE/onefile.jsonl",
#     output_summary_csv="ARBITRAGE/summary.csv",
#     output_best_params_jsonl="ARBITRAGE/best_params.jsonl",

#     dev_thr=0.30,
#     norm_thr=0.10,
#     soft_ratio=3.0,

#     include_events_pre=False,
#     include_events_intra=False,
#     max_events_per_ticker=500,

#     min_events_per_ticker=10,

#     start_band_minutes=30,
#     norm_band_minutes=30,

#     sigma_bin_min=0.2,
#     sigma_bin_max=2.7,
#     sigma_bin_step=0.1,

#     bench_bin_min=-3.0,
#     bench_bin_max=3.0,
#     bench_bin_step=0.1,

#     open_series_downsample_seconds=60,
# )


In [6]:
from pathlib import Path
import os

def _resolve_orion_paths(strategy_code: str):
    """
    Strategy reads FINAL from:
      1) env FINAL_PARQUET_PATH (preferred; runner sets it)
      2) ORION_HOME/CRACEN/final.parquet
      3) auto-find OriON by walking up from cwd
    Writes outputs to:
      ORION_HOME/signals/{strategy_code}/...
    """
    # 1) preferred: runner sets FINAL_PARQUET_PATH + SIGNALS_DIR
    final_env = os.environ.get("FINAL_PARQUET_PATH")
    sig_env   = os.environ.get("SIGNALS_DIR")
    orion_env = os.environ.get("ORION_HOME")

    if final_env:
        final_path = Path(final_env).expanduser().resolve()
    else:
        final_path = None

    if sig_env:
        signals_base = Path(sig_env).expanduser().resolve()
    else:
        signals_base = None

    # 2) fallback: ORION_HOME
    if (final_path is None or signals_base is None) and orion_env:
        orion_home = Path(orion_env).expanduser().resolve()
        if final_path is None:
            final_path = (orion_home / "CRACEN" / "final.parquet").resolve()
        if signals_base is None:
            signals_base = (orion_home / "signals").resolve()

    # 3) fallback: search upward for OriON folder
    if final_path is None or signals_base is None:
        here = Path.cwd().resolve()
        orion_home = None
        for parent in [here] + list(here.parents):
            if parent.name.lower() == "orion":
                orion_home = parent
                break
            cand = parent / "OriON"
            if cand.exists() and cand.is_dir():
                orion_home = cand.resolve()
                break
        if orion_home is None:
            raise RuntimeError("Cannot locate OriON. Set ORION_HOME (recommended).")
        if final_path is None:
            final_path = (orion_home / "CRACEN" / "final.parquet").resolve()
        if signals_base is None:
            signals_base = (orion_home / "signals").resolve()

    out_dir = (signals_base / strategy_code.lower()).resolve()
    out_dir.mkdir(parents=True, exist_ok=True)

    if not final_path.exists():
        raise FileNotFoundError(f"FINAL parquet not found: {final_path}")

    return final_path, out_dir


FINAL_PATH, OUT_DIR = _resolve_orion_paths("arbitrage")

devsig_stream_stats_v12_exporter(
    input_path=str(FINAL_PATH),

    # Запис у стиснені файли (.gz)
    output_onefile_jsonl=str(OUT_DIR / "onefile.jsonl.gz"),
    output_best_params_jsonl=str(OUT_DIR / "best_params.jsonl.gz"),

    # summary залишається незжатим
    output_summary_csv=str(OUT_DIR / "summary.csv"),

    dev_thr=0.30,
    norm_thr=0.10,
    soft_ratio=3.0,

    include_events_pre=False,
    include_events_intra=False,
    max_events_per_ticker=500,

    min_events_per_ticker=10,

    start_band_minutes=30,
    norm_band_minutes=30,

    sigma_bin_min=0.2,
    sigma_bin_max=2.7,
    sigma_bin_step=0.1,

    bench_bin_min=-3.0,
    bench_bin_max=3.0,
    bench_bin_step=0.1,

    open_series_downsample_seconds=60,
)


▶️ START v12 exporter+BLUE+POST file=C:\datum-api-examples-main\OriON\CRACEN\final.parquet parquet=True dev_thr=0.3 norm_thr=0.1 soft_ratio=3.0 min_events=10


[rg    5/2746] rows=77,566 speed=350,068/s elapsed=0.2s
[rg   10/2746] rows=143,178 speed=444,230/s elapsed=0.4s


[rg   15/2746] rows=272,002 speed=464,823/s elapsed=0.6s
[rg   20/2746] rows=328,722 speed=335,708/s elapsed=0.8s


[rg   25/2746] rows=418,517 speed=418,967/s elapsed=1.0s


[rg   30/2746] rows=525,780 speed=429,155/s elapsed=1.3s
[rg   35/2746] rows=581,345 speed=490,574/s elapsed=1.4s


[rg   40/2746] rows=669,418 speed=449,925/s elapsed=1.6s


[rg   45/2746] rows=759,716 speed=185,022/s elapsed=2.1s


[rg   50/2746] rows=844,828 speed=167,280/s elapsed=2.6s


[rg   55/2746] rows=938,074 speed=162,303/s elapsed=3.2s


[rg   60/2746] rows=1,055,806 speed=174,179/s elapsed=3.8s


[rg   65/2746] rows=1,118,892 speed=159,956/s elapsed=4.2s


[rg   70/2746] rows=1,203,393 speed=229,997/s elapsed=4.6s


[rg   75/2746] rows=1,270,297 speed=218,190/s elapsed=4.9s


[rg   80/2746] rows=1,342,700 speed=227,117/s elapsed=5.2s
[rg   85/2746] rows=1,398,807 speed=274,494/s elapsed=5.4s


[rg   90/2746] rows=1,474,270 speed=243,259/s elapsed=5.7s


[rg   95/2746] rows=1,552,034 speed=237,380/s elapsed=6.1s


[rg  100/2746] rows=1,616,970 speed=246,539/s elapsed=6.3s


[rg  105/2746] rows=1,715,643 speed=227,100/s elapsed=6.8s
[rg  110/2746] rows=1,750,233 speed=180,155/s elapsed=7.0s


[rg  115/2746] rows=1,797,677 speed=179,151/s elapsed=7.2s


[rg  120/2746] rows=1,866,492 speed=164,668/s elapsed=7.6s


[rg  125/2746] rows=1,949,329 speed=157,402/s elapsed=8.2s


[rg  130/2746] rows=2,030,715 speed=147,955/s elapsed=8.7s


[rg  135/2746] rows=2,080,351 speed=132,209/s elapsed=9.1s


[rg  140/2746] rows=2,153,099 speed=255,289/s elapsed=9.4s


[rg  145/2746] rows=2,219,624 speed=234,527/s elapsed=9.7s


[rg  150/2746] rows=2,283,673 speed=223,896/s elapsed=9.9s


[rg  155/2746] rows=2,342,826 speed=203,419/s elapsed=10.2s


[rg  160/2746] rows=2,411,984 speed=246,881/s elapsed=10.5s


[rg  165/2746] rows=2,486,367 speed=271,285/s elapsed=10.8s


[rg  170/2746] rows=2,547,057 speed=217,744/s elapsed=11.1s


[rg  175/2746] rows=2,626,022 speed=229,081/s elapsed=11.4s


[rg  180/2746] rows=2,664,105 speed=159,456/s elapsed=11.7s


[rg  185/2746] rows=2,710,168 speed=155,534/s elapsed=11.9s


[rg  190/2746] rows=2,776,808 speed=198,801/s elapsed=12.3s


[rg  195/2746] rows=2,863,230 speed=142,247/s elapsed=12.9s


[rg  200/2746] rows=2,944,524 speed=130,670/s elapsed=13.5s


[rg  205/2746] rows=3,006,024 speed=142,918/s elapsed=13.9s


[rg  210/2746] rows=3,105,212 speed=263,038/s elapsed=14.3s


[rg  215/2746] rows=3,159,727 speed=215,578/s elapsed=14.6s


[rg  220/2746] rows=3,257,894 speed=199,791/s elapsed=15.1s


[rg  225/2746] rows=3,309,902 speed=200,575/s elapsed=15.3s


[rg  230/2746] rows=3,390,733 speed=159,558/s elapsed=15.8s


[rg  235/2746] rows=3,469,344 speed=140,948/s elapsed=16.4s


[rg  240/2746] rows=3,569,332 speed=199,421/s elapsed=16.9s


[rg  245/2746] rows=3,644,942 speed=185,693/s elapsed=17.3s


[rg  250/2746] rows=3,735,062 speed=179,246/s elapsed=17.8s


[rg  255/2746] rows=3,809,016 speed=224,873/s elapsed=18.1s


[rg  260/2746] rows=3,876,878 speed=153,185/s elapsed=18.6s


[rg  265/2746] rows=3,928,234 speed=136,733/s elapsed=18.9s


[rg  270/2746] rows=3,991,042 speed=146,582/s elapsed=19.4s


[rg  275/2746] rows=4,024,707 speed=92,945/s elapsed=19.7s


[rg  280/2746] rows=4,187,818 speed=262,611/s elapsed=20.4s


[rg  285/2746] rows=4,296,614 speed=191,988/s elapsed=20.9s


[rg  290/2746] rows=4,393,869 speed=154,633/s elapsed=21.6s


[rg  295/2746] rows=4,496,862 speed=192,683/s elapsed=22.1s


[rg  300/2746] rows=4,572,789 speed=167,078/s elapsed=22.5s


[rg  305/2746] rows=4,649,164 speed=191,933/s elapsed=22.9s


[rg  310/2746] rows=4,720,723 speed=205,700/s elapsed=23.3s


[rg  315/2746] rows=4,779,372 speed=183,332/s elapsed=23.6s


[rg  320/2746] rows=4,844,243 speed=216,733/s elapsed=23.9s


[rg  325/2746] rows=4,962,996 speed=285,578/s elapsed=24.3s


[rg  330/2746] rows=5,046,840 speed=237,458/s elapsed=24.7s


[rg  335/2746] rows=5,092,391 speed=197,170/s elapsed=24.9s


[rg  340/2746] rows=5,185,458 speed=249,212/s elapsed=25.3s


[rg  345/2746] rows=5,246,243 speed=162,757/s elapsed=25.7s


[rg  350/2746] rows=5,289,565 speed=162,505/s elapsed=25.9s


[rg  355/2746] rows=5,342,317 speed=154,408/s elapsed=26.3s


[rg  360/2746] rows=5,402,244 speed=119,116/s elapsed=26.8s


[rg  365/2746] rows=5,441,188 speed=128,079/s elapsed=27.1s


[rg  370/2746] rows=5,508,313 speed=205,909/s elapsed=27.4s


[rg  375/2746] rows=5,595,817 speed=168,800/s elapsed=27.9s


[rg  380/2746] rows=5,725,206 speed=251,869/s elapsed=28.4s


[rg  385/2746] rows=5,788,481 speed=164,100/s elapsed=28.8s


[rg  390/2746] rows=5,905,479 speed=223,369/s elapsed=29.3s


[rg  395/2746] rows=5,957,637 speed=120,658/s elapsed=29.8s


[rg  400/2746] rows=6,016,712 speed=158,829/s elapsed=30.1s


[rg  405/2746] rows=6,099,135 speed=140,176/s elapsed=30.7s


[rg  410/2746] rows=6,189,823 speed=192,345/s elapsed=31.2s


[rg  415/2746] rows=6,277,291 speed=175,394/s elapsed=31.7s


[rg  420/2746] rows=6,363,562 speed=177,637/s elapsed=32.2s


[rg  425/2746] rows=6,444,547 speed=164,296/s elapsed=32.7s


[rg  430/2746] rows=6,536,315 speed=163,909/s elapsed=33.2s


[rg  435/2746] rows=6,637,587 speed=173,320/s elapsed=33.8s


[rg  440/2746] rows=6,699,821 speed=195,679/s elapsed=34.1s


[rg  445/2746] rows=6,757,355 speed=214,676/s elapsed=34.4s


[rg  450/2746] rows=6,843,493 speed=218,283/s elapsed=34.8s


[rg  455/2746] rows=6,932,933 speed=229,896/s elapsed=35.2s


[rg  460/2746] rows=7,049,733 speed=264,667/s elapsed=35.6s


[rg  465/2746] rows=7,100,735 speed=197,255/s elapsed=35.9s


[rg  470/2746] rows=7,190,271 speed=228,116/s elapsed=36.3s


[rg  475/2746] rows=7,267,948 speed=209,021/s elapsed=36.7s


[rg  480/2746] rows=7,356,233 speed=205,019/s elapsed=37.1s


[rg  485/2746] rows=7,454,893 speed=220,248/s elapsed=37.5s


[rg  490/2746] rows=7,593,676 speed=218,145/s elapsed=38.2s


[rg  495/2746] rows=7,683,191 speed=199,520/s elapsed=38.6s


[rg  500/2746] rows=7,745,466 speed=221,150/s elapsed=38.9s
[rg  505/2746] rows=7,788,686 speed=280,452/s elapsed=39.1s


[rg  510/2746] rows=7,839,274 speed=256,679/s elapsed=39.3s


[rg  515/2746] rows=7,894,269 speed=210,621/s elapsed=39.5s


[rg  520/2746] rows=7,975,531 speed=244,376/s elapsed=39.8s


[rg  525/2746] rows=8,040,301 speed=197,826/s elapsed=40.2s


[rg  530/2746] rows=8,103,055 speed=218,223/s elapsed=40.5s


[rg  535/2746] rows=8,167,614 speed=191,690/s elapsed=40.8s


[rg  540/2746] rows=8,231,774 speed=221,661/s elapsed=41.1s


[rg  545/2746] rows=8,303,914 speed=179,440/s elapsed=41.5s


[rg  550/2746] rows=8,396,384 speed=180,561/s elapsed=42.0s


[rg  555/2746] rows=8,524,561 speed=160,755/s elapsed=42.8s


[rg  560/2746] rows=8,592,726 speed=160,966/s elapsed=43.2s


[rg  565/2746] rows=8,653,881 speed=165,316/s elapsed=43.6s


[rg  570/2746] rows=8,734,648 speed=191,045/s elapsed=44.0s


[rg  575/2746] rows=8,788,912 speed=182,471/s elapsed=44.3s
[rg  580/2746] rows=8,840,191 speed=252,663/s elapsed=44.5s


[rg  585/2746] rows=8,891,632 speed=145,383/s elapsed=44.9s


[rg  590/2746] rows=8,979,861 speed=164,386/s elapsed=45.4s


[rg  595/2746] rows=9,060,788 speed=181,623/s elapsed=45.9s


[rg  600/2746] rows=9,158,867 speed=225,152/s elapsed=46.3s


[rg  605/2746] rows=9,229,215 speed=266,556/s elapsed=46.6s


[rg  610/2746] rows=9,363,308 speed=340,407/s elapsed=46.9s


[rg  615/2746] rows=9,431,726 speed=243,174/s elapsed=47.2s


[rg  620/2746] rows=9,514,309 speed=232,914/s elapsed=47.6s


[rg  625/2746] rows=9,592,357 speed=258,894/s elapsed=47.9s
[rg  630/2746] rows=9,617,312 speed=184,608/s elapsed=48.0s


[rg  635/2746] rows=9,676,355 speed=217,489/s elapsed=48.3s


[rg  640/2746] rows=9,801,239 speed=264,684/s elapsed=48.8s


[rg  645/2746] rows=9,851,308 speed=140,053/s elapsed=49.1s


[rg  650/2746] rows=9,919,139 speed=236,615/s elapsed=49.4s


[rg  655/2746] rows=9,970,845 speed=81,247/s elapsed=50.0s


[rg  660/2746] rows=10,052,710 speed=181,711/s elapsed=50.5s


[rg  665/2746] rows=10,117,824 speed=152,570/s elapsed=50.9s


[rg  670/2746] rows=10,166,781 speed=178,966/s elapsed=51.2s


[rg  675/2746] rows=10,213,021 speed=178,756/s elapsed=51.5s


[rg  680/2746] rows=10,271,351 speed=220,221/s elapsed=51.7s


[rg  685/2746] rows=10,317,251 speed=148,080/s elapsed=52.0s
[rg  690/2746] rows=10,360,276 speed=220,728/s elapsed=52.2s


[rg  695/2746] rows=10,409,723 speed=134,801/s elapsed=52.6s


[rg  700/2746] rows=10,525,856 speed=184,771/s elapsed=53.2s


[rg  705/2746] rows=10,593,424 speed=135,723/s elapsed=53.7s


[rg  710/2746] rows=10,655,579 speed=174,263/s elapsed=54.1s


[rg  715/2746] rows=10,708,969 speed=197,520/s elapsed=54.3s


[rg  720/2746] rows=10,773,215 speed=183,449/s elapsed=54.7s


[rg  725/2746] rows=10,839,857 speed=207,742/s elapsed=55.0s


[rg  730/2746] rows=10,892,997 speed=176,756/s elapsed=55.3s


[rg  735/2746] rows=10,918,102 speed=85,171/s elapsed=55.6s


[rg  740/2746] rows=10,956,208 speed=105,888/s elapsed=56.0s


[rg  745/2746] rows=11,041,981 speed=127,004/s elapsed=56.6s
[rg  750/2746] rows=11,079,117 speed=185,999/s elapsed=56.8s


[rg  755/2746] rows=11,211,025 speed=264,564/s elapsed=57.3s


[rg  760/2746] rows=11,292,108 speed=206,449/s elapsed=57.7s


[rg  765/2746] rows=11,362,036 speed=144,590/s elapsed=58.2s


[rg  770/2746] rows=11,493,635 speed=136,310/s elapsed=59.2s


[rg  775/2746] rows=11,629,581 speed=327,490/s elapsed=59.6s


[rg  780/2746] rows=11,731,389 speed=325,187/s elapsed=59.9s


[rg  785/2746] rows=11,789,398 speed=229,177/s elapsed=60.2s


[rg  790/2746] rows=11,869,490 speed=253,328/s elapsed=60.5s


[rg  795/2746] rows=11,944,024 speed=257,388/s elapsed=60.8s


[rg  800/2746] rows=12,020,900 speed=274,980/s elapsed=61.0s


[rg  805/2746] rows=12,100,134 speed=254,428/s elapsed=61.4s


[rg  810/2746] rows=12,158,753 speed=213,251/s elapsed=61.6s


[rg  815/2746] rows=12,228,178 speed=204,905/s elapsed=62.0s


[rg  820/2746] rows=12,301,606 speed=250,637/s elapsed=62.3s


[rg  825/2746] rows=12,381,739 speed=147,861/s elapsed=62.8s


[rg  830/2746] rows=12,442,279 speed=171,267/s elapsed=63.2s


[rg  835/2746] rows=12,519,395 speed=174,512/s elapsed=63.6s


[rg  840/2746] rows=12,657,375 speed=223,581/s elapsed=64.2s


[rg  845/2746] rows=12,805,582 speed=326,324/s elapsed=64.7s


[rg  850/2746] rows=12,905,971 speed=247,880/s elapsed=65.1s


[rg  855/2746] rows=12,979,101 speed=212,785/s elapsed=65.4s


[rg  860/2746] rows=13,022,294 speed=185,316/s elapsed=65.7s


[rg  865/2746] rows=13,087,970 speed=196,692/s elapsed=66.0s


[rg  870/2746] rows=13,132,206 speed=199,757/s elapsed=66.2s


[rg  875/2746] rows=13,203,144 speed=188,564/s elapsed=66.6s
[rg  880/2746] rows=13,236,415 speed=209,495/s elapsed=66.7s


[rg  885/2746] rows=13,290,274 speed=131,711/s elapsed=67.2s


[rg  890/2746] rows=13,406,117 speed=164,187/s elapsed=67.9s


[rg  895/2746] rows=13,462,403 speed=227,767/s elapsed=68.1s


[rg  900/2746] rows=13,515,169 speed=240,159/s elapsed=68.3s


[rg  905/2746] rows=13,589,018 speed=195,978/s elapsed=68.7s
[rg  910/2746] rows=13,621,905 speed=220,881/s elapsed=68.9s


[rg  915/2746] rows=13,701,049 speed=165,807/s elapsed=69.3s


[rg  920/2746] rows=13,752,007 speed=133,241/s elapsed=69.7s


[rg  925/2746] rows=13,812,910 speed=154,435/s elapsed=70.1s


[rg  930/2746] rows=13,885,121 speed=131,178/s elapsed=70.7s
[rg  935/2746] rows=13,927,695 speed=229,448/s elapsed=70.8s


[rg  940/2746] rows=14,005,212 speed=191,560/s elapsed=71.3s


[rg  945/2746] rows=14,078,411 speed=192,003/s elapsed=71.6s


[rg  950/2746] rows=14,179,762 speed=147,896/s elapsed=72.3s


[rg  955/2746] rows=14,248,681 speed=114,748/s elapsed=72.9s


[rg  960/2746] rows=14,307,200 speed=160,684/s elapsed=73.3s


[rg  965/2746] rows=14,395,603 speed=226,672/s elapsed=73.7s


[rg  970/2746] rows=14,480,605 speed=240,150/s elapsed=74.0s


[rg  975/2746] rows=14,549,466 speed=234,955/s elapsed=74.3s


[rg  980/2746] rows=14,633,023 speed=254,726/s elapsed=74.6s


[rg  985/2746] rows=14,757,618 speed=273,023/s elapsed=75.1s


[rg  990/2746] rows=14,808,410 speed=208,335/s elapsed=75.3s


[rg  995/2746] rows=14,892,605 speed=257,989/s elapsed=75.7s


[rg 1000/2746] rows=14,940,149 speed=176,189/s elapsed=75.9s


[rg 1005/2746] rows=15,016,494 speed=200,037/s elapsed=76.3s


[rg 1010/2746] rows=15,146,035 speed=152,592/s elapsed=77.2s


[rg 1015/2746] rows=15,218,046 speed=121,473/s elapsed=77.8s


[rg 1020/2746] rows=15,276,305 speed=160,262/s elapsed=78.1s
[rg 1025/2746] rows=15,311,404 speed=172,482/s elapsed=78.3s


[rg 1030/2746] rows=15,389,632 speed=330,015/s elapsed=78.6s


[rg 1035/2746] rows=15,471,469 speed=248,390/s elapsed=78.9s


[rg 1040/2746] rows=15,554,118 speed=250,448/s elapsed=79.2s


[rg 1045/2746] rows=15,654,344 speed=261,479/s elapsed=79.6s


[rg 1050/2746] rows=15,722,261 speed=277,186/s elapsed=79.9s


[rg 1055/2746] rows=15,770,925 speed=206,223/s elapsed=80.1s


[rg 1060/2746] rows=15,838,672 speed=259,666/s elapsed=80.4s


[rg 1065/2746] rows=15,892,705 speed=200,598/s elapsed=80.6s


[rg 1070/2746] rows=15,979,281 speed=212,956/s elapsed=81.0s


[rg 1075/2746] rows=16,025,441 speed=185,177/s elapsed=81.3s


[rg 1080/2746] rows=16,109,782 speed=157,775/s elapsed=81.8s


[rg 1085/2746] rows=16,183,013 speed=152,540/s elapsed=82.3s


[rg 1090/2746] rows=16,251,665 speed=159,610/s elapsed=82.7s


[rg 1095/2746] rows=16,309,978 speed=194,569/s elapsed=83.0s


[rg 1100/2746] rows=16,408,784 speed=227,296/s elapsed=83.5s


[rg 1105/2746] rows=16,468,147 speed=213,411/s elapsed=83.7s


[rg 1110/2746] rows=16,540,115 speed=198,909/s elapsed=84.1s


[rg 1115/2746] rows=16,614,754 speed=124,259/s elapsed=84.7s


[rg 1120/2746] rows=16,689,326 speed=130,776/s elapsed=85.3s


[rg 1125/2746] rows=16,740,316 speed=168,455/s elapsed=85.6s


[rg 1130/2746] rows=16,826,514 speed=236,904/s elapsed=85.9s


[rg 1135/2746] rows=16,914,044 speed=197,072/s elapsed=86.4s


[rg 1140/2746] rows=16,968,982 speed=205,618/s elapsed=86.6s


[rg 1145/2746] rows=17,001,865 speed=91,958/s elapsed=87.0s


[rg 1150/2746] rows=17,096,806 speed=167,833/s elapsed=87.6s


[rg 1155/2746] rows=17,194,581 speed=144,199/s elapsed=88.2s


[rg 1160/2746] rows=17,270,246 speed=241,232/s elapsed=88.6s


[rg 1165/2746] rows=17,374,083 speed=197,512/s elapsed=89.1s


[rg 1170/2746] rows=17,460,481 speed=212,889/s elapsed=89.5s


[rg 1175/2746] rows=17,529,157 speed=143,557/s elapsed=90.0s


[rg 1180/2746] rows=17,576,896 speed=91,829/s elapsed=90.5s


[rg 1185/2746] rows=17,638,853 speed=155,621/s elapsed=90.9s


[rg 1190/2746] rows=17,700,703 speed=239,493/s elapsed=91.1s
[rg 1195/2746] rows=17,736,411 speed=215,868/s elapsed=91.3s


[rg 1200/2746] rows=17,826,258 speed=268,449/s elapsed=91.6s


[rg 1205/2746] rows=17,920,130 speed=224,573/s elapsed=92.1s


[rg 1210/2746] rows=18,003,998 speed=190,542/s elapsed=92.5s


[rg 1215/2746] rows=18,093,034 speed=142,663/s elapsed=93.1s


[rg 1220/2746] rows=18,116,144 speed=90,237/s elapsed=93.4s


[rg 1225/2746] rows=18,173,511 speed=174,016/s elapsed=93.7s
[rg 1230/2746] rows=18,233,615 speed=302,067/s elapsed=93.9s


[rg 1235/2746] rows=18,298,659 speed=270,292/s elapsed=94.2s


[rg 1240/2746] rows=18,360,245 speed=257,949/s elapsed=94.4s
[rg 1245/2746] rows=18,396,126 speed=209,022/s elapsed=94.6s


[rg 1250/2746] rows=18,456,653 speed=255,870/s elapsed=94.8s


[rg 1255/2746] rows=18,518,742 speed=206,823/s elapsed=95.1s


[rg 1260/2746] rows=18,577,223 speed=223,544/s elapsed=95.4s


[rg 1265/2746] rows=18,661,529 speed=238,748/s elapsed=95.7s


[rg 1270/2746] rows=18,731,482 speed=250,163/s elapsed=96.0s


[rg 1275/2746] rows=18,841,525 speed=274,521/s elapsed=96.4s


[rg 1280/2746] rows=18,933,118 speed=217,923/s elapsed=96.8s


[rg 1285/2746] rows=19,006,274 speed=159,104/s elapsed=97.3s


[rg 1290/2746] rows=19,091,427 speed=160,378/s elapsed=97.8s


[rg 1295/2746] rows=19,174,962 speed=155,037/s elapsed=98.3s


[rg 1300/2746] rows=19,230,619 speed=216,222/s elapsed=98.6s


[rg 1305/2746] rows=19,304,925 speed=271,698/s elapsed=98.9s


[rg 1310/2746] rows=19,394,186 speed=234,004/s elapsed=99.3s


[rg 1315/2746] rows=19,450,662 speed=222,461/s elapsed=99.5s


[rg 1320/2746] rows=19,493,493 speed=135,447/s elapsed=99.8s


[rg 1325/2746] rows=19,544,625 speed=142,225/s elapsed=100.2s


[rg 1330/2746] rows=19,578,336 speed=106,389/s elapsed=100.5s


[rg 1335/2746] rows=19,666,313 speed=179,116/s elapsed=101.0s


[rg 1340/2746] rows=19,724,358 speed=160,417/s elapsed=101.4s


[rg 1345/2746] rows=19,840,697 speed=289,163/s elapsed=101.8s


[rg 1350/2746] rows=19,919,198 speed=330,371/s elapsed=102.0s


[rg 1355/2746] rows=20,051,777 speed=301,574/s elapsed=102.4s


[rg 1360/2746] rows=20,140,148 speed=291,969/s elapsed=102.7s


[rg 1365/2746] rows=20,218,061 speed=238,316/s elapsed=103.1s


[rg 1370/2746] rows=20,308,729 speed=242,636/s elapsed=103.4s


[rg 1375/2746] rows=20,366,319 speed=189,098/s elapsed=103.7s


[rg 1380/2746] rows=20,459,042 speed=209,607/s elapsed=104.2s


[rg 1385/2746] rows=20,535,780 speed=180,048/s elapsed=104.6s


[rg 1390/2746] rows=20,610,377 speed=144,055/s elapsed=105.1s


[rg 1395/2746] rows=20,693,113 speed=134,261/s elapsed=105.8s


[rg 1400/2746] rows=20,744,375 speed=189,061/s elapsed=106.0s
[rg 1405/2746] rows=20,795,841 speed=235,848/s elapsed=106.2s


[rg 1410/2746] rows=20,952,711 speed=304,103/s elapsed=106.8s


[rg 1415/2746] rows=21,061,722 speed=271,085/s elapsed=107.2s


[rg 1420/2746] rows=21,140,837 speed=222,194/s elapsed=107.5s


[rg 1425/2746] rows=21,216,494 speed=238,117/s elapsed=107.8s


[rg 1430/2746] rows=21,313,404 speed=261,919/s elapsed=108.2s


[rg 1435/2746] rows=21,406,449 speed=192,707/s elapsed=108.7s


[rg 1440/2746] rows=21,480,240 speed=210,735/s elapsed=109.0s


[rg 1445/2746] rows=21,560,725 speed=156,038/s elapsed=109.6s


[rg 1450/2746] rows=21,617,527 speed=104,013/s elapsed=110.1s


[rg 1455/2746] rows=21,735,619 speed=159,418/s elapsed=110.8s


[rg 1460/2746] rows=21,861,901 speed=215,441/s elapsed=111.4s


[rg 1465/2746] rows=21,943,698 speed=211,176/s elapsed=111.8s


[rg 1470/2746] rows=22,023,700 speed=201,477/s elapsed=112.2s


[rg 1475/2746] rows=22,100,036 speed=134,785/s elapsed=112.8s


[rg 1480/2746] rows=22,152,506 speed=121,465/s elapsed=113.2s


[rg 1485/2746] rows=22,233,226 speed=213,982/s elapsed=113.6s


[rg 1490/2746] rows=22,311,471 speed=222,684/s elapsed=113.9s


[rg 1495/2746] rows=22,367,676 speed=208,910/s elapsed=114.2s


[rg 1500/2746] rows=22,436,803 speed=208,286/s elapsed=114.5s


[rg 1505/2746] rows=22,503,651 speed=127,543/s elapsed=115.1s


[rg 1510/2746] rows=22,557,261 speed=125,407/s elapsed=115.5s


[rg 1515/2746] rows=22,631,875 speed=154,639/s elapsed=116.0s


[rg 1520/2746] rows=22,703,279 speed=245,447/s elapsed=116.3s


[rg 1525/2746] rows=22,753,269 speed=175,361/s elapsed=116.5s


[rg 1530/2746] rows=22,833,748 speed=330,971/s elapsed=116.8s


[rg 1535/2746] rows=22,892,186 speed=198,947/s elapsed=117.1s
[rg 1540/2746] rows=22,958,038 speed=335,332/s elapsed=117.3s


[rg 1545/2746] rows=23,059,357 speed=256,784/s elapsed=117.7s


[rg 1550/2746] rows=23,126,337 speed=315,171/s elapsed=117.9s


[rg 1555/2746] rows=23,194,768 speed=245,315/s elapsed=118.2s


[rg 1560/2746] rows=23,299,474 speed=198,496/s elapsed=118.7s


[rg 1565/2746] rows=23,349,439 speed=163,572/s elapsed=119.0s


[rg 1570/2746] rows=23,438,533 speed=200,325/s elapsed=119.4s


[rg 1575/2746] rows=23,507,083 speed=132,791/s elapsed=120.0s


[rg 1580/2746] rows=23,585,104 speed=136,219/s elapsed=120.5s


[rg 1585/2746] rows=23,645,637 speed=167,260/s elapsed=120.9s


[rg 1590/2746] rows=23,715,746 speed=230,988/s elapsed=121.2s


[rg 1595/2746] rows=23,777,384 speed=186,335/s elapsed=121.5s


[rg 1600/2746] rows=23,859,893 speed=215,807/s elapsed=121.9s


[rg 1605/2746] rows=23,925,580 speed=133,622/s elapsed=122.4s


[rg 1610/2746] rows=24,014,579 speed=157,437/s elapsed=123.0s


[rg 1615/2746] rows=24,098,570 speed=183,029/s elapsed=123.4s


[rg 1620/2746] rows=24,194,280 speed=305,972/s elapsed=123.7s


[rg 1625/2746] rows=24,263,571 speed=189,736/s elapsed=124.1s


[rg 1630/2746] rows=24,353,017 speed=258,353/s elapsed=124.5s


[rg 1635/2746] rows=24,419,711 speed=147,707/s elapsed=124.9s


[rg 1640/2746] rows=24,492,752 speed=161,668/s elapsed=125.4s


[rg 1645/2746] rows=24,552,599 speed=148,118/s elapsed=125.8s


[rg 1650/2746] rows=24,618,157 speed=231,491/s elapsed=126.0s


[rg 1655/2746] rows=24,700,322 speed=217,153/s elapsed=126.4s


[rg 1660/2746] rows=24,751,690 speed=218,840/s elapsed=126.7s


[rg 1665/2746] rows=24,808,263 speed=202,269/s elapsed=126.9s


[rg 1670/2746] rows=24,895,125 speed=165,669/s elapsed=127.5s


[rg 1675/2746] rows=24,949,425 speed=110,819/s elapsed=127.9s


[rg 1680/2746] rows=25,023,093 speed=141,803/s elapsed=128.5s


[rg 1685/2746] rows=25,078,019 speed=179,278/s elapsed=128.8s
[rg 1690/2746] rows=25,130,275 speed=240,096/s elapsed=129.0s


[rg 1695/2746] rows=25,194,345 speed=219,365/s elapsed=129.3s


[rg 1700/2746] rows=25,296,730 speed=255,208/s elapsed=129.7s


[rg 1705/2746] rows=25,406,161 speed=252,203/s elapsed=130.1s
[rg 1710/2746] rows=25,442,009 speed=181,748/s elapsed=130.3s


[rg 1715/2746] rows=25,505,160 speed=249,965/s elapsed=130.6s


[rg 1720/2746] rows=25,617,079 speed=245,109/s elapsed=131.0s


[rg 1725/2746] rows=25,685,557 speed=224,870/s elapsed=131.3s


[rg 1730/2746] rows=25,759,064 speed=264,008/s elapsed=131.6s


[rg 1735/2746] rows=25,813,907 speed=119,668/s elapsed=132.1s


[rg 1740/2746] rows=25,878,331 speed=142,618/s elapsed=132.5s


[rg 1745/2746] rows=25,908,726 speed=106,924/s elapsed=132.8s


[rg 1750/2746] rows=25,997,005 speed=190,167/s elapsed=133.3s
[rg 1755/2746] rows=26,034,046 speed=182,135/s elapsed=133.5s


[rg 1760/2746] rows=26,104,966 speed=262,728/s elapsed=133.7s


[rg 1765/2746] rows=26,191,651 speed=270,916/s elapsed=134.1s


[rg 1770/2746] rows=26,342,414 speed=323,505/s elapsed=134.5s


[rg 1775/2746] rows=26,402,298 speed=227,234/s elapsed=134.8s


[rg 1780/2746] rows=26,471,609 speed=266,844/s elapsed=135.1s
[rg 1785/2746] rows=26,537,575 speed=338,605/s elapsed=135.2s


[rg 1790/2746] rows=26,600,781 speed=269,332/s elapsed=135.5s


[rg 1795/2746] rows=26,677,855 speed=244,494/s elapsed=135.8s


[rg 1800/2746] rows=26,735,631 speed=262,906/s elapsed=136.0s


[rg 1805/2746] rows=26,808,935 speed=207,023/s elapsed=136.4s


[rg 1810/2746] rows=26,880,608 speed=152,719/s elapsed=136.8s


[rg 1815/2746] rows=26,941,016 speed=144,419/s elapsed=137.3s


[rg 1820/2746] rows=26,982,225 speed=113,498/s elapsed=137.6s


[rg 1825/2746] rows=27,029,977 speed=126,982/s elapsed=138.0s
[rg 1830/2746] rows=27,086,453 speed=260,257/s elapsed=138.2s


[rg 1835/2746] rows=27,150,226 speed=212,104/s elapsed=138.5s


[rg 1840/2746] rows=27,202,459 speed=191,445/s elapsed=138.8s


[rg 1845/2746] rows=27,286,507 speed=260,667/s elapsed=139.1s


[rg 1850/2746] rows=27,317,852 speed=126,158/s elapsed=139.4s


[rg 1855/2746] rows=27,403,126 speed=143,183/s elapsed=140.0s


[rg 1860/2746] rows=27,458,983 speed=152,961/s elapsed=140.3s


[rg 1865/2746] rows=27,517,191 speed=153,471/s elapsed=140.7s


[rg 1870/2746] rows=27,601,369 speed=205,885/s elapsed=141.1s


[rg 1875/2746] rows=27,666,020 speed=197,230/s elapsed=141.4s
[rg 1880/2746] rows=27,701,716 speed=287,997/s elapsed=141.6s


[rg 1885/2746] rows=27,764,936 speed=228,890/s elapsed=141.8s
[rg 1890/2746] rows=27,809,309 speed=239,646/s elapsed=142.0s


[rg 1895/2746] rows=27,884,981 speed=247,464/s elapsed=142.3s


[rg 1900/2746] rows=27,966,661 speed=236,034/s elapsed=142.7s


[rg 1905/2746] rows=28,035,104 speed=243,767/s elapsed=143.0s


[rg 1910/2746] rows=28,113,662 speed=198,386/s elapsed=143.3s


[rg 1915/2746] rows=28,165,071 speed=152,479/s elapsed=143.7s
[rg 1920/2746] rows=28,215,601 speed=236,732/s elapsed=143.9s


[rg 1925/2746] rows=28,299,613 speed=138,762/s elapsed=144.5s


[rg 1930/2746] rows=28,373,315 speed=152,951/s elapsed=145.0s


[rg 1935/2746] rows=28,453,724 speed=161,041/s elapsed=145.5s


[rg 1940/2746] rows=28,516,937 speed=241,109/s elapsed=145.7s


[rg 1945/2746] rows=28,564,458 speed=221,947/s elapsed=146.0s
[rg 1950/2746] rows=28,634,617 speed=366,381/s elapsed=146.2s


[rg 1955/2746] rows=28,750,956 speed=265,442/s elapsed=146.6s


[rg 1960/2746] rows=28,818,248 speed=246,353/s elapsed=146.9s


[rg 1965/2746] rows=28,897,517 speed=228,835/s elapsed=147.2s
[rg 1970/2746] rows=28,938,827 speed=234,101/s elapsed=147.4s


[rg 1975/2746] rows=28,991,302 speed=235,618/s elapsed=147.6s
[rg 1980/2746] rows=29,034,499 speed=223,912/s elapsed=147.8s


[rg 1985/2746] rows=29,111,672 speed=229,068/s elapsed=148.1s


[rg 1990/2746] rows=29,179,447 speed=223,512/s elapsed=148.4s


[rg 1995/2746] rows=29,369,697 speed=224,683/s elapsed=149.3s


[rg 2000/2746] rows=29,529,515 speed=193,699/s elapsed=150.1s


[rg 2005/2746] rows=29,590,396 speed=199,510/s elapsed=150.4s


[rg 2010/2746] rows=29,648,903 speed=209,937/s elapsed=150.7s


[rg 2015/2746] rows=29,710,646 speed=155,229/s elapsed=151.1s


[rg 2020/2746] rows=29,754,041 speed=180,241/s elapsed=151.3s


[rg 2025/2746] rows=29,811,938 speed=145,765/s elapsed=151.7s


[rg 2030/2746] rows=29,888,731 speed=129,880/s elapsed=152.3s


[rg 2035/2746] rows=29,940,764 speed=162,353/s elapsed=152.6s


[rg 2040/2746] rows=30,015,349 speed=216,080/s elapsed=153.0s


[rg 2045/2746] rows=30,093,745 speed=268,935/s elapsed=153.3s


[rg 2050/2746] rows=30,185,896 speed=257,493/s elapsed=153.6s


[rg 2055/2746] rows=30,249,625 speed=228,582/s elapsed=153.9s


[rg 2060/2746] rows=30,329,330 speed=252,602/s elapsed=154.2s


[rg 2065/2746] rows=30,434,350 speed=196,270/s elapsed=154.8s


[rg 2070/2746] rows=30,529,976 speed=196,597/s elapsed=155.3s


[rg 2075/2746] rows=30,618,774 speed=160,341/s elapsed=155.8s


[rg 2080/2746] rows=30,709,810 speed=141,467/s elapsed=156.5s


[rg 2085/2746] rows=30,783,109 speed=206,338/s elapsed=156.8s


[rg 2090/2746] rows=30,831,975 speed=203,690/s elapsed=157.0s


[rg 2095/2746] rows=30,914,441 speed=213,499/s elapsed=157.4s


[rg 2100/2746] rows=30,984,807 speed=241,799/s elapsed=157.7s


[rg 2105/2746] rows=31,017,837 speed=98,748/s elapsed=158.1s


[rg 2110/2746] rows=31,096,678 speed=148,744/s elapsed=158.6s


[rg 2115/2746] rows=31,204,001 speed=148,376/s elapsed=159.3s


[rg 2120/2746] rows=31,289,082 speed=244,082/s elapsed=159.7s
[rg 2125/2746] rows=31,325,750 speed=196,873/s elapsed=159.8s


[rg 2130/2746] rows=31,386,894 speed=248,508/s elapsed=160.1s


[rg 2135/2746] rows=31,455,907 speed=254,951/s elapsed=160.4s


[rg 2140/2746] rows=31,527,476 speed=266,438/s elapsed=160.6s


[rg 2145/2746] rows=31,612,933 speed=232,759/s elapsed=161.0s


[rg 2150/2746] rows=31,649,628 speed=143,475/s elapsed=161.3s


[rg 2155/2746] rows=31,727,741 speed=202,926/s elapsed=161.6s


[rg 2160/2746] rows=31,820,024 speed=167,934/s elapsed=162.2s


[rg 2165/2746] rows=31,896,991 speed=156,869/s elapsed=162.7s


[rg 2170/2746] rows=31,990,948 speed=190,284/s elapsed=163.2s
[rg 2175/2746] rows=32,019,099 speed=183,917/s elapsed=163.3s


[rg 2180/2746] rows=32,102,450 speed=269,768/s elapsed=163.6s


[rg 2185/2746] rows=32,171,010 speed=253,639/s elapsed=163.9s


[rg 2190/2746] rows=32,260,244 speed=276,148/s elapsed=164.2s


[rg 2195/2746] rows=32,344,973 speed=197,526/s elapsed=164.7s


[rg 2200/2746] rows=32,447,081 speed=220,363/s elapsed=165.1s


[rg 2205/2746] rows=32,494,838 speed=129,892/s elapsed=165.5s


[rg 2210/2746] rows=32,546,036 speed=181,898/s elapsed=165.8s


[rg 2215/2746] rows=32,611,023 speed=208,276/s elapsed=166.1s


[rg 2220/2746] rows=32,685,954 speed=247,530/s elapsed=166.4s


[rg 2225/2746] rows=32,755,550 speed=182,293/s elapsed=166.8s


[rg 2230/2746] rows=32,827,754 speed=207,614/s elapsed=167.1s


[rg 2235/2746] rows=32,918,242 speed=172,939/s elapsed=167.6s


[rg 2240/2746] rows=32,977,243 speed=148,465/s elapsed=168.0s


[rg 2245/2746] rows=33,038,318 speed=164,983/s elapsed=168.4s


[rg 2250/2746] rows=33,139,330 speed=153,230/s elapsed=169.1s


[rg 2255/2746] rows=33,230,395 speed=160,119/s elapsed=169.6s


[rg 2260/2746] rows=33,303,172 speed=194,860/s elapsed=170.0s
[rg 2265/2746] rows=33,353,053 speed=258,141/s elapsed=170.2s


[rg 2270/2746] rows=33,403,576 speed=150,755/s elapsed=170.5s


[rg 2275/2746] rows=33,491,558 speed=177,768/s elapsed=171.0s


[rg 2280/2746] rows=33,559,341 speed=211,821/s elapsed=171.4s


[rg 2285/2746] rows=33,654,237 speed=143,098/s elapsed=172.0s


[rg 2290/2746] rows=33,733,211 speed=157,347/s elapsed=172.5s


[rg 2295/2746] rows=33,839,485 speed=156,768/s elapsed=173.2s


[rg 2300/2746] rows=33,895,493 speed=181,788/s elapsed=173.5s


[rg 2305/2746] rows=33,957,879 speed=125,438/s elapsed=174.0s


[rg 2310/2746] rows=34,024,942 speed=125,394/s elapsed=174.5s


[rg 2315/2746] rows=34,112,391 speed=131,079/s elapsed=175.2s


[rg 2320/2746] rows=34,189,335 speed=184,557/s elapsed=175.6s


[rg 2325/2746] rows=34,264,450 speed=211,576/s elapsed=176.0s


[rg 2330/2746] rows=34,348,164 speed=184,394/s elapsed=176.4s


[rg 2335/2746] rows=34,426,497 speed=219,265/s elapsed=176.8s
[rg 2340/2746] rows=34,442,908 speed=94,873/s elapsed=177.0s


[rg 2345/2746] rows=34,524,001 speed=160,449/s elapsed=177.5s


[rg 2350/2746] rows=34,589,564 speed=143,829/s elapsed=177.9s


[rg 2355/2746] rows=34,679,202 speed=163,230/s elapsed=178.5s


[rg 2360/2746] rows=34,727,284 speed=153,092/s elapsed=178.8s


[rg 2365/2746] rows=34,826,254 speed=228,406/s elapsed=179.2s
[rg 2370/2746] rows=34,856,948 speed=188,521/s elapsed=179.4s


[rg 2375/2746] rows=34,916,553 speed=124,071/s elapsed=179.9s


[rg 2380/2746] rows=34,959,099 speed=161,485/s elapsed=180.1s


[rg 2385/2746] rows=35,010,293 speed=115,661/s elapsed=180.6s


[rg 2390/2746] rows=35,148,331 speed=254,085/s elapsed=181.1s


[rg 2395/2746] rows=35,255,577 speed=255,874/s elapsed=181.5s


[rg 2400/2746] rows=35,321,820 speed=216,480/s elapsed=181.8s


[rg 2405/2746] rows=35,398,318 speed=238,881/s elapsed=182.2s


[rg 2410/2746] rows=35,490,625 speed=255,137/s elapsed=182.5s


[rg 2415/2746] rows=35,581,561 speed=228,744/s elapsed=182.9s


[rg 2420/2746] rows=35,716,791 speed=267,475/s elapsed=183.4s


[rg 2425/2746] rows=35,783,995 speed=165,215/s elapsed=183.8s


[rg 2430/2746] rows=35,827,888 speed=190,464/s elapsed=184.1s


[rg 2435/2746] rows=35,873,070 speed=139,680/s elapsed=184.4s


[rg 2440/2746] rows=35,948,106 speed=133,679/s elapsed=184.9s


[rg 2445/2746] rows=36,011,095 speed=135,191/s elapsed=185.4s


[rg 2450/2746] rows=36,153,694 speed=261,656/s elapsed=186.0s


[rg 2455/2746] rows=36,188,678 speed=158,312/s elapsed=186.2s


[rg 2460/2746] rows=36,254,053 speed=256,395/s elapsed=186.4s


[rg 2465/2746] rows=36,318,147 speed=225,324/s elapsed=186.7s


[rg 2470/2746] rows=36,388,034 speed=232,208/s elapsed=187.0s


[rg 2475/2746] rows=36,519,739 speed=231,636/s elapsed=187.6s


[rg 2480/2746] rows=36,584,893 speed=199,671/s elapsed=187.9s
[rg 2485/2746] rows=36,598,305 speed=73,677/s elapsed=188.1s


[rg 2490/2746] rows=36,645,381 speed=189,882/s elapsed=188.3s


[rg 2495/2746] rows=36,715,861 speed=131,735/s elapsed=188.9s


[rg 2500/2746] rows=36,790,420 speed=182,957/s elapsed=189.3s


[rg 2505/2746] rows=36,855,011 speed=149,755/s elapsed=189.7s


[rg 2510/2746] rows=37,018,673 speed=265,051/s elapsed=190.3s


[rg 2515/2746] rows=37,078,880 speed=153,091/s elapsed=190.7s


[rg 2520/2746] rows=37,138,887 speed=145,609/s elapsed=191.1s


[rg 2525/2746] rows=37,194,440 speed=115,741/s elapsed=191.6s


[rg 2530/2746] rows=37,276,910 speed=163,365/s elapsed=192.1s


[rg 2535/2746] rows=37,370,170 speed=229,042/s elapsed=192.5s


[rg 2540/2746] rows=37,476,862 speed=238,086/s elapsed=193.0s


[rg 2545/2746] rows=37,557,055 speed=206,822/s elapsed=193.4s


[rg 2550/2746] rows=37,585,612 speed=100,982/s elapsed=193.6s


[rg 2555/2746] rows=37,669,770 speed=210,342/s elapsed=194.0s


[rg 2560/2746] rows=37,724,414 speed=124,678/s elapsed=194.5s


[rg 2565/2746] rows=37,811,964 speed=156,624/s elapsed=195.0s


[rg 2570/2746] rows=37,899,855 speed=232,443/s elapsed=195.4s


[rg 2575/2746] rows=37,985,776 speed=249,859/s elapsed=195.8s


[rg 2580/2746] rows=38,060,078 speed=233,706/s elapsed=196.1s


[rg 2585/2746] rows=38,121,169 speed=237,355/s elapsed=196.3s


[rg 2590/2746] rows=38,187,632 speed=260,134/s elapsed=196.6s


[rg 2595/2746] rows=38,258,442 speed=256,256/s elapsed=196.9s


[rg 2600/2746] rows=38,326,204 speed=267,887/s elapsed=197.1s


[rg 2605/2746] rows=38,378,702 speed=196,085/s elapsed=197.4s


[rg 2610/2746] rows=38,422,866 speed=174,990/s elapsed=197.6s


[rg 2615/2746] rows=38,476,688 speed=193,684/s elapsed=197.9s


[rg 2620/2746] rows=38,565,282 speed=272,272/s elapsed=198.2s


[rg 2625/2746] rows=38,656,441 speed=234,698/s elapsed=198.6s


[rg 2630/2746] rows=38,725,647 speed=136,840/s elapsed=199.1s


[rg 2635/2746] rows=38,770,561 speed=110,878/s elapsed=199.5s


[rg 2640/2746] rows=38,821,575 speed=198,676/s elapsed=199.8s


[rg 2645/2746] rows=38,923,065 speed=236,261/s elapsed=200.2s


[rg 2650/2746] rows=38,990,759 speed=233,645/s elapsed=200.5s
[rg 2655/2746] rows=39,034,066 speed=203,903/s elapsed=200.7s


[rg 2660/2746] rows=39,073,806 speed=283,889/s elapsed=200.9s
[rg 2665/2746] rows=39,126,307 speed=236,205/s elapsed=201.1s


[rg 2670/2746] rows=39,196,276 speed=278,065/s elapsed=201.4s


[rg 2675/2746] rows=39,329,507 speed=266,569/s elapsed=201.9s


[rg 2680/2746] rows=39,416,558 speed=211,653/s elapsed=202.3s


[rg 2685/2746] rows=39,453,174 speed=152,159/s elapsed=202.5s


[rg 2690/2746] rows=39,535,667 speed=201,258/s elapsed=202.9s


[rg 2695/2746] rows=39,624,011 speed=185,772/s elapsed=203.4s


[rg 2700/2746] rows=39,719,243 speed=158,854/s elapsed=204.0s


[rg 2705/2746] rows=39,771,581 speed=154,003/s elapsed=204.3s


[rg 2710/2746] rows=39,847,655 speed=200,633/s elapsed=204.7s


[rg 2715/2746] rows=39,915,226 speed=224,012/s elapsed=205.0s


[rg 2720/2746] rows=40,004,236 speed=255,297/s elapsed=205.4s


[rg 2725/2746] rows=40,077,431 speed=191,779/s elapsed=205.7s


[rg 2730/2746] rows=40,129,020 speed=173,362/s elapsed=206.0s


[rg 2735/2746] rows=40,212,565 speed=114,508/s elapsed=206.8s


[rg 2740/2746] rows=40,307,561 speed=155,703/s elapsed=207.4s


[rg 2745/2746] rows=40,397,365 speed=247,983/s elapsed=207.7s
🏁 DONE rows=40,409,432 -> onefile=C:\datum-api-examples-main\OriON\signals\arbitrage\onefile.jsonl.gz summary=C:\datum-api-examples-main\OriON\signals\arbitrage\summary.csv best_params=C:\datum-api-examples-main\OriON\signals\arbitrage\best_params.jsonl.gz
