In [1]:
from __future__ import annotations  # no installation needed

from dataclasses import dataclass  # no installation needed
from pathlib import Path  # no installation needed
import sys  # no installation needed

import pandas as pd  # already in env — no new install

# repo wiring (assumes notebook launched from repo root)
REPO_ROOT = Path.cwd()  # no installation needed
SRC = REPO_ROOT / "src"  # no installation needed
if str(SRC) not in sys.path:  # no installation needed
    sys.path.insert(0, str(SRC))  # no installation needed

from sydata.datasets.master_join_aggtrades import (  # no installation needed
    MasterAggJoinCfg,
    run_monthly_join,
    month_part_path,
    ensure_ts_utc,
    iter_year_months,
    resolve_symbols,
)


In [2]:
DATA_ROOT = Path(r"C:\Users\quantbase\Desktop\marketdata")  # no installation needed

CFG = MasterAggJoinCfg(
    data_root=DATA_ROOT,
    master_root=DATA_ROOT / "norm" / "master",
    agg_root=DATA_ROOT / "norm" / "spot_aggtrades_resampled",
    out_root=DATA_ROOT / "norm" / "master_plus_aggtrades",
    manifest_path=DATA_ROOT / "meta" / "symbols.yml",
    basket="core_major",
    interval="15m",          # change freely (e.g., "1h", "5m", "15m")
    start="2025-01-01",      # inclusive
    end_excl="2026-01-01",   # exclusive
    symbols_override=None,
)

print(CFG)
print("manifest exists:", CFG.manifest_path.exists())
print("master_root exists:", CFG.master_root.exists())
print("agg_root exists:", CFG.agg_root.exists())


MasterAggJoinCfg(data_root=WindowsPath('C:/Users/quantbase/Desktop/marketdata'), master_root=WindowsPath('C:/Users/quantbase/Desktop/marketdata/norm/master'), agg_root=WindowsPath('C:/Users/quantbase/Desktop/marketdata/norm/spot_aggtrades_resampled'), out_root=WindowsPath('C:/Users/quantbase/Desktop/marketdata/norm/master_plus_aggtrades'), manifest_path=WindowsPath('C:/Users/quantbase/Desktop/marketdata/meta/symbols.yml'), basket='core_major', interval='15m', start='2025-01-01', end_excl='2026-01-01', symbols_override=None)
manifest exists: True
master_root exists: True
agg_root exists: True


In [3]:
START_UTC = pd.Timestamp(CFG.start, tz="UTC")  # already in env — no new install
END_EXCL_UTC = pd.Timestamp(CFG.end_excl, tz="UTC")  # already in env — no new install

SYMBOLS = resolve_symbols(CFG)
YEAR_MONTHS = iter_year_months(START_UTC, END_EXCL_UTC)

print("START_UTC:", START_UTC)
print("END_EXCL_UTC:", END_EXCL_UTC)
print("symbols:", SYMBOLS)
print("year_months:", YEAR_MONTHS[:5], "...", YEAR_MONTHS[-1])
print("out_root:", CFG.out_root)


START_UTC: 2025-01-01 00:00:00+00:00
END_EXCL_UTC: 2026-01-01 00:00:00+00:00
symbols: ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'BNB-USDT', 'XRP-USDT', 'ADA-USDT', 'LINK-USDT']
year_months: [(2025, 1), (2025, 2), (2025, 3), (2025, 4), (2025, 5)] ... (2025, 12)
out_root: C:\Users\quantbase\Desktop\marketdata\norm\master_plus_aggtrades


In [4]:
# single symbol test


test_symbol = SYMBOLS[0]
test_year, test_month = YEAR_MONTHS[0]

master_path = month_part_path(CFG.master_root, CFG.interval, test_year, test_month, test_symbol)
agg_path = month_part_path(CFG.agg_root, CFG.interval, test_year, test_month, test_symbol)

print("master_path:", master_path, "exists:", master_path.exists())
print("agg_path:", agg_path, "exists:", agg_path.exists())

md = pd.read_parquet(master_path)
md = ensure_ts_utc(md, "ts")

# hard guard: symbol must be correct for this slice
if "symbol" not in md.columns:
    md["symbol"] = test_symbol
else:
    u = md["symbol"].dropna().unique()
    assert len(u) == 1 and u[0] == test_symbol, (test_symbol, u[:10])

if agg_path.exists():
    ad = pd.read_parquet(agg_path)
    ad = ensure_ts_utc(ad, "ts")
    if "symbol" not in ad.columns:
        ad["symbol"] = test_symbol
    else:
        u = ad["symbol"].dropna().unique()
        assert len(u) == 1 and u[0] == test_symbol, (test_symbol, u[:10])

    # join through the library runner (same logic as batch)
    from sydata.datasets.master_join_aggtrades import join_master_with_aggtrades  # no installation needed
    jd = join_master_with_aggtrades(md, ad)
else:
    jd = md.copy()
    for c in ["agg_sum_qty","agg_trades","agg_cvd_qty","agg_vwap","agg_last_trade_id"]:
        if c not in jd.columns:
            jd[c] = pd.NA

print(jd.head(3))
print("rows:", len(jd))
print("dup(ts,symbol):", int(jd.duplicated(["ts","symbol"]).sum()))


master_path: C:\Users\quantbase\Desktop\marketdata\norm\master\interval=15m\year=2025\month=01\symbol=BTC-USDT\part-2025-01.parquet exists: True
agg_path: C:\Users\quantbase\Desktop\marketdata\norm\spot_aggtrades_resampled\interval=15m\year=2025\month=01\symbol=BTC-USDT\part-2025-01.parquet exists: True
                         ts      open_time  spot_close    mark_close  \
0 2025-01-01 00:00:00+00:00  1735689600000    93656.18  93637.200000   
1 2025-01-01 00:15:00+00:00  1735690500000    93761.90  93743.400000   
2 2025-01-01 00:30:00+00:00  1735691400000    93885.01  93864.328404   

    index_close  premium_close  basis_mark_vs_spot  basis_index_vs_spot  \
0  93650.139149      -0.000025           -0.000203            -0.000065   
1  93760.971489      -0.000197           -0.000197            -0.000010   
2  93885.908085      -0.000272           -0.000220             0.000010   

   funding_rate  funding_interval_hours     volume  quote_volume  trades  \
0        0.0001              

In [5]:
# batch join

results = run_monthly_join(CFG)

summary = pd.DataFrame.from_dict(results, orient="index")
summary.index = pd.MultiIndex.from_tuples(summary.index, names=["symbol","year","month"])
summary = summary.sort_index()

bad = summary[summary["ok"] != True]
print("bad partitions:", len(bad))
display(bad.head(20))

display(summary.head(10))


bad partitions: 0


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,rows,ts_unique,agg_sum_qty_na_frac,agg_trades_na_frac,agg_cvd_qty_na_frac,agg_vwap_na_frac,agg_last_trade_id_na_frac,ok,out,master,agg,min_ts,max_ts
symbol,year,month,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1


Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,rows,ts_unique,agg_sum_qty_na_frac,agg_trades_na_frac,agg_cvd_qty_na_frac,agg_vwap_na_frac,agg_last_trade_id_na_frac,ok,out,master,agg,min_ts,max_ts
symbol,year,month,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1
ADA-USDT,2025,1,2976,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-01-01 00:00:00+00:00,2025-01-31 23:45:00+00:00
ADA-USDT,2025,2,2688,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-02-01 00:00:00+00:00,2025-02-28 23:45:00+00:00
ADA-USDT,2025,3,2976,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-03-01 00:00:00+00:00,2025-03-31 23:45:00+00:00
ADA-USDT,2025,4,2880,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-04-01 00:00:00+00:00,2025-04-30 23:45:00+00:00
ADA-USDT,2025,5,2976,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-05-01 00:00:00+00:00,2025-05-31 23:45:00+00:00
ADA-USDT,2025,6,2880,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-06-01 00:00:00+00:00,2025-06-30 23:45:00+00:00
ADA-USDT,2025,7,2976,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-07-01 00:00:00+00:00,2025-07-31 23:45:00+00:00
ADA-USDT,2025,8,2976,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-08-01 00:00:00+00:00,2025-08-31 23:45:00+00:00
ADA-USDT,2025,9,2880,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-09-01 00:00:00+00:00,2025-09-30 23:45:00+00:00
ADA-USDT,2025,10,2976,True,0.0,0.0,0.0,0.0,0.0,True,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\mas...,C:\Users\quantbase\Desktop\marketdata\norm\spo...,2025-10-01 00:00:00+00:00,2025-10-31 23:45:00+00:00


In [6]:
# join to a single long dataframe

def collect_joined_paths(cfg: MasterAggJoinCfg) -> list[Path]:
    start_utc = pd.Timestamp(cfg.start, tz="UTC")
    end_excl_utc = pd.Timestamp(cfg.end_excl, tz="UTC")
    symbols = resolve_symbols(cfg)
    ym = iter_year_months(start_utc, end_excl_utc)

    paths: list[Path] = []
    for sym in symbols:
        for (y, m) in ym:
            p = month_part_path(cfg.out_root, cfg.interval, y, m, sym)
            if p.exists():
                paths.append(p)
    return paths

joined_paths = collect_joined_paths(CFG)
print("joined_parts:", len(joined_paths))
print("example:", joined_paths[0])

dfs = []
for p in joined_paths:
    df = pd.read_parquet(p)
    df = ensure_ts_utc(df, "ts")

    # hard guard: each file must be single-symbol
    assert "symbol" in df.columns
    u = df["symbol"].dropna().unique()
    assert len(u) == 1, (p, u[:10])

    dfs.append(df)

master_long_plus_agg = pd.concat(dfs, ignore_index=True)
master_long_plus_agg = master_long_plus_agg.sort_values(["symbol","ts"]).reset_index(drop=True)

print(master_long_plus_agg.shape)
print("dup(ts,symbol):", int(master_long_plus_agg.duplicated(["ts","symbol"]).sum()))
display(master_long_plus_agg.head(5))


joined_parts: 84
example: C:\Users\quantbase\Desktop\marketdata\norm\master_plus_aggtrades\interval=15m\year=2025\month=01\symbol=BTC-USDT\part-2025-01.parquet
(245280, 19)
dup(ts,symbol): 0


Unnamed: 0,ts,open_time,spot_close,mark_close,index_close,premium_close,basis_mark_vs_spot,basis_index_vs_spot,funding_rate,funding_interval_hours,volume,quote_volume,trades,symbol,agg_sum_qty,agg_trades,agg_cvd_qty,agg_vwap,agg_last_trade_id
0,2025-01-01 00:00:00+00:00,1735689600000,0.8512,0.8509,0.850975,0.0,-0.000352,-0.000264,0.0001,8,839360.2,712349.52783,2397,ADA-USDT,839360.2,890,102727.2,0.848682,360606452
1,2025-01-01 00:15:00+00:00,1735690500000,0.8521,0.8519,0.852083,-1.7e-05,-0.000235,-2e-05,0.0001,8,496512.6,422710.98123,1745,ADA-USDT,496512.6,680,35633.8,0.85136,360607132
2,2025-01-01 00:30:00+00:00,1735691400000,0.8541,0.853723,0.853985,-8.4e-05,-0.000441,-0.000135,0.0001,8,1096408.0,936787.72716,2402,ADA-USDT,1096408.0,843,400983.8,0.854415,360607975
3,2025-01-01 00:45:00+00:00,1735692300000,0.8596,0.859561,0.859551,0.0,-4.6e-05,-5.7e-05,0.0001,8,1001700.4,858835.26673,2452,ADA-USDT,1001700.4,866,322042.0,0.857377,360608841
4,2025-01-01 01:00:00+00:00,1735693200000,0.8563,0.856132,0.856203,-7.1e-05,-0.000196,-0.000114,0.0001,8,1005577.8,861594.6356,2061,ADA-USDT,1005577.8,776,-279054.8,0.856815,360609617


In [7]:
wide_cols = [
    "spot_close","mark_close","index_close","premium_close",
    "basis_mark_vs_spot","basis_index_vs_spot",
    "funding_rate","funding_interval_hours",
    "volume","quote_volume","trades",
    "agg_sum_qty","agg_trades","agg_cvd_qty","agg_vwap","agg_last_trade_id",
]
wide_cols = [c for c in wide_cols if c in master_long_plus_agg.columns]

x = master_long_plus_agg.set_index(["ts","symbol"])[wide_cols]
wide = x.unstack("symbol")
wide.columns = [f"{feat}__{sym}" for (feat, sym) in wide.columns]
wide = wide.sort_index()

print(wide.shape)
display(wide.head(3))


(35040, 112)


Unnamed: 0_level_0,spot_close__ADA-USDT,spot_close__BNB-USDT,spot_close__BTC-USDT,spot_close__ETH-USDT,spot_close__LINK-USDT,spot_close__SOL-USDT,spot_close__XRP-USDT,mark_close__ADA-USDT,mark_close__BNB-USDT,mark_close__BTC-USDT,...,agg_vwap__LINK-USDT,agg_vwap__SOL-USDT,agg_vwap__XRP-USDT,agg_last_trade_id__ADA-USDT,agg_last_trade_id__BNB-USDT,agg_last_trade_id__BTC-USDT,agg_last_trade_id__ETH-USDT,agg_last_trade_id__LINK-USDT,agg_last_trade_id__SOL-USDT,agg_last_trade_id__XRP-USDT
ts,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2025-01-01 00:00:00+00:00,0.8512,704.01,93656.18,3348.5,20.09,190.51,2.0967,0.8509,703.874597,93637.2,...,20.050102,189.939904,2.088335,360606452,625311307,3358812432,1376417740,256077578,466117504,518542856
2025-01-01 00:15:00+00:00,0.8521,704.63,93761.9,3349.43,20.06,190.72,2.0979,0.8519,704.41,93743.4,...,20.031426,190.782304,2.098782,360607132,625312764,3358819367,1376421943,256078131,466120412,518545603
2025-01-01 00:30:00+00:00,0.8541,706.21,93885.01,3356.09,20.11,191.28,2.0996,0.853723,706.08,93864.328404,...,20.086682,191.275699,2.104032,360607975,625314665,3358826369,1376427051,256078715,466122878,518548770


In [8]:
# save long and wide


def _slug(d: str) -> str:
    # "2025-01-01" -> "20250101"
    return d.replace("-", "")

OUT_LONG = (
    CFG.data_root / "norm" / "master_long_plus_aggtrades"
    / f"interval={CFG.interval}"
    / f"part-{_slug(CFG.start)}-{_slug(CFG.end_excl)}.parquet"
)
OUT_WIDE = (
    CFG.data_root / "norm" / "master_wide_plus_aggtrades"
    / f"interval={CFG.interval}"
    / f"part-{_slug(CFG.start)}-{_slug(CFG.end_excl)}.parquet"
)

OUT_LONG.parent.mkdir(parents=True, exist_ok=True)
OUT_WIDE.parent.mkdir(parents=True, exist_ok=True)

master_long_plus_agg.to_parquet(OUT_LONG, index=False)
wide.to_parquet(OUT_WIDE, index=True)

print("saved long:", OUT_LONG)
print("saved wide:", OUT_WIDE)


saved long: C:\Users\quantbase\Desktop\marketdata\norm\master_long_plus_aggtrades\interval=15m\part-20250101-20260101.parquet
saved wide: C:\Users\quantbase\Desktop\marketdata\norm\master_wide_plus_aggtrades\interval=15m\part-20250101-20260101.parquet


In [9]:
df_long = pd.read_parquet(OUT_LONG)

In [13]:
df_long[200000:200010]

Unnamed: 0,ts,open_time,spot_close,mark_close,index_close,premium_close,basis_mark_vs_spot,basis_index_vs_spot,funding_rate,funding_interval_hours,volume,quote_volume,trades,symbol,agg_sum_qty,agg_trades,agg_cvd_qty,agg_vwap,agg_last_trade_id
200000,2025-09-16 08:00:00+00:00,1758009600000,236.76,236.662746,236.752895,-0.000506,-0.000411,-3e-05,7.6e-05,8,60719.312,14364440.0,18244,SOL-USDT,60719.312,3670,8422.228,236.571126,582012885
200001,2025-09-16 08:15:00+00:00,1758010500000,235.88,235.8,235.907105,-0.000486,-0.000339,0.000115,7.6e-05,8,28763.559,6795553.0,17837,SOL-USDT,28763.559,3565,3100.163,236.255651,582016450
200002,2025-09-16 08:30:00+00:00,1758011400000,235.27,235.14,235.262368,-0.00054,-0.000553,-3.2e-05,7.6e-05,8,26081.471,6140818.0,17868,SOL-USDT,26081.471,4834,-2388.307,235.447534,582021284
200003,2025-09-16 08:45:00+00:00,1758012300000,236.1,235.98,236.106053,-0.000556,-0.000508,2.6e-05,7.6e-05,8,18450.393,4352531.0,14022,SOL-USDT,18450.393,2333,6624.877,235.904509,582023617
200004,2025-09-16 09:00:00+00:00,1758013200000,235.77,235.632123,235.770789,-0.000597,-0.000585,3e-06,7.6e-05,8,16365.182,3862479.0,10450,SOL-USDT,16365.182,2094,478.33,236.018093,582025711
200005,2025-09-16 09:15:00+00:00,1758014100000,235.72,235.58322,235.712368,-0.0005,-0.00058,-3.2e-05,7.6e-05,8,15904.292,3751755.0,12756,SOL-USDT,15904.292,2219,-2473.112,235.89574,582027930
200006,2025-09-16 09:30:00+00:00,1758015000000,235.06,234.96,235.071053,-0.000512,-0.000425,4.7e-05,7.6e-05,8,20413.138,4807028.0,16014,SOL-USDT,20413.138,2385,-5159.156,235.486963,582030315
200007,2025-09-16 09:45:00+00:00,1758015900000,235.31,235.180917,235.307105,-0.000537,-0.000549,-1.2e-05,7.6e-05,8,18854.794,4433148.0,11235,SOL-USDT,18854.794,2165,644.916,235.120448,582032480
200008,2025-09-16 10:00:00+00:00,1758016800000,235.11,235.001136,235.118684,-0.000579,-0.000463,3.7e-05,7.6e-05,8,22378.652,5261464.0,15387,SOL-USDT,22378.652,2505,316.286,235.110843,582034985
200009,2025-09-16 10:15:00+00:00,1758017700000,235.57,235.45,235.564474,-0.000447,-0.000509,-2.3e-05,7.6e-05,8,9537.993,2244444.0,15013,SOL-USDT,9537.993,2129,459.005,235.316198,582037114


In [14]:
# QA Checks

INTERVAL = "15m"              # change freely (e.g., "1h")
SYMBOL = "BTC-USDT"           # change freely
DAY_UTC = "2025-01-01"        # any UTC date inside your dataset

RAW_ROOT = DATA_ROOT / "raw" / "binance" / "spot_aggtrades"
RESAMPLED_ROOT = DATA_ROOT / "norm" / "spot_aggtrades_resampled"
MASTER_ROOT = DATA_ROOT / "norm" / "master"

def interval_to_floor_str(interval: str) -> str:
    s = interval.strip().lower()
    if s.endswith("m"):
        return f"{int(s[:-1])}min"
    if s.endswith("h"):
        return f"{int(s[:-1])}H"
    if s.endswith("d"):
        return f"{int(s[:-1])}D"
    raise ValueError(f"Unsupported interval: {interval!r}")

FLOOR_STR = interval_to_floor_str(INTERVAL)

start_utc = pd.Timestamp(DAY_UTC, tz="UTC")
end_excl_utc = start_utc + pd.Timedelta(days=1)

YEAR = start_utc.year
MONTH = start_utc.month

print("FLOOR_STR:", FLOOR_STR)
print("Window:", start_utc, "→", end_excl_utc, "UTC")
print("YEAR/MONTH:", YEAR, MONTH)


FLOOR_STR: 15min
Window: 2025-01-01 00:00:00+00:00 → 2025-01-02 00:00:00+00:00 UTC
YEAR/MONTH: 2025 1


In [15]:
def month_dir(root: Path, *, interval: str, year: int, month: int, symbol: str) -> Path:
    return root / f"interval={interval}" / f"year={year}" / f"month={month:02d}" / f"symbol={symbol}"

def month_part_path(root: Path, *, interval: str, year: int, month: int, symbol: str) -> Path:
    return month_dir(root, interval=interval, year=year, month=month, symbol=symbol) / f"part-{year}-{month:02d}.parquet"

raw_month_dir = RAW_ROOT / f"symbol={SYMBOL}" / f"year={YEAR}" / f"month={MONTH:02d}"
raw_files = sorted(raw_month_dir.glob("part-*.parquet"))

resampled_path = month_part_path(RESAMPLED_ROOT, interval=INTERVAL, year=YEAR, month=MONTH, symbol=SYMBOL)
master_path = month_part_path(MASTER_ROOT, interval=INTERVAL, year=YEAR, month=MONTH, symbol=SYMBOL)

print("raw_month_dir:", raw_month_dir, "files:", len(raw_files))
print("resampled_path exists:", resampled_path.exists(), resampled_path)
print("master_path exists:", master_path.exists(), master_path)


raw_month_dir: C:\Users\quantbase\Desktop\marketdata\raw\binance\spot_aggtrades\symbol=BTC-USDT\year=2025\month=01 files: 1
resampled_path exists: True C:\Users\quantbase\Desktop\marketdata\norm\spot_aggtrades_resampled\interval=15m\year=2025\month=01\symbol=BTC-USDT\part-2025-01.parquet
master_path exists: True C:\Users\quantbase\Desktop\marketdata\norm\master\interval=15m\year=2025\month=01\symbol=BTC-USDT\part-2025-01.parquet


In [16]:
# load resampled month

need_cols_res = ["ts", "sum_qty", "trades", "cvd_qty", "vwap", "last_trade_id", "symbol"]
res = pd.read_parquet(resampled_path, columns=need_cols_res)

# ensure ts is UTC
if not pd.api.types.is_datetime64_any_dtype(res["ts"]):
    res["ts"] = pd.to_datetime(res["ts"], utc=True)
else:
    try:
        res["ts"] = res["ts"].dt.tz_convert("UTC")
    except Exception:
        res["ts"] = res["ts"].dt.tz_localize("UTC")

res_day = res[(res["ts"] >= start_utc) & (res["ts"] < end_excl_utc)].copy()
res_day = res_day.sort_values("ts").reset_index(drop=True)

print("res_day rows:", len(res_day))
res_day.head(3)


res_day rows: 96


Unnamed: 0,ts,sum_qty,trades,cvd_qty,vwap,last_trade_id,symbol
0,2025-01-01 00:00:00+00:00,175.85673,8259,-49.44345,93609.121473,3358812432,BTC-USDT
1,2025-01-01 00:15:00+00:00,95.41749,6935,9.19953,93784.423454,3358819367,BTC-USDT
2,2025-01-01 00:30:00+00:00,94.36416,7002,28.80936,93889.895651,3358826369,BTC-USDT


In [17]:
# compute raw -> bars

USECOLS_RAW = ["ts", "agg_trade_id", "price", "qty", "is_buyer_maker", "symbol"]

def _ensure_ts_utc(s: pd.Series) -> pd.Series:
    if not pd.api.types.is_datetime64_any_dtype(s):
        return pd.to_datetime(s, utc=True)
    # datetime64 with/without tz
    try:
        return s.dt.tz_convert("UTC")
    except Exception:
        return s.dt.tz_localize("UTC")

def raw_to_bars_incremental(
    files: list[Path],
    *,
    floor_str: str,
    start_utc: pd.Timestamp,
    end_excl_utc: pd.Timestamp,
) -> pd.DataFrame:
    agg = None

    for fp in files:
        df = pd.read_parquet(fp, columns=USECOLS_RAW)
        if df.empty:
            continue

        df["ts"] = _ensure_ts_utc(df["ts"])
        df = df[(df["ts"] >= start_utc) & (df["ts"] < end_excl_utc)]
        if df.empty:
            continue

        df["bar_ts"] = df["ts"].dt.floor(floor_str)

        # CVD sign convention:
        # is_buyer_maker=True  -> taker sell -> negative
        # is_buyer_maker=False -> taker buy  -> positive
        df["signed_qty"] = df["qty"].where(~df["is_buyer_maker"], -df["qty"])
        df["notional"] = df["price"] * df["qty"]

        g = (
            df.groupby("bar_ts", sort=False)
              .agg(
                  sum_qty=("qty", "sum"),
                  trades=("agg_trade_id", "size"),
                  cvd_qty=("signed_qty", "sum"),
                  sum_notional=("notional", "sum"),
                  last_trade_id=("agg_trade_id", "max"),
              )
              .reset_index()
              .rename(columns={"bar_ts": "ts"})
        )
        g["vwap"] = g["sum_notional"] / g["sum_qty"]
        g = g.drop(columns=["sum_notional"])

        if agg is None:
            agg = g
        else:
            # align on ts; additive cols sum; last_trade_id max; vwap qty-weighted
            agg = agg.set_index("ts")
            g = g.set_index("ts")

            for c in ["sum_qty", "trades", "cvd_qty"]:
                agg[c] = agg[c].add(g[c], fill_value=0)

            notional = (agg["vwap"] * agg["sum_qty"]).add(g["vwap"] * g["sum_qty"], fill_value=0)
            agg["vwap"] = notional / agg["sum_qty"]

            agg["last_trade_id"] = agg["last_trade_id"].combine(g["last_trade_id"], max)
            agg = agg.reset_index()

    if agg is None:
        return pd.DataFrame(columns=["ts","sum_qty","trades","cvd_qty","vwap","last_trade_id"])

    return agg.sort_values("ts").reset_index(drop=True)

raw_bars = raw_to_bars_incremental(
    raw_files,
    floor_str=FLOOR_STR,
    start_utc=start_utc,
    end_excl_utc=end_excl_utc,
)

raw_bars["symbol"] = SYMBOL
print("raw_bars rows:", len(raw_bars))
raw_bars.head(3)


raw_bars rows: 96


Unnamed: 0,ts,sum_qty,trades,cvd_qty,last_trade_id,vwap,symbol
0,2025-01-01 00:00:00+00:00,175.85673,8259,-49.44345,3358812432,93609.121473,BTC-USDT
1,2025-01-01 00:15:00+00:00,95.41749,6935,9.19953,3358819367,93784.423454,BTC-USDT
2,2025-01-01 00:30:00+00:00,94.36416,7002,28.80936,3358826369,93889.895651,BTC-USDT


In [18]:
# recon - raw vs resampled

# expected grid for the day
grid = pd.date_range(start_utc, end_excl_utc, freq=FLOOR_STR, inclusive="left")

res_i = res_day.set_index("ts").reindex(grid)
raw_i = raw_bars.set_index("ts").reindex(grid)

# basic coverage checks
missing_in_res = int(res_i["sum_qty"].isna().sum()) if "sum_qty" in res_i else None
missing_in_raw = int(raw_i["sum_qty"].isna().sum()) if "sum_qty" in raw_i else None

print("grid bars:", len(grid))
print("missing_in_res:", missing_in_res)
print("missing_in_raw:", missing_in_raw)

# reconcile numeric columns
cmp = pd.DataFrame(index=grid)
for c in ["sum_qty","trades","cvd_qty","vwap","last_trade_id"]:
    cmp[f"{c}_raw"] = raw_i[c]
    cmp[f"{c}_res"] = res_i[c]

# diffs
cmp["sum_qty_diff"] = cmp["sum_qty_raw"] - cmp["sum_qty_res"]
cmp["trades_diff"]  = cmp["trades_raw"] - cmp["trades_res"]
cmp["cvd_qty_diff"] = cmp["cvd_qty_raw"] - cmp["cvd_qty_res"]
cmp["vwap_diff"]    = cmp["vwap_raw"] - cmp["vwap_res"]
cmp["last_id_diff"] = cmp["last_trade_id_raw"] - cmp["last_trade_id_res"]

# tolerances
ATOL_QTY = 1e-10
ATOL_VWAP = 1e-10

bad_qty   = cmp["sum_qty_diff"].abs() > ATOL_QTY
bad_trd   = cmp["trades_diff"].fillna(0).astype(float).abs() > 0
bad_cvd   = cmp["cvd_qty_diff"].abs() > ATOL_QTY
bad_vwap  = cmp["vwap_diff"].abs() > ATOL_VWAP
bad_last  = cmp["last_id_diff"].fillna(0).astype(float).abs() > 0

print("bad sum_qty:", int(bad_qty.sum()))
print("bad trades :", int(bad_trd.sum()))
print("bad cvd_qty:", int(bad_cvd.sum()))
print("bad vwap   :", int(bad_vwap.sum()))
print("bad last_id:", int(bad_last.sum()))

# hard asserts (this is the whole point of QA-A)
assert int(bad_qty.sum()) == 0
assert int(bad_trd.sum()) == 0
assert int(bad_cvd.sum()) == 0
assert int(bad_vwap.sum()) == 0
assert int(bad_last.sum()) == 0

cmp.head(3)


grid bars: 96
missing_in_res: 0
missing_in_raw: 0
bad sum_qty: 0
bad trades : 0
bad cvd_qty: 0
bad vwap   : 0
bad last_id: 0


Unnamed: 0,sum_qty_raw,sum_qty_res,trades_raw,trades_res,cvd_qty_raw,cvd_qty_res,vwap_raw,vwap_res,last_trade_id_raw,last_trade_id_res,sum_qty_diff,trades_diff,cvd_qty_diff,vwap_diff,last_id_diff
2025-01-01 00:00:00+00:00,175.85673,175.85673,8259,8259,-49.44345,-49.44345,93609.121473,93609.121473,3358812432,3358812432,0.0,0,0.0,0.0,0
2025-01-01 00:15:00+00:00,95.41749,95.41749,6935,6935,9.19953,9.19953,93784.423454,93784.423454,3358819367,3358819367,0.0,0,0.0,0.0,0
2025-01-01 00:30:00+00:00,94.36416,94.36416,7002,7002,28.80936,28.80936,93889.895651,93889.895651,3358826369,3358826369,0.0,0,0.0,0.0,0


In [20]:
from pathlib import Path
import pandas as pd

master_cols = ["ts", "volume"]  # do NOT request "symbol" from per-symbol partitions
m = pd.read_parquet(master_path, columns=master_cols)

# ensure ts is UTC
if not pd.api.types.is_datetime64_any_dtype(m["ts"]):
    m["ts"] = pd.to_datetime(m["ts"], utc=True)
else:
    try:
        m["ts"] = m["ts"].dt.tz_convert("UTC")
    except Exception:
        m["ts"] = m["ts"].dt.tz_localize("UTC")

# reattach symbol from path partition
# master_path .../symbol=BTC-USDT/part-YYYY-MM.parquet
p = Path(master_path)
sym_from_path = p.parent.name.split("symbol=", 1)[-1]
m["symbol"] = sym_from_path

m_day = m[(m["ts"] >= start_utc) & (m["ts"] < end_excl_utc)].copy().sort_values("ts")

chk = m_day.merge(res_day[["ts","sum_qty"]], on="ts", how="left", validate="one_to_one")
chk["vol_minus_sumqty"] = chk["volume"] - chk["sum_qty"]

print("rows:", len(chk))
print("NA sum_qty:", float(chk["sum_qty"].isna().mean()))
print("max abs(volume - sum_qty):", float(chk["vol_minus_sumqty"].abs().max()))
chk.head(3)


rows: 96
NA sum_qty: 0.0
max abs(volume - sum_qty): 1.4210854715202004e-14


Unnamed: 0,ts,volume,symbol,sum_qty,vol_minus_sumqty
0,2025-01-01 00:00:00+00:00,175.85673,BTC-USDT,175.85673,0.0
1,2025-01-01 00:15:00+00:00,95.41749,BTC-USDT,95.41749,0.0
2,2025-01-01 00:30:00+00:00,94.36416,BTC-USDT,94.36416,0.0


In [31]:
@dataclass(frozen=True)
class QAConfig:
    data_root: Path
    interval: str                  # e.g. "15m", "1h"
    start: str                     # e.g. "2025-01-01"
    end_excl: str                  # e.g. "2026-01-01"
    symbols: list[str]             # e.g. ["BTC-USDT", ...]
    months_pick: int = 3           # pick first/middle/last month in window
    symbols_pick: int = 3          # pick first/second/last symbol in list
    days_per_month_internal: int = 2
    rng_seed: int = 42


def ensure_ts_utc(df: pd.DataFrame, col: str = "ts") -> pd.DataFrame:
    if col not in df.columns:
        raise ValueError(f"missing {col}: {df.columns.tolist()}")
    if not pd.api.types.is_datetime64_any_dtype(df[col]):
        df[col] = pd.to_datetime(df[col], utc=True)
    else:
        # if timezone-naive, localize to UTC
        if getattr(df[col].dtype, "tz", None) is None:
            df[col] = df[col].dt.tz_localize("UTC")
    return df


def interval_to_minutes(interval: str) -> int:
    s = interval.strip().lower()
    if s.endswith("m"):
        return int(s[:-1])
    if s.endswith("h"):
        return int(s[:-1]) * 60
    if s.endswith("d"):
        return int(s[:-1]) * 24 * 60
    raise ValueError(f"unsupported interval: {interval}")


def expected_bars_per_day(interval: str) -> int:
    mins = interval_to_minutes(interval)
    if 1440 % mins != 0:
        raise ValueError(f"interval does not divide day evenly: {interval} -> {mins}min")
    return 1440 // mins


def iter_year_months(start_utc: pd.Timestamp, end_excl_utc: pd.Timestamp) -> list[tuple[int, int]]:
    # Months intersecting [start, end_excl), using month-start dates; no tz/Period conversions.
    if start_utc.tzinfo is None or end_excl_utc.tzinfo is None:
        raise ValueError("start_utc and end_excl_utc must be tz-aware (UTC)")

    s = pd.Timestamp(year=start_utc.year, month=start_utc.month, day=1)  # naive month start
    last = end_excl_utc - pd.Timedelta(seconds=1)
    e = pd.Timestamp(year=last.year, month=last.month, day=1)            # naive month start

    months = pd.date_range(s, e, freq="MS")  # Month Start
    return [(d.year, d.month) for d in months]

def pick_months(year_months: list[tuple[int,int]], k: int) -> list[tuple[int,int]]:
    if not year_months:
        return []
    if len(year_months) <= k:
        return year_months
    idxs = sorted(set([0, len(year_months)//2, len(year_months)-1]))
    # if k != 3, spread indices
    if k != 3:
        idxs = np.linspace(0, len(year_months)-1, k, dtype=int).tolist()
        idxs = sorted(set(idxs))
    return [year_months[i] for i in idxs]


def pick_symbols(symbols: list[str], k: int) -> list[str]:
    if not symbols:
        return []
    if len(symbols) <= k:
        return symbols
    # stable pick: first, second, last (for k=3)
    if k == 3:
        picks = [symbols[0], symbols[1], symbols[-1]]
        # if list too small, de-dup
        out = []
        for s in picks:
            if s not in out:
                out.append(s)
        return out
    return symbols[:k]


def month_day_range(year: int, month: int) -> tuple[pd.Timestamp, pd.Timestamp]:
    start = pd.Timestamp(year=year, month=month, day=1, tz="UTC")
    end_excl = (start + pd.offsets.MonthBegin(1)).tz_convert("UTC")
    return start, end_excl


def choose_days_for_month(year: int, month: int, n_internal: int, rng: np.random.Generator) -> list[pd.Timestamp]:
    m_start, m_end_excl = month_day_range(year, month)
    all_days = pd.date_range(m_start, m_end_excl, freq="D", inclusive="left", tz="UTC")
    if len(all_days) == 0:
        return []
    # boundary days
    first_day = all_days[0]
    last_day = all_days[-1]
    internal = all_days[1:-1]
    chosen = [first_day, last_day]
    if len(internal) > 0 and n_internal > 0:
        n = min(n_internal, len(internal))
        # deterministic sample
        idx = rng.choice(len(internal), size=n, replace=False)
        chosen.extend(sorted(internal[idx]))
    # unique + sorted
    chosen = sorted({d for d in chosen})
    return chosen


def master_path(data_root: Path, interval: str, year: int, month: int, symbol: str) -> Path:
    return data_root / "norm" / "master" / f"interval={interval}" / f"year={year}" / f"month={month:02d}" / f"symbol={symbol}" / f"part-{year}-{month:02d}.parquet"


def agg_resampled_path(data_root: Path, interval: str, year: int, month: int, symbol: str) -> Path:
    return data_root / "norm" / "spot_aggtrades_resampled" / f"interval={interval}" / f"year={year}" / f"month={month:02d}" / f"symbol={symbol}" / f"part-{year}-{month:02d}.parquet"


In [32]:
def qa_a_day_check(
    data_root: Path,
    interval: str,
    year: int,
    month: int,
    symbol: str,
    day_start_utc: pd.Timestamp,
) -> dict:
    mpath = master_path(data_root, interval, year, month, symbol)
    apath = agg_resampled_path(data_root, interval, year, month, symbol)

    if not mpath.exists():
        return {"ok": False, "reason": "missing_master", "master": str(mpath), "agg": str(apath)}
    if not apath.exists():
        return {"ok": False, "reason": "missing_agg", "master": str(mpath), "agg": str(apath)}

    # master monthly files may not contain 'symbol' column; do not request it
    m = pd.read_parquet(mpath, columns=["ts", "volume"])
    a = pd.read_parquet(apath, columns=["ts", "sum_qty"])

    m = ensure_ts_utc(m, "ts")
    a = ensure_ts_utc(a, "ts")

    day_end_excl = day_start_utc + pd.Timedelta(days=1)

    mday = m[(m["ts"] >= day_start_utc) & (m["ts"] < day_end_excl)].copy()
    aday = a[(a["ts"] >= day_start_utc) & (a["ts"] < day_end_excl)].copy()

    # expected grid
    exp = expected_bars_per_day(interval)

    # outer join to expose holes
    j = mday.merge(aday, on="ts", how="outer", suffixes=("_m", "_a")).sort_values("ts")
    missing_m = int(j["volume"].isna().sum())
    missing_a = int(j["sum_qty"].isna().sum())

    # compare where both present
    both = j.dropna(subset=["volume", "sum_qty"]).copy()
    both["abs_diff"] = (both["volume"] - both["sum_qty"]).abs()

    max_abs = float(both["abs_diff"].max()) if len(both) else np.nan
    mean_abs = float(both["abs_diff"].mean()) if len(both) else np.nan

    # pass criteria: full grid match + tiny numeric error
    ok_rows = (len(mday) == exp) and (len(aday) == exp)
    ok_missing = (missing_m == 0) and (missing_a == 0)
    ok_diff = np.isfinite(max_abs) and (max_abs <= 1e-9)

    ok = bool(ok_rows and ok_missing and ok_diff)

    return {
        "ok": ok,
        "symbol": symbol,
        "year": year,
        "month": month,
        "day": str(day_start_utc.date()),
        "expected_bars": exp,
        "master_rows": int(len(mday)),
        "agg_rows": int(len(aday)),
        "missing_in_master": missing_m,
        "missing_in_agg": missing_a,
        "max_abs_diff": max_abs,
        "mean_abs_diff": mean_abs,
        "master": str(mpath),
        "agg": str(apath),
    }


def run_qa_a_broadened(cfg: QAConfig) -> pd.DataFrame:
    start_utc = pd.Timestamp(cfg.start, tz="UTC")
    end_excl_utc = pd.Timestamp(cfg.end_excl, tz="UTC")

    year_months_all = iter_year_months(start_utc, end_excl_utc)
    ym_pick = pick_months(year_months_all, cfg.months_pick)
    sym_pick = pick_symbols(cfg.symbols, cfg.symbols_pick)

    rng = np.random.default_rng(cfg.rng_seed)

    rows = []
    for (y, m) in ym_pick:
        days = choose_days_for_month(y, m, cfg.days_per_month_internal, rng)
        # clamp days to [start,end_excl) window
        days = [d for d in days if (d >= start_utc) and (d < end_excl_utc)]
        for sym in sym_pick:
            for d0 in days:
                rows.append(qa_a_day_check(cfg.data_root, cfg.interval, y, m, sym, d0))

    df = pd.DataFrame(rows)
    # stable ordering
    if len(df):
        df = df.sort_values(["symbol", "year", "month", "day"]).reset_index(drop=True)
    return df


In [33]:
import numpy as np

CFG = QAConfig(
    data_root=Path(r"C:\Users\quantbase\Desktop\marketdata"),
    interval="15m",
    start="2025-01-01",
    end_excl="2026-01-01",
    symbols=["BTC-USDT","ETH-USDT","SOL-USDT","BNB-USDT","XRP-USDT","ADA-USDT","LINK-USDT"],
)

qa_a = run_qa_a_broadened(CFG)

summary = {
    "checks": int(len(qa_a)),
    "passed": int(qa_a["ok"].sum()) if len(qa_a) else 0,
    "failed": int((~qa_a["ok"]).sum()) if len(qa_a) else 0,
}
summary, qa_a.head(12)


({'checks': 36, 'passed': 36, 'failed': 0},
       ok    symbol  year  month         day  expected_bars  master_rows  \
 0   True  BTC-USDT  2025      1  2025-01-01             96           96   
 1   True  BTC-USDT  2025      1  2025-01-04             96           96   
 2   True  BTC-USDT  2025      1  2025-01-24             96           96   
 3   True  BTC-USDT  2025      1  2025-01-31             96           96   
 4   True  BTC-USDT  2025      7  2025-07-01             96           96   
 5   True  BTC-USDT  2025      7  2025-07-14             96           96   
 6   True  BTC-USDT  2025      7  2025-07-30             96           96   
 7   True  BTC-USDT  2025      7  2025-07-31             96           96   
 8   True  BTC-USDT  2025     12  2025-12-01             96           96   
 9   True  BTC-USDT  2025     12  2025-12-04             96           96   
 10  True  BTC-USDT  2025     12  2025-12-22             96           96   
 11  True  BTC-USDT  2025     12  2025-12-31

In [34]:
fails = qa_a[qa_a["ok"] == False].copy()  # noqa: E712
fails[["symbol","year","month","day","expected_bars","master_rows","agg_rows","missing_in_master","missing_in_agg","max_abs_diff","reason"]].head(50) if "reason" in fails.columns else fails.head(50)


Unnamed: 0,ok,symbol,year,month,day,expected_bars,master_rows,agg_rows,missing_in_master,missing_in_agg,max_abs_diff,mean_abs_diff,master,agg


In [35]:
# If any failures: print the full row(s) to see exact path and why
fails.to_dict(orient="records")[:10]


[]

In [None]:
# VAlidate monthly master_plus_aggtrades partitions are a correct, lossless join.

In [36]:
# ---------- fallbacks to reuse existing notebook state ----------
JOIN_ROOT = DATA_ROOT / "norm" / "master_plus_aggtrades"

SYMBOLS = globals().get("SYMBOLS", None)
if SYMBOLS is None:
    # if you already imported these earlier in this notebook, this will exist
    resolve_symbols = globals()["resolve_symbols"]
    SYMBOLS = resolve_symbols(CFG)

YEAR_MONTHS = globals().get("YEAR_MONTHS", None)
if YEAR_MONTHS is None:
    iter_year_months = globals()["iter_year_months"]
    START_UTC = pd.Timestamp(getattr(CFG, "start", "2025-01-01"), tz="UTC")
    END_EXCL_UTC = pd.Timestamp(getattr(CFG, "end_excl", "2026-01-01"), tz="UTC")
    YEAR_MONTHS = iter_year_months(START_UTC, END_EXCL_UTC)

interval_to_floor_str = globals().get("interval_to_floor_str", None)
if interval_to_floor_str is None:
    def interval_to_floor_str(interval: str) -> str:
        # minimal mapping; keep consistent with your earlier helper
        if interval.endswith("m"):
            return f"{int(interval[:-1])}min"
        if interval.endswith("h"):
            return f"{int(interval[:-1])}H"
        if interval.endswith("d"):
            return f"{int(interval[:-1])}D"
        raise ValueError(f"unsupported interval: {interval}")

FLOOR_STR = globals().get("FLOOR_STR", interval_to_floor_str(INTERVAL))

month_part_path = globals().get("month_part_path", None)
if month_part_path is None:
    def month_part_path(root: Path, *, interval: str, year: int, month: int, symbol: str) -> Path:
        return (
            root
            / f"interval={interval}"
            / f"year={year}"
            / f"month={month:02d}"
            / f"symbol={symbol}"
            / f"part-{year}-{month:02d}.parquet"
        )

def ensure_ts_utc_df(df: pd.DataFrame, col: str = "ts") -> pd.DataFrame:
    if col not in df.columns:
        raise ValueError(f"missing {col}")
    if not pd.api.types.is_datetime64_any_dtype(df[col]):
        df[col] = pd.to_datetime(df[col], utc=True)
    else:
        try:
            df[col] = df[col].dt.tz_convert("UTC")
        except Exception:
            df[col] = df[col].dt.tz_localize("UTC")
    return df

def expected_month_bars(year: int, month: int, floor_str: str) -> int:
    start = pd.Timestamp(year=year, month=month, day=1, tz="UTC")
    end_excl = start + pd.offsets.MonthBegin(1)  # first day next month
    grid = pd.date_range(start, end_excl, freq=floor_str, inclusive="left")
    return int(len(grid))

def safe_read_parquet(path: Path, want_cols: list[str]) -> pd.DataFrame:
    df = pd.read_parquet(path)
    cols = [c for c in want_cols if c in df.columns]
    return df[cols].copy()

def sample_partitions(symbols: list[str], year_months: list[tuple[int,int]], n: int = 24, seed: int = 7):
    all_parts = [(s, y, m) for s in symbols for (y, m) in year_months]
    rng = np.random.default_rng(seed)
    if n >= len(all_parts):
        return all_parts
    idx = rng.choice(len(all_parts), size=n, replace=False)
    return [all_parts[i] for i in idx]

SAMPLE_PARTS = sample_partitions(SYMBOLS, YEAR_MONTHS, n=24, seed=7)
print("interval:", INTERVAL, "floor:", FLOOR_STR)
print("symbols:", len(SYMBOLS), SYMBOLS)
print("year_months:", len(YEAR_MONTHS), YEAR_MONTHS[:3], "...", YEAR_MONTHS[-3:])
print("sample parts:", len(SAMPLE_PARTS), "example:", SAMPLE_PARTS[0])


interval: 15m floor: 15min
symbols: 7 ['BTC-USDT', 'ETH-USDT', 'SOL-USDT', 'BNB-USDT', 'XRP-USDT', 'ADA-USDT', 'LINK-USDT']
year_months: 12 [(2025, 1), (2025, 2), (2025, 3)] ... [(2025, 10), (2025, 11), (2025, 12)]
sample parts: 24 example: ('BNB-USDT', 2025, 8)


In [37]:
# join checks

def qa_month(symbol: str, year: int, month: int) -> dict:
    master_path = month_part_path(MASTER_ROOT, interval=INTERVAL, year=year, month=month, symbol=symbol)
    agg_path = month_part_path(RESAMPLED_ROOT, interval=INTERVAL, year=year, month=month, symbol=symbol)
    join_path = month_part_path(JOIN_ROOT, interval=INTERVAL, year=year, month=month, symbol=symbol)

    out = {
        "ok": False,
        "symbol": symbol,
        "year": year,
        "month": month,
        "master": str(master_path),
        "agg": str(agg_path),
        "joined": str(join_path),
        "expected_rows": None,
        "master_rows": None,
        "agg_rows": None,
        "joined_rows": None,
        "dup_joined(ts,symbol)": None,
        "max_abs(volume-sum_qty)": None,
        "max_abs(joined_agg-sum_qty)": None,
        "max_abs(joined_trades-trades)": None,
        "max_abs(joined_cvd-cvd)": None,
        "max_abs(joined_vwap-vwap)": None,
        "max_abs(joined_lastid-lastid)": None,
    }

    if not master_path.exists() or not agg_path.exists() or not join_path.exists():
        out["reason"] = "missing parquet(s)"
        return out

    exp = expected_month_bars(year, month, FLOOR_STR)
    out["expected_rows"] = exp

    # master: symbol may or may not exist in file (often implied by partition)
    m = safe_read_parquet(master_path, ["ts", "volume"])
    m = ensure_ts_utc_df(m, "ts")
    m["symbol"] = symbol
    out["master_rows"] = int(len(m))

    # agg resampled
    a = safe_read_parquet(agg_path, ["ts", "sum_qty", "trades", "cvd_qty", "vwap", "last_trade_id"])
    a = ensure_ts_utc_df(a, "ts")
    a["symbol"] = symbol
    out["agg_rows"] = int(len(a))

    # joined
    j = pd.read_parquet(join_path)
    j = ensure_ts_utc_df(j, "ts")
    if "symbol" not in j.columns:
        j["symbol"] = symbol
    out["joined_rows"] = int(len(j))

    # structural checks
    dup = int(j.duplicated(["ts","symbol"]).sum())
    out["dup_joined(ts,symbol)"] = dup

    # expected coverage (strict)
    strict_ok = (len(m) == exp) and (len(a) == exp) and (len(j) == exp) and (dup == 0)

    # align on ts
    mi = m.set_index("ts")[["volume"]]
    ai = a.set_index("ts")[["sum_qty", "trades", "cvd_qty", "vwap", "last_trade_id"]]
    ji = j.set_index("ts")

    # volume identity: master.volume == agg.sum_qty
    z = mi.join(ai, how="inner")
    if len(z) != exp:
        strict_ok = False
    vol_diff = (z["volume"] - z["sum_qty"]).abs()
    out["max_abs(volume-sum_qty)"] = float(vol_diff.max()) if len(vol_diff) else None

    # joined vs agg resampled equality
    # tolerate tiny float error on vwap/cvd/sum_qty; last_trade_id exact int
    need = ["agg_sum_qty","agg_trades","agg_cvd_qty","agg_vwap","agg_last_trade_id"]
    if not all(c in ji.columns for c in need):
        out["reason"] = f"joined missing cols: {[c for c in need if c not in ji.columns]}"
        return out

    jj = ji[need].join(ai, how="inner")
    if len(jj) != exp:
        strict_ok = False

    out["max_abs(joined_agg-sum_qty)"] = float((jj["agg_sum_qty"] - jj["sum_qty"]).abs().max())
    out["max_abs(joined_trades-trades)"] = float((jj["agg_trades"] - jj["trades"]).abs().max())
    out["max_abs(joined_cvd-cvd)"] = float((jj["agg_cvd_qty"] - jj["cvd_qty"]).abs().max())
    out["max_abs(joined_vwap-vwap)"] = float((jj["agg_vwap"] - jj["vwap"]).abs().max())
    out["max_abs(joined_lastid-lastid)"] = int((jj["agg_last_trade_id"] - jj["last_trade_id"]).abs().max())

    # final ok: strict + numeric tolerances
    tol_float = 1e-9
    ok = (
        strict_ok
        and out["max_abs(volume-sum_qty)"] <= tol_float
        and out["max_abs(joined_agg-sum_qty)"] <= tol_float
        and out["max_abs(joined_cvd-cvd)"] <= tol_float
        and out["max_abs(joined_vwap-vwap)"] <= tol_float
        and out["max_abs(joined_trades-trades)"] == 0.0
        and out["max_abs(joined_lastid-lastid)"] == 0
    )
    out["ok"] = bool(ok)
    return out

rows = []
for (sym, y, m) in SAMPLE_PARTS:
    rows.append(qa_month(sym, y, m))

qa_b = pd.DataFrame(rows).sort_values(["ok","symbol","year","month"], ascending=[True,True,True,True])
summary = {"checks": int(len(qa_b)), "passed": int(qa_b["ok"].sum()), "failed": int((~qa_b["ok"]).sum())}
summary, qa_b.head(15)


({'checks': 24, 'passed': 24, 'failed': 0},
       ok     symbol  year  month  \
 13  True   ADA-USDT  2025      3   
 9   True   ADA-USDT  2025      4   
 2   True   ADA-USDT  2025      7   
 1   True   BNB-USDT  2025      2   
 6   True   BNB-USDT  2025      3   
 0   True   BNB-USDT  2025      8   
 8   True   BTC-USDT  2025      1   
 15  True   BTC-USDT  2025      4   
 16  True   BTC-USDT  2025     10   
 7   True   BTC-USDT  2025     11   
 3   True   ETH-USDT  2025      4   
 17  True   ETH-USDT  2025      9   
 20  True   ETH-USDT  2025     10   
 12  True   ETH-USDT  2025     12   
 14  True  LINK-USDT  2025      3   
 
                                                master  \
 13  C:\Users\quantbase\Desktop\marketdata\norm\mas...   
 9   C:\Users\quantbase\Desktop\marketdata\norm\mas...   
 2   C:\Users\quantbase\Desktop\marketdata\norm\mas...   
 1   C:\Users\quantbase\Desktop\marketdata\norm\mas...   
 6   C:\Users\quantbase\Desktop\marketdata\norm\mas...   
 0   C:\Users\

In [38]:
fails = qa_b[qa_b["ok"] != True].copy()
fails

Unnamed: 0,ok,symbol,year,month,master,agg,joined,expected_rows,master_rows,agg_rows,joined_rows,"dup_joined(ts,symbol)",max_abs(volume-sum_qty),max_abs(joined_agg-sum_qty),max_abs(joined_trades-trades),max_abs(joined_cvd-cvd),max_abs(joined_vwap-vwap),max_abs(joined_lastid-lastid)


In [39]:
# C

def interval_to_pandas_freq(interval: str) -> str:
    """
    Convert '15m','1h','1d' style intervals to pandas frequency strings.
    Extend here if you add more intervals later.
    """
    interval = str(interval).strip().lower()
    if interval.endswith("m"):
        n = int(interval[:-1])
        return f"{n}min"
    if interval.endswith("h"):
        n = int(interval[:-1])
        return f"{n}h"
    if interval.endswith("d"):
        n = int(interval[:-1])
        return f"{n}d"
    raise ValueError(f"Unsupported interval: {interval}")


def ensure_ts_utc(df: pd.DataFrame, col: str = "ts") -> pd.DataFrame:
    if col not in df.columns and df.index.name == col:
        df = df.reset_index()
    if col not in df.columns:
        raise ValueError(f"Missing '{col}'. cols={list(df.columns)} index_name={df.index.name}")
    df[col] = pd.to_datetime(df[col], utc=True)
    return df


# pull from notebook globals if they exist; otherwise default
DATA_ROOT = globals().get("DATA_ROOT", Path(r"C:\Users\quantbase\Desktop\marketdata"))
INTERVAL = globals().get("INTERVAL", None) or (globals().get("CFG").interval if "CFG" in globals() else "15m")
START_UTC = globals().get("START_UTC", pd.Timestamp("2025-01-01", tz="UTC"))
END_EXCL_UTC = globals().get("END_EXCL_UTC", pd.Timestamp("2026-01-01", tz="UTC"))

FREQ = interval_to_pandas_freq(INTERVAL)

long_path = (
    DATA_ROOT
    / "norm"
    / "master_long_plus_aggtrades"
    / f"interval={INTERVAL}"
    / f"part-{START_UTC.strftime('%Y%m%d')}-{END_EXCL_UTC.strftime('%Y%m%d')}.parquet"
)

wide_path = (
    DATA_ROOT
    / "norm"
    / "master_wide_plus_aggtrades"
    / f"interval={INTERVAL}"
    / f"part-{START_UTC.strftime('%Y%m%d')}-{END_EXCL_UTC.strftime('%Y%m%d')}.parquet"
)

print("DATA_ROOT:", DATA_ROOT)
print("INTERVAL:", INTERVAL, "FREQ:", FREQ)
print("START_UTC:", START_UTC, "END_EXCL_UTC:", END_EXCL_UTC)
print("long_path exists:", long_path.exists(), long_path)
print("wide_path exists:", wide_path.exists(), wide_path)


DATA_ROOT: C:\Users\quantbase\Desktop\marketdata
INTERVAL: 15m FREQ: 15min
START_UTC: 2025-01-01 00:00:00+00:00 END_EXCL_UTC: 2026-01-01 00:00:00+00:00
long_path exists: True C:\Users\quantbase\Desktop\marketdata\norm\master_long_plus_aggtrades\interval=15m\part-20250101-20260101.parquet
wide_path exists: True C:\Users\quantbase\Desktop\marketdata\norm\master_wide_plus_aggtrades\interval=15m\part-20250101-20260101.parquet


In [40]:
# --- load artifacts ---

master_long = pd.read_parquet(long_path)
master_long = ensure_ts_utc(master_long, "ts")

master_wide = pd.read_parquet(wide_path)
# wide might have ts as index
if master_wide.index.name == "ts":
    master_wide = master_wide.sort_index()
else:
    master_wide = ensure_ts_utc(master_wide, "ts").set_index("ts").sort_index()

print("master_long:", master_long.shape)
print("master_wide:", master_wide.shape)
print("master_long cols:", list(master_long.columns))
print("master_wide index:", master_wide.index.min(), "->", master_wide.index.max())


master_long: (245280, 19)
master_wide: (35040, 112)
master_long cols: ['ts', 'open_time', 'spot_close', 'mark_close', 'index_close', 'premium_close', 'basis_mark_vs_spot', 'basis_index_vs_spot', 'funding_rate', 'funding_interval_hours', 'volume', 'quote_volume', 'trades', 'symbol', 'agg_sum_qty', 'agg_trades', 'agg_cvd_qty', 'agg_vwap', 'agg_last_trade_id']
master_wide index: 2025-01-01 00:00:00+00:00 -> 2025-12-31 23:45:00+00:00


In [41]:
# --- QA-C1: schema + key constraints + expected grid ---

required_cols = [
    "ts", "symbol",
    "spot_close", "mark_close", "index_close",
    "premium_close", "basis_mark_vs_spot", "basis_index_vs_spot",
    "funding_rate", "funding_interval_hours",
    "volume", "quote_volume", "trades",
    "agg_sum_qty", "agg_trades", "agg_cvd_qty", "agg_vwap", "agg_last_trade_id",
]

missing_cols = [c for c in required_cols if c not in master_long.columns]
print("missing_cols:", missing_cols)

ts_ok = pd.api.types.is_datetime64tz_dtype(master_long["ts"])
print("ts tz-aware:", ts_ok, "dtype:", master_long["ts"].dtype)

dup = int(master_long.duplicated(["ts", "symbol"]).sum())
print("dup(ts,symbol):", dup)

# monotonic ts within symbol
ml = master_long[["symbol", "ts"]].sort_values(["symbol", "ts"], kind="mergesort")
mono_flags = ml.groupby("symbol", sort=False)["ts"].apply(lambda s: bool(s.is_monotonic_increasing))
bad_mono = mono_flags[~mono_flags]
print("symbols with non-monotone ts:", bad_mono.index.tolist())

# expected bars in window, per symbol
expected_ts = pd.date_range(START_UTC, END_EXCL_UTC, freq=FREQ, inclusive="left")
expected_n = int(len(expected_ts))
print("expected bars per symbol:", expected_n)

per_sym_n = master_long.groupby("symbol")["ts"].nunique().sort_values()
print("per_symbol unique ts (min/median/max):",
      int(per_sym_n.min()), int(per_sym_n.median()), int(per_sym_n.max()))
bad_counts = per_sym_n[per_sym_n != expected_n]
print("symbols with unexpected bar-count:", bad_counts.to_dict())

# grid coverage check for a small sample of symbols (fast)
sample_syms = per_sym_n.index[: min(3, len(per_sym_n))].tolist()
grid = pd.DataFrame({"ts": expected_ts})
for sym in sample_syms:
    s = master_long.loc[master_long["symbol"] == sym, ["ts"]].drop_duplicates()
    miss = int(grid.merge(s, on="ts", how="left", indicator=True).query("_merge=='left_only'").shape[0])
    print(f"missing ts vs expected grid for {sym}:", miss)


missing_cols: []
ts tz-aware: True dtype: datetime64[ns, UTC]
dup(ts,symbol): 0
symbols with non-monotone ts: []
expected bars per symbol: 35040
per_symbol unique ts (min/median/max): 35040 35040 35040
symbols with unexpected bar-count: {}
missing ts vs expected grid for ADA-USDT: 0
missing ts vs expected grid for BNB-USDT: 0
missing ts vs expected grid for BTC-USDT: 0


  ts_ok = pd.api.types.is_datetime64tz_dtype(master_long["ts"])


In [42]:
# --- QA-C2: missingness profiles (core + agg) ---

cols_to_profile = [
    "spot_close", "mark_close", "index_close",
    "premium_close", "basis_mark_vs_spot", "basis_index_vs_spot",
    "funding_rate", "funding_interval_hours",
    "volume", "quote_volume", "trades",
    "agg_sum_qty", "agg_trades", "agg_cvd_qty", "agg_vwap", "agg_last_trade_id",
]

na_profile = (
    master_long.groupby("symbol")[cols_to_profile]
    .apply(lambda g: g.isna().mean())
)

# compact view: max NA frac per column, and per-symbol max NA
max_na_by_col = na_profile.max(axis=0).sort_values(ascending=False)
max_na_by_sym = na_profile.max(axis=1).sort_values(ascending=False)

print("max NA frac by column:")
print(max_na_by_col)

print("\nmax NA frac by symbol (top 10):")
print(max_na_by_sym.head(10))

# show full table only if needed
na_profile.head()


max NA frac by column:
spot_close                0.0
mark_close                0.0
index_close               0.0
premium_close             0.0
basis_mark_vs_spot        0.0
basis_index_vs_spot       0.0
funding_rate              0.0
funding_interval_hours    0.0
volume                    0.0
quote_volume              0.0
trades                    0.0
agg_sum_qty               0.0
agg_trades                0.0
agg_cvd_qty               0.0
agg_vwap                  0.0
agg_last_trade_id         0.0
dtype: float64

max NA frac by symbol (top 10):
symbol
ADA-USDT     0.0
BNB-USDT     0.0
BTC-USDT     0.0
ETH-USDT     0.0
LINK-USDT    0.0
SOL-USDT     0.0
XRP-USDT     0.0
dtype: float64


Unnamed: 0_level_0,spot_close,mark_close,index_close,premium_close,basis_mark_vs_spot,basis_index_vs_spot,funding_rate,funding_interval_hours,volume,quote_volume,trades,agg_sum_qty,agg_trades,agg_cvd_qty,agg_vwap,agg_last_trade_id
symbol,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1
ADA-USDT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
BNB-USDT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
BTC-USDT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
ETH-USDT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
LINK-USDT,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [63]:
# --- QA-C3: reconstruct wide from long and compare to saved wide ---

wide_fields = [
    "spot_close", "mark_close", "index_close",
    "premium_close", "basis_mark_vs_spot", "basis_index_vs_spot",
    "funding_rate", "funding_interval_hours",
    "volume", "quote_volume", "trades",
    "agg_sum_qty", "agg_trades", "agg_cvd_qty", "agg_vwap", "agg_last_trade_id",
]

# build reconstructed wide
recon_parts = []
for f in wide_fields:
    p = master_long.pivot(index="ts", columns="symbol", values=f)
    p.columns = [f"{f}__{c}" for c in p.columns]
    recon_parts.append(p)

# Build wide exactly like the saved dataset: drop non-canonical fields
DROP_FOR_WIDE = {"open_time", "abs_vol_minus_sumqty"}  # abs_vol_minus_sumqty is QA-only
value_cols = [c for c in long_df.columns if c not in {"ts", "symbol"} and c not in DROP_FOR_WIDE]

wide_recon = (
    long_df[["ts", "symbol"] + value_cols]
    .pivot(index="ts", columns="symbol", values=value_cols)
)

# flatten columns: "<field>__<symbol>"
wide_recon.columns = [f"{field}__{sym}" for field, sym in wide_recon.columns]
wide_recon = wide_recon.sort_index().sort_index(axis=1)


wide_recon = pd.concat(recon_parts, axis=1).sort_index()
wide_recon = wide_recon.reindex(sorted(wide_recon.columns), axis=1)

wide_saved = master_wide.copy()
wide_saved = wide_saved.reindex(sorted(wide_saved.columns), axis=1)

print("wide_recon:", wide_recon.shape)
print("wide_saved:", wide_saved.shape)

# index equality
idx_equal = wide_recon.index.equals(wide_saved.index)
print("index equal:", idx_equal)

# column coverage
missing_in_saved = sorted(set(wide_recon.columns) - set(wide_saved.columns))
missing_in_recon = sorted(set(wide_saved.columns) - set(wide_recon.columns))
print("missing_in_saved (recon has, saved missing):", missing_in_saved[:20], "count:", len(missing_in_saved))
print("missing_in_recon (saved has, recon missing):", missing_in_recon[:20], "count:", len(missing_in_recon))

common_cols = sorted(set(wide_recon.columns) & set(wide_saved.columns))
print("common_cols:", len(common_cols))

# compare values
# exact for integer-like columns, tol for floats
int_like = [c for c in common_cols if c.endswith("trades") or c.endswith("last_trade_id") or c.endswith("funding_interval_hours")]
float_cols = [c for c in common_cols if c not in int_like]

# ints: exact match (after aligning NaNs)
ints_ok = True
bad_int_cols = []
for c in int_like:
    a = wide_recon[c]
    b = wide_saved[c]
    # convert to pandas nullable int where possible; otherwise compare via fillna sentinel
    eq = (a.fillna(-1).astype("int64") == b.fillna(-1).astype("int64")).all()
    if not bool(eq):
        ints_ok = False
        bad_int_cols.append(c)

print("int cols exact match:", ints_ok, "bad_int_cols:", bad_int_cols[:20])

# floats: max abs diff summary
max_abs = {}
for c in float_cols:
    a = wide_recon[c].to_numpy(dtype="float64")
    b = wide_saved[c].to_numpy(dtype="float64")
    d = np.nanmax(np.abs(a - b))
    max_abs[c] = float(d)

max_abs_series = pd.Series(max_abs).sort_values(ascending=False)
print("top 20 float cols by max_abs_diff:")
print(max_abs_series.head(20))
print("global max_abs_diff:", float(max_abs_series.iloc[0]) if len(max_abs_series) else 0.0)


wide_recon: (35040, 112)
wide_saved: (35040, 112)
index equal: True
missing_in_saved (recon has, saved missing): [] count: 0
missing_in_recon (saved has, recon missing): [] count: 0
common_cols: 112
int cols exact match: True bad_int_cols: []
top 20 float cols by max_abs_diff:
agg_cvd_qty__ADA-USDT           0.0
agg_cvd_qty__BNB-USDT           0.0
agg_cvd_qty__BTC-USDT           0.0
agg_cvd_qty__ETH-USDT           0.0
agg_cvd_qty__LINK-USDT          0.0
agg_cvd_qty__SOL-USDT           0.0
agg_cvd_qty__XRP-USDT           0.0
agg_last_trade_id__ADA-USDT     0.0
agg_last_trade_id__BNB-USDT     0.0
agg_last_trade_id__BTC-USDT     0.0
agg_last_trade_id__ETH-USDT     0.0
agg_last_trade_id__LINK-USDT    0.0
agg_last_trade_id__SOL-USDT     0.0
agg_last_trade_id__XRP-USDT     0.0
agg_sum_qty__ADA-USDT           0.0
agg_sum_qty__BNB-USDT           0.0
agg_sum_qty__BTC-USDT           0.0
agg_sum_qty__ETH-USDT           0.0
agg_sum_qty__LINK-USDT          0.0
agg_sum_qty__SOL-USDT           0.0
dt

In [64]:
# --- QA-C4: microstructure invariants on the final long artifact ---

# 1) volume identity: volume vs agg_sum_qty (should be ~0)
vol_diff = (master_long["volume"].astype("float64") - master_long["agg_sum_qty"].astype("float64")).abs()
by_sym = master_long.assign(abs_vol_minus_sumqty=vol_diff).groupby("symbol")["abs_vol_minus_sumqty"].max().sort_values(ascending=False)
print("max abs(volume - agg_sum_qty) by symbol:")
print(by_sym)

print("global max abs(volume - agg_sum_qty):", float(by_sym.iloc[0]))

# 2) last_trade_id monotonic within symbol
ml_id = master_long[["symbol", "ts", "agg_last_trade_id"]].sort_values(["symbol", "ts"], kind="mergesort")
neg_counts = (
    ml_id.groupby("symbol")["agg_last_trade_id"]
    .apply(lambda s: int((s.diff() < 0).sum()))
    .sort_values(ascending=False)
)
print("count of negative agg_last_trade_id diffs by symbol:")
print(neg_counts)

# 3) sanity on agg_vwap values (finite)
vwap_nan = float(master_long["agg_vwap"].isna().mean())
vwap_inf = float(np.isinf(master_long["agg_vwap"].to_numpy(dtype="float64")).mean())
print("agg_vwap nan_frac:", vwap_nan, "inf_frac:", vwap_inf)


max abs(volume - agg_sum_qty) by symbol:
symbol
ADA-USDT     9.313226e-10
XRP-USDT     2.328306e-10
LINK-USDT    1.164153e-10
SOL-USDT     1.164153e-10
ETH-USDT     2.910383e-11
BNB-USDT     3.637979e-12
BTC-USDT     9.094947e-13
Name: abs_vol_minus_sumqty, dtype: float64
global max abs(volume - agg_sum_qty): 9.313225746154785e-10
count of negative agg_last_trade_id diffs by symbol:
symbol
ADA-USDT     0
BNB-USDT     0
BTC-USDT     0
ETH-USDT     0
LINK-USDT    0
SOL-USDT     0
XRP-USDT     0
Name: agg_last_trade_id, dtype: int64
agg_vwap nan_frac: 0.0 inf_frac: 0.0


In [45]:
START_UTC = pd.Timestamp("2025-01-01", tz="UTC")
END_EXCL_UTC = pd.Timestamp("2026-01-01", tz="UTC")

LONG_PATH = DATA_ROOT / "norm" / "master_long_plus_aggtrades" / f"interval={INTERVAL}" / "part-20250101-20260101.parquet"
WIDE_PATH = DATA_ROOT / "norm" / "master_wide_plus_aggtrades" / f"interval={INTERVAL}" / "part-20250101-20260101.parquet"

assert LONG_PATH.exists(), LONG_PATH
assert WIDE_PATH.exists(), WIDE_PATH

long_df = pd.read_parquet(LONG_PATH)
wide_df = pd.read_parquet(WIDE_PATH)

# ensure UTC ts
long_df["ts"] = pd.to_datetime(long_df["ts"], utc=True)
if "symbol" in long_df.columns:
    long_df["symbol"] = long_df["symbol"].astype(str)

if "ts" in wide_df.columns:
    wide_df["ts"] = pd.to_datetime(wide_df["ts"], utc=True)
    wide_df = wide_df.set_index("ts")
else:
    # already indexed
    if not pd.api.types.is_datetime64_any_dtype(wide_df.index):
        wide_df.index = pd.to_datetime(wide_df.index, utc=True)

print("long:", long_df.shape, "wide:", wide_df.shape)
print("long cols:", list(long_df.columns))
print("wide index dtype:", wide_df.index.dtype)

long: (245280, 19) wide: (35040, 112)
long cols: ['ts', 'open_time', 'spot_close', 'mark_close', 'index_close', 'premium_close', 'basis_mark_vs_spot', 'basis_index_vs_spot', 'funding_rate', 'funding_interval_hours', 'volume', 'quote_volume', 'trades', 'symbol', 'agg_sum_qty', 'agg_trades', 'agg_cvd_qty', 'agg_vwap', 'agg_last_trade_id']
wide index dtype: datetime64[ns, UTC]


In [46]:
# ---- Invariants: schema + keys + ordering ----
REQ_LONG = {
    "ts","symbol",
    "open_time","spot_close","mark_close","index_close","premium_close",
    "basis_mark_vs_spot","basis_index_vs_spot",
    "funding_rate","funding_interval_hours",
    "volume","quote_volume","trades",
    "agg_sum_qty","agg_trades","agg_cvd_qty","agg_vwap","agg_last_trade_id",
}
missing = sorted(list(REQ_LONG - set(long_df.columns)))
assert not missing, f"master_long missing cols: {missing}"

dup = int(long_df.duplicated(["ts","symbol"]).sum())
assert dup == 0, f"dup(ts,symbol)={dup}"

# ts monotone within symbol (strict)
bad_syms = []
for sym, g in long_df.sort_values(["symbol","ts"]).groupby("symbol", sort=False):
    dt = g["ts"].diff().dropna()
    if (dt <= pd.Timedelta(0)).any():
        bad_syms.append(sym)
assert not bad_syms, f"non-monotone ts within symbols: {bad_syms}"

# wide: one row per ts, monotone
assert wide_df.index.is_unique, "wide ts index not unique"
assert wide_df.index.is_monotonic_increasing, "wide ts not monotonic increasing"

# wide columns should include agg_* expansions
assert any(c.startswith("agg_sum_qty__") for c in wide_df.columns), "wide missing agg_sum_qty__* columns"

print("Phase 1 OK")


Phase 1 OK


In [47]:
# coverage

def interval_to_floor_str(interval: str) -> str:
    # supports "15m", "1h", "1d" patterns used in your repo
    interval = interval.strip().lower()
    if interval.endswith("m"):
        return f"{int(interval[:-1])}min"
    if interval.endswith("h"):
        return f"{int(interval[:-1])}h"
    if interval.endswith("d"):
        return f"{int(interval[:-1])}d"
    raise ValueError(f"unsupported interval: {interval}")

FLOOR_STR = interval_to_floor_str(INTERVAL)
GRID = pd.date_range(
    START_UTC,
    END_EXCL_UTC - pd.to_timedelta(FLOOR_STR),
    freq=FLOOR_STR,
    tz="UTC",
)
expected_per_symbol = len(GRID)
symbols = sorted(long_df["symbol"].unique())

summary = []
for sym in symbols:
    g = long_df.loc[long_df["symbol"] == sym, ["ts"]]
    n = int(g["ts"].nunique())
    summary.append({
        "symbol": sym,
        "rows": int(len(g)),
        "ts_unique": n,
        "expected": expected_per_symbol,
        "missing_ts": expected_per_symbol - n,
        "min_ts": str(g["ts"].min()),
        "max_ts": str(g["ts"].max()),
    })

cov = pd.DataFrame(summary).sort_values(["missing_ts","symbol"], ascending=[False, True])
cov


Unnamed: 0,symbol,rows,ts_unique,expected,missing_ts,min_ts,max_ts
0,ADA-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00
1,BNB-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00
2,BTC-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00
3,ETH-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00
4,LINK-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00
5,SOL-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00
6,XRP-USDT,35040,35040,35040,0,2025-01-01 00:00:00+00:00,2025-12-31 23:45:00+00:00


In [48]:
# pinpoint gaps if any (should be 0 everywhere)
idx = long_df.set_index(["symbol","ts"]).index
full_idx = pd.MultiIndex.from_product([symbols, GRID], names=["symbol","ts"])

missing = full_idx.difference(idx)
extra = idx.difference(full_idx)

print("missing rows:", len(missing), "extra rows:", len(extra))

# show top missing slices (if any)
if len(missing):
    miss_df = pd.DataFrame({"symbol": missing.get_level_values(0), "ts": missing.get_level_values(1)})
    display(miss_df.sort_values(["symbol","ts"]).head(50))

# duplicates already asserted 0, but keep a guardrail summary
dup_counts = (long_df.groupby(["symbol","ts"]).size().reset_index(name="n").query("n>1"))
print("dup groups:", len(dup_counts))
if len(dup_counts):
    display(dup_counts.head(50))

print("Phase 2 OK (if missing/extra/dup are all zero)")


missing rows: 0 extra rows: 0
dup groups: 0
Phase 2 OK (if missing/extra/dup are all zero)


In [51]:
# complete e2e regression check 


def qstats(x: pd.Series) -> dict:
    x = pd.to_numeric(x, errors="coerce")
    return {
        "rows": int(x.shape[0]),
        "na_frac": float(x.isna().mean()),
        "p01": float(x.quantile(0.01)),
        "p50": float(x.quantile(0.50)),
        "p99": float(x.quantile(0.99)),
        "min": float(x.min()),
        "max": float(x.max()),
    }

TOL_VOL_ABS = 1e-6  # volume vs agg_sum_qty should be essentially exact
TOL_QUOTE_REL_P99 = 5e-3  # quote_volume ≈ agg_vwap*agg_sum_qty; allow small rounding/aggregation drift

checks = {}

# C1: volume consistency (kline volume vs agg_sum_qty)
long_df["abs_vol_minus_sumqty"] = (long_df["volume"] - long_df["agg_sum_qty"]).abs()
by_sym_vol = long_df.groupby("symbol")["abs_vol_minus_sumqty"].max().sort_values(ascending=False)
checks["max_abs(volume-agg_sum_qty)_by_symbol"] = by_sym_vol.to_dict()
checks["global_max_abs(volume-agg_sum_qty)"] = float(long_df["abs_vol_minus_sumqty"].max())
assert checks["global_max_abs(volume-agg_sum_qty)"] <= TOL_VOL_ABS, checks["global_max_abs(volume-agg_sum_qty)"]

# C2: quote_volume consistency
# expected quote = vwap * sum_qty
quote_est = long_df["agg_vwap"] * long_df["agg_sum_qty"]
rel_err = (long_df["quote_volume"] - quote_est).abs() / (long_df["quote_volume"].abs() + 1e-12)
checks["quote_rel_err_stats"] = qstats(rel_err)
assert checks["quote_rel_err_stats"]["p99"] <= TOL_QUOTE_REL_P99, checks["quote_rel_err_stats"]

# C3: agg_last_trade_id monotonic within symbol
neg_counts = {}
for sym, g in long_df.sort_values(["symbol","ts"]).groupby("symbol", sort=False):
    d = g["agg_last_trade_id"].astype("int64").diff()
    neg_counts[sym] = int((d < 0).sum())
checks["neg_diffs(agg_last_trade_id)_by_symbol"] = neg_counts
assert sum(neg_counts.values()) == 0, neg_counts



In [None]:
# D: structural constraints + distributions
# FIX: don't invert a float; compute nan/inf fractions explicitly (works even if dtype is object)
vwap = pd.to_numeric(long_df["agg_vwap"], errors="coerce")  # NaN if bad parse

checks["agg_vwap_nan_frac"] = float(vwap.isna().mean())
checks["agg_vwap_inf_frac"] = float(np.isinf(vwap.to_numpy()).mean())

assert checks["agg_vwap_nan_frac"] == 0.0, checks["agg_vwap_nan_frac"]
assert checks["agg_vwap_inf_frac"] == 0.0, checks["agg_vwap_inf_frac"]


checks["agg_sum_qty_neg_frac"] = float((long_df["agg_sum_qty"] < 0).mean())
checks["agg_trades_neg_frac"] = float((long_df["agg_trades"] < 0).mean())
assert checks["agg_sum_qty_neg_frac"] == 0.0, checks["agg_sum_qty_neg_frac"]
assert checks["agg_trades_neg_frac"] == 0.0, checks["agg_trades_neg_frac"]

checks["cvd_out_of_bounds_frac"] = float((long_df["agg_cvd_qty"].abs() > (long_df["agg_sum_qty"].abs() + 1e-12)).mean())
assert checks["cvd_out_of_bounds_frac"] == 0.0, checks["cvd_out_of_bounds_frac"]

# funding piecewise const sanity: unique funding_rate counts per 8h block should be small
# block key = floor ts to 8H
blk = long_df["ts"].dt.floor(f"{int(long_df['funding_interval_hours'].mode().iloc[0])}h")
uniq_per_blk = long_df.groupby(["symbol", blk])["funding_rate"].nunique()
checks["funding_unique_per_block_stats"] = qstats(uniq_per_blk)
# no assert here: just report; different exchanges/symbols can change around boundaries

# premium zeros: per-symbol + time-of-day clustering
premium_zero = (long_df["premium_close"] == 0.0)
checks["premium_zero_frac_by_symbol"] = premium_zero.groupby(long_df["symbol"]).mean().to_dict()

tod = long_df["ts"].dt.strftime("%H:%M")
zero_tod = (
    long_df.loc[premium_zero, ["symbol"]]
    .assign(tod=tod[premium_zero].values)
    .groupby(["symbol","tod"])
    .size()
    .sort_values(ascending=False)
    .groupby(level=0)
    .head(10)
)
checks["premium_zero_top_time_of_day_counts"] = {sym: zero_tod.xs(sym).to_dict() for sym in symbols if sym in zero_tod.index.get_level_values(0)}

checks

{'max_abs(volume-agg_sum_qty)_by_symbol': {'ADA-USDT': 9.313225746154785e-10,
  'XRP-USDT': 2.3283064365386963e-10,
  'LINK-USDT': 1.1641532182693481e-10,
  'SOL-USDT': 1.1641532182693481e-10,
  'ETH-USDT': 2.9103830456733704e-11,
  'BNB-USDT': 3.637978807091713e-12,
  'BTC-USDT': 9.094947017729282e-13},
 'global_max_abs(volume-agg_sum_qty)': 9.313225746154785e-10,
 'quote_rel_err_stats': {'rows': 245280,
  'na_frac': 0.0,
  'p01': 0.0,
  'p50': 0.0,
  'p99': 1.966857116250266e-16,
  'min': 0.0,
  'max': 4.199974789647954e-16},
 'neg_diffs(agg_last_trade_id)_by_symbol': {'ADA-USDT': 0,
  'BNB-USDT': 0,
  'BTC-USDT': 0,
  'ETH-USDT': 0,
  'LINK-USDT': 0,
  'SOL-USDT': 0,
  'XRP-USDT': 0},
 'agg_vwap_nan_frac': 0.0,
 'agg_vwap_inf_frac': 0.0,
 'agg_sum_qty_neg_frac': 0.0,
 'agg_trades_neg_frac': 0.0,
 'cvd_out_of_bounds_frac': 0.0,
 'funding_unique_per_block_stats': {'rows': 7665,
  'na_frac': 0.0,
  'p01': 1.0,
  'p50': 1.0,
  'p99': 1.0,
  'min': 1.0,
  'max': 1.0},
 'premium_zero_frac

In [None]:
# E regression: reconstruct wide from long and compare to saved wide (exact or tolerance by dtype)

# --- wide: column-set equality + reorder saved to recon order ---

cols_recon = pd.Index(wide_recon.columns)
cols_saved = pd.Index(wide_cmp.columns)

# must not be missing anything in saved vs recon canonical
missing_in_saved = cols_recon.difference(cols_saved)
extra_in_saved   = cols_saved.difference(cols_recon)

# If recon has extra QA columns, drop them before asserting set equality
if len(missing_in_saved) > 0:
    wide_recon = wide_recon.drop(columns=list(missing_in_saved), errors="ignore")
    cols_recon = pd.Index(wide_recon.columns)
    missing_in_saved = cols_recon.difference(cols_saved)

print("missing_in_saved:", list(missing_in_saved)[:50], "count:", len(missing_in_saved))
print("extra_in_saved  :", list(extra_in_saved)[:50],   "count:", len(extra_in_saved))

assert len(missing_in_saved) == 0 and len(extra_in_saved) == 0, "wide columns set mismatch"

wide_cmp = wide_cmp.loc[:, cols_recon]


missing_in_saved: [] count: 0
extra_in_saved  : [] count: 0


In [68]:
# F: persist QA report (single JSON artifact)
import json

out_dir = DATA_ROOT / "norm" / "qa_reports" / "master_plus_aggtrades" / f"interval={INTERVAL}"
out_dir.mkdir(parents=True, exist_ok=True)

report = {
    "window": {"start_utc": str(START_UTC), "end_excl_utc": str(END_EXCL_UTC), "interval": INTERVAL, "floor_str": FLOOR_STR},
    "shapes": {"long": [int(long_df.shape[0]), int(long_df.shape[1])], "wide": [int(wide_df.shape[0]), int(wide_df.shape[1])]},
    "symbols": symbols,
    "checks": checks,
}

report_path = out_dir / f"qa_master_plus_aggtrades_{START_UTC.strftime('%Y%m%d')}_{END_EXCL_UTC.strftime('%Y%m%d')}.json"
report_path.write_text(json.dumps(report, indent=2))
print("saved:", report_path)


saved: C:\Users\quantbase\Desktop\marketdata\norm\qa_reports\master_plus_aggtrades\interval=15m\qa_master_plus_aggtrades_20250101_20260101.json
