In [10]:
from rich.progress import Progress
from datetime import datetime, timedelta 
import pandas as pd
import numpy as np
import torch

def get_time_list(df: pd.DataFrame, interval: int):
    """Return every ``interval``-th distinct ``trade_time`` in ascending order.

    ``interval`` counts distinct timestamps — not seconds and not raw rows.
    With ``interval=10`` the 1st, 11th, 21st, ... unique trade times are
    returned.

    Args:
        df: Tick data with a ``trade_time`` column.
        interval: Sampling stride; must be >= 1.

    Returns:
        List of sampled timestamps, starting with the earliest one.

    Raises:
        ValueError: If ``interval`` is smaller than 1 (a negative stride used
            to silently yield an empty list).
    """
    if interval < 1:
        raise ValueError(f"interval must be >= 1, got {interval}")
    return sorted(df['trade_time'].unique())[::interval]

def get_x_y(instrument_id: str, timestamp, look_back: int, look_forward: int, df: pd.DataFrame):
    """Build one (features, target) pair anchored at ``timestamp``.

    Args:
        instrument_id: Instrument identifier. Not used for filtering: ``df`` is
            assumed to hold a single instrument (callers in this file pass
            ``df['instrument_id'].iloc[0]``).
        timestamp: Anchor time (a ``pd.Timestamp`` / ``np.datetime64`` or
            anything comparable with ``df['trade_time']``). Rows with
            ``trade_time <= timestamp`` are history; rows with
            ``trade_time > timestamp`` are the future.
        look_back: Number of most recent history rows used as features (e.g. 30).
        look_forward: Number of future rows spanned by the target (e.g. 10).
        df: Raw tick DataFrame.

    Returns:
        ``(x, y)`` where ``x`` is a ``(9, look_back)`` array — one row per
        feature column below, columns ordered oldest to newest, NaN replaced
        by 0 — and ``y`` is the relative change of ``last_price`` from the
        1st to the ``look_forward``-th future row.
        ``(None, None)`` when there is not enough history/future data, or when
        ``look_back < 0`` / ``look_forward < 1``.

    NOTE(review): ``y`` is measured from the first *future* tick, not from the
    price at ``timestamp``. It therefore spans ``look_forward - 1`` steps and
    is always 0 when ``look_forward == 1`` — confirm this is the intended
    target.
    """
    feature_columns = [
        'last_price', 'highest_price', 'lowest_price',
        'cum_volume', 'cum_turnover',
        'bid_price1', 'bid_volume1', 'ask_price1', 'ask_volume1',
    ]

    # look_forward == 0 previously crashed on an empty future_prices.iloc[-1].
    if look_back < 0 or look_forward < 1:
        return None, None

    # A stable sort keeps ticks that share a trade_time in file order, so the
    # window is deterministic. Skip the sort when data is already chronological.
    if df['trade_time'].is_monotonic_increasing:
        ordered = df
    else:
        ordered = df.sort_values('trade_time', kind='mergesort')
    ordered_times = ordered['trade_time']

    past_data = ordered[ordered_times <= timestamp]
    future_data = ordered[ordered_times > timestamp]
    if len(past_data) < look_back or len(future_data) < look_forward:
        return None, None

    # tail() keeps the most recent rows in chronological order (oldest first).
    features = past_data.tail(look_back)[feature_columns].fillna(0)

    future_prices = future_data['last_price'].iloc[:look_forward]
    ret = future_prices.iloc[-1] / future_prices.iloc[0] - 1

    # Transpose so rows are features and columns are time steps.
    return features.T.values, ret

def get_dataset(interval: int, look_back: int, look_forward: int, df: pd.DataFrame):
    """Build a fixed-stride dataset from tick data.

    Timestamps are sampled with ``get_time_list`` (every ``interval``-th
    distinct ``trade_time``), and ``get_x_y`` builds one sample per timestamp.

    Args:
        interval: Stride over distinct trade times.
        look_back: History rows per sample.
        look_forward: Future rows spanned by each target.
        df: Tick data for a single instrument (only the first row's
            ``instrument_id`` is used).

    Returns:
        ``(X, y)``: a list of ``(9, look_back)`` feature arrays and a list of
        matching targets. Timestamps without a full window are skipped.
    """
    X_train = []
    y_train = []

    time_list = get_time_list(df, interval)
    if not time_list:
        # Empty input: nothing to sample (and no instrument_id to read).
        return X_train, y_train

    # Assumes df holds a single instrument; read its id once, outside the loop.
    instrument_id = df['instrument_id'].iloc[0]

    with Progress() as progress:
        task = progress.add_task("[red]Processing...", total=len(time_list))

        for timestamp in time_list:
            x, y = get_x_y(instrument_id, timestamp, look_back, look_forward, df)
            progress.update(task, advance=1)
            # get_x_y returns (None, None) when the window does not fit. Keep
            # only complete windows: 9 feature rows x look_back time steps.
            if x is not None and x.shape == (9, look_back):
                X_train.append(x)
                y_train.append(y)

    return X_train, y_train

def get_adaptive_dataset(
    df: pd.DataFrame,
    min_price_change: float = 0.0001,  # minimum relative price move (0.01%)
    volume_threshold: float = 100,      # volume trigger, default 100 lots
    time_threshold: int = 300,          # max seconds between samples (5 minutes)
    volatility_adjust: bool = True,     # scale thresholds by recent volatility/volume
    window: int = 300,                  # rolling window in ROWS (ticks), not seconds
):
    """Pick sample points adaptively based on market activity.

    A row is sampled when, relative to the previous sample, any of these holds:
    the relative price move, the traded volume, or the elapsed seconds reaches
    its threshold. A row is also sampled whenever ``check_market_state(row)``
    is truthy. With ``volatility_adjust`` the three thresholds are rescaled per
    row from rolling return volatility and mean per-row volume.

    Args:
        df: Raw tick data (needs ``trade_time``, ``last_price``, ``cum_volume``).
        min_price_change: Relative price-change threshold.
        volume_threshold: Volume threshold (lots).
        time_threshold: Maximum time gap between samples (seconds).
        volatility_adjust: Whether to rescale thresholds dynamically.
        window: Row count of the rolling volatility / volume-mean window.

    Returns:
        List of ``df`` index *labels* (as yielded by ``df.iterrows()``) of the
        sampled rows. Look them up with ``df.loc``, not ``df.iloc``.

    Notes:
        - When ``volatility_adjust`` is True, the columns ``returns``,
          ``volatility`` and ``volume_ma`` are written into ``df`` in place.
        - ``check_market_state`` is not defined in this section; presumably it
          is defined elsewhere in the notebook — verify before calling.
    """
    if df.empty:
        return []

    samples = []
    last_sample_time = df['trade_time'].iloc[0]
    last_price = df['last_price'].iloc[0]
    accumulated_volume = df['cum_volume'].iloc[0]

    if volatility_adjust:
        # Rolling std of tick returns over `window` rows. The leading partial
        # window is back-filled with the first full-window value.
        # (.bfill() replaces the deprecated fillna(method='bfill').)
        df['returns'] = df['last_price'].pct_change()
        df['volatility'] = df['returns'].rolling(window=window).std().bfill()

        # Rolling mean of per-row traded volume (diff of the cumulative counter).
        df['volume_ma'] = df['cum_volume'].diff().rolling(window=window).mean().bfill()

    for idx, row in df.iterrows():
        current_time = row['trade_time']
        current_price = row['last_price']
        current_volume = row['cum_volume']

        # Changes since the last sample.
        price_change = abs(current_price - last_price) / last_price
        time_delta = (current_time - last_sample_time).total_seconds()
        volume_delta = current_volume - accumulated_volume

        if volatility_adjust:
            current_vol = row['volatility']
            current_vol_ma = row['volume_ma']

            # Higher recent volatility -> a larger price move is required.
            adjusted_price_threshold = min_price_change * (1 + current_vol * 100)

            # Raise the volume trigger by the recent mean per-row volume.
            adjusted_volume_threshold = volume_threshold * (1 + current_vol_ma / volume_threshold)

            # For non-negative activity, a more active market shortens the
            # maximum gap between samples.
            market_activity = (current_vol * current_vol_ma) / (volume_threshold * min_price_change)
            adjusted_time_threshold = time_threshold * (1 / (1 + market_activity))

            should_sample = (
                (price_change >= adjusted_price_threshold) or   # price trigger
                (volume_delta >= adjusted_volume_threshold) or  # volume trigger
                (time_delta >= adjusted_time_threshold)         # time trigger
            )
        else:
            # Fixed thresholds.
            should_sample = (
                (price_change >= min_price_change) or  # price trigger
                (volume_delta >= volume_threshold) or  # volume trigger
                (time_delta >= time_threshold)         # time trigger
            )

        # Special market states always force a sample.
        is_special_state = check_market_state(row)
        if is_special_state:
            should_sample = True

        if should_sample:
            samples.append(idx)
            last_sample_time = current_time
            last_price = current_price
            accumulated_volume = current_volume

    return samples

def add_futures_features(df: pd.DataFrame):
    """Add futures-specific microstructure features to ``df`` in place.

    Columns are written in this order: ``spread``, ``relative_spread``,
    ``order_imbalance``, ``volume_delta``, ``price_impact``. The same
    DataFrame object is returned for convenience.
    """
    bid_px, ask_px = df['bid_price1'], df['ask_price1']
    bid_qty, ask_qty = df['bid_volume1'], df['ask_volume1']

    # Level-1 quoted spread, absolute and relative to the last trade price.
    df['spread'] = ask_px - bid_px
    df['relative_spread'] = df['spread'] / df['last_price']

    # Level-1 book imbalance; NaN when both queues are empty (0/0).
    df['order_imbalance'] = (bid_qty - ask_qty) / (bid_qty + ask_qty)

    # Per-row traded volume recovered from the cumulative counter
    # (first row is NaN).
    df['volume_delta'] = df['cum_volume'].diff()

    # Rough impact proxy: traded volume times the prevailing spread.
    df['price_impact'] = df['volume_delta'] * df['spread']

    return df

def build_optimized_dataset(df: pd.DataFrame):
    """Build a dataset from adaptively (event-driven) sampled time points.

    Steps:
        1. Add futures microstructure features (``add_futures_features``,
           which modifies ``df`` in place).
        2. Choose sample rows with ``get_adaptive_dataset``.
        3. Build one (features, target) pair per sampled row with ``get_x_y``
           (look_back=30, look_forward=10).

    Args:
        df: Tick data for a single instrument.

    Returns:
        ``(X_samples, y_samples)`` lists. Samples without a full window are
        skipped.
    """
    if df.empty:
        return [], []

    # 1. Feature engineering
    df = add_futures_features(df)

    # 2. Adaptive sampling
    sample_indices = get_adaptive_dataset(
        df,
        min_price_change=0.0001,  # minimum relative price move
        volume_threshold=1000,    # volume threshold
        time_threshold=300,       # at most 5 minutes between samples
    )

    # 3. Build the dataset. Assumes a single instrument in df.
    instrument_id = df['instrument_id'].iloc[0]
    X_samples = []
    y_samples = []

    for idx in sample_indices:
        # get_adaptive_dataset returns index *labels* (from iterrows), so look
        # them up with .loc. .iloc misreads them on any frame whose index is
        # not 0..n-1 (e.g. df.iloc[split_point:]). Assumes a unique index.
        timestamp = df.loc[idx, 'trade_time']
        x, y = get_x_y(
            instrument_id=instrument_id,
            timestamp=timestamp,
            look_back=30,
            look_forward=10,
            df=df,
        )
        if x is not None and y is not None:
            X_samples.append(x)
            y_samples.append(y)

    return X_samples, y_samples

# Dataset hyper-parameters, shared by dataset building and the tensor reshape.
INTERVAL = 10       # keep every 10th distinct trade_time
LOOK_BACK = 30      # history ticks per sample
LOOK_FORWARD = 10   # future ticks spanned by the target
N_FEATURES = 9      # feature rows produced by get_x_y
TRAIN_FRACTION = 0.8

# Read tick data
df = pd.read_csv('real_tick.csv')

# Convert timestamps
df['trading_day'] = pd.to_datetime(df['trading_day'])
df['trade_time'] = pd.to_datetime(df['trade_time'])

# Split train/test by row position. This is a chronological split only if
# the CSV rows are already in time order (not checked here).
split_point = int(len(df) * TRAIN_FRACTION)
df_train = df.iloc[:split_point]
df_test = df.iloc[split_point:]

# Build training and test datasets
X_train, y_train = get_dataset(
    interval=INTERVAL,
    look_back=LOOK_BACK,
    look_forward=LOOK_FORWARD,
    df=df_train)

X_test, y_test = get_dataset(
    interval=INTERVAL,
    look_back=LOOK_BACK,
    look_forward=LOOK_FORWARD,
    df=df_test)

# Convert to arrays once, then to tensors: N x 1 x N_FEATURES x LOOK_BACK and N x 1
Xa = np.array(X_train)
ya = np.array(y_train)
trainx = torch.from_numpy(Xa).reshape(len(X_train), 1, N_FEATURES, LOOK_BACK)
trainy = torch.from_numpy(ya).reshape(len(y_train), 1)

print("there are in total", len(X_train), "training samples")
print("there are in total", len(X_test), "testing samples")

# Save arrays
if len(X_train) > 0 and len(X_test) > 0:
    Xe = np.array(X_test)
    ye = np.array(y_test)
    np.save('./X_train.npy', Xa)
    np.save('./y_train.npy', ya)
    np.save('./X_test.npy', Xe)
    np.save('./y_test.npy', ye)
else:
    print("No valid samples were generated")

there are in total 433 training samples
there are in total 106 testing samples
