In [2]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional
import warnings
import talib as ta
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
import os

def _ensure_datetime(series: pd.Series) -> pd.Series:
    """Coerce a time column to datetime64, inferring the epoch unit of numeric input.

    - Already datetime64: returned unchanged (same object).
    - Numeric values, or numeric strings: interpreted as epoch timestamps. The unit
      (ns / us / ms / s) is chosen from the largest absolute value; see the thresholds below.
      Negative timestamps (before 1970) are supported.
    - No non-null value is numeric (e.g. ISO-8601 strings): the whole series is parsed
      with ``pd.to_datetime`` instead of silently becoming all-NaT.
    - A mix of numeric and non-numeric values: the non-numeric ones become NaT and a
      warning is emitted. Values that were already missing do not trigger the warning.

    Unparseable values never raise; they become NaT.
    """
    if pd.api.types.is_datetime64_any_dtype(series):
        return series

    if series.empty:
        # Keep index/name so the result aligns when assigned back into a DataFrame.
        return pd.Series([], index=series.index, name=series.name, dtype='datetime64[ns]')

    numeric = pd.to_numeric(series, errors='coerce')
    present = series.notna()
    # Values that were present in the input but could not be read as numbers.
    failed = numeric.isna() & present

    if present.any() and failed[present].all():
        # Nothing numeric at all: treat the column as datetime-like text/objects.
        return pd.to_datetime(series, errors='coerce')

    if failed.any():
        warnings.warn("序列中包含无法转换为数值的元素，已自动转为NaT")

    # Unit detection. Each threshold is 1e8 seconds (about 3.2 years after the epoch,
    # i.e. early 1973) expressed in that unit. A 2025 timestamp is ~1.7e9 s, ~1.7e12 ms,
    # ~1.7e15 us or ~1.7e18 ns. Each of these exceeds the threshold of its own unit but
    # stays below the threshold of the next finer unit.
    # The unit is picked from the largest magnitude present (.any()), so a single value
    # is enough to select it.
    magnitude = numeric.abs()
    for unit, threshold in (('ns', 1e17), ('us', 1e14), ('ms', 1e11)):
        if (magnitude > threshold).any():
            return pd.to_datetime(numeric, unit=unit, errors='coerce')
    return pd.to_datetime(numeric, unit='s', errors='coerce')

In [3]:
def _scan_dollar_bars(
    prices: np.ndarray,
    qtys: np.ndarray,
    quotes: np.ndarray,
    signs: np.ndarray,
    times: np.ndarray,
    dollar_threshold: float,
) -> Tuple[List[int], Dict[int, dict], int]:
    """Assign a bar id to every trade and accumulate per-bar metrics in one pass.

    Inputs are parallel arrays describing trades already sorted by time. A bar is closed
    by the trade that lifts the running dollar total to >= ``dollar_threshold``; that
    trade belongs to the bar it closes.

    Returns:
        bar_ids: bar id of each trade (0, 1, 2, ... without gaps).
        per_bar_metrics: {bar_id: metrics dict} for every *closed* bar.
        open_trades: number of trailing trades that belong to a bar which never reached
            the threshold (0 if the last trade closed a bar).
    """
    # Vectorised log prices; non-positive prices map to NaN and are skipped below.
    with np.errstate(divide='ignore', invalid='ignore'):
        log_prices = np.log(np.where(prices > 0, prices, np.nan))

    bar_ids: List[int] = []
    per_bar_metrics: Dict[int, dict] = {}
    cumulative = 0.0   # running dollar total, including carry-over (see below)
    bar_id = 0
    open_trades = 0    # trades already assigned to the bar currently being filled

    for i in range(len(prices)):
        if open_trades == 0:
            # First trade of a new bar: reset every per-bar accumulator.
            bar_start_time = pd.Timestamp(times[i])
            bar_start_idx = i
            sum_qty = sum_signed_qty = sum_signed_quote = sum_pxqty = 0.0
            rv = bpv = abs_r_sum = 0.0
            prev_logp = None    # last finite log price seen in this bar
            prev_abs_r = None   # |r| of the previous in-bar return

        # Returns are only taken between trades of the same bar. The jump from the last
        # trade of one bar to the first trade of the next is not counted.
        logp = float(log_prices[i])
        if np.isfinite(logp):
            if prev_logp is not None:
                r = logp - prev_logp
                abs_r = abs(r)
                rv += r * r                     # realised variance: sum of r^2
                if prev_abs_r is not None:
                    bpv += prev_abs_r * abs_r   # sum of |r_t| * |r_{t-1}|
                abs_r_sum += abs_r
                prev_abs_r = abs_r
            prev_logp = logp

        price = float(prices[i])
        qty = float(qtys[i])
        quote = float(quotes[i])
        sign = float(signs[i])
        sum_qty += qty
        sum_signed_qty += sign * qty
        sum_signed_quote += sign * quote
        sum_pxqty += price * qty
        open_trades += 1
        cumulative += quote
        bar_ids.append(bar_id)

        if cumulative >= dollar_threshold:
            duration_sec = max(1.0, (pd.Timestamp(times[i]) - bar_start_time).total_seconds())
            per_bar_metrics[bar_id] = {
                'signed_volume': sum_signed_qty,
                'signed_dollar': sum_signed_quote,
                'pxqty_sum': sum_pxqty,
                'vwap': (sum_pxqty / sum_qty) if sum_qty > 0 else np.nan,
                'rv': rv,
                # NOTE(review): textbook bipower variation scales this sum by pi/2. Without
                # that factor, rv - bpv is biased upward even when there are no jumps.
                # The values are kept unscaled for compatibility with existing outputs.
                'bpv': bpv,
                'jump_rv_bpv': max(rv - bpv, 0.0) if np.isfinite(rv) and np.isfinite(bpv) else np.nan,
                'abs_return_sum': abs_r_sum,
                'bar_duration_sec': duration_sec,               # floored at 1 second
                'intensity': float(open_trades) / duration_sec,  # trades per second
                'start_trade_idx': bar_start_idx,
                'end_trade_idx': i,
            }
            # The overshoot carries into the next bar. A trade worth several thresholds
            # therefore makes the following trades close bars immediately (often
            # single-trade bars) until the carry drops below the threshold.
            # NOTE(review): confirm this is intended rather than resetting to 0.
            cumulative -= dollar_threshold
            bar_id += 1
            open_trades = 0

    return bar_ids, per_bar_metrics, open_trades


def build_dollar_bars(
    trades: pd.DataFrame,
    dollar_threshold: float,
) -> pd.DataFrame:
    """Aggregate tick trades into dollar bars with consecutive ``bar_id``s.

    Parameters
    ----------
    trades : DataFrame
        Must have columns ``time``, ``price``, ``qty`` and ``is_buyer_maker``. It may
        also have ``quote_qty``; missing entries are filled with ``price * qty``.
        NOTE: ``trades['time']`` is converted to datetime *in place*. This is a
        caller-visible side effect that later notebook cells may rely on.
    dollar_threshold : float
        Dollar value (sum of ``quote_qty``) that closes a bar. Must be > 0.

    Returns
    -------
    DataFrame
        One row per closed bar, with these columns:
        - start/end time and OHLC prices;
        - volume, dollar_value and buy/sell volume;
        - trade count;
        - start/end trade index (positions in the time-sorted trades);
        - the per-bar metrics from ``_scan_dollar_bars``;
        - ``bar_id`` (0..n-1).
        A trailing bar that never reached the threshold is dropped.

    Raises
    ------
    ValueError
        If ``dollar_threshold`` is not a positive number (this includes NaN).
    """
    if not dollar_threshold > 0:
        raise ValueError(f"dollar_threshold must be a positive number, got {dollar_threshold!r}")

    trades['time'] = _ensure_datetime(trades['time'])
    # Stable sort: many trades share the same timestamp. An unstable sort would reorder
    # them, which changes open/close prices and the intra-bar return sequence.
    df = trades.sort_values('time', kind='mergesort').reset_index(drop=True)

    price_x_qty = df['price'] * df['qty']
    if 'quote_qty' in df.columns:
        # Fill only the gaps. A single NaN would otherwise poison the running dollar
        # total and stop every later bar from closing.
        df['quote_qty'] = df['quote_qty'].fillna(price_x_qty)
    else:
        df['quote_qty'] = price_x_qty

    # is_buyer_maker=True -> sign -1 (counted as sell volume); otherwise +1 (buy volume).
    df['trade_sign'] = np.where(df['is_buyer_maker'], -1, 1)
    df['buy_qty'] = df['qty'].where(df['trade_sign'] > 0, 0.0)
    df['sell_qty'] = df['qty'].where(df['trade_sign'] < 0, 0.0)

    bar_ids, per_bar_metrics, open_trades = _scan_dollar_bars(
        prices=df['price'].to_numpy(dtype=float),
        qtys=df['qty'].to_numpy(dtype=float),
        quotes=df['quote_qty'].to_numpy(dtype=float),
        signs=df['trade_sign'].to_numpy(dtype=float),
        times=pd.to_datetime(df['time']).to_numpy(),
        dollar_threshold=dollar_threshold,
    )
    df['bar_id'] = np.asarray(bar_ids, dtype=np.int64)
    df['trade_idx'] = np.arange(len(df))  # position in the time-sorted trades

    bars = df.groupby('bar_id', sort=True).agg(
        start_time=('time', 'first'),
        end_time=('time', 'last'),
        open=('price', 'first'),
        high=('price', 'max'),
        low=('price', 'min'),
        close=('price', 'last'),
        volume=('qty', 'sum'),
        dollar_value=('quote_qty', 'sum'),
        buy_volume=('buy_qty', 'sum'),
        sell_volume=('sell_qty', 'sum'),
        trades=('trade_idx', 'size'),
        start_trade_idx=('trade_idx', 'first'),
        end_trade_idx=('trade_idx', 'last'),
    )

    if per_bar_metrics:
        metric_cols = [
            'signed_volume', 'signed_dollar', 'pxqty_sum', 'vwap', 'rv', 'bpv',
            'jump_rv_bpv', 'abs_return_sum', 'bar_duration_sec', 'intensity',
        ]
        metrics_df = pd.DataFrame.from_dict(per_bar_metrics, orient='index')
        # start/end_trade_idx come from the groupby above, so only the extra metrics are joined.
        bars = bars.join(metrics_df[metric_cols], how='left')

    # Drop the trailing bar only if it never closed. Its dollar_value alone cannot tell:
    # with carry-over, a closed bar's own trades may sum to less than the threshold.
    if open_trades > 0:
        bars = bars.iloc[:-1]

    bars = bars.reset_index(drop=True)
    bars['bar_id'] = bars.index
    return bars

In [6]:
def generate_date_range(start_date, end_date):
    """List every calendar day from start_date to end_date, inclusive.

    Both bounds are 'YYYY-MM-DD' strings and each returned item uses the same format.
    If end_date is before start_date, the result is an empty list. A malformed date
    raises ValueError from ``datetime.strptime``.
    """
    day_fmt = '%Y-%m-%d'
    first_day = datetime.strptime(start_date, day_fmt)
    last_day = datetime.strptime(end_date, day_fmt)
    # Both values are parsed at midnight, so .days is the exact number of whole days.
    n_days = (last_day - first_day).days + 1
    return [(first_day + timedelta(days=offset)).strftime(day_fmt) for offset in range(n_days)]

# ---- Configuration ---------------------------------------------------------
# Root of the raw ETHUSDT trade archives (external disk) and the local output folder.
RAW_TRADES_ROOT = '/Volumes/Ext-Disk/data/futures/um'
OUTPUT_DIR = '/Users/aming/project/python/crypto-trade/output'

raw_df = []                      # per-file trade DataFrames, concatenated at the end
daily = True                     # True: one archive per day; False: a single monthly archive
start_date = '2025-06-01'
end_date = '2025-06-01'
dollar_threshold = 10000 * 6000  # dollar value (sum of quote_qty) that closes one bar
# NOTE(review): str() of an int never contains '*', so this replace is a no-op and the
# file-name suffix is just the integer (e.g. 60000000).
dollar_threshold_str = str(dollar_threshold).replace("*", "_")
trades_zip_path = None
bar_zip_path = None              # path intended for the aggregated dollar bars

# ---- Load raw trades -------------------------------------------------------
if not daily:
    # NOTE(review): in this branch trades_zip_path is the raw monthly *input* archive,
    # whereas the daily branch points it at an output file. The month and the bar file
    # date span are hard-coded rather than derived from start_date/end_date.
    # Confirm before writing anything to trades_zip_path.
    trades_zip_path = f'{RAW_TRADES_ROOT}/monthly/trades/ETHUSDT/ETHUSDT-trades-2025-05.zip'
    bar_zip_path = f'{OUTPUT_DIR}/bars-2025-05-01-2025-06-01-{dollar_threshold_str}.zip'
    raw_df.append(pd.read_csv(trades_zip_path))
else:
    trades_zip_path = f'{OUTPUT_DIR}/trades-{start_date}-{end_date}-{dollar_threshold_str}.zip'
    bar_zip_path = f'{OUTPUT_DIR}/bars-{start_date}-{end_date}-{dollar_threshold_str}.zip'
    date_list = generate_date_range(start_date, end_date)
    for date in date_list:
        daily_archive = f'{RAW_TRADES_ROOT}/daily/trades/ETHUSDT/ETHUSDT-trades-{date}.zip'
        raw_df.append(pd.read_csv(daily_archive))

trades_df = pd.concat(raw_df, ignore_index=True)

In [7]:
# Aggregate the loaded trades into dollar bars (this also converts trades_df['time'] in place).
bars = build_dollar_bars(trades=trades_df, dollar_threshold=dollar_threshold)

In [None]:
bars  # show the resulting dollar-bar table as the cell output