In [103]:
import pandas as pd

# 加载所有 parquet 文件
df_trade = pd.read_parquet("export_data/trade_updates.parquet")
df_order = pd.read_parquet("export_data/order_updates.parquet")
df_open_sig = pd.read_parquet("export_data/signals_arb_open.parquet")
df_hedge_sig = pd.read_parquet("export_data/signals_arb_hedge.parquet")
df_cancel_sig = pd.read_parquet("export_data/signals_arb_cancel.parquet")
df_close_sig = pd.read_parquet("export_data/signals_arb_close.parquet")

# 提取 strategy_id
# fn extract_strategy_id(order_id: i64) -> i32 {
#     (order_id >> 32) as i32
# }
df_trade["strategy_id"] = df_trade["client_order_id"].apply(lambda x: (x >> 32) & 0xFFFFFFFF)
df_order["strategy_id"] = df_order["client_order_id"].apply(lambda x: (x >> 32) & 0xFFFFFFFF)

print(f"trade_updates: {len(df_trade)} rows")
print(f"order_updates: {len(df_order)} rows")
print(f"signals_arb_open: {len(df_open_sig)} rows")
print(f"signals_arb_hedge: {len(df_hedge_sig)} rows")
print(f"signals_arb_cancel: {len(df_cancel_sig)} rows")
print(f"signals_arb_close: {len(df_close_sig)} rows")

trade_updates: 17211 rows
order_updates: 179817 rows
signals_arb_open: 208404 rows
signals_arb_hedge: 16129 rows
signals_arb_cancel: 184029 rows
signals_arb_close: 0 rows


In [None]:
import pandas as pd
pd.read_parquet("/home/ubuntu/crypto_mkt/mkt_signal/test_export.parquet")

In [104]:
# 聚合行情，把开仓、对冲的整个过程聚合到一个dataframe
# 筛选相关的订单和成交
symbol_filter = "SLPUSDT"
df_order = df_order[df_order["symbol"] == symbol_filter].copy()
df_trade = df_trade[df_trade["symbol"] == symbol_filter].copy()
print(f"ANKRUSDT orders: {len(df_order)} rows")
print(f"ANKRUSDT trades: {len(df_trade)} rows")
df_order.columns

ANKRUSDT orders: 4376 rows
ANKRUSDT trades: 239 rows


Index(['key', 'ts_us', 'event_time', 'symbol', 'order_id', 'client_order_id',
       'client_order_id_str', 'side', 'order_type', 'time_in_force', 'price',
       'quantity', 'last_executed_qty', 'cumulative_filled_quantity', 'status',
       'raw_status', 'execution_type', 'raw_execution_type', 'trading_venue',
       'average_price', 'last_executed_price', 'business_unit', 'strategy_id'],
      dtype='object')

In [105]:
# 筛选对应 symbol 的信号
df_open_sig = df_open_sig[df_open_sig["opening_symbol"] == symbol_filter].copy()
df_hedge_sig = df_hedge_sig[df_hedge_sig["hedging_symbol"] == symbol_filter].copy()
df_close_sig = df_close_sig[df_close_sig["opening_symbol"] == symbol_filter].copy() if len(df_close_sig) > 0 else df_close_sig.copy()

# 合并 open_sig 和 close_sig，增加 signal_type 标记
df_open_sig["signal_type"] = "open"
if len(df_close_sig) > 0:
    df_close_sig["signal_type"] = "close"
    df_all_open_sig = pd.concat([df_open_sig, df_close_sig], ignore_index=True)
else:
    df_all_open_sig = df_open_sig.copy()

print(f"open_sig: {len(df_open_sig)}, close_sig: {len(df_close_sig)}, all_open_sig: {len(df_all_open_sig)}")

open_sig: 1942, close_sig: 0, all_open_sig: 1942


In [106]:
#1、首先根据df_order["status"] 找到开仓的记录
#2、通过 strategy_id + trading_venue 判断是开仓侧还是平仓侧
#3、根据 order_side 关联对应的信号数据
#4、添加时间字段

# 筛选 status == "NEW" 的订单，移除多余字段
drop_cols = ["key", "ts_us", "client_order_id_str", "raw_status", "raw_execution_type", 
             "average_price", "last_executed_price", "business_unit", "time_in_force",
             "last_executed_qty", "cumulative_filled_quantity", "execution_type"]
df_new_orders = df_order[df_order["status"] == "NEW"].copy()
df_new_orders.drop(columns=[c for c in drop_cols if c in df_new_orders.columns], inplace=True)

# 通过 strategy_id 关联 df_all_open_sig，获取 opening_venue, hedging_venue
df_new_orders = df_new_orders.merge(
    df_all_open_sig[["strategy_id", "opening_venue", "hedging_venue"]].drop_duplicates("strategy_id"),
    on="strategy_id",
    how="left"
)

# 根据 trading_venue 判断成交方向
def classify_order_side(row):
    if pd.isna(row["opening_venue"]):
        return pd.NA
    elif row["trading_venue"] == row["opening_venue"]:
        return "open"
    elif row["trading_venue"] == row["hedging_venue"]:
        return "hedge"
    else:
        return pd.NA

df_new_orders["order_side"] = df_new_orders.apply(classify_order_side, axis=1)

# 分离开仓侧和对冲侧订单
df_open_orders = df_new_orders[df_new_orders["order_side"] == "open"].copy()
df_hedge_orders = df_new_orders[df_new_orders["order_side"] == "hedge"].copy()

# 开仓侧订单：关联 df_all_open_sig 获取完整盘口信息 + create_ts
df_open_orders = df_open_orders.merge(
    df_all_open_sig[["strategy_id", "create_ts", "opening_bid0", "opening_ask0", "hedging_bid0", "hedging_ask0", "price_offset"]].drop_duplicates("strategy_id"),
    on="strategy_id",
    how="left"
)

# 对冲侧订单：需要找到 event_time * 1000 > market_ts 且最接近的信号
# 准备 hedge_sig 数据，按 strategy_id 和 market_ts 排序
df_hedge_sig_sorted = df_hedge_sig.sort_values(["strategy_id", "market_ts"])

def match_hedge_signal(order_row):
    """找到 event_time * 1000 > market_ts 且最接近的对冲信号"""
    sid = order_row["strategy_id"]
    event_time_us = order_row["event_time"] * 1000  # ms -> us
    
    # 找该 strategy_id 的所有对冲信号
    signals = df_hedge_sig_sorted[df_hedge_sig_sorted["strategy_id"] == sid]
    if len(signals) == 0:
        return pd.Series({
            "create_ts": pd.NA,
            "hedging_bid0": pd.NA,
            "hedging_ask0": pd.NA,
            "price_offset": pd.NA
        })
    
    # 找 market_ts < event_time_us 的信号中最大的那个（最接近的）
    valid_signals = signals[signals["market_ts"] < event_time_us]
    if len(valid_signals) == 0:
        return pd.Series({
            "create_ts": pd.NA,
            "hedging_bid0": pd.NA,
            "hedging_ask0": pd.NA,
            "price_offset": pd.NA
        })
    
    # 取 market_ts 最大的（最接近 event_time 的）
    best_signal = valid_signals.loc[valid_signals["market_ts"].idxmax()]
    return pd.Series({
        "create_ts": best_signal["market_ts"],
        "hedging_bid0": best_signal["hedging_bid0"],
        "hedging_ask0": best_signal["hedging_ask0"],
        "price_offset": best_signal["price_offset"]
    })

# 对每个对冲订单匹配信号
if len(df_hedge_orders) > 0:
    hedge_matched = df_hedge_orders.apply(match_hedge_signal, axis=1)
    df_hedge_orders = df_hedge_orders.reset_index(drop=True)
    df_hedge_orders["create_ts"] = hedge_matched["create_ts"].values
    df_hedge_orders["hedging_bid0"] = hedge_matched["hedging_bid0"].values
    df_hedge_orders["hedging_ask0"] = hedge_matched["hedging_ask0"].values
    df_hedge_orders["price_offset"] = hedge_matched["price_offset"].values

# 对冲侧：opening 相关字段为 NaN
df_hedge_orders["opening_venue"] = pd.NA
df_hedge_orders["opening_bid0"] = pd.NA
df_hedge_orders["opening_ask0"] = pd.NA

# 合并回去
df_new_orders = pd.concat([df_open_orders, df_hedge_orders], ignore_index=True)

# 添加时间字段
df_new_orders["update_ts"] = 0  # 先填充为0
# local_ts: 交易所响应时间 (ms -> us)
df_new_orders["local_ts"] = df_new_orders["event_time"] * 1000  

# 移除多余的中间字段
df_new_orders.drop(columns=["event_time"], inplace=True, errors="ignore")

# 查看结果
print(f"NEW orders: {len(df_new_orders)}")
print(f"开仓侧订单: {len(df_open_orders)}")
print(f"对冲侧订单: {len(df_hedge_orders)}")

df_new_orders

NEW orders: 2271
开仓侧订单: 1942
对冲侧订单: 329


  df_new_orders = pd.concat([df_open_orders, df_hedge_orders], ignore_index=True)


Unnamed: 0,symbol,order_id,client_order_id,side,order_type,price,quantity,status,trading_venue,strategy_id,...,hedging_venue,order_side,create_ts,opening_bid0,opening_ask0,hedging_bid0,hedging_ask0,price_offset,update_ts,local_ts
0,SLPUSDT,916063648,2837303851558633473,BUY,LIMIT,0.000925,108092.0,NEW,BinanceMargin,660611282,...,BinanceUm,open,1.764330e+15,0.000927,0.000929,0.000929,0.000930,0.0020,0,1764330276356000
1,SLPUSDT,916063647,2837302932435632129,BUY,LIMIT,0.000925,108070.0,NEW,BinanceMargin,660611068,...,BinanceUm,open,1.764330e+15,0.000927,0.000929,0.000929,0.000930,0.0018,0,1764330276332000
2,SLPUSDT,916063646,2837301991837794305,BUY,LIMIT,0.000925,108048.0,NEW,BinanceMargin,660610849,...,BinanceUm,open,1.764330e+15,0.000927,0.000929,0.000929,0.000930,0.0016,0,1764330276309000
3,SLPUSDT,916063645,2837301003995316225,BUY,LIMIT,0.000925,108027.0,NEW,BinanceMargin,660610619,...,BinanceUm,open,1.764330e+15,0.000927,0.000929,0.000929,0.000930,0.0014,0,1764330276287000
4,SLPUSDT,916063643,2837299934548459521,BUY,LIMIT,0.000925,108005.0,NEW,BinanceMargin,660610370,...,BinanceUm,open,1.764330e+15,0.000927,0.000929,0.000929,0.000930,0.0012,0,1764330276255000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2266,SLPUSDT,115093373,9085360405328429058,SELL,LIMIT,0.000947,105815.0,NEW,BinanceUm,2115350311,...,BinanceUm,hedge,1.764270e+15,,,0.000946,0.000947,0.0010,0,1764269522162000
2267,SLPUSDT,115093371,9085359370241310722,SELL,LIMIT,0.000947,105793.0,NEW,BinanceUm,2115350070,...,BinanceUm,hedge,1.764270e+15,,,0.000946,0.000947,0.0010,0,1764269522153000
2268,SLPUSDT,115093368,9085358373808898050,SELL,LIMIT,0.000947,105772.0,NEW,BinanceUm,2115349838,...,BinanceUm,hedge,1.764270e+15,,,0.000946,0.000947,0.0010,0,1764269522142000
2269,SLPUSDT,115093367,9085357308657008642,SELL,LIMIT,0.000947,105751.0,NEW,BinanceUm,2115349590,...,BinanceUm,hedge,1.764270e+15,,,0.000946,0.000947,0.0010,0,1764269522133000


In [107]:
# 根据 order_id 查找后续的订单更新和成交记录
# 1. 找 order update 中的 cancel 记录
# 2. 或找 trade update 中的完全成交记录

# 准备 cancel 订单数据 (status == "CANCELED")
df_cancel_orders = df_order[df_order["status"] == "CANCELED"].copy()

# 准备 trade 数据
df_trade_sorted = df_trade.sort_values(["order_id", "event_time"])

def find_order_outcome(row):
    """
    查找订单的最终状态:
    1. 先找 order update 中是否有 cancel 记录
    2. 如果没有，找 trade update 中的最后一条成交
    """
    oid = row["order_id"]
    
    # 1. 查找是否有 cancel 记录
    cancel_records = df_cancel_orders[df_cancel_orders["order_id"] == oid]
    if len(cancel_records) > 0:
        cancel_row = cancel_records.iloc[-1]  # 取最后一条
        return pd.Series({
            "status": "CANCELED",
            "update_ts": cancel_row["event_time"] * 1000,  # ms -> us
            "filled_qty": cancel_row["cumulative_filled_quantity"]
        })
    
    # 2. 没有 cancel，查找 trade update
    trade_records = df_trade_sorted[df_trade_sorted["order_id"] == oid]
    if len(trade_records) > 0:
        last_trade = trade_records.iloc[-1]  # 取最后一条成交
        return pd.Series({
            "status": "FILLED",
            "update_ts": last_trade["event_time"] * 1000,  # ms -> us
            "filled_qty": last_trade["cumulative_filled_quantity"]
        })
    
    # 3. 都没找到，状态未知
    return pd.Series({
        "status": "UNKNOWN",
        "update_ts": pd.NA,
        "filled_qty": pd.NA
    })

# 对每个订单查找最终状态
outcome_df = df_new_orders.apply(find_order_outcome, axis=1)
df_new_orders["status"] = outcome_df["status"]
df_new_orders["update_ts"] = outcome_df["update_ts"]
df_new_orders["filled_qty"] = outcome_df["filled_qty"]

# 统计结果
print(f"订单最终状态统计:")
print(df_new_orders["status"].value_counts())

# 检查 UNKNOWN 状态
df_unknown = df_new_orders[df_new_orders["status"] == "UNKNOWN"]
if len(df_unknown) > 0:
    print(f"\n⚠️ 存在 {len(df_unknown)} 条 UNKNOWN 状态订单:")
    print(df_unknown[["symbol", "strategy_id", "order_side", "order_id", "quantity"]])
else:
    print(f"\n✅ 没有 UNKNOWN 状态订单")

# 查看结果
df_new_orders.to_parquet("res.parquet")

订单最终状态统计:
status
CANCELED    2105
FILLED       166
Name: count, dtype: int64

✅ 没有 UNKNOWN 状态订单


In [108]:
df_new_orders[df_new_orders["trading_venue"] == "BinanceUm"][df_new_orders["status"] != "UNKNOWN"]["filled_qty"].sum()

  df_new_orders[df_new_orders["trading_venue"] == "BinanceUm"][df_new_orders["status"] != "UNKNOWN"]["filled_qty"].sum()


np.float64(7959151.0)

In [110]:
df_new_orders[df_new_orders["trading_venue"] == "BinanceUm"][df_new_orders["status"] == "FILLED"]

  df_new_orders[df_new_orders["trading_venue"] == "BinanceUm"][df_new_orders["status"] == "FILLED"]


Unnamed: 0,symbol,order_id,client_order_id,side,order_type,price,quantity,status,trading_venue,strategy_id,...,order_side,create_ts,opening_bid0,opening_ask0,hedging_bid0,hedging_ask0,price_offset,update_ts,local_ts,filled_qty
1942,SLPUSDT,115174953,4638282710782050308,SELL,LIMIT,0.000931,107225.0,FILLED,BinanceUm,1079934349,...,hedge,1.764292e+15,,,0.000930,0.000931,0.001,1764292384665000,1764292316291000,107225.0
1943,SLPUSDT,115174952,4685203360180076548,SELL,LIMIT,0.000931,107246.0,FILLED,BinanceUm,1090858914,...,hedge,1.764292e+15,,,0.000930,0.000931,0.001,1764292384665000,1764292316282000,107246.0
1944,SLPUSDT,115174951,4638280726507159556,SELL,LIMIT,0.000931,107203.0,FILLED,BinanceUm,1079933887,...,hedge,1.764292e+15,,,0.000930,0.000931,0.001,1764292384665000,1764292316277000,107203.0
1945,SLPUSDT,115174950,4611569534619353092,SELL,LIMIT,0.000931,107267.0,FILLED,BinanceUm,1073714703,...,hedge,1.764292e+15,,,0.000930,0.000931,0.001,1764292384665000,1764292316270000,107267.0
1946,SLPUSDT,115174949,4685204459691704324,SELL,LIMIT,0.000931,107267.0,FILLED,BinanceUm,1090859170,...,hedge,1.764292e+15,,,0.000930,0.000931,0.001,1764292384665000,1764292316262000,107267.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2223,SLPUSDT,115094224,9107318189406027781,SELL,LIMIT,0.000946,105730.0,FILLED,BinanceUm,2120462756,...,hedge,1.764270e+15,,,0.000945,0.000946,0.001,1764269798575000,1764269792448000,105730.0
2224,SLPUSDT,115094223,9085359370241310725,SELL,LIMIT,0.000946,105793.0,FILLED,BinanceUm,2115350070,...,hedge,1.764270e+15,,,0.000945,0.000946,0.001,1764269798574000,1764269792442000,105793.0
2225,SLPUSDT,115094222,9085355616439894021,SELL,LIMIT,0.000946,105730.0,FILLED,BinanceUm,2115349196,...,hedge,1.764270e+15,,,0.000945,0.000946,0.001,1764269798572000,1764269792431000,105730.0
2239,SLPUSDT,115094018,9107320143616147460,SELL,LIMIT,0.000946,105751.0,FILLED,BinanceUm,2120463211,...,hedge,1.764270e+15,,,0.000945,0.000946,0.001,1764269773689000,1764269702438000,105751.0


In [None]:
dt = pd.to_datetime(9085357308657008644, unit='us', utc=True)