In [12]:
import pandas as pd
import numpy as np
import os
from tqdm import tqdm  # 进度条工具（可选）
from pathlib import Path
from datetime import datetime, timedelta
import re
import glob

In [13]:
# 数据加载与预处理
def load_and_preprocess_data(data_path):
    """
    加载所有CSV 通过文件名构建交易日历，创建正确的时间戳，并合并。
    """
    print("Scanning filenames to build trading calendar...")
    all_files = sorted(glob.glob(os.path.join(data_path, "*.csv")))
    
    # 从文件名中提取所有交易日，并去重排序
    trading_dates_str = sorted(list(set([os.path.basename(f).split('_')[1].split('.')[0] for f in all_files])))
    
    # 创建从一个交易日到其前一个交易日的映射字典
    prev_trading_date_map = {trading_dates_str[i]: trading_dates_str[i-1] for i in range(1, len(trading_dates_str))}
    
    df_list = []
    
    print("Processing each file with the new calendar logic...")
    for file_path in all_files:
        try:
            # 从文件名提取当前文件的交易日
            current_trading_date_str = os.path.basename(file_path).split('_')[1].split('.')[0]
            current_trading_date = pd.to_datetime(current_trading_date_str, format='%Y%m%d')
            
            # 查找前一个交易日
            prev_date_str = prev_trading_date_map.get(current_trading_date_str)
            if not prev_date_str:
                # 对于的第一个文件，假设其夜盘是前一个日历日 这种情况仅对数据集最开始的那个夜盘有影响。
                prev_calendar_date = current_trading_date - timedelta(days=1)
            else:
                prev_calendar_date = pd.to_datetime(prev_date_str, format='%Y%m%d')

            required_cols = ['InstrumentID', 'UpdateTime', 'UpdateMillisec', 'LastPrice'] 
            df = pd.read_csv(file_path, usecols = required_cols)
            df['date'] = current_trading_date_str
            print(f'{file_path} loaded...')

            # 创建真实时间戳
            def get_true_timestamp(row):
                time_part = pd.to_datetime(row['UpdateTime'], format='%H:%M:%S').time()
                
                # 夜盘的日历日期
                if time_part.hour >= 21:
                    calendar_date = prev_calendar_date
                # 日盘的日历日期
                else:
                    calendar_date = current_trading_date
                
                # 合并日期、时间和毫秒
                return pd.Timestamp(calendar_date.year, calendar_date.month, calendar_date.day,
                                    time_part.hour, time_part.minute, time_part.second) + \
                       pd.to_timedelta(row['UpdateMillisec'], unit='ms')

            df['Timestamp'] = df.apply(get_true_timestamp, axis=1)
            df_list.append(df)
            
        except Exception as e:
            print(f"Error processing file {file_path}: {e}")
            
    # 合并所有数据并按时间戳排序
    full_df = pd.concat(df_list, ignore_index=True)
    full_df = full_df.sort_values('Timestamp').reset_index(drop=True)
    
    return full_df


# --- 其他函数保持不变 ---

def filter_by_trading_hours(df):
    """
    只考虑连续竞价
    """
    df['Time'] = df['Timestamp'].dt.time
    
    # 定义交易时间段
    night_session = (df['Time'] >= pd.to_datetime('21:00:00').time()) & \
                    (df['Time'] <= pd.to_datetime('23:00:00').time())
    
    morning_session_1 = (df['Time'] >= pd.to_datetime('09:00:00').time()) & \
                        (df['Time'] <= pd.to_datetime('10:15:00').time())
    
    morning_session_2 = (df['Time'] >= pd.to_datetime('10:30:00').time()) & \
                        (df['Time'] <= pd.to_datetime('11:30:00').time())
                        
    afternoon_session = (df['Time'] >= pd.to_datetime('13:30:00').time()) & \
                        (df['Time'] <= pd.to_datetime('15:00:00').time())
                        
    # 过滤
    valid_time_mask =  morning_session_1 | morning_session_2 | afternoon_session
    
    return df[valid_time_mask].drop(columns=['Time']).copy()

def generate_5min_klines(df):
    """
    从tick数据生成固定切割的5分钟OHLC K线。如 09:00~09:05, 09:05~09:10等
    """
    klines_5min = df.set_index('Timestamp')['LastPrice'].resample('5min').agg(
        Open='first',
        High='max',
        Low='min',
        Close='last'
    ).dropna() # 出现缺失值意味着非交易时间的K线 直接删除
    
    return klines_5min

def calculate_tick_atr(df, klines_5min):
    """
    计算每条tick对应的ATR。
    """
    print("Preparing historical K-line data for ATR calculation...")
    klines_5min_shifted = klines_5min.copy()
    for i in range(1, 13):
        klines_5min_shifted[f'High_p{i}'] = klines_5min['High'].shift(i)
        klines_5min_shifted[f'Low_p{i}'] = klines_5min['Low'].shift(i)
        klines_5min_shifted[f'Close_p{i}'] = klines_5min['Close'].shift(i)
        
    df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')
    df = pd.merge(df, klines_5min_shifted, left_on='kline_timestamp', right_index=True, ) # how='left'

    print("Calculating values for the current K-line...")
    df['Current_K_High'] = df.groupby('kline_timestamp')['LastPrice'].cummax()
    df['Current_K_Low'] = df.groupby('kline_timestamp')['LastPrice'].cummin()
    
    print("Calculating 12 True Ranges (TR)...")
    tr_cols = []
    
    # TR_1
    true_range_1_h_l = df['Current_K_High'] - df['Current_K_Low']
    true_range_1_h_pc = (df['Current_K_High'] - df['Close_p1']).abs()
    true_range_1_l_pc = (df['Current_K_Low'] - df['Close_p1']).abs()
    df['TR_1'] = np.maximum.reduce([true_range_1_h_l, true_range_1_h_pc, true_range_1_l_pc])
    tr_cols.append('TR_1')

    # TR_2 to TR_12
    for i in range(2, 13):
        prev_k_high = df[f'High_p{i-1}']
        prev_k_low = df[f'Low_p{i-1}']
        prev_k_close = df[f'Close_p{i}']
        tr_h_l = prev_k_high - prev_k_low
        tr_h_pc = (prev_k_high - prev_k_close).abs()
        tr_l_pc = (prev_k_low - prev_k_close).abs()
        df[f'TR_{i}'] = np.maximum.reduce([tr_h_l, tr_h_pc, tr_l_pc])
        tr_cols.append(f'TR_{i}')

    print("Calculating final ATR...")
    df['ATR'] = df[tr_cols].mean(axis=1,skipna=False)

    final_cols = ['date', 'UpdateTime', 'UpdateMillisec','Timestamp', 'kline_timestamp', 'InstrumentID','LastPrice', 'ATR']
    final_df = df[final_cols].dropna(subset=['ATR'])
    
    return final_df

# 生成交易时间列表（间隔1秒）
def generate_trading_hours():
    morning_start = datetime.strptime("09:00:00", "%H:%M:%S").time()
    morning_end = datetime.strptime("11:30:00", "%H:%M:%S").time()
    afternoon_start = datetime.strptime("13:30:00", "%H:%M:%S").time()
    afternoon_end = datetime.strptime("14:59:59", "%H:%M:%S").time()
    
    time_list = []
    current_time = morning_start
    while current_time <= morning_end:
        time_list.append(current_time.strftime("%H:%M:%S"))
        current_time = (datetime.combine(datetime.min, current_time) + 
                       timedelta(seconds=1)).time()
    
    current_time = afternoon_start
    while current_time <= afternoon_end:
        time_list.append(current_time.strftime("%H:%M:%S"))
        current_time = (datetime.combine(datetime.min, current_time) + 
                       timedelta(seconds=1)).time()
    
    return time_list

def process_trading_day_ticks(df, time_univ, session_mark):
    
    # 提取交易日
    market_df = df.copy()

    market_df['UpdateMillisec'] = market_df['UpdateMillisec'].astype(int)

    template_df = pd.DataFrame({
        'UpdateTime': time_univ,
        'UpdateMillisec': 0  # 为我们的切片时间戳设置毫秒为0
    })

    combined_df = pd.concat([market_df, template_df], ignore_index=True)

    combined_df['session_part'] = (combined_df['UpdateTime'].str[:2].astype(int) < session_mark).astype(int)

    combined_df.sort_values(
        by=['session_part', 'UpdateTime', 'UpdateMillisec'], 
        inplace=True,
        kind='mergesort'
    )
    combined_df = combined_df.ffill()

    result_df = combined_df[combined_df['UpdateTime'].isin(time_univ) & (combined_df['UpdateMillisec'] == 0)]
    
    # 删除可能的重复项（如果一个秒正好有0毫秒的tick，排序之后切片行会出现在原始数据行之后, 此时保留最后一条是合理的）
    result_df.drop_duplicates(subset=['UpdateTime'], keep='last', inplace=True)
    
    new_order = [
       'date', 'InstrumentID', 'UpdateTime', 'session_part', 'LastPrice', 'ATR'
    ]
    result_df = result_df[new_order].dropna(subset=['InstrumentID']).reset_index(drop=True) # dropna是为了处理那些没有夜盘的交易日或者没有夜盘的品种

    return result_df.rename(columns={"date": "trading_date","UpdateTime": "ts"})


In [18]:
if __name__ == '__main__':
    comd_list = ['lc']
    for comd in comd_list:
        DATA_DIRECTORY = f"C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/{comd}"
        
        # 1. 加载和预处理
        tick_df = load_and_preprocess_data(DATA_DIRECTORY)
        print(f"Loaded {len(tick_df)} total ticks.")
        #tick_df.to_parquet('/data/datas/zhangyy/6100T/tick_2025_p.parquet')
        
        # 2. 过滤交易时间
        tick_df_filtered = filter_by_trading_hours(tick_df)
        print(f"Found {len(tick_df_filtered)} ticks within trading hours.")
        final_atr_df = []
        for _, df in tick_df_filtered.groupby('InstrumentID'): # 主力换月时分开计算
            final_atr_temp = calculate_tick_atr(df, generate_5min_klines(df))
            final_atr_df.append(final_atr_temp)
        df_atr = pd.concat(final_atr_df, axis=0)
        #print(df_atr)#.to_parquet('/data/datas/zhangyy/6100T/p.atr.parquet')

        # 3. 转化成秒级数据
        '''
        df_atr = df_atr.loc[df_atr.groupby(['UpdateTime', 'date'])['UpdateMillisec'].idxmin()]
        df_atr.sort_values(by=['Timestamp'], inplace=True)
        df_atr.reset_index(drop=True, inplace=True)
        print(df_atr)
        '''
        pd.options.mode.copy_on_write = True 
        DCE_TIME_UNIV = generate_trading_hours()
        final_atr_df = df_atr

        result = []
        for _, df in final_atr_df.groupby('date'):
            single_date = process_trading_day_ticks(df, DCE_TIME_UNIV, 20)
            print(single_date)
            result.append(single_date)
        result = pd.concat(result, axis=0)
        result.reset_index(inplace=True,drop=True)
        print(result)
        #result.to_parquet('C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/processed/{comd}_atr_1s_.parquet')
    

    

Scanning filenames to build trading calendar...
Processing each file with the new calendar logic...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230721.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230724.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230725.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230726.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230727.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230728.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230731.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230801.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/data/2023-2025/lc\sel_20230802.csv loaded...
C:/Users/CYWY2K/HKUST/STUDY/summer/M

  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...
Preparing historical K-line data for ATR calculation...
Calculating values for the current K-line...
Calculating 12 True Ranges (TR)...
Calculating final ATR...


  df['kline_timestamp'] = df['Timestamp'].dt.floor('5T')


      trading_date InstrumentID        ts  session_part  LastPrice  \
0         20230721       lc2401  10:00:01             1   220550.0   
1         20230721       lc2401  10:00:02             1   220550.0   
2         20230721       lc2401  10:00:03             1   220450.0   
3         20230721       lc2401  10:00:04             1   220650.0   
4         20230721       lc2401  10:00:05             1   220650.0   
...            ...          ...       ...           ...        ...   
10795     20230721       lc2401  14:59:55             1   215100.0   
10796     20230721       lc2401  14:59:56             1   215100.0   
10797     20230721       lc2401  14:59:57             1   215150.0   
10798     20230721       lc2401  14:59:58             1   215100.0   
10799     20230721       lc2401  14:59:59             1   215000.0   

               ATR  
0      3116.666667  
1      3116.666667  
2      3116.666667  
3      3125.000000  
4      3125.000000  
...            ...  
10795  1258.

In [19]:
result.to_parquet(f'C:/Users/CYWY2K/HKUST/STUDY/summer/MAFS6100T/week5/processed_data/2023-2025/{comd}/{comd}_atr_1s_.parquet')