In [6]:
import concurrent.futures
import logging
import traceback
import gc
import connectorx as cx
import pandas as pd
import numpy as np
import os
import duckdb
from datetime import datetime, timedelta
import glob
import multiprocessing as mp
from functools import partial
import time

In [None]:
import os
import pandas as pd
import duckdb
import multiprocessing
from functools import partial
import logging
import traceback
from datetime import datetime

# Root logging configuration for this cell plus a module-level logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def calculate_daily_level_apb(trading_date):
    """
    Aggregate one day's filtered orders into per-stock daily totals.

    Reads ``/data/cephfs/order/YYYYMMDD.parquet`` and keeps rows with
    ``order_side = -1``, ``order_type = 'A'``, ``order_details = 'L'`` and
    ``93000000 <= order_time < 145700000``. Writes one row per
    ``security_code`` to ``./factors/DD_Ask_adjusted_APB/YYYYMMDD.parquet``
    with columns:

    - ``date``: the 'YYYYMMDD' string
    - ``security_code``
    - ``daily_value``: SUM(order_volume * order_price)
    - ``daily_sum_price``: SUM(order_price)
    - ``daily_volume``: SUM(order_volume)
    - ``daily_count``: COUNT(order_volume), i.e. orders with non-null volume

    NOTE(review): the meaning of the filter codes (-1 = sell/ask side,
    'A' / 'L' order kinds, order_time as HHMMSSmmm) is inferred from names
    and the "Ask" output directory -- confirm against the order-data spec.

    Parameters:
    trading_date: any value accepted by ``pd.to_datetime``.

    Returns:
    ``(date_str, success)``. ``date_str`` is 'YYYYMMDD', or
    ``str(trading_date)`` if the date could not be parsed. ``success`` is
    True when the output file already existed or was written, and False
    when the order file is missing or an error occurred.
    """
    # Fallback label in case parsing the date itself fails.
    date_str = str(trading_date)
    try:
        date_str = pd.to_datetime(trading_date).strftime('%Y%m%d')
        order_pth = f"/data/cephfs/order/{date_str}.parquet"

        # Make sure the output directory exists.
        output_dir = "./factors/DD_Ask_adjusted_APB"
        os.makedirs(output_dir, exist_ok=True)
        output_file = f"{output_dir}/{date_str}.parquet"

        if not os.path.exists(order_pth):
            logger.warning(f"订单数据文件不存在: {order_pth}")
            return date_str, False

        # Skip dates that already have an output file.
        if os.path.exists(output_file):
            logger.info(f"日期 {date_str} 已处理，跳过")
            return date_str, True

        # Filter and aggregate in a single query; the date is a constant, so
        # grouping by security_code alone is equivalent to (date, security_code).
        daily_query = f"""
            SELECT
                '{date_str}' AS date,
                security_code,
                SUM(order_volume * order_price) AS daily_value,
                SUM(order_price) AS daily_sum_price,
                SUM(order_volume) AS daily_volume,
                COUNT(order_volume) AS daily_count
            FROM '{order_pth}'
            WHERE order_side = -1
                AND order_type = 'A'
                AND order_details = 'L'
                AND order_time >= 93000000
                AND order_time < 145700000
            GROUP BY security_code
        """

        conn = duckdb.connect(database=':memory:')
        try:
            daily_df = conn.execute(daily_query).fetchdf()
        finally:
            conn.close()

        # Write to a temp name, then rename. An interrupted run therefore
        # never leaves a truncated file that the skip check above would
        # mistake for a finished date.
        tmp_file = f"{output_file}.tmp"
        daily_df.to_parquet(tmp_file)
        os.replace(tmp_file, output_file)

        logger.info(f"成功处理日期: {date_str}")
        return date_str, True

    except Exception as e:
        logger.error(f"处理日期 {trading_date} 时出错: {str(e)}")
        logger.error(traceback.format_exc())
        return date_str, False


def process_all_trading_days_parallel(trading_dates, num_processes=8):
    """
    Run ``calculate_daily_level_apb`` for every date using a process pool.

    Parameters:
    trading_dates: iterable of dates accepted by ``pd.to_datetime``.
    num_processes: number of worker processes.

    Returns:
    List of ``(date, success)`` tuples from ``calculate_daily_level_apb``, in
    the same order as ``trading_dates``. Failed dates are also logged as a
    warning.
    """
    # Materialize so generators work and len() is available.
    trading_dates = list(trading_dates)
    logger.info(f"开始处理 {len(trading_dates)} 个交易日的数据，使用 {num_processes} 个进程")

    start_time = datetime.now()

    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(calculate_daily_level_apb, trading_dates)

    total = len(results)
    success = sum(1 for _, status in results if status)
    failed = total - success

    duration = (datetime.now() - start_time).total_seconds()
    # Guard the per-date average against an empty date list.
    avg_per_date = duration / total if total else 0.0

    logger.info(f"处理完成！总共: {total}个日期, 成功: {success}个, 失败: {failed}个")
    logger.info(f"总耗时: {duration:.2f} 秒, 平均每个日期: {avg_per_date:.2f} 秒")

    failed_dates = [date for date, status in results if not status]
    if failed_dates:
        logger.warning(f"以下日期处理失败: {failed_dates}")

    return results

if __name__ == "__main__":
    # Candidate dates: business days (Mon-Fri) only. The 1553 files produced
    # for this range match the weekday trading-day count, so weekends never
    # have an order file. With freq='D' every weekend was logged and counted
    # as a failure. Exchange holidays still have no file and are still
    # reported as failed.
    trading_dates = pd.date_range(start='2019-01-01', end='2025-05-31', freq='B')

    process_all_trading_days_parallel(trading_dates, num_processes=12)


In [1]:
import os
import glob
import pandas as pd
import multiprocessing
from tqdm import tqdm
import time
from datetime import datetime

def get_adjust_price(df_close, adj_factors):
    """
    Apply adjusting (复权) factors to one stock's daily aggregates.

    Adds ``adj_value`` (from ``daily_value``) and ``adj_price`` (from
    ``daily_sum_price``) to ``df_close``. Each row is multiplied by the
    ``AdjustingFactor`` whose ``ExDiviDate`` is the latest one on or before
    the row's ``date``. Rows older than every ``ExDiviDate``, and stocks
    with no factor rows at all, keep their raw values (implicit factor 1).

    Parameters:
    df_close: rows for a single ``security_code`` with ``date``,
        ``daily_value`` and ``daily_sum_price`` columns. Modified in place.
    adj_factors: DataFrame with ``security_code``, ``ExDiviDate`` and
        ``AdjustingFactor`` columns.

    Returns:
    ``df_close`` with the two adjusted columns added. Text dates
    ('YYYYMMDD' or 'YYYY-MM-DD') are restored to their original format.
    """
    # Always create the adjusted columns from the raw values. Otherwise a
    # stock without factor rows lacks them and ends up NaN after the caller
    # concatenates all stocks.
    df_close['adj_value'] = df_close['daily_value']
    df_close['adj_price'] = df_close['daily_sum_price']
    if df_close.empty:
        return df_close

    security_code = df_close['security_code'].iloc[0]
    # Factor rows for this stock only.
    df_adj_spec = adj_factors[adj_factors.security_code == security_code]
    if df_adj_spec.empty:
        return df_close

    # Parse text dates for the comparison below and remember their format.
    # The string-dtype check covers pandas versions where text columns are
    # not plain ``object``.
    date_col = df_close['date']
    original_date_format = None
    if date_col.dtype == 'object' or pd.api.types.is_string_dtype(date_col.dtype):
        sample_date = date_col.iloc[0]
        if isinstance(sample_date, str):
            original_date_format = '%Y-%m-%d' if '-' in sample_date else '%Y%m%d'
        df_close['date'] = pd.to_datetime(date_col)

    cols2adj = ['adj_value', 'adj_price']
    # Walk factors from the newest ex-dividend date backwards. Each row takes
    # the first (i.e. latest applicable) factor and is then locked so older
    # factors do not compound on it.
    already_adjusted = pd.Series(False, index=df_close.index)
    df_adj_spec = df_adj_spec.sort_values(by='ExDiviDate', ascending=False)
    for ex_date, factor in df_adj_spec[['ExDiviDate', 'AdjustingFactor']].values:
        if not isinstance(ex_date, pd.Timestamp):
            ex_date = pd.to_datetime(ex_date)
        mask = (df_close['date'] >= ex_date) & ~already_adjusted
        df_close.loc[mask, cols2adj] = df_close.loc[mask, cols2adj] * factor
        already_adjusted |= mask

    # Restore the original text date format.
    if original_date_format:
        df_close['date'] = df_close['date'].dt.strftime(original_date_format)

    return df_close

def process_file(file_path):
    """
    Adjust one daily aggregate parquet file in place with that day's factors.

    Reads ``file_path`` and loads ``./Daily_Adjusting_Factor/YYYYMMDD.parquet``
    for the date in its ``date`` column. It then applies ``get_adjust_price``
    per ``security_code`` (adding ``adj_value`` / ``adj_price``), overwrites
    ``file_path``, and writes a ``.processed/<name>.done`` marker beside it.
    The file is skipped if that marker exists or ``adj_value`` is already a
    column.

    Parameters:
    file_path: path to a daily parquet file.

    Returns:
    dict with keys ``file``, ``status`` ('success' / 'skipped' / 'error')
    and ``message``. Exceptions are caught and reported as 'error'.
    """
    try:
        file_name = os.path.basename(file_path)
        marker_file = os.path.join(os.path.dirname(file_path), ".processed", file_name + ".done")

        # A marker means a previous run finished this file.
        if os.path.exists(marker_file):
            return {"file": file_path, "status": "skipped", "message": "Already processed"}

        daily_data = pd.read_parquet(file_path)

        # Already adjusted (e.g. the marker was lost after a successful write).
        if 'adj_value' in daily_data.columns:
            return {"file": file_path, "status": "skipped", "message": "Data already has adjusted columns"}

        if daily_data.empty:
            return {"file": file_path, "status": "error", "message": "Data file is empty"}

        # Original dtypes, restored after adjustment.
        original_schema = daily_data.dtypes.to_dict()

        # Normalize the file's date to 'YYYYMMDD' to locate the factor file.
        date_value = daily_data['date'].iloc[0]
        if isinstance(date_value, str):
            if '-' in date_value:
                date_str = datetime.strptime(date_value, '%Y-%m-%d').strftime('%Y%m%d')
            else:
                date_str = date_value
        elif pd.api.types.is_integer(date_value):
            # pd.to_datetime(int) would read it as epoch nanoseconds.
            date_str = str(int(date_value))
        else:
            date_str = pd.to_datetime(date_value).strftime('%Y%m%d')

        adj_factor_path = f"./Daily_Adjusting_Factor/{date_str}.parquet"
        if not os.path.exists(adj_factor_path):
            return {"file": file_path, "status": "error", "message": f"Adjustment factor file not found for date {date_str}"}

        adj_factors = pd.read_parquet(adj_factor_path)
        if 'ExDiviDate' in adj_factors.columns:
            adj_factors['ExDiviDate'] = pd.to_datetime(adj_factors['ExDiviDate'])

        # Split the factor table by stock once, instead of re-scanning it for
        # every stock. Stocks without factors get an empty frame.
        factors_by_code = {code: grp for code, grp in adj_factors.groupby('security_code')}
        empty_factors = adj_factors.iloc[0:0]

        daily_adjust = pd.concat(
            [
                get_adjust_price(group.copy(), factors_by_code.get(code, empty_factors))
                for code, group in daily_data.groupby('security_code')
            ],
            ignore_index=True,
        )

        # Best effort: cast original columns (including ``date``) back to their
        # original dtypes; a column that cannot be cast keeps its current dtype.
        for col, dtype in original_schema.items():
            if col in daily_adjust.columns:
                try:
                    daily_adjust[col] = daily_adjust[col].astype(dtype)
                except (TypeError, ValueError):
                    pass

        # This overwrites the input file, so write to a temp name and rename.
        # A crash mid-write then cannot corrupt the only copy of the data.
        tmp_path = f"{file_path}.tmp"
        try:
            daily_adjust.to_parquet(
                tmp_path,
                index=False,
                engine='pyarrow',
                compression='snappy'
            )
            os.replace(tmp_path, file_path)
        finally:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

        os.makedirs(os.path.dirname(marker_file), exist_ok=True)
        with open(marker_file, 'w', encoding='utf-8') as f:
            f.write(f"Processed at {time.strftime('%Y-%m-%d %H:%M:%S')}")

        return {"file": file_path, "status": "success", "message": "Processed successfully"}

    except Exception as e:
        import traceback
        error_details = traceback.format_exc()
        return {"file": file_path, "status": "error", "message": f"{str(e)}\n{error_details}"}


def Adjust_price(num_processes=None):
    """
    Run ``process_file`` over every daily parquet file with a process pool.

    Scans ``./factors/DD_Ask_adjusted_APB/*.parquet`` and requires
    ``./Daily_Adjusting_Factor`` to exist. Shows tqdm progress and prints a
    success / skipped / error summary plus error details. Returns None.
    Interrupts and unexpected errors are printed, and the pool is terminated.

    Parameters:
    num_processes: pool size; defaults to min(20, CPU count).
    """
    data_dir = "./factors/DD_Ask_adjusted_APB"
    # Directory where process_file drops its per-file completion markers.
    os.makedirs(os.path.join(data_dir, ".processed"), exist_ok=True)

    parquet_files = sorted(glob.glob(os.path.join(data_dir, "*.parquet")))
    if not parquet_files:
        print(f"没有找到parquet文件在目录: {data_dir}")
        return

    n_files = len(parquet_files)
    print(f"找到 {n_files} 个文件需要处理")

    adj_factor_dir = "./Daily_Adjusting_Factor"
    if not os.path.exists(adj_factor_dir):
        print(f"错误: 复权因子目录不存在: {adj_factor_dir}")
        return

    workers = min(20, multiprocessing.cpu_count()) if num_processes is None else num_processes
    pool = multiprocessing.Pool(processes=workers)

    try:
        results = list(
            tqdm(pool.imap_unordered(process_file, parquet_files), total=n_files, desc="处理文件")
        )
        pool.close()
        pool.join()

        # Bucket results by status, preserving completion order.
        by_status = {"success": [], "skipped": [], "error": []}
        for res in results:
            by_status.setdefault(res["status"], []).append(res)

        success_count = len(by_status["success"])
        skipped_count = len(by_status["skipped"])
        error_count = len(by_status["error"])
        print(f"处理完成: 成功 {success_count}, 跳过 {skipped_count}, 错误 {error_count}")

        if error_count:
            print("\n错误详情:")
            for res in by_status["error"]:
                print(f"  {res['file']}: {res['message']}")

    except (KeyboardInterrupt, Exception) as exc:
        if isinstance(exc, KeyboardInterrupt):
            print("\n处理被中断，正在清理资源...")
        else:
            print(f"发生错误: {str(exc)}")
        pool.terminate()
        pool.join()

# Script entry point: adjust all daily files with 20 worker processes.
if __name__ == "__main__":
    Adjust_price(20)


找到 1553 个文件需要处理


处理文件: 100%|██████████| 1553/1553 [22:30<00:00,  1.15it/s] 

处理完成: 成功 1553, 跳过 0, 错误 0





In [3]:
import os
import glob
import pandas as pd
import numpy as np
import logging
import traceback
import multiprocessing
from functools import partial
import gc

# Configure root logging for this cell, then get the module logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)


def process_single_date(file_dates, i, window_size=5, daily_min=3, output_dir="./factors/DD_Ask_5d_rolling_APB"):
    """
    Compute and save the rolling-window APB factor for one date.

    Pools the per-stock daily aggregates of ``file_dates[i]`` and up to
    ``window_size - 1`` preceding files. For each stock it computes

        vwap   = sum(adj_value) / sum(daily_volume)
        twap   = sum(adj_price) / sum(daily_count)
        5d_apb = log(twap / vwap)        (+/-inf replaced with NaN)

    and writes ``security_code, date, 5d_apb`` to
    ``{output_dir}/{date}.parquet``. The column is named ``5d_apb``
    regardless of ``window_size``.

    Parameters:
    file_dates: list of ``(date_str, file_path)`` sorted by date.
    i: index into ``file_dates`` of the date to compute.
    window_size: number of files pooled, including ``file_dates[i]``.
    daily_min: currently unused. NOTE(review): presumably meant as a minimum
        number of days a stock must appear in the window -- confirm intent.
    output_dir: directory for the result file.

    Returns:
    ``(date_str, success)``. An existing output file counts as success.
    """
    current_date, _ = file_dates[i]
    logger.info(f"处理日期 {current_date} 的数据")

    output_file = os.path.join(output_dir, f"{current_date}.parquet")
    if os.path.exists(output_file):
        logger.info(f"日期 {current_date} 的结果文件已存在，跳过处理")
        return current_date, True

    try:
        start_idx = max(0, i - window_size + 1)  # never index before the first file

        dfs = []
        for _, file_path in file_dates[start_idx:i + 1]:
            try:
                dfs.append(pd.read_parquet(file_path))
            except Exception as e:
                # An unreadable day shrinks the window instead of failing the date.
                logger.error(f"读取文件 {file_path} 时出错: {str(e)}")

        if not dfs:
            logger.warning(f"日期 {current_date} 没有有效数据，跳过")
            return current_date, False

        combined_data = pd.concat(dfs, ignore_index=True)

        # One vectorized groupby-sum replaces two per-group Python lambdas.
        sums = combined_data.groupby('security_code')[
            ['adj_value', 'daily_volume', 'adj_price', 'daily_count']
        ].sum()
        adj_vwap = sums['adj_value'] / sums['daily_volume']
        adj_twap = sums['adj_price'] / sums['daily_count']

        # Assign the cleaned Series instead of calling
        # ``df[col].replace(..., inplace=True)``. That is chained assignment,
        # and it never updates the frame under pandas Copy-on-Write.
        apb = np.log(adj_twap / adj_vwap).replace([-np.inf, np.inf], np.nan)

        combined_result = apb.rename('5d_apb').reset_index()
        combined_result.insert(1, 'date', current_date)

        os.makedirs(output_dir, exist_ok=True)
        # Temp file + rename: a partial file must never pass the existence check above.
        tmp_file = f"{output_file}.tmp"
        combined_result.to_parquet(tmp_file)
        os.replace(tmp_file, output_file)

        return current_date, True

    except Exception as e:
        logger.error(f"处理日期 {current_date} 时发生错误: {str(e)}")
        logger.error(traceback.format_exc())
        return current_date, False


def calculate_rolling_average(window_size=5, daily_min=3, start_date=None, end_date=None, num_processes=15):
    """
    Compute the rolling-window APB factor for each date using a process pool.

    Reads ``./factors/DD_Ask_adjusted_APB/YYYYMMDD.parquet`` and runs
    ``process_single_date`` for every date that has ``window_size - 1``
    earlier files available. Results go to ``./factors/DD_Ask_5d_rolling_APB``.

    Parameters:
    window_size: number of consecutive files (trading days) per window.
    daily_min: forwarded to ``process_single_date``.
    start_date, end_date: optional inclusive bounds as 'YYYYMMDD' strings;
        either may be given alone. Files before ``start_date`` are still used
        to fill the look-back window, so the first in-range dates get full
        windows.
    num_processes: pool size.

    Returns:
    The output directory path.

    Raises:
    ValueError: ``window_size < 1``, no input files, or no files in range.
    """
    if window_size < 1:
        raise ValueError(f"window_size must be >= 1, got {window_size}")

    data_dir = "./factors/DD_Ask_adjusted_APB"
    parquet_files = sorted(glob.glob(os.path.join(data_dir, "*.parquet")))

    if not parquet_files:
        raise ValueError("未找到分钟级数据文件")

    logger.info(f"找到 {len(parquet_files)} 个分钟级数据文件")

    # File names are 'YYYYMMDD.parquet', so sorting by the stem sorts by date.
    file_dates = sorted(
        ((os.path.basename(path).split('.')[0], path) for path in parquet_files),
        key=lambda item: item[0],
    )

    # Apply each bound independently (previously a single bound was ignored).
    in_range = [
        idx for idx, (day, _) in enumerate(file_dates)
        if (not start_date or day >= start_date) and (not end_date or day <= end_date)
    ]
    if not in_range:
        raise ValueError("筛选后没有符合条件的数据文件")

    # Only dates with a full look-back window are computed. The window may
    # reach before start_date because file_dates itself is not trimmed.
    indices_to_process = [idx for idx in in_range if idx >= window_size - 1]

    output_dir = "./factors/DD_Ask_5d_rolling_APB"
    os.makedirs(output_dir, exist_ok=True)

    # Fix every argument except the index i.
    process_func = partial(
        process_single_date,
        file_dates,
        window_size=window_size,
        daily_min=daily_min,
        output_dir=output_dir
    )
    with multiprocessing.Pool(processes=num_processes) as pool:
        results = pool.map(process_func, indices_to_process)

    success_count = sum(1 for _, success in results if success)
    logger.info(f"处理完成，成功处理 {success_count} 个日期，总共 {len(indices_to_process)} 个日期")

    return output_dir

# Script entry point: 5-file rolling APB using 15 worker processes.
if __name__ == "__main__":
    calculate_rolling_average(num_processes=15, window_size=5)


2025-07-01 16:54:25,335 - __main__ - INFO - 找到 1553 个分钟级数据文件
2025-07-01 16:54:25,398 - __main__ - INFO - 处理日期 20190108 的数据
2025-07-01 16:54:25,398 - __main__ - INFO - 处理日期 20190220 的数据
2025-07-01 16:54:25,399 - __main__ - INFO - 处理日期 20190328 的数据
2025-07-01 16:54:25,399 - __main__ - INFO - 处理日期 20190509 的数据
2025-07-01 16:54:25,400 - __main__ - INFO - 处理日期 20190617 的数据
2025-07-01 16:54:25,400 - __main__ - INFO - 处理日期 20190723 的数据
2025-07-01 16:54:25,401 - __main__ - INFO - 处理日期 20190828 的数据
2025-07-01 16:54:25,401 - __main__ - INFO - 处理日期 20191011 的数据
2025-07-01 16:54:25,402 - __main__ - INFO - 处理日期 20191118 的数据
2025-07-01 16:54:25,402 - __main__ - INFO - 处理日期 20191224 的数据
2025-07-01 16:54:25,403 - __main__ - INFO - 处理日期 20200207 的数据
2025-07-01 16:54:25,404 - __main__ - INFO - 处理日期 20200316 的数据
2025-07-01 16:54:25,405 - __main__ - INFO - 处理日期 20200422 的数据
2025-07-01 16:54:25,406 - __main__ - INFO - 处理日期 20200602 的数据
2025-07-01 16:54:25,406 - __main__ - INFO - 处理日期 20200710 的数据
  combine

In [4]:
def calculate_rolling_20d_avg(factor_names, factor_dir, min_window=5):
    """
    Smooth daily factor values with 20- and 30-row rolling means per stock.

    Loads every ``*.parquet`` in ``factor_dir`` (via DuckDB) and pivots each
    factor to a date x security_code table. It applies
    ``rolling(window, min_periods=min_window).mean()`` along dates, prints
    the daily coverage (non-NaN count / number of distinct securities ever
    seen), and joins the result back onto the original (date, security_code)
    rows.

    Parameters:
    factor_names: a column name or list of column names.
    factor_dir: directory containing the factor parquet files.
    min_window: minimum observations required for a rolling value.

    Returns:
    ``(result_df_20d, result_df_30d)``: each has ``date``, ``security_code``
    and one column per factor (every factor is included, not just the last).

    Raises:
    ValueError: no parquet files, or a requested factor column is missing.
    """
    if not isinstance(factor_names, list):
        factor_names = [factor_names]

    parquet_pattern = os.path.join(factor_dir, "*.parquet")
    parquet_files = glob.glob(parquet_pattern)
    print(f"在目录 {factor_dir} 中找到 {len(parquet_files)} 个Parquet文件")
    if not parquet_files:
        raise ValueError(f"在目录 {factor_dir} 中没有找到Parquet文件")

    conn = duckdb.connect(database=':memory:')
    try:
        all_factor_data = conn.execute(f"""
            SELECT * FROM read_parquet('{parquet_pattern}')
        """).fetchdf()
    finally:
        conn.close()

    missing_factors = [f for f in factor_names if f not in all_factor_data.columns]
    if missing_factors:
        raise ValueError(f"在Parquet文件中未找到以下因子列: {', '.join(missing_factors)}")

    all_factor_data['date'] = pd.to_datetime(all_factor_data['date'])
    all_factor_data = all_factor_data.sort_values(['date', 'security_code'])

    # Coverage denominator: every security seen over the whole period.
    n_securities = len(all_factor_data['security_code'].unique())

    base = all_factor_data[['date', 'security_code']].copy()
    windows = (20, 30)
    # Accumulate one column per factor. Previously each loop iteration
    # overwrote the results, so only the last factor was returned.
    results = {window: base.copy() for window in windows}

    for factor_name in factor_names:
        print(f"\n处理因子: {factor_name}")

        # Wide table: rows = dates, columns = securities.
        pivot_data = all_factor_data.pivot(index='date', columns='security_code', values=factor_name)

        for window in windows:
            rolling_avg = pivot_data.rolling(window=window, min_periods=min_window).mean()

            # Back to long format.
            factor_long = rolling_avg.stack().reset_index()
            factor_long.columns = ['date', 'security_code', factor_name]

            non_nan_counts = factor_long.dropna(subset=[factor_name]).groupby('date').size()
            coverage = non_nan_counts / n_securities
            print(f"\n{window}d{factor_name}因子覆盖率统计: 平均={coverage.mean():.2f}, 最小={coverage.min():.2f}")

            results[window] = pd.merge(
                results[window],
                factor_long,
                on=['date', 'security_code'],
                how='left'
            )

    return results[20], results[30]

In [7]:
def main():
    """
    Build the 20- and 30-day smoothed APB signals and save them as CSV.

    Reads the rolling APB files from ``./factors/DD_Ask_5d_rolling_APB``
    (column ``5d_apb``), smooths them with ``calculate_rolling_20d_avg``, and
    writes two CSV files under the fixed output directory.
    """
    logging.basicConfig(
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        level=logging.INFO,
    )

    source_dir = "./factors/DD_Ask_5d_rolling_APB"
    result_df_20d, result_df_30d = calculate_rolling_20d_avg('5d_apb', source_dir)

    output_path = "/data/home/lexuanchen/Factors/Order/Signal/Alltime_Improved_APB"
    os.makedirs(output_path, exist_ok=True)

    outputs = (
        (f"{output_path}/5d20d_Adj_Order_Ask_APB.csv", result_df_20d),
        (f"{output_path}/5d30d_Adj_Order_Ask_APB.csv", result_df_30d),
    )
    for csv_path, frame in outputs:
        frame.to_csv(csv_path, index=False)

    # NOTE(review): the message says "买方" (buy side) while the inputs and
    # outputs are named "Ask" -- confirm which side is intended.
    print("跨日复权买方日度版APB因子计算完成")


if __name__ == "__main__":
    main()

在目录 ./factors/DD_Ask_5d_rolling_APB 中找到 1549 个Parquet文件

处理因子: 5d_apb

20d5d_apb因子覆盖率统计: 平均=0.80, 最小=0.65

30d5d_apb因子覆盖率统计: 平均=0.80, 最小=0.65
跨日复权买方日度版APB因子计算完成
