In [None]:
# Parameters
run_date = "2026-01-01"  # papermill replacement
import os
output_dir = os.environ.get("ORION_SIGNALS_DIR", "../signals")
config_path = os.environ.get("DATUM_API_CONFIG_PATH", "../ops/datum_api_config.json")
dry_run = False

# ensure output exists
os.makedirs(output_dir, exist_ok=True)


In [7]:
# Import basic modules
import pandas as pd
from datum_api_client import DatumApi
import datetime
from datetime import timedelta
from typing import Optional, List, Dict, Any


# Import warnings
import warnings
warnings.filterwarnings("ignore")
# pip install xlrd
# pip install openpyxl

In [8]:
import numpy as np
import pandas as pd
from typing import Dict, List, Tuple, Optional

def _floor_time_to_bucket(dt: pd.Series, minutes: int) -> pd.Series:
    if minutes <= 1:
        return dt.dt.floor("min")
    return dt.dt.floor(f"{minutes}min")

def _connected_components(nodes: List[str], edges: Dict[Tuple[str, str], float]) -> List[List[str]]:
    adj = {n: [] for n in nodes}
    for (a, b) in edges.keys():
        adj[a].append(b)
        adj[b].append(a)
    seen = set()
    comps = []
    for n in nodes:
        if n in seen:
            continue
        stack = [n]
        seen.add(n)
        comp = []
        while stack:
            cur = stack.pop()
            comp.append(cur)
            for nb in adj[cur]:
                if nb not in seen:
                    seen.add(nb)
                    stack.append(nb)
        comps.append(sorted(comp))
    return comps

def _count_divergence_reversion(z: np.ndarray, enter: float, exit: float) -> Tuple[int, int, List[float]]:
    """
    divergence event: first time |z| >= enter
    converged: later returns to |z| <= exit (within same day window)
    strength: max |z| during the event
    """
    z = z[np.isfinite(z)]
    if z.size < 30:
        return 0, 0, []
    events = 0
    succ = 0
    strengths = []
    i = 0
    n = z.size
    while i < n:
        if abs(z[i]) >= enter:
            events += 1
            max_abs = abs(z[i])
            j = i + 1
            reverted = False
            while j < n:
                max_abs = max(max_abs, abs(z[j]))
                if abs(z[j]) <= exit:
                    reverted = True
                    break
                j += 1
            strengths.append(float(max_abs))
            if reverted:
                succ += 1
                i = j
            else:
                i += 1
        else:
            i += 1
    return events, succ, strengths

def find_stack_clusters_and_pairs_full_stats(
    path: str,
    *,
    file_type: str = "parquet",     # "parquet" | "csv"
    dt_col: str = "dt",
    ticker_col: str = "ticker",
    stack_col: str = "Stack%",
    beta_col: str = "beta",
    sigma_col: str = "sigma",
    bench_col: str = "bench",

    # window:
    t_from: str = "04:00",
    t_to: str = "09:30",
    bucket_minutes: int = 5,

    # edge constraints (для "щільно ходять разом"):
    corr_threshold: float = 0.70,
    sign_agree_threshold: float = 0.60,
    beta_tol: float = 0.50,
    sigma_tol: float = 0.80,

    # sufficiency:
    min_points_per_ticker: int = 800,
    min_overlap_points_pair: int = 400,

    # divergence/reversion (within 04:00–09:30):
    z_enter: float = 2.0,
    z_exit: float = 0.5,
    min_div_events_pair: int = 3,        # відсіяти "noise" пари
    min_div_events_cluster: int = 10,    # відсіяти слабкі кластери

) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Returns:
      clusters_df: 1 row per cluster with aggregated arb stats
      pairs_df:    1 row per pair with full arb stats + corr/sign/meta
    Both are bench-separated and sorted by succ_to_fail_ratio desc.
    """

    # --- load minimal cols ---
    use_cols = [dt_col, ticker_col, stack_col, beta_col, sigma_col, bench_col]
    if file_type.lower() == "parquet":
        df = pd.read_parquet(path, columns=use_cols)
    else:
        df = pd.read_csv(path, usecols=use_cols)

    df = df.dropna(subset=[dt_col, ticker_col, stack_col, bench_col]).copy()
    df[dt_col] = pd.to_datetime(df[dt_col], errors="coerce")
    df = df.dropna(subset=[dt_col]).copy()

    df[stack_col] = pd.to_numeric(df[stack_col], errors="coerce")
    df[beta_col] = pd.to_numeric(df[beta_col], errors="coerce")
    df[sigma_col] = pd.to_numeric(df[sigma_col], errors="coerce")
    df = df.dropna(subset=[stack_col]).copy()

    # --- filter time window ---
    df["date"] = df[dt_col].dt.date
    df["time"] = df[dt_col].dt.time
    t_from_t = pd.to_datetime(t_from).time()
    t_to_t = pd.to_datetime(t_to).time()
    df = df[(df["time"] >= t_from_t) & (df["time"] <= t_to_t)].copy()
    if df.empty:
        return pd.DataFrame(), pd.DataFrame()

    # bucket to reduce noise
    df["dt_bucket"] = _floor_time_to_bucket(df[dt_col], bucket_minutes)

    all_pairs_rows = []
    all_clusters_rows = []

    # ===== bench loop =====
    for bench, df_b in df.groupby(bench_col):
        # per-ticker meta (robust)
        meta = (
            df_b.groupby(ticker_col)[[beta_col, sigma_col]]
            .median(numeric_only=True)
            .reset_index()
        )
        meta_map = meta.set_index(ticker_col).to_dict(orient="index")

        # pivot for this bench: (date, bucket) x ticker
        piv = (
            df_b.groupby(["date", "dt_bucket", ticker_col])[stack_col].mean()
            .reset_index()
            .pivot(index=["date", "dt_bucket"], columns=ticker_col, values=stack_col)
            .sort_index()
        )
        if piv.shape[1] < 2:
            continue

        # drop tickers with too few points
        counts = piv.notna().sum(axis=0)
        keep = counts[counts >= min_points_per_ticker].index.tolist()
        piv = piv[keep]
        if piv.shape[1] < 2:
            continue

        tickers = piv.columns.tolist()
        dpiv = piv.diff()
        dates = piv.index.get_level_values(0).unique().tolist()

        # ---- build edges and compute pair stats (incl arb) ----
        edges: Dict[Tuple[str, str], float] = {}
        pair_rows_bench = []

        for i, a in enumerate(tickers):
            xa_all = piv[a].to_numpy(dtype=float)
            dxa_all = dpiv[a].to_numpy(dtype=float)
            ma = meta_map.get(a, {})

            for j in range(i + 1, len(tickers)):
                b = tickers[j]
                mb = meta_map.get(b, {})

                beta_a, beta_b = ma.get(beta_col, np.nan), mb.get(beta_col, np.nan)
                sig_a, sig_b = ma.get(sigma_col, np.nan), mb.get(sigma_col, np.nan)

                # meta similarity filter
                if np.isfinite(beta_a) and np.isfinite(beta_b) and abs(beta_a - beta_b) > beta_tol:
                    continue
                if np.isfinite(sig_a) and np.isfinite(sig_b) and abs(sig_a - sig_b) > sigma_tol:
                    continue

                xb_all = piv[b].to_numpy(dtype=float)
                dxb_all = dpiv[b].to_numpy(dtype=float)

                # overlap check on levels
                m = np.isfinite(xa_all) & np.isfinite(xb_all)
                overlap = int(m.sum())
                if overlap < min_overlap_points_pair:
                    continue

                corr = np.corrcoef(xa_all[m], xb_all[m])[0, 1]
                if not np.isfinite(corr) or corr < corr_threshold:
                    continue

                # sign agreement on diffs
                md = np.isfinite(dxa_all) & np.isfinite(dxb_all)
                if int(md.sum()) < int(min_overlap_points_pair * 0.6):
                    continue
                agree = np.mean(((dxa_all[md] > 0) & (dxb_all[md] > 0)) | ((dxa_all[md] < 0) & (dxb_all[md] < 0)))
                if not np.isfinite(agree) or agree < sign_agree_threshold:
                    continue

                # passed "tight movement" filters -> edge in graph
                edges[(a, b)] = float(corr)

                # --- pair-level arb stats: sum per-day divergence/reversion in spread zscore ---
                total_events = 0
                total_succ = 0
                strengths_all: List[float] = []

                for d in dates:
                    day_slice = piv.loc[d]
                    xa = day_slice[a].to_numpy(dtype=float)
                    xb = day_slice[b].to_numpy(dtype=float)
                    mm = np.isfinite(xa) & np.isfinite(xb)
                    if int(mm.sum()) < 30:
                        continue

                    spread = xa[mm] - xb[mm]
                    mu = spread.mean()
                    sd = spread.std(ddof=0)
                    if sd <= 0 or not np.isfinite(sd):
                        continue
                    z = (spread - mu) / sd

                    ev, su, strengths = _count_divergence_reversion(z, z_enter, z_exit)
                    total_events += ev
                    total_succ += su
                    strengths_all.extend(strengths)

                fails = total_events - total_succ
                succ_rate = (total_succ / total_events) if total_events > 0 else np.nan
                ratio = (total_succ / fails) if fails > 0 else (np.inf if total_succ > 0 else np.nan)

                if total_events < min_div_events_pair:
                    # можна не відсікати, але так чистіше для репорту
                    continue

                strength_avg = float(np.mean(strengths_all)) if strengths_all else np.nan
                strength_med = float(np.median(strengths_all)) if strengths_all else np.nan
                strength_p90 = float(np.quantile(strengths_all, 0.90)) if strengths_all else np.nan

                # a simple pair score: prioritize reversion, then tightness
                # (ти можеш підкрутити ваги)
                score = (
                    0.55 * (succ_rate if np.isfinite(succ_rate) else 0.0)
                    + 0.25 * float(corr)
                    + 0.20 * float(agree)
                )
                # reward bigger divergence (more edge) but softly
                if np.isfinite(strength_avg):
                    score += 0.03 * min(5.0, strength_avg)

                pair_rows_bench.append({
                    "bench": bench,
                    "a": a, "b": b,
                    "corr": float(corr),
                    "sign_agree": float(agree),
                    "overlap_points": overlap,
                    "beta_a": float(beta_a) if np.isfinite(beta_a) else np.nan,
                    "beta_b": float(beta_b) if np.isfinite(beta_b) else np.nan,
                    "sigma_a": float(sig_a) if np.isfinite(sig_a) else np.nan,
                    "sigma_b": float(sig_b) if np.isfinite(sig_b) else np.nan,
                    "beta_diff": abs(beta_a - beta_b) if np.isfinite(beta_a) and np.isfinite(beta_b) else np.nan,
                    "sigma_diff": abs(sig_a - sig_b) if np.isfinite(sig_a) and np.isfinite(sig_b) else np.nan,

                    "pair_div_events": int(total_events),
                    "pair_div_converged": int(total_succ),
                    "pair_div_not_converged": int(fails),
                    "pair_success_rate": float(succ_rate) if np.isfinite(succ_rate) else np.nan,
                    "pair_succ_to_fail_ratio": float(ratio) if np.isfinite(ratio) else np.nan,
                    "pair_strength_avg_max_abs_z": strength_avg,
                    "pair_strength_median_max_abs_z": strength_med,
                    "pair_strength_p90_max_abs_z": strength_p90,

                    "pair_score": float(score),
                })

        if not pair_rows_bench:
            continue

        pairs_df_b = pd.DataFrame(pair_rows_bench)
        all_pairs_rows.append(pairs_df_b)

        # ---- clusters from edges ----
        if not edges:
            continue

        comps = _connected_components(tickers, edges)
        comps = [c for c in comps if len(c) >= 2]
        if not comps:
            continue

        # map pair -> cluster id (within bench) by membership
        for cid, members in enumerate(comps, start=1):
            mset = set(members)
            sub_pairs = pairs_df_b[pairs_df_b["a"].isin(mset) & pairs_df_b["b"].isin(mset)].copy()
            if sub_pairs.empty:
                continue

            # cluster-level divergence stats: sum over pairs (not unique events; that's ok for ranking)
            div_events = int(sub_pairs["pair_div_events"].sum())
            div_conv = int(sub_pairs["pair_div_converged"].sum())
            div_fail = int(sub_pairs["pair_div_not_converged"].sum())

            if div_events < min_div_events_cluster:
                continue

            succ_rate = (div_conv / div_events) if div_events > 0 else np.nan
            ratio = (div_conv / div_fail) if div_fail > 0 else (np.inf if div_conv > 0 else np.nan)

            betas = [meta_map.get(t, {}).get(beta_col, np.nan) for t in members]
            sigmas = [meta_map.get(t, {}).get(sigma_col, np.nan) for t in members]

            all_clusters_rows.append({
                "bench": bench,
                "cluster_id_in_bench": cid,
                "n_members": len(members),
                "members": ",".join(members),

                "pairs_count": int(sub_pairs.shape[0]),
                "mean_pair_corr": float(sub_pairs["corr"].mean()),
                "mean_pair_sign_agree": float(sub_pairs["sign_agree"].mean()),
                "mean_pair_score": float(sub_pairs["pair_score"].mean()),

                "beta_mean": float(np.nanmean(betas)),
                "beta_std": float(np.nanstd(betas)),
                "sigma_mean": float(np.nanmean(sigmas)),
                "sigma_std": float(np.nanstd(sigmas)),

                "div_events": div_events,
                "div_converged": div_conv,
                "div_not_converged": div_fail,
                "div_success_rate": float(succ_rate) if np.isfinite(succ_rate) else np.nan,
                "succ_to_fail_ratio": float(ratio) if np.isfinite(ratio) else np.nan,

                "div_strength_avg_max_abs_z": float(sub_pairs["pair_strength_avg_max_abs_z"].mean()),
                "div_strength_median_max_abs_z": float(sub_pairs["pair_strength_median_max_abs_z"].median()),
                "div_strength_p90_max_abs_z": float(sub_pairs["pair_strength_p90_max_abs_z"].mean()),
            })

    pairs_df = pd.concat(all_pairs_rows, ignore_index=True) if all_pairs_rows else pd.DataFrame()
    clusters_df = pd.DataFrame(all_clusters_rows)

    # ---- sorting ----
    if not pairs_df.empty:
        pairs_df = pairs_df.sort_values(
            ["bench", "pair_succ_to_fail_ratio", "pair_success_rate", "pair_score", "corr"],
            ascending=[True, False, False, False, False],
        ).reset_index(drop=True)

    if not clusters_df.empty:
        clusters_df = clusters_df.sort_values(
            ["bench", "succ_to_fail_ratio", "div_converged", "mean_pair_score", "mean_pair_corr"],
            ascending=[True, False, False, False, False],
        ).reset_index(drop=True)

    return clusters_df, pairs_df


In [None]:
# import os

# def save_cluster_pair_reports(clusters_df: pd.DataFrame, pairs_df: pd.DataFrame, output_dir: str, *, top_n: int = 200):
#     os.makedirs(output_dir, exist_ok=True)

#     clusters_path = os.path.join(output_dir, "clusters_top.csv")
#     pairs_full_path = os.path.join(output_dir, "pairs_full.csv")
#     pairs_top_path = os.path.join(output_dir, "pairs_top.csv")

#     if clusters_df is not None and not clusters_df.empty:
#         clusters_df.to_csv(clusters_path, index=False)
#     else:
#         pd.DataFrame().to_csv(clusters_path, index=False)

#     if pairs_df is not None and not pairs_df.empty:
#         pairs_df.to_csv(pairs_full_path, index=False)

#         top = pairs_df.head(top_n).copy()
#         top.to_csv(pairs_top_path, index=False)
#     else:
#         pd.DataFrame().to_csv(pairs_full_path, index=False)
#         pd.DataFrame().to_csv(pairs_top_path, index=False)

#     print("Saved:")
#     print(" ", clusters_path)
#     print(" ", pairs_full_path)
#     print(" ", pairs_top_path)


In [None]:
# clusters_df, pairs_df = find_stack_clusters_and_pairs_full_stats(
#     path="ARBITRAGE/final_filtered.parquet",
#     file_type="parquet",

#     t_from="04:00",
#     t_to="09:30",
#     bucket_minutes=5,

#     corr_threshold=0.70,
#     sign_agree_threshold=0.60,
#     beta_tol=0.50,
#     sigma_tol=0.80,

#     min_points_per_ticker=800,
#     min_overlap_points_pair=400,

#     z_enter=2.0,
#     z_exit=0.5,
#     min_div_events_pair=3,
#     min_div_events_cluster=10,
# )

# print("clusters:", len(clusters_df))
# print("pairs:", len(pairs_df))

# # топ кластерів
# print(clusters_df.head(20))

# # топ пар (вже відсортовано всередині bench по ratio/success/score)
# print(pairs_df.head(30))

# save_cluster_pair_reports(
#     clusters_df,
#     pairs_df,
#     output_dir="ARBITRAGE/open_analysis_fast/cluster_pairs_report",
#     top_n=300,
# )


clusters: 36
pairs: 196
   bench  cluster_id_in_bench  n_members  \
0   BITO                    1          2   
1    GDX                    1         11   
2    IWM                    8          2   
3    IWM                    7          2   
4    IWM                   12          2   
5    IWM                    2          9   
6    IWM                    1          2   
7    IWM                    5          8   
8    IWM                   11          9   
9    IWM                    6          5   
10   IWM                   10          7   
11   IWM                    9          2   
12   QQQ                    1          7   
13   QQQ                    2          2   
14   QQQ                   12          3   
15   QQQ                   10          2   
16   QQQ                    8          5   
17   QQQ                   11          3   
18   QQQ                    5         11   
19   QQQ                    6          2   

                                              membe